Skip to content

Commit

Permalink
Use MzOffsetOptimized in FlatStack storage
Browse files Browse the repository at this point in the history
Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Jul 8, 2024
1 parent 8224304 commit 60c7e2d
Show file tree
Hide file tree
Showing 5 changed files with 219 additions and 116 deletions.
17 changes: 8 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 14 additions & 12 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -267,20 +267,22 @@ debug = 2
# version of Materialize.
[patch."https://github.com/TimelyDataflow/timely-dataflow"]
# Projects that do not reliably release to crates.io.
timely = { git = "https://github.com/MaterializeInc/timely-dataflow.git" }
timely_bytes = { git = "https://github.com/MaterializeInc/timely-dataflow.git" }
timely_communication = { git = "https://github.com/MaterializeInc/timely-dataflow.git" }
timely_container = { git = "https://github.com/MaterializeInc/timely-dataflow.git" }
timely_logging = { git = "https://github.com/MaterializeInc/timely-dataflow.git" }
timely = { git = "https://github.com/antiguru/timely-dataflow.git", branch = "flatcontainer_storage" }
timely_bytes = { git = "https://github.com/antiguru/timely-dataflow.git", branch = "flatcontainer_storage" }
timely_communication = { git = "https://github.com/antiguru/timely-dataflow.git", branch = "flatcontainer_storage" }
timely_container = { git = "https://github.com/antiguru/timely-dataflow.git", branch = "flatcontainer_storage" }
timely_logging = { git = "https://github.com/antiguru/timely-dataflow.git", branch = "flatcontainer_storage" }
[patch.crates-io]
# Projects that do not reliably release to crates.io.
timely = { git = "https://github.com/MaterializeInc/timely-dataflow.git" }
timely_bytes = { git = "https://github.com/MaterializeInc/timely-dataflow.git" }
timely_communication = { git = "https://github.com/MaterializeInc/timely-dataflow.git" }
timely_container = { git = "https://github.com/MaterializeInc/timely-dataflow.git" }
timely_logging = { git = "https://github.com/MaterializeInc/timely-dataflow.git" }
differential-dataflow = { git = "https://github.com/antiguru/differential-dataflow.git", branch = "consolidate_layout_merger_chunk" }
dogsdogsdogs = { git = "https://github.com/antiguru/differential-dataflow.git", branch = "consolidate_layout_merger_chunk" }
timely = { git = "https://github.com/antiguru/timely-dataflow.git", branch = "flatcontainer_storage" }
timely_bytes = { git = "https://github.com/antiguru/timely-dataflow.git", branch = "flatcontainer_storage" }
timely_communication = { git = "https://github.com/antiguru/timely-dataflow.git", branch = "flatcontainer_storage" }
timely_container = { git = "https://github.com/antiguru/timely-dataflow.git", branch = "flatcontainer_storage" }
timely_logging = { git = "https://github.com/antiguru/timely-dataflow.git", branch = "flatcontainer_storage" }
differential-dataflow = { git = "https://github.com/antiguru/differential-dataflow.git", branch = "region_update" }
dogsdogsdogs = { git = "https://github.com/antiguru/differential-dataflow.git", branch = "region_update" }

flatcontainer = { git = "https://github.com/antiguru/flatcontainer.git" }

