Skip to content

Commit

Permalink
Make it work
Browse files Browse the repository at this point in the history
Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Jul 22, 2024
1 parent 254e823 commit 95cfd7e
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 53 deletions.
14 changes: 7 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions src/compute/src/logging/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use differential_dataflow::dynamic::pointstamp::PointStamp;
use differential_dataflow::logging::DifferentialEvent;
use differential_dataflow::Collection;
use mz_compute_client::logging::{LogVariant, LoggingConfig};
use mz_ore::flatcontainer::{MzRegionPreference, OwnedRegionOpinion};
use mz_ore::flatcontainer::{MzOffsetOptimized, MzRegionPreference, OwnedRegionOpinion};
use mz_repr::{Diff, Timestamp};
use mz_storage_operators::persist_source::Subtime;
use mz_storage_types::errors::DataflowError;
Expand Down Expand Up @@ -100,7 +100,7 @@ struct LoggingContext<'a, A: Allocate> {
now: Instant,
start_offset: Duration,
t_event_queue: EventQueue<Vec<(Duration, WorkerIdentifier, TimelyEvent)>>,
r_event_queue: EventQueue<FlatStack<ReachabilityEventRegion>>,
r_event_queue: EventQueue<FlatStack<ReachabilityEventRegion, MzOffsetOptimized>>,
d_event_queue: EventQueue<Vec<(Duration, WorkerIdentifier, DifferentialEvent)>>,
c_event_queue: EventQueue<Vec<(Duration, WorkerIdentifier, ComputeEvent)>>,
shared_state: Rc<RefCell<SharedLoggingState>>,
Expand Down Expand Up @@ -188,7 +188,9 @@ impl<A: Allocate + 'static> LoggingContext<'_, A> {

fn reachability_logger(&self) -> Logger<TrackerEvent> {
let event_queue = self.r_event_queue.clone();
type CB = PreallocatingCapacityContainerBuilder<FlatStack<ReachabilityEventRegion>>;
type CB = PreallocatingCapacityContainerBuilder<
FlatStack<ReachabilityEventRegion, MzOffsetOptimized>,
>;
let mut logger = BatchLogger::<CB, _>::new(event_queue.link, self.interval_ms);
Logger::new(
self.now,
Expand Down
7 changes: 4 additions & 3 deletions src/compute/src/logging/reachability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::rc::Rc;
use mz_compute_client::logging::LoggingConfig;
use mz_expr::{permutation_for_arrangement, MirScalarExpr};
use mz_ore::cast::CastFrom;
use mz_ore::flatcontainer::{MzRegionPreference, OwnedRegionOpinion};
use mz_ore::flatcontainer::{MzOffsetOptimized, MzRegionPreference, OwnedRegionOpinion};
use mz_ore::iter::IteratorExt;
use mz_repr::{Datum, Diff, RowArena, SharedRow, Timestamp};
use mz_timely_util::containers::PreallocatingCapacityContainerBuilder;
Expand All @@ -39,7 +39,7 @@ use crate::typedefs::{FlatKeyValSpineDefault, RowRowSpine};
pub(super) fn construct<A: Allocate>(
worker: &mut timely::worker::Worker<A>,
config: &LoggingConfig,
event_queue: EventQueue<FlatStack<ReachabilityEventRegion>>,
event_queue: EventQueue<FlatStack<ReachabilityEventRegion, MzOffsetOptimized>>,
) -> BTreeMap<LogVariant, LogCollection> {
let interval_ms = std::cmp::max(1, config.interval.as_millis());
let worker_index = worker.index();
Expand All @@ -57,7 +57,8 @@ pub(super) fn construct<A: Allocate>(
);
type UpdatesRegion = <((UpdatesKey, ()), Timestamp, Diff) as MzRegionPreference>::Region;

type CB = PreallocatingCapacityContainerBuilder<FlatStack<UpdatesRegion>>;
type CB =
PreallocatingCapacityContainerBuilder<FlatStack<UpdatesRegion, MzOffsetOptimized>>;
let (updates, token) = Some(event_queue.link).mz_replay::<_, CB, _>(
scope,
"reachability logs",
Expand Down
16 changes: 9 additions & 7 deletions src/compute/src/logging/timely.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@ use std::collections::BTreeMap;
use std::rc::Rc;
use std::time::Duration;

use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use mz_compute_client::logging::LoggingConfig;
use mz_ore::cast::CastFrom;
use mz_ore::flatcontainer::{MzRegionPreference, OwnedRegionOpinion};
use mz_ore::flatcontainer::{MzOffsetOptimized, MzRegionPreference, OwnedRegionOpinion};
use mz_ore::region::LgAllocVec;
use mz_repr::{Datum, Diff, Timestamp};
use mz_timely_util::containers::PreallocatingCapacityContainerBuilder;
use mz_timely_util::replay::MzReplay;
use serde::{Deserialize, Serialize};
use timely::communication::Allocate;
use timely::container::columnation::{Columnation, CopyRegion};
use timely::container::flatcontainer::FlatStack;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::channels::pushers::buffer::Session;
Expand Down Expand Up @@ -361,10 +361,12 @@ struct MessageCount {
records: i64,
}

type Pusher<D> =
Counter<Timestamp, Vec<(D, Timestamp, Diff)>, Tee<Timestamp, Vec<(D, Timestamp, Diff)>>>;
type FlatStackFor<D> =
FlatStack<<(D, Timestamp, Diff) as MzRegionPreference>::Region, MzOffsetOptimized>;

type Pusher<D> = Counter<Timestamp, FlatStackFor<D>, Tee<Timestamp, FlatStackFor<D>>>;
type OutputSession<'a, D> =
Session<'a, Timestamp, ConsolidatingContainerBuilder<Vec<(D, Timestamp, Diff)>>, Pusher<D>>;
Session<'a, Timestamp, PreallocatingCapacityContainerBuilder<FlatStackFor<D>>, Pusher<D>>;

/// Bundled output buffers used by the demux operator.
//
Expand All @@ -374,7 +376,7 @@ type OutputSession<'a, D> =
struct DemuxOutput<'a> {
operates: OutputSession<'a, (usize, String)>,
channels: OutputSession<'a, (ChannelDatum, ())>,
addresses: OutputSession<'a, (usize, Vec<usize>)>,
addresses: OutputSession<'a, (usize, OwnedRegionOpinion<Vec<usize>>)>,
parks: OutputSession<'a, (ParkDatum, ())>,
batches_sent: OutputSession<'a, (MessageDatum, ())>,
batches_received: OutputSession<'a, (MessageDatum, ())>,
Expand Down
12 changes: 6 additions & 6 deletions src/ore/src/flatcontainer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -911,13 +911,13 @@ mod lgallocvec {
assert_eq!(region.index(index), &42);

let mut region = LgAllocVec::<u32>::default();
region.push(42);
region.push(43);
region.push(44);
let i0 = <_ as Push<_>>::push(&mut region, 42);
let i1 = <_ as Push<_>>::push(&mut region, 43);
let i2 = <_ as Push<_>>::push(&mut region, 44);
region.reserve_items([1, 2, 3].iter());
assert_eq!(region.index(0), &42);
assert_eq!(region.index(1), &43);
assert_eq!(region.index(2), &44);
assert_eq!(region.index(i0), &42);
assert_eq!(region.index(i1), &43);
assert_eq!(region.index(i2), &44);
}
}
}
Expand Down
54 changes: 27 additions & 27 deletions src/timely-util/src/containers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
use std::collections::VecDeque;

use timely::container::flatcontainer::impls::index::IndexContainer;
use timely::container::flatcontainer::{FlatStack, Push, Region};
use timely::container::{ContainerBuilder, PushInto, SizableContainer};
use timely::container::{CapacityContainer, ContainerBuilder, PushInto};
use timely::Container;

pub mod array;
Expand All @@ -31,50 +32,50 @@ pub mod stack;
#[derive(Default, Debug)]
pub struct PreallocatingCapacityContainerBuilder<C> {
/// Container that we're writing to.
current: C,
current: Option<C>,
/// Empty allocation.
empty: Option<C>,
/// Completed containers pending to be sent.
pending: VecDeque<C>,
}

impl<T, R> PushInto<T> for PreallocatingCapacityContainerBuilder<FlatStack<R>>
impl<T, R, S> PushInto<T> for PreallocatingCapacityContainerBuilder<FlatStack<R, S>>
where
R: Region + Push<T> + Clone + 'static,
S: IndexContainer<R::Index> + Clone + 'static,
FlatStack<R, S>: CapacityContainer,
{
#[inline]
fn push_into(&mut self, item: T) {
if self.current.capacity() == 0 {
self.current = self.empty.take().unwrap_or_default();
// Protect against non-emptied containers.
self.current.clear();
}
// Ensure capacity
let preferred_capacity = FlatStack::<R>::preferred_capacity();
if self.current.capacity() < preferred_capacity {
self.current
.reserve(preferred_capacity - self.current.len());
if self.current.is_none() {
let mut empty = self.empty.take().unwrap_or_default();
empty.clear();
self.current = Some(empty);
}

let current = self.current.as_mut().unwrap();

// Ensure capacity
current.ensure_preferred_capacity();
// Push item
self.current.push(item);
current.push(item);

// Maybe flush
if self.current.len() == self.current.capacity() {
let pending = std::mem::take(&mut self.current);
self.current = FlatStack::merge_capacity(std::iter::once(&pending));
self.current
.reserve(preferred_capacity.saturating_sub(self.current.len()));
if current.len() >= FlatStack::<R, S>::preferred_capacity() {
let pending = std::mem::take(current);
*current = FlatStack::merge_capacity(std::iter::once(&pending));
self.pending.push_back(pending);
}
}
}

impl<R> ContainerBuilder for PreallocatingCapacityContainerBuilder<FlatStack<R>>
impl<R, S> ContainerBuilder for PreallocatingCapacityContainerBuilder<FlatStack<R, S>>
where
R: Region + Clone + 'static,
S: IndexContainer<R::Index> + Clone + 'static,
FlatStack<R, S>: CapacityContainer,
{
type Container = FlatStack<R>;
type Container = FlatStack<R, S>;

#[inline]
fn extract(&mut self) -> Option<&mut Self::Container> {
Expand All @@ -84,12 +85,11 @@ where

#[inline]
fn finish(&mut self) -> Option<&mut Self::Container> {
if !self.current.is_empty() {
let pending = std::mem::take(&mut self.current);
self.current = FlatStack::merge_capacity(std::iter::once(&pending));
let preferred_capacity = FlatStack::<R>::preferred_capacity();
self.current
.reserve(preferred_capacity.saturating_sub(self.current.len()));
let current = self.current.as_mut();
if current.as_ref().map_or(false, |c| !c.is_empty()) {
let current = current.unwrap();
let pending = std::mem::take(current);
*current = FlatStack::merge_capacity(std::iter::once(&pending));
self.pending.push_back(pending);
}
self.empty = self.pending.pop_front();
Expand Down

0 comments on commit 95cfd7e

Please sign in to comment.