Skip to content

Commit

Permalink
Extract Builder from Trace (#545)
Browse files — browse the repository at this point in the history
* Extract Builder from Trace

* Update other projects
  • Loading branch information
frankmcsherry authored Dec 6, 2024
1 parent 617ac52 commit cf97c1a
Show file tree
Hide file tree
Showing 19 changed files with 218 additions and 175 deletions.
4 changes: 2 additions & 2 deletions examples/monoid-bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ where G::Timestamp: Lattice+Ord {

use differential_dataflow::operators::iterate::SemigroupVariable;
use differential_dataflow::operators::reduce::ReduceCore;
use differential_dataflow::trace::implementations::KeySpine;
use differential_dataflow::trace::implementations::{KeySpine, KeyBuilder};


use timely::order::Product;
Expand All @@ -146,7 +146,7 @@ where G::Timestamp: Lattice+Ord {
.join_map(&edges, |_k,&(),d| *d)
.concat(&roots)
.map(|x| (x,()))
.reduce_core::<_,KeySpine<_,_,_>>("Reduce", |_key, input, output, updates| {
.reduce_core::<_,KeyBuilder<_,_,_>,KeySpine<_,_,_>>("Reduce", |_key, input, output, updates| {
if output.is_empty() || input[0].1 < output[0].1 {
updates.push(((), input[0].1));
}
Expand Down
34 changes: 17 additions & 17 deletions examples/spines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,46 +28,46 @@ fn main() {

match mode.as_str() {
"new" => {
use differential_dataflow::trace::implementations::ord_neu::{ColKeyBatcher, ColKeySpine};
let data = data.arrange::<ColKeyBatcher<_,_,_>, ColKeySpine<_,_,_>>();
let keys = keys.arrange::<ColKeyBatcher<_,_,_>, ColKeySpine<_,_,_>>();
use differential_dataflow::trace::implementations::ord_neu::{ColKeyBatcher, ColKeyBuilder, ColKeySpine};
let data = data.arrange::<ColKeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>();
let keys = keys.arrange::<ColKeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>();
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
"old" => {
use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatcher, OrdKeySpine};
let data = data.arrange::<OrdKeyBatcher<_,_,_>, OrdKeySpine<_,_,_>>();
let keys = keys.arrange::<OrdKeyBatcher<_,_,_>, OrdKeySpine<_,_,_>>();
use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatcher, RcOrdKeyBuilder, OrdKeySpine};
let data = data.arrange::<OrdKeyBatcher<_,_,_>, RcOrdKeyBuilder<_,_,_>, OrdKeySpine<_,_,_>>();
let keys = keys.arrange::<OrdKeyBatcher<_,_,_>, RcOrdKeyBuilder<_,_,_>, OrdKeySpine<_,_,_>>();
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
"rhh" => {
use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecBatcher, VecSpine};
let data = data.map(|x| HashWrapper { inner: x }).arrange::<VecBatcher<_,(),_,_>, VecSpine<_,(),_,_>>();
let keys = keys.map(|x| HashWrapper { inner: x }).arrange::<VecBatcher<_,(),_,_>, VecSpine<_,(),_,_>>();
use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecBatcher, VecBuilder, VecSpine};
let data = data.map(|x| HashWrapper { inner: x }).arrange::<VecBatcher<_,(),_,_>, VecBuilder<_,(),_,_>, VecSpine<_,(),_,_>>();
let keys = keys.map(|x| HashWrapper { inner: x }).arrange::<VecBatcher<_,(),_,_>, VecBuilder<_,(),_,_>, VecSpine<_,(),_,_>>();
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
"slc" => {

use differential_dataflow::trace::implementations::ord_neu::{PreferredBatcher, PreferredSpine};
use differential_dataflow::trace::implementations::ord_neu::{PreferredBatcher, PreferredBuilder, PreferredSpine};

let data =
data.map(|x| (x.clone().into_bytes(), x.into_bytes()))
.arrange::<PreferredBatcher<[u8],[u8],_,_>, PreferredSpine<[u8],[u8],_,_>>()
.reduce_abelian::<_, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));
.arrange::<PreferredBatcher<[u8],[u8],_,_>, PreferredBuilder<[u8],[u8],_,_>, PreferredSpine<[u8],[u8],_,_>>()
.reduce_abelian::<_, _, _, PreferredBuilder<[u8],(),_,_>, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));
let keys =
keys.map(|x| (x.clone().into_bytes(), 7))
.arrange::<PreferredBatcher<[u8],u8,_,_>, PreferredSpine<[u8],u8,_,_>>()
.reduce_abelian::<_, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));
.arrange::<PreferredBatcher<[u8],u8,_,_>, PreferredBuilder<[u8],u8,_,_>, PreferredSpine<[u8],u8,_,_>>()
.reduce_abelian::<_, _, _, PreferredBuilder<[u8],(),_,_>,PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));

keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
"flat" => {
use differential_dataflow::trace::implementations::ord_neu::{FlatKeyBatcherDefault, FlatKeySpineDefault};
let data = data.arrange::<FlatKeyBatcherDefault<String,usize,isize,_>, FlatKeySpineDefault<String,usize,isize>>();
let keys = keys.arrange::<FlatKeyBatcherDefault<String,usize,isize,_>, FlatKeySpineDefault<String,usize,isize>>();
use differential_dataflow::trace::implementations::ord_neu::{FlatKeyBatcherDefault, FlatKeyBuilderDefault, FlatKeySpineDefault};
let data = data.arrange::<FlatKeyBatcherDefault<String,usize,isize,_>, FlatKeyBuilderDefault<String,usize,isize>, FlatKeySpineDefault<String,usize,isize>>();
let keys = keys.arrange::<FlatKeyBatcherDefault<String,usize,isize,_>, FlatKeyBuilderDefault<String,usize,isize>, FlatKeySpineDefault<String,usize,isize>>();
keys.join_core(&data, |_k, (), ()| Option::<()>::None)
.probe_with(&mut probe);
}
Expand Down
14 changes: 7 additions & 7 deletions experiments/src/bin/deals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use differential_dataflow::input::Input;
use differential_dataflow::Collection;
use differential_dataflow::operators::*;

use differential_dataflow::trace::implementations::{ValSpine, KeySpine, KeyBatcher, ValBatcher};
use differential_dataflow::trace::implementations::{ValSpine, KeySpine, KeyBatcher, KeyBuilder, ValBatcher, ValBuilder};
use differential_dataflow::operators::arrange::TraceAgent;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::operators::arrange::Arrange;
Expand Down Expand Up @@ -41,7 +41,7 @@ fn main() {
let (input, graph) = scope.new_collection();

// each edge should exist in both directions.
let graph = graph.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>();
let graph = graph.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>();

match program.as_str() {
"tc" => tc(&graph).filter(move |_| inspect).map(|_| ()).consolidate().inspect(|x| println!("tc count: {:?}", x)).probe(),
Expand Down Expand Up @@ -94,10 +94,10 @@ fn tc<G: Scope<Timestamp=()>>(edges: &EdgeArranged<G, Node, Node, Present>) -> C
let result =
inner
.map(|(x,y)| (y,x))
.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>()
.join_core(&edges, |_y,&x,&z| Some((x, z)))
.concat(&edges.as_collection(|&k,&v| (k,v)))
.arrange::<KeyBatcher<_,_,_>, KeySpine<_,_,_>>()
.arrange::<KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>()
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None })
;

Expand All @@ -121,12 +121,12 @@ fn sg<G: Scope<Timestamp=()>>(edges: &EdgeArranged<G, Node, Node, Present>) -> C

let result =
inner
.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>()
.join_core(&edges, |_,&x,&z| Some((x, z)))
.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>()
.join_core(&edges, |_,&x,&z| Some((x, z)))
.concat(&peers)
.arrange::<KeyBatcher<_,_,_>, KeySpine<_,_,_>>()
.arrange::<KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>()
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None })
;

