Skip to content

Commit

Permalink
Non-working commit
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Nov 29, 2023
1 parent 74fffc9 commit b83b690
Show file tree
Hide file tree
Showing 26 changed files with 512 additions and 458 deletions.
2 changes: 1 addition & 1 deletion src/algorithms/graphs/bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ where
G: Scope,
G::Timestamp: Lattice+Ord,
N: ExchangeData+Hash,
Tr: TraceReader<Key=N, Val=N, Time=G::Timestamp, Diff=isize>+Clone+'static,
Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Time=G::Timestamp, Diff=isize>+Clone+'static,
{
// initialize roots as reaching themselves at distance 0
let nodes = roots.map(|x| (x, 0));
Expand Down
2 changes: 1 addition & 1 deletion src/algorithms/graphs/bijkstra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ where
G: Scope,
G::Timestamp: Lattice+Ord,
N: ExchangeData+Hash,
Tr: TraceReader<Key=N, Val=N, Time=G::Timestamp, Diff=isize>+Clone+'static,
Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Time=G::Timestamp, Diff=isize>+Clone+'static,
{
forward
.stream
Expand Down
5 changes: 3 additions & 2 deletions src/algorithms/graphs/propagate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::hash::Hash;
use timely::dataflow::*;

use ::{Collection, ExchangeData};
use ::operators::*;
use ::lattice::Lattice;
use ::difference::{Abelian, Multiply};
use ::operators::arrange::arrangement::ArrangeByKey;
Expand Down Expand Up @@ -64,7 +63,7 @@ where
R: Multiply<R, Output=R>,
R: From<i8>,
L: ExchangeData,
Tr: TraceReader<Key=N, Val=N, Time=G::Timestamp, Diff=R>+Clone+'static,
Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Time=G::Timestamp, Diff=R>+Clone+'static,
F: Fn(&L)->u64+Clone+'static,
{
// Morally the code performs the following iterative computation. However, in the interest of a simplified
Expand All @@ -90,6 +89,8 @@ where

use timely::order::Product;

use operators::join::JoinCore;

let edges = edges.enter(scope);
let nodes = nodes.enter_at(scope, move |r| 256 * (64 - (logic(&r.1)).leading_zeros() as usize));

Expand Down
6 changes: 4 additions & 2 deletions src/operators/arrange/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ where
Tr: TraceReader,
Tr::Time: Lattice+Ord+Clone+'static,
{
type Key = Tr::Key;
type Val = Tr::Val;
type Key<'a> = Tr::Key<'a>;
type KeyOwned = Tr::KeyOwned;
type Val<'a> = Tr::Val<'a>;
type ValOwned = Tr::ValOwned;
type Time = Tr::Time;
type Diff = Tr::Diff;

Expand Down
38 changes: 16 additions & 22 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ use trace::wrappers::enter_at::TraceEnter as TraceEnterAt;
use trace::wrappers::enter_at::BatchEnter as BatchEnterAt;
use trace::wrappers::filter::{TraceFilter, BatchFilter};

use trace::cursor::MyTrait;

use super::TraceAgent;

/// An arranged collection of `(K,V)` values.
Expand Down Expand Up @@ -89,8 +91,6 @@ where
pub fn enter<'a, TInner>(&self, child: &Child<'a, G, TInner>)
-> Arranged<Child<'a, G, TInner>, TraceEnter<Tr, TInner>>
where
Tr::Key: 'static,
Tr::Val: 'static,
Tr::Diff: 'static,
G::Timestamp: Clone+'static,
TInner: Refines<G::Timestamp>+Lattice+Timestamp+Clone+'static,
Expand All @@ -108,8 +108,6 @@ where
pub fn enter_region<'a>(&self, child: &Child<'a, G, G::Timestamp>)
-> Arranged<Child<'a, G, G::Timestamp>, Tr>
where
Tr::Key: 'static,
Tr::Val: 'static,
Tr::Diff: 'static,
G::Timestamp: Clone+'static,
{
Expand All @@ -127,12 +125,10 @@ where
pub fn enter_at<'a, TInner, F, P>(&self, child: &Child<'a, G, TInner>, logic: F, prior: P)
-> Arranged<Child<'a, G, TInner>, TraceEnterAt<Tr, TInner, F, P>>
where
Tr::Key: 'static,
Tr::Val: 'static,
Tr::Diff: 'static,
G::Timestamp: Clone+'static,
TInner: Refines<G::Timestamp>+Lattice+Timestamp+Clone+'static,
F: FnMut(&Tr::Key, &Tr::Val, &G::Timestamp)->TInner+Clone+'static,
F: for <'b> FnMut(Tr::Key<'b>, Tr::Val<'b>, &G::Timestamp)->TInner+Clone+'static,
P: FnMut(&TInner)->Tr::Time+Clone+'static,
{
let logic1 = logic.clone();
Expand Down Expand Up @@ -177,11 +173,9 @@ where
pub fn filter<F>(&self, logic: F)
-> Arranged<G, TraceFilter<Tr, F>>
where
Tr::Key: 'static,
Tr::Val: 'static,
Tr::Diff: 'static,
G::Timestamp: Clone+'static,
F: FnMut(&Tr::Key, &Tr::Val)->bool+Clone+'static,
F: for<'a> FnMut(Tr::Key<'a>, Tr::Val<'a>)->bool+Clone+'static,
{
let logic1 = logic.clone();
let logic2 = logic.clone();
Expand All @@ -198,7 +192,7 @@ where
pub fn as_collection<D: Data, L>(&self, mut logic: L) -> Collection<G, D, Tr::Diff>
where
Tr::Diff: Semigroup,
L: FnMut(&Tr::Key, &Tr::Val) -> D+'static,
L: for<'a> FnMut(Tr::Key<'a>, Tr::Val<'a>) -> D+'static,
{
self.flat_map_ref(move |key, val| Some(logic(key,val)))
}
Expand All @@ -212,7 +206,7 @@ where
Tr::Diff: Semigroup,
I: IntoIterator,
I::Item: Data,
L: FnMut(&Tr::Key, &Tr::Val) -> I+'static,
L: for<'a> FnMut(Tr::Key<'a>, Tr::Val<'a>) -> I+'static,
{
Self::flat_map_batches(&self.stream, logic)
}
Expand All @@ -229,7 +223,7 @@ where
Tr::Diff: Semigroup,
I: IntoIterator,
I::Item: Data,
L: FnMut(&Tr::Key, &Tr::Val) -> I+'static,
L: for<'a> FnMut(Tr::Key<'a>, Tr::Val<'a>) -> I+'static,
{
stream.unary(Pipeline, "AsCollection", move |_,_| move |input, output| {
input.for_each(|time, data| {
Expand Down Expand Up @@ -258,16 +252,16 @@ where
///
/// This method consumes a stream of (key, time) queries and reports the corresponding stream of
/// (key, value, time, diff) accumulations in the `self` trace.
pub fn lookup(&self, queries: &Stream<G, (Tr::Key, G::Timestamp)>) -> Stream<G, (Tr::Key, Tr::Val, G::Timestamp, Tr::Diff)>
pub fn lookup(&self, queries: &Stream<G, (Tr::KeyOwned, G::Timestamp)>) -> Stream<G, (Tr::KeyOwned, Tr::ValOwned, G::Timestamp, Tr::Diff)>
where
G::Timestamp: Data+Lattice+Ord+TotalOrder,
Tr::Key: ExchangeData+Hashable,
Tr::Val: ExchangeData,
Tr::KeyOwned: ExchangeData+Hashable,
Tr::ValOwned: ExchangeData,
Tr::Diff: ExchangeData+Semigroup,
Tr: 'static,
{
// while the arrangement is already correctly distributed, the query stream may not be.
let exchange = Exchange::new(move |update: &(Tr::Key,G::Timestamp)| update.0.hashed().into());
let exchange = Exchange::new(move |update: &(Tr::KeyOwned,G::Timestamp)| update.0.hashed().into());
queries.binary_frontier(&self.stream, exchange, Pipeline, "TraceQuery", move |_capability, _info| {

let mut trace = Some(self.trace.clone());
Expand All @@ -280,8 +274,8 @@ where
let mut active = Vec::new();
let mut retain = Vec::new();

let mut working: Vec<(G::Timestamp, Tr::Val, Tr::Diff)> = Vec::new();
let mut working2: Vec<(Tr::Val, Tr::Diff)> = Vec::new();
let mut working: Vec<(G::Timestamp, Tr::ValOwned, Tr::Diff)> = Vec::new();
let mut working2: Vec<(Tr::ValOwned, Tr::Diff)> = Vec::new();

move |input1, input2, output| {

Expand Down Expand Up @@ -346,13 +340,13 @@ where
same_key += 1;
}

cursor.seek_key(&storage, key);
if cursor.get_key(&storage) == Some(key) {
cursor.seek_key_owned(&storage, key);
if cursor.get_key(&storage).map(|k| k.equals(key)).unwrap_or(false) {

let mut active = &active[active_finger .. same_key];

while let Some(val) = cursor.get_val(&storage) {
cursor.map_times(&storage, |t,d| working.push((t.clone(), val.clone(), d.clone())));
cursor.map_times(&storage, |t,d| working.push((t.clone(), val.into_owned(), d.clone())));
cursor.step_val(&storage);
}

Expand Down
22 changes: 12 additions & 10 deletions src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,17 +136,17 @@ use super::TraceAgent;
/// understand what a "sequence" of upserts would mean for partially ordered
/// timestamps.
pub fn arrange_from_upsert<G, Tr>(
stream: &Stream<G, (Tr::Key, Option<Tr::Val>, G::Timestamp)>,
stream: &Stream<G, (Tr::KeyOwned, Option<Tr::ValOwned>, G::Timestamp)>,
name: &str,
) -> Arranged<G, TraceAgent<Tr>>
where
G: Scope,
G::Timestamp: Lattice+Ord+TotalOrder+ExchangeData,
Tr::Key: ExchangeData+Hashable+std::hash::Hash,
Tr::Val: ExchangeData,
Tr::KeyOwned: ExchangeData+Hashable+std::hash::Hash,
Tr::ValOwned: ExchangeData,
Tr: Trace+TraceReader<Time=G::Timestamp,Diff=isize>+'static,
Tr::Batch: Batch,
Tr::Builder: Builder<Item = ((Tr::Key, Tr::Val), Tr::Time, Tr::Diff)>,
Tr::Builder: Builder<Item = ((Tr::KeyOwned, Tr::ValOwned), Tr::Time, Tr::Diff)>,
{
let mut reader: Option<TraceAgent<Tr>> = None;

Expand All @@ -155,7 +155,7 @@ where

let reader = &mut reader;

let exchange = Exchange::new(move |update: &(Tr::Key,Option<Tr::Val>,G::Timestamp)| (update.0).hashed().into());
let exchange = Exchange::new(move |update: &(Tr::KeyOwned,Option<Tr::ValOwned>,G::Timestamp)| (update.0).hashed().into());

stream.unary_frontier(exchange, name, move |_capability, info| {

Expand Down Expand Up @@ -185,7 +185,7 @@ where
let mut prev_frontier = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());

// For stashing input upserts, ordered increasing by time (`BinaryHeap` is a max-heap).
let mut priority_queue = BinaryHeap::<std::cmp::Reverse<(G::Timestamp, Tr::Key, Option<Tr::Val>)>>::new();
let mut priority_queue = BinaryHeap::<std::cmp::Reverse<(G::Timestamp, Tr::KeyOwned, Option<Tr::ValOwned>)>>::new();
let mut updates = Vec::new();

move |input, output| {
Expand Down Expand Up @@ -252,20 +252,22 @@ where
let mut builder = Tr::Builder::new();
for (key, mut list) in to_process.drain(..) {

use trace::cursor::MyTrait;

// The prior value associated with the key.
let mut prev_value: Option<Tr::Val> = None;
let mut prev_value: Option<Tr::ValOwned> = None;

// Attempt to find the key in the trace.
trace_cursor.seek_key(&trace_storage, &key);
if trace_cursor.get_key(&trace_storage) == Some(&key) {
trace_cursor.seek_key_owned(&trace_storage, &key);
if trace_cursor.get_key(&trace_storage).map(|k| k.equals(&key)).unwrap_or(false) {
// Determine the prior value associated with the key.
while let Some(val) = trace_cursor.get_val(&trace_storage) {
let mut count = 0;
trace_cursor.map_times(&trace_storage, |_time, diff| count += *diff);
assert!(count == 0 || count == 1);
if count == 1 {
assert!(prev_value.is_none());
prev_value = Some(val.clone());
prev_value = Some(val.into_owned());
}
trace_cursor.step_val(&trace_storage);
}
Expand Down
2 changes: 1 addition & 1 deletion src/operators/consolidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ where
/// As `consolidate` but with the ability to name the operator and specify the trace type.
pub fn consolidate_named<Tr>(&self, name: &str) -> Self
where
Tr: crate::trace::Trace+crate::trace::TraceReader<Key=D,Val=(),Time=G::Timestamp,Diff=R>+'static,
Tr: for<'a> crate::trace::Trace<Key<'a>=&'a D,Val<'a>=&'a (),Time=G::Timestamp,Diff=R>+'static,
Tr::Batch: crate::trace::Batch,
Tr::Batcher: Batcher<Item = ((D,()),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((D,()),G::Timestamp,R), Time = G::Timestamp>,
Expand Down
8 changes: 4 additions & 4 deletions src/operators/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ where G::Timestamp: TotalOrder+Lattice+Ord {
}
}

impl<G: Scope, T1> CountTotal<G, T1::Key, T1::Diff> for Arranged<G, T1>
impl<G: Scope, K, T1> CountTotal<G, K, T1::Diff> for Arranged<G, T1>
where
G::Timestamp: TotalOrder+Lattice+Ord,
T1: TraceReader<Val=(), Time=G::Timestamp>+Clone+'static,
T1::Key: ExchangeData,
T1: for<'a> TraceReader<Key<'a>=&'a K, Val<'a>=&'a (), Time=G::Timestamp>+Clone+'static,
K: ExchangeData,
T1::Diff: ExchangeData+Semigroup,
{
fn count_total_core<R2: Semigroup + From<i8>>(&self) -> Collection<G, (T1::Key, T1::Diff), R2> {
fn count_total_core<R2: Semigroup + From<i8>>(&self) -> Collection<G, (K, T1::Diff), R2> {

let mut trace = self.trace.clone();
let mut buffer = Vec::new();
Expand Down
Loading

0 comments on commit b83b690

Please sign in to comment.