diff --git a/Cargo.lock b/Cargo.lock index 061f740d0b713..8c2971280d9c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1981,7 +1981,7 @@ dependencies = [ [[package]] name = "differential-dataflow" version = "0.12.0" -source = "git+https://github.com/antiguru/differential-dataflow.git?branch=consolidate_layout_merger_chunk#04a2446fc68a0025529cf676f948f9e8af711aa5" +source = "git+https://github.com/antiguru/differential-dataflow.git?branch=region_update#0697f7a519247a2f2ca08bad28a59617f3fcce72" dependencies = [ "abomonation", "abomonation_derive", @@ -2037,7 +2037,7 @@ checksum = "923dea538cea0aa3025e8685b20d6ee21ef99c4f77e954a30febbaac5ec73a97" [[package]] name = "dogsdogsdogs" version = "0.1.0" -source = "git+https://github.com/antiguru/differential-dataflow.git?branch=consolidate_layout_merger_chunk#04a2446fc68a0025529cf676f948f9e8af711aa5" +source = "git+https://github.com/antiguru/differential-dataflow.git?branch=region_update#0697f7a519247a2f2ca08bad28a59617f3fcce72" dependencies = [ "abomonation", "abomonation_derive", @@ -2418,8 +2418,7 @@ dependencies = [ [[package]] name = "flatcontainer" version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ff185ea156496de196dfd189038982f480515ea3338f1ff0a4fbff1e52ea0a6" +source = "git+https://github.com/antiguru/flatcontainer.git#7dc86fadbbaecd6fa3c549e20e1c46b0e448c778" dependencies = [ "cfg-if", "paste", @@ -9535,7 +9534,7 @@ dependencies = [ [[package]] name = "timely" version = "0.12.0" -source = "git+https://github.com/MaterializeInc/timely-dataflow.git#f6a5b3620de2050f123aad57fb40a535ba417869" +source = "git+https://github.com/antiguru/timely-dataflow.git?branch=flatcontainer_storage#3f9d3874c6c5d75e2a9e67af9e78d6c40234be6a" dependencies = [ "abomonation", "abomonation_derive", @@ -9552,12 +9551,12 @@ dependencies = [ [[package]] name = "timely_bytes" version = "0.12.0" -source = "git+https://github.com/MaterializeInc/timely-dataflow.git#f6a5b3620de2050f123aad57fb40a535ba417869" +source = "git+https://github.com/antiguru/timely-dataflow.git?branch=flatcontainer_storage#3f9d3874c6c5d75e2a9e67af9e78d6c40234be6a" [[package]] name = "timely_communication" version = "0.12.0" -source = "git+https://github.com/MaterializeInc/timely-dataflow.git#f6a5b3620de2050f123aad57fb40a535ba417869" +source = "git+https://github.com/antiguru/timely-dataflow.git?branch=flatcontainer_storage#3f9d3874c6c5d75e2a9e67af9e78d6c40234be6a" dependencies = [ "abomonation", "abomonation_derive", @@ -9573,7 +9572,7 @@ dependencies = [ [[package]] name = "timely_container" version = "0.12.0" -source = "git+https://github.com/MaterializeInc/timely-dataflow.git#f6a5b3620de2050f123aad57fb40a535ba417869" +source = "git+https://github.com/antiguru/timely-dataflow.git?branch=flatcontainer_storage#3f9d3874c6c5d75e2a9e67af9e78d6c40234be6a" dependencies = [ "columnation", "flatcontainer", @@ -9583,7 +9582,7 @@ dependencies = [ [[package]] name = "timely_logging" version = "0.12.0" -source = "git+https://github.com/MaterializeInc/timely-dataflow.git#f6a5b3620de2050f123aad57fb40a535ba417869" +source = "git+https://github.com/antiguru/timely-dataflow.git?branch=flatcontainer_storage#3f9d3874c6c5d75e2a9e67af9e78d6c40234be6a" [[package]] name = "tiny-keccak" diff --git a/Cargo.toml b/Cargo.toml index e024c7d1cc1be..9ba1f7ee77a69 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -267,20 +267,22 @@ debug = 2 # version of Materialize. [patch."https://github.com/TimelyDataflow/timely-dataflow"] # Projects that do not reliably release to crates.io. -timely = { git = "https://github.com/MaterializeInc/timely-dataflow.git" } -timely_bytes = { git = "https://github.com/MaterializeInc/timely-dataflow.git" } -timely_communication = { git = "https://github.com/MaterializeInc/timely-dataflow.git" } -timely_container = { git = "https://github.com/MaterializeInc/timely-dataflow.git" } -timely_logging = { git = "https://github.com/MaterializeInc/timely-dataflow.git" } +timely = { git = "https://github.com/antiguru/timely-dataflow.git", branch = "flatcontainer_storage" } +timely_bytes = { git = "https://github.com/antiguru/timely-dataflow.git", branch = "flatcontainer_storage" } +timely_communication = { git = "https://github.com/antiguru/timely-dataflow.git", branch = "flatcontainer_storage" } +timely_container = { git = "https://github.com/antiguru/timely-dataflow.git", branch = "flatcontainer_storage" } +timely_logging = { git = "https://github.com/antiguru/timely-dataflow.git", branch = "flatcontainer_storage" } [patch.crates-io] # Projects that do not reliably release to crates.io. -timely = { git = "https://github.com/MaterializeInc/timely-dataflow.git" } -timely_bytes = { git = "https://github.com/MaterializeInc/timely-dataflow.git" } -timely_communication = { git = "https://github.com/MaterializeInc/timely-dataflow.git" } -timely_container = { git = "https://github.com/MaterializeInc/timely-dataflow.git" } -timely_logging = { git = "https://github.com/MaterializeInc/timely-dataflow.git" } -differential-dataflow = { git = "https://github.com/antiguru/differential-dataflow.git", branch = "consolidate_layout_merger_chunk" } -dogsdogsdogs = { git = "https://github.com/antiguru/differential-dataflow.git", branch = "consolidate_layout_merger_chunk" } +timely = { git = "https://github.com/antiguru/timely-dataflow.git", branch = "flatcontainer_storage" } +timely_bytes = { git = "https://github.com/antiguru/timely-dataflow.git", branch = "flatcontainer_storage" } +timely_communication = { git = "https://github.com/antiguru/timely-dataflow.git", branch = "flatcontainer_storage" } +timely_container = { git = "https://github.com/antiguru/timely-dataflow.git", branch = "flatcontainer_storage" } +timely_logging = { git = "https://github.com/antiguru/timely-dataflow.git", branch = "flatcontainer_storage" } +differential-dataflow = { git = "https://github.com/antiguru/differential-dataflow.git", branch = "region_update" } +dogsdogsdogs = { git = "https://github.com/antiguru/differential-dataflow.git", branch = "region_update" } + +flatcontainer = { git = "https://github.com/antiguru/flatcontainer.git" } # Waiting on https://github.com/sfackler/rust-postgres/pull/752. postgres = { git = "https://github.com/MaterializeInc/rust-postgres" } diff --git a/src/compute/src/typedefs.rs b/src/compute/src/typedefs.rs index 90213b1ce8ba1..9be686d79f4a5 100644 --- a/src/compute/src/typedefs.rs +++ b/src/compute/src/typedefs.rs @@ -19,7 +19,9 @@ use differential_dataflow::trace::implementations::merge_batcher_col::Columnatio use differential_dataflow::trace::implementations::ord_neu::{FlatValSpine, OrdValBatch}; use differential_dataflow::trace::wrappers::enter::TraceEnter; use differential_dataflow::trace::wrappers::frontier::TraceFrontier; -use mz_ore::flatcontainer::{MzRegionPreference, MzTupleABCRegion, MzTupleABRegion}; +use mz_ore::flatcontainer::{ + MzOffsetOptimized, MzRegionPreference, MzTupleABCRegion, MzTupleABRegion, +}; use mz_repr::Diff; use mz_storage_types::errors::DataflowError; use timely::dataflow::ScopeParent; @@ -39,7 +41,7 @@ pub(crate) mod spines { use differential_dataflow::trace::implementations::spine_fueled::Spine; use differential_dataflow::trace::implementations::{Layout, Update}; use differential_dataflow::trace::rc_blanket_impls::RcBuilder; - use mz_ore::flatcontainer::MzRegion; + use mz_ore::flatcontainer::{MzOffsetOptimized, MzRegion}; use mz_timely_util::containers::stack::StackWrapper; use timely::container::columnation::{Columnation, TimelyStack}; use timely::container::flatcontainer::FlatStack; @@ -106,60 +108,6 @@ pub(crate) mod spines { type Val = VR::Owned; type Time = TR::Owned; type Diff = RR::Owned; - type ItemRef<'a> = ((Self::KeyGat<'a>, Self::ValGat<'a>), Self::TimeGat<'a>, Self::DiffGat<'a>) - where - Self: 'a; - type KeyGat<'a> = KR::ReadItem<'a> - where - Self: 'a; - type ValGat<'a> = VR::ReadItem<'a> - where - Self: 'a; - type TimeGat<'a> = TR::ReadItem<'a> - where - Self: 'a; - type DiffGat<'a> = RR::ReadItem<'a> - where - Self: 'a; - - fn into_parts<'a>( - ((key, val), time, diff): Self::ItemRef<'a>, - ) -> ( - Self::KeyGat<'a>, - Self::ValGat<'a>, - Self::TimeGat<'a>, - Self::DiffGat<'a>, - ) { - (key, val, time, diff) - } - - fn reborrow_key<'b, 'a: 'b>(item: Self::KeyGat<'a>) -> Self::KeyGat<'b> - where - Self: 'a, - { - KR::reborrow(item) - } - - fn reborrow_val<'b, 'a: 'b>(item: Self::ValGat<'a>) -> Self::ValGat<'b> - where - Self: 'a, - { - VR::reborrow(item) - } - - fn reborrow_time<'b, 'a: 'b>(item: Self::TimeGat<'a>) -> Self::TimeGat<'b> - where - Self: 'a, - { - TR::reborrow(item) - } - - fn reborrow_diff<'b, 'a: 'b>(item: Self::DiffGat<'a>) -> Self::DiffGat<'b> - where - Self: 'a, - { - RR::reborrow(item) - } } /// Layout implementation for [`MzFlatLayout`]. Mostly equivalent to differential's @@ -182,10 +130,10 @@ pub(crate) mod spines { for<'a> RR::ReadItem<'a>: Copy + Ord, { type Target = Self; - type KeyContainer = FlatStack; - type ValContainer = FlatStack; - type TimeContainer = FlatStack; - type DiffContainer = FlatStack; + type KeyContainer = FlatStack; + type ValContainer = FlatStack; + type TimeContainer = FlatStack; + type DiffContainer = FlatStack; type OffsetContainer = OffsetOptimized; } } @@ -234,8 +182,12 @@ pub type KeyValBatcher = MergeBatcher< >; pub type FlatKeyValBatch = OrdValBatch>; -pub type FlatKeyValSpine = - FlatValSpine, MzTupleABCRegion, T, R>, C>; +pub type FlatKeyValSpine = FlatValSpine< + MzFlatLayout, + MzTupleABCRegion, T, R>, + C, + MzOffsetOptimized, +>; pub type FlatKeyValSpineDefault = FlatKeyValSpine< ::Region, ::Region, diff --git a/src/ore/src/flatcontainer.rs b/src/ore/src/flatcontainer.rs index 2a478b5d5fda2..59c5f80593b3a 100644 --- a/src/ore/src/flatcontainer.rs +++ b/src/ore/src/flatcontainer.rs @@ -15,11 +15,12 @@ //! Flat container utilities -use flatcontainer::impls::deduplicate::ConsecutiveOffsetPairs; +use flatcontainer::impls::deduplicate::ConsecutiveIndexPairs; use flatcontainer::{OptionRegion, Push, Region, ReserveItems, StringRegion}; use serde::{Deserialize, Serialize}; pub use item::ItemRegion; +pub use offset::MzOffsetOptimized; pub use tuple::*; /// Associate a type with a flat container region. @@ -252,6 +253,7 @@ mod tuple { mod differential { use differential_dataflow::difference::Semigroup; use differential_dataflow::lattice::Lattice; + use differential_dataflow::trace::implementations::merge_batcher_flat::RegionUpdate; use differential_dataflow::trace::implementations::Update; use timely::progress::Timestamp; @@ -272,52 +274,58 @@ mod tuple { RR::Owned: Clone + Ord + Semigroup, for<'a> RR::ReadItem<'a>: Copy + Ord, { - type KeyGat<'a> = KR::ReadItem<'a> where Self: 'a; - type ValGat<'a> = VR::ReadItem<'a> where Self: 'a; - type TimeGat<'a> = TR::ReadItem<'a> where Self: 'a; + type Key = KR::Owned; + type Val = VR::Owned; type Time = TR::Owned; - type DiffGat<'a> = RR::ReadItem<'a> where Self: 'a; type Diff = RR::Owned; + } + + impl RegionUpdate for MzTupleABCRegion, TR, RR> + where + KR: MzRegion, + for<'a> KR::ReadItem<'a>: Copy + Ord, + VR: MzRegion, + for<'a> VR::ReadItem<'a>: Copy + Ord, + TR: MzRegion, + for<'a> TR::ReadItem<'a>: Copy + Ord, + RR: MzRegion, + for<'a> RR::ReadItem<'a>: Copy + Ord, + { + type Key<'a> = KR::ReadItem<'a> where Self: 'a; + type Val<'a> = VR::ReadItem<'a> where Self: 'a; + type Time<'a> = TR::ReadItem<'a> where Self: 'a; + type TimeOwned = TR::Owned; + type Diff<'a> = RR::ReadItem<'a> where Self: 'a; + type DiffOwned = RR::Owned; fn into_parts<'a>( - ((key, val), time, diff): Self::ItemRef<'a>, - ) -> ( - Self::KeyGat<'a>, - Self::ValGat<'a>, - Self::TimeGat<'a>, - Self::DiffGat<'a>, - ) { + ((key, val), time, diff): Self::ReadItem<'a>, + ) -> (Self::Key<'a>, Self::Val<'a>, Self::Time<'a>, Self::Diff<'a>) { (key, val, time, diff) } - type Key = KR::Owned; - type Val = VR::Owned; - type ItemRef<'a> = ((Self::KeyGat<'a>, Self::ValGat<'a>), Self::TimeGat<'a>, Self::DiffGat<'a>) - where - Self: 'a; - - fn reborrow_key<'b, 'a: 'b>(item: Self::KeyGat<'a>) -> Self::KeyGat<'b> + fn reborrow_key<'b, 'a: 'b>(item: Self::Key<'a>) -> Self::Key<'b> where Self: 'a, { KR::reborrow(item) } - fn reborrow_val<'b, 'a: 'b>(item: Self::ValGat<'a>) -> Self::ValGat<'b> + fn reborrow_val<'b, 'a: 'b>(item: Self::Val<'a>) -> Self::Val<'b> where Self: 'a, { VR::reborrow(item) } - fn reborrow_time<'b, 'a: 'b>(item: Self::TimeGat<'a>) -> Self::TimeGat<'b> + fn reborrow_time<'b, 'a: 'b>(item: Self::Time<'a>) -> Self::Time<'b> where Self: 'a, { TR::reborrow(item) } - fn reborrow_diff<'b, 'a: 'b>(item: Self::DiffGat<'a>) -> Self::DiffGat<'b> + fn reborrow_diff<'b, 'a: 'b>(item: Self::Diff<'a>) -> Self::Diff<'b> where Self: 'a, { @@ -374,7 +382,7 @@ mod copy { impl MzRegionPreference for String { type Owned = String; - type Region = ItemRegion>; + type Region = ItemRegion>; } mod vec { @@ -395,11 +403,11 @@ impl MzRegionPreference for Option { mod lgalloc { //! A region that stores slices of clone types in lgalloc - use flatcontainer::impls::offsets::{OffsetContainer, OffsetOptimized}; - use flatcontainer::{CopyIter, Push, Region, ReserveItems}; - use crate::flatcontainer::MzIndex; use crate::region::LgAllocVec; + use flatcontainer::impls::index::{IndexContainer, IndexOptimized}; + use flatcontainer::impls::storage::Storage; + use flatcontainer::{Push, PushIter, Region, ReserveItems}; /// A container for owned types. /// @@ -426,7 +434,7 @@ mod lgalloc { #[derive(Debug)] pub struct LgAllocOwnedRegion { slices: LgAllocVec, - offsets: OffsetOptimized, + offsets: IndexOptimized, } impl Clone for LgAllocOwnedRegion { @@ -458,7 +466,7 @@ mod lgalloc { { let mut this = Self { slices: LgAllocVec::with_capacity(regions.map(|r| r.slices.len()).sum()), - offsets: OffsetOptimized::default(), + offsets: IndexOptimized::default(), }; this.offsets.push(0); this @@ -510,7 +518,7 @@ mod lgalloc { fn default() -> Self { let mut this = Self { slices: LgAllocVec::default(), - offsets: OffsetOptimized::default(), + offsets: IndexOptimized::default(), }; this.offsets.push(0); this @@ -607,14 +615,14 @@ mod lgalloc { } } - impl> ReserveItems> for LgAllocOwnedRegion + impl> ReserveItems> for LgAllocOwnedRegion where [T]: ToOwned, { #[inline] fn reserve_items(&mut self, items: I) where - I: Iterator> + Clone, + I: Iterator> + Clone, { self.slices .reserve(items.flat_map(|i| i.0.into_iter()).count()); @@ -760,10 +768,11 @@ mod item { mod lgallocvec { //! A vector-like structure that stores its contents in lgalloc. - use flatcontainer::{Push, Region, ReserveItems}; - use crate::flatcontainer::MzIndex; use crate::region::LgAllocVec; + use flatcontainer::impls::storage::Storage; + use flatcontainer::{Push, Region, ReserveItems}; + use timely::container::flatcontainer::impls::index::IndexContainer; impl Region for LgAllocVec { type Owned = T; @@ -806,6 +815,59 @@ mod lgallocvec { } } + impl Storage for LgAllocVec { + fn with_capacity(capacity: usize) -> Self { + Self::with_capacity(capacity) + } + + fn reserve(&mut self, additional: usize) { + self.reserve(additional); + } + + fn clear(&mut self) { + self.clear(); + } + + fn heap_size(&self, callback: F) { + self.heap_size(callback); + } + + fn len(&self) -> usize { + self.len() + } + + fn is_empty(&self) -> bool { + self.is_empty() + } + } + + impl IndexContainer for LgAllocVec { + type Iter<'a> = std::iter::Copied> + where + Self: 'a; + + fn index(&self, index: usize) -> T { + self[index] + } + + fn push(&mut self, item: T) { + self.push(item); + } + + fn extend>(&mut self, iter: I) + where + I::IntoIter: ExactSizeIterator, + { + for item in iter { + self.push(item); + } + } + + fn iter(&self) -> Self::Iter<'_> { + self.iter().copied() + } + } + impl Push for LgAllocVec { fn push(&mut self, item: T) -> Self::Index { self.push(item); @@ -859,3 +921,79 @@ mod lgallocvec { } } } + +mod offset { + use crate::flatcontainer::MzIndex; + use flatcontainer::impls::index::{IndexContainer, IndexOptimized}; + use flatcontainer::impls::storage::Storage; + + /// TODO + #[derive(Default, Clone, Debug)] + pub struct MzOffsetOptimized(IndexOptimized); + + impl Storage for MzOffsetOptimized { + fn with_capacity(capacity: usize) -> Self { + Self(IndexOptimized::with_capacity(capacity)) + } + + fn reserve(&mut self, additional: usize) { + self.0.reserve(additional) + } + + fn clear(&mut self) { + self.0.clear(); + } + + fn heap_size(&self, callback: F) { + self.0.heap_size(callback); + } + + fn len(&self) -> usize { + self.0.len() + } + + fn is_empty(&self) -> bool { + self.0.is_empty() + } + } + + impl IndexContainer for MzOffsetOptimized { + type Iter<'a> = MzOffsetOptimizedIter<>::Iter<'a>> + where + Self: 'a; + + fn index(&self, index: usize) -> MzIndex { + MzIndex(self.0.index(index)) + } + + fn push(&mut self, item: MzIndex) { + self.0.push(item.0); + } + + fn extend>(&mut self, iter: I) + where + I::IntoIter: ExactSizeIterator, + { + self.0.extend(iter.into_iter().map(|item| item.0)); + } + + fn iter(&self) -> Self::Iter<'_> { + MzOffsetOptimizedIter(self.0.iter()) + } + } + + /// TODO + #[derive(Clone, Copy, Debug)] + pub struct MzOffsetOptimizedIter(I); + + impl Iterator for MzOffsetOptimizedIter + where + I: Iterator, + { + type Item = MzIndex; + + fn next(&mut self) -> Option { + self.0.next().map(MzIndex) + } + } +} diff --git a/src/ore/src/region.rs b/src/ore/src/region.rs index 11aa51d526df7..3fa8017a70019 100644 --- a/src/ore/src/region.rs +++ b/src/ore/src/region.rs @@ -496,6 +496,7 @@ mod vec { self.reserve(count); let len = self.len(); unsafe { + #[allow(clippy::as_conversions)] std::ptr::copy_nonoverlapping( slice.as_ptr(), self.elements.as_mut_ptr().add(len) as *const MaybeUninit as *mut T, @@ -512,6 +513,7 @@ mod vec { let len = self.len(); unsafe { data.set_len(0); + #[allow(clippy::as_conversions)] std::ptr::copy_nonoverlapping( data.as_ptr(), self.elements.as_mut_ptr().add(len) as *const MaybeUninit as *mut T, @@ -534,6 +536,11 @@ mod vec { self.length } + /// Returns `true` if the array contains no elements. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + /// The number of elements this array can absorb. pub fn capacity(&self) -> usize { self.elements.len() @@ -588,6 +595,11 @@ mod vec { self.grow(new_len); } } + + /// Iterate over the elements. + pub fn iter(&self) -> std::slice::Iter<'_, T> { + self.deref().iter() + } } impl Clone for LgAllocVec {