Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Jun 4, 2024
1 parent 59f3197 commit 8f4cf39
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 91 deletions.
165 changes: 92 additions & 73 deletions src/trace/implementations/chunker.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Organize streams of data into sorted chunks.
use std::collections::VecDeque;
use std::marker::PhantomData;
use timely::communication::message::RefOrMut;
use timely::Container;
Expand All @@ -10,41 +11,53 @@ use crate::difference::Semigroup;

/// Behavior to transform streams of data into sorted chunks of regular size.
pub trait Chunker {
    /// Input type.
    type Input;
    /// Output type.
    type Output;

    /// Accept a container and absorb its contents. The caller must
    /// call [`extract`] or [`finish`] soon after pushing a container.
    fn push_container(&mut self, container: RefOrMut<Self::Input>);

    /// Extract ready data, leaving unfinished data behind.
    ///
    /// Should be called repeatedly until it returns `None`, which indicates that there is no
    /// more ready data.
    fn extract(&mut self) -> Option<Self::Output>;

    /// Unconditionally extract all data, leaving no unfinished data behind.
    ///
    /// Should be called repeatedly until it returns `None`, which indicates that there is no
    /// more data.
    fn finish(&mut self) -> Option<Self::Output>;
}

/// Chunk a stream of vectors into chains of vectors.
pub struct VecChunker<T> {
    // Updates accumulated so far that have not yet been formed into a full chunk.
    pending: Vec<T>,
    // Completed chunks, handed out in FIFO order by `extract`/`finish`.
    ready: VecDeque<Vec<T>>,
}

impl<T> Default for VecChunker<T> {
    /// An empty chunker with no pending updates and no ready chunks.
    fn default() -> Self {
        Self {
            pending: Vec::default(),
            ready: VecDeque::default(),
        }
    }
}

