diff --git a/Cargo.lock b/Cargo.lock index 8c2971280d9c9..bff655522ef48 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1981,7 +1981,7 @@ dependencies = [ [[package]] name = "differential-dataflow" version = "0.12.0" -source = "git+https://github.com/antiguru/differential-dataflow.git?branch=region_update#0697f7a519247a2f2ca08bad28a59617f3fcce72" +source = "git+https://github.com/antiguru/differential-dataflow.git?branch=region_update#bb5836a50e5233795cf3efcc042f8945f78b4350" dependencies = [ "abomonation", "abomonation_derive", @@ -2037,7 +2037,7 @@ checksum = "923dea538cea0aa3025e8685b20d6ee21ef99c4f77e954a30febbaac5ec73a97" [[package]] name = "dogsdogsdogs" version = "0.1.0" -source = "git+https://github.com/antiguru/differential-dataflow.git?branch=region_update#0697f7a519247a2f2ca08bad28a59617f3fcce72" +source = "git+https://github.com/antiguru/differential-dataflow.git?branch=region_update#bb5836a50e5233795cf3efcc042f8945f78b4350" dependencies = [ "abomonation", "abomonation_derive", @@ -9551,12 +9551,12 @@ dependencies = [ [[package]] name = "timely_bytes" version = "0.12.0" -source = "git+https://github.com/antiguru/timely-dataflow.git?branch=flatcontainer_storage#3f9d3874c6c5d75e2a9e67af9e78d6c40234be6a" +source = "git+https://github.com/antiguru/timely-dataflow.git?branch=flatcontainer_storage#63103bd5f7584fd77b3256580df229189cbcd314" [[package]] name = "timely_communication" version = "0.12.0" -source = "git+https://github.com/antiguru/timely-dataflow.git?branch=flatcontainer_storage#3f9d3874c6c5d75e2a9e67af9e78d6c40234be6a" +source = "git+https://github.com/antiguru/timely-dataflow.git?branch=flatcontainer_storage#63103bd5f7584fd77b3256580df229189cbcd314" dependencies = [ "abomonation", "abomonation_derive", @@ -9572,7 +9572,7 @@ dependencies = [ [[package]] name = "timely_container" version = "0.12.0" -source = "git+https://github.com/antiguru/timely-dataflow.git?branch=flatcontainer_storage#3f9d3874c6c5d75e2a9e67af9e78d6c40234be6a" +source = "git+https://github.com/antiguru/timely-dataflow.git?branch=flatcontainer_storage#63103bd5f7584fd77b3256580df229189cbcd314" dependencies = [ "columnation", "flatcontainer", @@ -9582,7 +9582,7 @@ dependencies = [ [[package]] name = "timely_logging" version = "0.12.0" -source = "git+https://github.com/antiguru/timely-dataflow.git?branch=flatcontainer_storage#3f9d3874c6c5d75e2a9e67af9e78d6c40234be6a" +source = "git+https://github.com/antiguru/timely-dataflow.git?branch=flatcontainer_storage#63103bd5f7584fd77b3256580df229189cbcd314" [[package]] name = "tiny-keccak" diff --git a/src/compute/src/logging/initialize.rs b/src/compute/src/logging/initialize.rs index d057737a5b726..b5a5eb5c093ae 100644 --- a/src/compute/src/logging/initialize.rs +++ b/src/compute/src/logging/initialize.rs @@ -13,7 +13,7 @@ use std::time::{Duration, Instant}; use differential_dataflow::logging::DifferentialEvent; use differential_dataflow::Collection; use mz_compute_client::logging::{LogVariant, LoggingConfig}; -use mz_ore::flatcontainer::{MzRegionPreference, OwnedRegionOpinion}; +use mz_ore::flatcontainer::{MzOffsetOptimized, MzRegionPreference, OwnedRegionOpinion}; use mz_repr::{Diff, Timestamp}; use mz_storage_types::errors::DataflowError; use mz_timely_util::containers::PreallocatingCapacityContainerBuilder; @@ -97,7 +97,7 @@ struct LoggingContext<'a, A: Allocate> { now: Instant, start_offset: Duration, t_event_queue: EventQueue>, - r_event_queue: EventQueue>, + r_event_queue: EventQueue>, d_event_queue: EventQueue>, c_event_queue: EventQueue>, shared_state: Rc>, @@ -185,7 +185,9 @@ impl LoggingContext<'_, A> { fn reachability_logger(&self) -> Logger { let event_queue = self.r_event_queue.clone(); - type CB = PreallocatingCapacityContainerBuilder>; + type CB = PreallocatingCapacityContainerBuilder< + FlatStack, + >; let mut logger = BatchLogger::::new(event_queue.link, self.interval_ms); Logger::new( self.now, diff --git a/src/compute/src/logging/reachability.rs b/src/compute/src/logging/reachability.rs index 1c94908bed87d..0edfc612ba27c 100644 --- a/src/compute/src/logging/reachability.rs +++ b/src/compute/src/logging/reachability.rs @@ -16,7 +16,7 @@ use std::rc::Rc; use mz_compute_client::logging::LoggingConfig; use mz_expr::{permutation_for_arrangement, MirScalarExpr}; use mz_ore::cast::CastFrom; -use mz_ore::flatcontainer::{MzRegionPreference, OwnedRegionOpinion}; +use mz_ore::flatcontainer::{MzOffsetOptimized, MzRegionPreference, OwnedRegionOpinion}; use mz_ore::iter::IteratorExt; use mz_repr::{Datum, Diff, RowArena, SharedRow, Timestamp}; use mz_timely_util::containers::PreallocatingCapacityContainerBuilder; @@ -39,7 +39,7 @@ use crate::typedefs::{FlatKeyValSpineDefault, RowRowSpine}; pub(super) fn construct( worker: &mut timely::worker::Worker, config: &LoggingConfig, - event_queue: EventQueue>, + event_queue: EventQueue>, ) -> BTreeMap { let interval_ms = std::cmp::max(1, config.interval.as_millis()); let worker_index = worker.index(); @@ -57,7 +57,8 @@ pub(super) fn construct( ); type UpdatesRegion = <((UpdatesKey, ()), Timestamp, Diff) as MzRegionPreference>::Region; - type CB = PreallocatingCapacityContainerBuilder>; + type CB = + PreallocatingCapacityContainerBuilder>; let (updates, token) = Some(event_queue.link).mz_replay::<_, CB, _>( scope, "reachability logs", diff --git a/src/compute/src/logging/timely.rs b/src/compute/src/logging/timely.rs index 8d9029ade5fa3..ce329ca4b0c20 100644 --- a/src/compute/src/logging/timely.rs +++ b/src/compute/src/logging/timely.rs @@ -14,16 +14,16 @@ use std::collections::BTreeMap; use std::rc::Rc; use std::time::Duration; -use differential_dataflow::consolidation::ConsolidatingContainerBuilder; use mz_compute_client::logging::LoggingConfig; use mz_ore::cast::CastFrom; -use mz_ore::flatcontainer::{MzRegionPreference, OwnedRegionOpinion}; +use mz_ore::flatcontainer::{MzOffsetOptimized, MzRegionPreference, OwnedRegionOpinion}; use mz_ore::region::LgAllocVec; use mz_repr::{Datum, Diff, Timestamp}; +use mz_timely_util::containers::PreallocatingCapacityContainerBuilder; use mz_timely_util::replay::MzReplay; use serde::{Deserialize, Serialize}; use timely::communication::Allocate; -use timely::container::columnation::{Columnation, CopyRegion}; +use timely::container::flatcontainer::FlatStack; use timely::container::CapacityContainerBuilder; use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::channels::pushers::buffer::Session; @@ -361,10 +361,12 @@ struct MessageCount { records: i64, } -type Pusher = - Counter, Tee>>; +type FlatStackFor = + FlatStack<<(D, Timestamp, Diff) as MzRegionPreference>::Region, MzOffsetOptimized>; + +type Pusher = Counter, Tee>>; type OutputSession<'a, D> = - Session<'a, Timestamp, ConsolidatingContainerBuilder>, Pusher>; + Session<'a, Timestamp, PreallocatingCapacityContainerBuilder>, Pusher>; /// Bundled output buffers used by the demux operator. // @@ -374,7 +376,7 @@ type OutputSession<'a, D> = struct DemuxOutput<'a> { operates: OutputSession<'a, (usize, String)>, channels: OutputSession<'a, (ChannelDatum, ())>, - addresses: OutputSession<'a, (usize, Vec)>, + addresses: OutputSession<'a, (usize, OwnedRegionOpinion>)>, parks: OutputSession<'a, (ParkDatum, ())>, batches_sent: OutputSession<'a, (MessageDatum, ())>, batches_received: OutputSession<'a, (MessageDatum, ())>, diff --git a/src/ore/src/flatcontainer.rs b/src/ore/src/flatcontainer.rs index e5a457de3a972..8350efb94ca2c 100644 --- a/src/ore/src/flatcontainer.rs +++ b/src/ore/src/flatcontainer.rs @@ -911,13 +911,13 @@ mod lgallocvec { assert_eq!(region.index(index), &42); let mut region = LgAllocVec::::default(); - region.push(42); - region.push(43); - region.push(44); + let i0 = <_ as Push<_>>::push(&mut region, 42); + let i1 = <_ as Push<_>>::push(&mut region, 43); + let i2 = <_ as Push<_>>::push(&mut region, 44); region.reserve_items([1, 2, 3].iter()); - assert_eq!(region.index(0), &42); - assert_eq!(region.index(1), &43); - assert_eq!(region.index(2), &44); + assert_eq!(region.index(i0), &42); + assert_eq!(region.index(i1), &43); + assert_eq!(region.index(i2), &44); } } } diff --git a/src/timely-util/src/containers.rs b/src/timely-util/src/containers.rs index e851810b0e72c..521daf1560e32 100644 --- a/src/timely-util/src/containers.rs +++ b/src/timely-util/src/containers.rs @@ -11,8 +11,9 @@ use std::collections::VecDeque; +use timely::container::flatcontainer::impls::index::IndexContainer; use timely::container::flatcontainer::{FlatStack, Push, Region}; -use timely::container::{ContainerBuilder, PushInto, SizableContainer}; +use timely::container::{CapacityContainer, ContainerBuilder, PushInto}; use timely::Container; pub mod array; @@ -31,50 +32,50 @@ pub mod stack; #[derive(Default, Debug)] pub struct PreallocatingCapacityContainerBuilder { /// Container that we're writing to. - current: C, + current: Option, /// Emtpy allocation. empty: Option, /// Completed containers pending to be sent. pending: VecDeque, } -impl PushInto for PreallocatingCapacityContainerBuilder> +impl PushInto for PreallocatingCapacityContainerBuilder> where R: Region + Push + Clone + 'static, + S: IndexContainer + Clone + 'static, + FlatStack: CapacityContainer, { #[inline] fn push_into(&mut self, item: T) { - if self.current.capacity() == 0 { - self.current = self.empty.take().unwrap_or_default(); - // Protect against non-emptied containers. - self.current.clear(); - } - // Ensure capacity - let preferred_capacity = FlatStack::::preferred_capacity(); - if self.current.capacity() < preferred_capacity { - self.current - .reserve(preferred_capacity - self.current.len()); + if self.current.is_none() { + let mut empty = self.empty.take().unwrap_or_default(); + empty.clear(); + self.current = Some(empty); } + let current = self.current.as_mut().unwrap(); + + // Ensure capacity + current.ensure_preferred_capacity(); // Push item - self.current.push(item); + current.push(item); // Maybe flush - if self.current.len() == self.current.capacity() { - let pending = std::mem::take(&mut self.current); - self.current = FlatStack::merge_capacity(std::iter::once(&pending)); - self.current - .reserve(preferred_capacity.saturating_sub(self.current.len())); + if current.len() >= FlatStack::::preferred_capacity() { + let pending = std::mem::take(current); + *current = FlatStack::merge_capacity(std::iter::once(&pending)); self.pending.push_back(pending); } } } -impl ContainerBuilder for PreallocatingCapacityContainerBuilder> +impl ContainerBuilder for PreallocatingCapacityContainerBuilder> where R: Region + Clone + 'static, + S: IndexContainer + Clone + 'static, + FlatStack: CapacityContainer, { - type Container = FlatStack; + type Container = FlatStack; #[inline] fn extract(&mut self) -> Option<&mut Self::Container> { @@ -84,12 +85,11 @@ where #[inline] fn finish(&mut self) -> Option<&mut Self::Container> { - if !self.current.is_empty() { - let pending = std::mem::take(&mut self.current); - self.current = FlatStack::merge_capacity(std::iter::once(&pending)); - let preferred_capacity = FlatStack::::preferred_capacity(); - self.current - .reserve(preferred_capacity.saturating_sub(self.current.len())); + let current = self.current.as_mut(); + if current.as_ref().map_or(false, |c| !c.is_empty()) { + let current = current.unwrap(); + let pending = std::mem::take(current); + *current = FlatStack::merge_capacity(std::iter::once(&pending)); self.pending.push_back(pending); } self.empty = self.pending.pop_front();