From b83b69056ed10e616d82ef6f70f6db4c1dfaa5ab Mon Sep 17 00:00:00 2001
From: Frank McSherry
Date: Wed, 29 Nov 2023 16:35:25 -0500
Subject: [PATCH] Non-working commit

---
 src/algorithms/graphs/bfs.rs              |   2 +-
 src/algorithms/graphs/bijkstra.rs         |   2 +-
 src/algorithms/graphs/propagate.rs        |   5 +-
 src/operators/arrange/agent.rs            |   6 +-
 src/operators/arrange/arrangement.rs      |  38 ++--
 src/operators/arrange/upsert.rs           |  22 +--
 src/operators/consolidate.rs              |   2 +-
 src/operators/count.rs                    |   8 +-
 src/operators/join.rs                     |  86 ++++-----
 src/operators/mod.rs                      |   3 +-
 src/operators/reduce.rs                   |  96 +++++-----
 src/operators/threshold.rs                |  10 +-
 src/trace/cursor/cursor_list.rs           |  18 +-
 src/trace/cursor/mod.rs                   |  51 ++++--
 src/trace/implementations/mod.rs          | 203 ++++++++++++----------
 src/trace/implementations/ord.rs          |  16 +-
 src/trace/implementations/ord_neu.rs      |  57 +++---
 src/trace/implementations/rhh.rs          | 115 ++++++------
 src/trace/implementations/spine_fueled.rs |  12 +-
 src/trace/mod.rs                          |  52 ++++--
 src/trace/wrappers/enter.rs               |  28 +--
 src/trace/wrappers/enter_at.rs            |  36 ++--
 src/trace/wrappers/filter.rs              |  36 ++--
 src/trace/wrappers/freeze.rs              |  32 ++--
 src/trace/wrappers/frontier.rs            |  28 +--
 src/trace/wrappers/rc.rs                  |   6 +-
 26 files changed, 512 insertions(+), 458 deletions(-)

diff --git a/src/algorithms/graphs/bfs.rs b/src/algorithms/graphs/bfs.rs
index 8f42cb47d..756b1e631 100644
--- a/src/algorithms/graphs/bfs.rs
+++ b/src/algorithms/graphs/bfs.rs
@@ -29,7 +29,7 @@ where
     G: Scope,
     G::Timestamp: Lattice+Ord,
     N: ExchangeData+Hash,
-    Tr: TraceReader<Key=N, Val=N, Time=G::Timestamp, Diff=isize>+Clone+'static,
+    Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Time=G::Timestamp, Diff=isize>+Clone+'static,
 {
     // initialize roots as reaching themselves at distance 0
     let nodes = roots.map(|x| (x, 0));
diff --git a/src/algorithms/graphs/bijkstra.rs b/src/algorithms/graphs/bijkstra.rs
index f8a4662be..8a416318d 100644
--- a/src/algorithms/graphs/bijkstra.rs
+++ b/src/algorithms/graphs/bijkstra.rs
@@ -45,7 +45,7 @@ where
     G: Scope,
     G::Timestamp: Lattice+Ord,
     N: ExchangeData+Hash,
-    Tr: TraceReader<Key=N, Val=N, Time=G::Timestamp, Diff=isize>+Clone+'static,
+    Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Time=G::Timestamp, Diff=isize>+Clone+'static,
 {
     forward
         .stream
diff --git a/src/algorithms/graphs/propagate.rs b/src/algorithms/graphs/propagate.rs
index 32947d4c7..42f9b58b8 100644
--- a/src/algorithms/graphs/propagate.rs
+++ b/src/algorithms/graphs/propagate.rs
@@ -5,7 +5,6 @@ use std::hash::Hash;
 use timely::dataflow::*;
 
 use ::{Collection, ExchangeData};
-use ::operators::*;
 use ::lattice::Lattice;
 use ::difference::{Abelian, Multiply};
 use ::operators::arrange::arrangement::ArrangeByKey;
@@ -64,7 +63,7 @@ where
     R: Multiply<R, Output=R>,
     R: From<i8>,
     L: ExchangeData,
-    Tr: TraceReader<Key=N, Val=N, Time=G::Timestamp, Diff=R>+Clone+'static,
+    Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Time=G::Timestamp, Diff=R>+Clone+'static,
     F: Fn(&L)->u64+Clone+'static,
 {
     // Morally the code performs the following iterative computation.
However, in the interest of a simplified @@ -90,6 +89,8 @@ where use timely::order::Product; + use operators::join::JoinCore; + let edges = edges.enter(scope); let nodes = nodes.enter_at(scope, move |r| 256 * (64 - (logic(&r.1)).leading_zeros() as usize)); diff --git a/src/operators/arrange/agent.rs b/src/operators/arrange/agent.rs index 03ea5af1b..088693c87 100644 --- a/src/operators/arrange/agent.rs +++ b/src/operators/arrange/agent.rs @@ -47,8 +47,10 @@ where Tr: TraceReader, Tr::Time: Lattice+Ord+Clone+'static, { - type Key = Tr::Key; - type Val = Tr::Val; + type Key<'a> = Tr::Key<'a>; + type KeyOwned = Tr::KeyOwned; + type Val<'a> = Tr::Val<'a>; + type ValOwned = Tr::ValOwned; type Time = Tr::Time; type Diff = Tr::Diff; diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index 42bf28f87..6e246bc4e 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -37,6 +37,8 @@ use trace::wrappers::enter_at::TraceEnter as TraceEnterAt; use trace::wrappers::enter_at::BatchEnter as BatchEnterAt; use trace::wrappers::filter::{TraceFilter, BatchFilter}; +use trace::cursor::MyTrait; + use super::TraceAgent; /// An arranged collection of `(K,V)` values. @@ -89,8 +91,6 @@ where pub fn enter<'a, TInner>(&self, child: &Child<'a, G, TInner>) -> Arranged, TraceEnter> where - Tr::Key: 'static, - Tr::Val: 'static, Tr::Diff: 'static, G::Timestamp: Clone+'static, TInner: Refines+Lattice+Timestamp+Clone+'static, @@ -108,8 +108,6 @@ where pub fn enter_region<'a>(&self, child: &Child<'a, G, G::Timestamp>) -> Arranged, Tr> where - Tr::Key: 'static, - Tr::Val: 'static, Tr::Diff: 'static, G::Timestamp: Clone+'static, { @@ -127,12 +125,10 @@ where pub fn enter_at<'a, TInner, F, P>(&self, child: &Child<'a, G, TInner>, logic: F, prior: P) -> Arranged, TraceEnterAt> where - Tr::Key: 'static, - Tr::Val: 'static, Tr::Diff: 'static, G::Timestamp: Clone+'static, TInner: Refines+Lattice+Timestamp+Clone+'static, - F: FnMut(&Tr::Key, &Tr::Val, &G::Timestamp)->TInner+Clone+'static, + F: for <'b> FnMut(Tr::Key<'b>, Tr::Val<'b>, &G::Timestamp)->TInner+Clone+'static, P: FnMut(&TInner)->Tr::Time+Clone+'static, { let logic1 = logic.clone(); @@ -177,11 +173,9 @@ where pub fn filter(&self, logic: F) -> Arranged> where - Tr::Key: 'static, - Tr::Val: 'static, Tr::Diff: 'static, G::Timestamp: Clone+'static, - F: FnMut(&Tr::Key, &Tr::Val)->bool+Clone+'static, + F: for<'a> FnMut(Tr::Key<'a>, Tr::Val<'a>)->bool+Clone+'static, { let logic1 = logic.clone(); let logic2 = logic.clone(); @@ -198,7 +192,7 @@ where pub fn as_collection(&self, mut logic: L) -> Collection where Tr::Diff: Semigroup, - L: FnMut(&Tr::Key, &Tr::Val) -> D+'static, + L: for<'a> FnMut(Tr::Key<'a>, Tr::Val<'a>) -> D+'static, { self.flat_map_ref(move |key, val| Some(logic(key,val))) } @@ -212,7 +206,7 @@ where Tr::Diff: Semigroup, I: IntoIterator, I::Item: Data, - L: FnMut(&Tr::Key, &Tr::Val) -> I+'static, + L: for<'a> FnMut(Tr::Key<'a>, Tr::Val<'a>) -> I+'static, { Self::flat_map_batches(&self.stream, logic) } @@ -229,7 +223,7 @@ where Tr::Diff: Semigroup, I: IntoIterator, I::Item: Data, - L: FnMut(&Tr::Key, &Tr::Val) -> I+'static, + L: for<'a> FnMut(Tr::Key<'a>, Tr::Val<'a>) -> I+'static, { stream.unary(Pipeline, "AsCollection", move |_,_| move |input, output| { input.for_each(|time, data| { @@ -258,16 +252,16 @@ where /// /// This method consumes a stream of (key, time) queries and reports the corresponding stream of /// (key, value, time, diff) accumulations in the `self` trace. 
- pub fn lookup(&self, queries: &Stream) -> Stream + pub fn lookup(&self, queries: &Stream) -> Stream where G::Timestamp: Data+Lattice+Ord+TotalOrder, - Tr::Key: ExchangeData+Hashable, - Tr::Val: ExchangeData, + Tr::KeyOwned: ExchangeData+Hashable, + Tr::ValOwned: ExchangeData, Tr::Diff: ExchangeData+Semigroup, Tr: 'static, { // while the arrangement is already correctly distributed, the query stream may not be. - let exchange = Exchange::new(move |update: &(Tr::Key,G::Timestamp)| update.0.hashed().into()); + let exchange = Exchange::new(move |update: &(Tr::KeyOwned,G::Timestamp)| update.0.hashed().into()); queries.binary_frontier(&self.stream, exchange, Pipeline, "TraceQuery", move |_capability, _info| { let mut trace = Some(self.trace.clone()); @@ -280,8 +274,8 @@ where let mut active = Vec::new(); let mut retain = Vec::new(); - let mut working: Vec<(G::Timestamp, Tr::Val, Tr::Diff)> = Vec::new(); - let mut working2: Vec<(Tr::Val, Tr::Diff)> = Vec::new(); + let mut working: Vec<(G::Timestamp, Tr::ValOwned, Tr::Diff)> = Vec::new(); + let mut working2: Vec<(Tr::ValOwned, Tr::Diff)> = Vec::new(); move |input1, input2, output| { @@ -346,13 +340,13 @@ where same_key += 1; } - cursor.seek_key(&storage, key); - if cursor.get_key(&storage) == Some(key) { + cursor.seek_key_owned(&storage, key); + if cursor.get_key(&storage).map(|k| k.equals(key)).unwrap_or(false) { let mut active = &active[active_finger .. same_key]; while let Some(val) = cursor.get_val(&storage) { - cursor.map_times(&storage, |t,d| working.push((t.clone(), val.clone(), d.clone()))); + cursor.map_times(&storage, |t,d| working.push((t.clone(), val.into_owned(), d.clone()))); cursor.step_val(&storage); } diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index b5b4618bc..d5a5cbcb0 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -136,17 +136,17 @@ use super::TraceAgent; /// understand what a "sequence" of upserts would mean for partially ordered /// timestamps. pub fn arrange_from_upsert( - stream: &Stream, G::Timestamp)>, + stream: &Stream, G::Timestamp)>, name: &str, ) -> Arranged> where G: Scope, G::Timestamp: Lattice+Ord+TotalOrder+ExchangeData, - Tr::Key: ExchangeData+Hashable+std::hash::Hash, - Tr::Val: ExchangeData, + Tr::KeyOwned: ExchangeData+Hashable+std::hash::Hash, + Tr::ValOwned: ExchangeData, Tr: Trace+TraceReader+'static, Tr::Batch: Batch, - Tr::Builder: Builder, + Tr::Builder: Builder, { let mut reader: Option> = None; @@ -155,7 +155,7 @@ where let reader = &mut reader; - let exchange = Exchange::new(move |update: &(Tr::Key,Option,G::Timestamp)| (update.0).hashed().into()); + let exchange = Exchange::new(move |update: &(Tr::KeyOwned,Option,G::Timestamp)| (update.0).hashed().into()); stream.unary_frontier(exchange, name, move |_capability, info| { @@ -185,7 +185,7 @@ where let mut prev_frontier = Antichain::from_elem(::minimum()); // For stashing input upserts, ordered increasing by time (`BinaryHeap` is a max-heap). - let mut priority_queue = BinaryHeap::)>>::new(); + let mut priority_queue = BinaryHeap::)>>::new(); let mut updates = Vec::new(); move |input, output| { @@ -252,12 +252,14 @@ where let mut builder = Tr::Builder::new(); for (key, mut list) in to_process.drain(..) { + use trace::cursor::MyTrait; + // The prior value associated with the key. - let mut prev_value: Option = None; + let mut prev_value: Option = None; // Attempt to find the key in the trace. 
- trace_cursor.seek_key(&trace_storage, &key); - if trace_cursor.get_key(&trace_storage) == Some(&key) { + trace_cursor.seek_key_owned(&trace_storage, &key); + if trace_cursor.get_key(&trace_storage).map(|k| k.equals(&key)).unwrap_or(false) { // Determine the prior value associated with the key. while let Some(val) = trace_cursor.get_val(&trace_storage) { let mut count = 0; @@ -265,7 +267,7 @@ where assert!(count == 0 || count == 1); if count == 1 { assert!(prev_value.is_none()); - prev_value = Some(val.clone()); + prev_value = Some(val.into_owned()); } trace_cursor.step_val(&trace_storage); } diff --git a/src/operators/consolidate.rs b/src/operators/consolidate.rs index 7345a3ef9..6240b4b0c 100644 --- a/src/operators/consolidate.rs +++ b/src/operators/consolidate.rs @@ -56,7 +56,7 @@ where /// As `consolidate` but with the ability to name the operator and specify the trace type. pub fn consolidate_named(&self, name: &str) -> Self where - Tr: crate::trace::Trace+crate::trace::TraceReader+'static, + Tr: for<'a> crate::trace::Trace=&'a D,Val<'a>=&'a (),Time=G::Timestamp,Diff=R>+'static, Tr::Batch: crate::trace::Batch, Tr::Batcher: Batcher, Tr::Builder: Builder, diff --git a/src/operators/count.rs b/src/operators/count.rs index 5d8a6dbeb..77c084da9 100644 --- a/src/operators/count.rs +++ b/src/operators/count.rs @@ -55,14 +55,14 @@ where G::Timestamp: TotalOrder+Lattice+Ord { } } -impl CountTotal for Arranged +impl CountTotal for Arranged where G::Timestamp: TotalOrder+Lattice+Ord, - T1: TraceReader+Clone+'static, - T1::Key: ExchangeData, + T1: for<'a> TraceReader=&'a K, Val<'a>=&'a (), Time=G::Timestamp>+Clone+'static, + K: ExchangeData, T1::Diff: ExchangeData+Semigroup, { - fn count_total_core>(&self) -> Collection { + fn count_total_core>(&self) -> Collection { let mut trace = self.trace.clone(); let mut buffer = Vec::new(); diff --git a/src/operators/join.rs b/src/operators/join.rs index 67b7ebfb4..e9c6cda53 100644 --- a/src/operators/join.rs +++ b/src/operators/join.rs @@ -181,29 +181,33 @@ where } } -impl Join for Arranged +impl Join for Arranged where G: Scope, G::Timestamp: Lattice+Ord, - Tr: TraceReader+Clone+'static, - Tr::Key: Data+Hashable, - Tr::Val: Data, + Tr: for<'a> TraceReader = &'a K, Val<'a> = &'a V>+Clone+'static, + K: ExchangeData+Hashable, + V: Data + 'static, Tr::Diff: Semigroup, { - fn join_map(&self, other: &Collection, mut logic: L) -> Collection>::Output> - where Tr::Key: ExchangeData, Tr::Diff: Multiply, >::Output: Semigroup, L: FnMut(&Tr::Key, &Tr::Val, &V2)->D+'static { + fn join_map(&self, other: &Collection, mut logic: L) -> Collection>::Output> + where + Tr::Diff: Multiply, + >::Output: Semigroup, + L: for<'a> FnMut(Tr::Key<'a>, Tr::Val<'a>, &V2)->D+'static, + { let arranged2 = other.arrange_by_key(); self.join_core(&arranged2, move |k,v1,v2| Some(logic(k,v1,v2))) } - fn semijoin(&self, other: &Collection) -> Collection>::Output> - where Tr::Key: ExchangeData, Tr::Diff: Multiply, >::Output: Semigroup { + fn semijoin(&self, other: &Collection) -> Collection>::Output> + where Tr::Diff: Multiply, >::Output: Semigroup { let arranged2 = other.arrange_by_self(); self.join_core(&arranged2, |k,v,_| Some((k.clone(), v.clone()))) } - fn antijoin(&self, other: &Collection) -> Collection - where Tr::Key: ExchangeData, Tr::Diff: Multiply, Tr::Diff: Abelian { + fn antijoin(&self, other: &Collection) -> Collection + where Tr::Diff: Multiply, Tr::Diff: Abelian { self.as_collection(|k,v| (k.clone(), v.clone())) .concat(&self.semijoin(other).negate()) } @@ -253,14 +257,14 @@ 
pub trait JoinCore (&self, stream2: &Arranged, result: L) -> Collection>::Output> where - Tr2: TraceReader+Clone+'static, - Tr2::Val: Ord+'static, + Tr2: for<'a> TraceReader=&'a K, Time=G::Timestamp>+Clone+'static, + // Tr2::Val: Ord+'static, Tr2::Diff: Semigroup, R: Multiply, >::Output: Semigroup, I: IntoIterator, I::Item: Data, - L: FnMut(&K,&V,&Tr2::Val)->I+'static, + L: for<'a> FnMut(&K,&V,Tr2::Val<'a>)->I+'static, ; /// An unsafe variant of `join_core` where the `result` closure takes additional arguments for `time` and @@ -303,13 +307,13 @@ pub trait JoinCore (&self, stream2: &Arranged, result: L) -> Collection where - Tr2: TraceReader+Clone+'static, - Tr2::Val: Ord+'static, + Tr2: for<'a> TraceReader=&'a K, Time=G::Timestamp>+Clone+'static, + // Tr2::Val: Ord+'static, Tr2::Diff: Semigroup, D: Data, ROut: Semigroup, I: IntoIterator, - L: FnMut(&K,&V,&Tr2::Val,&G::Timestamp,&R,&Tr2::Diff)->I+'static, + L: for<'a> FnMut(&K,&V,Tr2::Val<'a>,&G::Timestamp,&R,&Tr2::Diff)->I+'static, ; } @@ -324,14 +328,14 @@ where { fn join_core (&self, stream2: &Arranged, result: L) -> Collection>::Output> where - Tr2: TraceReader+Clone+'static, - Tr2::Val: Ord+'static, + Tr2: for<'a> TraceReader=&'a K, Time=G::Timestamp>+Clone+'static, + // Tr2::Val: Ord+'static, Tr2::Diff: Semigroup, R: Multiply, >::Output: Semigroup, I: IntoIterator, I::Item: Data, - L: FnMut(&K,&V,&Tr2::Val)->I+'static, + L: for<'a> FnMut(&K,&V,Tr2::Val<'a>)->I+'static, { self.arrange_by_key() .join_core(stream2, result) @@ -339,41 +343,41 @@ where fn join_core_internal_unsafe (&self, stream2: &Arranged, result: L) -> Collection where - Tr2: TraceReader+Clone+'static, - Tr2::Val: Ord+'static, + Tr2: for<'a> TraceReader=&'a K, Time=G::Timestamp>+Clone+'static, + // Tr2::Val: Ord+'static, Tr2::Diff: Semigroup, R: Semigroup, D: Data, ROut: Semigroup, I: IntoIterator, - L: FnMut(&K,&V,&Tr2::Val,&G::Timestamp,&R,&Tr2::Diff)->I+'static, + L: for<'a> FnMut(&K,&V,Tr2::Val<'a>,&G::Timestamp,&R,&Tr2::Diff)->I+'static, { self.arrange_by_key().join_core_internal_unsafe(stream2, result) } } -impl JoinCore for Arranged +impl JoinCore for Arranged where G: Scope, G::Timestamp: Lattice+Ord, - T1: TraceReader+Clone+'static, - T1::Key: Ord+'static, - T1::Val: Ord+'static, + T1: for<'a> TraceReader = &'a K, Val<'a> = &'a V, Time=G::Timestamp>+Clone+'static, + K: Ord+'static, + V: Ord+'static, T1::Diff: Semigroup, { fn join_core(&self, other: &Arranged, mut result: L) -> Collection>::Output> where - Tr2::Val: Ord+'static, - Tr2: TraceReader+Clone+'static, + // Tr2::Val: Ord+'static, + Tr2: for<'a> TraceReader=T1::Key<'a>,Time=G::Timestamp>+Clone+'static, Tr2::Diff: Semigroup, T1::Diff: Multiply, >::Output: Semigroup, I: IntoIterator, I::Item: Data, - L: FnMut(&T1::Key,&T1::Val,&Tr2::Val)->I+'static + L: for<'a> FnMut(T1::Key<'a>,T1::Val<'a>,Tr2::Val<'a>)->I+'static { - let result = move |k: &T1::Key, v1: &T1::Val, v2: &Tr2::Val, t: &G::Timestamp, r1: &T1::Diff, r2: &Tr2::Diff| { + let result = move |k: T1::Key<'_>, v1: T1::Val<'_>, v2: Tr2::Val<'_>, t: &G::Timestamp, r1: &T1::Diff, r2: &Tr2::Diff| { let t = t.clone(); let r = (r1.clone()).multiply(r2); result(k, v1, v2).into_iter().map(move |d| (d, t.clone(), r.clone())) @@ -383,13 +387,13 @@ impl JoinCore for Arranged fn join_core_internal_unsafe (&self, other: &Arranged, result: L) -> Collection where - Tr2: TraceReader+Clone+'static, - Tr2::Val: Ord+'static, + Tr2: for<'a> TraceReader=T1::Key<'a>, Time=G::Timestamp>+Clone+'static, + // Tr2::Val: Ord+'static, Tr2::Diff: Semigroup, D: Data, ROut: 
Semigroup, I: IntoIterator, - L: FnMut(&T1::Key,&T1::Val,&Tr2::Val,&G::Timestamp,&T1::Diff,&Tr2::Diff)->I+'static, + L: for<'a> FnMut(T1::Key<'a>,T1::Val<'a>,Tr2::Val<'a>,&'a G::Timestamp,&'a T1::Diff,&'a Tr2::Diff)->I+'static, { join_traces(self, other, result) } @@ -408,16 +412,13 @@ where G: Scope, G::Timestamp: Lattice+Ord, T1: TraceReader+Clone+'static, - T1::Key: Ord, - T1::Val: Ord, T1::Diff: Semigroup, - T2: TraceReader+Clone+'static, - T2::Val: Ord, + T2: for<'a> TraceReader=T1::Key<'a>, Time=G::Timestamp>+Clone+'static, T2::Diff: Semigroup, D: Data, R: Semigroup, I: IntoIterator, - L: FnMut(&T1::Key,&T1::Val,&T2::Val,&G::Timestamp,&T1::Diff,&T2::Diff)->I+'static, + L: for<'a> FnMut(T1::Key<'a>,T1::Val<'a>,T2::Val<'a>,&'a G::Timestamp,&'a T1::Diff,&'a T2::Diff)->I+'static, { // Rename traces for symmetry from here on out. let mut trace1 = arranged1.trace.clone(); @@ -665,7 +666,7 @@ where T: Timestamp+Lattice+Ord, R: Semigroup, C1: Cursor, - C2: Cursor, + C2: for<'a> Cursor=C1::Key<'a>, Time=T>, C1::Diff: Semigroup, C2::Diff: Semigroup, D: Ord+Clone+Data, @@ -681,9 +682,8 @@ where impl Deferred where - C1::Key: Ord+Eq, C1: Cursor, - C2: Cursor, + C2: for<'a> Cursor=C1::Key<'a>, Time=T>, C1::Diff: Semigroup, C2::Diff: Semigroup, T: Timestamp+Lattice+Ord, @@ -711,7 +711,7 @@ where fn work(&mut self, output: &mut OutputHandle>, mut logic: L, fuel: &mut usize) where I: IntoIterator, - L: for<'a> FnMut(&C1::Key, C1::Val<'a>, C2::Val<'a>, &T, &C1::Diff, &C2::Diff)->I, + L: for<'a> FnMut(C1::Key<'a>, C1::Val<'a>, C2::Val<'a>, &T, &C1::Diff, &C2::Diff)->I, { let meet = self.capability.time(); @@ -730,7 +730,7 @@ where while batch.key_valid(batch_storage) && trace.key_valid(trace_storage) && effort < *fuel { - match trace.key(trace_storage).cmp(batch.key(batch_storage)) { + match trace.key(trace_storage).cmp(&batch.key(batch_storage)) { Ordering::Less => trace.seek_key(trace_storage, batch.key(batch_storage)), Ordering::Greater => batch.seek_key(batch_storage, trace.key(trace_storage)), Ordering::Equal => { diff --git a/src/operators/mod.rs b/src/operators/mod.rs index 84a658b33..53ddb1d6b 100644 --- a/src/operators/mod.rs +++ b/src/operators/mod.rs @@ -121,11 +121,10 @@ where &'history mut self, cursor: &mut C, storage: &'storage C::Storage, - key: &C::Key, + key: C::Key<'storage>, logic: L ) -> HistoryReplay<'storage, 'history, C> where - C::Key: Eq, L: Fn(&C::Time)->C::Time, { self.clear(); diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index 8a0303b54..4c2d609f6 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -26,7 +26,7 @@ use trace::implementations::{KeySpine, ValSpine}; use trace::TraceReader; /// Extension trait for the `reduce` differential dataflow method. -pub trait Reduce where G::Timestamp: Lattice+Ord { +pub trait Reduce : ReduceCore where G::Timestamp: Lattice+Ord { /// Applies a reduction function on records grouped by key. /// /// Input data must be structured as `(key, val)` pairs. 
@@ -88,7 +88,7 @@ impl Reduce for Collection impl Reduce for Arranged where G::Timestamp: Lattice+Ord, - T1: TraceReader+Clone+'static, + T1: for<'a> TraceReader=&'a K, KeyOwned=K, Val<'a>=&'a V, Time=G::Timestamp, Diff=R>+Clone+'static, { fn reduce_named(&self, name: &str, logic: L) -> Collection where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static { @@ -175,7 +175,7 @@ where G::Timestamp: Lattice+Ord { impl Threshold for Arranged where G::Timestamp: Lattice+Ord, - T1: TraceReader+Clone+'static, + T1: for<'a> TraceReader=&'a K, KeyOwned=K, Val<'a>=&'a (), Time=G::Timestamp, Diff=R1>+Clone+'static, { fn threshold_namedR2+'static>(&self, name: &str, mut thresh: F) -> Collection { self.reduce_abelian::<_,KeySpine<_,_,_>>(name, move |k,s,t| t.push(((), thresh(k, &s[0].1)))) @@ -230,7 +230,7 @@ where impl Count for Arranged where G::Timestamp: Lattice+Ord, - T1: TraceReader+Clone+'static, + T1: for<'a> TraceReader=&'a K, KeyOwned=K, Val<'a>=&'a (), Time=G::Timestamp, Diff=R>+Clone+'static, { fn count_core>(&self) -> Collection { self.reduce_abelian::<_,ValSpine<_,_,_,_>>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8)))) @@ -272,13 +272,11 @@ pub trait ReduceCore(&self, name: &str, mut logic: L) -> Arranged> where - T2: Trace+TraceReader+'static, - T2::Val: Data, - T2::Val: Ord + ToOwned::ValOwned>, - ::Owned: Data, + T2: for<'a> Trace= &'a K, Time=G::Timestamp>+'static, + T2::ValOwned: Data, T2::Diff: Abelian, T2::Batch: Batch, - T2::Builder: Builder::Owned), T2::Time, T2::Diff)>, + T2::Builder: Builder, L: FnMut(&K, &[(&V, R)], &mut Vec<(::ValOwned, T2::Diff)>)+'static, { self.reduce_core::<_,T2>(name, move |key, input, output, change| { @@ -297,13 +295,11 @@ pub trait ReduceCore(&self, name: &str, logic: L) -> Arranged> where - T2: Trace+TraceReader+'static, - T2::Val: Data, - T2::Val: Ord + ToOwned::ValOwned>, - ::Owned: Data, + T2: for<'a> Trace=&'a K, Time=G::Timestamp>+'static, + T2::ValOwned: Data, T2::Diff: Semigroup, T2::Batch: Batch, - T2::Builder: Builder::Owned), T2::Time, T2::Diff)>, + T2::Builder: Builder, L: FnMut(&K, &[(&V, R)], &mut Vec<(::ValOwned,T2::Diff)>, &mut Vec<(::ValOwned, T2::Diff)>)+'static, ; } @@ -319,13 +315,11 @@ where { fn reduce_core(&self, name: &str, logic: L) -> Arranged> where - T2::Val: Data, - T2::Val: Ord + ToOwned::ValOwned>, - ::Owned: Data, + T2::ValOwned: Data, T2::Diff: Semigroup, - T2: Trace+TraceReader+'static, + T2: for<'a> Trace=&'a K, Time=G::Timestamp>+'static, T2::Batch: Batch, - T2::Builder: Builder::Owned), T2::Time, T2::Diff)>, + T2::Builder: Builder, L: FnMut(&K, &[(&V, R)], &mut Vec<(::ValOwned,T2::Diff)>, &mut Vec<(::ValOwned, T2::Diff)>)+'static, { self.arrange_by_key_named(&format!("Arrange: {}", name)) @@ -339,16 +333,15 @@ where K::Owned: Data, V: ToOwned + Ord + ?Sized, G::Timestamp: Lattice+Ord, - T1: TraceReader+Clone+'static, + T1: for<'a> TraceReader=&'a K, KeyOwned = ::Owned, Val<'a>=&'a V, Time=G::Timestamp, Diff=R>+Clone+'static, { fn reduce_core(&self, name: &str, logic: L) -> Arranged> where - T2: Trace+TraceReader+'static, - T2::Val: Ord + ToOwned::ValOwned>, - ::Owned: Data, + T2: for<'a> Trace=&'a K, Time=G::Timestamp>+'static, + T2::ValOwned: Data, T2::Diff: Semigroup, T2::Batch: Batch, - T2::Builder: Builder::Owned), T2::Time, T2::Diff)>, + T2::Builder: Builder, L: FnMut(&K, &[(&V, R)], &mut Vec<(::ValOwned,T2::Diff)>, &mut Vec<(::ValOwned, T2::Diff)>)+'static, { reduce_trace(self, name, logic) @@ -360,17 +353,13 @@ where G: Scope, G::Timestamp: Lattice+Ord, T1: TraceReader + Clone + 'static, - 
T1::Key: Ord + ToOwned, - ::Owned: Ord, - T1::Val: Ord, T1::Diff: Semigroup, - T2: Trace+TraceReader + 'static, - T2::Val: Ord + ToOwned::ValOwned>, - ::Owned: Data, + T2: for<'a> Trace=T1::Key<'a>, Time=G::Timestamp> + 'static, + T2::ValOwned: Data, T2::Diff: Semigroup, T2::Batch: Batch, - T2::Builder: Builder::Owned, ::Owned), T2::Time, T2::Diff)>, - L: FnMut(&T1::Key, &[(&T1::Val, T1::Diff)], &mut Vec<(::ValOwned,T2::Diff)>, &mut Vec<(::ValOwned, T2::Diff)>)+'static, + T2::Builder: Builder, + L: for<'a> FnMut(T1::Key<'a>, &[(T1::Val<'a>, T1::Diff)], &mut Vec<(T2::ValOwned,T2::Diff)>, &mut Vec<(T2::ValOwned, T2::Diff)>)+'static, { let mut result_trace = None; @@ -407,7 +396,7 @@ where // Our implementation maintains a list of outstanding `(key, time)` synthetic interesting times, // as well as capabilities for these times (or their lower envelope, at least). - let mut interesting = Vec::<(::Owned, G::Timestamp)>::new(); + let mut interesting = Vec::<(T1::KeyOwned, G::Timestamp)>::new(); let mut capabilities = Vec::>::new(); // buffers and logic for computing per-key interesting times "efficiently". @@ -530,13 +519,14 @@ where while batch_cursor.key_valid(batch_storage) || exposed_position < exposed.len() { use std::borrow::Borrow; - + use trace::cursor::MyTrait; + // Determine the next key we will work on; could be synthetic, could be from a batch. - let key1 = exposed.get(exposed_position).map(|x| &x.0); + let key1 = exposed.get(exposed_position).map(|x| <_ as MyTrait>::borrow_as(&x.0)); let key2 = batch_cursor.get_key(&batch_storage); let key = match (key1, key2) { - (Some(key1), Some(key2)) => ::std::cmp::min(key1.borrow(), key2), - (Some(key1), None) => key1.borrow(), + (Some(key1), Some(key2)) => ::std::cmp::min(key1, key2), + (Some(key1), None) => key1, (None, Some(key2)) => key2, (None, None) => unreachable!(), }; @@ -548,7 +538,7 @@ where interesting_times.clear(); // Populate `interesting_times` with synthetic interesting times (below `upper_limit`) for this key. - while exposed.get(exposed_position).map(|x| x.0.borrow()) == Some(key) { + while exposed.get(exposed_position).map(|x| x.0.borrow()).map(|k| key.equals(k)).unwrap_or(false) { interesting_times.push(exposed[exposed_position].1.clone()); exposed_position += 1; } @@ -576,7 +566,7 @@ where // Record future warnings about interesting times (and assert they should be "future"). for time in new_interesting_times.drain(..) { debug_assert!(upper_limit.less_equal(&time)); - interesting.push((key.to_owned(), time)); + interesting.push((key.into_owned(), time)); } // Sort each buffer by value and move into the corresponding builder. @@ -586,7 +576,7 @@ where for index in 0 .. buffers.len() { buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0)); for (val, time, diff) in buffers[index].1.drain(..) 
{ - builders[index].push(((key.to_owned(), val), time, diff)); + builders[index].push(((key.into_owned(), val), time, diff)); } } } @@ -683,8 +673,8 @@ fn sort_dedup(list: &mut Vec) { trait PerKeyCompute<'a, C1, C2, C3> where C1: Cursor, - C2: Cursor, - C3: Cursor = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>, + C2: Cursor = C1::Key<'a>, Time = C1::Time>, + C3: Cursor = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>, C2::ValOwned: Ord + Clone, C1::Time: Lattice+Ord+Clone, C1::Diff: Semigroup, @@ -693,7 +683,7 @@ where fn new() -> Self; fn compute( &mut self, - key: &C1::Key, + key: C1::Key<'a>, source_cursor: (&mut C1, &'a C1::Storage), output_cursor: (&mut C2, &'a C2::Storage), batch_cursor: (&mut C3, &'a C3::Storage), @@ -703,9 +693,9 @@ where outputs: &mut [(C2::Time, Vec<(C2::ValOwned, C2::Time, C2::Diff)>)], new_interesting: &mut Vec) -> (usize, usize) where - C1::Key: Eq, L: FnMut( - &C1::Key, &[(C1::Val<'a>, C1::Diff)], + C1::Key<'a>, + &[(C1::Val<'a>, C1::Diff)], &mut Vec<(C2::ValOwned, C2::Diff)>, &mut Vec<(C2::ValOwned, C2::Diff)>, ); @@ -730,8 +720,8 @@ mod history_replay { pub struct HistoryReplayer<'a, C1, C2, C3>//V1, V2, T, R1, R2> where C1: Cursor, - C2: Cursor, - C3: Cursor = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>, + C2: Cursor = C1::Key<'a>, Time = C1::Time>, + C3: Cursor = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>, C2::ValOwned: Ord + Clone, C1::Time: Lattice+Ord+Clone, C1::Diff: Semigroup, @@ -753,8 +743,8 @@ mod history_replay { impl<'a, C1, C2, C3> PerKeyCompute<'a, C1, C2, C3> for HistoryReplayer<'a, C1, C2, C3> where C1: Cursor, - C2: Cursor, - C3: Cursor = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>, + C2: Cursor = C1::Key<'a>, Time = C1::Time>, + C3: Cursor = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>, C2::ValOwned: Ord + Clone, C1::Time: Lattice+Ord+Clone, C1::Diff: Semigroup, @@ -778,7 +768,7 @@ mod history_replay { #[inline(never)] fn compute( &mut self, - key: &C1::Key, + key: C1::Key<'a>, (source_cursor, source_storage): (&mut C1, &'a C1::Storage), (output_cursor, output_storage): (&mut C2, &'a C2::Storage), (batch_cursor, batch_storage): (&mut C3, &'a C3::Storage), @@ -788,9 +778,9 @@ mod history_replay { outputs: &mut [(C2::Time, Vec<(C2::ValOwned, C2::Time, C2::Diff)>)], new_interesting: &mut Vec) -> (usize, usize) where - C1::Key: Eq, L: FnMut( - &C1::Key, &[(C1::Val<'a>, C1::Diff)], + C1::Key<'a>, + &[(C1::Val<'a>, C1::Diff)], &mut Vec<(C2::ValOwned, C2::Diff)>, &mut Vec<(C2::ValOwned, C2::Diff)>, ) @@ -956,7 +946,7 @@ mod history_replay { for &((value, ref time), ref diff) in output_replay.buffer().iter() { if time.less_equal(&next_time) { use trace::cursor::MyTrait; - self.output_buffer.push((<_ as MyTrait>::to_owned(value), diff.clone())); + self.output_buffer.push((<_ as MyTrait>::into_owned(value), diff.clone())); } else { self.temporary.push(next_time.join(time)); diff --git a/src/operators/threshold.rs b/src/operators/threshold.rs index d17078b12..9feb2386f 100644 --- a/src/operators/threshold.rs +++ b/src/operators/threshold.rs @@ -102,17 +102,17 @@ where G::Timestamp: TotalOrder+Lattice+Ord { } } -impl ThresholdTotal for Arranged +impl ThresholdTotal for Arranged where G::Timestamp: TotalOrder+Lattice+Ord, - T1: TraceReader+Clone+'static, - T1::Key: ExchangeData, + T1: for<'a> TraceReader=&'a K, Val<'a>=&'a (), Time=G::Timestamp>+Clone+'static, + K: ExchangeData, T1::Diff: ExchangeData+Semigroup, { - fn threshold_semigroup(&self, mut thresh: F) -> 
Collection + fn threshold_semigroup(&self, mut thresh: F) -> Collection where R2: Semigroup, - F: FnMut(&T1::Key,&T1::Diff,Option<&T1::Diff>)->Option+'static, + F: for<'a> FnMut(T1::Key<'a>,&T1::Diff,Option<&T1::Diff>)->Option+'static, { let mut trace = self.trace.clone(); diff --git a/src/trace/cursor/cursor_list.rs b/src/trace/cursor/cursor_list.rs index 3b0bcd623..6192ce66a 100644 --- a/src/trace/cursor/cursor_list.rs +++ b/src/trace/cursor/cursor_list.rs @@ -13,11 +13,7 @@ pub struct CursorList { min_val: Vec, } -impl CursorList -where - C: Cursor, - C::Key: Ord, -{ +impl CursorList { /// Creates a new cursor list from pre-existing cursors. pub fn new(cursors: Vec, storage: &[C::Storage]) -> Self { let mut result = CursorList { @@ -88,11 +84,9 @@ where } } -impl Cursor for CursorList -where - C::Key: Ord, -{ - type Key = C::Key; +impl Cursor for CursorList { + type Key<'a> = C::Key<'a>; + type KeyOwned = C::KeyOwned; type Val<'a> = C::Val<'a>; type ValOwned = C::ValOwned; type Time = C::Time; @@ -108,7 +102,7 @@ where // accessors #[inline] - fn key<'a>(&self, storage: &'a Vec) -> &'a Self::Key { + fn key<'a>(&self, storage: &'a Vec) -> Self::Key<'a> { debug_assert!(self.key_valid(storage)); debug_assert!(self.cursors[self.min_key[0]].key_valid(&storage[self.min_key[0]])); self.cursors[self.min_key[0]].key(&storage[self.min_key[0]]) @@ -136,7 +130,7 @@ where self.minimize_keys(storage); } #[inline] - fn seek_key(&mut self, storage: &Vec, key: &Self::Key) { + fn seek_key<'a>(&mut self, storage: &Vec, key: Self::Key<'a>) { for index in 0 .. self.cursors.len() { self.cursors[index].seek_key(&storage[index], key); } diff --git a/src/trace/cursor/mod.rs b/src/trace/cursor/mod.rs index 431247b5b..cd602bae2 100644 --- a/src/trace/cursor/mod.rs +++ b/src/trace/cursor/mod.rs @@ -12,23 +12,41 @@ pub mod cursor_list; pub use self::cursor_list::CursorList; use std::borrow::Borrow; +use std::cmp::Ordering; + /// A type that may be converted into and compared with another type. /// /// The type must also be comparable with itself, and follow the same /// order as if converting instances to `T` and comparing the results. -pub trait MyTrait : Ord { +pub trait MyTrait<'a> : Ord { /// Owned type into which this type can be converted. type Owned; /// Conversion from an instance of this type to the owned type. - fn to_owned(self) -> Self::Owned; + fn into_owned(self) -> Self::Owned; + /// + fn clone_onto(&self, other: &mut Self::Owned); /// Indicates that `self <= other`; used for sorting. 
- fn less_equal(&self, other: &Self::Owned) -> bool; + fn compare(&self, other: &Self::Owned) -> Ordering; + fn less_equals(&self, other: &Self::Owned) -> bool { + self.compare(other) != Ordering::Greater + } + fn equals(&self, other: &Self::Owned) -> bool { + self.compare(other) == Ordering::Equal + } + fn less_than(&self, other: &Self::Owned) -> bool { + self.compare(other) == Ordering::Less + } + fn borrow_as(other: &'a Self::Owned) -> Self; } -impl<'a, T: Ord+ToOwned+?Sized> MyTrait for &'a T { +impl<'a, T: Ord+ToOwned+?Sized> MyTrait<'a> for &'a T { type Owned = T::Owned; - fn to_owned(self) -> Self::Owned { self.to_owned() } - fn less_equal(&self, other: &Self::Owned) -> bool { self.le(&other.borrow()) } + fn into_owned(self) -> Self::Owned { self.to_owned() } + fn clone_onto(&self, other: &mut Self::Owned) { ::clone_into(self, other) } + fn compare(&self, other: &Self::Owned) -> Ordering { self.cmp(&other.borrow()) } + fn borrow_as(other: &'a Self::Owned) -> Self { + other.borrow() + } } @@ -36,9 +54,11 @@ impl<'a, T: Ord+ToOwned+?Sized> MyTrait for &'a T { pub trait Cursor { /// Key by which updates are indexed. - type Key: ?Sized; + type Key<'a>: Copy + Clone + MyTrait<'a, Owned = Self::KeyOwned>; + /// Owned version of the above. + type KeyOwned: Ord + Clone; /// Values associated with keys. - type Val<'a>: Copy + Clone + MyTrait; + type Val<'a>: Copy + Clone + MyTrait<'a, Owned = Self::ValOwned>; /// Owned version of the above. type ValOwned: Ord + Clone; /// Timestamps associated with updates @@ -59,12 +79,12 @@ pub trait Cursor { fn val_valid(&self, storage: &Self::Storage) -> bool; /// A reference to the current key. Asserts if invalid. - fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key; + fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a>; /// A reference to the current value. Asserts if invalid. fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a>; /// Returns a reference to the current key, if valid. - fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<&'a Self::Key> { + fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option> { if self.key_valid(storage) { Some(self.key(storage)) } else { None } } /// Returns a reference to the current value, if valid. @@ -79,7 +99,11 @@ pub trait Cursor { /// Advances the cursor to the next key. fn step_key(&mut self, storage: &Self::Storage); /// Advances the cursor to the specified key. - fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key); + fn seek_key<'a>(&mut self, storage: &Self::Storage, key: Self::Key<'a>); + /// Convenience method to get access by reference to an owned key. + fn seek_key_owned<'a>(&mut self, storage: &Self::Storage, key: &'a Self::KeyOwned) { + self.seek_key(storage, as MyTrait<'a>>::borrow_as(key)); + } /// Advances the cursor to the next value. 
fn step_val(&mut self, storage: &Self::Storage); @@ -92,9 +116,8 @@ pub trait Cursor { fn rewind_vals(&mut self, storage: &Self::Storage); /// Rewinds the cursor and outputs its contents to a Vec - fn to_vec(&mut self, storage: &Self::Storage) -> Vec<((Self::Key, Self::ValOwned), Vec<(Self::Time, Self::Diff)>)> + fn to_vec(&mut self, storage: &Self::Storage) -> Vec<((Self::KeyOwned, Self::ValOwned), Vec<(Self::Time, Self::Diff)>)> where - Self::Key: Clone, Self::Time: Clone, Self::Diff: Clone, { @@ -107,7 +130,7 @@ pub trait Cursor { self.map_times(storage, |ts, r| { kv_out.push((ts.clone(), r.clone())); }); - out.push(((self.key(storage).clone(), self.val(storage).to_owned()), kv_out)); + out.push(((self.key(storage).into_owned(), self.val(storage).into_owned()), kv_out)); self.step_val(storage); } self.step_key(storage); diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index cc5c01512..a3c6a8405 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -45,17 +45,18 @@ pub(crate) mod merge_batcher_col; pub use self::merge_batcher::MergeBatcher as Batcher; -pub mod ord; +// pub mod ord; pub mod ord_neu; pub mod rhh; // Opinionated takes on default spines. -pub use self::ord::OrdValSpine as ValSpine; -pub use self::ord::OrdKeySpine as KeySpine; +pub use self::ord_neu::OrdValSpine as ValSpine; +pub type KeySpine = ValSpine; +// pub use self::ord::OrdKeySpine as KeySpine; use std::ops::{Add, Sub}; use std::convert::{TryInto, TryFrom}; -use std::borrow::{Borrow, ToOwned}; +use std::borrow::{ToOwned}; use timely::container::columnation::{Columnation, TimelyStack}; use lattice::Lattice; @@ -63,14 +64,10 @@ use difference::Semigroup; /// A type that names constituent update types. pub trait Update { - /// We will be able to read out references to this type, and must supply `Key::Owned` as input. - type Key: Ord + ToOwned + ?Sized; /// Key by which data are grouped. - type KeyOwned: Ord+Clone + Borrow; + type Key: Ord + Clone + 'static; /// Values associated with the key. - type Val: Ord + ToOwned + ?Sized + 'static; - /// Values associated with the key, in owned form - type ValOwned: Ord+Clone + Borrow; + type Val: Ord + Clone + 'static; /// Time at which updates occur. type Time: Ord+Lattice+timely::progress::Timestamp+Clone; /// Way in which updates occur. @@ -79,15 +76,13 @@ pub trait Update { impl Update for ((K, V), T, R) where - K: Ord+Clone, + K: Ord+Clone+'static, V: Ord+Clone+'static, T: Ord+Lattice+timely::progress::Timestamp+Clone, R: Semigroup+Clone, { type Key = K; - type KeyOwned = K; type Val = V; - type ValOwned = V; type Time = T; type Diff = R; } @@ -102,15 +97,13 @@ pub trait Layout { type ValOffset: OrdOffset; /// Container for update keys. type KeyContainer: - RetainFrom<::Key>+ - BatchContainer::Key>; + BatchContainer::Key>; /// Container for update vals. type ValContainer: - RetainFrom<::Val>+ - BatchContainer::Val>; + BatchContainer::Val>; /// Container for update vals. 
type UpdContainer: - BatchContainer::Time, ::Diff)>; + for<'a> BatchContainer::Time, ::Diff), ReadItem<'a> = &'a (::Time, ::Diff)>; } /// A layout that uses vectors @@ -120,8 +113,11 @@ pub struct Vector { impl Layout for Vector where - U::Key: ToOwned + Sized + Clone, - U::Val: ToOwned + Sized + Clone, + U::Key: 'static, + U::Val: 'static, +// where +// U::Key: ToOwned + Sized + Clone + 'static, +// U::Val: ToOwned + Sized + Clone + 'static, { type Target = U; type KeyOffset = O; @@ -136,10 +132,10 @@ pub struct TStack { phantom: std::marker::PhantomData<(U, O)>, } -impl Layout for TStack +impl Layout for TStack where - U::Key: Columnation + ToOwned, - U::Val: Columnation + ToOwned, + U::Key: Columnation + 'static, + U::Val: Columnation + 'static, U::Time: Columnation, U::Diff: Columnation, { @@ -156,14 +152,14 @@ where /// Examples include types that implement `Clone` who prefer pub trait PreferredContainer : ToOwned { /// The preferred container for the type. - type Container: BatchContainer + RetainFrom; + type Container: BatchContainer; } -impl PreferredContainer for T { +impl PreferredContainer for T { type Container = Vec; } -impl PreferredContainer for [T] { +impl PreferredContainer for [T] { type Container = SliceContainer; } @@ -174,18 +170,16 @@ pub struct Preferred { impl Update for Preferred where - K: Ord+ToOwned + ?Sized, - K::Owned: Ord+Clone, - V: Ord+ToOwned + ?Sized + 'static, + K: ToOwned + ?Sized, + K::Owned: Ord+Clone+'static, + V: ToOwned + ?Sized + 'static, V::Owned: Ord+Clone, T: Ord+Lattice+timely::progress::Timestamp+Clone, R: Semigroup+Clone, O: OrdOffset, { - type Key = K; - type KeyOwned = K::Owned; - type Val = V; - type ValOwned = V::Owned; + type Key = K::Owned; + type Val = V::Owned; type Time = T; type Diff = R; } @@ -193,7 +187,8 @@ where impl Layout for Preferred where K: Ord+ToOwned+PreferredContainer + ?Sized, - K::Owned: Ord+Clone, + K::Owned: Ord+Clone+'static, + for<'a> K::Container: BatchContainer = &'a K>, V: Ord+ToOwned+PreferredContainer + ?Sized + 'static, V::Owned: Ord+Clone, T: Ord+Lattice+timely::progress::Timestamp+Clone, @@ -209,35 +204,35 @@ where } -/// A container that can retain/discard from some offset onward. -pub trait RetainFrom { - /// Retains elements from an index onwards that satisfy a predicate. - fn retain_frombool>(&mut self, index: usize, predicate: P); -} - -impl RetainFrom for Vec { - fn retain_frombool>(&mut self, index: usize, mut predicate: P) { - let mut write_position = index; - for position in index .. self.len() { - if predicate(position, &self[position]) { - self.swap(position, write_position); - write_position += 1; - } - } - self.truncate(write_position); - } -} - -impl RetainFrom for TimelyStack { - fn retain_frombool>(&mut self, index: usize, mut predicate: P) { - let mut position = index; - self.retain_from(index, |item| { - let result = predicate(position, item); - position += 1; - result - }) - } -} +// /// A container that can retain/discard from some offset onward. +// pub trait RetainFrom { +// /// Retains elements from an index onwards that satisfy a predicate. +// fn retain_frombool>(&mut self, index: usize, predicate: P); +// } + +// impl RetainFrom for Vec { +// fn retain_frombool>(&mut self, index: usize, mut predicate: P) { +// let mut write_position = index; +// for position in index .. 
self.len() { +// if predicate(position, &self[position]) { +// self.swap(position, write_position); +// write_position += 1; +// } +// } +// self.truncate(write_position); +// } +// } + +// impl RetainFrom for TimelyStack { +// fn retain_frombool>(&mut self, index: usize, mut predicate: P) { +// let mut position = index; +// self.retain_from(index, |item| { +// let result = predicate(position, item); +// position += 1; +// result +// }) +// } +// } /// Trait for types used as offsets into an ordered layer. /// This is usually `usize`, but `u32` can also be used in applications @@ -258,19 +253,24 @@ pub mod containers { use timely::container::columnation::{Columnation, TimelyStack}; use std::borrow::{Borrow, ToOwned}; + use trace::MyTrait; /// A general-purpose container resembling `Vec`. - pub trait BatchContainer: Default { + pub trait BatchContainer: Default + 'static { /// The type of contained item. /// /// The container only supplies references to the item, so it needn't be sized. - type Item: ?Sized; + type PushItem; + /// The type that can be read back out of the container. + type ReadItem<'a>: Copy + MyTrait<'a, Owned = Self::PushItem>; /// Inserts an owned item. - fn push(&mut self, item: ::Owned) where Self::Item: ToOwned; + fn push(&mut self, item: Self::PushItem); + /// Inserts an owned item. + fn copy_push(&mut self, item: &Self::PushItem); /// Inserts a borrowed item. - fn copy(&mut self, item: &Self::Item); + fn copy<'a>(&mut self, item: Self::ReadItem<'a>); /// Extends from a slice of items. - fn copy_slice(&mut self, slice: &[::Owned]) where Self::Item: ToOwned; + fn copy_slice(&mut self, slice: &[Self::PushItem]); /// Extends from a range of items in another`Self`. fn copy_range(&mut self, other: &Self, start: usize, end: usize); /// Creates a new container with sufficient capacity. @@ -281,11 +281,11 @@ pub mod containers { fn merge_capacity(cont1: &Self, cont2: &Self) -> Self; /// Reference to the element at this position. - fn index(&self, index: usize) -> &Self::Item; + fn index<'a>(&'a self, index: usize) -> Self::ReadItem<'a>; /// Number of contained elements fn len(&self) -> usize; /// Returns the last item if the container is non-empty. - fn last(&self) -> Option<&Self::Item> { + fn last<'a>(&'a self) -> Option> { if self.len() > 0 { Some(self.index(self.len()-1)) } @@ -300,7 +300,7 @@ pub mod containers { /// stays false once it becomes false, a joint property of the predicate /// and the layout of `Self. This allows `advance` to use exponential search to /// count the number of elements in time logarithmic in the result. - fn advancebool>(&self, start: usize, end: usize, function: F) -> usize { + fn advance Fn(Self::ReadItem<'a>)->bool>(&self, start: usize, end: usize, function: F) -> usize { let small_limit = 8; @@ -341,15 +341,20 @@ pub mod containers { // All `T: Clone` also implement `ToOwned`, but without the constraint Rust // struggles to understand why the owned type must be `T` (i.e. the one blanket impl). 
- impl> BatchContainer for Vec { - type Item = T; + impl BatchContainer for Vec { + type PushItem = T; + type ReadItem<'a> = &'a Self::PushItem; + fn push(&mut self, item: T) { self.push(item); } + fn copy_push(&mut self, item: &T) { + self.copy(item); + } fn copy(&mut self, item: &T) { self.push(item.clone()); } - fn copy_slice(&mut self, slice: &[T]) where T: Sized { + fn copy_slice(&mut self, slice: &[T]) { self.extend_from_slice(slice); } fn copy_range(&mut self, other: &Self, start: usize, end: usize) { @@ -364,7 +369,7 @@ pub mod containers { fn merge_capacity(cont1: &Self, cont2: &Self) -> Self { Vec::with_capacity(cont1.len() + cont2.len()) } - fn index(&self, index: usize) -> &Self::Item { + fn index<'a>(&'a self, index: usize) -> Self::ReadItem<'a> { &self[index] } fn len(&self) -> usize { @@ -374,15 +379,20 @@ pub mod containers { // The `ToOwned` requirement exists to satisfy `self.reserve_items`, who must for now // be presented with the actual contained type, rather than a type that borrows into it. - impl> BatchContainer for TimelyStack { - type Item = T; - fn push(&mut self, item: ::Owned) where Self::Item: ToOwned { + impl + 'static> BatchContainer for TimelyStack { + type PushItem = T; + type ReadItem<'a> = &'a Self::PushItem; + + fn push(&mut self, item: Self::PushItem) { + self.copy(item.borrow()); + } + fn copy_push(&mut self, item: &Self::PushItem) { self.copy(item.borrow()); } fn copy(&mut self, item: &T) { self.copy(item); } - fn copy_slice(&mut self, slice: &[::Owned]) where Self::Item: ToOwned { + fn copy_slice(&mut self, slice: &[Self::PushItem]) { self.reserve_items(slice.iter()); for item in slice.iter() { self.copy(item); @@ -405,7 +415,7 @@ pub mod containers { new.reserve_regions(std::iter::once(cont1).chain(std::iter::once(cont2))); new } - fn index(&self, index: usize) -> &Self::Item { + fn index<'a>(&'a self, index: usize) -> Self::ReadItem<'a> { &self[index] } fn len(&self) -> usize { @@ -426,23 +436,26 @@ pub mod containers { impl BatchContainer for SliceContainer where - B: Clone + Sized, - [B]: ToOwned>, + B: Ord + Clone + Sized + 'static, { - type Item = [B]; - fn push(&mut self, item: Vec) where Self::Item: ToOwned { + type PushItem = Vec; + type ReadItem<'a> = &'a [B]; + fn push(&mut self, item: Vec) { for x in item.into_iter() { self.inner.push(x); } self.offsets.push(self.inner.len()); } - fn copy(&mut self, item: &Self::Item) { + fn copy_push(&mut self, item: &Vec) { + self.copy(&item[..]); + } + fn copy<'a>(&mut self, item: Self::ReadItem<'a>) { for x in item.iter() { self.inner.copy(x); } self.offsets.push(self.inner.len()); } - fn copy_slice(&mut self, slice: &[Vec]) where Self::Item: ToOwned { + fn copy_slice(&mut self, slice: &[Vec]) { for item in slice { self.copy(item); } @@ -470,7 +483,7 @@ pub mod containers { inner: Vec::with_capacity(cont1.inner.len() + cont2.inner.len()), } } - fn index(&self, index: usize) -> &Self::Item { + fn index<'a>(&'a self, index: usize) -> Self::ReadItem<'a> { let lower = self.offsets[index]; let upper = self.offsets[index+1]; &self.inner[lower .. upper] @@ -490,12 +503,12 @@ pub mod containers { } } - use trace::implementations::RetainFrom; - /// A container that can retain/discard from some offset onward. - impl RetainFrom<[B]> for SliceContainer { - /// Retains elements from an index onwards that satisfy a predicate. 
- fn retain_frombool>(&mut self, _index: usize, _predicate: P) { - unimplemented!() - } - } + // use trace::implementations::RetainFrom; + // /// A container that can retain/discard from some offset onward. + // impl RetainFrom<[B]> for SliceContainer { + // /// Retains elements from an index onwards that satisfy a predicate. + // fn retain_frombool>(&mut self, _index: usize, _predicate: P) { + // unimplemented!() + // } + // } } diff --git a/src/trace/implementations/ord.rs b/src/trace/implementations/ord.rs index ade2fc48a..d0cb63953 100644 --- a/src/trace/implementations/ord.rs +++ b/src/trace/implementations/ord.rs @@ -98,11 +98,11 @@ where // Type aliases to make certain types readable. type TDLayer = OrderedLeaf<<::Target as Update>::Time, <::Target as Update>::Diff>; -type VTDLayer = OrderedLayer<<::Target as Update>::Val, TDLayer, ::ValOffset, ::ValContainer>; +type VTDLayer = OrderedLayer<<::Target as Update>::ValOwned, TDLayer, ::ValOffset, ::ValContainer>; type KTDLayer = OrderedLayer<<::Target as Update>::Key, TDLayer, ::KeyOffset, ::KeyContainer>; type KVTDLayer = OrderedLayer<<::Target as Update>::Key, VTDLayer, ::KeyOffset, ::KeyContainer>; type TDBuilder = OrderedLeafBuilder<<::Target as Update>::Time, <::Target as Update>::Diff>; -type VTDBuilder = OrderedBuilder<<::Target as Update>::Val, TDBuilder, ::ValOffset, ::ValContainer>; +type VTDBuilder = OrderedBuilder<<::Target as Update>::ValOwned, TDBuilder, ::ValOffset, ::ValContainer>; type KTDBuilder = OrderedBuilder<<::Target as Update>::Key, TDBuilder, ::KeyOffset, ::KeyContainer>; type KVTDBuilder = OrderedBuilder<<::Target as Update>::Key, VTDBuilder, ::KeyOffset, ::KeyContainer>; @@ -112,7 +112,7 @@ where ::Val: Sized + Clone, { type Key = ::Key; - type Val = ::Val; + type Val<'a> = ::Val<'a>; type Time = ::Time; type Diff = ::Diff; @@ -343,7 +343,7 @@ where ::Val: Sized + Clone, { type Key = ::Key; - type Val<'a> = &'a ::Val; + type Val<'a> = ::Val<'a>; type ValOwned = ::ValOwned; type Time = ::Time; type Diff = ::Diff; @@ -383,9 +383,9 @@ impl Builder for OrdValBuilder where ::Key: Sized + Clone, ::Val: Sized + Clone, - OrdValBatch: Batch::Key, Val=::Val, Time=::Time, Diff=::Diff> + // OrdValBatch: Batch::Key, Val=::Val, Time=::Time, Diff=::Diff>, { - type Item = ((::Key, ::Val), ::Time, ::Diff); + type Item = ((::Key, ::ValOwned), ::Time, ::Diff); type Time = ::Time; type Output = OrdValBatch; @@ -451,7 +451,7 @@ where impl Batch for OrdKeyBatch where ::Key: Sized + Clone, - L::Target: Update, + L::Target: Update, { type Merger = OrdKeyMerger; @@ -680,7 +680,7 @@ where impl Builder for OrdKeyBuilder where ::Key: Sized + Clone, - OrdKeyBatch: Batch::Key, Val=(), Time=::Time, Diff=::Diff> + // OrdKeyBatch: Batch::Key, Val=(), Time=::Time, Diff=::Diff>, { type Item = ((::Key, ()), ::Time, ::Diff); type Time = ::Time; diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index 3470942a0..e560f1782 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -55,7 +55,8 @@ mod val_batch { use trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; use trace::implementations::BatchContainer; - + use trace::cursor::MyTrait; + use super::{Layout, Update}; /// An immutable collection of update tuples, from a contiguous interval of logical times. 
@@ -120,8 +121,10 @@ mod val_batch { } impl BatchReader for OrdValBatch { - type Key = ::Key; - type Val = ::Val; + type Key<'a> = ::ReadItem<'a>; + type KeyOwned = ::Key; + type Val<'a> = ::ReadItem<'a>; + type ValOwned = ::Val; type Time = ::Time; type Diff = ::Diff; @@ -276,7 +279,7 @@ mod val_batch { /// if the updates cancel either directly or after compaction. fn merge_key(&mut self, source1: &OrdValStorage, source2: &OrdValStorage) { use ::std::cmp::Ordering; - match source1.keys.index(self.key_cursor1).cmp(source2.keys.index(self.key_cursor2)) { + match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) { Ordering::Less => { self.copy_key(source1, self.key_cursor1); self.key_cursor1 += 1; @@ -315,7 +318,7 @@ mod val_batch { // if they are non-empty post-consolidation, we write the value. // We could multi-way merge and it wouldn't be very complicated. use ::std::cmp::Ordering; - match source1.vals.index(lower1).cmp(source2.vals.index(lower2)) { + match source1.vals.index(lower1).cmp(&source2.vals.index(lower2)) { Ordering::Less => { // Extend stash by updates, with logical compaction applied. self.stash_updates_for_val(source1, lower1); @@ -377,7 +380,7 @@ mod val_batch { let (lower, upper) = source.updates_for_value(index); for i in lower .. upper { // NB: Here is where we would need to look back if `lower == upper`. - let (time, diff) = &source.updates.index(i); + let (time, diff) = &source.updates.index(i).into_owned(); use lattice::Lattice; let mut new_time = time.clone(); new_time.advance_by(self.description.since().borrow()); @@ -392,7 +395,7 @@ mod val_batch { if !self.update_stash.is_empty() { // If there is a single element, equal to a just-prior recorded update, // we push nothing and report an unincremented offset to encode this case. - if self.update_stash.len() == 1 && self.update_stash.last() == self.result.updates.last() { + if self.update_stash.len() == 1 && self.update_stash.last().map(|ud| ud.equals(self.result.updates.last().unwrap())).unwrap_or(false) { // Just clear out update_stash, as we won't drain it here. 
self.update_stash.clear(); self.singletons += 1; @@ -420,16 +423,20 @@ mod val_batch { phantom: PhantomData, } - impl Cursor for OrdValCursor { - type Key = ::Key; - type Val<'a> = &'a ::Val; - type ValOwned = ::ValOwned; + impl Cursor for OrdValCursor + // where + // L::KeyContainer: for <'a> BatchContainer = &'a ::Key>, + { + type Key<'a> = ::ReadItem<'a>; + type KeyOwned = ::Key; + type Val<'a> = ::ReadItem<'a>; + type ValOwned = ::Val; type Time = ::Time; type Diff = ::Diff; type Storage = OrdValBatch; - fn key<'a>(&self, storage: &'a OrdValBatch) -> &'a Self::Key { storage.storage.keys.index(self.key_cursor) } + fn key<'a>(&self, storage: &'a OrdValBatch) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) } fn val<'a>(&self, storage: &'a OrdValBatch) -> Self::Val<'a> { storage.storage.vals.index(self.val_cursor) } fn map_times(&mut self, storage: &OrdValBatch, mut logic: L2) { let (lower, upper) = storage.storage.updates_for_value(self.val_cursor); @@ -449,8 +456,8 @@ mod val_batch { self.key_cursor = storage.storage.keys.len(); } } - fn seek_key(&mut self, storage: &OrdValBatch, key: &Self::Key) { - self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| x.lt(key)); + fn seek_key<'a>(&mut self, storage: &OrdValBatch, key: Self::Key<'a>) { + self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| x.lt(&key)); if self.key_valid(storage) { self.rewind_vals(storage); } @@ -462,7 +469,7 @@ mod val_batch { } } fn seek_val<'a>(&mut self, storage: &OrdValBatch, val: Self::Val<'a>) { - self.val_cursor += storage.storage.vals.advance(self.val_cursor, storage.storage.values_for_key(self.key_cursor).1, |x| x.lt(val)); + self.val_cursor += storage.storage.vals.advance(self.val_cursor, storage.storage.values_for_key(self.key_cursor).1, |x| x.lt(&val)); } fn rewind_keys(&mut self, storage: &OrdValBatch) { self.key_cursor = 0; @@ -516,10 +523,10 @@ mod val_batch { impl Builder for OrdValBuilder where - OrdValBatch: Batch::Key, Val=::Val, Time=::Time, Diff=::Diff>, - ::KeyOwned: Borrow<::Key>, + // OrdValBatch: Batch::Key, Val=::Val, Time=::Time, Diff=::Diff>, + // ::Key: Borrow<::Key>, { - type Item = ((::KeyOwned, ::ValOwned), ::Time, ::Diff); + type Item = ((::Key, ::Val), ::Time, ::Diff); type Time = ::Time; type Output = OrdValBatch; @@ -542,9 +549,9 @@ mod val_batch { fn push(&mut self, ((key, val), time, diff): Self::Item) { // Perhaps this is a continuation of an already received key. - if self.result.keys.last() == Some(key.borrow()) { + if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) { // Perhaps this is a continuation of an already received value. - if self.result.vals.last() == Some(val.borrow()) { + if self.result.vals.last().map(|v| v.equals(&val)).unwrap_or(false) { self.push_update(time, diff); } else { // New value; complete representation of prior value. @@ -568,9 +575,9 @@ mod val_batch { fn copy(&mut self, ((key, val), time, diff): &Self::Item) { // Perhaps this is a continuation of an already received key. - if self.result.keys.last() == Some(key.borrow()) { + if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) { // Perhaps this is a continuation of an already received value. - if self.result.vals.last() == Some(val.borrow()) { + if self.result.vals.last().map(|v| v.equals(val)).unwrap_or(false) { // TODO: here we could look for repetition, and not push the update in that case. // More logic (and state) would be required to correctly wrangle this. 
                    self.push_update(time.clone(), diff.clone());
@@ -580,7 +587,7 @@ mod val_batch {
                     // Remove any pending singleton, and if it was set increment our count.
                     if self.singleton.take().is_some() { self.singletons += 1; }
                     self.push_update(time.clone(), diff.clone());
-                    self.result.vals.copy(val.borrow());
+                    self.result.vals.copy_push(val.borrow());
                 }
             } else {
                 // New key; complete representation of prior key.
@@ -589,8 +596,8 @@ mod val_batch {
                 if self.singleton.take().is_some() { self.singletons += 1; }
                 self.result.keys_offs.push(self.result.vals.len().try_into().ok().unwrap());
                 self.push_update(time.clone(), diff.clone());
-                self.result.vals.copy(val.borrow());
-                self.result.keys.copy(key.borrow());
+                self.result.vals.copy_push(val.borrow());
+                self.result.keys.copy_push(key.borrow());
             }
         }
diff --git a/src/trace/implementations/rhh.rs b/src/trace/implementations/rhh.rs
index 1eba162fd..63ad3c45b 100644
--- a/src/trace/implementations/rhh.rs
+++ b/src/trace/implementations/rhh.rs
@@ -76,9 +76,12 @@ mod val_batch {
     use std::marker::PhantomData;
     use timely::progress::{Antichain, frontier::AntichainRef};

+    use hashable::Hashable;
+
     use trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
     use trace::implementations::BatchContainer;
-
+    use trace::cursor::MyTrait;
+
     use super::{Layout, Update, HashOrdered};

     /// Update tuples organized as a Robin Hood Hash map, ordered by `(hash(Key), Key, Val, Time)`.
@@ -95,11 +98,10 @@ mod val_batch {
     /// We will use the `Hashable` trait here, but any consistent hash function should work out ok.
     /// We specifically want to use the highest bits of the result (we will) because the low bits have
     /// likely been spent shuffling the data between workers (by key), and are likely low entropy.
-    #[derive(Abomonation, Debug)]
+    #[derive(Abomonation)]
     pub struct RhhValStorage<L: Layout>
     where
-        <L::Target as Update>::Key: HashOrdered,
-        <L::Target as Update>::KeyOwned: Default + HashOrdered,
+        <L::Target as Update>::Key: Default + HashOrdered,
     {
         /// The requested capacity for `keys`. We use this when determining where a key with a certain hash
@@ -109,6 +111,8 @@
         pub divisor: usize,
         /// The number of present keys, distinct from `keys.len()` which contains
         pub key_count: usize,
+        /// Owned staging copy of a key, used when copying keys out of the containers during merges.
+        pub key_owned: <<L::Target as Update>::Key as ToOwned>::Owned,
         /// An ordered list of keys, corresponding to entries in `keys_offs`.
         pub keys: L::KeyContainer,
@@ -133,8 +137,7 @@ mod val_batch {

     impl<L: Layout> RhhValStorage<L>
     where
-        <L::Target as Update>::Key: HashOrdered,
-        <L::Target as Update>::KeyOwned: Default + HashOrdered,
+        <L::Target as Update>::Key: Default + HashOrdered,
     {
         /// Lower and upper bounds in `self.vals` corresponding to the key at `index`.
         fn values_for_key(&self, index: usize) -> (usize, usize) {
@@ -179,7 +182,7 @@ mod val_batch {

             // Now we insert the key. Even if it is no longer the desired location because of contention.
             // If an offset has been supplied we insert it, and otherwise leave it for future determination.
-            self.keys.copy(key);
+            self.keys.copy_push(key);
             if let Some(offset) = offset {
                 self.keys_offs.push(offset);
             }
@@ -187,16 +190,15 @@ mod val_batch {
         }

         /// Indicates both the desired location and the hash signature of the key.
-        fn desired_location(&self, key: &<L::Target as Update>::Key) -> usize {
-            use hashable::Hashable;
+        fn desired_location<K: Hashable>(&self, key: &K) -> usize {
             let hash: usize = key.hashed().into().try_into().unwrap();
             hash / self.divisor
         }

         /// Returns true if one should advance one's index in the search for `key`.
-        fn advance_key(&self, index: usize, key: &<L::Target as Update>::Key) -> bool {
+        fn advance_key<'a>(&self, index: usize, key: <L::KeyContainer as BatchContainer>::ReadItem<'a>) -> bool {
             // Ideally this short-circuits, as `self.keys[index]` is bogus data.
-            !self.live_key(index) || self.keys.index(index).lt(key)
+            !self.live_key(index) || self.keys.index(index).lt(&key)
         }

         /// Indicates that a key is valid, rather than dead space, by looking for a valid offset range.
@@ -229,8 +231,7 @@ mod val_batch {
     #[derive(Abomonation)]
     pub struct RhhValBatch<L: Layout>
     where
-        <L::Target as Update>::Key: HashOrdered,
-        <L::Target as Update>::KeyOwned: Default + HashOrdered,
+        <L::Target as Update>::Key: Default + HashOrdered,
     {
         /// The updates themselves.
         pub storage: RhhValStorage<L>,
@@ -246,11 +247,13 @@ mod val_batch {

     impl<L: Layout> BatchReader for RhhValBatch<L>
     where
-        <L::Target as Update>::Key: HashOrdered,
-        <L::Target as Update>::KeyOwned: Default + HashOrdered,
+        <L::Target as Update>::Key: Default + HashOrdered,
+        for<'a> <L::KeyContainer as BatchContainer>::ReadItem<'a>: HashOrdered,
     {
-        type Key = <L::Target as Update>::Key;
-        type Val = <L::Target as Update>::Val;
+        type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
+        type KeyOwned = <L::Target as Update>::Key;
+        type Val<'a> = <L::ValContainer as BatchContainer>::ReadItem<'a>;
+        type ValOwned = <L::Target as Update>::Val;
         type Time = <L::Target as Update>::Time;
         type Diff = <L::Target as Update>::Diff;
@@ -274,8 +277,8 @@ mod val_batch {

     impl<L: Layout> Batch for RhhValBatch<L>
     where
-        <L::Target as Update>::Key: HashOrdered,
-        <L::Target as Update>::KeyOwned: Default + HashOrdered,
+        <L::Target as Update>::Key: Default + HashOrdered,
+        for<'a> <L::KeyContainer as BatchContainer>::ReadItem<'a>: HashOrdered,
     {
         type Merger = RhhValMerger<L>;
@@ -287,8 +290,7 @@ mod val_batch {

     /// State for an in-progress merge.
     pub struct RhhValMerger<L: Layout>
     where
-        <L::Target as Update>::Key: HashOrdered,
-        <L::Target as Update>::KeyOwned: Default + HashOrdered,
+        <L::Target as Update>::Key: Default + HashOrdered,
     {
         /// Key position to merge next in the first batch.
         key_cursor1: usize,
@@ -310,8 +312,7 @@ mod val_batch {

     impl<L: Layout> Merger<RhhValBatch<L>> for RhhValMerger<L>
     where
-        <L::Target as Update>::Key: HashOrdered,
-        <L::Target as Update>::KeyOwned: Default + HashOrdered,
+        <L::Target as Update>::Key: Default + HashOrdered,
         RhhValBatch<L>: Batch<Time=<L::Target as Update>::Time>,
     {
         fn new(batch1: &RhhValBatch<L>, batch2: &RhhValBatch<L>, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self {
@@ -340,6 +341,7 @@ mod val_batch {
                 key_count: 0,
                 key_capacity: rhh_cap,
                 divisor: RhhValStorage::<L>::divisor_for_capacity(rhh_cap),
+                key_owned: Default::default(),
             };

             storage.keys_offs.push(0.try_into().ok().unwrap());
@@ -401,10 +403,8 @@ mod val_batch {
     // Helper methods in support of merging batches.
     impl<L: Layout> RhhValMerger<L>
     where
-        <L::Target as Update>::Key: HashOrdered,
-        <L::Target as Update>::KeyOwned: Default + HashOrdered,
+        <L::Target as Update>::Key: Default + HashOrdered,
     {
-
         /// Copy the next key in `source`.
         ///
         /// The method extracts the key in `source` at `cursor`, and merges it in to `self`.
@@ -427,7 +427,8 @@ mod val_batch {

             // If we have pushed any values, copy the key as well.
             if self.result.vals.len() > init_vals {
-                self.result.insert_key(source.keys.index(cursor), Some(self.result.vals.len().try_into().ok().unwrap()));
+                source.keys.index(cursor).clone_onto(&mut self.result.key_owned);
+                self.result.insert_key(&self.result.key_owned, Some(self.result.vals.len().try_into().ok().unwrap()));
             }
         }
         /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors.
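// ----------------------------------------------------------------------------
// Illustrative sketch (not part of the patch): the probing scheme behind
// `desired_location` / `advance_key` above. Keys are kept in an array ordered by
// (hash, key); a key's preferred slot is its hash divided by a fixed divisor (so
// the high bits of the hash choose the slot), and a lookup probes forward past
// dead slots and occupants that sort earlier. Everything below is a simplified
// stand-in, not the batch's actual storage or API.

/// A divisor large enough that any `u64` hash divided by it lands in `0 .. capacity`.
/// Zero is returned for capacities of zero or one, mirroring the storage's comment.
fn slot_divisor(capacity: usize) -> usize {
    if capacity <= 1 { 0 } else { 1 + (u64::MAX / capacity as u64) as usize }
}

/// Probe forward from the preferred slot, mirroring the `advance_key` predicate:
/// keep moving while the slot is dead space or its occupant sorts before `key`.
fn find_slot(slots: &[Option<(u64, String)>], divisor: usize, hash: u64, key: &str) -> Option<usize> {
    let mut index = if divisor == 0 { 0 } else { hash as usize / divisor };
    let advance = |index: usize| match &slots[index] {
        None => true,                                        // dead space: keep moving
        Some((h, k)) => (*h, k.as_str()) < (hash, key),      // occupant sorts earlier
    };
    while index < slots.len() && advance(index) { index += 1; }
    // We stopped at the first live slot that does not sort before `key`; check for a match.
    match slots.get(index) {
        Some(Some((h, k))) if *h == hash && k.as_str() == key => Some(index),
        _ => None,
    }
}
// ----------------------------------------------------------------------------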
@@ -437,7 +438,7 @@ mod val_batch {
         fn merge_key(&mut self, source1: &RhhValStorage<L>, source2: &RhhValStorage<L>) {
             use ::std::cmp::Ordering;
-            match source1.keys.index(self.key_cursor1).cmp(source2.keys.index(self.key_cursor2)) {
+            match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
                 Ordering::Less => {
                     self.copy_key(source1, self.key_cursor1);
                     self.key_cursor1 += 1;
@@ -447,7 +448,8 @@ mod val_batch {
                     let (lower1, upper1) = source1.values_for_key(self.key_cursor1);
                     let (lower2, upper2) = source2.values_for_key(self.key_cursor2);
                     if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) {
-                        self.result.insert_key(source1.keys.index(self.key_cursor1), Some(off));
+                        source1.keys.index(self.key_cursor1).clone_onto(&mut self.result.key_owned);
+                        self.result.insert_key(&self.result.key_owned, Some(off));
                     }
                     // Increment cursors in either case; the keys are merged.
                     self.key_cursor1 += 1;
@@ -475,7 +477,7 @@ mod val_batch {
             // if they are non-empty post-consolidation, we write the value.
             // We could multi-way merge and it wouldn't be very complicated.
             use ::std::cmp::Ordering;
-            match source1.vals.index(lower1).cmp(source2.vals.index(lower2)) {
+            match source1.vals.index(lower1).cmp(&source2.vals.index(lower2)) {
                 Ordering::Less => {
                     // Extend stash by updates, with logical compaction applied.
                     self.stash_updates_for_val(source1, lower1);
@@ -538,8 +540,8 @@ mod val_batch {
             for i in lower .. upper {
                 // NB: Here is where we would need to look back if `lower == upper`.
                 let (time, diff) = &source.updates.index(i);
-                use lattice::Lattice;
                 let mut new_time = time.clone();
+                use lattice::Lattice;
                 new_time.advance_by(self.description.since().borrow());
                 self.update_stash.push((new_time, diff.clone()));
             }
@@ -552,7 +554,7 @@ mod val_batch {
             if !self.update_stash.is_empty() {
                 // If there is a single element, equal to a just-prior recorded update,
                 // we push nothing and report an unincremented offset to encode this case.
-                if self.update_stash.len() == 1 && self.update_stash.last() == self.result.updates.last() {
+                if self.update_stash.len() == 1 && self.result.updates.last().map(|l| l.equals(self.update_stash.last().unwrap())).unwrap_or(false) {
                     // Just clear out update_stash, as we won't drain it here.
                     self.update_stash.clear();
                     self.singletons += 1;
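// ----------------------------------------------------------------------------
// Illustrative sketch (not part of the patch): the "singleton update" encoding
// that `push_update` and the sealing logic above maintain. When a value's
// consolidated updates are exactly one (time, diff) pair equal to the most
// recently written pair, nothing is pushed and the offset is left unincremented;
// readers recover the pair by looking one slot back (the `lower == upper` case
// noted in the hunks above). Types and names here are simplified stand-ins.
struct UpdateLog<T, R> {
    updates: Vec<(T, R)>,
    offs: Vec<usize>,   // `offs[i] .. offs[i+1]` indexes the updates for value `i`.
    singletons: usize,  // How many values hit the singleton case (for size accounting).
}

impl<T: PartialEq, R: PartialEq> UpdateLog<T, R> {
    fn new() -> Self {
        UpdateLog { updates: Vec::new(), offs: vec![0], singletons: 0 }
    }
    /// Seal the (non-empty, consolidated) updates for the next value.
    fn seal(&mut self, mut pending: Vec<(T, R)>) {
        if pending.len() == 1 && self.updates.last() == pending.last() {
            // A lone pair equal to the just-prior recorded update: push nothing,
            // repeat the previous offset, and remember that we did so.
            self.singletons += 1;
        } else {
            self.updates.append(&mut pending);
        }
        self.offs.push(self.updates.len());
    }
    /// Updates for value `i`; an empty range encodes "the single update just before `lower`".
    fn updates_for(&self, i: usize) -> &[(T, R)] {
        let (lower, upper) = (self.offs[i], self.offs[i + 1]);
        if lower == upper { &self.updates[lower - 1 .. lower] } else { &self.updates[lower .. upper] }
    }
}
// ----------------------------------------------------------------------------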
@@ -580,8 +582,7 @@ mod val_batch {
     /// the cursor, rather than internal state.
     pub struct RhhValCursor<L: Layout>
     where
-        <L::Target as Update>::Key: HashOrdered,
-        <L::Target as Update>::KeyOwned: Default + HashOrdered,
+        <L::Target as Update>::Key: Default + HashOrdered,
     {
         /// Absolute position of the current key.
         key_cursor: usize,
@@ -593,18 +594,19 @@ mod val_batch {

     impl<L: Layout> Cursor for RhhValCursor<L>
     where
-        <L::Target as Update>::Key: HashOrdered,
-        <L::Target as Update>::KeyOwned: Default + HashOrdered,
+        <L::Target as Update>::Key: Default + HashOrdered,
+        for<'a> <L::KeyContainer as BatchContainer>::ReadItem<'a>: HashOrdered,
     {
-        type Key = <L::Target as Update>::Key;
-        type Val<'a> = &'a <L::Target as Update>::Val;
-        type ValOwned = <L::Target as Update>::ValOwned;
+        type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
+        type KeyOwned = <L::Target as Update>::Key;
+        type Val<'a> = <L::ValContainer as BatchContainer>::ReadItem<'a>;
+        type ValOwned = <L::Target as Update>::Val;
         type Time = <L::Target as Update>::Time;
         type Diff = <L::Target as Update>::Diff;

         type Storage = RhhValBatch<L>;

-        fn key<'a>(&self, storage: &'a RhhValBatch<L>) -> &'a Self::Key {
+        fn key<'a>(&self, storage: &'a RhhValBatch<L>) -> Self::Key<'a> {
             storage.storage.keys.index(self.key_cursor)
         }
         fn val<'a>(&self, storage: &'a RhhValBatch<L>) -> Self::Val<'a> { storage.storage.vals.index(self.val_cursor) }
@@ -629,9 +631,9 @@ mod val_batch {
                 self.key_cursor = storage.storage.keys.len();
             }
         }
-        fn seek_key(&mut self, storage: &RhhValBatch<L>, key: &Self::Key) {
+        fn seek_key<'a>(&mut self, storage: &RhhValBatch<L>, key: Self::Key<'a>) {
             // self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| x.lt(key));
-            let desired = storage.storage.desired_location(key);
+            let desired = storage.storage.desired_location(&key);
             // Advance the cursor, if `desired` is ahead of it.
             if self.key_cursor < desired {
                 self.key_cursor = desired;
@@ -656,7 +658,7 @@ mod val_batch {
             }
         }
         fn seek_val<'a>(&mut self, storage: &RhhValBatch<L>, val: Self::Val<'a>) {
-            self.val_cursor += storage.storage.vals.advance(self.val_cursor, storage.storage.values_for_key(self.key_cursor).1, |x| x.lt(val));
+            self.val_cursor += storage.storage.vals.advance(self.val_cursor, storage.storage.values_for_key(self.key_cursor).1, |x| x.lt(&val));
         }
         fn rewind_keys(&mut self, storage: &RhhValBatch<L>) {
             self.key_cursor = 0;
@@ -674,8 +676,7 @@ mod val_batch {

     /// A builder for creating layers from unsorted update tuples.
     pub struct RhhValBuilder<L: Layout>
     where
-        <L::Target as Update>::Key: HashOrdered,
-        <L::Target as Update>::KeyOwned: Default + HashOrdered,
+        <L::Target as Update>::Key: Default + HashOrdered,
     {
         result: RhhValStorage<L>,
         singleton: Option<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
@@ -688,8 +689,7 @@ mod val_batch {

     impl<L: Layout> RhhValBuilder<L>
     where
-        <L::Target as Update>::Key: HashOrdered,
-        <L::Target as Update>::KeyOwned: Default + HashOrdered,
+        <L::Target as Update>::Key: Default + HashOrdered,
     {
         /// Pushes a single update, which may set `self.singleton` rather than push.
         ///
@@ -720,12 +720,10 @@ mod val_batch {

     impl<L: Layout> Builder for RhhValBuilder<L>
     where
-        <L::Target as Update>::Key: HashOrdered,
-        <L::Target as Update>::KeyOwned: Default + HashOrdered,
-        <L::Target as Update>::KeyOwned: Borrow<<L::Target as Update>::Key>,
-        RhhValBatch<L>: Batch<Key=<L::Target as Update>::Key, Val=<L::Target as Update>::Val, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>
+        <L::Target as Update>::Key: Default + HashOrdered,
+        // RhhValBatch<L>: Batch<Key=<L::Target as Update>::Key, Val=<L::Target as Update>::Val, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
     {
-        type Item = ((<L::Target as Update>::KeyOwned, <L::Target as Update>::ValOwned), <L::Target as Update>::Time, <L::Target as Update>::Diff);
+        type Item = ((<L::Target as Update>::Key, <L::Target as Update>::Val), <L::Target as Update>::Time, <L::Target as Update>::Diff);
         type Time = <L::Target as Update>::Time;
         type Output = RhhValBatch<L>;
@@ -750,6 +748,7 @@ mod val_batch {
                     key_count: 0,
                     key_capacity: rhh_capacity,
                     divisor,
+                    key_owned: Default::default(),
                 },
                 singleton: None,
                 singletons: 0,
@@ -760,9 +759,9 @@ mod val_batch {

         fn push(&mut self, ((key, val), time, diff): Self::Item) {

             // Perhaps this is a continuation of an already received key.
-            if self.result.keys.last() == Some(key.borrow()) {
+            if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) {
                 // Perhaps this is a continuation of an already received value.
-                if self.result.vals.last() == Some(val.borrow()) {
+                if self.result.vals.last().map(|v| v.equals(&val)).unwrap_or(false) {
                     self.push_update(time, diff);
                 } else {
                     // New value; complete representation of prior value.
@@ -787,9 +786,9 @@ mod val_batch {

         fn copy(&mut self, ((key, val), time, diff): &Self::Item) {

             // Perhaps this is a continuation of an already received key.
-            if self.result.keys.last() == Some(key.borrow()) {
+            if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) {
                 // Perhaps this is a continuation of an already received value.
-                if self.result.vals.last() == Some(val.borrow()) {
+                if self.result.vals.last().map(|v| v.equals(val)).unwrap_or(false) {
                     // TODO: here we could look for repetition, and not push the update in that case.
                     // More logic (and state) would be required to correctly wrangle this.
                     self.push_update(time.clone(), diff.clone());
@@ -799,7 +798,7 @@ mod val_batch {
                     // Remove any pending singleton, and if it was set increment our count.
                     if self.singleton.take().is_some() { self.singletons += 1; }
                     self.push_update(time.clone(), diff.clone());
-                    self.result.vals.copy(val.borrow());
+                    self.result.vals.copy_push(val.borrow());
                 }
             } else {
                 // New key; complete representation of prior key.
@@ -808,7 +807,7 @@ mod val_batch {
                 if self.singleton.take().is_some() { self.singletons += 1; }
                 self.result.keys_offs.push(self.result.vals.len().try_into().ok().unwrap());
                 self.push_update(time.clone(), diff.clone());
-                self.result.vals.copy(val.borrow());
+                self.result.vals.copy_push(val.borrow());
                 // Insert the key, but with no specified offset.
                 self.result.insert_key(key.borrow(), None);
             }
diff --git a/src/trace/implementations/spine_fueled.rs b/src/trace/implementations/spine_fueled.rs
index adab0d121..9988f9ada 100644
--- a/src/trace/implementations/spine_fueled.rs
+++ b/src/trace/implementations/spine_fueled.rs
@@ -112,13 +112,13 @@ where
 impl<B, BA, BU> TraceReader for Spine<B, BA, BU>
 where
     B: Batch+Clone+'static,
-    B::Key: Ord, // Clone is required by `batch::advance_*` (in-place could remove).
-    B::Val: Ord, // Clone is required by `batch::advance_*` (in-place could remove).
     B::Time: Lattice+timely::progress::Timestamp+Ord+Clone+Debug,
     B::Diff: Semigroup,
 {
-    type Key = B::Key;
-    type Val = B::Val;
+    type Key<'a> = B::Key<'a>;
+    type KeyOwned = B::KeyOwned;
+    type Val<'a> = B::Val<'a>;
+    type ValOwned = B::ValOwned;
     type Time = B::Time;
     type Diff = B::Diff;
@@ -260,8 +260,6 @@ where
 impl<B, BA, BU> Trace for Spine<B, BA, BU>
 where
     B: Batch+Clone+'static,
-    B::Key: Ord,
-    B::Val: Ord,
     B::Time: Lattice+timely::progress::Timestamp+Ord+Clone+Debug,
     B::Diff: Semigroup,
     BA: Batcher