diff --git a/dogsdogsdogs/src/operators/count.rs b/dogsdogsdogs/src/operators/count.rs index bae14c7b3..d86fb5089 100644 --- a/dogsdogsdogs/src/operators/count.rs +++ b/dogsdogsdogs/src/operators/count.rs @@ -4,7 +4,7 @@ use differential_dataflow::{ExchangeData, Collection, Hashable}; use differential_dataflow::difference::{Monoid, Multiply}; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arranged; -use differential_dataflow::trace::{Cursor, TraceReader, BatchReader}; +use differential_dataflow::trace::TraceReader; /// Reports a number of extensions to a stream of prefixes. /// @@ -23,8 +23,6 @@ where G::Timestamp: Lattice, Tr: TraceReader+Clone+'static, Tr::Key: Ord+Hashable+Default, - Tr::Batch: BatchReader, - Tr::Cursor: Cursor, R: Monoid+Multiply+ExchangeData, F: Fn(&P)->Tr::Key+Clone+'static, P: ExchangeData, diff --git a/dogsdogsdogs/src/operators/half_join.rs b/dogsdogsdogs/src/operators/half_join.rs index 8f6f4d0cf..338a994aa 100644 --- a/dogsdogsdogs/src/operators/half_join.rs +++ b/dogsdogsdogs/src/operators/half_join.rs @@ -42,7 +42,7 @@ use differential_dataflow::{ExchangeData, Collection, AsCollection, Hashable}; use differential_dataflow::difference::{Monoid, Semigroup}; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arranged; -use differential_dataflow::trace::{Cursor, TraceReader, BatchReader}; +use differential_dataflow::trace::{Cursor, TraceReader}; use differential_dataflow::consolidation::{consolidate, consolidate_updates}; /// A binary equijoin that responds to updates on only its first input. @@ -81,8 +81,6 @@ where Tr: TraceReader+Clone+'static, Tr::Key: Ord+Hashable+ExchangeData, Tr::Val: Clone, - Tr::Batch: BatchReader, - Tr::Cursor: Cursor, Tr::R: Monoid+ExchangeData, FF: Fn(&G::Timestamp) -> G::Timestamp + 'static, CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static, @@ -137,8 +135,6 @@ where Tr: TraceReader+Clone+'static, Tr::Key: Ord+Hashable+ExchangeData, Tr::Val: Clone, - Tr::Batch: BatchReader, - Tr::Cursor: Cursor, Tr::R: Monoid+ExchangeData, FF: Fn(&G::Timestamp) -> G::Timestamp + 'static, CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static, diff --git a/dogsdogsdogs/src/operators/lookup_map.rs b/dogsdogsdogs/src/operators/lookup_map.rs index 671e24921..029e1f942 100644 --- a/dogsdogsdogs/src/operators/lookup_map.rs +++ b/dogsdogsdogs/src/operators/lookup_map.rs @@ -10,7 +10,7 @@ use differential_dataflow::{ExchangeData, Collection, AsCollection, Hashable}; use differential_dataflow::difference::{Semigroup, Monoid}; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arranged; -use differential_dataflow::trace::{Cursor, TraceReader, BatchReader}; +use differential_dataflow::trace::{Cursor, TraceReader}; /// Proposes extensions to a stream of prefixes. /// @@ -32,8 +32,6 @@ where Tr: TraceReader+Clone+'static, Tr::Key: Ord+Hashable, Tr::Val: Clone, - Tr::Batch: BatchReader, - Tr::Cursor: Cursor, Tr::R: Monoid+ExchangeData, F: FnMut(&D, &mut Tr::Key)+Clone+'static, D: ExchangeData, diff --git a/dogsdogsdogs/src/operators/propose.rs b/dogsdogsdogs/src/operators/propose.rs index cf7e793c8..2a5e6747a 100644 --- a/dogsdogsdogs/src/operators/propose.rs +++ b/dogsdogsdogs/src/operators/propose.rs @@ -4,7 +4,7 @@ use differential_dataflow::{ExchangeData, Collection, Hashable}; use differential_dataflow::difference::{Monoid, Multiply}; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arranged; -use differential_dataflow::trace::{Cursor, TraceReader, BatchReader}; +use differential_dataflow::trace::TraceReader; /// Proposes extensions to a prefix stream. /// @@ -25,8 +25,6 @@ where Tr: TraceReader+Clone+'static, Tr::Key: Ord+Hashable+Default, Tr::Val: Clone, - Tr::Batch: BatchReader, - Tr::Cursor: Cursor, Tr::R: Monoid+Multiply+ExchangeData, F: Fn(&P)->Tr::Key+Clone+'static, P: ExchangeData, @@ -58,8 +56,6 @@ where Tr: TraceReader+Clone+'static, Tr::Key: Ord+Hashable+Default, Tr::Val: Clone, - Tr::Batch: BatchReader, - Tr::Cursor: Cursor, Tr::R: Monoid+Multiply+ExchangeData, F: Fn(&P)->Tr::Key+Clone+'static, P: ExchangeData, diff --git a/dogsdogsdogs/src/operators/validate.rs b/dogsdogsdogs/src/operators/validate.rs index 7152f1876..41efa8df2 100644 --- a/dogsdogsdogs/src/operators/validate.rs +++ b/dogsdogsdogs/src/operators/validate.rs @@ -6,7 +6,7 @@ use differential_dataflow::{ExchangeData, Collection}; use differential_dataflow::difference::{Monoid, Multiply}; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arranged; -use differential_dataflow::trace::{Cursor, TraceReader, BatchReader}; +use differential_dataflow::trace::TraceReader; /// Proposes extensions to a stream of prefixes. /// @@ -24,8 +24,6 @@ where Tr: TraceReader+Clone+'static, K: Ord+Hash+Clone+Default, V: ExchangeData+Hash+Default, - Tr::Batch: BatchReader, - Tr::Cursor: Cursor, Tr::R: Monoid+Multiply+ExchangeData, F: Fn(&P)->K+Clone+'static, P: ExchangeData, diff --git a/src/algorithms/graphs/bfs.rs b/src/algorithms/graphs/bfs.rs index 2931922a8..d80ad3c75 100644 --- a/src/algorithms/graphs/bfs.rs +++ b/src/algorithms/graphs/bfs.rs @@ -30,8 +30,6 @@ where G::Timestamp: Lattice+Ord, N: ExchangeData+Hash, Tr: TraceReader+Clone+'static, - Tr::Batch: crate::trace::BatchReader+'static, - Tr::Cursor: crate::trace::Cursor+'static, { // initialize roots as reaching themselves at distance 0 let nodes = roots.map(|x| (x, 0)); @@ -46,4 +44,4 @@ where .concat(&nodes) .reduce(|_, s, t| t.push((s[0].0.clone(), 1))) }) -} \ No newline at end of file +} diff --git a/src/algorithms/graphs/bijkstra.rs b/src/algorithms/graphs/bijkstra.rs index 91e02dbde..eb803f160 100644 --- a/src/algorithms/graphs/bijkstra.rs +++ b/src/algorithms/graphs/bijkstra.rs @@ -46,8 +46,6 @@ where G::Timestamp: Lattice+Ord, N: ExchangeData+Hash, Tr: TraceReader+Clone+'static, - Tr::Batch: crate::trace::BatchReader+'static, - Tr::Cursor: crate::trace::Cursor+'static, { forward .stream diff --git a/src/algorithms/graphs/propagate.rs b/src/algorithms/graphs/propagate.rs index c94e49814..62bd92283 100644 --- a/src/algorithms/graphs/propagate.rs +++ b/src/algorithms/graphs/propagate.rs @@ -65,8 +65,6 @@ where R: From, L: ExchangeData, Tr: TraceReader+Clone+'static, - Tr::Batch: crate::trace::BatchReader+'static, - Tr::Cursor: crate::trace::Cursor+'static, F: Fn(&L)->u64+Clone+'static, { // Morally the code performs the following iterative computation. However, in the interest of a simplified diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index 09a48e248..1978a84b3 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -65,8 +65,6 @@ impl Clone for Arranged where G::Timestamp: Lattice+Ord, Tr: TraceReader + Clone, - Tr::Batch: BatchReader, - Tr::Cursor: Cursor, { fn clone(&self) -> Self { Arranged { @@ -83,8 +81,6 @@ impl Arranged where G::Timestamp: Lattice+Ord, Tr: TraceReader + Clone, - Tr::Batch: BatchReader, - Tr::Cursor: Cursor, { /// Brings an arranged collection into a nested scope. /// @@ -425,8 +421,6 @@ impl<'a, G: Scope, Tr> Arranged, Tr> where G::Timestamp: Lattice+Ord, Tr: TraceReader + Clone, - Tr::Batch: BatchReader, - Tr::Cursor: Cursor, { /// Brings an arranged collection out of a nested region. /// @@ -463,7 +457,6 @@ where R: ExchangeData, Tr: Trace+TraceReader+'static, Tr::Batch: Batch, - Tr::Cursor: Cursor, { self.arrange_named("Arrange") } @@ -480,7 +473,6 @@ where R: ExchangeData, Tr: Trace+TraceReader+'static, Tr::Batch: Batch, - Tr::Cursor: Cursor, { let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into()); self.arrange_core(exchange, name) @@ -496,7 +488,6 @@ where P: ParallelizationContract, Tr: Trace+TraceReader+'static, Tr::Batch: Batch, - Tr::Cursor: Cursor, ; } @@ -513,7 +504,6 @@ where P: ParallelizationContract, Tr: Trace+TraceReader+'static, Tr::Batch: Batch, - Tr::Cursor: Cursor, { // The `Arrange` operator is tasked with reacting to an advancing input // frontier by producing the sequence of batches whose lower and upper @@ -685,7 +675,6 @@ where P: ParallelizationContract, Tr: Trace+TraceReader+'static, Tr::Batch: Batch, - Tr::Cursor: Cursor, { self.map(|k| (k, ())) .arrange_core(pact, name) diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index 978d9596f..76079ae11 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -146,7 +146,6 @@ where Tr::Val: ExchangeData, Tr: Trace+TraceReader+'static, Tr::Batch: Batch, - Tr::Cursor: Cursor, { let mut reader: Option> = None; diff --git a/src/operators/count.rs b/src/operators/count.rs index f119d648e..47864622b 100644 --- a/src/operators/count.rs +++ b/src/operators/count.rs @@ -74,8 +74,6 @@ where T1: TraceReader+Clone+'static, T1::Key: ExchangeData, T1::R: ExchangeData+Semigroup, - T1::Batch: BatchReader, - T1::Cursor: Cursor, { fn count_total_core>(&self) -> Collection { diff --git a/src/operators/join.rs b/src/operators/join.rs index ea72b32f6..482cc3836 100644 --- a/src/operators/join.rs +++ b/src/operators/join.rs @@ -190,8 +190,6 @@ where Tr::Key: Data+Hashable, Tr::Val: Data, Tr::R: Semigroup, - Tr::Batch: BatchReader+'static, - Tr::Cursor: Cursor+'static, { fn join_map(&self, other: &Collection, mut logic: L) -> Collection>::Output> where Tr::Key: ExchangeData, Tr::R: Multiply, >::Output: Semigroup, L: FnMut(&Tr::Key, &Tr::Val, &V2)->D+'static { @@ -258,8 +256,6 @@ pub trait JoinCore where G::Time fn join_core (&self, stream2: &Arranged, result: L) -> Collection>::Output> where Tr2: TraceReader+Clone+'static, - Tr2::Batch: BatchReader+'static, - Tr2::Cursor: Cursor+'static, Tr2::Val: Ord+Clone+Debug+'static, Tr2::R: Semigroup, R: Multiply, @@ -311,8 +307,6 @@ pub trait JoinCore where G::Time fn join_core_internal_unsafe (&self, stream2: &Arranged, result: L) -> Collection where Tr2: TraceReader+Clone+'static, - Tr2::Batch: BatchReader+'static, - Tr2::Cursor: Cursor+'static, Tr2::Val: Ord+Clone+Debug+'static, Tr2::R: Semigroup, D: Data, @@ -334,8 +328,6 @@ where fn join_core (&self, stream2: &Arranged, result: L) -> Collection>::Output> where Tr2: TraceReader+Clone+'static, - Tr2::Batch: BatchReader+'static, - Tr2::Cursor: Cursor+'static, Tr2::Val: Ord+Clone+Debug+'static, Tr2::R: Semigroup, R: Multiply, @@ -351,8 +343,6 @@ where fn join_core_internal_unsafe (&self, stream2: &Arranged, result: L) -> Collection where Tr2: TraceReader+Clone+'static, - Tr2::Batch: BatchReader+'static, - Tr2::Cursor: Cursor+'static, Tr2::Val: Ord+Clone+Debug+'static, Tr2::R: Semigroup, R: Semigroup, @@ -374,15 +364,11 @@ impl JoinCore for Arranged T1::Key: Ord+Debug+'static, T1::Val: Ord+Clone+Debug+'static, T1::R: Semigroup, - T1::Batch: BatchReader+'static, - T1::Cursor: Cursor+'static, { fn join_core(&self, other: &Arranged, mut result: L) -> Collection>::Output> where Tr2::Val: Ord+Clone+Debug+'static, Tr2: TraceReader+Clone+'static, - Tr2::Batch: BatchReader+'static, - Tr2::Cursor: Cursor+'static, Tr2::R: Semigroup, T1::R: Multiply, >::Output: Semigroup, @@ -401,8 +387,6 @@ impl JoinCore for Arranged fn join_core_internal_unsafe (&self, other: &Arranged, mut result: L) -> Collection where Tr2: TraceReader+Clone+'static, - Tr2::Batch: BatchReader+'static, - Tr2::Cursor: Cursor+'static, Tr2::Val: Ord+Clone+Debug+'static, Tr2::R: Semigroup, D: Data, diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index f8d9edeb1..9d56813c9 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -90,8 +90,6 @@ impl Reduce for Arrang where G::Timestamp: Lattice+Ord, T1: TraceReader+Clone+'static, - T1::Batch: BatchReader, - T1::Cursor: Cursor, { fn reduce_named(&self, name: &str, logic: L) -> Collection where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static { @@ -179,8 +177,6 @@ impl Threshold for Arranged+Clone+'static, - T1::Batch: BatchReader, - T1::Cursor: Cursor, { fn threshold_namedR2+'static>(&self, name: &str, mut thresh: F) -> Collection { self.reduce_abelian::<_,DefaultKeyTrace<_,_,_>>(name, move |k,s,t| t.push(((), thresh(k, &s[0].1)))) @@ -236,8 +232,6 @@ impl Count for Arranged where G::Timestamp: Lattice+Ord, T1: TraceReader+Clone+'static, - T1::Batch: BatchReader, - T1::Cursor: Cursor, { fn count_core>(&self) -> Collection { self.reduce_abelian::<_,DefaultValTrace<_,_,_,_>>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8)))) @@ -283,7 +277,6 @@ pub trait ReduceCore where G::Timestam T2::Val: Data, T2::R: Abelian, T2::Batch: Batch, - T2::Cursor: Cursor, L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val, T2::R)>)+'static, { self.reduce_core::<_,T2>(name, move |key, input, output, change| { @@ -306,7 +299,6 @@ pub trait ReduceCore where G::Timestam T2::Val: Data, T2::R: Semigroup, T2::Batch: Batch, - T2::Cursor: Cursor, L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val,T2::R)>)+'static ; } @@ -325,7 +317,6 @@ where T2::R: Semigroup, T2: Trace+TraceReader+'static, T2::Batch: Batch, - T2::Cursor: Cursor, L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static { self.arrange_by_key_named(&format!("Arrange: {}", name)) @@ -337,8 +328,6 @@ impl ReduceCore for Ar where G::Timestamp: Lattice+Ord, T1: TraceReader+Clone+'static, - T1::Batch: BatchReader, - T1::Cursor: Cursor, { fn reduce_core(&self, name: &str, mut logic: L) -> Arranged> where @@ -346,7 +335,6 @@ where T2::Val: Data, T2::R: Semigroup, T2::Batch: Batch, - T2::Cursor: Cursor, L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static { let mut result_trace = None; diff --git a/src/operators/threshold.rs b/src/operators/threshold.rs index 427093a04..cbe1b8d8f 100644 --- a/src/operators/threshold.rs +++ b/src/operators/threshold.rs @@ -108,8 +108,6 @@ where T1: TraceReader+Clone+'static, T1::Key: ExchangeData, T1::R: ExchangeData+Semigroup, - T1::Batch: BatchReader, - T1::Cursor: Cursor, { fn threshold_semigroup(&self, mut thresh: F) -> Collection where diff --git a/src/trace/wrappers/freeze.rs b/src/trace/wrappers/freeze.rs index fa24b91d7..6b5d270ab 100644 --- a/src/trace/wrappers/freeze.rs +++ b/src/trace/wrappers/freeze.rs @@ -41,8 +41,6 @@ where T::Key: 'static, T::Val: 'static, T::R: 'static, - T::Batch: BatchReader, - T::Cursor: Cursor, F: Fn(&G::Timestamp)->Option+'static, { let func1 = Rc::new(func);