diff --git a/examples/monoid-bfs.rs b/examples/monoid-bfs.rs index 9b1b367d2..59eddc608 100644 --- a/examples/monoid-bfs.rs +++ b/examples/monoid-bfs.rs @@ -131,7 +131,7 @@ where G::Timestamp: Lattice+Ord { use differential_dataflow::operators::iterate::SemigroupVariable; use differential_dataflow::operators::reduce::ReduceCore; - use differential_dataflow::trace::implementations::KeySpine; + use differential_dataflow::trace::implementations::{KeySpine, KeyBuilder}; use timely::order::Product; @@ -146,7 +146,7 @@ where G::Timestamp: Lattice+Ord { .join_map(&edges, |_k,&(),d| *d) .concat(&roots) .map(|x| (x,())) - .reduce_core::<_,KeySpine<_,_,_>>("Reduce", |_key, input, output, updates| { + .reduce_core::<_,KeyBuilder<_,_,_>,KeySpine<_,_,_>>("Reduce", |_key, input, output, updates| { if output.is_empty() || input[0].1 < output[0].1 { updates.push(((), input[0].1)); } diff --git a/examples/spines.rs b/examples/spines.rs index e4cd50bea..bc3e4ce32 100644 --- a/examples/spines.rs +++ b/examples/spines.rs @@ -28,46 +28,46 @@ fn main() { match mode.as_str() { "new" => { - use differential_dataflow::trace::implementations::ord_neu::{ColKeyBatcher, ColKeySpine}; - let data = data.arrange::, ColKeySpine<_,_,_>>(); - let keys = keys.arrange::, ColKeySpine<_,_,_>>(); + use differential_dataflow::trace::implementations::ord_neu::{ColKeyBatcher, ColKeyBuilder, ColKeySpine}; + let data = data.arrange::, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(); + let keys = keys.arrange::, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(); keys.join_core(&data, |_k, &(), &()| Option::<()>::None) .probe_with(&mut probe); }, "old" => { - use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatcher, OrdKeySpine}; - let data = data.arrange::, OrdKeySpine<_,_,_>>(); - let keys = keys.arrange::, OrdKeySpine<_,_,_>>(); + use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatcher, RcOrdKeyBuilder, OrdKeySpine}; + let data = data.arrange::, RcOrdKeyBuilder<_,_,_>, OrdKeySpine<_,_,_>>(); + let keys = keys.arrange::, RcOrdKeyBuilder<_,_,_>, OrdKeySpine<_,_,_>>(); keys.join_core(&data, |_k, &(), &()| Option::<()>::None) .probe_with(&mut probe); }, "rhh" => { - use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecBatcher, VecSpine}; - let data = data.map(|x| HashWrapper { inner: x }).arrange::, VecSpine<_,(),_,_>>(); - let keys = keys.map(|x| HashWrapper { inner: x }).arrange::, VecSpine<_,(),_,_>>(); + use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecBatcher, VecBuilder, VecSpine}; + let data = data.map(|x| HashWrapper { inner: x }).arrange::, VecBuilder<_,(),_,_>, VecSpine<_,(),_,_>>(); + let keys = keys.map(|x| HashWrapper { inner: x }).arrange::, VecBuilder<_,(),_,_>, VecSpine<_,(),_,_>>(); keys.join_core(&data, |_k, &(), &()| Option::<()>::None) .probe_with(&mut probe); }, "slc" => { - use differential_dataflow::trace::implementations::ord_neu::{PreferredBatcher, PreferredSpine}; + use differential_dataflow::trace::implementations::ord_neu::{PreferredBatcher, PreferredBuilder, PreferredSpine}; let data = data.map(|x| (x.clone().into_bytes(), x.into_bytes())) - .arrange::, PreferredSpine<[u8],[u8],_,_>>() - .reduce_abelian::<_, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1))); + .arrange::, PreferredBuilder<[u8],[u8],_,_>, PreferredSpine<[u8],[u8],_,_>>() + .reduce_abelian::<_, _, _, PreferredBuilder<[u8],(),_,_>, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1))); let keys = keys.map(|x| (x.clone().into_bytes(), 7)) - .arrange::, PreferredSpine<[u8],u8,_,_>>() - .reduce_abelian::<_, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1))); + .arrange::, PreferredBuilder<[u8],u8,_,_>, PreferredSpine<[u8],u8,_,_>>() + .reduce_abelian::<_, _, _, PreferredBuilder<[u8],(),_,_>,PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1))); keys.join_core(&data, |_k, &(), &()| Option::<()>::None) .probe_with(&mut probe); }, "flat" => { - use differential_dataflow::trace::implementations::ord_neu::{FlatKeyBatcherDefault, FlatKeySpineDefault}; - let data = data.arrange::, FlatKeySpineDefault>(); - let keys = keys.arrange::, FlatKeySpineDefault>(); + use differential_dataflow::trace::implementations::ord_neu::{FlatKeyBatcherDefault, FlatKeyBuilderDefault, FlatKeySpineDefault}; + let data = data.arrange::, FlatKeyBuilderDefault, FlatKeySpineDefault>(); + let keys = keys.arrange::, FlatKeyBuilderDefault, FlatKeySpineDefault>(); keys.join_core(&data, |_k, (), ()| Option::<()>::None) .probe_with(&mut probe); } diff --git a/experiments/src/bin/deals.rs b/experiments/src/bin/deals.rs index ab7871ccb..f9776d17a 100644 --- a/experiments/src/bin/deals.rs +++ b/experiments/src/bin/deals.rs @@ -6,7 +6,7 @@ use differential_dataflow::input::Input; use differential_dataflow::Collection; use differential_dataflow::operators::*; -use differential_dataflow::trace::implementations::{ValSpine, KeySpine, KeyBatcher, ValBatcher}; +use differential_dataflow::trace::implementations::{ValSpine, KeySpine, KeyBatcher, KeyBuilder, ValBatcher, ValBuilder}; use differential_dataflow::operators::arrange::TraceAgent; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::operators::arrange::Arrange; @@ -41,7 +41,7 @@ fn main() { let (input, graph) = scope.new_collection(); // each edge should exist in both directions. - let graph = graph.arrange::, ValSpine<_,_,_,_>>(); + let graph = graph.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); match program.as_str() { "tc" => tc(&graph).filter(move |_| inspect).map(|_| ()).consolidate().inspect(|x| println!("tc count: {:?}", x)).probe(), @@ -94,10 +94,10 @@ fn tc>(edges: &EdgeArranged) -> C let result = inner .map(|(x,y)| (y,x)) - .arrange::, ValSpine<_,_,_,_>>() + .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&edges, |_y,&x,&z| Some((x, z))) .concat(&edges.as_collection(|&k,&v| (k,v))) - .arrange::, KeySpine<_,_,_>>() + .arrange::, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }) ; @@ -121,12 +121,12 @@ fn sg>(edges: &EdgeArranged) -> C let result = inner - .arrange::, ValSpine<_,_,_,_>>() + .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&edges, |_,&x,&z| Some((x, z))) - .arrange::, ValSpine<_,_,_,_>>() + .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&edges, |_,&x,&z| Some((x, z))) .concat(&peers) - .arrange::, KeySpine<_,_,_>>() + .arrange::, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }) ; diff --git a/experiments/src/bin/graspan1.rs b/experiments/src/bin/graspan1.rs index 41266c4b4..e93bb5381 100644 --- a/experiments/src/bin/graspan1.rs +++ b/experiments/src/bin/graspan1.rs @@ -6,7 +6,7 @@ use timely::order::Product; use differential_dataflow::difference::Present; use differential_dataflow::input::Input; -use differential_dataflow::trace::implementations::{ValBatcher, ValSpine}; +use differential_dataflow::trace::implementations::{ValBatcher, ValBuilder, ValSpine}; use differential_dataflow::operators::*; use differential_dataflow::operators::arrange::Arrange; use differential_dataflow::operators::iterate::SemigroupVariable; @@ -31,7 +31,7 @@ fn main() { let (n_handle, nodes) = scope.new_collection(); let (e_handle, edges) = scope.new_collection(); - let edges = edges.arrange::, ValSpine<_,_,_,_>>(); + let edges = edges.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); // a N c <- a N b && b E c // N(a,c) <- N(a,b), E(b, c) @@ -46,7 +46,7 @@ fn main() { let next = labels.join_core(&edges, |_b, a, c| Some((*c, *a))) .concat(&nodes) - .arrange::, ValSpine<_,_,_,_>>() + .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() // .distinct_total_core::(); .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }); diff --git a/experiments/src/bin/graspan2.rs b/experiments/src/bin/graspan2.rs index cde55b71c..2aa08dd00 100644 --- a/experiments/src/bin/graspan2.rs +++ b/experiments/src/bin/graspan2.rs @@ -10,7 +10,7 @@ use differential_dataflow::Collection; use differential_dataflow::input::Input; use differential_dataflow::operators::*; use differential_dataflow::operators::arrange::Arrange; -use differential_dataflow::trace::implementations::{ValSpine, KeySpine, ValBatcher, KeyBatcher}; +use differential_dataflow::trace::implementations::{ValSpine, KeySpine, ValBatcher, KeyBatcher, ValBuilder, KeyBuilder}; use differential_dataflow::difference::Present; type Node = u32; @@ -47,7 +47,7 @@ fn unoptimized() { .flat_map(|(a,b)| vec![a,b]) .concat(&dereference.flat_map(|(a,b)| vec![a,b])); - let dereference = dereference.arrange::, ValSpine<_,_,_,_>>(); + let dereference = dereference.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); let (value_flow, memory_alias, value_alias) = scope @@ -60,14 +60,14 @@ fn unoptimized() { let value_flow = SemigroupVariable::new(scope, Product::new(Default::default(), 1)); let memory_alias = SemigroupVariable::new(scope, Product::new(Default::default(), 1)); - let value_flow_arranged = value_flow.arrange::, ValSpine<_,_,_,_>>(); - let memory_alias_arranged = memory_alias.arrange::, ValSpine<_,_,_,_>>(); + let value_flow_arranged = value_flow.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); + let memory_alias_arranged = memory_alias.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); // VA(a,b) <- VF(x,a),VF(x,b) // VA(a,b) <- VF(x,a),MA(x,y),VF(y,b) let value_alias_next = value_flow_arranged.join_core(&value_flow_arranged, |_,&a,&b| Some((a,b))); let value_alias_next = value_flow_arranged.join_core(&memory_alias_arranged, |_,&a,&b| Some((b,a))) - .arrange::, ValSpine<_,_,_,_>>() + .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&value_flow_arranged, |_,&a,&b| Some((a,b))) .concat(&value_alias_next); @@ -77,16 +77,16 @@ fn unoptimized() { let value_flow_next = assignment .map(|(a,b)| (b,a)) - .arrange::, ValSpine<_,_,_,_>>() + .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&memory_alias_arranged, |_,&a,&b| Some((b,a))) .concat(&assignment.map(|(a,b)| (b,a))) - .arrange::, ValSpine<_,_,_,_>>() + .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&value_flow_arranged, |_,&a,&b| Some((a,b))) .concat(&nodes.map(|n| (n,n))); let value_flow_next = value_flow_next - .arrange::, KeySpine<_,_,_>>() + .arrange::, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() // .distinct_total_core::() .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }) ; @@ -95,12 +95,12 @@ fn unoptimized() { let memory_alias_next: Collection<_,_,Present> = value_alias_next .join_core(&dereference, |_x,&y,&a| Some((y,a))) - .arrange::, ValSpine<_,_,_,_>>() + .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&dereference, |_y,&a,&b| Some((a,b))); let memory_alias_next: Collection<_,_,Present> = memory_alias_next - .arrange::, KeySpine<_,_,_>>() + .arrange::, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() // .distinct_total_core::() .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }) ; @@ -172,7 +172,7 @@ fn optimized() { .flat_map(|(a,b)| vec![a,b]) .concat(&dereference.flat_map(|(a,b)| vec![a,b])); - let dereference = dereference.arrange::, ValSpine<_,_,_,_>>(); + let dereference = dereference.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); let (value_flow, memory_alias) = scope @@ -185,8 +185,8 @@ fn optimized() { let value_flow = SemigroupVariable::new(scope, Product::new(Default::default(), 1)); let memory_alias = SemigroupVariable::new(scope, Product::new(Default::default(), 1)); - let value_flow_arranged = value_flow.arrange::, ValSpine<_,_,_,_>>(); - let memory_alias_arranged = memory_alias.arrange::, ValSpine<_,_,_,_>>(); + let value_flow_arranged = value_flow.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); + let memory_alias_arranged = memory_alias.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); // VF(a,a) <- // VF(a,b) <- A(a,x),VF(x,b) @@ -194,13 +194,13 @@ fn optimized() { let value_flow_next = assignment .map(|(a,b)| (b,a)) - .arrange::, ValSpine<_,_,_,_>>() + .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&memory_alias_arranged, |_,&a,&b| Some((b,a))) .concat(&assignment.map(|(a,b)| (b,a))) - .arrange::, ValSpine<_,_,_,_>>() + .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&value_flow_arranged, |_,&a,&b| Some((a,b))) .concat(&nodes.map(|n| (n,n))) - .arrange::, KeySpine<_,_,_>>() + .arrange::, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() // .distinct_total_core::() .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }) ; @@ -209,9 +209,9 @@ fn optimized() { let value_flow_deref = value_flow .map(|(a,b)| (b,a)) - .arrange::, ValSpine<_,_,_,_>>() + .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&dereference, |_x,&a,&b| Some((a,b))) - .arrange::, ValSpine<_,_,_,_>>(); + .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); // MA(a,b) <- VFD(x,a),VFD(y,b) // MA(a,b) <- VFD(x,a),MA(x,y),VFD(y,b) @@ -222,10 +222,10 @@ fn optimized() { let memory_alias_next = memory_alias_arranged .join_core(&value_flow_deref, |_x,&y,&a| Some((y,a))) - .arrange::, ValSpine<_,_,_,_>>() + .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&value_flow_deref, |_y,&a,&b| Some((a,b))) .concat(&memory_alias_next) - .arrange::, KeySpine<_,_,_>>() + .arrange::, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() // .distinct_total_core::() .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }) ; diff --git a/interactive/src/plan/mod.rs b/interactive/src/plan/mod.rs index 45a6002cd..faad5e562 100644 --- a/interactive/src/plan/mod.rs +++ b/interactive/src/plan/mod.rs @@ -158,7 +158,7 @@ impl Render for Plan { Plan::Distinct(distinct) => { use differential_dataflow::operators::arrange::ArrangeBySelf; - use differential_dataflow::trace::implementations::KeySpine; + use differential_dataflow::trace::implementations::{KeyBuilder, KeySpine}; let input = if let Some(mut trace) = arrangements.get_unkeyed(&self) { @@ -170,7 +170,7 @@ impl Render for Plan { input_arrangement }; - let output = input.reduce_abelian::<_,_,_,KeySpine<_,_,_>>("Distinct", move |_,_,t| t.push(((), 1))); + let output = input.reduce_abelian::<_,_,_,KeyBuilder<_,_,_>,KeySpine<_,_,_>>("Distinct", move |_,_,t| t.push(((), 1))); arrangements.set_unkeyed(&self, &output.trace); output.as_collection(|k,&()| k.clone()) diff --git a/src/algorithms/graphs/propagate.rs b/src/algorithms/graphs/propagate.rs index 9aef8e08b..aeb57b922 100644 --- a/src/algorithms/graphs/propagate.rs +++ b/src/algorithms/graphs/propagate.rs @@ -84,7 +84,7 @@ where use crate::operators::reduce::ReduceCore; use crate::operators::iterate::SemigroupVariable; - use crate::trace::implementations::ValSpine; + use crate::trace::implementations::{ValBuilder, ValSpine}; use timely::order::Product; @@ -96,7 +96,7 @@ where let labels = proposals .concat(&nodes) - .reduce_abelian::<_,ValSpine<_,_,_,_>>("Propagate", |_, s, t| t.push((s[0].0.clone(), R::from(1_i8)))); + .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>("Propagate", |_, s, t| t.push((s[0].0.clone(), R::from(1_i8)))); let propagate: Collection<_, (N, L), R> = labels diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index c0615d8c0..7d10d3dfc 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -30,7 +30,7 @@ use crate::{Data, ExchangeData, Collection, AsCollection, Hashable}; use crate::difference::Semigroup; use crate::lattice::Lattice; use crate::trace::{self, Trace, TraceReader, Batch, BatchReader, Batcher, Builder, Cursor}; -use crate::trace::implementations::{KeyBatcher, KeySpine, ValBatcher, ValSpine}; +use crate::trace::implementations::{KeyBatcher, KeyBuilder, KeySpine, ValBatcher, ValBuilder, ValSpine}; use trace::wrappers::enter::{TraceEnter, BatchEnter,}; use trace::wrappers::enter_at::TraceEnter as TraceEnterAt; @@ -289,7 +289,7 @@ where T1: TraceReader + Clone + 'static, { /// A direct implementation of `ReduceCore::reduce_abelian`. - pub fn reduce_abelian(&self, name: &str, mut logic: L) -> Arranged> + pub fn reduce_abelian(&self, name: &str, mut logic: L) -> Arranged> where for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>, T2: for<'a> Trace= T1::Key<'a>, Time=T1::Time>+'static, @@ -298,10 +298,11 @@ where for<'a> T2::Val<'a> : IntoOwned<'a, Owned = V>, T2::Diff: Abelian, T2::Batch: Batch, - ::Input: Container + PushInto<((K, V), T2::Time, T2::Diff)>, + Bu: Builder, + Bu::Input: Container + PushInto<((K, V), T2::Time, T2::Diff)>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>)+'static, { - self.reduce_core::<_,K,V,T2>(name, move |key, input, output, change| { + self.reduce_core::<_,K,V,Bu,T2>(name, move |key, input, output, change| { if !input.is_empty() { logic(key, input, change); } @@ -311,7 +312,7 @@ where } /// A direct implementation of `ReduceCore::reduce_core`. - pub fn reduce_core(&self, name: &str, logic: L) -> Arranged> + pub fn reduce_core(&self, name: &str, logic: L) -> Arranged> where for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>, T2: for<'a> Trace=T1::Key<'a>, Time=T1::Time>+'static, @@ -319,11 +320,12 @@ where V: Data, for<'a> T2::Val<'a> : IntoOwned<'a, Owned = V>, T2::Batch: Batch, - ::Input: Container + PushInto<((K, V), T2::Time, T2::Diff)>, + Bu: Builder, + Bu::Input: Container + PushInto<((K, V), T2::Time, T2::Diff)>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { use crate::operators::reduce::reduce_trace; - reduce_trace::<_,_,_,_,V,_>(self, name, logic) + reduce_trace::<_,_,Bu,_,_,V,_>(self, name, logic) } } @@ -353,23 +355,23 @@ where G::Timestamp: Lattice, { /// Arranges updates into a shared trace. - fn arrange(&self) -> Arranged> + fn arrange(&self) -> Arranged> where Ba: Batcher + 'static, + Bu: Builder, Tr: Trace + 'static, Tr::Batch: Batch, - Tr::Builder: Builder, { - self.arrange_named::("Arrange") + self.arrange_named::("Arrange") } /// Arranges updates into a shared trace, with a supplied name. - fn arrange_named(&self, name: &str) -> Arranged> + fn arrange_named(&self, name: &str) -> Arranged> where Ba: Batcher + 'static, + Bu: Builder, Tr: Trace + 'static, Tr::Batch: Batch, - Tr::Builder: Builder, ; } @@ -381,15 +383,15 @@ where V: ExchangeData, R: ExchangeData + Semigroup, { - fn arrange_named(&self, name: &str) -> Arranged> + fn arrange_named(&self, name: &str) -> Arranged> where Ba: Batcher, Time=G::Timestamp> + 'static, + Bu: Builder, Tr: Trace + 'static, Tr::Batch: Batch, - Tr::Builder: Builder, { let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into()); - arrange_core::<_, _, Ba, _>(&self.inner, exchange, name) + arrange_core::<_, _, Ba, Bu, _>(&self.inner, exchange, name) } } @@ -398,16 +400,16 @@ where /// This operator arranges a stream of values into a shared trace, whose contents it maintains. /// It uses the supplied parallelization contract to distribute the data, which does not need to /// be consistently by key (though this is the most common). -pub fn arrange_core(stream: &StreamCore, pact: P, name: &str) -> Arranged> +pub fn arrange_core(stream: &StreamCore, pact: P, name: &str) -> Arranged> where G: Scope, G::Timestamp: Lattice, P: ParallelizationContract, Ba: Batcher + 'static, Ba::Input: Container, + Bu: Builder, Tr: Trace+'static, Tr::Batch: Batch, - Tr::Builder: Builder, { // The `Arrange` operator is tasked with reacting to an advancing input // frontier by producing the sequence of batches whose lower and upper @@ -515,7 +517,7 @@ where } // Extract updates not in advance of `upper`. - let batch = batcher.seal::(upper.clone()); + let batch = batcher.seal::(upper.clone()); writer.insert(batch.clone(), Some(capability.time().clone())); @@ -543,7 +545,7 @@ where } else { // Announce progress updates, even without data. - let _batch = batcher.seal::(input.frontier().frontier().to_owned()); + let _batch = batcher.seal::(input.frontier().frontier().to_owned()); writer.seal(input.frontier().frontier().to_owned()); } @@ -562,15 +564,15 @@ impl Arrange(&self, name: &str) -> Arranged> + fn arrange_named(&self, name: &str) -> Arranged> where Ba: Batcher, Time=G::Timestamp> + 'static, + Bu: Builder, Tr: Trace + 'static, Tr::Batch: Batch, - Tr::Builder: Builder, { let exchange = Exchange::new(move |update: &((K,()),G::Timestamp,R)| (update.0).0.hashed().into()); - arrange_core::<_,_,Ba,_>(&self.map(|k| (k, ())).inner, exchange, name) + arrange_core::<_,_,Ba,Bu,_>(&self.map(|k| (k, ())).inner, exchange, name) } } @@ -601,7 +603,7 @@ where } fn arrange_by_key_named(&self, name: &str) -> Arranged>> { - self.arrange_named::, _>(name) + self.arrange_named::,ValBuilder<_,_,_,_>,_>(name) } } @@ -636,6 +638,6 @@ where fn arrange_by_self_named(&self, name: &str) -> Arranged>> { self.map(|k| (k, ())) - .arrange_named::, _>(name) + .arrange_named::,KeyBuilder<_,_,_>,_>(name) } } diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index 867bc055c..8e0ee89a3 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -55,11 +55,11 @@ //! worker.dataflow(|scope| { //! //! use timely::dataflow::operators::Input; -//! use differential_dataflow::trace::implementations::ValSpine; +//! use differential_dataflow::trace::implementations::{ValBuilder, ValSpine}; //! use differential_dataflow::operators::arrange::upsert; //! //! let stream = scope.input_from(&mut input); -//! let arranged = upsert::arrange_from_upsert::<_, _, _, ValSpine>(&stream, &"test"); +//! let arranged = upsert::arrange_from_upsert::<_, _, _, ValBuilder, ValSpine>(&stream, &"test"); //! //! arranged //! .as_collection(|k,v| (k.clone(), v.clone())) @@ -126,7 +126,7 @@ use super::TraceAgent; /// This method is only implemented for totally ordered times, as we do not yet /// understand what a "sequence" of upserts would mean for partially ordered /// timestamps. -pub fn arrange_from_upsert( +pub fn arrange_from_upsert( stream: &Stream, G::Timestamp)>, name: &str, ) -> Arranged> @@ -139,7 +139,7 @@ where for<'a> Tr::Val<'a> : IntoOwned<'a, Owned = V>, Tr::Time: TotalOrder+ExchangeData, Tr::Batch: Batch, - Tr::Builder: Builder>, + Bu: Builder, Output = Tr::Batch>, { let mut reader: Option> = None; @@ -240,7 +240,7 @@ where // Prepare a cursor to the existing arrangement, and a batch builder for // new stuff that we add. let (mut trace_cursor, trace_storage) = reader_local.cursor(); - let mut builder = Tr::Builder::new(); + let mut builder = Bu::new(); for (key, mut list) in to_process.drain(..) { // The prior value associated with the key. diff --git a/src/operators/arrange/writer.rs b/src/operators/arrange/writer.rs index acc049d30..5a2f72003 100644 --- a/src/operators/arrange/writer.rs +++ b/src/operators/arrange/writer.rs @@ -6,7 +6,7 @@ use std::rc::{Rc, Weak}; use std::cell::RefCell; -use timely::progress::{Antichain, Timestamp}; +use timely::progress::Antichain; use crate::trace::{Trace, Batch, BatchReader}; use crate::trace::wrappers::rc::TraceBox; @@ -93,10 +93,7 @@ where /// Inserts an empty batch up to `upper`. pub fn seal(&mut self, upper: Antichain) { if self.upper != upper { - use crate::trace::Builder; - let builder = Tr::Builder::new(); - let batch = builder.done(self.upper.clone(), upper, Antichain::from_elem(Tr::Time::minimum())); - self.insert(batch, None); + self.insert(Tr::Batch::empty(self.upper.clone(), upper), None); } } } diff --git a/src/operators/consolidate.rs b/src/operators/consolidate.rs index e3a551246..e4b718123 100644 --- a/src/operators/consolidate.rs +++ b/src/operators/consolidate.rs @@ -47,22 +47,22 @@ where /// }); /// ``` pub fn consolidate(&self) -> Self { - use crate::trace::implementations::{KeyBatcher, KeySpine}; - self.consolidate_named::, KeySpine<_,_,_>>("Consolidate") + use crate::trace::implementations::{KeyBatcher, KeyBuilder, KeySpine}; + self.consolidate_named::,KeyBuilder<_,_,_>, KeySpine<_,_,_>>("Consolidate") } /// As `consolidate` but with the ability to name the operator and specify the trace type. - pub fn consolidate_named(&self, name: &str) -> Self + pub fn consolidate_named(&self, name: &str) -> Self where Ba: Batcher, Time=G::Timestamp> + 'static, Tr: crate::trace::Trace+'static, for<'a> Tr::Key<'a>: IntoOwned<'a, Owned = D>, Tr::Batch: crate::trace::Batch, - Tr::Builder: Builder, + Bu: Builder, { use crate::operators::arrange::arrangement::Arrange; self.map(|k| (k, ())) - .arrange_named::(name) + .arrange_named::(name) .as_collection(|d, _| d.into_owned()) } diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index 2818340ec..0b014c645 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -25,7 +25,7 @@ use crate::operators::arrange::{Arranged, ArrangeByKey, ArrangeBySelf, TraceAgen use crate::lattice::Lattice; use crate::trace::{Batch, BatchReader, Cursor, Trace, Builder, ExertionLogic}; use crate::trace::cursor::CursorList; -use crate::trace::implementations::{KeySpine, ValSpine}; +use crate::trace::implementations::{KeySpine, KeyBuilder, ValSpine, ValBuilder}; use crate::trace::TraceReader; @@ -93,7 +93,7 @@ where { fn reduce_named(&self, name: &str, logic: L) -> Collection where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static { - self.reduce_abelian::<_,K,V2,ValSpine<_,_,_,_>>(name, logic) + self.reduce_abelian::<_,K,V2,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>(name, logic) .as_collection(|k,v| (k.clone(), v.clone())) } } @@ -170,7 +170,7 @@ where for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>, { fn threshold_namedR2+'static>(&self, name: &str, mut thresh: F) -> Collection { - self.reduce_abelian::<_,K,(),KeySpine>(name, move |k,s,t| t.push(((), thresh(k, &s[0].1)))) + self.reduce_abelian::<_,K,(),KeyBuilder,KeySpine>(name, move |k,s,t| t.push(((), thresh(k, &s[0].1)))) .as_collection(|k,_| k.clone()) } } @@ -221,7 +221,7 @@ where for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>, { fn count_core + 'static>(&self) -> Collection { - self.reduce_abelian::<_,K,R,ValSpine>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8)))) + self.reduce_abelian::<_,K,R,ValBuilder,ValSpine>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8)))) .as_collection(|k,c| (k.clone(), c.clone())) } } @@ -239,30 +239,30 @@ pub trait ReduceCore where /// use differential_dataflow::input::Input; /// use differential_dataflow::operators::reduce::ReduceCore; /// use differential_dataflow::trace::Trace; - /// use differential_dataflow::trace::implementations::ValSpine; + /// use differential_dataflow::trace::implementations::{ValBuilder, ValSpine}; /// /// ::timely::example(|scope| { /// /// let trace = /// scope.new_collection_from(1 .. 10u32).1 /// .map(|x| (x, x)) - /// .reduce_abelian::<_,ValSpine<_,_,_,_>>( + /// .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>( /// "Example", /// move |_key, src, dst| dst.push((*src[0].0, 1)) /// ) /// .trace; /// }); /// ``` - fn reduce_abelian(&self, name: &str, mut logic: L) -> Arranged> + fn reduce_abelian(&self, name: &str, mut logic: L) -> Arranged> where T2: for<'a> Trace= &'a K, Time=G::Timestamp>+'static, for<'a> T2::Val<'a> : IntoOwned<'a, Owned = V>, T2::Diff: Abelian, T2::Batch: Batch, - T2::Builder: Builder>, + Bu: Builder, Output = T2::Batch>, L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>)+'static, { - self.reduce_core::<_,T2>(name, move |key, input, output, change| { + self.reduce_core::<_,Bu,T2>(name, move |key, input, output, change| { if !input.is_empty() { logic(key, input, change); } @@ -276,12 +276,12 @@ pub trait ReduceCore where /// Unlike `reduce_arranged`, this method may be called with an empty `input`, /// and it may not be safe to index into the first element. /// At least one of the two collections will be non-empty. - fn reduce_core(&self, name: &str, logic: L) -> Arranged> + fn reduce_core(&self, name: &str, logic: L) -> Arranged> where T2: for<'a> Trace=&'a K, Time=G::Timestamp>+'static, for<'a> T2::Val<'a> : IntoOwned<'a, Owned = V>, T2::Batch: Batch, - T2::Builder: Builder>, + Bu: Builder, Output = T2::Batch>, L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, ; } @@ -294,24 +294,24 @@ where V: ExchangeData, R: ExchangeData+Semigroup, { - fn reduce_core(&self, name: &str, logic: L) -> Arranged> + fn reduce_core(&self, name: &str, logic: L) -> Arranged> where V: Data, T2: for<'a> Trace=&'a K, Time=G::Timestamp>+'static, for<'a> T2::Val<'a> : IntoOwned<'a, Owned = V>, T2::Batch: Batch, - T2::Builder: Builder>, + Bu: Builder, Output = T2::Batch>, L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { self.arrange_by_key_named(&format!("Arrange: {}", name)) - .reduce_core(name, logic) + .reduce_core::<_,_,_,Bu,_>(name, logic) } } /// A key-wise reduction of values in an input trace. /// /// This method exists to provide reduce functionality without opinions about qualifying trace types. -pub fn reduce_trace(trace: &Arranged, name: &str, mut logic: L) -> Arranged> +pub fn reduce_trace(trace: &Arranged, name: &str, mut logic: L) -> Arranged> where G: Scope, T1: TraceReader + Clone + 'static, @@ -321,7 +321,8 @@ where V: Data, for<'a> T2::Val<'a> : IntoOwned<'a, Owned = V>, T2::Batch: Batch, - ::Input: Container + PushInto<((K, V), T2::Time, T2::Diff)>, + Bu: Builder, + Bu::Input: Container + PushInto<((K, V), T2::Time, T2::Diff)>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { let mut result_trace = None; @@ -457,10 +458,10 @@ where let mut builders = Vec::new(); for cap in capabilities.iter() { buffers.push((cap.time().clone(), Vec::new())); - builders.push(T2::Builder::new()); + builders.push(Bu::new()); } - let mut buffer = ::Input::default(); + let mut buffer = Bu::Input::default(); // cursors for navigating input and output traces. let (mut source_cursor, source_storage): (T1::Cursor, _) = source_trace.cursor_through(lower_limit.borrow()).expect("failed to acquire source cursor"); diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index b7414b986..eeabc312a 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -51,8 +51,10 @@ pub mod chunker; // Opinionated takes on default spines. pub use self::ord_neu::OrdValSpine as ValSpine; pub use self::ord_neu::OrdValBatcher as ValBatcher; +pub use self::ord_neu::RcOrdValBuilder as ValBuilder; pub use self::ord_neu::OrdKeySpine as KeySpine; pub use self::ord_neu::OrdKeyBatcher as KeyBatcher; +pub use self::ord_neu::RcOrdKeyBuilder as KeyBuilder; use std::borrow::{ToOwned}; use std::convert::TryInto; diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index 35c2941dc..bb1485cb1 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -26,82 +26,79 @@ pub use self::val_batch::{OrdValBatch, OrdValBuilder}; pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder}; /// A trace implementation using a spine of ordered lists. -pub type OrdValSpine = Spine< - Rc>>, - RcBuilder, Vec<((K,V),T,R)>>>, ->; +pub type OrdValSpine = Spine>>>; /// A batcher using ordered lists. pub type OrdValBatcher = MergeBatcher, VecChunker<((K,V),T,R)>, VecMerger<((K, V), T, R)>, T>; +/// A builder using ordered lists. +pub type RcOrdValBuilder = RcBuilder, Vec<((K,V),T,R)>>>; // /// A trace implementation for empty values using a spine of ordered lists. // pub type OrdKeySpine = Spine>>>; /// A trace implementation backed by columnar storage. -pub type ColValSpine = Spine< - Rc>>, - RcBuilder, TimelyStack<((K,V),T,R)>>>, ->; +pub type ColValSpine = Spine>>>; /// A batcher for columnar storage. pub type ColValBatcher = MergeBatcher, ColumnationChunker<((K,V),T,R)>, ColumnationMerger<((K,V),T,R)>, T>; +/// A builder for columnar storage. +pub type ColValBuilder = RcBuilder, TimelyStack<((K,V),T,R)>>>; /// A trace implementation backed by flatcontainer storage. -pub type FlatValSpine = Spine< - Rc>, - RcBuilder>>, ->; +pub type FlatValSpine = Spine>>; /// A batcher for flatcontainer storage. pub type FlatValBatcher = MergeBatcher>, FlatcontainerMerger, ::TimeOwned>; +/// A builder for flatcontainer storage. +pub type FlatValBuilder = RcBuilder>>; + /// A trace implementation backed by flatcontainer storage, using [`FlatLayout`] as the layout. pub type FlatValSpineDefault = FlatValSpine< FlatLayout<::Region, ::Region, ::Region, ::Region>, - TupleABCRegion::Region, ::Region>, ::Region, ::Region>, >; /// A batcher for flatcontainer storage, using [`FlatLayout`] as the layout. pub type FlatValBatcherDefault = FlatValBatcher::Region, ::Region>, ::Region, ::Region>, C>; +/// A builder for flatcontainer storage, using [`FlatLayout`] as the layout. +pub type FlatValBuilderDefault = FlatValBuilder::Region, ::Region, ::Region, ::Region>, TupleABCRegion::Region, ::Region>, ::Region, ::Region>>; + /// A trace implementation using a spine of ordered lists. -pub type OrdKeySpine = Spine< - Rc>>, - RcBuilder, Vec<((K,()),T,R)>>>, ->; +pub type OrdKeySpine = Spine>>>; /// A batcher for ordered lists. pub type OrdKeyBatcher = MergeBatcher, VecChunker<((K,()),T,R)>, VecMerger<((K, ()), T, R)>, T>; +/// A builder for ordered lists. +pub type RcOrdKeyBuilder = RcBuilder, Vec<((K,()),T,R)>>>; + // /// A trace implementation for empty values using a spine of ordered lists. // pub type OrdKeySpine = Spine>>>; /// A trace implementation backed by columnar storage. -pub type ColKeySpine = Spine< - Rc>>, - RcBuilder, TimelyStack<((K,()),T,R)>>>, ->; +pub type ColKeySpine = Spine>>>; /// A batcher for columnar storage pub type ColKeyBatcher = MergeBatcher, ColumnationChunker<((K,()),T,R)>, ColumnationMerger<((K,()),T,R)>, T>; +/// A builder for columnar storage +pub type ColKeyBuilder = RcBuilder, TimelyStack<((K,()),T,R)>>>; /// A trace implementation backed by flatcontainer storage. -pub type FlatKeySpine = Spine< - Rc>, - RcBuilder>>, ->; +pub type FlatKeySpine = Spine>>; /// A batcher for flatcontainer storage. pub type FlatKeyBatcher = MergeBatcher>, FlatcontainerMerger, ::TimeOwned>; +/// A builder for flatcontainer storage. +pub type FlatKeyBuilder = RcBuilder>>; /// A trace implementation backed by flatcontainer storage, using [`FlatLayout`] as the layout. pub type FlatKeySpineDefault = FlatKeySpine< FlatLayout<::Region, <() as RegionPreference>::Region, ::Region, ::Region>, - TupleABCRegion::Region, <() as RegionPreference>::Region>, ::Region, ::Region>, >; /// A batcher for flatcontainer storage, using [`FlatLayout`] as the layout. pub type FlatKeyBatcherDefault = FlatValBatcher::Region, <() as RegionPreference>::Region>, ::Region, ::Region>, C>; +/// A builder for flatcontainer storage, using [`FlatLayout`] as the layout. +pub type FlatKeyBuilderDefault = FlatKeyBuilder::Region, <() as RegionPreference>::Region, ::Region, ::Region>, TupleABCRegion::Region, <() as RegionPreference>::Region>, ::Region, ::Region>>; /// A trace implementation backed by columnar storage. -pub type PreferredSpine = Spine< - Rc>>, - RcBuilder, TimelyStack<((::Owned,::Owned),T,R)>>>, ->; +pub type PreferredSpine = Spine>>>; /// A batcher for columnar storage. pub type PreferredBatcher = MergeBatcher::Owned,::Owned),T,R)>, ColumnationChunker<((::Owned,::Owned),T,R)>, ColumnationMerger<((::Owned,::Owned),T,R)>,T>; - +/// A builder for columnar storage. +pub type PreferredBuilder = RcBuilder, TimelyStack<((::Owned,::Owned),T,R)>>>; // /// A trace implementation backed by columnar storage. // pub type ColKeySpine = Spine>>>; @@ -220,6 +217,22 @@ mod val_batch { fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<::Time>) -> Self::Merger { OrdValMerger::new(self, other, compaction_frontier) } + + fn empty(lower: Antichain, upper: Antichain) -> Self { + use timely::progress::Timestamp; + Self { + storage: OrdValStorage { + keys: L::KeyContainer::with_capacity(0), + keys_offs: L::OffsetContainer::with_capacity(0), + vals: L::ValContainer::with_capacity(0), + vals_offs: L::OffsetContainer::with_capacity(0), + times: L::TimeContainer::with_capacity(0), + diffs: L::DiffContainer::with_capacity(0), + }, + description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())), + updates: 0, + } + } } /// State for an in-progress merge. @@ -788,6 +801,20 @@ mod key_batch { fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<::Time>) -> Self::Merger { OrdKeyMerger::new(self, other, compaction_frontier) } + + fn empty(lower: Antichain, upper: Antichain) -> Self { + use timely::progress::Timestamp; + Self { + storage: OrdKeyStorage { + keys: L::KeyContainer::with_capacity(0), + keys_offs: L::OffsetContainer::with_capacity(0), + times: L::TimeContainer::with_capacity(0), + diffs: L::DiffContainer::with_capacity(0), + }, + description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())), + updates: 0, + } + } } /// State for an in-progress merge. diff --git a/src/trace/implementations/rhh.rs b/src/trace/implementations/rhh.rs index a07b2dc67..866d60678 100644 --- a/src/trace/implementations/rhh.rs +++ b/src/trace/implementations/rhh.rs @@ -23,23 +23,21 @@ use super::{Update, Layout, Vector, TStack}; use self::val_batch::{RhhValBatch, RhhValBuilder}; /// A trace implementation using a spine of ordered lists. -pub type VecSpine = Spine< - Rc>>, - RcBuilder, Vec<((K,V),T,R)>>>, ->; +pub type VecSpine = Spine>>>; /// A batcher for ordered lists. pub type VecBatcher = MergeBatcher, VecChunker<((K,V),T,R)>, VecMerger<((K, V), T, R)>, T>; +/// A builder for ordered lists. +pub type VecBuilder = RcBuilder, Vec<((K,V),T,R)>>>; // /// A trace implementation for empty values using a spine of ordered lists. // pub type OrdKeySpine = Spine>>>; /// A trace implementation backed by columnar storage. -pub type ColSpine = Spine< - Rc>>, - RcBuilder, TimelyStack<((K,V),T,R)>>>, ->; +pub type ColSpine = Spine>>>; /// A batcher for columnar storage. pub type ColBatcher = MergeBatcher, ColumnationChunker<((K,V),T,R)>, ColumnationMerger<((K,V),T,R)>, T>; +/// A builder for columnar storage. +pub type ColBuilder = RcBuilder, TimelyStack<((K,V),T,R)>>>; // /// A trace implementation backed by columnar storage. // pub type ColKeySpine = Spine>>>; @@ -318,6 +316,25 @@ mod val_batch { fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<::Time>) -> Self::Merger { RhhValMerger::new(self, other, compaction_frontier) } + + fn empty(lower: Antichain, upper: Antichain) -> Self { + use timely::progress::Timestamp; + Self { + storage: RhhValStorage { + keys: L::KeyContainer::with_capacity(0), + keys_offs: L::OffsetContainer::with_capacity(0), + vals: L::ValContainer::with_capacity(0), + vals_offs: L::OffsetContainer::with_capacity(0), + times: L::TimeContainer::with_capacity(0), + diffs: L::DiffContainer::with_capacity(0), + key_count: 0, + key_capacity: 0, + divisor: 0, + }, + description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())), + updates: 0, + } + } } /// State for an in-progress merge. diff --git a/src/trace/implementations/spine_fueled.rs b/src/trace/implementations/spine_fueled.rs index deefbaa61..6b1c02d5c 100644 --- a/src/trace/implementations/spine_fueled.rs +++ b/src/trace/implementations/spine_fueled.rs @@ -70,7 +70,7 @@ use crate::logging::Logger; -use crate::trace::{Batch, Builder, BatchReader, Trace, TraceReader, ExertionLogic}; +use crate::trace::{Batch, BatchReader, Trace, TraceReader, ExertionLogic}; use crate::trace::cursor::CursorList; use crate::trace::Merger; @@ -83,7 +83,7 @@ use ::timely::order::PartialOrder; /// A spine maintains a small number of immutable collections of update tuples, merging the collections when /// two have similar sizes. In this way, it allows the addition of more tuples, which may then be merged with /// other immutable collections. -pub struct Spine { +pub struct Spine { operator: OperatorInfo, logger: Option, logical_frontier: Antichain, // Times after which the trace must accumulate correctly. @@ -97,10 +97,9 @@ pub struct Spine { exert_logic_param: Vec<(usize, usize, usize)>, /// Logic to indicate whether and how many records we should introduce in the absence of actual updates. exert_logic: Option, - phantom: std::marker::PhantomData, } -impl TraceReader for Spine +impl TraceReader for Spine where B: Batch+Clone+'static, { @@ -246,14 +245,10 @@ where // A trace implementation for any key type that can be borrowed from or converted into `Key`. // TODO: Almost all this implementation seems to be generic with respect to the trace and batch types. -impl Trace for Spine +impl Trace for Spine where B: Batch+Clone+'static, - BU: Builder, { - /// A type used to assemble batches from ordered update sequences. - type Builder = BU; - fn new( info: ::timely::dataflow::operators::generic::OperatorInfo, logging: Option, @@ -316,22 +311,20 @@ where /// Completes the trace with a final empty batch. fn close(&mut self) { if !self.upper.borrow().is_empty() { - let builder = Self::Builder::new(); - let batch = builder.done(self.upper.clone(), Antichain::new(), Antichain::from_elem(::minimum())); - self.insert(batch); + self.insert(B::empty(self.upper.clone(), Antichain::new())); } } } // Drop implementation allows us to log batch drops, to zero out maintained totals. -impl Drop for Spine { +impl Drop for Spine { fn drop(&mut self) { self.drop_batches(); } } -impl Spine { +impl Spine { /// Drops and logs batches. Used in `set_logical_compaction` and drop. fn drop_batches(&mut self) { if let Some(logger) = &self.logger { @@ -372,7 +365,7 @@ impl Spine { } } -impl Spine { +impl Spine { /// Determine the amount of effort we should exert in the absence of updates. /// /// This method prepares an iterator over batches, including the level, count, and length of each layer. @@ -434,7 +427,6 @@ impl Spine { activator, exert_logic_param: Vec::default(), exert_logic: None, - phantom: std::marker::PhantomData, } } diff --git a/src/trace/mod.rs b/src/trace/mod.rs index 07a6c937c..ccf8ff090 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -209,9 +209,6 @@ pub trait TraceReader { pub trait Trace : TraceReader where ::Batch: Batch { - /// A type used to assemble batches from ordered update sequences. - type Builder: Builder; - /// Allocates a new empty trace. fn new( info: ::timely::dataflow::operators::generic::OperatorInfo, @@ -299,6 +296,9 @@ pub trait Batch : BatchReader where Self: ::std::marker::Sized { fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef) -> Self::Merger { Self::Merger::new(self, other, compaction_frontier) } + + /// Produce an empty batch over the indicated interval. + fn empty(lower: Antichain, upper: Antichain) -> Self; } /// Functionality for collecting and batching updates. @@ -441,6 +441,9 @@ pub mod rc_blanket_impls { /// An immutable collection of updates. impl Batch for Rc { type Merger = RcMerger; + fn empty(lower: Antichain, upper: Antichain) -> Self { + Rc::new(B::empty(lower, upper)) + } } /// Wrapper type for building reference counted batches. diff --git a/tests/bfs.rs b/tests/bfs.rs index 43d08c4eb..cc5d8f619 100644 --- a/tests/bfs.rs +++ b/tests/bfs.rs @@ -17,7 +17,7 @@ use differential_dataflow::Collection; use differential_dataflow::operators::*; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arrange; -use differential_dataflow::trace::implementations::ord_neu::{FlatKeyBatcherDefault, FlatKeySpineDefault, FlatValBatcherDefault, FlatValSpineDefault}; +use differential_dataflow::trace::implementations::ord_neu::{FlatKeyBatcherDefault, FlatKeyBuilderDefault, FlatKeySpineDefault, FlatValBatcherDefault, FlatValBuilderDefault, FlatValSpineDefault}; type Node = usize; type Edge = (Node, Node); @@ -246,9 +246,10 @@ fn bfs_differential_flat( let (edge_input, edges) = scope.new_collection(); let c = bfs_flat(&edges, &roots).map(|(_, dist)| (dist, ())); - let arranged = c.arrange::>, FlatKeySpineDefault>(); + let arranged = c.arrange::>, FlatKeyBuilderDefault, FlatKeySpineDefault>(); + type Bu = FlatValBuilderDefault; type T2 = FlatValSpineDefault; - let reduced = arranged.reduce_abelian::<_, _, _, T2>("Count", |_k, s, t| { + let reduced = arranged.reduce_abelian::<_, _, _, Bu, T2>("Count", |_k, s, t| { t.push((s[0].1.clone(), isize::from(1i8))) }); reduced @@ -316,9 +317,10 @@ where let nodes = nodes.enter(&inner.scope()); type Batcher = FlatValBatcherDefault>; + type Builder = FlatValBuilderDefault; type Spine = FlatValSpineDefault; - let arranged1 = inner.arrange::>, Spine>>(); - let arranged2 = edges.arrange::>, Spine>>(); + let arranged1 = inner.arrange::>, Builder>, Spine>>(); + let arranged2 = edges.arrange::>, Builder>, Spine>>(); arranged1 .join_core(&arranged2, move |_k, l, d| Some((d, l + 1))) .concat(&nodes) diff --git a/tests/trace.rs b/tests/trace.rs index addf72cfb..033da8bf7 100644 --- a/tests/trace.rs +++ b/tests/trace.rs @@ -1,12 +1,12 @@ use timely::dataflow::operators::generic::OperatorInfo; use timely::progress::{Antichain, frontier::AntichainRef}; -use differential_dataflow::trace::implementations::{ValBatcher, ValSpine}; +use differential_dataflow::trace::implementations::{ValBatcher, ValBuilder, ValSpine}; use differential_dataflow::trace::{Trace, TraceReader, Batcher}; use differential_dataflow::trace::cursor::Cursor; type IntegerTrace = ValSpine; -type IntegerBuilder = ::Builder; +type IntegerBuilder = ValBuilder; fn get_trace() -> ValSpine { let op_info = OperatorInfo::new(0, 0, [].into());