Skip to content

Commit

Permalink
Consolidation consolidation (#552)
Browse files Browse the repository at this point in the history
* Make consolidate_into a trait method

* Streaming consolidate_into
  • Loading branch information
frankmcsherry authored Dec 8, 2024
1 parent 63212f9 commit 84cfc36
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 49 deletions.
90 changes: 44 additions & 46 deletions src/consolidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,41 @@ pub trait ConsolidateLayout: Container {

/// Compare two items by key to sort containers.
fn cmp(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering;

/// Consolidate the supplied container.
fn consolidate_into(&mut self, target: &mut Self) {
// Sort input data
let mut permutation = Vec::with_capacity(self.len());
permutation.extend(self.drain());
permutation.sort_by(|a, b| Self::cmp(a, b));

// Iterate over the data, accumulating diffs for like keys.
let mut iter = permutation.drain(..);
if let Some(item) = iter.next() {

let (k, d) = Self::into_parts(item);
let mut prev_key = k;
let mut prev_diff = d.into_owned();

for item in iter {
let (next_key, next_diff) = Self::into_parts(item);
if next_key == prev_key {
prev_diff.plus_equals(&next_diff);
}
else {
if !prev_diff.is_zero() {
target.push_with_diff(prev_key, prev_diff);
}
prev_key = next_key;
prev_diff = next_diff.into_owned();
}
}

if !prev_diff.is_zero() {
target.push_with_diff(prev_key, prev_diff);
}
}
}
}

impl<D, T, R> ConsolidateLayout for Vec<(D, T, R)>
Expand All @@ -278,6 +313,12 @@ where
fn push_with_diff(&mut self, (data, time): Self::Key<'_>, diff: Self::DiffOwned) {
self.push((data, time, diff));
}

/// Consolidate the supplied container.
fn consolidate_into(&mut self, target: &mut Self) {
consolidate_updates(self);
std::mem::swap(self, target);
}
}

impl<K, V, T, R> ConsolidateLayout for FlatStack<TupleABCRegion<TupleABRegion<K, V>, T, R>>
Expand Down Expand Up @@ -308,49 +349,6 @@ where
}
}

/// Consolidate the supplied container.
pub fn consolidate_container<C: ConsolidateLayout>(container: &mut C, target: &mut C) {
// Sort input data
let mut permutation = Vec::with_capacity(container.len());
permutation.extend(container.drain());
permutation.sort_by(|a, b| C::cmp(a, b));

// Consolidate sorted data.
let mut previous: Option<(C::Key<'_>, C::DiffOwned)> = None;
// TODO: We should ensure that `target` has sufficient capacity, but `Container` doesn't
// offer a suitable API.
for item in permutation.drain(..) {
let (key, diff) = C::into_parts(item);
match &mut previous {
// Initial iteration, remember key and diff.
// TODO: Opportunity for GatCow for diff.
None => previous = Some((key, diff.into_owned())),
Some((prevkey, d)) => {
// Second and following iteration, compare and accumulate or emit.
if key == *prevkey {
// Keys match, keep accumulating.
d.plus_equals(&diff);
} else {
// Keys don't match, write down result if non-zero.
if !d.is_zero() {
// Unwrap because we checked for `Some` above.
let (prevkey, diff) = previous.take().unwrap();
target.push_with_diff(prevkey, diff);
}
// Remember current key and diff as `previous`
previous = Some((key, diff.into_owned()));
}
}
}
}
// Write any residual data, if non-zero.
if let Some((previtem, d)) = previous {
if !d.is_zero() {
target.push_with_diff(previtem, d);
}
}
}



#[cfg(test)]
Expand Down Expand Up @@ -445,11 +443,11 @@ mod tests {
}

#[test]
fn test_consolidate_container() {
fn test_consolidate_into() {
let mut data = vec![(1, 1, 1), (2, 1, 1), (1, 1, -1)];
let mut target = Vec::default();
data.sort();
consolidate_container(&mut data, &mut target);
data.consolidate_into(&mut target);
assert_eq!(target, [(2, 1, 1)]);
}

Expand Down Expand Up @@ -477,7 +475,7 @@ mod tests {
data2.extend((0..LEN).map(|i| (i/4, 1, -2isize + ((i % 4) as isize))));
data.sort_by(|x,y| x.0.cmp(&y.0));
let start = std::time::Instant::now();
consolidate_container(&mut data, &mut target);
data.consolidate_into(&mut target);
duration += start.elapsed();

consolidate_updates(&mut data2);
Expand Down
6 changes: 3 additions & 3 deletions src/trace/implementations/chunker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::collections::VecDeque;
use timely::Container;
use timely::container::columnation::{Columnation, TimelyStack};
use timely::container::{ContainerBuilder, PushInto, SizableContainer};
use crate::consolidation::{consolidate_updates, consolidate_container, ConsolidateLayout};
use crate::consolidation::{consolidate_updates, ConsolidateLayout};
use crate::difference::Semigroup;

/// Chunk a stream of vectors into chains of vectors.
Expand Down Expand Up @@ -269,7 +269,7 @@ where
self.pending.push(item);
if self.pending.at_capacity() {
let starting_len = self.pending.len();
consolidate_container(&mut self.pending, &mut self.empty);
self.pending.consolidate_into(&mut self.empty);
std::mem::swap(&mut self.pending, &mut self.empty);
self.empty.clear();
if self.pending.len() > starting_len / 2 {
Expand Down Expand Up @@ -300,7 +300,7 @@ where

fn finish(&mut self) -> Option<&mut Self::Container> {
if !self.pending.is_empty() {
consolidate_container(&mut self.pending, &mut self.empty);
self.pending.consolidate_into(&mut self.empty);
std::mem::swap(&mut self.pending, &mut self.empty);
self.empty.clear();
if !self.pending.is_empty() {
Expand Down

0 comments on commit 84cfc36

Please sign in to comment.