Skip to content

Commit

Permalink
Introduce RHH prototype; appropriate columnation example
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Nov 24, 2023
1 parent 49ba02f commit 103c9a7
Show file tree
Hide file tree
Showing 3 changed files with 848 additions and 19 deletions.
46 changes: 27 additions & 19 deletions examples/columnation.rs → examples/spines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,9 @@ fn main() {
let keys: usize = std::env::args().nth(1).unwrap().parse().unwrap();
let size: usize = std::env::args().nth(2).unwrap().parse().unwrap();

let mode = std::env::args().any(|a| a == "new");
let mode: String = std::env::args().nth(3).unwrap();

if mode {
println!("Running NEW arrangement");
}
else {
println!("Running OLD arrangement");
}
println!("Running [{:?}] arrangement", mode);

let timer1 = ::std::time::Instant::now();
let timer2 = timer1.clone();
Expand All @@ -30,22 +25,35 @@ fn main() {
let (mut data_input, mut keys_input) = worker.dataflow(|scope| {

use differential_dataflow::operators::{arrange::Arrange, JoinCore};
use differential_dataflow::trace::implementations::ord::{OrdKeySpine, ColKeySpine};

let (data_input, data) = scope.new_collection::<String, isize>();
let (keys_input, keys) = scope.new_collection::<String, isize>();

if mode {
let data = data.arrange::<ColKeySpine<_,_,_>>();
let keys = keys.arrange::<ColKeySpine<_,_,_>>();
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
}
else {
let data = data.arrange::<OrdKeySpine<_,_,_>>();
let keys = keys.arrange::<OrdKeySpine<_,_,_>>();
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
match mode.as_str() {
"new" => {
use differential_dataflow::trace::implementations::ord::ColKeySpine;
let data = data.arrange::<ColKeySpine<_,_,_>>();
let keys = keys.arrange::<ColKeySpine<_,_,_>>();
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
"old" => {
use differential_dataflow::trace::implementations::ord::OrdKeySpine;
let data = data.arrange::<OrdKeySpine<_,_,_>>();
let keys = keys.arrange::<OrdKeySpine<_,_,_>>();
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
"rhh" => {
use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecSpine};
let data = data.map(|x| HashWrapper { inner: x }).arrange::<VecSpine<_,(),_,_>>();
let keys = keys.map(|x| HashWrapper { inner: x }).arrange::<VecSpine<_,(),_,_>>();
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
_ => {
println!("unreconized mode: {:?}", mode)
}
}

(data_input, keys_input)
Expand Down
1 change: 1 addition & 0 deletions src/trace/implementations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub use self::merge_batcher::MergeBatcher as Batcher;

pub mod ord;
pub mod ord_neu;
pub mod rhh;

// Opinionated takes on default spines.
pub use self::ord::OrdValSpine as ValSpine;
Expand Down
Loading

0 comments on commit 103c9a7

Please sign in to comment.