# Waiting on https://github.com/sfackler/rust-postgres/pull/752.
postgres = { git = "https://github.com/MaterializeInc/rust-postgres" }
Expand Down
76 changes: 14 additions & 62 deletions src/compute/src/typedefs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ use differential_dataflow::trace::implementations::merge_batcher_col::Columnatio
use differential_dataflow::trace::implementations::ord_neu::{FlatValSpine, OrdValBatch};
use differential_dataflow::trace::wrappers::enter::TraceEnter;
use differential_dataflow::trace::wrappers::frontier::TraceFrontier;
use mz_ore::flatcontainer::{MzRegionPreference, MzTupleABCRegion, MzTupleABRegion};
use mz_ore::flatcontainer::{
MzOffsetOptimized, MzRegionPreference, MzTupleABCRegion, MzTupleABRegion,
};
use mz_repr::Diff;
use mz_storage_types::errors::DataflowError;
use timely::dataflow::ScopeParent;
Expand All @@ -39,7 +41,7 @@ pub(crate) mod spines {
use differential_dataflow::trace::implementations::spine_fueled::Spine;
use differential_dataflow::trace::implementations::{Layout, Update};
use differential_dataflow::trace::rc_blanket_impls::RcBuilder;
use mz_ore::flatcontainer::MzRegion;
use mz_ore::flatcontainer::{MzOffsetOptimized, MzRegion};
use mz_timely_util::containers::stack::StackWrapper;
use timely::container::columnation::{Columnation, TimelyStack};
use timely::container::flatcontainer::FlatStack;
Expand Down Expand Up @@ -106,60 +108,6 @@ pub(crate) mod spines {
type Val = VR::Owned;
type Time = TR::Owned;
type Diff = RR::Owned;
type ItemRef<'a> = ((Self::KeyGat<'a>, Self::ValGat<'a>), Self::TimeGat<'a>, Self::DiffGat<'a>)
where
Self: 'a;
type KeyGat<'a> = KR::ReadItem<'a>
where
Self: 'a;
type ValGat<'a> = VR::ReadItem<'a>
where
Self: 'a;
type TimeGat<'a> = TR::ReadItem<'a>
where
Self: 'a;
type DiffGat<'a> = RR::ReadItem<'a>
where
Self: 'a;

fn into_parts<'a>(
((key, val), time, diff): Self::ItemRef<'a>,
) -> (
Self::KeyGat<'a>,
Self::ValGat<'a>,
Self::TimeGat<'a>,
Self::DiffGat<'a>,
) {
(key, val, time, diff)
}

fn reborrow_key<'b, 'a: 'b>(item: Self::KeyGat<'a>) -> Self::KeyGat<'b>
where
Self: 'a,
{
KR::reborrow(item)
}

fn reborrow_val<'b, 'a: 'b>(item: Self::ValGat<'a>) -> Self::ValGat<'b>
where
Self: 'a,
{
VR::reborrow(item)
}

fn reborrow_time<'b, 'a: 'b>(item: Self::TimeGat<'a>) -> Self::TimeGat<'b>
where
Self: 'a,
{
TR::reborrow(item)
}

fn reborrow_diff<'b, 'a: 'b>(item: Self::DiffGat<'a>) -> Self::DiffGat<'b>
where
Self: 'a,
{
RR::reborrow(item)
}
}

/// Layout implementation for [`MzFlatLayout`]. Mostly equivalent to differential's
Expand All @@ -182,10 +130,10 @@ pub(crate) mod spines {
for<'a> RR::ReadItem<'a>: Copy + Ord,
{
type Target = Self;
type KeyContainer = FlatStack<KR>;
type ValContainer = FlatStack<VR>;
type TimeContainer = FlatStack<TR>;
type DiffContainer = FlatStack<RR>;
type KeyContainer = FlatStack<KR, MzOffsetOptimized>;
type ValContainer = FlatStack<VR, MzOffsetOptimized>;
type TimeContainer = FlatStack<TR, MzOffsetOptimized>;
type DiffContainer = FlatStack<RR, MzOffsetOptimized>;
type OffsetContainer = OffsetOptimized;
}
}
Expand Down Expand Up @@ -234,8 +182,12 @@ pub type KeyValBatcher<K, V, T, D> = MergeBatcher<
>;

pub type FlatKeyValBatch<K, V, T, R> = OrdValBatch<MzFlatLayout<K, V, T, R>>;
pub type FlatKeyValSpine<K, V, T, R, C> =
FlatValSpine<MzFlatLayout<K, V, T, R>, MzTupleABCRegion<MzTupleABRegion<K, V>, T, R>, C>;
pub type FlatKeyValSpine<K, V, T, R, C> = FlatValSpine<
MzFlatLayout<K, V, T, R>,
MzTupleABCRegion<MzTupleABRegion<K, V>, T, R>,
C,
MzOffsetOptimized,
>;
pub type FlatKeyValSpineDefault<K, V, T, R, C> = FlatKeyValSpine<
<K as MzRegionPreference>::Region,
<V as MzRegionPreference>::Region,
Expand Down
Loading

0 comments on commit 60c7e2d

Please sign in to comment.