Expand Down
6 changes: 3 additions & 3 deletions experiments/src/bin/graspan1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use timely::order::Product;

use differential_dataflow::difference::Present;
use differential_dataflow::input::Input;
use differential_dataflow::trace::implementations::{ValBatcher, ValSpine};
use differential_dataflow::trace::implementations::{ValBatcher, ValBuilder, ValSpine};
use differential_dataflow::operators::*;
use differential_dataflow::operators::arrange::Arrange;
use differential_dataflow::operators::iterate::SemigroupVariable;
Expand All @@ -31,7 +31,7 @@ fn main() {
let (n_handle, nodes) = scope.new_collection();
let (e_handle, edges) = scope.new_collection();

let edges = edges.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>();
let edges = edges.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>();

// a N c <- a N b && b E c
// N(a,c) <- N(a,b), E(b, c)
Expand All @@ -46,7 +46,7 @@ fn main() {
let next =
labels.join_core(&edges, |_b, a, c| Some((*c, *a)))
.concat(&nodes)
.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>()
// .distinct_total_core::<Diff>();
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None });

Expand Down
40 changes: 20 additions & 20 deletions experiments/src/bin/graspan2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use differential_dataflow::Collection;
use differential_dataflow::input::Input;
use differential_dataflow::operators::*;
use differential_dataflow::operators::arrange::Arrange;
use differential_dataflow::trace::implementations::{ValSpine, KeySpine, ValBatcher, KeyBatcher};
use differential_dataflow::trace::implementations::{ValSpine, KeySpine, ValBatcher, KeyBatcher, ValBuilder, KeyBuilder};
use differential_dataflow::difference::Present;

