Skip to content

Use Antichain for MutableAntichain::frontier. #485

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ impl<T: Timestamp> CapabilitySet<T> {
/// let mut cap = CapabilitySet::from_elem(default_cap);
/// let mut vector = Vec::new();
/// move |input, output| {
/// cap.downgrade(&input.frontier().frontier());
/// cap.downgrade(input.frontier().frontier());
/// while let Some((time, data)) = input.next() {
/// data.swap(&mut vector);
/// }
Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/operators/generic/notificator.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::progress::frontier::{AntichainRef, MutableAntichain};
use crate::progress::frontier::{Antichain, MutableAntichain};
use crate::progress::Timestamp;
use crate::dataflow::operators::Capability;
use crate::logging::TimelyLogger as Logger;
Expand Down Expand Up @@ -40,7 +40,7 @@ impl<'a, T: Timestamp> Notificator<'a, T> {
}

/// Reveals the elements in the frontier of the indicated input.
pub fn frontier(&self, input: usize) -> AntichainRef<T> {
pub fn frontier(&self, input: usize) -> &Antichain<T> {
self.frontiers[input].frontier()
}

Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/inspect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl<G: Scope, C: Container> InspectCore<G, C> for StreamCore<G, C> {
let mut frontier = crate::progress::Antichain::from_elem(G::Timestamp::minimum());
let mut vector = Default::default();
self.unary_frontier(Pipeline, "InspectBatch", move |_,_| move |input, output| {
if input.frontier.frontier() != frontier.borrow() {
if input.frontier.frontier() != &frontier {
frontier.clear();
frontier.extend(input.frontier.frontier().iter().cloned());
func(Err(frontier.elements()));
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ impl<T: Timestamp> Handle<T> {
/// ```
#[inline]
pub fn with_frontier<R, F: FnMut(AntichainRef<T>)->R>(&self, mut function: F) -> R {
function(self.frontier.borrow().frontier())
function(self.frontier.borrow().frontier().borrow())
}
}

Expand Down
32 changes: 19 additions & 13 deletions timely/src/progress/frontier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,14 @@ impl<T> ::std::iter::IntoIterator for Antichain<T> {
}
}

impl<'a, T> ::std::iter::IntoIterator for &'a Antichain<T> {
type Item = &'a T;
type IntoIter = ::std::slice::Iter<'a, T>;
fn into_iter(self) -> Self::IntoIter {
self.elements.iter()
}
}

/// An antichain based on a multiset whose elements frequencies can be updated.
///
/// The `MutableAntichain` maintains frequencies for many elements of type `T`, and exposes the set
Expand All @@ -316,7 +324,7 @@ impl<T> ::std::iter::IntoIterator for Antichain<T> {
pub struct MutableAntichain<T> {
dirty: usize,
updates: Vec<(T, i64)>,
frontier: Vec<T>,
frontier: Antichain<T>,
changes: ChangeBatch<T>,
}

Expand All @@ -336,7 +344,7 @@ impl<T> MutableAntichain<T> {
MutableAntichain {
dirty: 0,
updates: Vec::new(),
frontier: Vec::new(),
frontier: Antichain::new(),
changes: ChangeBatch::new(),
}
}
Expand Down Expand Up @@ -380,20 +388,20 @@ impl<T> MutableAntichain<T> {
/// assert!(frontier.frontier().len() == 0);
///```
#[inline]
pub fn frontier(&self) -> AntichainRef<'_, T> {
pub fn frontier(&self) -> &Antichain<T> {
debug_assert_eq!(self.dirty, 0);
AntichainRef::new(&self.frontier)
&self.frontier
}

/// Creates a new singleton `MutableAntichain`.
///
/// # Examples
///
///```
/// use timely::progress::frontier::{AntichainRef, MutableAntichain};
/// use timely::progress::frontier::{Antichain, MutableAntichain};
///
/// let mut frontier = MutableAntichain::new_bottom(0u64);
/// assert!(frontier.frontier() == AntichainRef::new(&[0u64]));
/// assert!(frontier.frontier() == &Antichain::from_elem(0u64));
///```
#[inline]
pub fn new_bottom(bottom: T) -> MutableAntichain<T>
Expand All @@ -403,7 +411,7 @@ impl<T> MutableAntichain<T> {
MutableAntichain {
dirty: 0,
updates: vec![(bottom.clone(), 1)],
frontier: vec![bottom],
frontier: Antichain::from_elem(bottom),
changes: ChangeBatch::new(),
}
}
Expand Down Expand Up @@ -483,15 +491,15 @@ impl<T> MutableAntichain<T> {
/// # Examples
///
///```
/// use timely::progress::frontier::{AntichainRef, MutableAntichain};
/// use timely::progress::frontier::{Antichain, MutableAntichain};
///
/// let mut frontier = MutableAntichain::new_bottom(1u64);
/// let changes =
/// frontier
/// .update_iter(vec![(1, -1), (2, 7)])
/// .collect::<Vec<_>>();
///
/// assert!(frontier.frontier() == AntichainRef::new(&[2]));
/// assert!(frontier.frontier() == &Antichain::from_elem(2));
/// assert!(changes == vec![(1, -1), (2, 1)]);
///```
#[inline]
Expand Down Expand Up @@ -557,16 +565,14 @@ impl<T> MutableAntichain<T> {
self.updates.retain(|x| x.1 != 0);
}

for time in self.frontier.drain(..) {
for time in self.frontier.elements.drain(..) {
self.changes.update(time, -1);
}

// build new frontier using strictly positive times.
// as the times are sorted, we don't need to worry that we might displace frontier elements.
for time in self.updates.iter().filter(|x| x.1 > 0) {
if !self.frontier.iter().any(|f| f.less_equal(&time.0)) {
self.frontier.push(time.0.clone());
}
self.frontier.insert(time.0.clone());
}

for time in self.frontier.iter() {
Expand Down