Merge batcher for flat container without key and value (#547)
Signed-off-by: Moritz Hoffmann <[email protected]>
antiguru authored Dec 6, 2024
1 parent dafe288 commit b846f73
Showing 1 changed file with 21 additions and 26 deletions.
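
At a glance, the commit collapses the merger chunk's separate key and value into a single data portion. The sketch below, lifted directly from the diff, places the old and new `MergerChunk::into_parts` signatures side by side:

// Before: key and value exposed as separate associated types.
fn into_parts<'a>(item: Self::ReadItem<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time<'a>, Self::Diff<'a>);

// After: a single `Data` portion, so the chunk no longer assumes a keyed layout.
fn into_parts<'a>(item: Self::ReadItem<'a>) -> (Self::Data<'a>, Self::Time<'a>, Self::Diff<'a>);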
47 changes: 21 additions & 26 deletions src/trace/implementations/merge_batcher_flat.rs
@@ -5,7 +5,7 @@ use std::marker::PhantomData;
 use timely::progress::frontier::{Antichain, AntichainRef};
 use timely::{Data, PartialOrder};
 use timely::container::flatcontainer::{Push, FlatStack, Region, ReserveItems};
-use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion};
+use timely::container::flatcontainer::impls::tuple::TupleABCRegion;

 use crate::difference::{IsZero, Semigroup};
 use crate::trace::implementations::merge_batcher::Merger;
@@ -56,10 +56,8 @@ impl<MC: Region> FlatcontainerMerger<MC> {

 /// Behavior to dissect items of chunks in the merge batcher
 pub trait MergerChunk: Region {
-    /// The key of the update
-    type Key<'a>: Ord where Self: 'a;
-    /// The value of the update
-    type Val<'a>: Ord where Self: 'a;
+    /// The data portion of the update
+    type Data<'a>: Ord where Self: 'a;
     /// The time of the update
     type Time<'a>: Ord where Self: 'a;
     /// The owned time type.
@@ -70,28 +68,25 @@ pub trait MergerChunk: Region {
     type DiffOwned;

     /// Split a read item into its constituents. Must be cheap.
-    fn into_parts<'a>(item: Self::ReadItem<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time<'a>, Self::Diff<'a>);
+    fn into_parts<'a>(item: Self::ReadItem<'a>) -> (Self::Data<'a>, Self::Time<'a>, Self::Diff<'a>);
 }

-impl<K,V,T,R> MergerChunk for TupleABCRegion<TupleABRegion<K, V>, T, R>
+impl<D,T,R> MergerChunk for TupleABCRegion<D, T, R>
 where
-    K: Region,
-    for<'a> K::ReadItem<'a>: Ord,
-    V: Region,
-    for<'a> V::ReadItem<'a>: Ord,
+    D: Region,
+    for<'a> D::ReadItem<'a>: Ord,
     T: Region,
     for<'a> T::ReadItem<'a>: Ord,
     R: Region,
 {
-    type Key<'a> = K::ReadItem<'a> where Self: 'a;
-    type Val<'a> = V::ReadItem<'a> where Self: 'a;
+    type Data<'a> = D::ReadItem<'a> where Self: 'a;
     type Time<'a> = T::ReadItem<'a> where Self: 'a;
     type TimeOwned = T::Owned;
     type Diff<'a> = R::ReadItem<'a> where Self: 'a;
     type DiffOwned = R::Owned;

-    fn into_parts<'a>(((key, val), time, diff): Self::ReadItem<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time<'a>, Self::Diff<'a>) {
-        (key, val, time, diff)
+    fn into_parts<'a>((data, time, diff): Self::ReadItem<'a>) -> (Self::Data<'a>, Self::Time<'a>, Self::Diff<'a>) {
+        (data, time, diff)
     }
 }

@@ -100,8 +95,8 @@ where
     for<'a> MC: MergerChunk + Clone + 'static
         + ReserveItems<<MC as Region>::ReadItem<'a>>
         + Push<<MC as Region>::ReadItem<'a>>
-        + Push<((MC::Key<'a>, MC::Val<'a>), MC::Time<'a>, &'a MC::DiffOwned)>
-        + Push<((MC::Key<'a>, MC::Val<'a>), MC::Time<'a>, MC::Diff<'a>)>,
+        + Push<(MC::Data<'a>, MC::Time<'a>, &'a MC::DiffOwned)>
+        + Push<(MC::Data<'a>, MC::Time<'a>, MC::Diff<'a>)>,
     for<'a> MC::Time<'a>: PartialOrder<MC::TimeOwned> + Copy + IntoOwned<'a, Owned=MC::TimeOwned>,
     for<'a> MC::Diff<'a>: IntoOwned<'a, Owned = MC::DiffOwned>,
     for<'a> MC::TimeOwned: Ord + PartialOrder + PartialOrder<MC::Time<'a>> + Data,
@@ -125,9 +120,9 @@ where
         while !head1.is_empty() && !head2.is_empty() {
             while (result.capacity() - result.len()) > 0 && !head1.is_empty() && !head2.is_empty() {
                 let cmp = {
-                    let (key1, val1, time1, _diff) = MC::into_parts(head1.peek());
-                    let (key2, val2, time2, _diff) = MC::into_parts(head2.peek());
-                    ((key1, val1), time1).cmp(&((key2, val2), time2))
+                    let (data1, time1, _diff) = MC::into_parts(head1.peek());
+                    let (data2, time2, _diff) = MC::into_parts(head2.peek());
+                    (data1, time1).cmp(&(data2, time2))
                 };
                 // TODO: The following less/greater branches could plausibly be a good moment for
                 // `copy_range`, on account of runs of records that might benefit more from a
@@ -140,12 +135,12 @@ where
                         result.copy(head2.pop());
                     }
                     Ordering::Equal => {
-                        let (key, val, time1, diff1) = MC::into_parts(head1.pop());
-                        let (_key, _val, _time2, diff2) = MC::into_parts(head2.pop());
+                        let (data, time1, diff1) = MC::into_parts(head1.pop());
+                        let (_data, _time2, diff2) = MC::into_parts(head2.pop());
                         diff1.clone_onto(&mut diff);
                         diff.plus_equals(&diff2);
                         if !diff.is_zero() {
-                            result.copy(((key, val), time1, &diff));
+                            result.copy((data, time1, &diff));
                         }
                     }
                 }
@@ -212,20 +207,20 @@ where
         let mut ready = self.empty(stash);

         for buffer in merged {
-            for (key, val, time, diff) in buffer.iter().map(MC::into_parts) {
+            for (data, time, diff) in buffer.iter().map(MC::into_parts) {
                 if upper.less_equal(&time) {
                     frontier.insert_with(&time, |time| (*time).into_owned());
                     if keep.len() == keep.capacity() && !keep.is_empty() {
                         kept.push(keep);
                         keep = self.empty(stash);
                     }
-                    keep.copy(((key, val), time, diff));
+                    keep.copy((data, time, diff));
                 } else {
                     if ready.len() == ready.capacity() && !ready.is_empty() {
                         readied.push(ready);
                         ready = self.empty(stash);
                     }
-                    ready.copy(((key, val), time, diff));
+                    ready.copy((data, time, diff));
                 }
             }
             // Recycling buffer.
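One reading of the generalized impl (an observation about the diff, not something stated in the commit): since the blanket implementation now accepts any `D: Region` whose read item is `Ord`, the earlier keyed layout stays expressible by nesting a pair region as the data portion. A hypothetical alias, purely for illustration:

// Hypothetical, not part of this commit: recover the old (key, value) shape
// by using TupleABRegion<K, V> as the single data region of the chunk.
type KeyedChunk<K, V, T, R> = TupleABCRegion<TupleABRegion<K, V>, T, R>;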