type Node = u32;
Expand Down Expand Up @@ -47,7 +47,7 @@ fn unoptimized() {
.flat_map(|(a,b)| vec![a,b])
.concat(&dereference.flat_map(|(a,b)| vec![a,b]));

let dereference = dereference.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>();
let dereference = dereference.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>();

let (value_flow, memory_alias, value_alias) =
scope
Expand All @@ -60,14 +60,14 @@ fn unoptimized() {
let value_flow = SemigroupVariable::new(scope, Product::new(Default::default(), 1));
let memory_alias = SemigroupVariable::new(scope, Product::new(Default::default(), 1));

let value_flow_arranged = value_flow.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>();
let memory_alias_arranged = memory_alias.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>();
let value_flow_arranged = value_flow.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>();
let memory_alias_arranged = memory_alias.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>();

// VA(a,b) <- VF(x,a),VF(x,b)
// VA(a,b) <- VF(x,a),MA(x,y),VF(y,b)
let value_alias_next = value_flow_arranged.join_core(&value_flow_arranged, |_,&a,&b| Some((a,b)));
let value_alias_next = value_flow_arranged.join_core(&memory_alias_arranged, |_,&a,&b| Some((b,a)))
.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>()
.join_core(&value_flow_arranged, |_,&a,&b| Some((a,b)))
.concat(&value_alias_next);

Expand All @@ -77,16 +77,16 @@ fn unoptimized() {
let value_flow_next =
assignment
.map(|(a,b)| (b,a))
.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>()
.join_core(&memory_alias_arranged, |_,&a,&b| Some((b,a)))
.concat(&assignment.map(|(a,b)| (b,a)))
.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>()
.join_core(&value_flow_arranged, |_,&a,&b| Some((a,b)))
.concat(&nodes.map(|n| (n,n)));

let value_flow_next =
value_flow_next
.arrange::<ValBatcher<_,_,_,_>, KeySpine<_,_,_>>()
.arrange::<ValBatcher<_,_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>()
// .distinct_total_core::<Diff>()
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None })
;
Expand All @@ -95,12 +95,12 @@ fn unoptimized() {
let memory_alias_next: Collection<_,_,Present> =
value_alias_next
.join_core(&dereference, |_x,&y,&a| Some((y,a)))
.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>()
.join_core(&dereference, |_y,&a,&b| Some((a,b)));

let memory_alias_next: Collection<_,_,Present> =
memory_alias_next
.arrange::<KeyBatcher<_,_,_>, KeySpine<_,_,_>>()
.arrange::<KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>()
// .distinct_total_core::<Diff>()
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None })
;
Expand Down Expand Up @@ -172,7 +172,7 @@ fn optimized() {
.flat_map(|(a,b)| vec![a,b])
.concat(&dereference.flat_map(|(a,b)| vec![a,b]));

let dereference = dereference.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>();
let dereference = dereference.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>();

let (value_flow, memory_alias) =
scope
Expand All @@ -185,22 +185,22 @@ fn optimized() {
let value_flow = SemigroupVariable::new(scope, Product::new(Default::default(), 1));
let memory_alias = SemigroupVariable::new(scope, Product::new(Default::default(), 1));

let value_flow_arranged = value_flow.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>();
let memory_alias_arranged = memory_alias.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>();
let value_flow_arranged = value_flow.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>();
let memory_alias_arranged = memory_alias.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>();

// VF(a,a) <-
// VF(a,b) <- A(a,x),VF(x,b)
// VF(a,b) <- A(a,x),MA(x,y),VF(y,b)
let value_flow_next =
assignment
.map(|(a,b)| (b,a))
.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>()
.join_core(&memory_alias_arranged, |_,&a,&b| Some((b,a)))
.concat(&assignment.map(|(a,b)| (b,a)))
.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>()
.join_core(&value_flow_arranged, |_,&a,&b| Some((a,b)))
.concat(&nodes.map(|n| (n,n)))
.arrange::<KeyBatcher<_,_,_>, KeySpine<_,_,_>>()
.arrange::<KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>()
// .distinct_total_core::<Diff>()
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None })
;
Expand All @@ -209,9 +209,9 @@ fn optimized() {
let value_flow_deref =
value_flow
.map(|(a,b)| (b,a))
.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>()
.join_core(&dereference, |_x,&a,&b| Some((a,b)))
.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>();
.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>();

// MA(a,b) <- VFD(x,a),VFD(y,b)
// MA(a,b) <- VFD(x,a),MA(x,y),VFD(y,b)
Expand All @@ -222,10 +222,10 @@ fn optimized() {
let memory_alias_next =
memory_alias_arranged
.join_core(&value_flow_deref, |_x,&y,&a| Some((y,a)))
.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>()
.join_core(&value_flow_deref, |_y,&a,&b| Some((a,b)))
.concat(&memory_alias_next)
.arrange::<KeyBatcher<_,_,_>, KeySpine<_,_,_>>()
.arrange::<KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>()
// .distinct_total_core::<Diff>()
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None })
;
Expand Down
4 changes: 2 additions & 2 deletions interactive/src/plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl<V: ExchangeData+Hash+Datum> Render for Plan<V> {
Plan::Distinct(distinct) => {

use differential_dataflow::operators::arrange::ArrangeBySelf;
use differential_dataflow::trace::implementations::KeySpine;
use differential_dataflow::trace::implementations::{KeyBuilder, KeySpine};

let input =
if let Some(mut trace) = arrangements.get_unkeyed(&self) {
Expand All @@ -170,7 +170,7 @@ impl<V: ExchangeData+Hash+Datum> Render for Plan<V> {
input_arrangement
};

let output = input.reduce_abelian::<_,_,_,KeySpine<_,_,_>>("Distinct", move |_,_,t| t.push(((), 1)));
let output = input.reduce_abelian::<_,_,_,KeyBuilder<_,_,_>,KeySpine<_,_,_>>("Distinct", move |_,_,t| t.push(((), 1)));

arrangements.set_unkeyed(&self, &output.trace);
output.as_collection(|k,&()| k.clone())
Expand Down
Loading

0 comments on commit cf97c1a

Please sign in to comment.