Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trait reorganization #424

Merged
5 changes: 3 additions & 2 deletions src/operators/arrange/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use timely::progress::{Antichain, frontier::AntichainRef};
use timely::dataflow::operators::CapabilitySet;

use lattice::Lattice;
use trace::{Trace, TraceReader, Batch, BatchReader, Cursor};
use trace::{Trace, TraceReader, Batch, BatchReader};

use trace::wrappers::rc::TraceBox;

Expand Down Expand Up @@ -53,6 +53,7 @@ where
type R = Tr::R;

type Batch = Tr::Batch;
type Storage = Tr::Storage;
type Cursor = Tr::Cursor;

fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
Expand All @@ -77,7 +78,7 @@ where
fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> {
self.physical_compaction.borrow()
}
fn cursor_through(&mut self, frontier: AntichainRef<Tr::Time>) -> Option<(Tr::Cursor, <Tr::Cursor as Cursor>::Storage)> {
fn cursor_through(&mut self, frontier: AntichainRef<Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> {
self.trace.borrow_mut().trace.cursor_through(frontier)
}
fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F) { self.trace.borrow().trace.map_batches(f) }
Expand Down
28 changes: 21 additions & 7 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,8 +454,10 @@ where
K: ExchangeData+Hashable,
V: ExchangeData,
R: ExchangeData,
Tr: Trace+TraceReader<Key=K,Val=V,Time=G::Timestamp,R=R>+'static,
Tr: Trace+TraceReader<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Item = ((K,V),G::Timestamp,R)>,
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Time = G::Timestamp>,
{
self.arrange_named("Arrange")
}
Expand All @@ -470,8 +472,10 @@ where
K: ExchangeData+Hashable,
V: ExchangeData,
R: ExchangeData,
Tr: Trace+TraceReader<Key=K,Val=V,Time=G::Timestamp,R=R>+'static,
Tr: Trace+TraceReader<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Item = ((K,V),G::Timestamp,R)>,
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Time = G::Timestamp>,
frankmcsherry marked this conversation as resolved.
Show resolved Hide resolved
{
let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into());
self.arrange_core(exchange, name)
Expand All @@ -485,8 +489,10 @@ where
fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
P: ParallelizationContract<G::Timestamp, ((K,V),G::Timestamp,R)>,
Tr: Trace+TraceReader<Key=K,Val=V,Time=G::Timestamp,R=R>+'static,
Tr: Trace+TraceReader<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Item = ((K,V),G::Timestamp,R)>,
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Time = G::Timestamp>,
;
}

