From 5ea6fe53464c1de652b2f708c785c1c7f552caca Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 21 Nov 2023 11:06:35 -0500 Subject: [PATCH] Configurable batcher implementation in ord_neu (#422) Signed-off-by: Moritz Hoffmann --- src/trace/implementations/ord_neu.rs | 72 ++++++++++++++++++++-------- 1 file changed, 53 insertions(+), 19 deletions(-) diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index 03891f8c2..37751ef33 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -9,6 +9,7 @@ //! and should consume fewer resources (computation and memory) when it applies. use std::rc::Rc; +use timely::container::columnation::TimelyStack; use trace::implementations::spine_fueled::Spine; @@ -18,21 +19,24 @@ use self::val_batch::{OrdValBatch}; /// A trace implementation using a spine of ordered lists. -pub type OrdValSpine = Spine>>>; +pub type OrdValSpine = Spine, Vec<((K,V),T,R)>>>>; // /// A trace implementation for empty values using a spine of ordered lists. // pub type OrdKeySpine = Spine>>>; /// A trace implementation backed by columnar storage. -pub type ColValSpine = Spine>>>; +pub type ColValSpine = Spine, TimelyStack<((K,V),T,R)>>>>; // /// A trace implementation backed by columnar storage. // pub type ColKeySpine = Spine>>>; mod val_batch { use std::convert::TryInto; + use std::marker::PhantomData; + use timely::container::columnation::{Columnation, TimelyStack}; use timely::progress::{Antichain, frontier::AntichainRef}; use trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; + use trace::implementations::merge_batcher_col::ColumnatedMergeBatcher; use trace::layers::BatchContainer; use super::{Layout, Update}; @@ -82,8 +86,11 @@ mod val_batch { } /// An immutable collection of update tuples, from a contiguous interval of logical times. + /// + /// The `L` parameter captures how the updates should be laid out, and `C` determines which + /// merge batcher to select. #[derive(Abomonation)] - pub struct OrdValBatch { + pub struct OrdValBatch { /// The updates themselves. pub storage: OrdValStorage, /// Description of the update times this layer represents. @@ -94,20 +101,22 @@ mod val_batch { /// we may have many more updates than `storage.updates.len()`. It should equal that /// length, plus the number of singleton optimizations employed. pub updates: usize, + /// Phantom marker for Rust happiness. + pub phantom: PhantomData, } - impl BatchReader for OrdValBatch { + impl BatchReader for OrdValBatch { type Key = ::Key; type Val = ::Val; type Time = ::Time; type R = ::Diff; - type Cursor = OrdValCursor; + type Cursor = OrdValCursor; fn cursor(&self) -> Self::Cursor { OrdValCursor { key_cursor: 0, val_cursor: 0, - phantom: std::marker::PhantomData, + phantom: PhantomData, } } fn len(&self) -> usize { @@ -118,7 +127,7 @@ mod val_batch { fn description(&self) -> &Description<::Time> { &self.description } } - impl Batch for OrdValBatch { + impl Batch for OrdValBatch> { type Batcher = MergeBatcher; type Builder = OrdValBuilder; type Merger = OrdValMerger; @@ -128,6 +137,23 @@ mod val_batch { } } + impl Batch for OrdValBatch> + where + ::Target: Columnation, + Self::Key: Columnation + 'static, + Self::Val: Columnation + 'static, + Self::Time: Columnation + 'static, + Self::R: Columnation + 'static, + { + type Batcher = ColumnatedMergeBatcher; + type Builder = OrdValBuilder; + type Merger = OrdValMerger; + + fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<::Time>) -> Self::Merger { + OrdValMerger::new(self, other, compaction_frontier) + } + } + /// State for an in-progress merge. pub struct OrdValMerger { /// Key position to merge next in the first batch. @@ -148,8 +174,11 @@ mod val_batch { singletons: usize, } - impl Merger> for OrdValMerger { - fn new(batch1: &OrdValBatch, batch2: &OrdValBatch, compaction_frontier: AntichainRef<::Time>) -> Self { + impl Merger> for OrdValMerger + where + OrdValBatch: Batch::Time> + { + fn new(batch1: &OrdValBatch, batch2: &OrdValBatch, compaction_frontier: AntichainRef<::Time>) -> Self { assert!(batch1.upper() == batch2.lower()); use lattice::Lattice; @@ -181,14 +210,15 @@ mod val_batch { singletons: 0, } } - fn done(self) -> OrdValBatch { + fn done(self) -> OrdValBatch { OrdValBatch { updates: self.result.updates.len() + self.singletons, storage: self.result, description: self.description, + phantom: PhantomData } } - fn work(&mut self, source1: &OrdValBatch, source2: &OrdValBatch, fuel: &mut isize) { + fn work(&mut self, source1: &OrdValBatch, source2: &OrdValBatch, fuel: &mut isize) { // An (incomplete) indication of the amount of work we've done so far. let starting_updates = self.result.updates.len(); @@ -387,25 +417,25 @@ mod val_batch { } /// A cursor for navigating a single layer. - pub struct OrdValCursor { + pub struct OrdValCursor { /// Absolute position of the current key. key_cursor: usize, /// Absolute position of the current value. val_cursor: usize, /// Phantom marker for Rust happiness. - phantom: std::marker::PhantomData, + phantom: PhantomData<(L, C)>, } - impl Cursor for OrdValCursor { + impl Cursor for OrdValCursor { type Key = ::Key; type Val = ::Val; type Time = ::Time; type R = ::Diff; - type Storage = OrdValBatch; + type Storage = OrdValBatch; - fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { &storage.storage.keys.index(self.key_cursor.try_into().ok().unwrap()) } - fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { &storage.storage.vals.index(self.val_cursor.try_into().ok().unwrap()) } + fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { storage.storage.keys.index(self.key_cursor) } + fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { storage.storage.vals.index(self.val_cursor) } fn map_times(&mut self, storage: &Self::Storage, mut logic: L2) { let (lower, upper) = storage.storage.updates_for_value(self.val_cursor); for index in lower .. upper { @@ -489,7 +519,10 @@ mod val_batch { } } - impl Builder> for OrdValBuilder { + impl Builder> for OrdValBuilder + where + OrdValBatch: Batch::Key, Val=::Val, Time=::Time, R=::Diff> + { fn new() -> Self { Self::with_capacity(0) } fn with_capacity(cap: usize) -> Self { @@ -564,7 +597,7 @@ mod val_batch { } #[inline(never)] - fn done(mut self, lower: Antichain<::Time>, upper: Antichain<::Time>, since: Antichain<::Time>) -> OrdValBatch { + fn done(mut self, lower: Antichain<::Time>, upper: Antichain<::Time>, since: Antichain<::Time>) -> OrdValBatch { // Record the final offsets self.result.vals_offs.push(self.result.updates.len().try_into().ok().unwrap()); // Remove any pending singleton, and if it was set increment our count. @@ -574,6 +607,7 @@ mod val_batch { updates: self.result.updates.len() + self.singletons, storage: self.result, description: Description::new(lower, upper, since), + phantom: PhantomData, } } }