From 1bd6c4ec9d98343b89d49e9a4ad28483355c9231 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 22 Nov 2023 14:34:54 -0500 Subject: [PATCH] Allow Batcher::seal to be generic in Builder --- src/operators/arrange/arrangement.rs | 34 +++---- src/operators/arrange/upsert.rs | 2 +- src/operators/consolidate.rs | 6 +- src/operators/reduce.rs | 8 +- src/trace/implementations/merge_batcher.rs | 40 +++----- .../implementations/merge_batcher_col.rs | 51 +++++----- src/trace/implementations/ord.rs | 44 +++++---- src/trace/implementations/ord_neu.rs | 19 ++-- src/trace/mod.rs | 98 +++++++------------ tests/trace.rs | 6 +- 10 files changed, 143 insertions(+), 165 deletions(-) diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index ae929dbdd..ca6986328 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -29,7 +29,7 @@ use timely::dataflow::operators::Capability; use ::{Data, ExchangeData, Collection, AsCollection, Hashable}; use ::difference::Semigroup; use lattice::Lattice; -use trace::{self, Trace, TraceReader, Batch, BatchReader, Batcher, Cursor}; +use trace::{self, Trace, TraceReader, Batch, BatchReader, Batcher, Builder, Cursor}; use trace::implementations::{KeySpine, ValSpine}; use trace::wrappers::enter::{TraceEnter, BatchEnter,}; @@ -456,8 +456,8 @@ where R: ExchangeData, Tr: Trace+TraceReader+'static, Tr::Batch: Batch, - ::Batcher: Batcher, - ::Batcher: Batcher, + ::Batcher: Batcher, + ::Builder: Builder, { self.arrange_named("Arrange") } @@ -474,8 +474,8 @@ where R: ExchangeData, Tr: Trace+TraceReader+'static, Tr::Batch: Batch, - ::Batcher: Batcher, - ::Batcher: Batcher, + ::Batcher: Batcher, + ::Builder: Builder, { let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into()); self.arrange_core(exchange, name) @@ -491,8 +491,8 @@ where P: ParallelizationContract, Tr: Trace+TraceReader+'static, Tr::Batch: Batch, - ::Batcher: Batcher, - ::Batcher: Batcher, + ::Batcher: Batcher, + ::Builder: Builder, ; } @@ -510,8 +510,8 @@ where V: ExchangeData, R: ExchangeData, Tr: Trace + TraceReader + 'static, Tr::Batch: Batch, - ::Batcher: Batcher, - ::Batcher: Batcher, + ::Batcher: Batcher, + ::Builder: Builder, { self.arrange_named("Arrange") } @@ -522,8 +522,8 @@ where V: ExchangeData, R: ExchangeData, Tr: Trace + TraceReader + 'static, Tr::Batch: Batch, - ::Batcher: Batcher, - ::Batcher: Batcher, + ::Batcher: Batcher, + ::Builder: Builder, { let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into()); self.arrange_core(exchange, name) @@ -534,8 +534,8 @@ where P: ParallelizationContract, Tr: Trace+TraceReader+'static, Tr::Batch: Batch, - ::Batcher: Batcher, - ::Batcher: Batcher, + ::Batcher: Batcher, + ::Builder: Builder, { // The `Arrange` operator is tasked with reacting to an advancing input // frontier by producing the sequence of batches whose lower and upper @@ -645,7 +645,7 @@ where } // Extract updates not in advance of `upper`. - let batch = batcher.seal(upper.clone()); + let batch = batcher.seal::<::Builder>(upper.clone()); writer.insert(batch.clone(), Some(capability.time().clone())); @@ -673,7 +673,7 @@ where } else { // Announce progress updates, even without data. - let _batch = batcher.seal(input.frontier().frontier().to_owned()); + let _batch = batcher.seal::<::Builder>(input.frontier().frontier().to_owned()); writer.seal(input.frontier().frontier().to_owned()); } @@ -699,8 +699,8 @@ where P: ParallelizationContract, Tr: Trace+TraceReader+'static, Tr::Batch: Batch, - ::Batcher: Batcher, - ::Batcher: Batcher, + ::Batcher: Batcher, + ::Builder: Builder, { self.map(|k| (k, ())) .arrange_core(pact, name) diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index 242203b20..bdfa23fb2 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -146,7 +146,7 @@ where Tr::Val: ExchangeData, Tr: Trace+TraceReader+'static, Tr::Batch: Batch, - ::Builder: Builder, + ::Builder: Builder, { let mut reader: Option> = None; diff --git a/src/operators/consolidate.rs b/src/operators/consolidate.rs index 2ace0a06c..52776d781 100644 --- a/src/operators/consolidate.rs +++ b/src/operators/consolidate.rs @@ -13,7 +13,7 @@ use ::difference::Semigroup; use Data; use lattice::Lattice; -use trace::{Batch, Batcher}; +use trace::{Batch, Batcher, Builder}; /// Methods which require data be arrangeable. impl Collection @@ -58,8 +58,8 @@ where where Tr: crate::trace::Trace+crate::trace::TraceReader+'static, Tr::Batch: crate::trace::Batch, - ::Batcher: Batcher, - ::Batcher: Batcher, + ::Batcher: Batcher, + ::Builder: Builder, { use operators::arrange::arrangement::Arrange; self.map(|k| (k, ())) diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index 3a8ce9bc1..7185e83da 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -276,7 +276,7 @@ pub trait ReduceCore where G::Timestam T2::Val: Data, T2::R: Abelian, T2::Batch: Batch, - ::Builder: Builder, + ::Builder: Builder, L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val, T2::R)>)+'static, { self.reduce_core::<_,T2>(name, move |key, input, output, change| { @@ -299,7 +299,7 @@ pub trait ReduceCore where G::Timestam T2::Val: Data, T2::R: Semigroup, T2::Batch: Batch, - ::Builder: Builder, + ::Builder: Builder, L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val,T2::R)>)+'static ; } @@ -318,7 +318,7 @@ where T2::R: Semigroup, T2: Trace+TraceReader+'static, T2::Batch: Batch, - ::Builder: Builder, + ::Builder: Builder, L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static { self.arrange_by_key_named(&format!("Arrange: {}", name)) @@ -337,7 +337,7 @@ where T2::Val: Data, T2::R: Semigroup, T2::Batch: Batch, - ::Builder: Builder, + ::Builder: Builder, L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static { let mut result_trace = None; diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs index ae367f3a0..9daec42b3 100644 --- a/src/trace/implementations/merge_batcher.rs +++ b/src/trace/implementations/merge_batcher.rs @@ -7,34 +7,26 @@ use timely::progress::frontier::Antichain; use ::difference::Semigroup; -use lattice::Lattice; -use trace::{Batch, Batcher, Builder}; +use trace::{Batcher, Builder}; +use trace::implementations::Update; /// Creates batches from unordered tuples. -pub struct MergeBatcher where B::Key: Ord, B::Val: Ord, B::Time: Ord, B::R: Semigroup { - sorter: MergeSorter<(B::Key, B::Val), B::Time, B::R>, - lower: Antichain, - frontier: Antichain, - phantom: ::std::marker::PhantomData, +pub struct MergeBatcher { + sorter: MergeSorter<(U::Key, U::Val), U::Time, U::Diff>, + lower: Antichain, + frontier: Antichain, + phantom: ::std::marker::PhantomData, } -impl Batcher for MergeBatcher -where - B: Batch, - B::Key: Ord+Clone, - B::Val: Ord+Clone, - B::Time: Lattice+timely::progress::Timestamp+Ord+Clone, - B::R: Semigroup, - B::Builder: Builder, -{ - type Item = ((B::Key,B::Val),B::Time,B::R); - type Time = B::Time; +impl Batcher for MergeBatcher { + type Item = ((U::Key,U::Val),U::Time,U::Diff); + type Time = U::Time; fn new() -> Self { MergeBatcher { sorter: MergeSorter::new(), frontier: Antichain::new(), - lower: Antichain::from_elem(::minimum()), + lower: Antichain::from_elem(::minimum()), phantom: ::std::marker::PhantomData, } } @@ -61,9 +53,9 @@ where // which we call `lower`, by assumption that after sealing a batcher we receive no more // updates with times not greater or equal to `upper`. #[inline(never)] - fn seal(&mut self, upper: Antichain) -> B { + fn seal>(&mut self, upper: Antichain) -> B::Output { - let mut builder = B::Builder::new(); + let mut builder = B::new(); let mut merged = Vec::new(); self.sorter.finish_into(&mut merged); @@ -109,18 +101,18 @@ where let mut buffer = Vec::new(); self.sorter.push(&mut buffer); // We recycle buffers with allocations (capacity, and not zero-sized). - while buffer.capacity() > 0 && std::mem::size_of::<((B::Key,B::Val),B::Time,B::R)>() > 0 { + while buffer.capacity() > 0 && std::mem::size_of::<((U::Key,U::Val),U::Time,U::Diff)>() > 0 { buffer = Vec::new(); self.sorter.push(&mut buffer); } - let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(::minimum())); + let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(::minimum())); self.lower = upper; seal } // the frontier of elements remaining after the most recent call to `self.seal`. - fn frontier(&mut self) -> timely::progress::frontier::AntichainRef { + fn frontier(&mut self) -> timely::progress::frontier::AntichainRef { self.frontier.borrow() } } diff --git a/src/trace/implementations/merge_batcher_col.rs b/src/trace/implementations/merge_batcher_col.rs index 8a316e32f..3b1d5eaec 100644 --- a/src/trace/implementations/merge_batcher_col.rs +++ b/src/trace/implementations/merge_batcher_col.rs @@ -8,39 +8,38 @@ use timely::progress::frontier::Antichain; use ::difference::Semigroup; -use lattice::Lattice; -use trace::{Batch, Batcher, Builder}; +use trace::{Batcher, Builder}; +use trace::implementations::Update; /// Creates batches from unordered tuples. -pub struct ColumnatedMergeBatcher - where - B::Key: Ord+Clone+Columnation, - B::Val: Ord+Clone+Columnation, - B::Time: Lattice+timely::progress::Timestamp+Ord+Clone+Columnation, - B::R: Semigroup+Columnation, +pub struct ColumnatedMergeBatcher +where + U::Key: Columnation, + U::Val: Columnation, + U::Time: Columnation, + U::Diff: Columnation, { - sorter: MergeSorterColumnation<(B::Key, B::Val), B::Time, B::R>, - lower: Antichain, - frontier: Antichain, - phantom: PhantomData, + sorter: MergeSorterColumnation<(U::Key, U::Val), U::Time, U::Diff>, + lower: Antichain, + frontier: Antichain, + phantom: PhantomData, } -impl Batcher for ColumnatedMergeBatcher - where - B::Key: Ord+Clone+Columnation+'static, - B::Val: Ord+Clone+Columnation+'static, - B::Time: Lattice+timely::progress::Timestamp+Ord+Clone+Columnation+'static, - B::R: Semigroup+Columnation+'static, - B::Builder: Builder, +impl Batcher for ColumnatedMergeBatcher +where + U::Key: Columnation + 'static, + U::Val: Columnation + 'static, + U::Time: Columnation + 'static, + U::Diff: Columnation + 'static, { - type Item = ((B::Key,B::Val),B::Time,B::R); - type Time = B::Time; + type Item = ((U::Key,U::Val),U::Time,U::Diff); + type Time = U::Time; fn new() -> Self { ColumnatedMergeBatcher { sorter: MergeSorterColumnation::new(), frontier: Antichain::new(), - lower: Antichain::from_elem(::minimum()), + lower: Antichain::from_elem(::minimum()), phantom: PhantomData, } } @@ -65,9 +64,9 @@ impl Batcher for ColumnatedMergeBatcher // which we call `lower`, by assumption that after sealing a batcher we receive no more // updates with times not greater or equal to `upper`. #[inline] - fn seal(&mut self, upper: Antichain) -> B { + fn seal>(&mut self, upper: Antichain) -> B::Output { - let mut builder = B::Builder::new(); + let mut builder = B::new(); let mut merged = Default::default(); self.sorter.finish_into(&mut merged); @@ -106,13 +105,13 @@ impl Batcher for ColumnatedMergeBatcher // Drain buffers (fast reclamation). self.sorter.clear_stash(); - let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(::minimum())); + let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(::minimum())); self.lower = upper; seal } // the frontier of elements remaining after the most recent call to `self.seal`. - fn frontier(&mut self) -> timely::progress::frontier::AntichainRef { + fn frontier(&mut self) -> timely::progress::frontier::AntichainRef { self.frontier.borrow() } } diff --git a/src/trace/implementations/ord.rs b/src/trace/implementations/ord.rs index 94b124b0a..3ac2946a3 100644 --- a/src/trace/implementations/ord.rs +++ b/src/trace/implementations/ord.rs @@ -94,8 +94,8 @@ impl BatchReader for OrdValBatch { impl Batch for OrdValBatch> { - type Batcher = MergeBatcher; - type Builder = OrdValBuilder; + type Batcher = MergeBatcher; + type Builder = OrdValBuilder>; type Merger = OrdValMerger; fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<::Time>) -> Self::Merger { @@ -111,8 +111,8 @@ where Self::Time: Columnation + 'static, Self::R: Columnation + 'static, { - type Batcher = ColumnatedMergeBatcher; - type Builder = OrdValBuilder; + type Batcher = ColumnatedMergeBatcher; + type Builder = OrdValBuilder>; type Merger = OrdValMerger; fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<::Time>) -> Self::Merger { @@ -337,26 +337,30 @@ impl Cursor> for OrdValCursor { } /// A builder for creating layers from unsorted update tuples. -pub struct OrdValBuilder { +pub struct OrdValBuilder { builder: KVTDBuilder, + phantom: PhantomData, } -impl Builder> for OrdValBuilder +impl Builder for OrdValBuilder where OrdValBatch: Batch::Key, Val=::Val, Time=::Time, R=::Diff> { type Item = ((::Key, ::Val), ::Time, ::Diff); type Time = ::Time; + type Output = OrdValBatch; fn new() -> Self { OrdValBuilder { - builder: >::new() + builder: >::new(), + phantom: std::marker::PhantomData, } } fn with_capacity(cap: usize) -> Self { OrdValBuilder { - builder: as TupleBuilder>::with_capacity(cap) + builder: as TupleBuilder>::with_capacity(cap), + phantom: std::marker::PhantomData, } } @@ -411,9 +415,9 @@ impl BatchReader for OrdKeyBatch { fn description(&self) -> &Description<::Time> { &self.desc } } -impl Batch for OrdKeyBatch> { - type Batcher = MergeBatcher; - type Builder = OrdKeyBuilder; +impl Batch for OrdKeyBatch> where L::Target: Update { + type Batcher = MergeBatcher; + type Builder = OrdKeyBuilder>; type Merger = OrdKeyMerger; fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<::Time>) -> Self::Merger { @@ -423,13 +427,13 @@ impl Batch for OrdKeyBatch> { impl Batch for OrdKeyBatch> where - ::Target: Columnation + 'static, + ::Target: Update + Columnation + 'static, Self::Key: Columnation + 'static, Self::Time: Columnation + 'static, Self::R: Columnation + 'static, { - type Batcher = ColumnatedMergeBatcher; - type Builder = OrdKeyBuilder; + type Batcher = ColumnatedMergeBatcher; + type Builder = OrdKeyBuilder>; type Merger = OrdKeyMerger; fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<::Time>) -> Self::Merger { @@ -636,26 +640,30 @@ impl Cursor> for OrdKeyCursor { /// A builder for creating layers from unsorted update tuples. -pub struct OrdKeyBuilder { +pub struct OrdKeyBuilder { builder: KTDBuilder, + phantom: std::marker::PhantomData } -impl Builder> for OrdKeyBuilder +impl Builder for OrdKeyBuilder where OrdKeyBatch: Batch::Key, Val=(), Time=::Time, R=::Diff> { type Item = ((::Key, ()), ::Time, ::Diff); type Time = ::Time; + type Output = OrdKeyBatch; fn new() -> Self { OrdKeyBuilder { - builder: >::new() + builder: >::new(), + phantom: std::marker::PhantomData, } } fn with_capacity(cap: usize) -> Self { OrdKeyBuilder { - builder: as TupleBuilder>::with_capacity(cap) + builder: as TupleBuilder>::with_capacity(cap), + phantom: std::marker::PhantomData, } } diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index d8dbe5cf4..694af897c 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -128,8 +128,8 @@ mod val_batch { } impl Batch for OrdValBatch> { - type Batcher = MergeBatcher; - type Builder = OrdValBuilder; + type Batcher = MergeBatcher; + type Builder = OrdValBuilder>; type Merger = OrdValMerger; fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<::Time>) -> Self::Merger { @@ -145,8 +145,8 @@ mod val_batch { Self::Time: Columnation + 'static, Self::R: Columnation + 'static, { - type Batcher = ColumnatedMergeBatcher; - type Builder = OrdValBuilder; + type Batcher = ColumnatedMergeBatcher; + type Builder = OrdValBuilder>; type Merger = OrdValMerger; fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<::Time>) -> Self::Merger { @@ -479,7 +479,7 @@ mod val_batch { } /// A builder for creating layers from unsorted update tuples. - pub struct OrdValBuilder { + pub struct OrdValBuilder { result: OrdValStorage, singleton: Option<(::Time, ::Diff)>, /// Counts the number of singleton optimizations we performed. @@ -487,9 +487,11 @@ mod val_batch { /// This number allows us to correctly gauge the total number of updates reflected in a batch, /// even though `updates.len()` may be much shorter than this amount. singletons: usize, + /// Phantom marker for Rust happiness. + pub phantom: PhantomData, } - impl OrdValBuilder { + impl OrdValBuilder { /// Pushes a single update, which may set `self.singleton` rather than push. /// /// This operation is meant to be equivalent to `self.results.updates.push((time, diff))`. @@ -517,13 +519,13 @@ mod val_batch { } } - impl Builder> for OrdValBuilder + impl Builder for OrdValBuilder where OrdValBatch: Batch::Key, Val=::Val, Time=::Time, R=::Diff> { - type Item = ((::Key, ::Val), ::Time, ::Diff); type Time = ::Time; + type Output = OrdValBatch; fn new() -> Self { Self::with_capacity(0) } fn with_capacity(cap: usize) -> Self { @@ -538,6 +540,7 @@ mod val_batch { }, singleton: None, singletons: 0, + phantom: std::marker::PhantomData, } } diff --git a/src/trace/mod.rs b/src/trace/mod.rs index a94db6c11..0f5bffaa3 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -280,9 +280,9 @@ where /// An immutable collection of updates. pub trait Batch : BatchReader where Self: ::std::marker::Sized { /// A type used to assemble batches from disordered updates. - type Batcher: Batcher; + type Batcher: Batcher