Skip to content

Commit

Permalink
Back out of MzTupleRegion
Browse files Browse the repository at this point in the history
Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Jul 22, 2024
1 parent ae9afc9 commit c249582
Show file tree
Hide file tree
Showing 8 changed files with 233 additions and 327 deletions.
10 changes: 5 additions & 5 deletions src/compute/src/extensions/arrange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ mod flatcontainer {
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::TraceReader;
use mz_ore::flatcontainer::{MzRegion, MzRegionPreference};
use mz_ore::flatcontainer::{MzIndex, MzRegion, MzRegionPreference};
use timely::container::flatcontainer::{IntoOwned, Region};
use timely::dataflow::Scope;
use timely::progress::Timestamp;
Expand All @@ -399,10 +399,10 @@ mod flatcontainer {
Self: Clone,
G: Scope<Timestamp = T::Owned>,
G::Timestamp: Lattice + Ord + MzRegionPreference,
K: MzRegion,
V: MzRegion,
T: MzRegion,
R: MzRegion,
K: MzRegion<Index = MzIndex>,
V: MzRegion<Index = MzIndex>,
T: MzRegion<Index = MzIndex>,
R: MzRegion<Index = MzIndex>,
K::Owned: Clone + Ord,
V::Owned: Clone + Ord,
T::Owned: Lattice + for<'a> PartialOrder<<T as Region>::ReadItem<'a>> + Timestamp,
Expand Down
2 changes: 1 addition & 1 deletion src/compute/src/logging/differential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ pub(super) fn construct<A: Allocate>(
)
.as_collection(move |op, ()| {
packer.pack_slice(&[
Datum::UInt64(u64::cast_from(*op)),
Datum::UInt64(u64::cast_from(op)),
Datum::UInt64(u64::cast_from(worker_id)),
])
})
Expand Down
27 changes: 14 additions & 13 deletions src/compute/src/logging/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,15 @@ use std::collections::BTreeMap;
use std::rc::Rc;
use std::time::{Duration, Instant};

use crate::arrangement::manager::TraceBundle;
use crate::extensions::arrange::{KeyCollection, MzArrange};
use crate::logging::compute::ComputeEvent;
use crate::logging::{BatchLogger, EventQueue, SharedLoggingState};
use differential_dataflow::dynamic::pointstamp::PointStamp;
use differential_dataflow::logging::DifferentialEvent;
use differential_dataflow::Collection;
use mz_compute_client::logging::{LogVariant, LoggingConfig};
use mz_ore::flatcontainer::{MzOffsetOptimized, MzRegionPreference, OwnedRegionOpinion};
use mz_ore::flatcontainer::{ItemRegion, MzIndexOptimized, MzRegionPreference, OwnedRegionOpinion};
use mz_repr::{Diff, Timestamp};
use mz_storage_operators::persist_source::Subtime;
use mz_storage_types::errors::DataflowError;
Expand All @@ -27,11 +31,6 @@ use timely::logging::{Logger, ProgressEventTimestamp, TimelyEvent, WorkerIdentif
use timely::order::Product;
use timely::progress::reachability::logging::TrackerEvent;

use crate::arrangement::manager::TraceBundle;
use crate::extensions::arrange::{KeyCollection, MzArrange};
use crate::logging::compute::ComputeEvent;
use crate::logging::{BatchLogger, EventQueue, SharedLoggingState};

/// Initialize logging dataflows.
///
/// Returns a logger for compute events, and for each `LogVariant` a trace bundle usable for
Expand Down Expand Up @@ -87,11 +86,13 @@ type ReachabilityEventRegionPreference = (
OwnedRegionOpinion<Vec<usize>>,
OwnedRegionOpinion<Vec<(usize, usize, bool, Option<Timestamp>, Diff)>>,
);
pub(super) type ReachabilityEventRegion = <(
Duration,
WorkerIdentifier,
ReachabilityEventRegionPreference,
) as MzRegionPreference>::Region;
pub(super) type ReachabilityEventRegion = ItemRegion<
<(
Duration,
WorkerIdentifier,
ReachabilityEventRegionPreference,
) as MzRegionPreference>::Region,
>;

struct LoggingContext<'a, A: Allocate> {
worker: &'a mut timely::worker::Worker<A>,
Expand All @@ -100,7 +101,7 @@ struct LoggingContext<'a, A: Allocate> {
now: Instant,
start_offset: Duration,
t_event_queue: EventQueue<Vec<(Duration, WorkerIdentifier, TimelyEvent)>>,
r_event_queue: EventQueue<FlatStack<ReachabilityEventRegion, MzOffsetOptimized>>,
r_event_queue: EventQueue<FlatStack<ReachabilityEventRegion, MzIndexOptimized>>,
d_event_queue: EventQueue<Vec<(Duration, WorkerIdentifier, DifferentialEvent)>>,
c_event_queue: EventQueue<Vec<(Duration, WorkerIdentifier, ComputeEvent)>>,
shared_state: Rc<RefCell<SharedLoggingState>>,
Expand Down Expand Up @@ -189,7 +190,7 @@ impl<A: Allocate + 'static> LoggingContext<'_, A> {
fn reachability_logger(&self) -> Logger<TrackerEvent> {
let event_queue = self.r_event_queue.clone();
type CB = PreallocatingCapacityContainerBuilder<
FlatStack<ReachabilityEventRegion, MzOffsetOptimized>,
FlatStack<ReachabilityEventRegion, MzIndexOptimized>,
>;
let mut logger = BatchLogger::<CB, _>::new(event_queue.link, self.interval_ms);
Logger::new(
Expand Down
14 changes: 7 additions & 7 deletions src/compute/src/logging/reachability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::rc::Rc;
use mz_compute_client::logging::LoggingConfig;
use mz_expr::{permutation_for_arrangement, MirScalarExpr};
use mz_ore::cast::CastFrom;
use mz_ore::flatcontainer::{MzOffsetOptimized, MzRegionPreference, OwnedRegionOpinion};
use mz_ore::flatcontainer::{ItemRegion, MzIndexOptimized, MzRegionPreference, OwnedRegionOpinion};
use mz_ore::iter::IteratorExt;
use mz_repr::{Datum, Diff, RowArena, SharedRow, Timestamp};
use mz_timely_util::containers::PreallocatingCapacityContainerBuilder;
Expand All @@ -39,7 +39,7 @@ use crate::typedefs::{FlatKeyValSpineDefault, RowRowSpine};
pub(super) fn construct<A: Allocate>(
worker: &mut timely::worker::Worker<A>,
config: &LoggingConfig,
event_queue: EventQueue<FlatStack<ReachabilityEventRegion, MzOffsetOptimized>>,
event_queue: EventQueue<FlatStack<ReachabilityEventRegion, MzIndexOptimized>>,
) -> BTreeMap<LogVariant, LogCollection> {
let interval_ms = std::cmp::max(1, config.interval.as_millis());
let worker_index = worker.index();
Expand All @@ -55,10 +55,10 @@ pub(super) fn construct<A: Allocate>(
usize,
Option<Timestamp>,
);
type UpdatesRegion = <((UpdatesKey, ()), Timestamp, Diff) as MzRegionPreference>::Region;
type UpdatesRegion =
ItemRegion<<((UpdatesKey, ()), Timestamp, Diff) as MzRegionPreference>::Region>;

type CB =
PreallocatingCapacityContainerBuilder<FlatStack<UpdatesRegion, MzOffsetOptimized>>;
type CB = PreallocatingCapacityContainerBuilder<FlatStack<UpdatesRegion, MzIndexOptimized>>;
let (updates, token) = Some(event_queue.link).mz_replay::<_, CB, _>(
scope,
"reachability logs",
Expand Down Expand Up @@ -103,7 +103,7 @@ pub(super) fn construct<A: Allocate>(
);

let updates =
updates.as_collection(move |(&update_type, addr, &source, &port, ts), _| {
updates.as_collection(move |(update_type, addr, source, port, ts), _| {
let row_arena = RowArena::default();
let update_type = if update_type { "source" } else { "target" };
let binding = SharedRow::get();
Expand All @@ -119,7 +119,7 @@ pub(super) fn construct<A: Allocate>(
Datum::UInt64(u64::cast_from(port)),
Datum::UInt64(u64::cast_from(worker_index)),
Datum::String(update_type),
Datum::from(ts.copied()),
Datum::from(ts),
];
row_builder.packer().extend(key.iter().map(|k| datums[*k]));
let key_row = row_builder.clone();
Expand Down
85 changes: 74 additions & 11 deletions src/compute/src/logging/timely.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@ use std::time::Duration;

use mz_compute_client::logging::LoggingConfig;
use mz_ore::cast::CastFrom;
use mz_ore::flatcontainer::{MzOffsetOptimized, MzRegionPreference, OwnedRegionOpinion};
use mz_ore::region::LgAllocVec;
use mz_ore::flatcontainer::{ItemRegion, MzIndexOptimized, MzRegionPreference, OwnedRegionOpinion};
use mz_repr::{Datum, Diff, Timestamp};
use mz_timely_util::containers::PreallocatingCapacityContainerBuilder;
use mz_timely_util::replay::MzReplay;
use serde::{Deserialize, Serialize};
use timely::communication::Allocate;
use timely::container::flatcontainer::FlatStack;
use timely::container::flatcontainer::{FlatStack, IntoOwned, MirrorRegion};
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::channels::pushers::buffer::Session;
Expand Down Expand Up @@ -158,7 +157,7 @@ pub(super) fn construct<A: Allocate>(
)
.as_collection(move |id, name| {
packer.pack_slice(&[
Datum::UInt64(u64::cast_from(*id)),
Datum::UInt64(u64::cast_from(id)),
Datum::UInt64(u64::cast_from(worker_id)),
Datum::String(name),
])
Expand Down Expand Up @@ -191,7 +190,7 @@ pub(super) fn construct<A: Allocate>(
.as_collection({
move |id, address| {
packer.pack_by_index(|packer, index| match index {
0 => packer.push(Datum::UInt64(u64::cast_from(*id))),
0 => packer.push(Datum::UInt64(u64::cast_from(id))),
1 => packer.push(Datum::UInt64(u64::cast_from(worker_id))),
2 => packer
.push_list(address.iter().map(|i| Datum::UInt64(u64::cast_from(*i)))),
Expand Down Expand Up @@ -272,7 +271,7 @@ pub(super) fn construct<A: Allocate>(
)
.as_collection(move |operator, _| {
packer.pack_slice(&[
Datum::UInt64(u64::cast_from(*operator)),
Datum::UInt64(u64::cast_from(operator)),
Datum::UInt64(u64::cast_from(worker_id)),
])
});
Expand Down Expand Up @@ -362,7 +361,7 @@ struct MessageCount {
}

type FlatStackFor<D> =
FlatStack<<(D, Timestamp, Diff) as MzRegionPreference>::Region, MzOffsetOptimized>;
FlatStack<ItemRegion<<(D, Timestamp, Diff) as MzRegionPreference>::Region>, MzIndexOptimized>;

type Pusher<D> = Counter<Timestamp, FlatStackFor<D>, Tee<Timestamp, FlatStackFor<D>>>;
type OutputSession<'a, D> =
Expand Down Expand Up @@ -395,7 +394,23 @@ struct ChannelDatum {

impl MzRegionPreference for ChannelDatum {
type Owned = Self;
type Region = LgAllocVec<Self>;
type Region = MirrorRegion<Self>;
}

impl<'a> IntoOwned<'a> for ChannelDatum {
type Owned = Self;

fn into_owned(self) -> Self::Owned {
self
}

fn clone_onto(self, other: &mut Self::Owned) {
*other = self;
}

fn borrow_as(owned: &'a Self::Owned) -> Self {
*owned
}
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
Expand All @@ -406,7 +421,23 @@ struct ParkDatum {

impl MzRegionPreference for ParkDatum {
type Owned = Self;
type Region = LgAllocVec<Self>;
type Region = MirrorRegion<Self>;
}

impl<'a> IntoOwned<'a> for ParkDatum {
type Owned = Self;

fn into_owned(self) -> Self::Owned {
self
}

fn clone_onto(self, other: &mut Self::Owned) {
*other = self;
}

fn borrow_as(owned: &'a Self::Owned) -> Self {
*owned
}
}

#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
Expand All @@ -417,7 +448,23 @@ struct MessageDatum {

impl MzRegionPreference for MessageDatum {
type Owned = Self;
type Region = LgAllocVec<Self>;
type Region = MirrorRegion<Self>;
}

impl<'a> IntoOwned<'a> for MessageDatum {
type Owned = Self;

fn into_owned(self) -> Self::Owned {
self
}

fn clone_onto(self, other: &mut Self::Owned) {
*other = self;
}

fn borrow_as(owned: &'a Self::Owned) -> Self {
*owned
}
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
Expand All @@ -428,7 +475,23 @@ struct ScheduleHistogramDatum {

impl MzRegionPreference for ScheduleHistogramDatum {
type Owned = Self;
type Region = LgAllocVec<Self>;
type Region = MirrorRegion<Self>;
}

impl<'a> IntoOwned<'a> for ScheduleHistogramDatum {
type Owned = Self;

fn into_owned(self) -> Self::Owned {
self
}

fn clone_onto(self, other: &mut Self::Owned) {
*other = self;
}

fn borrow_as(owned: &'a Self::Owned) -> Self {
*owned
}
}

/// Event handler of the demux operator.
Expand Down
Loading

0 comments on commit c249582

Please sign in to comment.