impl<T> VecChunker<T> {
impl<K, V, T, R> VecChunker<((K, V), T, R)>
where
K: Ord,
V: Ord,
T: Ord,
R: Semigroup,
{
const BUFFER_SIZE_BYTES: usize = 8 << 10;
fn chunk_capacity(&self) -> usize {
let size = ::std::mem::size_of::<T>();
fn chunk_capacity() -> usize {
let size = ::std::mem::size_of::<((K, V), T, R)>();
if size == 0 {
Self::BUFFER_SIZE_BYTES
} else if size <= Self::BUFFER_SIZE_BYTES {
Expand All @@ -54,8 +67,23 @@ impl<T> VecChunker<T> {
}
}

fn pending_capacity(&self) -> usize {
self.chunk_capacity() * 2
/// Form chunks out of pending data, if needed. This function is meant to be applied to
/// potentially full buffers, and ensures that if the buffer was full when called it is at most
/// half full when the function returns.
///
/// `form_chunk` does the following:
/// * If pending is full, consolidate.
/// * If after consolidation it's more than half full, peel off chunks,
///   leaving behind any partial chunk in pending.
fn form_chunk(&mut self) {
    // Consolidation can only shrink `pending`, never grow it.
    consolidate_updates(&mut self.pending);
    if self.pending.len() < Self::chunk_capacity() {
        // Less than one full chunk remains; keep it pending.
        return;
    }
    // Peel off full chunks while strictly more than one chunk's worth remains,
    // so that a trailing partial chunk always stays behind in `pending`.
    while self.pending.len() > Self::chunk_capacity() {
        let mut chunk = Vec::with_capacity(Self::chunk_capacity());
        // Drain exactly as many elements as the allocation can hold.
        let take = chunk.capacity();
        chunk.extend(self.pending.drain(..take));
        self.ready.push_back(chunk);
    }
}
}

Expand All @@ -72,28 +100,12 @@ where
fn push_container(&mut self, container: RefOrMut<Self::Input>) {
// Ensure `self.pending` has the desired capacity. We should never have a larger capacity
// because we don't write more than capacity elements into the buffer.
if self.pending.capacity() < self.pending_capacity() {
self.pending.reserve(self.pending_capacity() - self.pending.len());
// Important: Consolidation requires `pending` to have twice the chunk capacity to
// amortize its cost. Otherwise, it risks to do quadratic work.
if self.pending.capacity() < Self::chunk_capacity() * 2 {
self.pending.reserve(Self::chunk_capacity() * 2 - self.pending.len());
}

// Form chunks from what's in pending.
// This closure does the following:
// * If pending is full, consolidate.
// * If after consolidation it's more than half full, peel off chunks,
// leaving behind any partial chunk in pending.
let form_chunk = |this: &mut Self| {
if this.pending.len() == this.pending.capacity() {
consolidate_updates(&mut this.pending);
if this.pending.len() >= this.chunk_capacity() {
while this.pending.len() > this.chunk_capacity() {
let mut chunk = Vec::with_capacity(this.chunk_capacity());
chunk.extend(this.pending.drain(..chunk.capacity()));
this.ready.push(chunk);
}
}
}
};

// `container` is either a shared reference or an owned allocations.
match container {
RefOrMut::Ref(vec) => {
Expand All @@ -102,58 +114,65 @@ where
let (head, tail) = slice.split_at(std::cmp::min(self.pending.capacity() - self.pending.len(), slice.len()));
slice = tail;
self.pending.extend_from_slice(head);
form_chunk(self);
if self.pending.len() == self.pending.capacity() {
self.form_chunk();
}
}
}
RefOrMut::Mut(vec) => {
let mut drain = vec.drain(..).peekable();
while drain.peek().is_some() {
self.pending.extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
form_chunk(self);
if self.pending.len() == self.pending.capacity() {
self.form_chunk();
}
}
}
}
}

fn extract(&mut self) -> Option<Self::Output> {
    // Hand out completed chunks in the order they were formed.
    self.ready.pop_front()
}

fn finish(&mut self) -> Option<Self::Output> {
    // Flush all pending data into (possibly partial) chunks, then drain `ready`.
    if !self.pending.is_empty() {
        consolidate_updates(&mut self.pending);
        while !self.pending.is_empty() {
            let mut chunk = Vec::with_capacity(Self::chunk_capacity());
            // The last chunk may be only partially filled.
            chunk.extend(self.pending.drain(..std::cmp::min(self.pending.len(), chunk.capacity())));
            self.ready.push_back(chunk);
        }
    }
    self.ready.pop_front()
}
}

/// Chunk a stream of vectors into chains of vectors.
pub struct ColumnationChunker<T: Columnation> {
pending: Vec<T>,
ready: Vec<TimelyStack<T>>,
ready: VecDeque<TimelyStack<T>>,
}

impl<T: Columnation> Default for ColumnationChunker<T> {
    /// An empty chunker with no pending updates and no ready chunks.
    fn default() -> Self {
        Self {
            pending: Vec::default(),
            ready: VecDeque::default(),
        }
    }
}

impl<T> ColumnationChunker<T>
impl<K,V,T,R> ColumnationChunker<((K, V), T, R)>
where
T: Columnation,
K: Columnation + Ord,
V: Columnation + Ord,
T: Columnation + Ord,
R: Columnation + Semigroup,
{
const BUFFER_SIZE_BYTES: usize = 64 << 10;
fn chunk_capacity(&self) -> usize {
let size = ::std::mem::size_of::<T>();
fn chunk_capacity() -> usize {
let size = ::std::mem::size_of::<((K, V), T, R)>();
if size == 0 {
Self::BUFFER_SIZE_BYTES
} else if size <= Self::BUFFER_SIZE_BYTES {
Expand All @@ -163,9 +182,25 @@ where
}
}

/// Buffer size for pending updates, currently 2 * [`Self::chunk_capacity`].
fn pending_capacity(&self) -> usize {
self.chunk_capacity() * 2
/// Form chunks out of pending data, if needed. This function is meant to be applied to
/// potentially full buffers, and ensures that if the buffer was full when called it is at most
/// half full when the function returns.
///
/// `form_chunk` does the following:
/// * If pending is full, consolidate.
/// * If after consolidation it's more than half full, peel off chunks,
///   leaving behind any partial chunk in pending.
fn form_chunk(&mut self) {
    // Consolidation can only shrink `pending`, never grow it.
    consolidate_updates(&mut self.pending);
    if self.pending.len() < Self::chunk_capacity() {
        // Less than one full chunk remains; keep it pending.
        return;
    }
    // Peel off full chunks while strictly more than one chunk's worth remains,
    // so that a trailing partial chunk always stays behind in `pending`.
    while self.pending.len() > Self::chunk_capacity() {
        let mut chunk = TimelyStack::with_capacity(Self::chunk_capacity());
        // Copy exactly as many elements as the region allocation can hold.
        let take = chunk.capacity();
        for item in self.pending.drain(..take) {
            chunk.copy(&item);
        }
        self.ready.push_back(chunk);
    }
}
}

Expand All @@ -182,30 +217,10 @@ where
fn push_container(&mut self, container: RefOrMut<Self::Input>) {
// Ensure `self.pending` has the desired capacity. We should never have a larger capacity
// because we don't write more than capacity elements into the buffer.
if self.pending.capacity() < self.pending_capacity() {
self.pending.reserve(self.pending_capacity() - self.pending.len());
if self.pending.capacity() < Self::chunk_capacity() * 2 {
self.pending.reserve(Self::chunk_capacity() * 2 - self.pending.len());
}

// Form chunks from what's in pending.
// This closure does the following:
// * If pending is full, consolidate.
// * If after consolidation it's more than half full, peel off chunks,
// leaving behind any partial chunk in pending.
let form_chunk = |this: &mut Self| {
if this.pending.len() == this.pending.capacity() {
consolidate_updates(&mut this.pending);
if this.pending.len() >= this.chunk_capacity() {
while this.pending.len() > this.chunk_capacity() {
let mut chunk = TimelyStack::with_capacity(this.chunk_capacity());
for item in this.pending.drain(..chunk.capacity()) {
chunk.copy(&item);
}
this.ready.push(chunk);
}
}
}
};

// `container` is either a shared reference or an owned allocations.
match container {
RefOrMut::Ref(vec) => {
Expand All @@ -214,33 +229,37 @@ where
let (head, tail) = slice.split_at(std::cmp::min(self.pending.capacity() - self.pending.len(), slice.len()));
slice = tail;
self.pending.extend_from_slice(head);
form_chunk(self);
if self.pending.len() == self.pending.capacity() {
self.form_chunk();
}
}
}
RefOrMut::Mut(vec) => {
let mut drain = vec.drain(..).peekable();
while drain.peek().is_some() {
self.pending.extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
form_chunk(self);
if self.pending.len() == self.pending.capacity() {
self.form_chunk();
}
}
}
}
}

fn extract(&mut self) -> Option<Self::Output> {
    // Hand out completed chunks in the order they were formed.
    self.ready.pop_front()
}

fn finish(&mut self) -> Option<Self::Output> {
    // Flush all pending data into (possibly partial) chunks, then drain `ready`.
    consolidate_updates(&mut self.pending);
    while !self.pending.is_empty() {
        let mut chunk = TimelyStack::with_capacity(Self::chunk_capacity());
        // The last chunk may be only partially filled.
        for item in self.pending.drain(..std::cmp::min(self.pending.len(), chunk.capacity())) {
            chunk.copy(&item);
        }
        self.ready.push_back(chunk);
    }
    self.ready.pop_front()
}
}

Expand Down
29 changes: 11 additions & 18 deletions src/trace/implementations/merge_batcher_flat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,12 @@ where
for<'a> T::ReadItem<'a>: Ord,
R: Region,
{
type Key<'a> = K::ReadItem<'a> where TupleABCRegion<TupleABRegion<K, V>, T, R>: 'a;
type Val<'a> = V::ReadItem<'a> where TupleABCRegion<TupleABRegion<K, V>, T, R>: 'a;
type Time<'a> = T::ReadItem<'a> where TupleABCRegion<TupleABRegion<K, V>, T, R>: 'a;
type Diff<'a> = R::ReadItem<'a> where TupleABCRegion<TupleABRegion<K, V>, T, R>: 'a;
type Key<'a> = K::ReadItem<'a> where Self: 'a;
type Val<'a> = V::ReadItem<'a> where Self: 'a;
type Time<'a> = T::ReadItem<'a> where Self: 'a;
type Diff<'a> = R::ReadItem<'a> where Self: 'a;

fn into_parts<'a>(((key, val), time, diff): <TupleABCRegion<TupleABRegion<K, V>, T, R> as Region>::ReadItem<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time<'a>, Self::Diff<'a>) {
fn into_parts<'a>(((key, val), time, diff): Self::ReadItem<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time<'a>, Self::Diff<'a>) {
(key, val, time, diff)
}
}
Expand Down Expand Up @@ -119,10 +119,8 @@ where
while !head1.is_empty() && !head2.is_empty() {
while (result.capacity() - result.len()) > 0 && !head1.is_empty() && !head2.is_empty() {
let cmp = {
let x: FR::ReadItem<'_> = head1.peek();
let (key1, val1, time1, _diff) = FR::into_parts(x);
let y: FR::ReadItem<'_> = head2.peek();
let (key2, val2, time2, _diff) = FR::into_parts(y);
let (key1, val1, time1, _diff) = FR::into_parts(head1.peek());
let (key2, val2, time2, _diff) = FR::into_parts(head2.peek());
((key1, val1), time1).cmp(&((key2, val2), time2))
};
match cmp {
Expand All @@ -133,10 +131,8 @@ where
result.copy(head2.pop());
}
Ordering::Equal => {
let element1 = head1.pop();
let (key, val, time1, diff1) = FR::into_parts(element1);
let element2 = head2.pop();
let (_key, _val, _time2, diff2) = FR::into_parts(element2);
let (key, val, time1, diff1) = FR::into_parts(head1.pop());
let (_key, _val, _time2, diff2) = FR::into_parts(head2.pop());
diff1.clone_onto(&mut diff);
diff.plus_equals(&diff2);
if !diff.is_zero() {
Expand Down Expand Up @@ -201,9 +197,7 @@ where
let mut ready = self.empty(stash);

for buffer in merged {
for element in buffer.iter() {
let (key, val, time, diff) = FR::into_parts(element);
// let time_owned = time.flat_to_owned();
for (key, val, time, diff) in buffer.iter().map(FR::into_parts) {
if upper.less_equal(&time) {
frontier.insert_with(&time, |time| (*time).into_owned());
if keep.len() == keep.capacity() && !keep.is_empty() {
Expand Down Expand Up @@ -243,8 +237,7 @@ where
{
let mut prev_keyval = None;
for buffer in chain.iter() {
for element in buffer.iter() {
let (key, val, time, _) = FR::into_parts(element);
for (key, val, time, _diff) in buffer.iter().map(FR::into_parts) {
if !upper.less_equal(&time) {
if let Some((p_key, p_val)) = prev_keyval {
if p_key != key {
Expand Down

0 comments on commit 8f4cf39

Please sign in to comment.