Skip to content

Commit

Permalink
CapacityContainer
Browse files Browse the repository at this point in the history
Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Jul 9, 2024
1 parent 8d476ef commit bb5836a
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 36 deletions.
37 changes: 2 additions & 35 deletions src/consolidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ where
// TODO: Can we replace `multiple` by a bool?
#[cold]
fn consolidate_and_flush_through(&mut self, multiple: usize) {
let preferred_capacity = <Vec<(D,T,R)>>::preferred_capacity();
let preferred_capacity = <Vec<(D,T,R)> as SizableContainer>::preferred_capacity();
consolidate_updates(&mut self.current);
let mut drain = self.current.drain(..(self.current.len()/multiple)*multiple).peekable();
while drain.peek().is_some() {
Expand All @@ -178,7 +178,7 @@ where
/// Precondition: `current` is not allocated or has space for at least one element.
#[inline]
fn push_into(&mut self, item: P) {
let preferred_capacity = <Vec<(D,T,R)>>::preferred_capacity();
let preferred_capacity = <Vec<(D,T,R)> as SizableContainer>::preferred_capacity();
if self.current.capacity() < preferred_capacity * 2 {
self.current.reserve(preferred_capacity * 2 - self.current.capacity());
}
Expand Down Expand Up @@ -253,12 +253,6 @@ pub trait ConsolidateLayout: Container {

/// Compare two items by key to sort containers.
fn cmp(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering;

/// The preferred capacity
fn preferred_capacity() -> usize;

/// Ensure that the container has sufficient capacity to absorb `preferred_capacity` elements.
fn ensure_preferred_capacity(&mut self);
}

impl<D, T, R> ConsolidateLayout for Vec<(D, T, R)>
Expand All @@ -282,17 +276,6 @@ where
fn push_with_diff(&mut self, (data, time): Self::Key<'_>, diff: Self::DiffOwned) {
self.push((data, time, diff));
}

fn preferred_capacity() -> usize {
<Self as SizableContainer>::preferred_capacity()
}

#[inline]
fn ensure_preferred_capacity(&mut self) {
if self.capacity() < <Self as ConsolidateLayout>::preferred_capacity() {
self.reserve(<Self as ConsolidateLayout>::preferred_capacity() - self.capacity());
}
}
}

mod flatcontainer {
Expand Down Expand Up @@ -334,22 +317,6 @@ mod flatcontainer {
fn push_with_diff(&mut self, (key, value, time): Self::Key<'_>, diff: Self::DiffOwned) {
self.copy(((key, value), time, diff));
}

fn preferred_capacity() -> usize {
// We don't have a good way to present any pre-defined capacity here, since it's a
// concept foreign to flat containers. Each region might have a capacity, but overall
// the concept of capacity does not exist. For this reason, we just hardcode a number,
// which seems to work reasonably well.
//
// We should revisit this if/once we have an abstraction that can express a capacity
// for `FlatStack`, but we arent' there yet.
1024
}

fn ensure_preferred_capacity(&mut self) {
// Nop, same reasoning as for `preferred_capacity`. We don't know how to ensure capacity
// for a certain number of elements.
}
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/trace/implementations/chunker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use std::collections::VecDeque;
use timely::communication::message::RefOrMut;
use timely::Container;
use timely::container::columnation::{Columnation, TimelyStack};
use timely::container::{ContainerBuilder, PushInto};
use timely::container::{CapacityContainer, ContainerBuilder, PushInto};

use crate::consolidation::{consolidate_updates, consolidate_container, ConsolidateLayout};
use crate::difference::Semigroup;

Expand Down Expand Up @@ -292,6 +293,7 @@ impl<'a, Input, Output> PushInto<RefOrMut<'a, Input>> for ContainerChunker<Outpu
where
Input: Container,
Output: ConsolidateLayout
+ CapacityContainer
+ PushInto<Input::Item<'a>>
+ PushInto<Input::ItemRef<'a>>,
{
Expand Down

0 comments on commit bb5836a

Please sign in to comment.