Expand All @@ -503,7 +509,9 @@ where
K: ExchangeData + Hashable,
V: ExchangeData,
R: ExchangeData,
Tr: Trace + TraceReader<Key=K, Val=V, Time=G::Timestamp, R=R> + 'static, Tr::Batch: Batch
Tr: Trace + TraceReader<Time=G::Timestamp> + 'static, Tr::Batch: Batch,
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Item = ((K,V),G::Timestamp,R)>,
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Time = G::Timestamp>,
{
self.arrange_named("Arrange")
}
Expand All @@ -513,7 +521,9 @@ where
K: ExchangeData + Hashable,
V: ExchangeData,
R: ExchangeData,
Tr: Trace + TraceReader<Key=K, Val=V, Time=G::Timestamp, R=R> + 'static, Tr::Batch: Batch
Tr: Trace + TraceReader<Time=G::Timestamp> + 'static, Tr::Batch: Batch,
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Item = ((K,V),G::Timestamp,R)>,
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Time = G::Timestamp>,
{
let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into());
self.arrange_core(exchange, name)
Expand All @@ -522,8 +532,10 @@ where
fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
P: ParallelizationContract<G::Timestamp, ((K,V),G::Timestamp,R)>,
Tr: Trace+TraceReader<Key=K,Val=V,Time=G::Timestamp,R=R>+'static,
Tr: Trace+TraceReader<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Item = ((K,V),G::Timestamp,R)>,
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Time = G::Timestamp>,
{
// The `Arrange` operator is tasked with reacting to an advancing input
// frontier by producing the sequence of batches whose lower and upper
Expand Down Expand Up @@ -685,8 +697,10 @@ where
fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
P: ParallelizationContract<G::Timestamp, ((K,()),G::Timestamp,R)>,
Tr: Trace+TraceReader<Key=K, Val=(), Time=G::Timestamp, R=R>+'static,
Tr: Trace+TraceReader<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Item = ((K,()),G::Timestamp,R)>,
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Time = G::Timestamp>,
{
self.map(|k| (k, ()))
.arrange_core(pact, name)
Expand Down
5 changes: 3 additions & 2 deletions src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ where
Tr::Val: ExchangeData,
Tr: Trace+TraceReader<Time=G::Timestamp,R=isize>+'static,
Tr::Batch: Batch,
<Tr::Batch as Batch>::Builder: Builder<Tr::Batch, Item = ((Tr::Key, Tr::Val), Tr::Time, Tr::R)>,
{
let mut reader: Option<TraceAgent<Tr>> = None;

Expand Down Expand Up @@ -277,10 +278,10 @@ where
for (time, std::cmp::Reverse(next)) in list {
if prev_value != next {
if let Some(prev) = prev_value {
updates.push((key.clone(), prev, time.clone(), -1));
updates.push(((key.clone(), prev), time.clone(), -1));
}
if let Some(next) = next.as_ref() {
updates.push((key.clone(), next.clone(), time.clone(), 1));
updates.push(((key.clone(), next.clone()), time.clone(), 1));
}
prev_value = next;
}
Expand Down
3 changes: 3 additions & 0 deletions src/operators/consolidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use ::difference::Semigroup;

use Data;
use lattice::Lattice;
use trace::{Batch, Batcher};

/// Methods which require data be arrangeable.
impl<G, D, R> Collection<G, D, R>
Expand Down Expand Up @@ -57,6 +58,8 @@ where
where
Tr: crate::trace::Trace+crate::trace::TraceReader<Key=D,Val=(),Time=G::Timestamp,R=R>+'static,
Tr::Batch: crate::trace::Batch,
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Item = ((D,()),G::Timestamp,R)>,
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Time = G::Timestamp>,
{
use operators::arrange::arrangement::Arrange;
self.map(|k| (k, ()))
Expand Down
18 changes: 9 additions & 9 deletions src/operators/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,12 +633,12 @@ impl<G, T1> JoinCore<G, T1::Key, T1::Val, T1::R> for Arranged<G,T1>
/// The structure wraps cursors which allow us to play out join computation at whatever rate we like.
/// This allows us to avoid producing and buffering massive amounts of data, without giving the timely
/// dataflow system a chance to run operators that can consume and aggregate the data.
struct Deferred<K, T, R, C1, C2, D>
struct Deferred<K, T, R, S1, S2, C1, C2, D>
where
T: Timestamp+Lattice+Ord+Debug,
R: Semigroup,
C1: Cursor<Key=K, Time=T>,
C2: Cursor<Key=K, Time=T>,
C1: Cursor<S1, Key=K, Time=T>,
C2: Cursor<S2, Key=K, Time=T>,
C1::Val: Ord+Clone,
C2::Val: Ord+Clone,
C1::R: Semigroup,
Expand All @@ -647,19 +647,19 @@ where
{
phant: ::std::marker::PhantomData<K>,
trace: C1,
trace_storage: C1::Storage,
trace_storage: S1,
batch: C2,
batch_storage: C2::Storage,
batch_storage: S2,
capability: Capability<T>,
done: bool,
temp: Vec<((D, T), R)>,
}

impl<K, T, R, C1, C2, D> Deferred<K, T, R, C1, C2, D>
impl<K, T, R, S1, S2, C1, C2, D> Deferred<K, T, R, S1, S2, C1, C2, D>
where
K: Ord+Debug+Eq,
C1: Cursor<Key=K, Time=T>,
C2: Cursor<Key=K, Time=T>,
C1: Cursor<S1, Key=K, Time=T>,
C2: Cursor<S2, Key=K, Time=T>,
C1::Val: Ord+Clone+Debug,
C2::Val: Ord+Clone+Debug,
C1::R: Semigroup,
Expand All @@ -668,7 +668,7 @@ where
R: Semigroup,
D: Clone+Data,
{
fn new(trace: C1, trace_storage: C1::Storage, batch: C2, batch_storage: C2::Storage, capability: Capability<T>) -> Self {
fn new(trace: C1, trace_storage: S1, batch: C2, batch_storage: S2, capability: Capability<T>) -> Self {
Deferred {
phant: ::std::marker::PhantomData,
trace,
Expand Down
14 changes: 7 additions & 7 deletions src/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ impl<'a, V:'a, T, R> EditList<'a, V, T, R> where T: Ord+Clone, R: Semigroup {
}
}
/// Loads the contents of a cursor.
fn load<C, L>(&mut self, cursor: &mut C, storage: &'a C::Storage, logic: L)
where V: Clone, C: Cursor<Val=V, Time=T, R=R>, C::Key: Eq, L: Fn(&T)->T {
fn load<S, C, L>(&mut self, cursor: &mut C, storage: &'a S, logic: L)
where V: Clone, C: Cursor<S, Val=V, Time=T, R=R>, C::Key: Eq, L: Fn(&T)->T {
self.clear();
while cursor.val_valid(storage) {
cursor.map_times(storage, |time1, diff1| self.push(logic(time1), diff1.clone()));
Expand Down Expand Up @@ -101,22 +101,22 @@ impl<'storage, V: Ord+Clone+'storage, T: Lattice+Ord+Clone, R: Semigroup> ValueH
self.history.clear();
self.buffer.clear();
}
fn load<C, L>(&mut self, cursor: &mut C, storage: &'storage C::Storage, logic: L)
where C: Cursor<Val=V, Time=T, R=R>, C::Key: Eq, L: Fn(&T)->T {
fn load<S, C, L>(&mut self, cursor: &mut C, storage: &'storage S, logic: L)
where C: Cursor<S, Val=V, Time=T, R=R>, C::Key: Eq, L: Fn(&T)->T {
self.edits.load(cursor, storage, logic);
}

/// Loads and replays a specified key.
///
/// If the key is absent, the replayed history will be empty.
fn replay_key<'history, C, L>(
fn replay_key<'history, S, C, L>(
&'history mut self,
cursor: &mut C,
storage: &'storage C::Storage,
storage: &'storage S,
key: &C::Key,
logic: L
) -> HistoryReplay<'storage, 'history, V, T, R>
where C: Cursor<Val=V, Time=T, R=R>, C::Key: Eq, L: Fn(&T)->T
where C: Cursor<S, Val=V, Time=T, R=R>, C::Key: Eq, L: Fn(&T)->T
{
self.clear();
cursor.seek_key(storage, key);
Expand Down
34 changes: 19 additions & 15 deletions src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ pub trait ReduceCore<G: Scope, K: Data, V: Data, R: Semigroup> where G::Timestam
T2::Val: Data,
T2::R: Abelian,
T2::Batch: Batch,
<T2::Batch as Batch>::Builder: Builder<T2::Batch, Item = ((T2::Key, T2::Val), T2::Time, T2::R)>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val, T2::R)>)+'static,
{
self.reduce_core::<_,T2>(name, move |key, input, output, change| {
Expand All @@ -298,6 +299,7 @@ pub trait ReduceCore<G: Scope, K: Data, V: Data, R: Semigroup> where G::Timestam
T2::Val: Data,
T2::R: Semigroup,
T2::Batch: Batch,
<T2::Batch as Batch>::Builder: Builder<T2::Batch, Item = ((T2::Key, T2::Val), T2::Time, T2::R)>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val,T2::R)>)+'static
;
}
Expand All @@ -316,6 +318,7 @@ where
T2::R: Semigroup,
T2: Trace+TraceReader<Key=K, Time=G::Timestamp>+'static,
T2::Batch: Batch,
<T2::Batch as Batch>::Builder: Builder<T2::Batch, Item = ((T2::Key, T2::Val), T2::Time, T2::R)>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static
{
self.arrange_by_key_named(&format!("Arrange: {}", name))
Expand All @@ -334,6 +337,7 @@ where
T2::Val: Data,
T2::R: Semigroup,
T2::Batch: Batch,
<T2::Batch as Batch>::Builder: Builder<T2::Batch, Item = ((T2::Key, T2::Val), T2::Time, T2::R)>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static {

let mut result_trace = None;
Expand Down Expand Up @@ -548,7 +552,7 @@ where
for index in 0 .. buffers.len() {
buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0));
for (val, time, diff) in buffers[index].1.drain(..) {
builders[index].push((key.clone(), val, time, diff));
builders[index].push(((key.clone(), val), time, diff));
}
}
}
Expand Down Expand Up @@ -651,22 +655,22 @@ where
R2: Semigroup,
{
fn new() -> Self;
fn compute<K, C1, C2, C3, L>(
fn compute<K, S1, S2, S3, C1, C2, C3, L>(
&mut self,
key: &K,
source_cursor: (&mut C1, &'a C1::Storage),
output_cursor: (&mut C2, &'a C2::Storage),
batch_cursor: (&mut C3, &'a C3::Storage),
source_cursor: (&mut C1, &'a S1),
output_cursor: (&mut C2, &'a S2),
batch_cursor: (&mut C3, &'a S3),
times: &mut Vec<T>,
logic: &mut L,
upper_limit: &Antichain<T>,
outputs: &mut [(T, Vec<(V2, T, R2)>)],
new_interesting: &mut Vec<T>) -> (usize, usize)
where
K: Eq+Clone,
C1: Cursor<Key = K, Val = V1, Time = T, R = R1>,
C2: Cursor<Key = K, Val = V2, Time = T, R = R2>,
C3: Cursor<Key = K, Val = V1, Time = T, R = R1>,
C1: Cursor<S1, Key = K, Val = V1, Time = T, R = R1>,
C2: Cursor<S2, Key = K, Val = V2, Time = T, R = R2>,
C3: Cursor<S3, Key = K, Val = V1, Time = T, R = R1>,
L: FnMut(&K, &[(&V1, R1)], &mut Vec<(V2, R2)>, &mut Vec<(V2, R2)>);
}

Expand Down Expand Up @@ -729,22 +733,22 @@ mod history_replay {
}
}
#[inline(never)]
fn compute<K, C1, C2, C3, L>(
fn compute<K, S1, S2, S3, C1, C2, C3, L>(
&mut self,
key: &K,
(source_cursor, source_storage): (&mut C1, &'a C1::Storage),
(output_cursor, output_storage): (&mut C2, &'a C2::Storage),
(batch_cursor, batch_storage): (&mut C3, &'a C3::Storage),
(source_cursor, source_storage): (&mut C1, &'a S1),
(output_cursor, output_storage): (&mut C2, &'a S2),
(batch_cursor, batch_storage): (&mut C3, &'a S3),
times: &mut Vec<T>,
logic: &mut L,
upper_limit: &Antichain<T>,
outputs: &mut [(T, Vec<(V2, T, R2)>)],
new_interesting: &mut Vec<T>) -> (usize, usize)
where
K: Eq+Clone,
C1: Cursor<Key = K, Val = V1, Time = T, R = R1>,
C2: Cursor<Key = K, Val = V2, Time = T, R = R2>,
C3: Cursor<Key = K, Val = V1, Time = T, R = R1>,
C1: Cursor<S1, Key = K, Val = V1, Time = T, R = R1>,
C2: Cursor<S2, Key = K, Val = V2, Time = T, R = R2>,
C3: Cursor<S3, Key = K, Val = V1, Time = T, R = R1>,
L: FnMut(&K, &[(&V1, R1)], &mut Vec<(V2, R2)>, &mut Vec<(V2, R2)>)
{

Expand Down
Loading