diff --git a/src/compute/src/extensions/arrange.rs b/src/compute/src/extensions/arrange.rs index 949d8b4aab12f..22e46ab88ea5f 100644 --- a/src/compute/src/extensions/arrange.rs +++ b/src/compute/src/extensions/arrange.rs @@ -385,7 +385,7 @@ mod flatcontainer { use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::trace::TraceReader; - use mz_ore::flatcontainer::{MzRegion, MzRegionPreference}; + use mz_ore::flatcontainer::{MzIndex, MzRegion, MzRegionPreference}; use timely::container::flatcontainer::{IntoOwned, Region}; use timely::dataflow::Scope; use timely::progress::Timestamp; @@ -399,10 +399,10 @@ mod flatcontainer { Self: Clone, G: Scope, G::Timestamp: Lattice + Ord + MzRegionPreference, - K: MzRegion, - V: MzRegion, - T: MzRegion, - R: MzRegion, + K: MzRegion, + V: MzRegion, + T: MzRegion, + R: MzRegion, K::Owned: Clone + Ord, V::Owned: Clone + Ord, T::Owned: Lattice + for<'a> PartialOrder<::ReadItem<'a>> + Timestamp, diff --git a/src/compute/src/logging/differential.rs b/src/compute/src/logging/differential.rs index a7074e6681d76..a92612d35c9ae 100644 --- a/src/compute/src/logging/differential.rs +++ b/src/compute/src/logging/differential.rs @@ -136,7 +136,7 @@ pub(super) fn construct( ) .as_collection(move |op, ()| { packer.pack_slice(&[ - Datum::UInt64(u64::cast_from(*op)), + Datum::UInt64(u64::cast_from(op)), Datum::UInt64(u64::cast_from(worker_id)), ]) }) diff --git a/src/compute/src/logging/initialize.rs b/src/compute/src/logging/initialize.rs index 9bf241304dcaf..715d169c7a524 100644 --- a/src/compute/src/logging/initialize.rs +++ b/src/compute/src/logging/initialize.rs @@ -10,11 +10,15 @@ use std::collections::BTreeMap; use std::rc::Rc; use std::time::{Duration, Instant}; +use crate::arrangement::manager::TraceBundle; +use crate::extensions::arrange::{KeyCollection, MzArrange}; +use crate::logging::compute::ComputeEvent; +use crate::logging::{BatchLogger, EventQueue, SharedLoggingState}; use differential_dataflow::dynamic::pointstamp::PointStamp; use differential_dataflow::logging::DifferentialEvent; use differential_dataflow::Collection; use mz_compute_client::logging::{LogVariant, LoggingConfig}; -use mz_ore::flatcontainer::{MzOffsetOptimized, MzRegionPreference, OwnedRegionOpinion}; +use mz_ore::flatcontainer::{ItemRegion, MzIndexOptimized, MzRegionPreference, OwnedRegionOpinion}; use mz_repr::{Diff, Timestamp}; use mz_storage_operators::persist_source::Subtime; use mz_storage_types::errors::DataflowError; @@ -27,11 +31,6 @@ use timely::logging::{Logger, ProgressEventTimestamp, TimelyEvent, WorkerIdentif use timely::order::Product; use timely::progress::reachability::logging::TrackerEvent; -use crate::arrangement::manager::TraceBundle; -use crate::extensions::arrange::{KeyCollection, MzArrange}; -use crate::logging::compute::ComputeEvent; -use crate::logging::{BatchLogger, EventQueue, SharedLoggingState}; - /// Initialize logging dataflows. /// /// Returns a logger for compute events, and for each `LogVariant` a trace bundle usable for @@ -87,11 +86,13 @@ type ReachabilityEventRegionPreference = ( OwnedRegionOpinion>, OwnedRegionOpinion, Diff)>>, ); -pub(super) type ReachabilityEventRegion = <( - Duration, - WorkerIdentifier, - ReachabilityEventRegionPreference, -) as MzRegionPreference>::Region; +pub(super) type ReachabilityEventRegion = ItemRegion< + <( + Duration, + WorkerIdentifier, + ReachabilityEventRegionPreference, + ) as MzRegionPreference>::Region, +>; struct LoggingContext<'a, A: Allocate> { worker: &'a mut timely::worker::Worker, @@ -100,7 +101,7 @@ struct LoggingContext<'a, A: Allocate> { now: Instant, start_offset: Duration, t_event_queue: EventQueue>, - r_event_queue: EventQueue>, + r_event_queue: EventQueue>, d_event_queue: EventQueue>, c_event_queue: EventQueue>, shared_state: Rc>, @@ -189,7 +190,7 @@ impl LoggingContext<'_, A> { fn reachability_logger(&self) -> Logger { let event_queue = self.r_event_queue.clone(); type CB = PreallocatingCapacityContainerBuilder< - FlatStack, + FlatStack, >; let mut logger = BatchLogger::::new(event_queue.link, self.interval_ms); Logger::new( diff --git a/src/compute/src/logging/reachability.rs b/src/compute/src/logging/reachability.rs index 0edfc612ba27c..76b83e7c8eeca 100644 --- a/src/compute/src/logging/reachability.rs +++ b/src/compute/src/logging/reachability.rs @@ -16,7 +16,7 @@ use std::rc::Rc; use mz_compute_client::logging::LoggingConfig; use mz_expr::{permutation_for_arrangement, MirScalarExpr}; use mz_ore::cast::CastFrom; -use mz_ore::flatcontainer::{MzOffsetOptimized, MzRegionPreference, OwnedRegionOpinion}; +use mz_ore::flatcontainer::{ItemRegion, MzIndexOptimized, MzRegionPreference, OwnedRegionOpinion}; use mz_ore::iter::IteratorExt; use mz_repr::{Datum, Diff, RowArena, SharedRow, Timestamp}; use mz_timely_util::containers::PreallocatingCapacityContainerBuilder; @@ -39,7 +39,7 @@ use crate::typedefs::{FlatKeyValSpineDefault, RowRowSpine}; pub(super) fn construct( worker: &mut timely::worker::Worker, config: &LoggingConfig, - event_queue: EventQueue>, + event_queue: EventQueue>, ) -> BTreeMap { let interval_ms = std::cmp::max(1, config.interval.as_millis()); let worker_index = worker.index(); @@ -55,10 +55,10 @@ pub(super) fn construct( usize, Option, ); - type UpdatesRegion = <((UpdatesKey, ()), Timestamp, Diff) as MzRegionPreference>::Region; + type UpdatesRegion = + ItemRegion<<((UpdatesKey, ()), Timestamp, Diff) as MzRegionPreference>::Region>; - type CB = - PreallocatingCapacityContainerBuilder>; + type CB = PreallocatingCapacityContainerBuilder>; let (updates, token) = Some(event_queue.link).mz_replay::<_, CB, _>( scope, "reachability logs", @@ -103,7 +103,7 @@ pub(super) fn construct( ); let updates = - updates.as_collection(move |(&update_type, addr, &source, &port, ts), _| { + updates.as_collection(move |(update_type, addr, source, port, ts), _| { let row_arena = RowArena::default(); let update_type = if update_type { "source" } else { "target" }; let binding = SharedRow::get(); @@ -119,7 +119,7 @@ pub(super) fn construct( Datum::UInt64(u64::cast_from(port)), Datum::UInt64(u64::cast_from(worker_index)), Datum::String(update_type), - Datum::from(ts.copied()), + Datum::from(ts), ]; row_builder.packer().extend(key.iter().map(|k| datums[*k])); let key_row = row_builder.clone(); diff --git a/src/compute/src/logging/timely.rs b/src/compute/src/logging/timely.rs index ce329ca4b0c20..8dada0159b7cd 100644 --- a/src/compute/src/logging/timely.rs +++ b/src/compute/src/logging/timely.rs @@ -16,14 +16,13 @@ use std::time::Duration; use mz_compute_client::logging::LoggingConfig; use mz_ore::cast::CastFrom; -use mz_ore::flatcontainer::{MzOffsetOptimized, MzRegionPreference, OwnedRegionOpinion}; -use mz_ore::region::LgAllocVec; +use mz_ore::flatcontainer::{ItemRegion, MzIndexOptimized, MzRegionPreference, OwnedRegionOpinion}; use mz_repr::{Datum, Diff, Timestamp}; use mz_timely_util::containers::PreallocatingCapacityContainerBuilder; use mz_timely_util::replay::MzReplay; use serde::{Deserialize, Serialize}; use timely::communication::Allocate; -use timely::container::flatcontainer::FlatStack; +use timely::container::flatcontainer::{FlatStack, IntoOwned, MirrorRegion}; use timely::container::CapacityContainerBuilder; use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::channels::pushers::buffer::Session; @@ -158,7 +157,7 @@ pub(super) fn construct( ) .as_collection(move |id, name| { packer.pack_slice(&[ - Datum::UInt64(u64::cast_from(*id)), + Datum::UInt64(u64::cast_from(id)), Datum::UInt64(u64::cast_from(worker_id)), Datum::String(name), ]) @@ -191,7 +190,7 @@ pub(super) fn construct( .as_collection({ move |id, address| { packer.pack_by_index(|packer, index| match index { - 0 => packer.push(Datum::UInt64(u64::cast_from(*id))), + 0 => packer.push(Datum::UInt64(u64::cast_from(id))), 1 => packer.push(Datum::UInt64(u64::cast_from(worker_id))), 2 => packer .push_list(address.iter().map(|i| Datum::UInt64(u64::cast_from(*i)))), @@ -272,7 +271,7 @@ pub(super) fn construct( ) .as_collection(move |operator, _| { packer.pack_slice(&[ - Datum::UInt64(u64::cast_from(*operator)), + Datum::UInt64(u64::cast_from(operator)), Datum::UInt64(u64::cast_from(worker_id)), ]) }); @@ -362,7 +361,7 @@ struct MessageCount { } type FlatStackFor = - FlatStack<<(D, Timestamp, Diff) as MzRegionPreference>::Region, MzOffsetOptimized>; + FlatStack::Region>, MzIndexOptimized>; type Pusher = Counter, Tee>>; type OutputSession<'a, D> = @@ -395,7 +394,23 @@ struct ChannelDatum { impl MzRegionPreference for ChannelDatum { type Owned = Self; - type Region = LgAllocVec; + type Region = MirrorRegion; +} + +impl<'a> IntoOwned<'a> for ChannelDatum { + type Owned = Self; + + fn into_owned(self) -> Self::Owned { + self + } + + fn clone_onto(self, other: &mut Self::Owned) { + *other = self; + } + + fn borrow_as(owned: &'a Self::Owned) -> Self { + *owned + } } #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] @@ -406,7 +421,23 @@ struct ParkDatum { impl MzRegionPreference for ParkDatum { type Owned = Self; - type Region = LgAllocVec; + type Region = MirrorRegion; +} + +impl<'a> IntoOwned<'a> for ParkDatum { + type Owned = Self; + + fn into_owned(self) -> Self::Owned { + self + } + + fn clone_onto(self, other: &mut Self::Owned) { + *other = self; + } + + fn borrow_as(owned: &'a Self::Owned) -> Self { + *owned + } } #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] @@ -417,7 +448,23 @@ struct MessageDatum { impl MzRegionPreference for MessageDatum { type Owned = Self; - type Region = LgAllocVec; + type Region = MirrorRegion; +} + +impl<'a> IntoOwned<'a> for MessageDatum { + type Owned = Self; + + fn into_owned(self) -> Self::Owned { + self + } + + fn clone_onto(self, other: &mut Self::Owned) { + *other = self; + } + + fn borrow_as(owned: &'a Self::Owned) -> Self { + *owned + } } #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] @@ -428,7 +475,23 @@ struct ScheduleHistogramDatum { impl MzRegionPreference for ScheduleHistogramDatum { type Owned = Self; - type Region = LgAllocVec; + type Region = MirrorRegion; +} + +impl<'a> IntoOwned<'a> for ScheduleHistogramDatum { + type Owned = Self; + + fn into_owned(self) -> Self::Owned { + self + } + + fn clone_onto(self, other: &mut Self::Owned) { + *other = self; + } + + fn borrow_as(owned: &'a Self::Owned) -> Self { + *owned + } } /// Event handler of the demux operator. diff --git a/src/compute/src/typedefs.rs b/src/compute/src/typedefs.rs index 9be686d79f4a5..5076dcce3f067 100644 --- a/src/compute/src/typedefs.rs +++ b/src/compute/src/typedefs.rs @@ -11,6 +11,9 @@ #![allow(dead_code, missing_docs)] +pub use crate::row_spine::{RowRowSpine, RowSpine, RowValSpine}; +use crate::typedefs::spines::MzFlatLayout; +pub use crate::typedefs::spines::{ColKeySpine, ColValSpine}; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::operators::arrange::TraceAgent; use differential_dataflow::trace::implementations::chunker::ColumnationChunker; @@ -19,17 +22,12 @@ use differential_dataflow::trace::implementations::merge_batcher_col::Columnatio use differential_dataflow::trace::implementations::ord_neu::{FlatValSpine, OrdValBatch}; use differential_dataflow::trace::wrappers::enter::TraceEnter; use differential_dataflow::trace::wrappers::frontier::TraceFrontier; -use mz_ore::flatcontainer::{ - MzOffsetOptimized, MzRegionPreference, MzTupleABCRegion, MzTupleABRegion, -}; +use mz_ore::flatcontainer::{ItemRegion, MzIndexOptimized, MzRegionPreference}; use mz_repr::Diff; use mz_storage_types::errors::DataflowError; +use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion}; use timely::dataflow::ScopeParent; -pub use crate::row_spine::{RowRowSpine, RowSpine, RowValSpine}; -use crate::typedefs::spines::MzFlatLayout; -pub use crate::typedefs::spines::{ColKeySpine, ColValSpine}; - pub(crate) mod spines { use std::rc::Rc; @@ -41,7 +39,7 @@ pub(crate) mod spines { use differential_dataflow::trace::implementations::spine_fueled::Spine; use differential_dataflow::trace::implementations::{Layout, Update}; use differential_dataflow::trace::rc_blanket_impls::RcBuilder; - use mz_ore::flatcontainer::{MzOffsetOptimized, MzRegion}; + use mz_ore::flatcontainer::{MzIndex, MzIndexOptimized, MzRegion}; use mz_timely_util::containers::stack::StackWrapper; use timely::container::columnation::{Columnation, TimelyStack}; use timely::container::flatcontainer::FlatStack; @@ -91,10 +89,10 @@ pub(crate) mod spines { impl Update for MzFlatLayout where - KR: MzRegion, - VR: MzRegion, - TR: MzRegion, - RR: MzRegion, + KR: MzRegion, + VR: MzRegion, + TR: MzRegion, + RR: MzRegion, KR::Owned: Ord + Clone + 'static, VR::Owned: Ord + Clone + 'static, TR::Owned: Ord + Clone + Lattice + Timestamp + 'static, @@ -116,10 +114,10 @@ pub(crate) mod spines { /// to the optimized variant, we might be able to remove this implementation. impl Layout for MzFlatLayout where - KR: MzRegion, - VR: MzRegion, - TR: MzRegion, - RR: MzRegion, + KR: MzRegion, + VR: MzRegion, + TR: MzRegion, + RR: MzRegion, KR::Owned: Ord + Clone + 'static, VR::Owned: Ord + Clone + 'static, TR::Owned: Ord + Clone + Lattice + Timestamp + 'static, @@ -130,10 +128,10 @@ pub(crate) mod spines { for<'a> RR::ReadItem<'a>: Copy + Ord, { type Target = Self; - type KeyContainer = FlatStack; - type ValContainer = FlatStack; - type TimeContainer = FlatStack; - type DiffContainer = FlatStack; + type KeyContainer = FlatStack; + type ValContainer = FlatStack; + type TimeContainer = FlatStack; + type DiffContainer = FlatStack; type OffsetContainer = OffsetOptimized; } } @@ -184,15 +182,15 @@ pub type KeyValBatcher = MergeBatcher< pub type FlatKeyValBatch = OrdValBatch>; pub type FlatKeyValSpine = FlatValSpine< MzFlatLayout, - MzTupleABCRegion, T, R>, + ItemRegion, T, R>>, C, - MzOffsetOptimized, + MzIndexOptimized, >; pub type FlatKeyValSpineDefault = FlatKeyValSpine< - ::Region, - ::Region, - ::Region, - ::Region, + ItemRegion<::Region>, + ItemRegion<::Region>, + ItemRegion<::Region>, + ItemRegion<::Region>, C, >; pub type FlatKeyValAgent = TraceAgent>; diff --git a/src/ore/src/flatcontainer.rs b/src/ore/src/flatcontainer.rs index e03413b2a5b39..27b3ed586189b 100644 --- a/src/ore/src/flatcontainer.rs +++ b/src/ore/src/flatcontainer.rs @@ -20,8 +20,7 @@ use flatcontainer::{OptionRegion, Push, Region, ReserveItems, StringRegion}; use serde::{Deserialize, Serialize}; pub use item::ItemRegion; -pub use offset::MzOffsetOptimized; -pub use tuple::*; +pub use offset::MzIndexOptimized; /// Associate a type with a flat container region. pub trait MzRegionPreference: 'static { @@ -45,7 +44,7 @@ impl std::ops::Deref for MzIndex { /// TODO pub trait MzRegion: - Region + Region + Push<::Owned> + for<'a> Push<&'a ::Owned> + for<'a> Push<::ReadItem<'a>> @@ -56,7 +55,7 @@ pub trait MzRegion: } impl MzRegion for R where - R: Region + R: Region + Push<::Owned> + for<'a> Push<&'a ::Owned> + for<'a> Push<::ReadItem<'a>> @@ -74,276 +73,30 @@ impl MzRegion for R where pub struct OwnedRegionOpinion(std::marker::PhantomData); mod tuple { - use flatcontainer::{Index, Push, Region, ReserveItems}; + use flatcontainer::impls::tuple::*; use paste::paste; - use crate::flatcontainer::{MzIndex, MzRegion, MzRegionPreference}; + use crate::flatcontainer::MzRegionPreference; /// The macro creates the region implementation for tuples macro_rules! tuple_flatcontainer { - ($($name:ident)+) => ( - paste! { + ($($name:ident)+) => (paste! { impl<$($name: MzRegionPreference),*> MzRegionPreference for ($($name,)*) { type Owned = ($($name::Owned,)*); - type Region = []<$($name::Region,)*>; - } - - /// A region for a tuple. - #[allow(non_snake_case)] - #[derive(Default, Debug)] - #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] - pub struct []<$($name),*> { - $([]: $name),* - } - - #[allow(non_snake_case)] - impl<$($name: MzRegion),*> Clone for []<$($name),*> - where - $(<$name as Region>::Index: Index),* - { - #[inline] - fn clone(&self) -> Self { - Self { - $([]: self.[].clone(),)* - } - } - - #[inline] - fn clone_from(&mut self, source: &Self) { - $(self.[].clone_from(&source.[]);)* - } - } - - #[allow(non_snake_case)] - impl<$($name: MzRegion),*> Region for []<$($name),*> - where - $(<$name as Region>::Index: Index),* - { - type Owned = ($($name::Owned,)*); - type ReadItem<'a> = ($($name::ReadItem<'a>,)*) where Self: 'a; - - type Index = MzIndex; - - #[inline] - fn merge_regions<'a>(regions: impl Iterator + Clone) -> Self - where - Self: 'a, - { - Self { - $([]: $name::merge_regions(regions.clone().map(|r| &r.[]))),* - } - } - - #[inline] fn index(&self, index: Self::Index) -> Self::ReadItem<'_> { - ( - $(self.[].index(index),)* - ) - } - - #[inline(always)] - fn reserve_regions<'a, It>(&mut self, regions: It) - where - Self: 'a, - It: Iterator + Clone, - { - $(self.[].reserve_regions(regions.clone().map(|r| &r.[]));)* - } - - #[inline(always)] - fn clear(&mut self) { - $(self.[].clear();)* - } - - #[inline] - fn heap_size(&self, mut callback: Fn) { - $(self.[].heap_size(&mut callback);)* - } - - #[inline] - fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> where Self: 'a { - let ($($name,)*) = item; - ( - $($name::reborrow($name),)* - ) - } - } - - #[allow(non_camel_case_types)] - #[allow(non_snake_case)] - impl<$($name, [<$name _C>]: MzRegion ),*> Push<($($name,)*)> for []<$([<$name _C>]),*> - where - $([<$name _C>]: Push<$name>),* - { - #[inline] - fn push(&mut self, item: ($($name,)*)) - -> <[]<$([<$name _C>]),*> as Region>::Index { - let ($($name,)*) = item; - $(let _index = self.[].push($name);)* - _index - } - } - - #[allow(non_camel_case_types)] - #[allow(non_snake_case)] - impl<'a, $($name, [<$name _C>]),*> Push<&'a ($($name,)*)> for []<$([<$name _C>]),*> - where - $([<$name _C>]: MzRegion + Push<&'a $name>),* - { - #[inline] - fn push(&mut self, item: &'a ($($name,)*)) - -> <[]<$([<$name _C>]),*> as Region>::Index { - let ($($name,)*) = item; - $(let _index = self.[].push($name);)* - _index - } - } - - #[allow(non_camel_case_types)] - #[allow(non_snake_case)] - impl<'a, $($name, [<$name _C>]),*> ReserveItems<&'a ($($name,)*)> for []<$([<$name _C>]),*> - where - $([<$name _C>]: MzRegion + ReserveItems<&'a $name>),* - { - #[inline] - fn reserve_items(&mut self, items: It) - where - It: Iterator + Clone, - { - tuple_flatcontainer!(reserve_items self items $($name)* @ 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31); - } - } - - #[allow(non_camel_case_types)] - #[allow(non_snake_case)] - impl<$($name, [<$name _C>]),*> ReserveItems<($($name,)*)> for []<$([<$name _C>]),*> - where - $([<$name _C>]: MzRegion + ReserveItems<$name>),* - { - #[inline] - fn reserve_items(&mut self, items: It) - where - It: Iterator + Clone, - { - tuple_flatcontainer!(reserve_items_owned self items $($name)* @ 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31); - } + type Region = []<$($name::Region,)*>; } - } - ); - (reserve_items $self:ident $items:ident $name0:ident $($name:ident)* @ $num0:tt $($num:tt)*) => { - paste! { - $self.[].reserve_items($items.clone().map(|i| &i.$num0)); - tuple_flatcontainer!(reserve_items $self $items $($name)* @ $($num)*); - } - }; - (reserve_items $self:ident $items:ident @ $($num:tt)*) => {}; - (reserve_items_owned $self:ident $items:ident $name0:ident $($name:ident)* @ $num0:tt $($num:tt)*) => { - paste! { - $self.[].reserve_items($items.clone().map(|i| i.$num0)); - tuple_flatcontainer!(reserve_items_owned $self $items $($name)* @ $($num)*); - } - }; - (reserve_items_owned $self:ident $items:ident @ $($num:tt)*) => {}; -} + }); + } tuple_flatcontainer!(A); tuple_flatcontainer!(A B); tuple_flatcontainer!(A B C); tuple_flatcontainer!(A B C D); tuple_flatcontainer!(A B C D E); - - #[cfg(feature = "differential")] - mod differential { - use differential_dataflow::difference::Semigroup; - use differential_dataflow::lattice::Lattice; - use differential_dataflow::trace::implementations::merge_batcher_flat::RegionUpdate; - use differential_dataflow::trace::implementations::Update; - use timely::progress::Timestamp; - - use crate::flatcontainer::{MzRegion, MzTupleABCRegion, MzTupleABRegion}; - - impl Update for MzTupleABCRegion, TR, RR> - where - KR: MzRegion, - KR::Owned: Clone + Ord, - for<'a> KR::ReadItem<'a>: Copy + Ord, - VR: MzRegion, - VR::Owned: Clone + Ord, - for<'a> VR::ReadItem<'a>: Copy + Ord, - TR: MzRegion, - TR::Owned: Clone + Lattice + Ord + Timestamp, - for<'a> TR::ReadItem<'a>: Copy + Ord, - RR: MzRegion, - RR::Owned: Clone + Ord + Semigroup, - for<'a> RR::ReadItem<'a>: Copy + Ord, - { - type Key = KR::Owned; - type Val = VR::Owned; - type Time = TR::Owned; - type Diff = RR::Owned; - } - - impl RegionUpdate for MzTupleABCRegion, TR, RR> - where - KR: MzRegion, - for<'a> KR::ReadItem<'a>: Copy + Ord, - VR: MzRegion, - for<'a> VR::ReadItem<'a>: Copy + Ord, - TR: MzRegion, - for<'a> TR::ReadItem<'a>: Copy + Ord, - RR: MzRegion, - for<'a> RR::ReadItem<'a>: Copy + Ord, - { - type Key<'a> = KR::ReadItem<'a> where Self: 'a; - type Val<'a> = VR::ReadItem<'a> where Self: 'a; - type Time<'a> = TR::ReadItem<'a> where Self: 'a; - type TimeOwned = TR::Owned; - type Diff<'a> = RR::ReadItem<'a> where Self: 'a; - type DiffOwned = RR::Owned; - - #[inline] - fn into_parts<'a>( - ((key, val), time, diff): Self::ReadItem<'a>, - ) -> (Self::Key<'a>, Self::Val<'a>, Self::Time<'a>, Self::Diff<'a>) { - (key, val, time, diff) - } - - #[inline] - fn reborrow_key<'b, 'a: 'b>(item: Self::Key<'a>) -> Self::Key<'b> - where - Self: 'a, - { - KR::reborrow(item) - } - - #[inline] - fn reborrow_val<'b, 'a: 'b>(item: Self::Val<'a>) -> Self::Val<'b> - where - Self: 'a, - { - VR::reborrow(item) - } - - #[inline] - fn reborrow_time<'b, 'a: 'b>(item: Self::Time<'a>) -> Self::Time<'b> - where - Self: 'a, - { - TR::reborrow(item) - } - - #[inline] - fn reborrow_diff<'b, 'a: 'b>(item: Self::Diff<'a>) -> Self::Diff<'b> - where - Self: 'a, - { - RR::reborrow(item) - } - } - } } mod copy { - use crate::region::LgAllocVec; + use flatcontainer::MirrorRegion; use crate::flatcontainer::MzRegionPreference; @@ -351,7 +104,7 @@ mod copy { ($index_type:ty) => { impl MzRegionPreference for $index_type { type Owned = Self; - type Region = LgAllocVec; + type Region = MirrorRegion; } }; } @@ -781,6 +534,78 @@ mod item { self.storage.reserve(items.count()); } } + + #[cfg(feature = "differential")] + mod differential { + use differential_dataflow::trace::implementations::merge_batcher_flat::RegionUpdate; + use differential_dataflow::trace::implementations::Update; + + use crate::flatcontainer::{ItemRegion, MzRegion}; + + impl Update for ItemRegion + where + UR: Update + MzRegion, + UR::Owned: Clone + Ord, + for<'a> UR::ReadItem<'a>: Copy + Ord, + { + type Key = UR::Key; + type Val = UR::Val; + type Time = UR::Time; + type Diff = UR::Diff; + } + + impl RegionUpdate for ItemRegion + where + UR: RegionUpdate + MzRegion, + for<'a> UR::ReadItem<'a>: Copy + Ord, + { + type Key<'a> = UR::Key<'a> where Self: 'a; + type Val<'a> = UR::Val<'a> where Self: 'a; + type Time<'a> = UR::Time<'a> where Self: 'a; + type TimeOwned = UR::TimeOwned; + type Diff<'a> = UR::Diff<'a> where Self: 'a; + type DiffOwned = UR::DiffOwned; + + #[inline] + fn into_parts<'a>( + item: Self::ReadItem<'a>, + ) -> (Self::Key<'a>, Self::Val<'a>, Self::Time<'a>, Self::Diff<'a>) { + UR::into_parts(item) + } + + #[inline] + fn reborrow_key<'b, 'a: 'b>(item: Self::Key<'a>) -> Self::Key<'b> + where + Self: 'a, + { + UR::reborrow_key(item) + } + + #[inline] + fn reborrow_val<'b, 'a: 'b>(item: Self::Val<'a>) -> Self::Val<'b> + where + Self: 'a, + { + UR::reborrow_val(item) + } + + #[inline] + fn reborrow_time<'b, 'a: 'b>(item: Self::Time<'a>) -> Self::Time<'b> + where + Self: 'a, + { + UR::reborrow_time(item) + } + + #[inline] + fn reborrow_diff<'b, 'a: 'b>(item: Self::Diff<'a>) -> Self::Diff<'b> + where + Self: 'a, + { + UR::reborrow_diff(item) + } + } + } } mod lgallocvec { @@ -967,47 +792,56 @@ mod offset { /// TODO #[derive(Default, Clone, Debug)] - pub struct MzOffsetOptimized(IndexOptimized); + pub struct MzIndexOptimized(IndexOptimized); - impl Storage for MzOffsetOptimized { + impl Storage for MzIndexOptimized { + #[inline] fn with_capacity(capacity: usize) -> Self { Self(IndexOptimized::with_capacity(capacity)) } + #[inline] fn reserve(&mut self, additional: usize) { self.0.reserve(additional) } + #[inline] fn clear(&mut self) { self.0.clear(); } + #[inline] fn heap_size(&self, callback: F) { self.0.heap_size(callback); } + #[inline] fn len(&self) -> usize { self.0.len() } + #[inline] fn is_empty(&self) -> bool { self.0.is_empty() } } - impl IndexContainer for MzOffsetOptimized { + impl IndexContainer for MzIndexOptimized { type Iter<'a> = MzOffsetOptimizedIter<>::Iter<'a>> where Self: 'a; + #[inline] fn index(&self, index: usize) -> MzIndex { MzIndex(self.0.index(index)) } + #[inline] fn push(&mut self, item: MzIndex) { self.0.push(item.0); } + #[inline] fn extend>(&mut self, iter: I) where I::IntoIter: ExactSizeIterator, @@ -1015,11 +849,21 @@ mod offset { self.0.extend(iter.into_iter().map(|item| item.0)); } + #[inline] fn iter(&self) -> Self::Iter<'_> { MzOffsetOptimizedIter(self.0.iter()) } } + impl<'a> IntoIterator for &'a MzIndexOptimized { + type Item = MzIndex; + type IntoIter = MzOffsetOptimizedIter<>::Iter<'a>>; + + fn into_iter(self) -> Self::IntoIter { + self.iter() + } + } + /// TODO #[derive(Clone, Copy, Debug)] pub struct MzOffsetOptimizedIter(I); @@ -1030,6 +874,7 @@ mod offset { { type Item = MzIndex; + #[inline] fn next(&mut self) -> Option { self.0.next().map(MzIndex) } diff --git a/src/repr/src/timestamp.rs b/src/repr/src/timestamp.rs index 96549a11f3031..1461541f7daef 100644 --- a/src/repr/src/timestamp.rs +++ b/src/repr/src/timestamp.rs @@ -467,15 +467,14 @@ impl columnation::Columnation for Timestamp { } mod flatcontainer { - use flatcontainer::IntoOwned; + use flatcontainer::{IntoOwned, MirrorRegion}; use mz_ore::flatcontainer::MzRegionPreference; - use mz_ore::region::LgAllocVec; use crate::Timestamp; impl MzRegionPreference for Timestamp { type Owned = Self; - type Region = LgAllocVec; + type Region = MirrorRegion; } impl<'a> IntoOwned<'a> for Timestamp {