From 4424162b734da897f532972639fb667901f2cc1d Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 22 Nov 2023 07:51:17 -0500 Subject: [PATCH 1/7] Make Cursor trait generic in Storage --- src/operators/arrange/agent.rs | 5 +- src/operators/join.rs | 18 +++--- src/operators/mod.rs | 14 ++--- src/operators/reduce.rs | 28 ++++----- src/trace/cursor/cursor_list.rs | 36 ++++++------ src/trace/cursor/cursor_pair.rs | 30 +++++----- src/trace/cursor/mod.rs | 33 +++++------ src/trace/implementations/ord.rs | 52 ++++++++--------- src/trace/implementations/ord_neu.rs | 26 ++++----- src/trace/implementations/spine_fueled.rs | 5 +- src/trace/mod.rs | 63 ++++++++++---------- src/trace/wrappers/enter.rs | 61 +++++++++---------- src/trace/wrappers/enter_at.rs | 61 +++++++++---------- src/trace/wrappers/filter.rs | 61 +++++++++---------- src/trace/wrappers/freeze.rs | 61 +++++++++---------- src/trace/wrappers/frontier.rs | 71 +++++++++++------------ src/trace/wrappers/rc.rs | 4 +- 17 files changed, 302 insertions(+), 327 deletions(-) diff --git a/src/operators/arrange/agent.rs b/src/operators/arrange/agent.rs index 847647430..c477adaa3 100644 --- a/src/operators/arrange/agent.rs +++ b/src/operators/arrange/agent.rs @@ -11,7 +11,7 @@ use timely::progress::{Antichain, frontier::AntichainRef}; use timely::dataflow::operators::CapabilitySet; use lattice::Lattice; -use trace::{Trace, TraceReader, Batch, BatchReader, Cursor}; +use trace::{Trace, TraceReader, Batch, BatchReader}; use trace::wrappers::rc::TraceBox; @@ -53,6 +53,7 @@ where type R = Tr::R; type Batch = Tr::Batch; + type Storage = Tr::Storage; type Cursor = Tr::Cursor; fn set_logical_compaction(&mut self, frontier: AntichainRef) { @@ -77,7 +78,7 @@ where fn get_physical_compaction(&mut self) -> AntichainRef { self.physical_compaction.borrow() } - fn cursor_through(&mut self, frontier: AntichainRef) -> Option<(Tr::Cursor, ::Storage)> { + fn cursor_through(&mut self, frontier: AntichainRef) -> Option<(Self::Cursor, Self::Storage)> { self.trace.borrow_mut().trace.cursor_through(frontier) } fn map_batches(&self, f: F) { self.trace.borrow().trace.map_batches(f) } diff --git a/src/operators/join.rs b/src/operators/join.rs index f530918df..c1708a759 100644 --- a/src/operators/join.rs +++ b/src/operators/join.rs @@ -633,12 +633,12 @@ impl JoinCore for Arranged /// The structure wraps cursors which allow us to play out join computation at whatever rate we like. /// This allows us to avoid producing and buffering massive amounts of data, without giving the timely /// dataflow system a chance to run operators that can consume and aggregate the data. -struct Deferred +struct Deferred where T: Timestamp+Lattice+Ord+Debug, R: Semigroup, - C1: Cursor, - C2: Cursor, + C1: Cursor, + C2: Cursor, C1::Val: Ord+Clone, C2::Val: Ord+Clone, C1::R: Semigroup, @@ -647,19 +647,19 @@ where { phant: ::std::marker::PhantomData, trace: C1, - trace_storage: C1::Storage, + trace_storage: S1, batch: C2, - batch_storage: C2::Storage, + batch_storage: S2, capability: Capability, done: bool, temp: Vec<((D, T), R)>, } -impl Deferred +impl Deferred where K: Ord+Debug+Eq, - C1: Cursor, - C2: Cursor, + C1: Cursor, + C2: Cursor, C1::Val: Ord+Clone+Debug, C2::Val: Ord+Clone+Debug, C1::R: Semigroup, @@ -668,7 +668,7 @@ where R: Semigroup, D: Clone+Data, { - fn new(trace: C1, trace_storage: C1::Storage, batch: C2, batch_storage: C2::Storage, capability: Capability) -> Self { + fn new(trace: C1, trace_storage: S1, batch: C2, batch_storage: S2, capability: Capability) -> Self { Deferred { phant: ::std::marker::PhantomData, trace, diff --git a/src/operators/mod.rs b/src/operators/mod.rs index 9f048cd5d..e0040443d 100644 --- a/src/operators/mod.rs +++ b/src/operators/mod.rs @@ -38,8 +38,8 @@ impl<'a, V:'a, T, R> EditList<'a, V, T, R> where T: Ord+Clone, R: Semigroup { } } /// Loads the contents of a cursor. - fn load(&mut self, cursor: &mut C, storage: &'a C::Storage, logic: L) - where V: Clone, C: Cursor, C::Key: Eq, L: Fn(&T)->T { + fn load(&mut self, cursor: &mut C, storage: &'a S, logic: L) + where V: Clone, C: Cursor, C::Key: Eq, L: Fn(&T)->T { self.clear(); while cursor.val_valid(storage) { cursor.map_times(storage, |time1, diff1| self.push(logic(time1), diff1.clone())); @@ -101,22 +101,22 @@ impl<'storage, V: Ord+Clone+'storage, T: Lattice+Ord+Clone, R: Semigroup> ValueH self.history.clear(); self.buffer.clear(); } - fn load(&mut self, cursor: &mut C, storage: &'storage C::Storage, logic: L) - where C: Cursor, C::Key: Eq, L: Fn(&T)->T { + fn load(&mut self, cursor: &mut C, storage: &'storage S, logic: L) + where C: Cursor, C::Key: Eq, L: Fn(&T)->T { self.edits.load(cursor, storage, logic); } /// Loads and replays a specified key. /// /// If the key is absent, the replayed history will be empty. - fn replay_key<'history, C, L>( + fn replay_key<'history, S, C, L>( &'history mut self, cursor: &mut C, - storage: &'storage C::Storage, + storage: &'storage S, key: &C::Key, logic: L ) -> HistoryReplay<'storage, 'history, V, T, R> - where C: Cursor, C::Key: Eq, L: Fn(&T)->T + where C: Cursor, C::Key: Eq, L: Fn(&T)->T { self.clear(); cursor.seek_key(storage, key); diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index bdf7b6dae..0c84f5953 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -651,12 +651,12 @@ where R2: Semigroup, { fn new() -> Self; - fn compute( + fn compute( &mut self, key: &K, - source_cursor: (&mut C1, &'a C1::Storage), - output_cursor: (&mut C2, &'a C2::Storage), - batch_cursor: (&mut C3, &'a C3::Storage), + source_cursor: (&mut C1, &'a S1), + output_cursor: (&mut C2, &'a S2), + batch_cursor: (&mut C3, &'a S3), times: &mut Vec, logic: &mut L, upper_limit: &Antichain, @@ -664,9 +664,9 @@ where new_interesting: &mut Vec) -> (usize, usize) where K: Eq+Clone, - C1: Cursor, - C2: Cursor, - C3: Cursor, + C1: Cursor, + C2: Cursor, + C3: Cursor, L: FnMut(&K, &[(&V1, R1)], &mut Vec<(V2, R2)>, &mut Vec<(V2, R2)>); } @@ -729,12 +729,12 @@ mod history_replay { } } #[inline(never)] - fn compute( + fn compute( &mut self, key: &K, - (source_cursor, source_storage): (&mut C1, &'a C1::Storage), - (output_cursor, output_storage): (&mut C2, &'a C2::Storage), - (batch_cursor, batch_storage): (&mut C3, &'a C3::Storage), + (source_cursor, source_storage): (&mut C1, &'a S1), + (output_cursor, output_storage): (&mut C2, &'a S2), + (batch_cursor, batch_storage): (&mut C3, &'a S3), times: &mut Vec, logic: &mut L, upper_limit: &Antichain, @@ -742,9 +742,9 @@ mod history_replay { new_interesting: &mut Vec) -> (usize, usize) where K: Eq+Clone, - C1: Cursor, - C2: Cursor, - C3: Cursor, + C1: Cursor, + C2: Cursor, + C3: Cursor, L: FnMut(&K, &[(&V1, R1)], &mut Vec<(V2, R2)>, &mut Vec<(V2, R2)>) { diff --git a/src/trace/cursor/cursor_list.rs b/src/trace/cursor/cursor_list.rs index a7adf8cc1..32e0dc5d5 100644 --- a/src/trace/cursor/cursor_list.rs +++ b/src/trace/cursor/cursor_list.rs @@ -7,15 +7,15 @@ use super::Cursor; /// The `CursorList` tracks the indices of cursors with the minimum key, and the the indices of cursors with /// the minimum key and minimum value. It performs no clever management of these sets otherwise. #[derive(Debug)] -pub struct CursorList { +pub struct CursorList { cursors: Vec, min_key: Vec, min_val: Vec, } -impl CursorList where C::Key: Ord, C::Val: Ord { +impl CursorList { /// Creates a new cursor list from pre-existing cursors. - pub fn new(cursors: Vec, storage: &[C::Storage]) -> Self { + pub fn new(cursors: Vec, storage: &[S]) -> Self where C: Cursor, C::Key: Ord, C::Val: Ord { let mut result = CursorList { cursors, @@ -36,7 +36,7 @@ impl CursorList where C::Key: Ord, C::Val: Ord { // // Once finished, it invokes `minimize_vals()` to ensure the value cursor is // in a consistent state as well. - fn minimize_keys(&mut self, storage: &[C::Storage]) { + fn minimize_keys(&mut self, storage: &[S]) where C: Cursor, C::Key: Ord, C::Val: Ord { self.min_key.clear(); @@ -64,7 +64,7 @@ impl CursorList where C::Key: Ord, C::Val: Ord { // indices of cursors whose value equals the minimum valid value seen so far. As it // goes, if it observes an improved value it clears the current list, updates the minimum // value, and continues. - fn minimize_vals(&mut self, storage: &[C::Storage]) { + fn minimize_vals(&mut self, storage: &[S]) where C: Cursor, C::Key: Ord, C::Val: Ord { self.min_val.clear(); @@ -85,7 +85,7 @@ impl CursorList where C::Key: Ord, C::Val: Ord { } } -impl Cursor for CursorList +impl> Cursor> for CursorList where C::Key: Ord, C::Val: Ord, @@ -95,30 +95,28 @@ where type Time = C::Time; type R = C::R; - type Storage = Vec; - // validation methods #[inline] - fn key_valid(&self, _storage: &Self::Storage) -> bool { !self.min_key.is_empty() } + fn key_valid(&self, _storage: &Vec) -> bool { !self.min_key.is_empty() } #[inline] - fn val_valid(&self, _storage: &Self::Storage) -> bool { !self.min_val.is_empty() } + fn val_valid(&self, _storage: &Vec) -> bool { !self.min_val.is_empty() } // accessors #[inline] - fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { + fn key<'a>(&self, storage: &'a Vec) -> &'a Self::Key { debug_assert!(self.key_valid(storage)); debug_assert!(self.cursors[self.min_key[0]].key_valid(&storage[self.min_key[0]])); self.cursors[self.min_key[0]].key(&storage[self.min_key[0]]) } #[inline] - fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { + fn val<'a>(&self, storage: &'a Vec) -> &'a Self::Val { debug_assert!(self.key_valid(storage)); debug_assert!(self.val_valid(storage)); debug_assert!(self.cursors[self.min_val[0]].val_valid(&storage[self.min_val[0]])); self.cursors[self.min_val[0]].val(&storage[self.min_val[0]]) } #[inline] - fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { + fn map_times(&mut self, storage: &Vec, mut logic: L) { for &index in self.min_val.iter() { self.cursors[index].map_times(&storage[index], |t,d| logic(t,d)); } @@ -126,14 +124,14 @@ where // key methods #[inline] - fn step_key(&mut self, storage: &Self::Storage) { + fn step_key(&mut self, storage: &Vec) { for &index in self.min_key.iter() { self.cursors[index].step_key(&storage[index]); } self.minimize_keys(storage); } #[inline] - fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { + fn seek_key(&mut self, storage: &Vec, key: &Self::Key) { for index in 0 .. self.cursors.len() { self.cursors[index].seek_key(&storage[index], key); } @@ -142,14 +140,14 @@ where // value methods #[inline] - fn step_val(&mut self, storage: &Self::Storage) { + fn step_val(&mut self, storage: &Vec) { for &index in self.min_val.iter() { self.cursors[index].step_val(&storage[index]); } self.minimize_vals(storage); } #[inline] - fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { + fn seek_val(&mut self, storage: &Vec, val: &Self::Val) { for &index in self.min_key.iter() { self.cursors[index].seek_val(&storage[index], val); } @@ -158,14 +156,14 @@ where // rewinding methods #[inline] - fn rewind_keys(&mut self, storage: &Self::Storage) { + fn rewind_keys(&mut self, storage: &Vec) { for index in 0 .. self.cursors.len() { self.cursors[index].rewind_keys(&storage[index]); } self.minimize_keys(storage); } #[inline] - fn rewind_vals(&mut self, storage: &Self::Storage) { + fn rewind_vals(&mut self, storage: &Vec) { for &index in self.min_key.iter() { self.cursors[index].rewind_vals(&storage[index]); } diff --git a/src/trace/cursor/cursor_pair.rs b/src/trace/cursor/cursor_pair.rs index c34f03abb..bdbd8b79e 100644 --- a/src/trace/cursor/cursor_pair.rs +++ b/src/trace/cursor/cursor_pair.rs @@ -15,29 +15,27 @@ pub struct CursorPair { val_order: Ordering, // Invalid vals are `Greater` than all other vals. `Equal` implies both valid. } -impl Cursor for CursorPair +impl Cursor<(S1, S2)> for CursorPair where K: Ord, V: Ord, - C1: Cursor, - C2: Cursor, + C1: Cursor, + C2: Cursor, { type Key = K; type Val = V; type Time = T; type R = R; - type Storage = (C1::Storage, C2::Storage); - // validation methods - fn key_valid(&self, storage: &Self::Storage) -> bool { + fn key_valid(&self, storage: &(S1, S2)) -> bool { match self.key_order { Ordering::Less => self.cursor1.key_valid(&storage.0), Ordering::Equal => true, Ordering::Greater => self.cursor2.key_valid(&storage.1), } } - fn val_valid(&self, storage: &Self::Storage) -> bool { + fn val_valid(&self, storage: &(S1, S2)) -> bool { match (self.key_order, self.val_order) { (Ordering::Less, _) => self.cursor1.val_valid(&storage.0), (Ordering::Greater, _) => self.cursor2.val_valid(&storage.1), @@ -48,13 +46,13 @@ where } // accessors - fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K { + fn key<'a>(&self, storage: &'a (S1, S2)) -> &'a K { match self.key_order { Ordering::Less => self.cursor1.key(&storage.0), _ => self.cursor2.key(&storage.1), } } - fn val<'a>(&self, storage: &'a Self::Storage) -> &'a V { + fn val<'a>(&self, storage: &'a (S1, S2)) -> &'a V { if self.key_order == Ordering::Less || (self.key_order == Ordering::Equal && self.val_order != Ordering::Greater) { self.cursor1.val(&storage.0) } @@ -62,7 +60,7 @@ where self.cursor2.val(&storage.1) } } - fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { + fn map_times(&mut self, storage: &(S1, S2), mut logic: L) { if self.key_order == Ordering::Less || (self.key_order == Ordering::Equal && self.val_order != Ordering::Greater) { self.cursor1.map_times(&storage.0, |t,d| logic(t,d)); } @@ -72,7 +70,7 @@ where } // key methods - fn step_key(&mut self, storage: &Self::Storage) { + fn step_key(&mut self, storage: &(S1, S2)) { if self.key_order != Ordering::Greater { self.cursor1.step_key(&storage.0); } if self.key_order != Ordering::Less { self.cursor2.step_key(&storage.1); } @@ -83,7 +81,7 @@ where (true, true) => self.cursor1.key(&storage.0).cmp(self.cursor2.key(&storage.1)), }; } - fn seek_key(&mut self, storage: &Self::Storage, key: &K) { + fn seek_key(&mut self, storage: &(S1, S2), key: &K) { self.cursor1.seek_key(&storage.0, key); self.cursor2.seek_key(&storage.1, key); @@ -96,7 +94,7 @@ where } // value methods - fn step_val(&mut self, storage: &Self::Storage) { + fn step_val(&mut self, storage: &(S1, S2)) { match self.key_order { Ordering::Less => self.cursor1.step_val(&storage.0), Ordering::Equal => { @@ -111,7 +109,7 @@ where Ordering::Greater => self.cursor2.step_val(&storage.1), } } - fn seek_val(&mut self, storage: &Self::Storage, val: &V) { + fn seek_val(&mut self, storage: &(S1, S2), val: &V) { match self.key_order { Ordering::Less => self.cursor1.seek_val(&storage.0, val), Ordering::Equal => { @@ -128,11 +126,11 @@ where } // rewinding methods - fn rewind_keys(&mut self, storage: &Self::Storage) { + fn rewind_keys(&mut self, storage: &(S1, S2)) { self.cursor1.rewind_keys(&storage.0); self.cursor2.rewind_keys(&storage.1); } - fn rewind_vals(&mut self, storage: &Self::Storage) { + fn rewind_vals(&mut self, storage: &(S1, S2)) { if self.key_order != Ordering::Greater { self.cursor1.rewind_vals(&storage.0); } if self.key_order != Ordering::Less { self.cursor2.rewind_vals(&storage.1); } } diff --git a/src/trace/cursor/mod.rs b/src/trace/cursor/mod.rs index a71ca2dad..ed11050c6 100644 --- a/src/trace/cursor/mod.rs +++ b/src/trace/cursor/mod.rs @@ -12,7 +12,7 @@ pub mod cursor_list; pub use self::cursor_list::CursorList; /// A cursor for navigating ordered `(key, val, time, diff)` updates. -pub trait Cursor { +pub trait Cursor { /// Key by which updates are indexed. type Key; @@ -23,53 +23,50 @@ pub trait Cursor { /// Associated update. type R; - /// Type the cursor addresses data in. - type Storage; - /// Indicates if the current key is valid. /// /// A value of `false` indicates that the cursor has exhausted all keys. - fn key_valid(&self, storage: &Self::Storage) -> bool; + fn key_valid(&self, storage: &Storage) -> bool; /// Indicates if the current value is valid. /// /// A value of `false` indicates that the cursor has exhausted all values for this key. - fn val_valid(&self, storage: &Self::Storage) -> bool; + fn val_valid(&self, storage: &Storage) -> bool; /// A reference to the current key. Asserts if invalid. - fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key; + fn key<'a>(&self, storage: &'a Storage) -> &'a Self::Key; /// A reference to the current value. Asserts if invalid. - fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val; + fn val<'a>(&self, storage: &'a Storage) -> &'a Self::Val; /// Returns a reference to the current key, if valid. - fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<&'a Self::Key> { + fn get_key<'a>(&self, storage: &'a Storage) -> Option<&'a Self::Key> { if self.key_valid(storage) { Some(self.key(storage)) } else { None } } /// Returns a reference to the current value, if valid. - fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<&'a Self::Val> { + fn get_val<'a>(&self, storage: &'a Storage) -> Option<&'a Self::Val> { if self.val_valid(storage) { Some(self.val(storage)) } else { None } } /// Applies `logic` to each pair of time and difference. Intended for mutation of the /// closure's scope. - fn map_times(&mut self, storage: &Self::Storage, logic: L); + fn map_times(&mut self, storage: &Storage, logic: L); /// Advances the cursor to the next key. - fn step_key(&mut self, storage: &Self::Storage); + fn step_key(&mut self, storage: &Storage); /// Advances the cursor to the specified key. - fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key); + fn seek_key(&mut self, storage: &Storage, key: &Self::Key); /// Advances the cursor to the next value. - fn step_val(&mut self, storage: &Self::Storage); + fn step_val(&mut self, storage: &Storage); /// Advances the cursor to the specified value. - fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val); + fn seek_val(&mut self, storage: &Storage, val: &Self::Val); /// Rewinds the cursor to the first key. - fn rewind_keys(&mut self, storage: &Self::Storage); + fn rewind_keys(&mut self, storage: &Storage); /// Rewinds the cursor to the first value for current key. - fn rewind_vals(&mut self, storage: &Self::Storage); + fn rewind_vals(&mut self, storage: &Storage); /// Rewinds the cursor and outputs its contents to a Vec - fn to_vec(&mut self, storage: &Self::Storage) -> Vec<((Self::Key, Self::Val), Vec<(Self::Time, Self::R)>)> + fn to_vec(&mut self, storage: &Storage) -> Vec<((Self::Key, Self::Val), Vec<(Self::Time, Self::R)>)> where Self::Key: Clone, Self::Val: Clone, diff --git a/src/trace/implementations/ord.rs b/src/trace/implementations/ord.rs index 28272588a..b9a55a49d 100644 --- a/src/trace/implementations/ord.rs +++ b/src/trace/implementations/ord.rs @@ -309,31 +309,31 @@ pub struct OrdValCursor { cursor: OrderedCursor>, } -impl Cursor for OrdValCursor { +impl Cursor> for OrdValCursor { type Key = ::Key; type Val = ::Val; type Time = ::Time; type R = ::Diff; - type Storage = OrdValBatch; + // type Storage = OrdValBatch; - fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { &self.cursor.key(&storage.layer) } - fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { &self.cursor.child.key(&storage.layer.vals) } - fn map_times(&mut self, storage: &Self::Storage, mut logic: L2) { + fn key<'a>(&self, storage: &'a OrdValBatch) -> &'a Self::Key { &self.cursor.key(&storage.layer) } + fn val<'a>(&self, storage: &'a OrdValBatch) -> &'a Self::Val { &self.cursor.child.key(&storage.layer.vals) } + fn map_times(&mut self, storage: &OrdValBatch, mut logic: L2) { self.cursor.child.child.rewind(&storage.layer.vals.vals); while self.cursor.child.child.valid(&storage.layer.vals.vals) { logic(&self.cursor.child.child.key(&storage.layer.vals.vals).0, &self.cursor.child.child.key(&storage.layer.vals.vals).1); self.cursor.child.child.step(&storage.layer.vals.vals); } } - fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.valid(&storage.layer) } - fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.child.valid(&storage.layer.vals) } - fn step_key(&mut self, storage: &Self::Storage){ self.cursor.step(&storage.layer); } - fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { self.cursor.seek(&storage.layer, key); } - fn step_val(&mut self, storage: &Self::Storage) { self.cursor.child.step(&storage.layer.vals); } - fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { self.cursor.child.seek(&storage.layer.vals, val); } - fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind(&storage.layer); } - fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.child.rewind(&storage.layer.vals); } + fn key_valid(&self, storage: &OrdValBatch) -> bool { self.cursor.valid(&storage.layer) } + fn val_valid(&self, storage: &OrdValBatch) -> bool { self.cursor.child.valid(&storage.layer.vals) } + fn step_key(&mut self, storage: &OrdValBatch){ self.cursor.step(&storage.layer); } + fn seek_key(&mut self, storage: &OrdValBatch, key: &Self::Key) { self.cursor.seek(&storage.layer, key); } + fn step_val(&mut self, storage: &OrdValBatch) { self.cursor.child.step(&storage.layer.vals); } + fn seek_val(&mut self, storage: &OrdValBatch, val: &Self::Val) { self.cursor.child.seek(&storage.layer.vals, val); } + fn rewind_keys(&mut self, storage: &OrdValBatch) { self.cursor.rewind(&storage.layer); } + fn rewind_vals(&mut self, storage: &OrdValBatch) { self.cursor.child.rewind(&storage.layer.vals); } } /// A builder for creating layers from unsorted update tuples. @@ -607,31 +607,29 @@ pub struct OrdKeyCursor { phantom: PhantomData<(L, C)>, } -impl Cursor for OrdKeyCursor { +impl Cursor> for OrdKeyCursor { type Key = ::Key; type Val = (); type Time = ::Time; type R = ::Diff; - type Storage = OrdKeyBatch; - - fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { &self.cursor.key(&storage.layer) } - fn val<'a>(&self, _storage: &'a Self::Storage) -> &'a () { &() } - fn map_times(&mut self, storage: &Self::Storage, mut logic: L2) { + fn key<'a>(&self, storage: &'a OrdKeyBatch) -> &'a Self::Key { &self.cursor.key(&storage.layer) } + fn val<'a>(&self, _storage: &'a OrdKeyBatch) -> &'a () { &() } + fn map_times(&mut self, storage: &OrdKeyBatch, mut logic: L2) { self.cursor.child.rewind(&storage.layer.vals); while self.cursor.child.valid(&storage.layer.vals) { logic(&self.cursor.child.key(&storage.layer.vals).0, &self.cursor.child.key(&storage.layer.vals).1); self.cursor.child.step(&storage.layer.vals); } } - fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.valid(&storage.layer) } - fn val_valid(&self, _storage: &Self::Storage) -> bool { self.valid } - fn step_key(&mut self, storage: &Self::Storage){ self.cursor.step(&storage.layer); self.valid = true; } - fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { self.cursor.seek(&storage.layer, key); self.valid = true; } - fn step_val(&mut self, _storage: &Self::Storage) { self.valid = false; } - fn seek_val(&mut self, _storage: &Self::Storage, _val: &()) { } - fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind(&storage.layer); self.valid = true; } - fn rewind_vals(&mut self, _storage: &Self::Storage) { self.valid = true; } + fn key_valid(&self, storage: &OrdKeyBatch) -> bool { self.cursor.valid(&storage.layer) } + fn val_valid(&self, _storage: &OrdKeyBatch) -> bool { self.valid } + fn step_key(&mut self, storage: &OrdKeyBatch){ self.cursor.step(&storage.layer); self.valid = true; } + fn seek_key(&mut self, storage: &OrdKeyBatch, key: &Self::Key) { self.cursor.seek(&storage.layer, key); self.valid = true; } + fn step_val(&mut self, _storage: &OrdKeyBatch) { self.valid = false; } + fn seek_val(&mut self, _storage: &OrdKeyBatch, _val: &()) { } + fn rewind_keys(&mut self, storage: &OrdKeyBatch) { self.cursor.rewind(&storage.layer); self.valid = true; } + fn rewind_vals(&mut self, _storage: &OrdKeyBatch) { self.valid = true; } } diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index 03891f8c2..2a9344ea7 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -396,26 +396,24 @@ mod val_batch { phantom: std::marker::PhantomData, } - impl Cursor for OrdValCursor { + impl Cursor> for OrdValCursor { type Key = ::Key; type Val = ::Val; type Time = ::Time; type R = ::Diff; - type Storage = OrdValBatch; - - fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { &storage.storage.keys.index(self.key_cursor.try_into().ok().unwrap()) } - fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { &storage.storage.vals.index(self.val_cursor.try_into().ok().unwrap()) } - fn map_times(&mut self, storage: &Self::Storage, mut logic: L2) { + fn key<'a>(&self, storage: &'a OrdValBatch) -> &'a Self::Key { &storage.storage.keys.index(self.key_cursor.try_into().ok().unwrap()) } + fn val<'a>(&self, storage: &'a OrdValBatch) -> &'a Self::Val { &storage.storage.vals.index(self.val_cursor.try_into().ok().unwrap()) } + fn map_times(&mut self, storage: &OrdValBatch, mut logic: L2) { let (lower, upper) = storage.storage.updates_for_value(self.val_cursor); for index in lower .. upper { let (time, diff) = &storage.storage.updates.index(index); logic(time, diff); } } - fn key_valid(&self, storage: &Self::Storage) -> bool { self.key_cursor < storage.storage.keys.len() } - fn val_valid(&self, storage: &Self::Storage) -> bool { self.val_cursor < storage.storage.values_for_key(self.key_cursor).1 } - fn step_key(&mut self, storage: &Self::Storage){ + fn key_valid(&self, storage: &OrdValBatch) -> bool { self.key_cursor < storage.storage.keys.len() } + fn val_valid(&self, storage: &OrdValBatch) -> bool { self.val_cursor < storage.storage.values_for_key(self.key_cursor).1 } + fn step_key(&mut self, storage: &OrdValBatch){ self.key_cursor += 1; if self.key_valid(storage) { self.rewind_vals(storage); @@ -424,28 +422,28 @@ mod val_batch { self.key_cursor = storage.storage.keys.len(); } } - fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { + fn seek_key(&mut self, storage: &OrdValBatch, key: &Self::Key) { self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| x.lt(key)); if self.key_valid(storage) { self.rewind_vals(storage); } } - fn step_val(&mut self, storage: &Self::Storage) { + fn step_val(&mut self, storage: &OrdValBatch) { self.val_cursor += 1; if !self.val_valid(storage) { self.val_cursor = storage.storage.values_for_key(self.key_cursor).1; } } - fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { + fn seek_val(&mut self, storage: &OrdValBatch, val: &Self::Val) { self.val_cursor += storage.storage.vals.advance(self.val_cursor, storage.storage.values_for_key(self.key_cursor).1, |x| x.lt(val)); } - fn rewind_keys(&mut self, storage: &Self::Storage) { + fn rewind_keys(&mut self, storage: &OrdValBatch) { self.key_cursor = 0; if self.key_valid(storage) { self.rewind_vals(storage) } } - fn rewind_vals(&mut self, storage: &Self::Storage) { + fn rewind_vals(&mut self, storage: &OrdValBatch) { self.val_cursor = storage.storage.values_for_key(self.key_cursor).0; } } diff --git a/src/trace/implementations/spine_fueled.rs b/src/trace/implementations/spine_fueled.rs index f1c5f538b..2246298af 100644 --- a/src/trace/implementations/spine_fueled.rs +++ b/src/trace/implementations/spine_fueled.rs @@ -75,7 +75,7 @@ use ::logging::Logger; use ::difference::Semigroup; use lattice::Lattice; use trace::{Batch, BatchReader, Trace, TraceReader, ExertionLogic}; -use trace::cursor::{Cursor, CursorList}; +use trace::cursor::CursorList; use trace::Merger; use ::timely::dataflow::operators::generic::OperatorInfo; @@ -115,9 +115,10 @@ where type R = B::R; type Batch = B; + type Storage = Vec; type Cursor = CursorList<::Cursor>; - fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, ::Storage)> { + fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, Self::Storage)> { // If `upper` is the minimum frontier, we can return an empty cursor. // This can happen with operators that are written to expect the ability to acquire cursors diff --git a/src/trace/mod.rs b/src/trace/mod.rs index 46e70add5..da353094f 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -58,11 +58,14 @@ pub trait TraceReader { /// The type of an immutable collection of updates. type Batch: BatchReader+Clone+'static; + /// Storage type for `Self::Cursor`. Likely related to `Self::Batch`. + type Storage; + /// The type used to enumerate the collections contents. - type Cursor: Cursor; + type Cursor: Cursor; /// Provides a cursor over updates contained in the trace. - fn cursor(&mut self) -> (Self::Cursor, ::Storage) { + fn cursor(&mut self) -> (Self::Cursor, Self::Storage) { if let Some(cursor) = self.cursor_through(Antichain::new().borrow()) { cursor } @@ -78,7 +81,7 @@ pub trait TraceReader { /// the trace, and (ii) the trace has not been advanced beyond `upper`. Practically, the implementation should /// be expected to look for a "clean cut" using `upper`, and if it finds such a cut can return a cursor. This /// should allow `upper` such as `&[]` as used by `self.cursor()`, though it is difficult to imagine other uses. - fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, ::Storage)>; + fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, Self::Storage)>; /// Advances the frontier that constrains logical compaction. /// @@ -258,7 +261,7 @@ where type R; /// The type used to enumerate the batch's contents. - type Cursor: Cursor; + type Cursor: Cursor; /// Acquires a cursor to the batch's contents. fn cursor(&self) -> Self::Cursor; /// The number of updates in the batch. @@ -392,34 +395,32 @@ pub mod rc_blanket_impls { } } - impl Cursor for RcBatchCursor { + impl Cursor> for RcBatchCursor { type Key = B::Key; type Val = B::Val; type Time = B::Time; type R = B::R; - type Storage = Rc; - - #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) } - #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) } + #[inline] fn key_valid(&self, storage: &Rc) -> bool { self.cursor.key_valid(storage) } + #[inline] fn val_valid(&self, storage: &Rc) -> bool { self.cursor.val_valid(storage) } - #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { self.cursor.key(storage) } - #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { self.cursor.val(storage) } + #[inline] fn key<'a>(&self, storage: &'a Rc) -> &'a Self::Key { self.cursor.key(storage) } + #[inline] fn val<'a>(&self, storage: &'a Rc) -> &'a Self::Val { self.cursor.val(storage) } #[inline] - fn map_times(&mut self, storage: &Self::Storage, logic: L) { + fn map_times(&mut self, storage: &Rc, logic: L) { self.cursor.map_times(storage, logic) } - #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) } - #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { self.cursor.seek_key(storage, key) } + #[inline] fn step_key(&mut self, storage: &Rc) { self.cursor.step_key(storage) } + #[inline] fn seek_key(&mut self, storage: &Rc, key: &Self::Key) { self.cursor.seek_key(storage, key) } - #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) } - #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { self.cursor.seek_val(storage, val) } + #[inline] fn step_val(&mut self, storage: &Rc) { self.cursor.step_val(storage) } + #[inline] fn seek_val(&mut self, storage: &Rc, val: &Self::Val) { self.cursor.seek_val(storage, val) } - #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) } - #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) } + #[inline] fn rewind_keys(&mut self, storage: &Rc) { self.cursor.rewind_keys(storage) } + #[inline] fn rewind_vals(&mut self, storage: &Rc) { self.cursor.rewind_vals(storage) } } /// An immutable collection of updates. @@ -509,34 +510,32 @@ pub mod abomonated_blanket_impls { } } - impl Cursor for AbomonatedBatchCursor { + impl Cursor>> for AbomonatedBatchCursor { type Key = B::Key; type Val = B::Val; type Time = B::Time; type R = B::R; - type Storage = Abomonated>; - - #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) } - #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) } + #[inline] fn key_valid(&self, storage: &Abomonated>) -> bool { self.cursor.key_valid(storage) } + #[inline] fn val_valid(&self, storage: &Abomonated>) -> bool { self.cursor.val_valid(storage) } - #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { self.cursor.key(storage) } - #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { self.cursor.val(storage) } + #[inline] fn key<'a>(&self, storage: &'a Abomonated>) -> &'a Self::Key { self.cursor.key(storage) } + #[inline] fn val<'a>(&self, storage: &'a Abomonated>) -> &'a Self::Val { self.cursor.val(storage) } #[inline] - fn map_times(&mut self, storage: &Self::Storage, logic: L) { + fn map_times(&mut self, storage: &Abomonated>, logic: L) { self.cursor.map_times(storage, logic) } - #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) } - #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { self.cursor.seek_key(storage, key) } + #[inline] fn step_key(&mut self, storage: &Abomonated>) { self.cursor.step_key(storage) } + #[inline] fn seek_key(&mut self, storage: &Abomonated>, key: &Self::Key) { self.cursor.seek_key(storage, key) } - #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) } - #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { self.cursor.seek_val(storage, val) } + #[inline] fn step_val(&mut self, storage: &Abomonated>) { self.cursor.step_val(storage) } + #[inline] fn seek_val(&mut self, storage: &Abomonated>, val: &Self::Val) { self.cursor.seek_val(storage, val) } - #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) } - #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) } + #[inline] fn rewind_keys(&mut self, storage: &Abomonated>) { self.cursor.rewind_keys(storage) } + #[inline] fn rewind_vals(&mut self, storage: &Abomonated>) { self.cursor.rewind_vals(storage) } } /// An immutable collection of updates. diff --git a/src/trace/wrappers/enter.rs b/src/trace/wrappers/enter.rs index 1b4316dc4..9ed2d439d 100644 --- a/src/trace/wrappers/enter.rs +++ b/src/trace/wrappers/enter.rs @@ -48,6 +48,7 @@ where type R = Tr::R; type Batch = BatchEnter; + type Storage = Tr::Storage; type Cursor = CursorEnter; fn map_batches(&self, mut f: F) { @@ -86,7 +87,7 @@ where self.stash2.borrow() } - fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, ::Storage)> { + fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, Self::Storage)> { self.stash1.clear(); for time in upper.iter() { self.stash1.insert(time.clone().to_outer()); @@ -159,12 +160,12 @@ where } /// Wrapper to provide cursor to nested scope. -pub struct CursorEnter { +pub struct CursorEnter { phantom: ::std::marker::PhantomData, cursor: C, } -impl CursorEnter { +impl CursorEnter { fn new(cursor: C) -> Self { CursorEnter { phantom: ::std::marker::PhantomData, @@ -173,9 +174,9 @@ impl CursorEnter { } } -impl Cursor for CursorEnter +impl Cursor for CursorEnter where - C: Cursor, + C: Cursor, C::Time: Timestamp, TInner: Refines+Lattice, { @@ -184,29 +185,27 @@ where type Time = TInner; type R = C::R; - type Storage = C::Storage; + #[inline] fn key_valid(&self, storage: &S) -> bool { self.cursor.key_valid(storage) } + #[inline] fn val_valid(&self, storage: &S) -> bool { self.cursor.val_valid(storage) } - #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) } - #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) } - - #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { self.cursor.key(storage) } - #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { self.cursor.val(storage) } + #[inline] fn key<'a>(&self, storage: &'a S) -> &'a Self::Key { self.cursor.key(storage) } + #[inline] fn val<'a>(&self, storage: &'a S) -> &'a Self::Val { self.cursor.val(storage) } #[inline] - fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { + fn map_times(&mut self, storage: &S, mut logic: L) { self.cursor.map_times(storage, |time, diff| { logic(&TInner::to_inner(time.clone()), diff) }) } - #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) } - #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { self.cursor.seek_key(storage, key) } + #[inline] fn step_key(&mut self, storage: &S) { self.cursor.step_key(storage) } + #[inline] fn seek_key(&mut self, storage: &S, key: &Self::Key) { self.cursor.seek_key(storage, key) } - #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) } - #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { self.cursor.seek_val(storage, val) } + #[inline] fn step_val(&mut self, storage: &S) { self.cursor.step_val(storage) } + #[inline] fn seek_val(&mut self, storage: &S, val: &Self::Val) { self.cursor.seek_val(storage, val) } - #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) } - #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) } + #[inline] fn rewind_keys(&mut self, storage: &S) { self.cursor.rewind_keys(storage) } + #[inline] fn rewind_vals(&mut self, storage: &S) { self.cursor.rewind_vals(storage) } } @@ -226,7 +225,7 @@ impl BatchCursorEnter { } } -impl Cursor for BatchCursorEnter +impl Cursor> for BatchCursorEnter where B::Time: Timestamp, TInner: Refines+Lattice, @@ -236,27 +235,25 @@ where type Time = TInner; type R = B::R; - type Storage = BatchEnter; - - #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) } - #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) } + #[inline] fn key_valid(&self, storage: &BatchEnter) -> bool { self.cursor.key_valid(&storage.batch) } + #[inline] fn val_valid(&self, storage: &BatchEnter) -> bool { self.cursor.val_valid(&storage.batch) } - #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { self.cursor.key(&storage.batch) } - #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { self.cursor.val(&storage.batch) } + #[inline] fn key<'a>(&self, storage: &'a BatchEnter) -> &'a Self::Key { self.cursor.key(&storage.batch) } + #[inline] fn val<'a>(&self, storage: &'a BatchEnter) -> &'a Self::Val { self.cursor.val(&storage.batch) } #[inline] - fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { + fn map_times(&mut self, storage: &BatchEnter, mut logic: L) { self.cursor.map_times(&storage.batch, |time, diff| { logic(&TInner::to_inner(time.clone()), diff) }) } - #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(&storage.batch) } - #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { self.cursor.seek_key(&storage.batch, key) } + #[inline] fn step_key(&mut self, storage: &BatchEnter) { self.cursor.step_key(&storage.batch) } + #[inline] fn seek_key(&mut self, storage: &BatchEnter, key: &Self::Key) { self.cursor.seek_key(&storage.batch, key) } - #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(&storage.batch) } - #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { self.cursor.seek_val(&storage.batch, val) } + #[inline] fn step_val(&mut self, storage: &BatchEnter) { self.cursor.step_val(&storage.batch) } + #[inline] fn seek_val(&mut self, storage: &BatchEnter, val: &Self::Val) { self.cursor.seek_val(&storage.batch, val) } - #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(&storage.batch) } - #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(&storage.batch) } + #[inline] fn rewind_keys(&mut self, storage: &BatchEnter) { self.cursor.rewind_keys(&storage.batch) } + #[inline] fn rewind_vals(&mut self, storage: &BatchEnter) { self.cursor.rewind_vals(&storage.batch) } } diff --git a/src/trace/wrappers/enter_at.rs b/src/trace/wrappers/enter_at.rs index e5c5419ed..027817bce 100644 --- a/src/trace/wrappers/enter_at.rs +++ b/src/trace/wrappers/enter_at.rs @@ -63,6 +63,7 @@ where type R = Tr::R; type Batch = BatchEnter; + type Storage = Tr::Storage; type Cursor = CursorEnter; fn map_batches(&self, mut f: F2) { @@ -102,7 +103,7 @@ where self.stash2.borrow() } - fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, ::Storage)> { + fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, Self::Storage)> { self.stash1.clear(); for time in upper.iter() { self.stash1.insert(time.clone().to_outer()); @@ -180,13 +181,13 @@ where } /// Wrapper to provide cursor to nested scope. -pub struct CursorEnter { +pub struct CursorEnter { phantom: ::std::marker::PhantomData, cursor: C, logic: F, } -impl CursorEnter { +impl CursorEnter { fn new(cursor: C, logic: F) -> Self { CursorEnter { phantom: ::std::marker::PhantomData, @@ -196,9 +197,9 @@ impl CursorEnter { } } -impl Cursor for CursorEnter +impl Cursor for CursorEnter where - C: Cursor, + C: Cursor, C::Time: Timestamp, TInner: Refines+Lattice, F: FnMut(&C::Key, &C::Val, &C::Time)->TInner, @@ -208,16 +209,14 @@ where type Time = TInner; type R = C::R; - type Storage = C::Storage; + #[inline] fn key_valid(&self, storage: &S) -> bool { self.cursor.key_valid(storage) } + #[inline] fn val_valid(&self, storage: &S) -> bool { self.cursor.val_valid(storage) } - #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) } - #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) } - - #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { self.cursor.key(storage) } - #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { self.cursor.val(storage) } + #[inline] fn key<'a>(&self, storage: &'a S) -> &'a Self::Key { self.cursor.key(storage) } + #[inline] fn val<'a>(&self, storage: &'a S) -> &'a Self::Val { self.cursor.val(storage) } #[inline] - fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { + fn map_times(&mut self, storage: &S, mut logic: L) { let key = self.key(storage); let val = self.val(storage); let logic2 = &mut self.logic; @@ -226,14 +225,14 @@ where }) } - #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) } - #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { self.cursor.seek_key(storage, key) } + #[inline] fn step_key(&mut self, storage: &S) { self.cursor.step_key(storage) } + #[inline] fn seek_key(&mut self, storage: &S, key: &Self::Key) { self.cursor.seek_key(storage, key) } - #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) } - #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { self.cursor.seek_val(storage, val) } + #[inline] fn step_val(&mut self, storage: &S) { self.cursor.step_val(storage) } + #[inline] fn seek_val(&mut self, storage: &S, val: &Self::Val) { self.cursor.seek_val(storage, val) } - #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) } - #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) } + #[inline] fn rewind_keys(&mut self, storage: &S) { self.cursor.rewind_keys(storage) } + #[inline] fn rewind_vals(&mut self, storage: &S) { self.cursor.rewind_vals(storage) } } @@ -255,7 +254,7 @@ impl BatchCursorEnter { } } -impl Cursor for BatchCursorEnter +impl Cursor> for BatchCursorEnter where B::Time: Timestamp, TInner: Refines+Lattice, @@ -266,16 +265,14 @@ where type Time = TInner; type R = B::R; - type Storage = BatchEnter; - - #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) } - #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) } + #[inline] fn key_valid(&self, storage: &BatchEnter) -> bool { self.cursor.key_valid(&storage.batch) } + #[inline] fn val_valid(&self, storage: &BatchEnter) -> bool { self.cursor.val_valid(&storage.batch) } - #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { self.cursor.key(&storage.batch) } - #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { self.cursor.val(&storage.batch) } + #[inline] fn key<'a>(&self, storage: &'a BatchEnter) -> &'a Self::Key { self.cursor.key(&storage.batch) } + #[inline] fn val<'a>(&self, storage: &'a BatchEnter) -> &'a Self::Val { self.cursor.val(&storage.batch) } #[inline] - fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { + fn map_times(&mut self, storage: &BatchEnter, mut logic: L) { let key = self.key(storage); let val = self.val(storage); let logic2 = &mut self.logic; @@ -284,12 +281,12 @@ where }) } - #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(&storage.batch) } - #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { self.cursor.seek_key(&storage.batch, key) } + #[inline] fn step_key(&mut self, storage: &BatchEnter) { self.cursor.step_key(&storage.batch) } + #[inline] fn seek_key(&mut self, storage: &BatchEnter, key: &Self::Key) { self.cursor.seek_key(&storage.batch, key) } - #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(&storage.batch) } - #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { self.cursor.seek_val(&storage.batch, val) } + #[inline] fn step_val(&mut self, storage: &BatchEnter) { self.cursor.step_val(&storage.batch) } + #[inline] fn seek_val(&mut self, storage: &BatchEnter, val: &Self::Val) { self.cursor.seek_val(&storage.batch, val) } - #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(&storage.batch) } - #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(&storage.batch) } + #[inline] fn rewind_keys(&mut self, storage: &BatchEnter) { self.cursor.rewind_keys(&storage.batch) } + #[inline] fn rewind_vals(&mut self, storage: &BatchEnter) { self.cursor.rewind_vals(&storage.batch) } } diff --git a/src/trace/wrappers/filter.rs b/src/trace/wrappers/filter.rs index 3fdf29603..64ff943b9 100644 --- a/src/trace/wrappers/filter.rs +++ b/src/trace/wrappers/filter.rs @@ -41,6 +41,7 @@ where type R = Tr::R; type Batch = BatchFilter; + type Storage = Tr::Storage; type Cursor = CursorFilter; fn map_batches(&self, mut f: F2) { @@ -55,7 +56,7 @@ where fn set_physical_compaction(&mut self, frontier: AntichainRef) { self.trace.set_physical_compaction(frontier) } fn get_physical_compaction(&mut self) -> AntichainRef { self.trace.get_physical_compaction() } - fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, ::Storage)> { + fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, Self::Storage)> { self.trace.cursor_through(upper).map(|(x,y)| (CursorFilter::new(x, self.logic.clone()), y)) } } @@ -117,12 +118,12 @@ where } /// Wrapper to provide cursor to nested scope. -pub struct CursorFilter { +pub struct CursorFilter { cursor: C, logic: F, } -impl CursorFilter { +impl CursorFilter { fn new(cursor: C, logic: F) -> Self { CursorFilter { cursor, @@ -131,9 +132,9 @@ impl CursorFilter { } } -impl Cursor for CursorFilter +impl Cursor for CursorFilter where - C: Cursor, + C: Cursor, C::Time: Timestamp, F: FnMut(&C::Key, &C::Val)->bool+'static { @@ -142,16 +143,14 @@ where type Time = C::Time; type R = C::R; - type Storage = C::Storage; + #[inline] fn key_valid(&self, storage: &S) -> bool { self.cursor.key_valid(storage) } + #[inline] fn val_valid(&self, storage: &S) -> bool { self.cursor.val_valid(storage) } - #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) } - #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) } - - #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { self.cursor.key(storage) } - #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { self.cursor.val(storage) } + #[inline] fn key<'a>(&self, storage: &'a S) -> &'a Self::Key { self.cursor.key(storage) } + #[inline] fn val<'a>(&self, storage: &'a S) -> &'a Self::Val { self.cursor.val(storage) } #[inline] - fn map_times(&mut self, storage: &Self::Storage, logic: L) { + fn map_times(&mut self, storage: &S, logic: L) { let key = self.key(storage); let val = self.val(storage); if (self.logic)(key, val) { @@ -159,14 +158,14 @@ where } } - #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) } - #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { self.cursor.seek_key(storage, key) } + #[inline] fn step_key(&mut self, storage: &S) { self.cursor.step_key(storage) } + #[inline] fn seek_key(&mut self, storage: &S, key: &Self::Key) { self.cursor.seek_key(storage, key) } - #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) } - #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { self.cursor.seek_val(storage, val) } + #[inline] fn step_val(&mut self, storage: &S) { self.cursor.step_val(storage) } + #[inline] fn seek_val(&mut self, storage: &S, val: &Self::Val) { self.cursor.seek_val(storage, val) } - #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) } - #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) } + #[inline] fn rewind_keys(&mut self, storage: &S) { self.cursor.rewind_keys(storage) } + #[inline] fn rewind_vals(&mut self, storage: &S) { self.cursor.rewind_vals(storage) } } @@ -186,7 +185,7 @@ impl BatchCursorFilter { } } -impl Cursor for BatchCursorFilter +impl Cursor> for BatchCursorFilter where B::Time: Timestamp, F: FnMut(&B::Key, &B::Val)->bool+'static, @@ -196,16 +195,14 @@ where type Time = B::Time; type R = B::R; - type Storage = BatchFilter; - - #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) } - #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) } + #[inline] fn key_valid(&self, storage: &BatchFilter) -> bool { self.cursor.key_valid(&storage.batch) } + #[inline] fn val_valid(&self, storage: &BatchFilter) -> bool { self.cursor.val_valid(&storage.batch) } - #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { self.cursor.key(&storage.batch) } - #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { self.cursor.val(&storage.batch) } + #[inline] fn key<'a>(&self, storage: &'a BatchFilter) -> &'a Self::Key { self.cursor.key(&storage.batch) } + #[inline] fn val<'a>(&self, storage: &'a BatchFilter) -> &'a Self::Val { self.cursor.val(&storage.batch) } #[inline] - fn map_times(&mut self, storage: &Self::Storage, logic: L) { + fn map_times(&mut self, storage: &BatchFilter, logic: L) { let key = self.key(storage); let val = self.val(storage); if (self.logic)(key, val) { @@ -213,12 +210,12 @@ where } } - #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(&storage.batch) } - #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { self.cursor.seek_key(&storage.batch, key) } + #[inline] fn step_key(&mut self, storage: &BatchFilter) { self.cursor.step_key(&storage.batch) } + #[inline] fn seek_key(&mut self, storage: &BatchFilter, key: &Self::Key) { self.cursor.seek_key(&storage.batch, key) } - #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(&storage.batch) } - #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { self.cursor.seek_val(&storage.batch, val) } + #[inline] fn step_val(&mut self, storage: &BatchFilter) { self.cursor.step_val(&storage.batch) } + #[inline] fn seek_val(&mut self, storage: &BatchFilter, val: &Self::Val) { self.cursor.seek_val(&storage.batch, val) } - #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(&storage.batch) } - #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(&storage.batch) } + #[inline] fn rewind_keys(&mut self, storage: &BatchFilter) { self.cursor.rewind_keys(&storage.batch) } + #[inline] fn rewind_vals(&mut self, storage: &BatchFilter) { self.cursor.rewind_vals(&storage.batch) } } diff --git a/src/trace/wrappers/freeze.rs b/src/trace/wrappers/freeze.rs index ed5965266..17f1dfb02 100644 --- a/src/trace/wrappers/freeze.rs +++ b/src/trace/wrappers/freeze.rs @@ -92,6 +92,7 @@ where type R = Tr::R; type Batch = BatchFreeze; + type Storage = Tr::Storage; type Cursor = CursorFreeze; fn map_batches(&self, mut f: F2) { @@ -107,7 +108,7 @@ where fn set_physical_compaction(&mut self, frontier: AntichainRef) { self.trace.set_physical_compaction(frontier) } fn get_physical_compaction(&mut self) -> AntichainRef { self.trace.get_physical_compaction() } - fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, ::Storage)> { + fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, Self::Storage)> { let func = &self.func; self.trace.cursor_through(upper) .map(|(cursor, storage)| (CursorFreeze::new(cursor, func.clone()), storage)) @@ -185,12 +186,12 @@ where } /// Wrapper to provide cursor to nested scope. -pub struct CursorFreeze { +pub struct CursorFreeze { cursor: C, func: Rc, } -impl CursorFreeze { +impl CursorFreeze { fn new(cursor: C, func: Rc) -> Self { CursorFreeze { cursor: cursor, @@ -199,9 +200,9 @@ impl CursorFreeze { } } -impl Cursor for CursorFreeze +impl Cursor for CursorFreeze where - C: Cursor, + C: Cursor, F: Fn(&C::Time)->Option, { type Key = C::Key; @@ -209,15 +210,13 @@ where type Time = C::Time; type R = C::R; - type Storage = C::Storage; + #[inline] fn key_valid(&self, storage: &S) -> bool { self.cursor.key_valid(storage) } + #[inline] fn val_valid(&self, storage: &S) -> bool { self.cursor.val_valid(storage) } - #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) } - #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) } + #[inline] fn key<'a>(&self, storage: &'a S) -> &'a Self::Key { self.cursor.key(storage) } + #[inline] fn val<'a>(&self, storage: &'a S) -> &'a Self::Val { self.cursor.val(storage) } - #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { self.cursor.key(storage) } - #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { self.cursor.val(storage) } - - #[inline] fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { + #[inline] fn map_times(&mut self, storage: &S, mut logic: L) { let func = &self.func; self.cursor.map_times(storage, |time, diff| { if let Some(time) = func(time) { @@ -226,14 +225,14 @@ where }) } - #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) } - #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { self.cursor.seek_key(storage, key) } + #[inline] fn step_key(&mut self, storage: &S) { self.cursor.step_key(storage) } + #[inline] fn seek_key(&mut self, storage: &S, key: &Self::Key) { self.cursor.seek_key(storage, key) } - #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) } - #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { self.cursor.seek_val(storage, val) } + #[inline] fn step_val(&mut self, storage: &S) { self.cursor.step_val(storage) } + #[inline] fn seek_val(&mut self, storage: &S, val: &Self::Val) { self.cursor.seek_val(storage, val) } - #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) } - #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) } + #[inline] fn rewind_keys(&mut self, storage: &S) { self.cursor.rewind_keys(storage) } + #[inline] fn rewind_vals(&mut self, storage: &S) { self.cursor.rewind_vals(storage) } } @@ -252,7 +251,7 @@ impl BatchCursorFreeze { } } -impl Cursor for BatchCursorFreeze +impl Cursor> for BatchCursorFreeze where F: Fn(&B::Time)->Option, { @@ -261,15 +260,13 @@ where type Time = B::Time; type R = B::R; - type Storage = BatchFreeze; - - #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) } - #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) } + #[inline] fn key_valid(&self, storage: &BatchFreeze) -> bool { self.cursor.key_valid(&storage.batch) } + #[inline] fn val_valid(&self, storage: &BatchFreeze) -> bool { self.cursor.val_valid(&storage.batch) } - #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { self.cursor.key(&storage.batch) } - #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { self.cursor.val(&storage.batch) } + #[inline] fn key<'a>(&self, storage: &'a BatchFreeze) -> &'a Self::Key { self.cursor.key(&storage.batch) } + #[inline] fn val<'a>(&self, storage: &'a BatchFreeze) -> &'a Self::Val { self.cursor.val(&storage.batch) } - #[inline] fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { + #[inline] fn map_times(&mut self, storage: &BatchFreeze, mut logic: L) { let func = &self.func; self.cursor.map_times(&storage.batch, |time, diff| { if let Some(time) = func(time) { @@ -278,12 +275,12 @@ where }) } - #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(&storage.batch) } - #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { self.cursor.seek_key(&storage.batch, key) } + #[inline] fn step_key(&mut self, storage: &BatchFreeze) { self.cursor.step_key(&storage.batch) } + #[inline] fn seek_key(&mut self, storage: &BatchFreeze, key: &Self::Key) { self.cursor.seek_key(&storage.batch, key) } - #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(&storage.batch) } - #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { self.cursor.seek_val(&storage.batch, val) } + #[inline] fn step_val(&mut self, storage: &BatchFreeze) { self.cursor.step_val(&storage.batch) } + #[inline] fn seek_val(&mut self, storage: &BatchFreeze, val: &Self::Val) { self.cursor.seek_val(&storage.batch, val) } - #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(&storage.batch) } - #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(&storage.batch) } + #[inline] fn rewind_keys(&mut self, storage: &BatchFreeze) { self.cursor.rewind_keys(&storage.batch) } + #[inline] fn rewind_vals(&mut self, storage: &BatchFreeze) { self.cursor.rewind_vals(&storage.batch) } } diff --git a/src/trace/wrappers/frontier.rs b/src/trace/wrappers/frontier.rs index cb0865c49..54be6fade 100644 --- a/src/trace/wrappers/frontier.rs +++ b/src/trace/wrappers/frontier.rs @@ -54,7 +54,8 @@ where type R = Tr::R; type Batch = BatchFrontier; - type Cursor = CursorFrontier; + type Storage = Tr::Storage; + type Cursor = CursorFrontier; fn map_batches(&self, mut f: F) { let since = self.since.borrow(); @@ -68,7 +69,7 @@ where fn set_physical_compaction(&mut self, frontier: AntichainRef) { self.trace.set_physical_compaction(frontier) } fn get_physical_compaction(&mut self) -> AntichainRef { self.trace.get_physical_compaction() } - fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, ::Storage)> { + fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, Self::Storage)> { let since = self.since.borrow(); let until = self.until.borrow(); self.trace.cursor_through(upper).map(|(x,y)| (CursorFrontier::new(x, since, until), y)) @@ -134,14 +135,14 @@ where } /// Wrapper to provide cursor to nested scope. -pub struct CursorFrontier { +pub struct CursorFrontier { cursor: C, - since: Antichain, - until: Antichain + since: Antichain, + until: Antichain } -impl CursorFrontier where C::Time: Clone { - fn new(cursor: C, since: AntichainRef, until: AntichainRef) -> Self { +impl CursorFrontier where T: Clone { + fn new(cursor: C, since: AntichainRef, until: AntichainRef) -> Self { CursorFrontier { cursor, since: since.to_owned(), @@ -150,26 +151,24 @@ impl CursorFrontier where C::Time: Clone { } } -impl Cursor for CursorFrontier +impl Cursor for CursorFrontier where - C: Cursor, - C::Time: Timestamp+Lattice, + C: Cursor, + T: Timestamp+Lattice, { type Key = C::Key; type Val = C::Val; type Time = C::Time; type R = C::R; - type Storage = C::Storage; + #[inline] fn key_valid(&self, storage: &S) -> bool { self.cursor.key_valid(storage) } + #[inline] fn val_valid(&self, storage: &S) -> bool { self.cursor.val_valid(storage) } - #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) } - #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) } - - #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { self.cursor.key(storage) } - #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { self.cursor.val(storage) } + #[inline] fn key<'a>(&self, storage: &'a S) -> &'a Self::Key { self.cursor.key(storage) } + #[inline] fn val<'a>(&self, storage: &'a S) -> &'a Self::Val { self.cursor.val(storage) } #[inline] - fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { + fn map_times(&mut self, storage: &S, mut logic: L) { let since = self.since.borrow(); let until = self.until.borrow(); let mut temp: C::Time = ::minimum(); @@ -182,14 +181,14 @@ where }) } - #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) } - #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { self.cursor.seek_key(storage, key) } + #[inline] fn step_key(&mut self, storage: &S) { self.cursor.step_key(storage) } + #[inline] fn seek_key(&mut self, storage: &S, key: &Self::Key) { self.cursor.seek_key(storage, key) } - #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) } - #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { self.cursor.seek_val(storage, val) } + #[inline] fn step_val(&mut self, storage: &S) { self.cursor.step_val(storage) } + #[inline] fn seek_val(&mut self, storage: &S, val: &Self::Val) { self.cursor.seek_val(storage, val) } - #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) } - #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) } + #[inline] fn rewind_keys(&mut self, storage: &S) { self.cursor.rewind_keys(storage) } + #[inline] fn rewind_vals(&mut self, storage: &S) { self.cursor.rewind_vals(storage) } } @@ -211,7 +210,7 @@ impl BatchCursorFrontier where B::Time: Clone { } } -impl Cursor for BatchCursorFrontier +impl Cursor> for BatchCursorFrontier where B::Time: Timestamp+Lattice, { @@ -220,16 +219,14 @@ where type Time = B::Time; type R = B::R; - type Storage = BatchFrontier; - - #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) } - #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) } + #[inline] fn key_valid(&self, storage: &BatchFrontier) -> bool { self.cursor.key_valid(&storage.batch) } + #[inline] fn val_valid(&self, storage: &BatchFrontier) -> bool { self.cursor.val_valid(&storage.batch) } - #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { self.cursor.key(&storage.batch) } - #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { self.cursor.val(&storage.batch) } + #[inline] fn key<'a>(&self, storage: &'a BatchFrontier) -> &'a Self::Key { self.cursor.key(&storage.batch) } + #[inline] fn val<'a>(&self, storage: &'a BatchFrontier) -> &'a Self::Val { self.cursor.val(&storage.batch) } #[inline] - fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { + fn map_times(&mut self, storage: &BatchFrontier, mut logic: L) { let since = self.since.borrow(); let until = self.until.borrow(); let mut temp: B::Time = ::minimum(); @@ -242,12 +239,12 @@ where }) } - #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(&storage.batch) } - #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { self.cursor.seek_key(&storage.batch, key) } + #[inline] fn step_key(&mut self, storage: &BatchFrontier) { self.cursor.step_key(&storage.batch) } + #[inline] fn seek_key(&mut self, storage: &BatchFrontier, key: &Self::Key) { self.cursor.seek_key(&storage.batch, key) } - #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(&storage.batch) } - #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { self.cursor.seek_val(&storage.batch, val) } + #[inline] fn step_val(&mut self, storage: &BatchFrontier) { self.cursor.step_val(&storage.batch) } + #[inline] fn seek_val(&mut self, storage: &BatchFrontier, val: &Self::Val) { self.cursor.seek_val(&storage.batch, val) } - #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(&storage.batch) } - #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(&storage.batch) } + #[inline] fn rewind_keys(&mut self, storage: &BatchFrontier) { self.cursor.rewind_keys(&storage.batch) } + #[inline] fn rewind_vals(&mut self, storage: &BatchFrontier) { self.cursor.rewind_vals(&storage.batch) } } diff --git a/src/trace/wrappers/rc.rs b/src/trace/wrappers/rc.rs index 508521b8d..67854c991 100644 --- a/src/trace/wrappers/rc.rs +++ b/src/trace/wrappers/rc.rs @@ -18,7 +18,6 @@ use timely::progress::{Antichain, frontier::{AntichainRef, MutableAntichain}}; use lattice::Lattice; use trace::TraceReader; -use trace::cursor::Cursor; /// A wrapper around a trace which tracks the frontiers of all referees. /// @@ -103,6 +102,7 @@ where type R = Tr::R; type Batch = Tr::Batch; + type Storage = Tr::Storage; type Cursor = Tr::Cursor; /// Sets frontier to now be elements in `frontier`. @@ -122,7 +122,7 @@ where } fn get_physical_compaction(&mut self) -> AntichainRef { self.physical_compaction.borrow() } /// Creates a new cursor over the wrapped trace. - fn cursor_through(&mut self, frontier: AntichainRef) -> Option<(Tr::Cursor, ::Storage)> { + fn cursor_through(&mut self, frontier: AntichainRef) -> Option<(Tr::Cursor, Tr::Storage)> { ::std::cell::RefCell::borrow_mut(&self.wrapper).trace.cursor_through(frontier) } From df140d67abbac3245e9469655024144375f2fa82 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 22 Nov 2023 09:01:59 -0500 Subject: [PATCH 2/7] Decouple Batcher input from Batch types --- src/operators/arrange/arrangement.rs | 18 ++++++++++-- src/operators/consolidate.rs | 3 ++ src/trace/implementations/merge_batcher.rs | 5 +++- .../implementations/merge_batcher_col.rs | 5 +++- src/trace/mod.rs | 28 ++++++++++++------- 5 files changed, 45 insertions(+), 14 deletions(-) diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index bd6c48dcc..0193004c4 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -456,6 +456,8 @@ where R: ExchangeData, Tr: Trace+TraceReader+'static, Tr::Batch: Batch, + ::Batcher: Batcher, + ::Batcher: Batcher, { self.arrange_named("Arrange") } @@ -472,6 +474,8 @@ where R: ExchangeData, Tr: Trace+TraceReader+'static, Tr::Batch: Batch, + ::Batcher: Batcher, + ::Batcher: Batcher, { let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into()); self.arrange_core(exchange, name) @@ -487,6 +491,8 @@ where P: ParallelizationContract, Tr: Trace+TraceReader+'static, Tr::Batch: Batch, + ::Batcher: Batcher, + ::Batcher: Batcher, ; } @@ -503,7 +509,9 @@ where K: ExchangeData + Hashable, V: ExchangeData, R: ExchangeData, - Tr: Trace + TraceReader + 'static, Tr::Batch: Batch + Tr: Trace + TraceReader + 'static, Tr::Batch: Batch, + ::Batcher: Batcher, + ::Batcher: Batcher, { self.arrange_named("Arrange") } @@ -513,7 +521,9 @@ where K: ExchangeData + Hashable, V: ExchangeData, R: ExchangeData, - Tr: Trace + TraceReader + 'static, Tr::Batch: Batch + Tr: Trace + TraceReader + 'static, Tr::Batch: Batch, + ::Batcher: Batcher, + ::Batcher: Batcher, { let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into()); self.arrange_core(exchange, name) @@ -524,6 +534,8 @@ where P: ParallelizationContract, Tr: Trace+TraceReader+'static, Tr::Batch: Batch, + ::Batcher: Batcher, + ::Batcher: Batcher, { // The `Arrange` operator is tasked with reacting to an advancing input // frontier by producing the sequence of batches whose lower and upper @@ -687,6 +699,8 @@ where P: ParallelizationContract, Tr: Trace+TraceReader+'static, Tr::Batch: Batch, + ::Batcher: Batcher, + ::Batcher: Batcher, { self.map(|k| (k, ())) .arrange_core(pact, name) diff --git a/src/operators/consolidate.rs b/src/operators/consolidate.rs index fff61eaad..2ace0a06c 100644 --- a/src/operators/consolidate.rs +++ b/src/operators/consolidate.rs @@ -13,6 +13,7 @@ use ::difference::Semigroup; use Data; use lattice::Lattice; +use trace::{Batch, Batcher}; /// Methods which require data be arrangeable. impl Collection @@ -57,6 +58,8 @@ where where Tr: crate::trace::Trace+crate::trace::TraceReader+'static, Tr::Batch: crate::trace::Batch, + ::Batcher: Batcher, + ::Batcher: Batcher, { use operators::arrange::arrangement::Arrange; self.map(|k| (k, ())) diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs index 00b01501f..4f900b14c 100644 --- a/src/trace/implementations/merge_batcher.rs +++ b/src/trace/implementations/merge_batcher.rs @@ -26,6 +26,9 @@ where B::Time: Lattice+timely::progress::Timestamp+Ord+Clone, B::R: Semigroup, { + type Item = ((B::Key,B::Val),B::Time,B::R); + type Time = B::Time; + fn new() -> Self { MergeBatcher { sorter: MergeSorter::new(), @@ -36,7 +39,7 @@ where } #[inline(never)] - fn push_batch(&mut self, batch: RefOrMut>) { + fn push_batch(&mut self, batch: RefOrMut>) { // `batch` is either a shared reference or an owned allocations. match batch { RefOrMut::Ref(reference) => { diff --git a/src/trace/implementations/merge_batcher_col.rs b/src/trace/implementations/merge_batcher_col.rs index 9cc82fb38..1321e9fd3 100644 --- a/src/trace/implementations/merge_batcher_col.rs +++ b/src/trace/implementations/merge_batcher_col.rs @@ -32,6 +32,9 @@ impl Batcher for ColumnatedMergeBatcher B::Time: Lattice+timely::progress::Timestamp+Ord+Clone+Columnation+'static, B::R: Semigroup+Columnation+'static, { + type Item = ((B::Key,B::Val),B::Time,B::R); + type Time = B::Time; + fn new() -> Self { ColumnatedMergeBatcher { sorter: MergeSorterColumnation::new(), @@ -42,7 +45,7 @@ impl Batcher for ColumnatedMergeBatcher } #[inline] - fn push_batch(&mut self, batch: RefOrMut>) { + fn push_batch(&mut self, batch: RefOrMut>) { // `batch` is either a shared reference or an owned allocations. match batch { RefOrMut::Ref(reference) => { diff --git a/src/trace/mod.rs b/src/trace/mod.rs index da353094f..394b5978e 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -301,15 +301,19 @@ pub trait Batch : BatchReader where Self: ::std::marker::Sized { } /// Functionality for collecting and batching updates. -pub trait Batcher { +pub trait Batcher { + /// Type of update pushed into the batcher. + type Item; + /// Times at which batches are formed. + type Time: Timestamp; /// Allocates a new empty batcher. fn new() -> Self; /// Adds an unordered batch of elements to the batcher. - fn push_batch(&mut self, batch: RefOrMut>); + fn push_batch(&mut self, batch: RefOrMut>); /// Returns all updates not greater or equal to an element of `upper`. - fn seal(&mut self, upper: Antichain) -> Output; + fn seal(&mut self, upper: Antichain) -> Output; /// Returns the lower envelope of contained update times. - fn frontier(&mut self) -> timely::progress::frontier::AntichainRef; + fn frontier(&mut self) -> timely::progress::frontier::AntichainRef; } /// Functionality for building batches from ordered update sequences. @@ -435,10 +439,12 @@ pub mod rc_blanket_impls { /// Functionality for collecting and batching updates. impl Batcher> for RcBatcher { + type Item = <::Batcher as Batcher>::Item; + type Time = <::Batcher as Batcher>::Time; fn new() -> Self { RcBatcher { batcher: >::new() } } - fn push_batch(&mut self, batch: RefOrMut>) { self.batcher.push_batch(batch) } - fn seal(&mut self, upper: Antichain) -> Rc { Rc::new(self.batcher.seal(upper)) } - fn frontier(&mut self) -> timely::progress::frontier::AntichainRef { self.batcher.frontier() } + fn push_batch(&mut self, batch: RefOrMut>) { self.batcher.push_batch(batch) } + fn seal(&mut self, upper: Antichain) -> Rc { Rc::new(self.batcher.seal(upper)) } + fn frontier(&mut self) -> timely::progress::frontier::AntichainRef { self.batcher.frontier() } } /// Wrapper type for building reference counted batches. @@ -550,15 +556,17 @@ pub mod abomonated_blanket_impls { /// Functionality for collecting and batching updates. impl Batcher>> for AbomonatedBatcher { + type Item = <::Batcher as Batcher>::Item; + type Time = <::Batcher as Batcher>::Time; fn new() -> Self { AbomonatedBatcher { batcher: >::new() } } - fn push_batch(&mut self, batch: RefOrMut>) { self.batcher.push_batch(batch) } - fn seal(&mut self, upper: Antichain) -> Abomonated> { + fn push_batch(&mut self, batch: RefOrMut>) { self.batcher.push_batch(batch) } + fn seal(&mut self, upper: Antichain) -> Abomonated> { let batch = self.batcher.seal(upper); let mut bytes = Vec::with_capacity(measure(&batch)); unsafe { abomonation::encode(&batch, &mut bytes).unwrap() }; unsafe { Abomonated::::new(bytes).unwrap() } } - fn frontier(&mut self) -> timely::progress::frontier::AntichainRef { self.batcher.frontier() } + fn frontier(&mut self) -> timely::progress::frontier::AntichainRef { self.batcher.frontier() } } /// Wrapper type for building reference counted batches. From 6a68b179e27a0df5b3c1532604cd33883df23bc2 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 22 Nov 2023 09:43:23 -0500 Subject: [PATCH 3/7] Decouple Builder input from Batch types --- src/operators/arrange/upsert.rs | 5 ++- src/operators/reduce.rs | 6 ++- src/trace/implementations/merge_batcher.rs | 3 +- .../implementations/merge_batcher_col.rs | 5 ++- src/trace/implementations/ord.rs | 16 +++++--- src/trace/implementations/ord_neu.rs | 9 +++-- src/trace/mod.rs | 39 ++++++++++++------- 7 files changed, 53 insertions(+), 30 deletions(-) diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index 9f152e3f0..242203b20 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -146,6 +146,7 @@ where Tr::Val: ExchangeData, Tr: Trace+TraceReader+'static, Tr::Batch: Batch, + ::Builder: Builder, { let mut reader: Option> = None; @@ -277,10 +278,10 @@ where for (time, std::cmp::Reverse(next)) in list { if prev_value != next { if let Some(prev) = prev_value { - updates.push((key.clone(), prev, time.clone(), -1)); + updates.push(((key.clone(), prev), time.clone(), -1)); } if let Some(next) = next.as_ref() { - updates.push((key.clone(), next.clone(), time.clone(), 1)); + updates.push(((key.clone(), next.clone()), time.clone(), 1)); } prev_value = next; } diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index 0c84f5953..3a8ce9bc1 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -276,6 +276,7 @@ pub trait ReduceCore where G::Timestam T2::Val: Data, T2::R: Abelian, T2::Batch: Batch, + ::Builder: Builder, L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val, T2::R)>)+'static, { self.reduce_core::<_,T2>(name, move |key, input, output, change| { @@ -298,6 +299,7 @@ pub trait ReduceCore where G::Timestam T2::Val: Data, T2::R: Semigroup, T2::Batch: Batch, + ::Builder: Builder, L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val,T2::R)>)+'static ; } @@ -316,6 +318,7 @@ where T2::R: Semigroup, T2: Trace+TraceReader+'static, T2::Batch: Batch, + ::Builder: Builder, L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static { self.arrange_by_key_named(&format!("Arrange: {}", name)) @@ -334,6 +337,7 @@ where T2::Val: Data, T2::R: Semigroup, T2::Batch: Batch, + ::Builder: Builder, L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static { let mut result_trace = None; @@ -548,7 +552,7 @@ where for index in 0 .. buffers.len() { buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0)); for (val, time, diff) in buffers[index].1.drain(..) { - builders[index].push((key.clone(), val, time, diff)); + builders[index].push(((key.clone(), val), time, diff)); } } } diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs index 4f900b14c..ae367f3a0 100644 --- a/src/trace/implementations/merge_batcher.rs +++ b/src/trace/implementations/merge_batcher.rs @@ -25,6 +25,7 @@ where B::Val: Ord+Clone, B::Time: Lattice+timely::progress::Timestamp+Ord+Clone, B::R: Semigroup, + B::Builder: Builder, { type Item = ((B::Key,B::Val),B::Time,B::R); type Time = B::Time; @@ -86,7 +87,7 @@ where keep.push(((key, val), time, diff)); } else { - builder.push((key, val, time, diff)); + builder.push(((key, val), time, diff)); } } // Recycling buffer. diff --git a/src/trace/implementations/merge_batcher_col.rs b/src/trace/implementations/merge_batcher_col.rs index 1321e9fd3..8a316e32f 100644 --- a/src/trace/implementations/merge_batcher_col.rs +++ b/src/trace/implementations/merge_batcher_col.rs @@ -31,6 +31,7 @@ impl Batcher for ColumnatedMergeBatcher B::Val: Ord+Clone+Columnation+'static, B::Time: Lattice+timely::progress::Timestamp+Ord+Clone+Columnation+'static, B::R: Semigroup+Columnation+'static, + B::Builder: Builder, { type Item = ((B::Key,B::Val),B::Time,B::R); type Time = B::Time; @@ -77,7 +78,7 @@ impl Batcher for ColumnatedMergeBatcher self.frontier.clear(); for buffer in merged.drain(..) { - for datum @ ((key, val), time, diff) in &buffer[..] { + for datum @ ((_key, _val), time, _diff) in &buffer[..] { if upper.less_equal(time) { self.frontier.insert(time.clone()); if !keep.is_empty() && keep.len() == keep.capacity() { @@ -87,7 +88,7 @@ impl Batcher for ColumnatedMergeBatcher keep.copy(datum); } else { - builder.copy((key, val, time, diff)); + builder.copy(datum); } } // Recycling buffer. diff --git a/src/trace/implementations/ord.rs b/src/trace/implementations/ord.rs index b9a55a49d..94b124b0a 100644 --- a/src/trace/implementations/ord.rs +++ b/src/trace/implementations/ord.rs @@ -346,6 +346,8 @@ impl Builder> for OrdValBuilder where OrdValBatch: Batch::Key, Val=::Val, Time=::Time, R=::Diff> { + type Item = ((::Key, ::Val), ::Time, ::Diff); + type Time = ::Time; fn new() -> Self { OrdValBuilder { @@ -359,16 +361,16 @@ where } #[inline] - fn push(&mut self, (key, val, time, diff): ( as BatchReader>::Key, as BatchReader>::Val, as BatchReader>::Time, as BatchReader>::R)) { + fn push(&mut self, ((key, val), time, diff): Self::Item) { self.builder.push_tuple((key, (val, (time, diff)))); } - fn copy(&mut self, (key, val, time, diff): (& as BatchReader>::Key, & as BatchReader>::Val, & as BatchReader>::Time, & as BatchReader>::R)) { + fn copy(&mut self, ((key, val), time, diff): &Self::Item) { self.builder.push_tuple((key.clone(), (val.clone(), (time.clone(), diff.clone())))); } #[inline(never)] - fn done(self, lower: Antichain< as BatchReader>::Time>, upper: Antichain< as BatchReader>::Time>, since: Antichain< as BatchReader>::Time>) -> OrdValBatch { + fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> OrdValBatch { OrdValBatch { layer: self.builder.done(), desc: Description::new(lower, upper, since), @@ -642,6 +644,8 @@ impl Builder> for OrdKeyBuilder where OrdKeyBatch: Batch::Key, Val=(), Time=::Time, R=::Diff> { + type Item = ((::Key, ()), ::Time, ::Diff); + type Time = ::Time; fn new() -> Self { OrdKeyBuilder { @@ -656,17 +660,17 @@ where } #[inline] - fn push(&mut self, (key, _, time, diff): (::Key, (), ::Time, ::Diff)) { + fn push(&mut self, ((key, _), time, diff): Self::Item) { self.builder.push_tuple((key, (time, diff))); } #[inline] - fn copy(&mut self, (key, _, time, diff): (&::Key, &(), &::Time, &::Diff)) { + fn copy(&mut self, ((key, _), time, diff): &Self::Item) { self.builder.push_tuple((key.clone(), (time.clone(), diff.clone()))); } #[inline(never)] - fn done(self, lower: Antichain<::Time>, upper: Antichain<::Time>, since: Antichain<::Time>) -> OrdKeyBatch { + fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> OrdKeyBatch { OrdKeyBatch { layer: self.builder.done(), desc: Description::new(lower, upper, since), diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index 2a9344ea7..cc12c215a 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -489,6 +489,9 @@ mod val_batch { impl Builder> for OrdValBuilder { + type Item = ((::Key, ::Val), ::Time, ::Diff); + type Time = ::Time; + fn new() -> Self { Self::with_capacity(0) } fn with_capacity(cap: usize) -> Self { // We don't introduce zero offsets as they will be introduced by the first `push` call. @@ -506,7 +509,7 @@ mod val_batch { } #[inline] - fn push(&mut self, (key, val, time, diff): (::Key, ::Val, ::Time, ::Diff)) { + fn push(&mut self, ((key, val), time, diff): Self::Item) { // Perhaps this is a continuation of an already received key. if self.result.keys.last() == Some(&key) { @@ -532,7 +535,7 @@ mod val_batch { } #[inline] - fn copy(&mut self, (key, val, time, diff): (&::Key, &::Val, &::Time, &::Diff)) { + fn copy(&mut self, ((key, val), time, diff): &Self::Item) { // Perhaps this is a continuation of an already received key. if self.result.keys.last() == Some(key) { @@ -562,7 +565,7 @@ mod val_batch { } #[inline(never)] - fn done(mut self, lower: Antichain<::Time>, upper: Antichain<::Time>, since: Antichain<::Time>) -> OrdValBatch { + fn done(mut self, lower: Antichain, upper: Antichain, since: Antichain) -> OrdValBatch { // Record the final offsets self.result.vals_offs.push(self.result.updates.len().try_into().ok().unwrap()); // Remove any pending singleton, and if it was set increment our count. diff --git a/src/trace/mod.rs b/src/trace/mod.rs index 394b5978e..a94db6c11 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -256,7 +256,7 @@ where /// Values associated with keys. type Val; /// Timestamps associated with updates - type Time; + type Time: Timestamp; /// Associated update. type R; @@ -280,9 +280,9 @@ where /// An immutable collection of updates. pub trait Batch : BatchReader where Self: ::std::marker::Sized { /// A type used to assemble batches from disordered updates. - type Batcher: Batcher; + type Batcher: Batcher; /// A type used to assemble batches from ordered update sequences. - type Builder: Builder; + type Builder: Builder; /// A type used to progressively merge batches. type Merger: Merger; @@ -317,7 +317,12 @@ pub trait Batcher { } /// Functionality for building batches from ordered update sequences. -pub trait Builder { +pub trait Builder { + /// Type of update pushed into the builder. + type Item; + /// Times at which batches are formed. + type Time: Timestamp; + /// Allocates an empty builder. fn new() -> Self; /// Allocates an empty builder with some capacity. @@ -326,17 +331,17 @@ pub trait Builder { /// /// The default implementation uses `self.copy` with references to the owned arguments. /// One should override it if the builder can take advantage of owned arguments. - fn push(&mut self, element: (Output::Key, Output::Val, Output::Time, Output::R)) { - self.copy((&element.0, &element.1, &element.2, &element.3)); + fn push(&mut self, element: Self::Item) { + self.copy(&element); } /// Adds an element to the batch. - fn copy(&mut self, element: (&Output::Key, &Output::Val, &Output::Time, &Output::R)); + fn copy(&mut self, element: &Self::Item); /// Adds an ordered sequence of elements to the batch. - fn extend>(&mut self, iter: I) { + fn extend>(&mut self, iter: I) { for item in iter { self.push(item); } } /// Completes building and returns the batch. - fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Output; + fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Output; } /// Represents a merge in progress. @@ -452,11 +457,13 @@ pub mod rc_blanket_impls { /// Functionality for building batches from ordered update sequences. impl Builder> for RcBuilder { + type Item = <::Builder as Builder>::Item; + type Time = <::Builder as Builder>::Time; fn new() -> Self { RcBuilder { builder: >::new() } } fn with_capacity(cap: usize) -> Self { RcBuilder { builder: >::with_capacity(cap) } } - fn push(&mut self, element: (B::Key, B::Val, B::Time, B::R)) { self.builder.push(element) } - fn copy(&mut self, element: (&B::Key, &B::Val, &B::Time, &B::R)) { self.builder.copy(element) } - fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Rc { Rc::new(self.builder.done(lower, upper, since)) } + fn push(&mut self, element: Self::Item) { self.builder.push(element) } + fn copy(&mut self, element: &Self::Item) { self.builder.copy(element) } + fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Rc { Rc::new(self.builder.done(lower, upper, since)) } } /// Wrapper type for merging reference counted batches. @@ -574,11 +581,13 @@ pub mod abomonated_blanket_impls { /// Functionality for building batches from ordered update sequences. impl Builder>> for AbomonatedBuilder { + type Item = <::Builder as Builder>::Item; + type Time = <::Builder as Builder>::Time; fn new() -> Self { AbomonatedBuilder { builder: >::new() } } fn with_capacity(cap: usize) -> Self { AbomonatedBuilder { builder: >::with_capacity(cap) } } - fn push(&mut self, element: (B::Key, B::Val, B::Time, B::R)) { self.builder.push(element) } - fn copy(&mut self, element: (&B::Key, &B::Val, &B::Time, &B::R)) { self.builder.copy(element) } - fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Abomonated> { + fn push(&mut self, element: Self::Item) { self.builder.push(element) } + fn copy(&mut self, element: &Self::Item) { self.builder.copy(element) } + fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Abomonated> { let batch = self.builder.done(lower, upper, since); let mut bytes = Vec::with_capacity(measure(&batch)); unsafe { abomonation::encode(&batch, &mut bytes).unwrap() }; From 409ad42ae10ff8c26f51f32a034383b7bd69b14c Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 22 Nov 2023 12:41:45 -0500 Subject: [PATCH 4/7] Allow Cursor types to be unsized --- src/trace/cursor/mod.rs | 8 ++++---- src/trace/wrappers/freeze.rs | 1 + 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/trace/cursor/mod.rs b/src/trace/cursor/mod.rs index ed11050c6..2cf5c4293 100644 --- a/src/trace/cursor/mod.rs +++ b/src/trace/cursor/mod.rs @@ -15,13 +15,13 @@ pub use self::cursor_list::CursorList; pub trait Cursor { /// Key by which updates are indexed. - type Key; + type Key: ?Sized; /// Values associated with keys. - type Val; + type Val: ?Sized; /// Timestamps associated with updates - type Time; + type Time: ?Sized; /// Associated update. - type R; + type R: ?Sized; /// Indicates if the current key is valid. /// diff --git a/src/trace/wrappers/freeze.rs b/src/trace/wrappers/freeze.rs index 17f1dfb02..ab3800291 100644 --- a/src/trace/wrappers/freeze.rs +++ b/src/trace/wrappers/freeze.rs @@ -203,6 +203,7 @@ impl CursorFreeze { impl Cursor for CursorFreeze where C: Cursor, + C::Time: Sized, F: Fn(&C::Time)->Option, { type Key = C::Key; From f3cbb6be6377b551c19eea4ad5a05252009a0ed4 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 22 Nov 2023 12:58:43 -0500 Subject: [PATCH 5/7] Decouple input and output arrangement types --- src/operators/arrange/arrangement.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index 0193004c4..ae929dbdd 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -454,7 +454,7 @@ where K: ExchangeData+Hashable, V: ExchangeData, R: ExchangeData, - Tr: Trace+TraceReader+'static, + Tr: Trace+TraceReader+'static, Tr::Batch: Batch, ::Batcher: Batcher, ::Batcher: Batcher, @@ -472,7 +472,7 @@ where K: ExchangeData+Hashable, V: ExchangeData, R: ExchangeData, - Tr: Trace+TraceReader+'static, + Tr: Trace+TraceReader+'static, Tr::Batch: Batch, ::Batcher: Batcher, ::Batcher: Batcher, @@ -489,7 +489,7 @@ where fn arrange_core(&self, pact: P, name: &str) -> Arranged> where P: ParallelizationContract, - Tr: Trace+TraceReader+'static, + Tr: Trace+TraceReader+'static, Tr::Batch: Batch, ::Batcher: Batcher, ::Batcher: Batcher, @@ -509,7 +509,7 @@ where K: ExchangeData + Hashable, V: ExchangeData, R: ExchangeData, - Tr: Trace + TraceReader + 'static, Tr::Batch: Batch, + Tr: Trace + TraceReader + 'static, Tr::Batch: Batch, ::Batcher: Batcher, ::Batcher: Batcher, { @@ -521,7 +521,7 @@ where K: ExchangeData + Hashable, V: ExchangeData, R: ExchangeData, - Tr: Trace + TraceReader + 'static, Tr::Batch: Batch, + Tr: Trace + TraceReader + 'static, Tr::Batch: Batch, ::Batcher: Batcher, ::Batcher: Batcher, { @@ -532,7 +532,7 @@ where fn arrange_core(&self, pact: P, name: &str) -> Arranged> where P: ParallelizationContract, - Tr: Trace+TraceReader+'static, + Tr: Trace+TraceReader+'static, Tr::Batch: Batch, ::Batcher: Batcher, ::Batcher: Batcher, @@ -697,7 +697,7 @@ where fn arrange_core(&self, pact: P, name: &str) -> Arranged> where P: ParallelizationContract, - Tr: Trace+TraceReader+'static, + Tr: Trace+TraceReader+'static, Tr::Batch: Batch, ::Batcher: Batcher, ::Batcher: Batcher, From 1bd6c4ec9d98343b89d49e9a4ad28483355c9231 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 22 Nov 2023 14:34:54 -0500 Subject: [PATCH 6/7] Allow Batcher::seal to be generic in Builder --- src/operators/arrange/arrangement.rs | 34 +++---- src/operators/arrange/upsert.rs | 2 +- src/operators/consolidate.rs | 6 +- src/operators/reduce.rs | 8 +- src/trace/implementations/merge_batcher.rs | 40 +++----- .../implementations/merge_batcher_col.rs | 51 +++++----- src/trace/implementations/ord.rs | 44 +++++---- src/trace/implementations/ord_neu.rs | 19 ++-- src/trace/mod.rs | 98 +++++++------------ tests/trace.rs | 6 +- 10 files changed, 143 insertions(+), 165 deletions(-) diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index ae929dbdd..ca6986328 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -29,7 +29,7 @@ use timely::dataflow::operators::Capability; use ::{Data, ExchangeData, Collection, AsCollection, Hashable}; use ::difference::Semigroup; use lattice::Lattice; -use trace::{self, Trace, TraceReader, Batch, BatchReader, Batcher, Cursor}; +use trace::{self, Trace, TraceReader, Batch, BatchReader, Batcher, Builder, Cursor}; use trace::implementations::{KeySpine, ValSpine}; use trace::wrappers::enter::{TraceEnter, BatchEnter,}; @@ -456,8 +456,8 @@ where R: ExchangeData, Tr: Trace+TraceReader+'static, Tr::Batch: Batch, - ::Batcher: Batcher, - ::Batcher: Batcher, + ::Batcher: Batcher, + ::Builder: Builder, { self.arrange_named("Arrange") } @@ -474,8 +474,8 @@ where R: ExchangeData, Tr: Trace+TraceReader+'static, Tr::Batch: Batch, - ::Batcher: Batcher, - ::Batcher: Batcher, + ::Batcher: Batcher, + ::Builder: Builder, { let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into()); self.arrange_core(exchange, name) @@ -491,8 +491,8 @@ where P: ParallelizationContract, Tr: Trace+TraceReader+'static, Tr::Batch: Batch, - ::Batcher: Batcher, - ::Batcher: Batcher, + ::Batcher: Batcher, + ::Builder: Builder, ; } @@ -510,8 +510,8 @@ where V: ExchangeData, R: ExchangeData, Tr: Trace + TraceReader + 'static, Tr::Batch: Batch, - ::Batcher: Batcher, - ::Batcher: Batcher, + ::Batcher: Batcher, + ::Builder: Builder, { self.arrange_named("Arrange") } @@ -522,8 +522,8 @@ where V: ExchangeData, R: ExchangeData, Tr: Trace + TraceReader + 'static, Tr::Batch: Batch, - ::Batcher: Batcher, - ::Batcher: Batcher, + ::Batcher: Batcher, + ::Builder: Builder, { let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into()); self.arrange_core(exchange, name) @@ -534,8 +534,8 @@ where P: ParallelizationContract, Tr: Trace+TraceReader+'static, Tr::Batch: Batch, - ::Batcher: Batcher, - ::Batcher: Batcher, + ::Batcher: Batcher, + ::Builder: Builder, { // The `Arrange` operator is tasked with reacting to an advancing input // frontier by producing the sequence of batches whose lower and upper @@ -645,7 +645,7 @@ where } // Extract updates not in advance of `upper`. - let batch = batcher.seal(upper.clone()); + let batch = batcher.seal::<::Builder>(upper.clone()); writer.insert(batch.clone(), Some(capability.time().clone())); @@ -673,7 +673,7 @@ where } else { // Announce progress updates, even without data. - let _batch = batcher.seal(input.frontier().frontier().to_owned()); + let _batch = batcher.seal::<::Builder>(input.frontier().frontier().to_owned()); writer.seal(input.frontier().frontier().to_owned()); } @@ -699,8 +699,8 @@ where P: ParallelizationContract, Tr: Trace+TraceReader+'static, Tr::Batch: Batch, - ::Batcher: Batcher, - ::Batcher: Batcher, + ::Batcher: Batcher, + ::Builder: Builder, { self.map(|k| (k, ())) .arrange_core(pact, name) diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index 242203b20..bdfa23fb2 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -146,7 +146,7 @@ where Tr::Val: ExchangeData, Tr: Trace+TraceReader+'static, Tr::Batch: Batch, - ::Builder: Builder, + ::Builder: Builder, { let mut reader: Option> = None; diff --git a/src/operators/consolidate.rs b/src/operators/consolidate.rs index 2ace0a06c..52776d781 100644 --- a/src/operators/consolidate.rs +++ b/src/operators/consolidate.rs @@ -13,7 +13,7 @@ use ::difference::Semigroup; use Data; use lattice::Lattice; -use trace::{Batch, Batcher}; +use trace::{Batch, Batcher, Builder}; /// Methods which require data be arrangeable. impl Collection @@ -58,8 +58,8 @@ where where Tr: crate::trace::Trace+crate::trace::TraceReader+'static, Tr::Batch: crate::trace::Batch, - ::Batcher: Batcher, - ::Batcher: Batcher, + ::Batcher: Batcher, + ::Builder: Builder, { use operators::arrange::arrangement::Arrange; self.map(|k| (k, ())) diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index 3a8ce9bc1..7185e83da 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -276,7 +276,7 @@ pub trait ReduceCore where G::Timestam T2::Val: Data, T2::R: Abelian, T2::Batch: Batch, - ::Builder: Builder, + ::Builder: Builder, L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val, T2::R)>)+'static, { self.reduce_core::<_,T2>(name, move |key, input, output, change| { @@ -299,7 +299,7 @@ pub trait ReduceCore where G::Timestam T2::Val: Data, T2::R: Semigroup, T2::Batch: Batch, - ::Builder: Builder, + ::Builder: Builder, L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val,T2::R)>)+'static ; } @@ -318,7 +318,7 @@ where T2::R: Semigroup, T2: Trace+TraceReader+'static, T2::Batch: Batch, - ::Builder: Builder, + ::Builder: Builder, L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static { self.arrange_by_key_named(&format!("Arrange: {}", name)) @@ -337,7 +337,7 @@ where T2::Val: Data, T2::R: Semigroup, T2::Batch: Batch, - ::Builder: Builder, + ::Builder: Builder, L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static { let mut result_trace = None; diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs index ae367f3a0..9daec42b3 100644 --- a/src/trace/implementations/merge_batcher.rs +++ b/src/trace/implementations/merge_batcher.rs @@ -7,34 +7,26 @@ use timely::progress::frontier::Antichain; use ::difference::Semigroup; -use lattice::Lattice; -use trace::{Batch, Batcher, Builder}; +use trace::{Batcher, Builder}; +use trace::implementations::Update; /// Creates batches from unordered tuples. -pub struct MergeBatcher where B::Key: Ord, B::Val: Ord, B::Time: Ord, B::R: Semigroup { - sorter: MergeSorter<(B::Key, B::Val), B::Time, B::R>, - lower: Antichain, - frontier: Antichain, - phantom: ::std::marker::PhantomData, +pub struct MergeBatcher { + sorter: MergeSorter<(U::Key, U::Val), U::Time, U::Diff>, + lower: Antichain, + frontier: Antichain, + phantom: ::std::marker::PhantomData, } -impl Batcher for MergeBatcher -where - B: Batch, - B::Key: Ord+Clone, - B::Val: Ord+Clone, - B::Time: Lattice+timely::progress::Timestamp+Ord+Clone, - B::R: Semigroup, - B::Builder: Builder, -{ - type Item = ((B::Key,B::Val),B::Time,B::R); - type Time = B::Time; +impl Batcher for MergeBatcher { + type Item = ((U::Key,U::Val),U::Time,U::Diff); + type Time = U::Time; fn new() -> Self { MergeBatcher { sorter: MergeSorter::new(), frontier: Antichain::new(), - lower: Antichain::from_elem(::minimum()), + lower: Antichain::from_elem(::minimum()), phantom: ::std::marker::PhantomData, } } @@ -61,9 +53,9 @@ where // which we call `lower`, by assumption that after sealing a batcher we receive no more // updates with times not greater or equal to `upper`. #[inline(never)] - fn seal(&mut self, upper: Antichain) -> B { + fn seal>(&mut self, upper: Antichain) -> B::Output { - let mut builder = B::Builder::new(); + let mut builder = B::new(); let mut merged = Vec::new(); self.sorter.finish_into(&mut merged); @@ -109,18 +101,18 @@ where let mut buffer = Vec::new(); self.sorter.push(&mut buffer); // We recycle buffers with allocations (capacity, and not zero-sized). - while buffer.capacity() > 0 && std::mem::size_of::<((B::Key,B::Val),B::Time,B::R)>() > 0 { + while buffer.capacity() > 0 && std::mem::size_of::<((U::Key,U::Val),U::Time,U::Diff)>() > 0 { buffer = Vec::new(); self.sorter.push(&mut buffer); } - let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(::minimum())); + let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(::minimum())); self.lower = upper; seal } // the frontier of elements remaining after the most recent call to `self.seal`. - fn frontier(&mut self) -> timely::progress::frontier::AntichainRef { + fn frontier(&mut self) -> timely::progress::frontier::AntichainRef { self.frontier.borrow() } } diff --git a/src/trace/implementations/merge_batcher_col.rs b/src/trace/implementations/merge_batcher_col.rs index 8a316e32f..3b1d5eaec 100644 --- a/src/trace/implementations/merge_batcher_col.rs +++ b/src/trace/implementations/merge_batcher_col.rs @@ -8,39 +8,38 @@ use timely::progress::frontier::Antichain; use ::difference::Semigroup; -use lattice::Lattice; -use trace::{Batch, Batcher, Builder}; +use trace::{Batcher, Builder}; +use trace::implementations::Update; /// Creates batches from unordered tuples. -pub struct ColumnatedMergeBatcher - where - B::Key: Ord+Clone+Columnation, - B::Val: Ord+Clone+Columnation, - B::Time: Lattice+timely::progress::Timestamp+Ord+Clone+Columnation, - B::R: Semigroup+Columnation, +pub struct ColumnatedMergeBatcher +where + U::Key: Columnation, + U::Val: Columnation, + U::Time: Columnation, + U::Diff: Columnation, { - sorter: MergeSorterColumnation<(B::Key, B::Val), B::Time, B::R>, - lower: Antichain, - frontier: Antichain, - phantom: PhantomData, + sorter: MergeSorterColumnation<(U::Key, U::Val), U::Time, U::Diff>, + lower: Antichain, + frontier: Antichain, + phantom: PhantomData, } -impl Batcher for ColumnatedMergeBatcher - where - B::Key: Ord+Clone+Columnation+'static, - B::Val: Ord+Clone+Columnation+'static, - B::Time: Lattice+timely::progress::Timestamp+Ord+Clone+Columnation+'static, - B::R: Semigroup+Columnation+'static, - B::Builder: Builder, +impl Batcher for ColumnatedMergeBatcher +where + U::Key: Columnation + 'static, + U::Val: Columnation + 'static, + U::Time: Columnation + 'static, + U::Diff: Columnation + 'static, { - type Item = ((B::Key,B::Val),B::Time,B::R); - type Time = B::Time; + type Item = ((U::Key,U::Val),U::Time,U::Diff); + type Time = U::Time; fn new() -> Self { ColumnatedMergeBatcher { sorter: MergeSorterColumnation::new(), frontier: Antichain::new(), - lower: Antichain::from_elem(::minimum()), + lower: Antichain::from_elem(::minimum()), phantom: PhantomData, } } @@ -65,9 +64,9 @@ impl Batcher for ColumnatedMergeBatcher // which we call `lower`, by assumption that after sealing a batcher we receive no more // updates with times not greater or equal to `upper`. #[inline] - fn seal(&mut self, upper: Antichain) -> B { + fn seal>(&mut self, upper: Antichain) -> B::Output { - let mut builder = B::Builder::new(); + let mut builder = B::new(); let mut merged = Default::default(); self.sorter.finish_into(&mut merged); @@ -106,13 +105,13 @@ impl Batcher for ColumnatedMergeBatcher // Drain buffers (fast reclamation). self.sorter.clear_stash(); - let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(::minimum())); + let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(::minimum())); self.lower = upper; seal } // the frontier of elements remaining after the most recent call to `self.seal`. - fn frontier(&mut self) -> timely::progress::frontier::AntichainRef { + fn frontier(&mut self) -> timely::progress::frontier::AntichainRef { self.frontier.borrow() } } diff --git a/src/trace/implementations/ord.rs b/src/trace/implementations/ord.rs index 94b124b0a..3ac2946a3 100644 --- a/src/trace/implementations/ord.rs +++ b/src/trace/implementations/ord.rs @@ -94,8 +94,8 @@ impl BatchReader for OrdValBatch { impl Batch for OrdValBatch> { - type Batcher = MergeBatcher; - type Builder = OrdValBuilder; + type Batcher = MergeBatcher; + type Builder = OrdValBuilder>; type Merger = OrdValMerger; fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<::Time>) -> Self::Merger { @@ -111,8 +111,8 @@ where Self::Time: Columnation + 'static, Self::R: Columnation + 'static, { - type Batcher = ColumnatedMergeBatcher; - type Builder = OrdValBuilder; + type Batcher = ColumnatedMergeBatcher; + type Builder = OrdValBuilder>; type Merger = OrdValMerger; fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<::Time>) -> Self::Merger { @@ -337,26 +337,30 @@ impl Cursor> for OrdValCursor { } /// A builder for creating layers from unsorted update tuples. -pub struct OrdValBuilder { +pub struct OrdValBuilder { builder: KVTDBuilder, + phantom: PhantomData, } -impl Builder> for OrdValBuilder +impl Builder for OrdValBuilder where OrdValBatch: Batch::Key, Val=::Val, Time=::Time, R=::Diff> { type Item = ((::Key, ::Val), ::Time, ::Diff); type Time = ::Time; + type Output = OrdValBatch; fn new() -> Self { OrdValBuilder { - builder: >::new() + builder: >::new(), + phantom: std::marker::PhantomData, } } fn with_capacity(cap: usize) -> Self { OrdValBuilder { - builder: as TupleBuilder>::with_capacity(cap) + builder: as TupleBuilder>::with_capacity(cap), + phantom: std::marker::PhantomData, } } @@ -411,9 +415,9 @@ impl BatchReader for OrdKeyBatch { fn description(&self) -> &Description<::Time> { &self.desc } } -impl Batch for OrdKeyBatch> { - type Batcher = MergeBatcher; - type Builder = OrdKeyBuilder; +impl Batch for OrdKeyBatch> where L::Target: Update { + type Batcher = MergeBatcher; + type Builder = OrdKeyBuilder>; type Merger = OrdKeyMerger; fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<::Time>) -> Self::Merger { @@ -423,13 +427,13 @@ impl Batch for OrdKeyBatch> { impl Batch for OrdKeyBatch> where - ::Target: Columnation + 'static, + ::Target: Update + Columnation + 'static, Self::Key: Columnation + 'static, Self::Time: Columnation + 'static, Self::R: Columnation + 'static, { - type Batcher = ColumnatedMergeBatcher; - type Builder = OrdKeyBuilder; + type Batcher = ColumnatedMergeBatcher; + type Builder = OrdKeyBuilder>; type Merger = OrdKeyMerger; fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<::Time>) -> Self::Merger { @@ -636,26 +640,30 @@ impl Cursor> for OrdKeyCursor { /// A builder for creating layers from unsorted update tuples. -pub struct OrdKeyBuilder { +pub struct OrdKeyBuilder { builder: KTDBuilder, + phantom: std::marker::PhantomData } -impl Builder> for OrdKeyBuilder +impl Builder for OrdKeyBuilder where OrdKeyBatch: Batch::Key, Val=(), Time=::Time, R=::Diff> { type Item = ((::Key, ()), ::Time, ::Diff); type Time = ::Time; + type Output = OrdKeyBatch; fn new() -> Self { OrdKeyBuilder { - builder: >::new() + builder: >::new(), + phantom: std::marker::PhantomData, } } fn with_capacity(cap: usize) -> Self { OrdKeyBuilder { - builder: as TupleBuilder>::with_capacity(cap) + builder: as TupleBuilder>::with_capacity(cap), + phantom: std::marker::PhantomData, } } diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index d8dbe5cf4..694af897c 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -128,8 +128,8 @@ mod val_batch { } impl Batch for OrdValBatch> { - type Batcher = MergeBatcher; - type Builder = OrdValBuilder; + type Batcher = MergeBatcher; + type Builder = OrdValBuilder>; type Merger = OrdValMerger; fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<::Time>) -> Self::Merger { @@ -145,8 +145,8 @@ mod val_batch { Self::Time: Columnation + 'static, Self::R: Columnation + 'static, { - type Batcher = ColumnatedMergeBatcher; - type Builder = OrdValBuilder; + type Batcher = ColumnatedMergeBatcher; + type Builder = OrdValBuilder>; type Merger = OrdValMerger; fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<::Time>) -> Self::Merger { @@ -479,7 +479,7 @@ mod val_batch { } /// A builder for creating layers from unsorted update tuples. - pub struct OrdValBuilder { + pub struct OrdValBuilder { result: OrdValStorage, singleton: Option<(::Time, ::Diff)>, /// Counts the number of singleton optimizations we performed. @@ -487,9 +487,11 @@ mod val_batch { /// This number allows us to correctly gauge the total number of updates reflected in a batch, /// even though `updates.len()` may be much shorter than this amount. singletons: usize, + /// Phantom marker for Rust happiness. + pub phantom: PhantomData, } - impl OrdValBuilder { + impl OrdValBuilder { /// Pushes a single update, which may set `self.singleton` rather than push. /// /// This operation is meant to be equivalent to `self.results.updates.push((time, diff))`. @@ -517,13 +519,13 @@ mod val_batch { } } - impl Builder> for OrdValBuilder + impl Builder for OrdValBuilder where OrdValBatch: Batch::Key, Val=::Val, Time=::Time, R=::Diff> { - type Item = ((::Key, ::Val), ::Time, ::Diff); type Time = ::Time; + type Output = OrdValBatch; fn new() -> Self { Self::with_capacity(0) } fn with_capacity(cap: usize) -> Self { @@ -538,6 +540,7 @@ mod val_batch { }, singleton: None, singletons: 0, + phantom: std::marker::PhantomData, } } diff --git a/src/trace/mod.rs b/src/trace/mod.rs index a94db6c11..0f5bffaa3 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -280,9 +280,9 @@ where /// An immutable collection of updates. pub trait Batch : BatchReader where Self: ::std::marker::Sized { /// A type used to assemble batches from disordered updates. - type Batcher: Batcher; + type Batcher: Batcher