Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove time from MergeBatcher #550

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 17 additions & 17 deletions src/trace/implementations/merge_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ use crate::trace::{Batcher, Builder};
use crate::Data;

/// Creates batches from unordered tuples.
pub struct MergeBatcher<Input, C, M, T>
pub struct MergeBatcher<Input, C, M>
where
C: ContainerBuilder<Container=M::Chunk> + Default,
M: Merger<Time = T>,
C: ContainerBuilder<Container=M::Chunk>,
M: Merger,
{
/// each power-of-two length list of allocations.
/// Do not push/pop directly but use the corresponding functions
Expand All @@ -36,20 +36,20 @@ where
/// Timely operator ID.
operator_id: usize,
/// Current lower frontier, we sealed up to here.
lower: Antichain<T>,
lower: Antichain<M::Time>,
/// The lower-bound frontier of the data, after the last call to seal.
frontier: Antichain<T>,
frontier: Antichain<M::Time>,
_marker: PhantomData<Input>,
}

impl<Input, C, M, T> Batcher for MergeBatcher<Input, C, M, T>
impl<Input, C, M> Batcher for MergeBatcher<Input, C, M>
where
C: ContainerBuilder<Container=M::Chunk> + Default + for<'a> PushInto<&'a mut Input>,
M: Merger<Time = T>,
T: Timestamp,
M: Merger,
M::Time: Timestamp,
{
type Input = Input;
type Time = T;
type Time = M::Time;
type Output = M::Chunk;

fn new(logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, operator_id: usize) -> Self {
Expand All @@ -61,7 +61,7 @@ where
chains: Vec::new(),
stash: Vec::new(),
frontier: Antichain::new(),
lower: Antichain::from_elem(T::minimum()),
lower: Antichain::from_elem(M::Time::minimum()),
_marker: PhantomData,
}
}
Expand All @@ -80,7 +80,7 @@ where
// in `upper`. All updates must have time greater or equal to the previously used `upper`,
// which we call `lower`, by assumption that after sealing a batcher we receive no more
// updates with times not greater or equal to `upper`.
fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(&mut self, upper: Antichain<T>) -> B::Output {
fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(&mut self, upper: Antichain<M::Time>) -> B::Output {
// Finish
while let Some(chunk) = self.chunker.finish() {
let chunk = std::mem::take(chunk);
Expand Down Expand Up @@ -109,22 +109,22 @@ where

self.stash.clear();

let seal = B::seal(&mut readied, self.lower.borrow(), upper.borrow(), Antichain::from_elem(T::minimum()).borrow());
let seal = B::seal(&mut readied, self.lower.borrow(), upper.borrow(), Antichain::from_elem(M::Time::minimum()).borrow());
self.lower = upper;
seal
}

/// The frontier of elements remaining after the most recent call to `self.seal`.
#[inline]
fn frontier(&mut self) -> AntichainRef<T> {
fn frontier(&mut self) -> AntichainRef<M::Time> {
self.frontier.borrow()
}
}

impl<Input, C, M, T> MergeBatcher<Input, C, M, T>
impl<Input, C, M> MergeBatcher<Input, C, M>
where
C: ContainerBuilder<Container=M::Chunk> + Default,
M: Merger<Time = T>,
M: Merger,
{
/// Insert a chain and maintain chain properties: Chains are geometrically sized and ordered
/// by decreasing length.
Expand Down Expand Up @@ -189,10 +189,10 @@ where
}
}

impl<Input, C, M, T> Drop for MergeBatcher<Input, C, M, T>
impl<Input, C, M> Drop for MergeBatcher<Input, C, M>
where
C: ContainerBuilder<Container=M::Chunk> + Default,
M: Merger<Time = T>,
M: Merger,
{
fn drop(&mut self) {
// Cleanup chain to retract accounting information.
Expand Down
16 changes: 8 additions & 8 deletions src/trace/implementations/ord_neu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::trace::implementations::chunker::{ColumnationChunker, ContainerChunke
use crate::trace::implementations::spine_fueled::Spine;
use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger};
use crate::trace::implementations::merge_batcher_col::ColumnationMerger;
use crate::trace::implementations::merge_batcher_flat::{FlatcontainerMerger, MergerChunk};
use crate::trace::implementations::merge_batcher_flat::FlatcontainerMerger;
use crate::trace::rc_blanket_impls::RcBuilder;

use super::{Update, Layout, Vector, TStack, Preferred, FlatLayout};
Expand All @@ -28,7 +28,7 @@ pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder};
/// A trace implementation using a spine of ordered lists.
pub type OrdValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Vector<((K,V),T,R)>>>>;
/// A batcher using ordered lists.
pub type OrdValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<((K, V), T, R)>, T>;
pub type OrdValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<((K, V), T, R)>>;
/// A builder using ordered lists.
pub type RcOrdValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>;

Expand All @@ -38,14 +38,14 @@ pub type RcOrdValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Vector<((K,V),T,R
/// A trace implementation backed by columnar storage.
pub type ColValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<TStack<((K,V),T,R)>>>>;
/// A batcher for columnar storage.
pub type ColValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColumnationMerger<((K,V),T,R)>, T>;
pub type ColValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColumnationMerger<((K,V),T,R)>>;
/// A builder for columnar storage.
pub type ColValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<TStack<((K,V),T,R)>, TimelyStack<((K,V),T,R)>>>;

/// A trace implementation backed by flatcontainer storage.
pub type FlatValSpine<L> = Spine<Rc<OrdValBatch<L>>>;
/// A batcher for flatcontainer storage.
pub type FlatValBatcher<R, C> = MergeBatcher<C, ContainerChunker<FlatStack<R>>, FlatcontainerMerger<R>, <R as MergerChunk>::TimeOwned>;
pub type FlatValBatcher<R, C> = MergeBatcher<C, ContainerChunker<FlatStack<R>>, FlatcontainerMerger<R>>;
/// A builder for flatcontainer storage.
pub type FlatValBuilder<L, R> = RcBuilder<OrdValBuilder<L, FlatStack<R>>>;

Expand All @@ -63,7 +63,7 @@ pub type FlatValBuilderDefault<K, V, T, R> = FlatValBuilder<FlatLayout<<K as Reg
/// A trace implementation using a spine of ordered lists.
pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
/// A batcher for ordered lists.
pub type OrdKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, VecChunker<((K,()),T,R)>, VecMerger<((K, ()), T, R)>, T>;
pub type OrdKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, VecChunker<((K,()),T,R)>, VecMerger<((K, ()), T, R)>>;
/// A builder for ordered lists.
pub type RcOrdKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<Vector<((K,()),T,R)>, Vec<((K,()),T,R)>>>;

Expand All @@ -73,14 +73,14 @@ pub type RcOrdKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<Vector<((K,()),T,R)>
/// A trace implementation backed by columnar storage.
pub type ColKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R)>>>>;
/// A batcher for columnar storage
pub type ColKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, ColumnationChunker<((K,()),T,R)>, ColumnationMerger<((K,()),T,R)>, T>;
pub type ColKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, ColumnationChunker<((K,()),T,R)>, ColumnationMerger<((K,()),T,R)>>;
/// A builder for columnar storage
pub type ColKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<TStack<((K,()),T,R)>, TimelyStack<((K,()),T,R)>>>;

/// A trace implementation backed by flatcontainer storage.
pub type FlatKeySpine<L> = Spine<Rc<OrdKeyBatch<L>>>;
/// A batcher for flatcontainer storage.
pub type FlatKeyBatcher<R, C> = MergeBatcher<C, ContainerChunker<FlatStack<R>>, FlatcontainerMerger<R>, <R as MergerChunk>::TimeOwned>;
pub type FlatKeyBatcher<R, C> = MergeBatcher<C, ContainerChunker<FlatStack<R>>, FlatcontainerMerger<R>>;
/// A builder for flatcontainer storage.
pub type FlatKeyBuilder<L, R> = RcBuilder<OrdKeyBuilder<L, FlatStack<R>>>;

Expand All @@ -96,7 +96,7 @@ pub type FlatKeyBuilderDefault<K, T, R> = FlatKeyBuilder<FlatLayout<<K as Region
/// A trace implementation backed by columnar storage.
pub type PreferredSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Preferred<K,V,T,R>>>>;
/// A batcher for columnar storage.
pub type PreferredBatcher<K, V, T, R> = MergeBatcher<Vec<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColumnationChunker<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColumnationMerger<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>,T>;
pub type PreferredBatcher<K, V, T, R> = MergeBatcher<Vec<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColumnationChunker<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColumnationMerger<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>>;
/// A builder for columnar storage.
pub type PreferredBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Preferred<K,V,T,R>, TimelyStack<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>>>;

Expand Down
4 changes: 2 additions & 2 deletions src/trace/implementations/rhh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use self::val_batch::{RhhValBatch, RhhValBuilder};
/// A trace implementation using a spine of ordered lists.
pub type VecSpine<K, V, T, R> = Spine<Rc<RhhValBatch<Vector<((K,V),T,R)>>>>;
/// A batcher for ordered lists.
pub type VecBatcher<K,V,T,R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<((K, V), T, R)>, T>;
pub type VecBatcher<K,V,T,R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<((K, V), T, R)>>;
/// A builder for ordered lists.
pub type VecBuilder<K,V,T,R> = RcBuilder<RhhValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>;

Expand All @@ -35,7 +35,7 @@ pub type VecBuilder<K,V,T,R> = RcBuilder<RhhValBuilder<Vector<((K,V),T,R)>, Vec<
/// A trace implementation backed by columnar storage.
pub type ColSpine<K, V, T, R> = Spine<Rc<RhhValBatch<TStack<((K,V),T,R)>>>>;
/// A batcher for columnar storage.
pub type ColBatcher<K,V,T,R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColumnationMerger<((K,V),T,R)>, T>;
pub type ColBatcher<K,V,T,R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColumnationMerger<((K,V),T,R)>>;
/// A builder for columnar storage.
pub type ColBuilder<K,V,T,R> = RcBuilder<RhhValBuilder<TStack<((K,V),T,R)>, TimelyStack<((K,V),T,R)>>>;

Expand Down
Loading