Skip to content

Commit

Permalink
WIP checkin
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Nov 25, 2023
1 parent 615c688 commit 629458e
Show file tree
Hide file tree
Showing 10 changed files with 190 additions and 134 deletions.
41 changes: 26 additions & 15 deletions examples/spines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,35 @@ fn main() {
let (keys_input, keys) = scope.new_collection::<String, isize>();

match mode.as_str() {
"new" => {
use differential_dataflow::trace::implementations::ord::ColKeySpine;
let data = data.arrange::<ColKeySpine<_,_,_>>();
let keys = keys.arrange::<ColKeySpine<_,_,_>>();
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
"old" => {
use differential_dataflow::trace::implementations::ord::OrdKeySpine;
let data = data.arrange::<OrdKeySpine<_,_,_>>();
let keys = keys.arrange::<OrdKeySpine<_,_,_>>();
// "new" => {
// use differential_dataflow::trace::implementations::ord::ColKeySpine;
// let data = data.arrange::<ColKeySpine<_,_,_>>();
// let keys = keys.arrange::<ColKeySpine<_,_,_>>();
// keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
// .probe_with(&mut probe);
// },
// "old" => {
// use differential_dataflow::trace::implementations::ord::OrdKeySpine;
// let data = data.arrange::<OrdKeySpine<_,_,_>>();
// let keys = keys.arrange::<OrdKeySpine<_,_,_>>();
// keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
// .probe_with(&mut probe);
// },
"rhh" => {

use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecSpine};
let data = data.map(|x| (HashWrapper { inner: x }, ())).arrange::<VecSpine<HashWrapper<String>,(),_,_>>();
let keys = keys.map(|x| (HashWrapper { inner: x }, ())).arrange::<VecSpine<HashWrapper<String>,(),_,_>>();

keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
"rhh" => {
use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecSpine};
let data = data.map(|x| HashWrapper { inner: x }).arrange::<VecSpine<_,(),_,_>>();
let keys = keys.map(|x| HashWrapper { inner: x }).arrange::<VecSpine<_,(),_,_>>();
"slc" => {

use differential_dataflow::trace::implementations::ord_neu::SlcValSpine;
let data = data.map(|x| (x.into_bytes(), ())).arrange::<SlcValSpine<u8,(),_,_>>();
let keys = keys.map(|x| (x.into_bytes(), ())).arrange::<SlcValSpine<u8,(),_,_>>();

keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
Expand Down
86 changes: 23 additions & 63 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,8 @@ where
pub trait Arrange<G: Scope, K, V, R: Semigroup>
where
G::Timestamp: Lattice,
K: Data,
K: ToOwned + ?Sized,
K::Owned: Data,
V: Data,
{
/// Arranges a stream of `(Key, Val)` updates by `Key`. Accepts an empty instance of the trace type.
Expand All @@ -451,13 +452,13 @@ where
/// is the correct way to determine that times in the shared trace are committed.
fn arrange<Tr>(&self) -> Arranged<G, TraceAgent<Tr>>
where
K: ExchangeData+Hashable,
K::Owned: ExchangeData+Hashable,
V: ExchangeData,
R: ExchangeData,
Tr: Trace+TraceReader<Time=G::Timestamp>+'static,
Tr: Trace<Key=K>+TraceReader<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
Tr::Batcher: Batcher<Item = ((K::Owned,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K::Owned,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
{
self.arrange_named("Arrange")
}
Expand All @@ -469,15 +470,15 @@ where
/// is the correct way to determine that times in the shared trace are committed.
fn arrange_named<Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
K: ExchangeData+Hashable,
K::Owned: ExchangeData+Hashable,
V: ExchangeData,
R: ExchangeData,
Tr: Trace+TraceReader<Time=G::Timestamp>+'static,
Tr: Trace<Key=K>+TraceReader<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
Tr::Batcher: Batcher<Item = ((K::Owned,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K::Owned,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
{
let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into());
let exchange = Exchange::new(move |update: &((K::Owned,V),G::Timestamp,R)| (update.0).0.hashed().into());
self.arrange_core(exchange, name)
}

Expand All @@ -488,54 +489,30 @@ where
/// is the correct way to determine that times in the shared trace are committed.
fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
P: ParallelizationContract<G::Timestamp, ((K,V),G::Timestamp,R)>,
Tr: Trace+TraceReader<Time=G::Timestamp>+'static,
P: ParallelizationContract<G::Timestamp, ((K::Owned,V),G::Timestamp,R)>,
Tr: Trace<Key=K>+TraceReader<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
Tr::Batcher: Batcher<Item = ((K::Owned,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K::Owned,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
;
}

impl<G, K, V, R> Arrange<G, K, V, R> for Collection<G, (K, V), R>
impl<G, K, V, R> Arrange<G, K, V, R> for Collection<G, (K::Owned, V), R>
where
G: Scope,
G::Timestamp: Lattice+Ord,
K: Data,
K: ToOwned + ?Sized,
K::Owned: Data,
V: Data,
R: Semigroup,
{
fn arrange<Tr>(&self) -> Arranged<G, TraceAgent<Tr>>
where
K: ExchangeData + Hashable,
V: ExchangeData,
R: ExchangeData,
Tr: Trace + TraceReader<Time=G::Timestamp> + 'static, Tr::Batch: Batch,
Tr::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
{
self.arrange_named("Arrange")
}

fn arrange_named<Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
K: ExchangeData + Hashable,
V: ExchangeData,
R: ExchangeData,
Tr: Trace + TraceReader<Time=G::Timestamp> + 'static, Tr::Batch: Batch,
Tr::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
{
let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into());
self.arrange_core(exchange, name)
}

fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
P: ParallelizationContract<G::Timestamp, ((K,V),G::Timestamp,R)>,
Tr: Trace+TraceReader<Time=G::Timestamp>+'static,
P: ParallelizationContract<G::Timestamp, ((K::Owned,V),G::Timestamp,R)>,
Tr: Trace<Key=K>+TraceReader<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
Tr::Batcher: Batcher<Item = ((K::Owned,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K::Owned,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
{
// The `Arrange` operator is tasked with reacting to an advancing input
// frontier by producing the sequence of batches whose lower and upper
Expand Down Expand Up @@ -690,23 +667,6 @@ where
}
}

impl<G: Scope, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> Arrange<G, K, (), R> for Collection<G, K, R>
where
G::Timestamp: Lattice+Ord,
{
fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
P: ParallelizationContract<G::Timestamp, ((K,()),G::Timestamp,R)>,
Tr: Trace+TraceReader<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Item = ((K,()),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,()),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
{
self.map(|k| (k, ()))
.arrange_core(pact, name)
}
}

/// Arranges something as `(Key,Val)` pairs according to a type `T` of trace.
///
/// This arrangement requires `Key: Hashable`, and uses the `hashed()` method to place keys in a hashed
Expand All @@ -726,7 +686,7 @@ where G::Timestamp: Lattice+Ord {
}

impl<G: Scope, K: ExchangeData+Hashable, V: ExchangeData, R: ExchangeData+Semigroup> ArrangeByKey<G, K, V, R> for Collection<G, (K,V), R>
where
where
G::Timestamp: Lattice+Ord
{
fn arrange_by_key(&self) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>> {
Expand Down
5 changes: 3 additions & 2 deletions src/operators/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ where
/// This method is used by the various `join` implementations, but it can also be used
/// directly in the event that one has a handle to an `Arranged<G,T>`, perhaps because
/// the arrangement is available for re-use, or from the output of a `reduce` operator.
pub trait JoinCore<G: Scope, K: 'static, V: 'static, R: Semigroup> where G::Timestamp: Lattice+Ord {
pub trait JoinCore<G: Scope, K: 'static + ?Sized, V: 'static, R: Semigroup> where G::Timestamp: Lattice+Ord {

/// Joins two arranged collections with the same key type.
///
Expand Down Expand Up @@ -635,6 +635,7 @@ impl<G, T1> JoinCore<G, T1::Key, T1::Val, T1::R> for Arranged<G,T1>
/// dataflow system a chance to run operators that can consume and aggregate the data.
struct Deferred<K, T, R, S1, S2, C1, C2, D>
where
K: ?Sized,
T: Timestamp+Lattice+Ord+Debug,
R: Semigroup,
C1: Cursor<S1, Key=K, Time=T>,
Expand All @@ -657,7 +658,7 @@ where

impl<K, T, R, S1, S2, C1, C2, D> Deferred<K, T, R, S1, S2, C1, C2, D>
where
K: Ord+Debug+Eq,
K: Ord+Debug+Eq + ?Sized,
C1: Cursor<S1, Key=K, Time=T>,
C2: Cursor<S2, Key=K, Time=T>,
C1::Val: Ord+Clone+Debug,
Expand Down
6 changes: 3 additions & 3 deletions src/trace/implementations/merge_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ use trace::implementations::Update;

/// Creates batches from unordered tuples.
pub struct MergeBatcher<U: Update> {
sorter: MergeSorter<(U::Key, U::Val), U::Time, U::Diff>,
sorter: MergeSorter<(U::KeyOwned, U::Val), U::Time, U::Diff>,
lower: Antichain<U::Time>,
frontier: Antichain<U::Time>,
phantom: ::std::marker::PhantomData<U>,
}

impl<U: Update> Batcher for MergeBatcher<U> {
type Item = ((U::Key,U::Val),U::Time,U::Diff);
type Item = ((U::KeyOwned,U::Val),U::Time,U::Diff);
type Time = U::Time;

fn new() -> Self {
Expand Down Expand Up @@ -126,7 +126,7 @@ impl<U: Update> Batcher for MergeBatcher<U> {
let mut buffer = Vec::new();
self.sorter.push(&mut buffer);
// We recycle buffers with allocations (capacity, and not zero-sized).
while buffer.capacity() > 0 && std::mem::size_of::<((U::Key,U::Val),U::Time,U::Diff)>() > 0 {
while buffer.capacity() > 0 && std::mem::size_of::<((U::KeyOwned,U::Val),U::Time,U::Diff)>() > 0 {
buffer = Vec::new();
self.sorter.push(&mut buffer);
}
Expand Down
8 changes: 4 additions & 4 deletions src/trace/implementations/merge_batcher_col.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,25 @@ use trace::implementations::Update;
/// Creates batches from unordered tuples.
pub struct ColumnatedMergeBatcher<U: Update>
where
U::Key: Columnation,
U::KeyOwned: Columnation,
U::Val: Columnation,
U::Time: Columnation,
U::Diff: Columnation,
{
sorter: MergeSorterColumnation<(U::Key, U::Val), U::Time, U::Diff>,
sorter: MergeSorterColumnation<(U::KeyOwned, U::Val), U::Time, U::Diff>,
lower: Antichain<U::Time>,
frontier: Antichain<U::Time>,
phantom: PhantomData<U>,
}

impl<U: Update> Batcher for ColumnatedMergeBatcher<U>
where
U::Key: Columnation + 'static,
U::KeyOwned: Columnation + 'static,
U::Val: Columnation + 'static,
U::Time: Columnation + 'static,
U::Diff: Columnation + 'static,
{
type Item = ((U::Key,U::Val),U::Time,U::Diff);
type Item = ((U::KeyOwned,U::Val),U::Time,U::Diff);
type Time = U::Time;

fn new() -> Self {
Expand Down
Loading

0 comments on commit 629458e

Please sign in to comment.