Skip to content

Commit

Permalink
Remove time from MergeBatcher (#550)
Browse files Browse the repository at this point in the history
We can use the time declared by the Merger direction, which always needs to
equal the time on MergeBatcher's Batcher implementation.

Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru authored Dec 7, 2024
1 parent c90b92e commit daae392
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 27 deletions.
34 changes: 17 additions & 17 deletions src/trace/implementations/merge_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ use crate::trace::{Batcher, Builder};
use crate::Data;

/// Creates batches from unordered tuples.
pub struct MergeBatcher<Input, C, M, T>
pub struct MergeBatcher<Input, C, M>
where
C: ContainerBuilder<Container=M::Chunk> + Default,
M: Merger<Time = T>,
C: ContainerBuilder<Container=M::Chunk>,
M: Merger,
{
/// each power-of-two length list of allocations.
/// Do not push/pop directly but use the corresponding functions
Expand All @@ -36,20 +36,20 @@ where
/// Timely operator ID.
operator_id: usize,
/// Current lower frontier, we sealed up to here.
lower: Antichain<T>,
lower: Antichain<M::Time>,
/// The lower-bound frontier of the data, after the last call to seal.
frontier: Antichain<T>,
frontier: Antichain<M::Time>,
_marker: PhantomData<Input>,
}

impl<Input, C, M, T> Batcher for MergeBatcher<Input, C, M, T>
impl<Input, C, M> Batcher for MergeBatcher<Input, C, M>
where
C: ContainerBuilder<Container=M::Chunk> + Default + for<'a> PushInto<&'a mut Input>,
M: Merger<Time = T>,
T: Timestamp,
M: Merger,
M::Time: Timestamp,
{
type Input = Input;
type Time = T;
type Time = M::Time;
type Output = M::Chunk;

fn new(logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, operator_id: usize) -> Self {
Expand All @@ -61,7 +61,7 @@ where
chains: Vec::new(),
stash: Vec::new(),
frontier: Antichain::new(),
lower: Antichain::from_elem(T::minimum()),
lower: Antichain::from_elem(M::Time::minimum()),
_marker: PhantomData,
}
}
Expand All @@ -80,7 +80,7 @@ where
// in `upper`. All updates must have time greater or equal to the previously used `upper`,
// which we call `lower`, by assumption that after sealing a batcher we receive no more
// updates with times not greater or equal to `upper`.
fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(&mut self, upper: Antichain<T>) -> B::Output {
fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(&mut self, upper: Antichain<M::Time>) -> B::Output {
// Finish
while let Some(chunk) = self.chunker.finish() {
let chunk = std::mem::take(chunk);
Expand Down Expand Up @@ -109,22 +109,22 @@ where

self.stash.clear();

let seal = B::seal(&mut readied, self.lower.borrow(), upper.borrow(), Antichain::from_elem(T::minimum()).borrow());
let seal = B::seal(&mut readied, self.lower.borrow(), upper.borrow(), Antichain::from_elem(M::Time::minimum()).borrow());
self.lower = upper;
seal
}

/// The frontier of elements remaining after the most recent call to `self.seal`.
#[inline]
fn frontier(&mut self) -> AntichainRef<T> {
fn frontier(&mut self) -> AntichainRef<M::Time> {
self.frontier.borrow()
}
}

impl<Input, C, M, T> MergeBatcher<Input, C, M, T>
impl<Input, C, M> MergeBatcher<Input, C, M>
where
C: ContainerBuilder<Container=M::Chunk> + Default,
M: Merger<Time = T>,
M: Merger,
{
/// Insert a chain and maintain chain properties: Chains are geometrically sized and ordered
/// by decreasing length.
Expand Down Expand Up @@ -189,10 +189,10 @@ where
}
}

impl<Input, C, M, T> Drop for MergeBatcher<Input, C, M, T>
impl<Input, C, M> Drop for MergeBatcher<Input, C, M>
where
C: ContainerBuilder<Container=M::Chunk> + Default,
M: Merger<Time = T>,
M: Merger,
{
fn drop(&mut self) {
// Cleanup chain to retract accounting information.
Expand Down
16 changes: 8 additions & 8 deletions src/trace/implementations/ord_neu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::trace::implementations::chunker::{ColumnationChunker, ContainerChunke
use crate::trace::implementations::spine_fueled::Spine;
use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger};
use crate::trace::implementations::merge_batcher_col::ColumnationMerger;
use crate::trace::implementations::merge_batcher_flat::{FlatcontainerMerger, MergerChunk};
use crate::trace::implementations::merge_batcher_flat::FlatcontainerMerger;
use crate::trace::rc_blanket_impls::RcBuilder;

use super::{Update, Layout, Vector, TStack, Preferred, FlatLayout};
Expand All @@ -28,7 +28,7 @@ pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder};
/// A trace implementation using a spine of ordered lists.
pub type OrdValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Vector<((K,V),T,R)>>>>;
/// A batcher using ordered lists.
pub type OrdValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<((K, V), T, R)>, T>;
pub type OrdValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<((K, V), T, R)>>;
/// A builder using ordered lists.
pub type RcOrdValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>;

Expand All @@ -38,14 +38,14 @@ pub type RcOrdValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Vector<((K,V),T,R
/// A trace implementation backed by columnar storage.
pub type ColValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<TStack<((K,V),T,R)>>>>;
/// A batcher for columnar storage.
pub type ColValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColumnationMerger<((K,V),T,R)>, T>;
pub type ColValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColumnationMerger<((K,V),T,R)>>;
/// A builder for columnar storage.
pub type ColValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<TStack<((K,V),T,R)>, TimelyStack<((K,V),T,R)>>>;

/// A trace implementation backed by flatcontainer storage.
pub type FlatValSpine<L> = Spine<Rc<OrdValBatch<L>>>;
/// A batcher for flatcontainer storage.
pub type FlatValBatcher<R, C> = MergeBatcher<C, ContainerChunker<FlatStack<R>>, FlatcontainerMerger<R>, <R as MergerChunk>::TimeOwned>;
pub type FlatValBatcher<R, C> = MergeBatcher<C, ContainerChunker<FlatStack<R>>, FlatcontainerMerger<R>>;
/// A builder for flatcontainer storage.
pub type FlatValBuilder<L, R> = RcBuilder<OrdValBuilder<L, FlatStack<R>>>;

Expand All @@ -63,7 +63,7 @@ pub type FlatValBuilderDefault<K, V, T, R> = FlatValBuilder<FlatLayout<<K as Reg
/// A trace implementation using a spine of ordered lists.
pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
/// A batcher for ordered lists.
pub type OrdKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, VecChunker<((K,()),T,R)>, VecMerger<((K, ()), T, R)>, T>;
pub type OrdKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, VecChunker<((K,()),T,R)>, VecMerger<((K, ()), T, R)>>;
/// A builder for ordered lists.
pub type RcOrdKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<Vector<((K,()),T,R)>, Vec<((K,()),T,R)>>>;

Expand All @@ -73,14 +73,14 @@ pub type RcOrdKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<Vector<((K,()),T,R)>
/// A trace implementation backed by columnar storage.
pub type ColKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R)>>>>;
/// A batcher for columnar storage
pub type ColKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, ColumnationChunker<((K,()),T,R)>, ColumnationMerger<((K,()),T,R)>, T>;
pub type ColKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, ColumnationChunker<((K,()),T,R)>, ColumnationMerger<((K,()),T,R)>>;
/// A builder for columnar storage
pub type ColKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<TStack<((K,()),T,R)>, TimelyStack<((K,()),T,R)>>>;

/// A trace implementation backed by flatcontainer storage.
pub type FlatKeySpine<L> = Spine<Rc<OrdKeyBatch<L>>>;
/// A batcher for flatcontainer storage.
pub type FlatKeyBatcher<R, C> = MergeBatcher<C, ContainerChunker<FlatStack<R>>, FlatcontainerMerger<R>, <R as MergerChunk>::TimeOwned>;
pub type FlatKeyBatcher<R, C> = MergeBatcher<C, ContainerChunker<FlatStack<R>>, FlatcontainerMerger<R>>;
/// A builder for flatcontainer storage.
pub type FlatKeyBuilder<L, R> = RcBuilder<OrdKeyBuilder<L, FlatStack<R>>>;

Expand All @@ -96,7 +96,7 @@ pub type FlatKeyBuilderDefault<K, T, R> = FlatKeyBuilder<FlatLayout<<K as Region
/// A trace implementation backed by columnar storage.
pub type PreferredSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Preferred<K,V,T,R>>>>;
/// A batcher for columnar storage.
pub type PreferredBatcher<K, V, T, R> = MergeBatcher<Vec<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColumnationChunker<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColumnationMerger<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>,T>;
pub type PreferredBatcher<K, V, T, R> = MergeBatcher<Vec<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColumnationChunker<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColumnationMerger<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>>;
/// A builder for columnar storage.
pub type PreferredBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Preferred<K,V,T,R>, TimelyStack<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>>>;

Expand Down
4 changes: 2 additions & 2 deletions src/trace/implementations/rhh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use self::val_batch::{RhhValBatch, RhhValBuilder};
/// A trace implementation using a spine of ordered lists.
pub type VecSpine<K, V, T, R> = Spine<Rc<RhhValBatch<Vector<((K,V),T,R)>>>>;
/// A batcher for ordered lists.
pub type VecBatcher<K,V,T,R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<((K, V), T, R)>, T>;
pub type VecBatcher<K,V,T,R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<((K, V), T, R)>>;
/// A builder for ordered lists.
pub type VecBuilder<K,V,T,R> = RcBuilder<RhhValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>;

Expand All @@ -35,7 +35,7 @@ pub type VecBuilder<K,V,T,R> = RcBuilder<RhhValBuilder<Vector<((K,V),T,R)>, Vec<
/// A trace implementation backed by columnar storage.
pub type ColSpine<K, V, T, R> = Spine<Rc<RhhValBatch<TStack<((K,V),T,R)>>>>;
/// A batcher for columnar storage.
pub type ColBatcher<K,V,T,R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColumnationMerger<((K,V),T,R)>, T>;
pub type ColBatcher<K,V,T,R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColumnationMerger<((K,V),T,R)>>;
/// A builder for columnar storage.
pub type ColBuilder<K,V,T,R> = RcBuilder<RhhValBuilder<TStack<((K,V),T,R)>, TimelyStack<((K,V),T,R)>>>;

Expand Down

0 comments on commit daae392

Please sign in to comment.