diff --git a/Cargo.lock b/Cargo.lock index 380e82fdaaf6f..5f1d632c92efc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1981,7 +1981,7 @@ dependencies = [ [[package]] name = "differential-dataflow" version = "0.12.0" -source = "git+https://github.com/MaterializeInc/differential-dataflow.git#7760a903c9c451f7cf039a3978994251bafd8721" +source = "git+https://github.com/antiguru/differential-dataflow.git?branch=consolidate_layout_merger_chunk#04a2446fc68a0025529cf676f948f9e8af711aa5" dependencies = [ "abomonation", "abomonation_derive", @@ -2037,7 +2037,7 @@ checksum = "923dea538cea0aa3025e8685b20d6ee21ef99c4f77e954a30febbaac5ec73a97" [[package]] name = "dogsdogsdogs" version = "0.1.0" -source = "git+https://github.com/MaterializeInc/differential-dataflow.git#7760a903c9c451f7cf039a3978994251bafd8721" +source = "git+https://github.com/antiguru/differential-dataflow.git?branch=consolidate_layout_merger_chunk#04a2446fc68a0025529cf676f948f9e8af711aa5" dependencies = [ "abomonation", "abomonation_derive", @@ -2417,9 +2417,9 @@ dependencies = [ [[package]] name = "flatcontainer" -version = "0.4.1" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bcaca60d6093f2c5328fe97b9dfafa16a3968577bc5df75ebd6b23ea79b0a0a4" +checksum = "0ff185ea156496de196dfd189038982f480515ea3338f1ff0a4fbff1e52ea0a6" dependencies = [ "cfg-if", "paste", @@ -5398,6 +5398,7 @@ dependencies = [ "criterion", "ctor", "derivative", + "differential-dataflow", "either", "flatcontainer", "futures", @@ -5429,6 +5430,7 @@ dependencies = [ "serde_json", "smallvec", "stacker", + "timely", "tokio", "tokio-native-tls", "tokio-openssl", @@ -9528,7 +9530,7 @@ dependencies = [ [[package]] name = "timely" version = "0.12.0" -source = "git+https://github.com/MaterializeInc/timely-dataflow.git#0c26e5e4198085d6c90db11930f2dba52e9f32cc" +source = "git+https://github.com/MaterializeInc/timely-dataflow.git#f6a5b3620de2050f123aad57fb40a535ba417869" dependencies = [ "abomonation", "abomonation_derive", @@ -9545,12 +9547,12 @@ dependencies = [ [[package]] name = "timely_bytes" version = "0.12.0" -source = "git+https://github.com/MaterializeInc/timely-dataflow.git#0c26e5e4198085d6c90db11930f2dba52e9f32cc" +source = "git+https://github.com/MaterializeInc/timely-dataflow.git#f6a5b3620de2050f123aad57fb40a535ba417869" [[package]] name = "timely_communication" version = "0.12.0" -source = "git+https://github.com/MaterializeInc/timely-dataflow.git#0c26e5e4198085d6c90db11930f2dba52e9f32cc" +source = "git+https://github.com/MaterializeInc/timely-dataflow.git#f6a5b3620de2050f123aad57fb40a535ba417869" dependencies = [ "abomonation", "abomonation_derive", @@ -9566,7 +9568,7 @@ dependencies = [ [[package]] name = "timely_container" version = "0.12.0" -source = "git+https://github.com/MaterializeInc/timely-dataflow.git#0c26e5e4198085d6c90db11930f2dba52e9f32cc" +source = "git+https://github.com/MaterializeInc/timely-dataflow.git#f6a5b3620de2050f123aad57fb40a535ba417869" dependencies = [ "columnation", "flatcontainer", @@ -9576,7 +9578,7 @@ dependencies = [ [[package]] name = "timely_logging" version = "0.12.0" -source = "git+https://github.com/MaterializeInc/timely-dataflow.git#0c26e5e4198085d6c90db11930f2dba52e9f32cc" +source = "git+https://github.com/MaterializeInc/timely-dataflow.git#f6a5b3620de2050f123aad57fb40a535ba417869" [[package]] name = "tiny-keccak" diff --git a/Cargo.toml b/Cargo.toml index bf964a6424aa5..d6c90b023c46b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -176,6 +176,13 @@ debug = 2 # tend to get rewritten or disappear (e.g., because a PR is force pushed or gets # merged), after which point it becomes impossible to build that historical # version of Materialize. +[patch."https://github.com/TimelyDataflow/timely-dataflow"] +# Projects that do not reliably release to crates.io. +timely = { git = "https://github.com/MaterializeInc/timely-dataflow.git" } +timely_bytes = { git = "https://github.com/MaterializeInc/timely-dataflow.git" } +timely_communication = { git = "https://github.com/MaterializeInc/timely-dataflow.git" } +timely_container = { git = "https://github.com/MaterializeInc/timely-dataflow.git" } +timely_logging = { git = "https://github.com/MaterializeInc/timely-dataflow.git" } [patch.crates-io] # Projects that do not reliably release to crates.io. timely = { git = "https://github.com/MaterializeInc/timely-dataflow.git" } @@ -183,8 +190,8 @@ timely_bytes = { git = "https://github.com/MaterializeInc/timely-dataflow.git" } timely_communication = { git = "https://github.com/MaterializeInc/timely-dataflow.git" } timely_container = { git = "https://github.com/MaterializeInc/timely-dataflow.git" } timely_logging = { git = "https://github.com/MaterializeInc/timely-dataflow.git" } -differential-dataflow = { git = "https://github.com/MaterializeInc/differential-dataflow.git" } -dogsdogsdogs = { git = "https://github.com/MaterializeInc/differential-dataflow.git" } +differential-dataflow = { git = "https://github.com/antiguru/differential-dataflow.git", branch = "consolidate_layout_merger_chunk" } +dogsdogsdogs = { git = "https://github.com/antiguru/differential-dataflow.git", branch = "consolidate_layout_merger_chunk" } # Waiting on https://github.com/sfackler/rust-postgres/pull/752. postgres = { git = "https://github.com/MaterializeInc/rust-postgres" } diff --git a/misc/cargo-vet/audits.toml b/misc/cargo-vet/audits.toml index 514819d385eba..d00085fd87f67 100644 --- a/misc/cargo-vet/audits.toml +++ b/misc/cargo-vet/audits.toml @@ -1,4 +1,3 @@ - # cargo-vet audits file [criteria.maintained-and-necessary] @@ -281,7 +280,7 @@ version = "23.5.26" [[audits.flatcontainer]] who = "Moritz Hoffmann " criteria = "safe-to-deploy" -version = "0.4.1" +version = "0.5.0" [[audits.fluent-uri]] who = "Nikhil Benesch " diff --git a/src/cluster/src/communication.rs b/src/cluster/src/communication.rs index 01298c9df4b49..8598878371794 100644 --- a/src/cluster/src/communication.rs +++ b/src/cluster/src/communication.rs @@ -36,6 +36,7 @@ use std::any::Any; use std::cmp::Ordering; use std::fmt::Display; +use std::sync::Arc; use std::time::Duration; use anyhow::Context; @@ -109,7 +110,7 @@ where } } - match initialize_networking_from_sockets(sockets, process, workers, Box::new(|_| None)) { + match initialize_networking_from_sockets(sockets, process, workers, Arc::new(|_| None)) { Ok((stuff, guard)) => { info!(process = process, "successfully initialized network"); Ok(( diff --git a/src/compute/Cargo.toml b/src/compute/Cargo.toml index 2ae7ed265d57b..689d5ee29410a 100644 --- a/src/compute/Cargo.toml +++ b/src/compute/Cargo.toml @@ -30,7 +30,7 @@ mz-compute-types = { path = "../compute-types" } mz-dyncfg = { path = "../dyncfg" } mz-dyncfgs = { path = "../dyncfgs" } mz-expr = { path = "../expr" } -mz-ore = { path = "../ore", features = ["async", "flatcontainer", "process", "tracing_"] } +mz-ore = { path = "../ore", features = ["async", "differential", "flatcontainer", "process", "tracing_"] } mz-persist-client = { path = "../persist-client" } mz-persist-types = { path = "../persist-types" } mz-repr = { path = "../repr" } diff --git a/src/compute/src/extensions/arrange.rs b/src/compute/src/extensions/arrange.rs index cc63887c6cd67..949d8b4aab12f 100644 --- a/src/compute/src/extensions/arrange.rs +++ b/src/compute/src/extensions/arrange.rs @@ -23,7 +23,7 @@ use timely::progress::Timestamp; use timely::Container; use crate::logging::compute::ComputeEvent; -use crate::typedefs::{KeyAgent, KeyValAgent, RowAgent, RowRowAgent, RowValAgent}; +use crate::typedefs::{KeyAgent, RowAgent, RowRowAgent, RowValAgent}; /// Extension trait to arrange data. pub trait MzArrange: MzArrangeCore @@ -270,36 +270,6 @@ where } } -impl ArrangementSize for Arranged> -where - G: Scope, - G::Timestamp: Lattice + Ord + Columnation, - K: Data + Columnation, - V: Data + Columnation, - T: Lattice + Timestamp, - R: Semigroup + Ord + Columnation + 'static, -{ - fn log_arrangement_size(self) -> Self { - log_arrangement_size_inner(self, |trace| { - let (mut size, mut capacity, mut allocations) = (0, 0, 0); - let mut callback = |siz, cap| { - size += siz; - capacity += cap; - allocations += usize::from(cap > 0); - }; - trace.map_batches(|batch| { - batch.storage.keys.heap_size(&mut callback); - batch.storage.keys_offs.heap_size(&mut callback); - batch.storage.vals.heap_size(&mut callback); - batch.storage.vals_offs.heap_size(&mut callback); - batch.storage.times.heap_size(&mut callback); - batch.storage.diffs.heap_size(&mut callback); - }); - (size, capacity, allocations) - }) - } -} - impl ArrangementSize for Arranged> where G: Scope, @@ -415,8 +385,8 @@ mod flatcontainer { use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::trace::TraceReader; - use mz_ore::flatcontainer::MzRegionPreference; - use timely::container::flatcontainer::{IntoOwned, Push, Region, ReserveItems}; + use mz_ore::flatcontainer::{MzRegion, MzRegionPreference}; + use timely::container::flatcontainer::{IntoOwned, Region}; use timely::dataflow::Scope; use timely::progress::Timestamp; use timely::PartialOrder; @@ -429,31 +399,10 @@ mod flatcontainer { Self: Clone, G: Scope, G::Timestamp: Lattice + Ord + MzRegionPreference, - K: Region - + Clone - + Push<::Owned> - + for<'a> Push<::ReadItem<'a>> - + for<'a> ReserveItems<::ReadItem<'a>> - + 'static, - V: Region - + Clone - + Push<::Owned> - + for<'a> Push<::ReadItem<'a>> - + for<'a> ReserveItems<::ReadItem<'a>> - + 'static, - T: Region - + Clone - + Push<::Owned> - + for<'a> Push<::ReadItem<'a>> - + for<'a> ReserveItems<::ReadItem<'a>> - + 'static, - R: Region - + Clone - + Push<::Owned> - + for<'a> Push<&'a ::Owned> - + for<'a> Push<::ReadItem<'a>> - + for<'a> ReserveItems<::ReadItem<'a>> - + 'static, + K: MzRegion, + V: MzRegion, + T: MzRegion, + R: MzRegion, K::Owned: Clone + Ord, V::Owned: Clone + Ord, T::Owned: Lattice + for<'a> PartialOrder<::ReadItem<'a>> + Timestamp, diff --git a/src/compute/src/logging/differential.rs b/src/compute/src/logging/differential.rs index da9c28872fbce..a7074e6681d76 100644 --- a/src/compute/src/logging/differential.rs +++ b/src/compute/src/logging/differential.rs @@ -130,7 +130,7 @@ pub(super) fn construct( let stream_to_collection = |input: Stream<_, ((usize, ()), Timestamp, Diff)>, log, name| { let mut packer = PermutedRowPacker::new(log); input - .mz_arrange_core::<_, KeyValSpine<_, _, _, _>>( + .mz_arrange_core::<_, KeyValSpine>( Pipeline, &format!("PreArrange Differential {name}"), ) diff --git a/src/compute/src/logging/initialize.rs b/src/compute/src/logging/initialize.rs index 59f4b3197efae..d057737a5b726 100644 --- a/src/compute/src/logging/initialize.rs +++ b/src/compute/src/logging/initialize.rs @@ -16,6 +16,7 @@ use mz_compute_client::logging::{LogVariant, LoggingConfig}; use mz_ore::flatcontainer::{MzRegionPreference, OwnedRegionOpinion}; use mz_repr::{Diff, Timestamp}; use mz_storage_types::errors::DataflowError; +use mz_timely_util::containers::PreallocatingCapacityContainerBuilder; use mz_timely_util::operator::CollectionExt; use timely::communication::Allocate; use timely::container::flatcontainer::FlatStack; @@ -184,10 +185,8 @@ impl LoggingContext<'_, A> { fn reachability_logger(&self) -> Logger { let event_queue = self.r_event_queue.clone(); - let mut logger = BatchLogger::< - CapacityContainerBuilder>, - _, - >::new(event_queue.link, self.interval_ms); + type CB = PreallocatingCapacityContainerBuilder>; + let mut logger = BatchLogger::::new(event_queue.link, self.interval_ms); Logger::new( self.now, self.start_offset, diff --git a/src/compute/src/logging/reachability.rs b/src/compute/src/logging/reachability.rs index 67988eeea7361..1c94908bed87d 100644 --- a/src/compute/src/logging/reachability.rs +++ b/src/compute/src/logging/reachability.rs @@ -19,10 +19,10 @@ use mz_ore::cast::CastFrom; use mz_ore::flatcontainer::{MzRegionPreference, OwnedRegionOpinion}; use mz_ore::iter::IteratorExt; use mz_repr::{Datum, Diff, RowArena, SharedRow, Timestamp}; +use mz_timely_util::containers::PreallocatingCapacityContainerBuilder; use mz_timely_util::replay::MzReplay; use timely::communication::Allocate; use timely::container::flatcontainer::FlatStack; -use timely::container::CapacityContainerBuilder; use timely::dataflow::channels::pact::Pipeline; use crate::extensions::arrange::{MzArrange, MzArrangeCore}; @@ -57,7 +57,7 @@ pub(super) fn construct( ); type UpdatesRegion = <((UpdatesKey, ()), Timestamp, Diff) as MzRegionPreference>::Region; - type CB = CapacityContainerBuilder>; + type CB = PreallocatingCapacityContainerBuilder>; let (updates, token) = Some(event_queue.link).mz_replay::<_, CB, _>( scope, "reachability logs", @@ -102,7 +102,7 @@ pub(super) fn construct( ); let updates = - updates.as_collection(move |(update_type, addr, source, port, ts), _| { + updates.as_collection(move |(&update_type, addr, &source, &port, ts), _| { let row_arena = RowArena::default(); let update_type = if update_type { "source" } else { "target" }; let binding = SharedRow::get(); @@ -118,7 +118,7 @@ pub(super) fn construct( Datum::UInt64(u64::cast_from(port)), Datum::UInt64(u64::cast_from(worker_index)), Datum::String(update_type), - Datum::from(ts.clone()), + Datum::from(ts.copied()), ]; row_builder.packer().extend(key.iter().map(|k| datums[*k])); let key_row = row_builder.clone(); diff --git a/src/compute/src/logging/timely.rs b/src/compute/src/logging/timely.rs index 42c95e01c7c03..714052b8af62b 100644 --- a/src/compute/src/logging/timely.rs +++ b/src/compute/src/logging/timely.rs @@ -9,6 +9,7 @@ //! Logging dataflows for events generated by timely dataflow. +use mz_ore::flatcontainer::{MzRegionPreference, OwnedRegionOpinion}; use std::cell::RefCell; use std::collections::BTreeMap; use std::rc::Rc; @@ -17,6 +18,7 @@ use std::time::Duration; use differential_dataflow::consolidation::ConsolidatingContainerBuilder; use mz_compute_client::logging::LoggingConfig; use mz_ore::cast::CastFrom; +use mz_ore::region::LgAllocVec; use mz_repr::{Datum, Diff, Timestamp}; use mz_timely_util::replay::MzReplay; use serde::{Deserialize, Serialize}; @@ -150,7 +152,10 @@ pub(super) fn construct( // updates that reach `Row` encoding. let mut packer = PermutedRowPacker::new(TimelyLog::Operates); let operates = operates - .mz_arrange_core::<_, KeyValSpine<_, _, _, _>>(Pipeline, "PreArrange Timely operates") + .mz_arrange_core::<_, KeyValSpine>( + Pipeline, + "PreArrange Timely operates", + ) .as_collection(move |id, name| { packer.pack_slice(&[ Datum::UInt64(u64::cast_from(*id)), @@ -160,7 +165,10 @@ pub(super) fn construct( }); let mut packer = PermutedRowPacker::new(TimelyLog::Channels); let channels = channels - .mz_arrange_core::<_, KeyValSpine<_, _, _, _>>(Pipeline, "PreArrange Timely operates") + .mz_arrange_core::<_, KeyValSpine>( + Pipeline, + "PreArrange Timely operates", + ) .as_collection(move |datum, ()| { let (source_node, source_port) = datum.source; let (target_node, target_port) = datum.target; @@ -176,7 +184,10 @@ pub(super) fn construct( let mut packer = PermutedRowPacker::new(TimelyLog::Addresses); let addresses = addresses - .mz_arrange_core::<_, KeyValSpine<_, _, _, _>>(Pipeline, "PreArrange Timely addresses") + .mz_arrange_core::<_, KeyValSpine>, Timestamp, Diff, _>>( + Pipeline, + "PreArrange Timely addresses", + ) .as_collection({ move |id, address| { packer.pack_by_index(|packer, index| match index { @@ -190,7 +201,7 @@ pub(super) fn construct( }); let mut packer = PermutedRowPacker::new(TimelyLog::Parks); let parks = parks - .mz_arrange_core::<_, KeyValSpine<_, _, _, _>>(Pipeline, "PreArrange Timely parks") + .mz_arrange_core::<_, KeyValSpine>(Pipeline, "PreArrange Timely parks") .as_collection(move |datum, ()| { packer.pack_slice(&[ Datum::UInt64(u64::cast_from(worker_id)), @@ -203,7 +214,7 @@ pub(super) fn construct( }); let mut packer = PermutedRowPacker::new(TimelyLog::BatchesSent); let batches_sent = batches_sent - .mz_arrange_core::<_, KeyValSpine<_, _, _, _>>( + .mz_arrange_core::<_, KeyValSpine>( Pipeline, "PreArrange Timely batches sent", ) @@ -216,7 +227,7 @@ pub(super) fn construct( }); let mut packer = PermutedRowPacker::new(TimelyLog::BatchesReceived); let batches_received = batches_received - .mz_arrange_core::<_, KeyValSpine<_, _, _, _>>( + .mz_arrange_core::<_, KeyValSpine>( Pipeline, "PreArrange Timely batches received", ) @@ -229,7 +240,7 @@ pub(super) fn construct( }); let mut packer = PermutedRowPacker::new(TimelyLog::MessagesSent); let messages_sent = messages_sent - .mz_arrange_core::<_, KeyValSpine<_, _, _, _>>( + .mz_arrange_core::<_, KeyValSpine>( Pipeline, "PreArrange Timely messages sent", ) @@ -242,7 +253,7 @@ pub(super) fn construct( }); let mut packer = PermutedRowPacker::new(TimelyLog::MessagesReceived); let messages_received = messages_received - .mz_arrange_core::<_, KeyValSpine<_, _, _, _>>( + .mz_arrange_core::<_, KeyValSpine>( Pipeline, "PreArrange Timely messages received", ) @@ -255,7 +266,10 @@ pub(super) fn construct( }); let mut packer = PermutedRowPacker::new(TimelyLog::Elapsed); let elapsed = schedules_duration - .mz_arrange_core::<_, KeyValSpine<_, _, _, _>>(Pipeline, "PreArrange Timely duration") + .mz_arrange_core::<_, KeyValSpine>( + Pipeline, + "PreArrange Timely duration", + ) .as_collection(move |operator, _| { packer.pack_slice(&[ Datum::UInt64(u64::cast_from(*operator)), @@ -264,7 +278,10 @@ pub(super) fn construct( }); let mut packer = PermutedRowPacker::new(TimelyLog::Histogram); let histogram = schedules_histogram - .mz_arrange_core::<_, KeyValSpine<_, _, _, _>>(Pipeline, "PreArrange Timely histogram") + .mz_arrange_core::<_, KeyValSpine>( + Pipeline, + "PreArrange Timely histogram", + ) .as_collection(move |datum, _| { packer.pack_slice(&[ Datum::UInt64(u64::cast_from(datum.operator)), @@ -378,6 +395,11 @@ impl Columnation for ChannelDatum { type InnerRegion = CopyRegion; } +impl MzRegionPreference for ChannelDatum { + type Owned = Self; + type Region = LgAllocVec; +} + #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] struct ParkDatum { duration_pow: u128, @@ -388,6 +410,11 @@ impl Columnation for ParkDatum { type InnerRegion = CopyRegion; } +impl MzRegionPreference for ParkDatum { + type Owned = Self; + type Region = LgAllocVec; +} + #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] struct MessageDatum { channel: usize, @@ -398,6 +425,11 @@ impl Columnation for MessageDatum { type InnerRegion = CopyRegion; } +impl MzRegionPreference for MessageDatum { + type Owned = Self; + type Region = LgAllocVec; +} + #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] struct ScheduleHistogramDatum { operator: usize, @@ -408,6 +440,11 @@ impl Columnation for ScheduleHistogramDatum { type InnerRegion = CopyRegion; } +impl MzRegionPreference for ScheduleHistogramDatum { + type Owned = Self; + type Region = LgAllocVec; +} + /// Event handler of the demux operator. struct DemuxHandler<'a, 'b> { /// State kept by the demux operator. diff --git a/src/compute/src/typedefs.rs b/src/compute/src/typedefs.rs index 1c4736a948a6c..90213b1ce8ba1 100644 --- a/src/compute/src/typedefs.rs +++ b/src/compute/src/typedefs.rs @@ -19,10 +19,9 @@ use differential_dataflow::trace::implementations::merge_batcher_col::Columnatio use differential_dataflow::trace::implementations::ord_neu::{FlatValSpine, OrdValBatch}; use differential_dataflow::trace::wrappers::enter::TraceEnter; use differential_dataflow::trace::wrappers::frontier::TraceFrontier; -use mz_ore::flatcontainer::MzRegionPreference; +use mz_ore::flatcontainer::{MzRegionPreference, MzTupleABCRegion, MzTupleABRegion}; use mz_repr::Diff; use mz_storage_types::errors::DataflowError; -use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion}; use timely::dataflow::ScopeParent; pub use crate::row_spine::{RowRowSpine, RowSpine, RowValSpine}; @@ -40,9 +39,10 @@ pub(crate) mod spines { use differential_dataflow::trace::implementations::spine_fueled::Spine; use differential_dataflow::trace::implementations::{Layout, Update}; use differential_dataflow::trace::rc_blanket_impls::RcBuilder; + use mz_ore::flatcontainer::MzRegion; use mz_timely_util::containers::stack::StackWrapper; use timely::container::columnation::{Columnation, TimelyStack}; - use timely::container::flatcontainer::{FlatStack, Push, Region}; + use timely::container::flatcontainer::FlatStack; use timely::progress::Timestamp; use crate::row_spine::OffsetOptimized; @@ -83,63 +83,109 @@ pub(crate) mod spines { } /// A layout based on flat container stacks - pub struct MzFlatLayout { - phantom: std::marker::PhantomData<(K, V, T, R)>, + pub struct MzFlatLayout { + phantom: std::marker::PhantomData<(KR, VR, TR, RR)>, } - impl Update for MzFlatLayout + impl Update for MzFlatLayout where - K: Region, - V: Region, - T: Region, - R: Region, - K::Owned: Ord + Clone + 'static, - V::Owned: Ord + Clone + 'static, - T::Owned: Ord + Clone + Lattice + Timestamp + 'static, - R::Owned: Ord + Semigroup + 'static, + KR: MzRegion, + VR: MzRegion, + TR: MzRegion, + RR: MzRegion, + KR::Owned: Ord + Clone + 'static, + VR::Owned: Ord + Clone + 'static, + TR::Owned: Ord + Clone + Lattice + Timestamp + 'static, + RR::Owned: Ord + Semigroup + 'static, + for<'a> KR::ReadItem<'a>: Copy + Ord, + for<'a> VR::ReadItem<'a>: Copy + Ord, + for<'a> TR::ReadItem<'a>: Copy + Ord, + for<'a> RR::ReadItem<'a>: Copy + Ord, { - type Key = K::Owned; - type Val = V::Owned; - type Time = T::Owned; - type Diff = R::Owned; + type Key = KR::Owned; + type Val = VR::Owned; + type Time = TR::Owned; + type Diff = RR::Owned; + type ItemRef<'a> = ((Self::KeyGat<'a>, Self::ValGat<'a>), Self::TimeGat<'a>, Self::DiffGat<'a>) + where + Self: 'a; + type KeyGat<'a> = KR::ReadItem<'a> + where + Self: 'a; + type ValGat<'a> = VR::ReadItem<'a> + where + Self: 'a; + type TimeGat<'a> = TR::ReadItem<'a> + where + Self: 'a; + type DiffGat<'a> = RR::ReadItem<'a> + where + Self: 'a; + + fn into_parts<'a>( + ((key, val), time, diff): Self::ItemRef<'a>, + ) -> ( + Self::KeyGat<'a>, + Self::ValGat<'a>, + Self::TimeGat<'a>, + Self::DiffGat<'a>, + ) { + (key, val, time, diff) + } + + fn reborrow_key<'b, 'a: 'b>(item: Self::KeyGat<'a>) -> Self::KeyGat<'b> + where + Self: 'a, + { + KR::reborrow(item) + } + + fn reborrow_val<'b, 'a: 'b>(item: Self::ValGat<'a>) -> Self::ValGat<'b> + where + Self: 'a, + { + VR::reborrow(item) + } + + fn reborrow_time<'b, 'a: 'b>(item: Self::TimeGat<'a>) -> Self::TimeGat<'b> + where + Self: 'a, + { + TR::reborrow(item) + } + + fn reborrow_diff<'b, 'a: 'b>(item: Self::DiffGat<'a>) -> Self::DiffGat<'b> + where + Self: 'a, + { + RR::reborrow(item) + } } /// Layout implementation for [`MzFlatLayout`]. Mostly equivalent to differential's /// flat layout but with a different opinion for the offset container. Here, we use /// [`OffsetOptimized`] instead of an offset list. If differential should gain access /// to the optimized variant, we might be able to remove this implementation. - impl Layout for MzFlatLayout + impl Layout for MzFlatLayout where - K: Region - + Push<::Owned> - + for<'a> Push<::ReadItem<'a>> - + 'static, - V: Region - + Push<::Owned> - + for<'a> Push<::ReadItem<'a>> - + 'static, - T: Region - + Push<::Owned> - + for<'a> Push<::ReadItem<'a>> - + 'static, - R: Region - + Push<::Owned> - + for<'a> Push<::ReadItem<'a>> - + 'static, - K::Owned: Ord + Clone + 'static, - V::Owned: Ord + Clone + 'static, - T::Owned: Ord + Clone + Lattice + Timestamp + 'static, - R::Owned: Ord + Semigroup + 'static, - for<'a> K::ReadItem<'a>: Copy + Ord, - for<'a> V::ReadItem<'a>: Copy + Ord, - for<'a> T::ReadItem<'a>: Copy + Ord, - for<'a> R::ReadItem<'a>: Copy + Ord, + KR: MzRegion, + VR: MzRegion, + TR: MzRegion, + RR: MzRegion, + KR::Owned: Ord + Clone + 'static, + VR::Owned: Ord + Clone + 'static, + TR::Owned: Ord + Clone + Lattice + Timestamp + 'static, + RR::Owned: Ord + Semigroup + 'static, + for<'a> KR::ReadItem<'a>: Copy + Ord, + for<'a> VR::ReadItem<'a>: Copy + Ord, + for<'a> TR::ReadItem<'a>: Copy + Ord, + for<'a> RR::ReadItem<'a>: Copy + Ord, { type Target = Self; - type KeyContainer = FlatStack; - type ValContainer = FlatStack; - type TimeContainer = FlatStack; - type DiffContainer = FlatStack; + type KeyContainer = FlatStack; + type ValContainer = FlatStack; + type TimeContainer = FlatStack; + type DiffContainer = FlatStack; type OffsetContainer = OffsetOptimized; } } @@ -148,10 +194,10 @@ pub(crate) mod spines { // Agents are wrappers around spines that allow shared read access. // Fully generic spines and agents. -pub type KeyValSpine = ColValSpine; -pub type KeyValAgent = TraceAgent>; -pub type KeyValEnter = - TraceEnter>, TEnter>; +pub type KeyValSpine = FlatKeyValSpineDefault; +pub type KeyValAgent = TraceAgent>; +pub type KeyValEnter = + TraceEnter>, TEnter>; // Fully generic key-only spines and agents pub type KeySpine = ColKeySpine; @@ -189,7 +235,7 @@ pub type KeyValBatcher = MergeBatcher< pub type FlatKeyValBatch = OrdValBatch>; pub type FlatKeyValSpine = - FlatValSpine, TupleABCRegion, T, R>, C>; + FlatValSpine, MzTupleABCRegion, T, R>, C>; pub type FlatKeyValSpineDefault = FlatKeyValSpine< ::Region, ::Region, diff --git a/src/ore/Cargo.toml b/src/ore/Cargo.toml index f13219d524289..49ec1dcf7aa57 100644 --- a/src/ore/Cargo.toml +++ b/src/ore/Cargo.toml @@ -27,8 +27,9 @@ clap = { version = "3.2.24", features = ["env"], optional = true } compact_bytes = { version = "0.1.2", optional = true } ctor = { version = "0.1.26", optional = true } derivative = { version = "2.2.0", optional = true } +differential-dataflow = { version = "0.12.0", optional = true } either = "1.8.0" -flatcontainer = { version = "0.4.1", optional = true } +flatcontainer = { version = "0.5.0", optional = true } futures = { version = "0.3.25", optional = true } hibitset = { version = "0.6.4", optional = true } lgalloc = { version = "0.3", optional = true } @@ -50,6 +51,7 @@ smallvec = { version = "1.10.0", optional = true } stacker = { version = "0.1.15", optional = true } sentry = { version = "0.29.1", optional = true, features = ["debug-images"] } serde = { version = "1.0.152", features = ["derive"] } +timely = { version = "0.12.0", default-features = false, features = ["bincode"], optional = true } tokio = { version = "1.38.0", features = [ "io-util", "net", @@ -115,6 +117,8 @@ async = [ "tokio", "tracing", ] +differential = ["differential-dataflow", "flatcontainer_", "timely"] +flatcontainer_ = ["flatcontainer", "region"] bytes_ = ["bytes", "compact_bytes", "smallvec", "smallvec/const_generics", "region", "tracing_"] network = ["async", "bytes", "hyper", "smallvec", "tonic", "tracing"] process = ["libc"] diff --git a/src/ore/src/flatcontainer.rs b/src/ore/src/flatcontainer.rs index fb3850faaa7f0..f73d59851cc29 100644 --- a/src/ore/src/flatcontainer.rs +++ b/src/ore/src/flatcontainer.rs @@ -15,17 +15,54 @@ //! Flat container utilities -use flatcontainer::{Push, Region, ReserveItems}; +use flatcontainer::impls::deduplicate::ConsecutiveOffsetPairs; +use flatcontainer::{OptionRegion, Push, Region, ReserveItems, StringRegion}; +use serde::{Deserialize, Serialize}; + +pub use item::ItemRegion; +pub use tuple::*; /// Associate a type with a flat container region. pub trait MzRegionPreference: 'static { /// The owned type of the container. type Owned; /// A region that can hold `Self`. - type Region: for<'a> Region - + Push - + for<'a> Push<::ReadItem<'a>> - + for<'a> ReserveItems<::ReadItem<'a>>; + type Region: MzRegion; +} + +/// TODO +#[derive(Clone, Copy, Debug, Serialize, Deserialize)] +pub struct MzIndex(usize); + +impl std::ops::Deref for MzIndex { + type Target = usize; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +/// TODO +pub trait MzRegion: + Region + + Push<::Owned> + + for<'a> Push<&'a ::Owned> + + for<'a> Push<::ReadItem<'a>> + + for<'a> ReserveItems<::ReadItem<'a>> + + Clone + + 'static +{ +} + +impl MzRegion for R where + R: Region + + Push<::Owned> + + for<'a> Push<&'a ::Owned> + + for<'a> Push<::ReadItem<'a>> + + for<'a> ReserveItems<::ReadItem<'a>> + + Clone + + 'static +{ } /// Opinion indicating that the contents of a collection should be stored in an @@ -36,31 +73,262 @@ pub trait MzRegionPreference: 'static { pub struct OwnedRegionOpinion(std::marker::PhantomData); mod tuple { - use flatcontainer::impls::tuple::*; + use flatcontainer::{Index, Push, Region, ReserveItems}; use paste::paste; - use crate::flatcontainer::MzRegionPreference; + use crate::flatcontainer::{MzIndex, MzRegion, MzRegionPreference}; + /// The macro creates the region implementation for tuples macro_rules! tuple_flatcontainer { - ($($name:ident)+) => ( - paste! { - impl<$($name: MzRegionPreference),*> MzRegionPreference for ($($name,)*) { - type Owned = ($($name::Owned,)*); - type Region = []<$($name::Region,)*>; + ($($name:ident)+) => ( + paste! { + impl<$($name: MzRegionPreference),*> MzRegionPreference for ($($name,)*) { + type Owned = ($($name::Owned,)*); + type Region = []<$($name::Region,)*>; + } + + /// A region for a tuple. + #[allow(non_snake_case)] + #[derive(Default, Debug)] + #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] + pub struct []<$($name),*> { + $([]: $name),* + } + + #[allow(non_snake_case)] + impl<$($name: MzRegion),*> Clone for []<$($name),*> + where + $(<$name as Region>::Index: Index),* + { + fn clone(&self) -> Self { + Self { + $([]: self.[].clone(),)* + } + } + + fn clone_from(&mut self, source: &Self) { + $(self.[].clone_from(&source.[]);)* } } - ) - } + + #[allow(non_snake_case)] + impl<$($name: MzRegion),*> Region for []<$($name),*> + where + $(<$name as Region>::Index: Index),* + { + type Owned = ($($name::Owned,)*); + type ReadItem<'a> = ($($name::ReadItem<'a>,)*) where Self: 'a; + + type Index = MzIndex; + + #[inline] + fn merge_regions<'a>(regions: impl Iterator + Clone) -> Self + where + Self: 'a, + { + Self { + $([]: $name::merge_regions(regions.clone().map(|r| &r.[]))),* + } + } + + #[inline] fn index(&self, index: Self::Index) -> Self::ReadItem<'_> { + ( + $(self.[].index(index),)* + ) + } + + #[inline(always)] + fn reserve_regions<'a, It>(&mut self, regions: It) + where + Self: 'a, + It: Iterator + Clone, + { + $(self.[].reserve_regions(regions.clone().map(|r| &r.[]));)* + } + + #[inline(always)] + fn clear(&mut self) { + $(self.[].clear();)* + } + + #[inline] + fn heap_size(&self, mut callback: Fn) { + $(self.[].heap_size(&mut callback);)* + } + + #[inline] + fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> where Self: 'a { + let ($($name,)*) = item; + ( + $($name::reborrow($name),)* + ) + } + } + + #[allow(non_camel_case_types)] + #[allow(non_snake_case)] + impl<$($name, [<$name _C>]: MzRegion ),*> Push<($($name,)*)> for []<$([<$name _C>]),*> + where + $([<$name _C>]: Push<$name>),* + { + #[inline] + fn push(&mut self, item: ($($name,)*)) + -> <[]<$([<$name _C>]),*> as Region>::Index { + let ($($name,)*) = item; + $(let _index = self.[].push($name);)* + _index + } + } + + #[allow(non_camel_case_types)] + #[allow(non_snake_case)] + impl<'a, $($name, [<$name _C>]),*> Push<&'a ($($name,)*)> for []<$([<$name _C>]),*> + where + $([<$name _C>]: MzRegion + Push<&'a $name>),* + { + #[inline] + fn push(&mut self, item: &'a ($($name,)*)) + -> <[]<$([<$name _C>]),*> as Region>::Index { + let ($($name,)*) = item; + $(let _index = self.[].push($name);)* + _index + } + } + + #[allow(non_camel_case_types)] + #[allow(non_snake_case)] + impl<'a, $($name, [<$name _C>]),*> ReserveItems<&'a ($($name,)*)> for []<$([<$name _C>]),*> + where + $([<$name _C>]: MzRegion + ReserveItems<&'a $name>),* + { + #[inline] + fn reserve_items(&mut self, items: It) + where + It: Iterator + Clone, + { + tuple_flatcontainer!(reserve_items self items $($name)* @ 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31); + } + } + + #[allow(non_camel_case_types)] + #[allow(non_snake_case)] + impl<$($name, [<$name _C>]),*> ReserveItems<($($name,)*)> for []<$([<$name _C>]),*> + where + $([<$name _C>]: MzRegion + ReserveItems<$name>),* + { + #[inline] + fn reserve_items(&mut self, items: It) + where + It: Iterator + Clone, + { + tuple_flatcontainer!(reserve_items_owned self items $($name)* @ 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31); + } + } + } + ); + (reserve_items $self:ident $items:ident $name0:ident $($name:ident)* @ $num0:tt $($num:tt)*) => { + paste! { + $self.[].reserve_items($items.clone().map(|i| &i.$num0)); + tuple_flatcontainer!(reserve_items $self $items $($name)* @ $($num)*); + } + }; + (reserve_items $self:ident $items:ident @ $($num:tt)*) => {}; + (reserve_items_owned $self:ident $items:ident $name0:ident $($name:ident)* @ $num0:tt $($num:tt)*) => { + paste! { + $self.[].reserve_items($items.clone().map(|i| i.$num0)); + tuple_flatcontainer!(reserve_items_owned $self $items $($name)* @ $($num)*); + } + }; + (reserve_items_owned $self:ident $items:ident @ $($num:tt)*) => {}; +} tuple_flatcontainer!(A); tuple_flatcontainer!(A B); tuple_flatcontainer!(A B C); tuple_flatcontainer!(A B C D); tuple_flatcontainer!(A B C D E); + + #[cfg(feature = "differential")] + mod differential { + use differential_dataflow::difference::Semigroup; + use differential_dataflow::lattice::Lattice; + use differential_dataflow::trace::implementations::Update; + use timely::progress::Timestamp; + + use crate::flatcontainer::{MzRegion, MzTupleABCRegion, MzTupleABRegion}; + + impl Update for MzTupleABCRegion, TR, RR> + where + KR: MzRegion, + KR::Owned: Clone + Ord, + for<'a> KR::ReadItem<'a>: Copy + Ord, + VR: MzRegion, + VR::Owned: Clone + Ord, + for<'a> VR::ReadItem<'a>: Copy + Ord, + TR: MzRegion, + TR::Owned: Clone + Lattice + Ord + Timestamp, + for<'a> TR::ReadItem<'a>: Copy + Ord, + RR: MzRegion, + RR::Owned: Clone + Ord + Semigroup, + for<'a> RR::ReadItem<'a>: Copy + Ord, + { + type KeyGat<'a> = KR::ReadItem<'a> where Self: 'a; + type ValGat<'a> = VR::ReadItem<'a> where Self: 'a; + type TimeGat<'a> = TR::ReadItem<'a> where Self: 'a; + type Time = TR::Owned; + type DiffGat<'a> = RR::ReadItem<'a> where Self: 'a; + type Diff = RR::Owned; + + fn into_parts<'a>( + ((key, val), time, diff): Self::ItemRef<'a>, + ) -> ( + Self::KeyGat<'a>, + Self::ValGat<'a>, + Self::TimeGat<'a>, + Self::DiffGat<'a>, + ) { + (key, val, time, diff) + } + + type Key = KR::Owned; + type Val = VR::Owned; + type ItemRef<'a> = ((Self::KeyGat<'a>, Self::ValGat<'a>), Self::TimeGat<'a>, Self::DiffGat<'a>) + where + Self: 'a; + + fn reborrow_key<'b, 'a: 'b>(item: Self::KeyGat<'a>) -> Self::KeyGat<'b> + where + Self: 'a, + { + KR::reborrow(item) + } + + fn reborrow_val<'b, 'a: 'b>(item: Self::ValGat<'a>) -> Self::ValGat<'b> + where + Self: 'a, + { + VR::reborrow(item) + } + + fn reborrow_time<'b, 'a: 'b>(item: Self::TimeGat<'a>) -> Self::TimeGat<'b> + where + Self: 'a, + { + TR::reborrow(item) + } + + fn reborrow_diff<'b, 'a: 'b>(item: Self::DiffGat<'a>) -> Self::DiffGat<'b> + where + Self: 'a, + { + RR::reborrow(item) + } + } + } } mod copy { - use flatcontainer::MirrorRegion; + use crate::region::LgAllocVec; use crate::flatcontainer::MzRegionPreference; @@ -68,7 +336,7 @@ mod copy { ($index_type:ty) => { impl MzRegionPreference for $index_type { type Owned = Self; - type Region = MirrorRegion; + type Region = LgAllocVec; } }; } @@ -104,6 +372,11 @@ mod copy { implement_for!(std::time::Duration); } +impl MzRegionPreference for String { + type Owned = String; + type Region = ItemRegion>; +} + mod vec { use crate::flatcontainer::lgalloc::LgAllocOwnedRegion; use crate::flatcontainer::{MzRegionPreference, OwnedRegionOpinion}; @@ -115,16 +388,19 @@ mod vec { } impl MzRegionPreference for Option { - type Owned = as Region>::Owned; - type Region = flatcontainer::OptionRegion; + type Owned = as Region>::Owned; + type Region = ItemRegion>; } mod lgalloc { //! A region that stores slices of clone types in lgalloc - use crate::region::LgAllocVec; + use flatcontainer::impls::offsets::{OffsetContainer, OffsetOptimized}; use flatcontainer::{CopyIter, Push, Region, ReserveItems}; + use crate::flatcontainer::MzIndex; + use crate::region::LgAllocVec; + /// A container for owned types. /// /// The container can absorb any type, and stores an owned version of the type, similarly to what @@ -150,17 +426,20 @@ mod lgalloc { #[derive(Debug)] pub struct LgAllocOwnedRegion { slices: LgAllocVec, + offsets: OffsetOptimized, } impl Clone for LgAllocOwnedRegion { fn clone(&self) -> Self { Self { slices: self.slices.clone(), + offsets: self.offsets.clone(), } } fn clone_from(&mut self, source: &Self) { self.slices.clone_from(&source.slices); + self.offsets.clone_from(&source.offsets); } } @@ -170,20 +449,25 @@ mod lgalloc { { type Owned = <[T] as ToOwned>::Owned; type ReadItem<'a> = &'a [T] where Self: 'a; - type Index = (usize, usize); + type Index = MzIndex; #[inline] fn merge_regions<'a>(regions: impl Iterator + Clone) -> Self where Self: 'a, { - Self { + let mut this = Self { slices: LgAllocVec::with_capacity(regions.map(|r| r.slices.len()).sum()), - } + offsets: OffsetOptimized::default(), + }; + this.offsets.push(0); + this } #[inline] - fn index(&self, (start, end): Self::Index) -> Self::ReadItem<'_> { + fn index(&self, index: Self::Index) -> Self::ReadItem<'_> { + let start = self.offsets.index(*index); + let end = self.offsets.index(*index + 1); &self.slices[start..end] } @@ -222,18 +506,21 @@ mod lgalloc { impl Default for LgAllocOwnedRegion { #[inline] fn default() -> Self { - Self { + let mut this = Self { slices: LgAllocVec::default(), - } + offsets: OffsetOptimized::default(), + }; + this.offsets.push(0); + this } } impl Push<&[T; N]> for LgAllocOwnedRegion { #[inline] fn push(&mut self, item: &[T; N]) -> as Region>::Index { - let start = self.slices.len(); self.slices.extend_from_slice(item); - (start, self.slices.len()) + self.offsets.push(self.slices.len()); + MzIndex(self.offsets.len() - 2) } } @@ -257,9 +544,9 @@ mod lgalloc { impl Push<&[T]> for LgAllocOwnedRegion { #[inline] fn push(&mut self, item: &[T]) -> as Region>::Index { - let start = self.slices.len(); self.slices.extend_from_slice(item); - (start, self.slices.len()) + self.offsets.push(self.slices.len()); + MzIndex(self.offsets.len() - 2) } } @@ -292,9 +579,9 @@ mod lgalloc { { #[inline] fn push(&mut self, mut item: Vec) -> as Region>::Index { - let start = self.slices.len(); self.slices.append(&mut item); - (start, self.slices.len()) + self.offsets.push(self.slices.len()); + MzIndex(self.offsets.len() - 2) } } @@ -334,19 +621,11 @@ mod lgalloc { #[cfg(test)] mod tests { - use crate::{CopyIter, Push, Region, ReserveItems}; + use flatcontainer::{Push, Region, ReserveItems}; use super::*; - #[test] - fn test_copy_array() { - let mut r = >::default(); - r.reserve_items(std::iter::once(&[1; 4])); - let index = r.push([1; 4]); - assert_eq!([1, 1, 1, 1], r.index(index)); - } - - #[test] + #[crate::test] fn test_copy_ref_ref_array() { let mut r = >::default(); ReserveItems::reserve_items(&mut r, std::iter::once(&[1; 4])); @@ -354,7 +633,7 @@ mod lgalloc { assert_eq!([1, 1, 1, 1], r.index(index)); } - #[test] + #[crate::test] fn test_copy_vec() { let mut r = >::default(); ReserveItems::reserve_items(&mut r, std::iter::once(&vec![1; 4])); @@ -363,13 +642,218 @@ mod lgalloc { let index = r.push(vec![2; 4]); assert_eq!([2, 2, 2, 2], r.index(index)); } + } +} - #[test] - fn test_copy_iter() { - let mut r = >::default(); - r.reserve_items(std::iter::once(CopyIter(std::iter::repeat(1).take(4)))); - let index = r.push(CopyIter(std::iter::repeat(1).take(4))); - assert_eq!([1, 1, 1, 1], r.index(index)); +mod item { + //! A region that stores indexes in lgalloc, converting indexes to [`MzIndex`]. + use flatcontainer::{Push, Region, ReserveItems}; + + use crate::flatcontainer::MzIndex; + use crate::region::LgAllocVec; + + /// TODO + pub struct ItemRegion { + inner: R, + storage: LgAllocVec, + } + + impl std::fmt::Debug for ItemRegion { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ItemRegion").finish_non_exhaustive() + } + } + + impl Clone for ItemRegion { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + storage: self.storage.clone(), + } + } + + fn clone_from(&mut self, source: &Self) { + self.inner.clone_from(&source.inner); + self.storage.clone_from(&source.storage); + } + } + + impl Default for ItemRegion { + fn default() -> Self { + Self { + inner: R::default(), + storage: LgAllocVec::default(), + } + } + } + + impl Region for ItemRegion { + type Owned = R::Owned; + type ReadItem<'a> = R::ReadItem<'a> + where + Self: 'a; + type Index = MzIndex; + + fn merge_regions<'a>(regions: impl Iterator + Clone) -> Self + where + Self: 'a, + { + Self { + inner: R::merge_regions(regions.clone().map(|r| &r.inner)), + storage: LgAllocVec::with_capacity(regions.map(|r| r.storage.len()).sum()), + } + } + + fn index(&self, index: Self::Index) -> Self::ReadItem<'_> { + self.inner.index(self.storage[*index]) + } + + fn reserve_regions<'a, I>(&mut self, regions: I) + where + Self: 'a, + I: Iterator + Clone, + { + self.inner + .reserve_regions(regions.clone().map(|r| &r.inner)); + self.storage.reserve(regions.map(|r| r.storage.len()).sum()); + } + + fn clear(&mut self) { + self.inner.clear(); + self.storage.clear(); + } + + fn heap_size(&self, mut callback: F) { + self.inner.heap_size(&mut callback); + self.storage.heap_size(callback); + } + + fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> + where + Self: 'a, + { + R::reborrow(item) + } + } + + impl, T> Push for ItemRegion { + fn push(&mut self, item: T) -> Self::Index { + let index = self.inner.push(item); + self.storage.push(index); + MzIndex(self.storage.len() - 1) + } + } + + impl, T> ReserveItems for ItemRegion { + fn reserve_items(&mut self, items: I) + where + I: Iterator + Clone, + { + self.inner.reserve_items(items.clone()); + self.storage.reserve(items.count()); + } + } +} + +mod lgallocvec { + //! A vector-like structure that stores its contents in lgalloc. + + use flatcontainer::{Push, Region, ReserveItems}; + + use crate::flatcontainer::MzIndex; + use crate::region::LgAllocVec; + + impl Region for LgAllocVec { + type Owned = T; + type ReadItem<'a> = &'a T where Self: 'a; + type Index = MzIndex; + + fn merge_regions<'a>(regions: impl Iterator + Clone) -> Self + where + Self: 'a, + { + Self::with_capacity(regions.map(LgAllocVec::len).sum()) + } + + fn index(&self, index: Self::Index) -> Self::ReadItem<'_> { + &self[*index] + } + + fn reserve_regions<'a, I>(&mut self, regions: I) + where + Self: 'a, + I: Iterator + Clone, + { + self.reserve(regions.map(LgAllocVec::len).sum()); + } + + fn clear(&mut self) { + self.clear(); + } + + fn heap_size(&self, mut callback: F) { + let size_of_t = std::mem::size_of::(); + callback(self.len() * size_of_t, self.capacity() * size_of_t); + } + + fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> + where + Self: 'a, + { + item + } + } + + impl Push for LgAllocVec { + fn push(&mut self, item: T) -> Self::Index { + self.push(item); + MzIndex(self.len() - 1) + } + } + + impl Push<&T> for LgAllocVec { + fn push(&mut self, item: &T) -> Self::Index { + self.push(item.clone()); + MzIndex(self.len() - 1) + } + } + + impl Push<&&T> for LgAllocVec { + fn push(&mut self, item: &&T) -> Self::Index { + self.push((*item).clone()); + MzIndex(self.len() - 1) + } + } + + impl ReserveItems for LgAllocVec { + fn reserve_items(&mut self, items: I) + where + I: Iterator + Clone, + { + self.reserve(items.count()); + } + } + + #[cfg(test)] + mod tests { + #[crate::test] + fn vec() { + use flatcontainer::{Push, Region, ReserveItems}; + + use crate::region::LgAllocVec; + + let mut region = LgAllocVec::::default(); + let index = <_ as Push<_>>::push(&mut region, 42); + assert_eq!(region.index(index), &42); + + let mut region = LgAllocVec::::default(); + region.push(42); + region.push(43); + region.push(44); + region.reserve_items([1, 2, 3].iter()); + assert_eq!(region.index(0), &42); + assert_eq!(region.index(1), &43); + assert_eq!(region.index(2), &44); } } } diff --git a/src/ore/src/lib.rs b/src/ore/src/lib.rs index f771740382898..518cf43b7ef7c 100644 --- a/src/ore/src/lib.rs +++ b/src/ore/src/lib.rs @@ -40,7 +40,7 @@ pub mod codegen; pub mod collections; pub mod env; pub mod error; -#[cfg(feature = "flatcontainer")] +#[cfg(feature = "flatcontainer_")] pub mod flatcontainer; pub mod fmt; #[cfg_attr(nightly_doc_features, doc(cfg(feature = "async")))] diff --git a/src/ore/src/region.rs b/src/ore/src/region.rs index af803a9068325..58e72f66ff12a 100644 --- a/src/ore/src/region.rs +++ b/src/ore/src/region.rs @@ -399,7 +399,6 @@ mod vec { use std::fmt::{Debug, Formatter}; use std::mem::{ManuallyDrop, MaybeUninit}; use std::ops::Deref; - use std::ptr; /// A fixed-length region in memory, which is either allocated from heap or lgalloc. pub struct LgAllocVec { @@ -479,7 +478,7 @@ mod vec { self.reserve(lower.saturating_add(1)); } unsafe { - ptr::write( + std::ptr::write( self.elements.as_mut_ptr().add(len), MaybeUninit::new(element), ); @@ -497,7 +496,7 @@ mod vec { self.reserve(count); let len = self.len(); unsafe { - ptr::copy_nonoverlapping( + std::ptr::copy_nonoverlapping( slice.as_ptr(), self.elements.as_mut_ptr().add(len) as *const MaybeUninit as *mut T, count, @@ -513,7 +512,7 @@ mod vec { let len = self.len(); unsafe { data.set_len(0); - ptr::copy_nonoverlapping( + std::ptr::copy_nonoverlapping( data.as_ptr(), self.elements.as_mut_ptr().add(len) as *const MaybeUninit as *mut T, count, @@ -530,6 +529,11 @@ mod vec { self.length = length; } + /// The number of elements in the array. + pub fn len(&self) -> usize { + self.length + } + /// The number of elements this array can absorb. pub fn capacity(&self) -> usize { self.elements.len() @@ -553,6 +557,11 @@ mod vec { /// Grow the array to at least `new_len` elements. Reallocates the underlying storage. fn grow(&mut self, new_len: usize) { let new_capacity = std::cmp::max(self.capacity() * 2, new_len); + println!( + "Reallocating {} -> {}, requested {new_len}", + self.capacity(), + new_capacity + ); let mut new_vec = LgAllocVec::with_capacity(new_capacity); let src_ptr = self.elements.as_ptr(); @@ -643,7 +652,7 @@ mod vec { use super::*; - #[mz_ore::test] + #[crate::test] fn double_drop() { static DROP_COUNT: AtomicUsize = AtomicUsize::new(0); struct DropGuard; diff --git a/src/repr/Cargo.toml b/src/repr/Cargo.toml index d9b3f8ebe1df2..feff320a7dab7 100644 --- a/src/repr/Cargo.toml +++ b/src/repr/Cargo.toml @@ -40,14 +40,14 @@ differential-dataflow = "0.12.0" enum_dispatch = "0.3.11" enum-kinds = "0.5.1" fast-float = "0.2.0" -flatcontainer = "0.4.1" +flatcontainer = "0.5.0" hex = "0.4.3" itertools = "0.10.5" once_cell = "1.16.0" mz-lowertest = { path = "../lowertest" } mz-ore = { path = "../ore", features = [ "bytes_", - "flatcontainer", + "flatcontainer_", "id_gen", "smallvec", "region", diff --git a/src/repr/src/timestamp.rs b/src/repr/src/timestamp.rs index 46dc27b30faac..f071a181f5acc 100644 --- a/src/repr/src/timestamp.rs +++ b/src/repr/src/timestamp.rs @@ -461,14 +461,15 @@ impl columnation::Columnation for Timestamp { } mod flatcontainer { - use flatcontainer::{IntoOwned, MirrorRegion}; + use flatcontainer::IntoOwned; use mz_ore::flatcontainer::MzRegionPreference; + use mz_ore::region::LgAllocVec; use crate::Timestamp; impl MzRegionPreference for Timestamp { type Owned = Self; - type Region = MirrorRegion; + type Region = LgAllocVec; } impl<'a> IntoOwned<'a> for Timestamp { diff --git a/src/timely-util/src/containers.rs b/src/timely-util/src/containers.rs index 9431900d8edfd..e851810b0e72c 100644 --- a/src/timely-util/src/containers.rs +++ b/src/timely-util/src/containers.rs @@ -9,5 +9,90 @@ //! Reusable containers. +use std::collections::VecDeque; + +use timely::container::flatcontainer::{FlatStack, Push, Region}; +use timely::container::{ContainerBuilder, PushInto, SizableContainer}; +use timely::Container; + pub mod array; pub mod stack; + +/// A container builder that uses length and preferred capacity to chunk data. Preallocates the next +/// container based on the capacity of the previous one once a container is full. +/// +/// Ideally, we'd have a `TryPush` trait that would fail if a push would cause a reallocation, but +/// we aren't there yet. +/// +/// Maintains a single empty allocation between [`Self::push_into`] and [`Self::extract`], but not +/// across [`Self::finish`] to maintain a low memory footprint. +/// +/// Maintains FIFO order. +#[derive(Default, Debug)] +pub struct PreallocatingCapacityContainerBuilder { + /// Container that we're writing to. + current: C, + /// Emtpy allocation. + empty: Option, + /// Completed containers pending to be sent. + pending: VecDeque, +} + +impl PushInto for PreallocatingCapacityContainerBuilder> +where + R: Region + Push + Clone + 'static, +{ + #[inline] + fn push_into(&mut self, item: T) { + if self.current.capacity() == 0 { + self.current = self.empty.take().unwrap_or_default(); + // Protect against non-emptied containers. + self.current.clear(); + } + // Ensure capacity + let preferred_capacity = FlatStack::::preferred_capacity(); + if self.current.capacity() < preferred_capacity { + self.current + .reserve(preferred_capacity - self.current.len()); + } + + // Push item + self.current.push(item); + + // Maybe flush + if self.current.len() == self.current.capacity() { + let pending = std::mem::take(&mut self.current); + self.current = FlatStack::merge_capacity(std::iter::once(&pending)); + self.current + .reserve(preferred_capacity.saturating_sub(self.current.len())); + self.pending.push_back(pending); + } + } +} + +impl ContainerBuilder for PreallocatingCapacityContainerBuilder> +where + R: Region + Clone + 'static, +{ + type Container = FlatStack; + + #[inline] + fn extract(&mut self) -> Option<&mut Self::Container> { + self.empty = Some(self.pending.pop_front()?); + self.empty.as_mut() + } + + #[inline] + fn finish(&mut self) -> Option<&mut Self::Container> { + if !self.current.is_empty() { + let pending = std::mem::take(&mut self.current); + self.current = FlatStack::merge_capacity(std::iter::once(&pending)); + let preferred_capacity = FlatStack::::preferred_capacity(); + self.current + .reserve(preferred_capacity.saturating_sub(self.current.len())); + self.pending.push_back(pending); + } + self.empty = self.pending.pop_front(); + self.empty.as_mut() + } +}