Skip to content

Commit

Permalink
Relax Arrange trait to input types
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Nov 26, 2023
1 parent d19b934 commit d9ec6db
Showing 1 changed file with 37 additions and 38 deletions.
75 changes: 37 additions & 38 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,85 +434,84 @@ where
}
}

/// A type that can be arranged into a trace of type `T`.
/// A type that can be arranged as if a collection of updates shaped as `((K,V),G::Timestamp,R)`.
///
/// This trait is implemented for appropriately typed collections and all traces that might accommodate them,
/// as well as by arranged data for their corresponding trace type.
pub trait Arrange<G: Scope, K, V, R: Semigroup>
/// This trait is primarily implemented by `Collection<G,(K,V),R>`.
///
/// The resulting arrangements may not present as `((K,V),T,R)`, as their output types are unconstrained.
/// This allows e.g. for `Vec<u8>` inputs to present as `&[u8]` when read, but that relationship is not
/// constrained by this trait.
pub trait Arrange<G, K, V, R>
where
G: Scope,
G::Timestamp: Lattice,
K: ToOwned + ?Sized,
K::Owned: Data,
V: Data,
R: Semigroup
{
/// Arranges a stream of `(Key, Val)` updates by `Key`. Accepts an empty instance of the trace type.
/// Arranges a stream of `(Key, Val)` updates by `Key`.
///
/// This operator arranges a stream of values into a shared trace, whose contents it maintains.
/// This trace is current for all times marked completed in the output stream, and probing this stream
/// is the correct way to determine that times in the shared trace are committed.
fn arrange<Tr>(&self) -> Arranged<G, TraceAgent<Tr>>
where
K::Owned: ExchangeData+Hashable,
Tr: Trace<Time=G::Timestamp> + 'static,
K: ExchangeData + Hashable,
V: ExchangeData,
R: ExchangeData,
Tr: Trace<Key=K>+TraceReader<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Item = ((K::Owned,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K::Owned,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
Tr::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
{
self.arrange_named("Arrange")
}

/// Arranges a stream of `(Key, Val)` updates by `Key`. Accepts an empty instance of the trace type.
/// Arranges a stream of `(Key, Val)` updates by `Key`, and presents with a `name` argument.
///
/// This operator arranges a stream of values into a shared trace, whose contents it maintains.
/// This trace is current for all times marked completed in the output stream, and probing this stream
/// is the correct way to determine that times in the shared trace are committed.
fn arrange_named<Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
K::Owned: ExchangeData+Hashable,
Tr: Trace<Time=G::Timestamp> + 'static,
K: ExchangeData + Hashable,
V: ExchangeData,
R: ExchangeData,
Tr: Trace<Key=K>+TraceReader<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Item = ((K::Owned,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K::Owned,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
Tr::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
{
let exchange = Exchange::new(move |update: &((K::Owned,V),G::Timestamp,R)| (update.0).0.hashed().into());
let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into());
self.arrange_core(exchange, name)
}

/// Arranges a stream of `(Key, Val)` updates by `Key`. Accepts an empty instance of the trace type.
/// Arranges a stream of `(Key, Val)` updates by `Key`, configured with a name and a parallelization contract.
///
/// This operator arranges a stream of values into a shared trace, whose contents it maintains.
/// This trace is current for all times marked completed in the output stream, and probing this stream
/// is the correct way to determine that times in the shared trace are committed.
/// It uses the supplied parallelization contract to distribute the data, which does not need to
/// be consistently by key (though this is the most common).
fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
P: ParallelizationContract<G::Timestamp, ((K::Owned,V),G::Timestamp,R)>,
Tr: Trace<Key=K>+TraceReader<Time=G::Timestamp>+'static,
P: ParallelizationContract<G::Timestamp, ((K,V),G::Timestamp,R)>,
K: Clone,
V: Clone,
Tr: Trace<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Item = ((K::Owned,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K::Owned,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
Tr::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
;
}

impl<G, K, V, R> Arrange<G, K, V, R> for Collection<G, (K::Owned, V), R>
impl<G, K, V, R> Arrange<G, K, V, R> for Collection<G, (K, V), R>
where
G: Scope,
G::Timestamp: Lattice+Ord,
K: ToOwned + ?Sized,
K::Owned: Data,
V: Data,
G::Timestamp: Lattice,
K: Clone + 'static,
V: Clone + 'static,
R: Semigroup,
{
fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
P: ParallelizationContract<G::Timestamp, ((K::Owned,V),G::Timestamp,R)>,
Tr: Trace<Key=K>+TraceReader<Time=G::Timestamp>+'static,
P: ParallelizationContract<G::Timestamp, ((K,V),G::Timestamp,R)>,
Tr: Trace<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Item = ((K::Owned,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K::Owned,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
Tr::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
{
// The `Arrange` operator is tasked with reacting to an advancing input
// frontier by producing the sequence of batches whose lower and upper
Expand Down

0 comments on commit d9ec6db

Please sign in to comment.