Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into trait_reorganization
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Nov 22, 2023
2 parents f3cbb6b + 5ea6fe5 commit f4c3dd1
Showing 1 changed file with 61 additions and 27 deletions.
88 changes: 61 additions & 27 deletions src/trace/implementations/ord_neu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
//! and should consume fewer resources (computation and memory) when it applies.
use std::rc::Rc;
use timely::container::columnation::TimelyStack;

use trace::implementations::spine_fueled::Spine;

Expand All @@ -18,21 +19,24 @@ use self::val_batch::{OrdValBatch};


/// A trace implementation using a spine of ordered lists.
pub type OrdValSpine<K, V, T, R, O=usize> = Spine<Rc<OrdValBatch<Vector<((K,V),T,R), O>>>>;
pub type OrdValSpine<K, V, T, R, O=usize> = Spine<Rc<OrdValBatch<Vector<((K,V),T,R), O>, Vec<((K,V),T,R)>>>>;
// /// A trace implementation for empty values using a spine of ordered lists.
// pub type OrdKeySpine<K, T, R, O=usize> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R), O>>>>;

/// A trace implementation backed by columnar storage.
pub type ColValSpine<K, V, T, R, O=usize> = Spine<Rc<OrdValBatch<TStack<((K,V),T,R), O>>>>;
pub type ColValSpine<K, V, T, R, O=usize> = Spine<Rc<OrdValBatch<TStack<((K,V),T,R), O>, TimelyStack<((K,V),T,R)>>>>;
// /// A trace implementation backed by columnar storage.
// pub type ColKeySpine<K, T, R, O=usize> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R), O>>>>;

mod val_batch {

use std::convert::TryInto;
use std::marker::PhantomData;
use timely::container::columnation::{Columnation, TimelyStack};
use timely::progress::{Antichain, frontier::AntichainRef};

use trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
use trace::implementations::merge_batcher_col::ColumnatedMergeBatcher;
use trace::layers::BatchContainer;

use super::{Layout, Update};
Expand Down Expand Up @@ -82,8 +86,11 @@ mod val_batch {
}

/// An immutable collection of update tuples, from a contiguous interval of logical times.
///
/// The `L` parameter captures how the updates should be laid out, and `C` determines which
/// merge batcher to select.
#[derive(Abomonation)]
pub struct OrdValBatch<L: Layout> {
pub struct OrdValBatch<L: Layout, C> {
/// The updates themselves.
pub storage: OrdValStorage<L>,
/// Description of the update times this layer represents.
Expand All @@ -94,20 +101,22 @@ mod val_batch {
/// we may have many more updates than `storage.updates.len()`. It should equal that
/// length, plus the number of singleton optimizations employed.
pub updates: usize,
/// Phantom marker for Rust happiness.
pub phantom: PhantomData<C>,
}

impl<L: Layout> BatchReader for OrdValBatch<L> {
impl<L: Layout, C> BatchReader for OrdValBatch<L, C> {
type Key = <L::Target as Update>::Key;
type Val = <L::Target as Update>::Val;
type Time = <L::Target as Update>::Time;
type R = <L::Target as Update>::Diff;

type Cursor = OrdValCursor<L>;
type Cursor = OrdValCursor<L, C>;
fn cursor(&self) -> Self::Cursor {
OrdValCursor {
key_cursor: 0,
val_cursor: 0,
phantom: std::marker::PhantomData,
phantom: PhantomData,
}
}
fn len(&self) -> usize {
Expand All @@ -118,7 +127,7 @@ mod val_batch {
fn description(&self) -> &Description<<L::Target as Update>::Time> { &self.description }
}

impl<L: Layout> Batch for OrdValBatch<L> {
impl<L: Layout> Batch for OrdValBatch<L, Vec<L::Target>> {
type Batcher = MergeBatcher<Self>;
type Builder = OrdValBuilder<L>;
type Merger = OrdValMerger<L>;
Expand All @@ -128,6 +137,23 @@ mod val_batch {
}
}

impl<L: Layout> Batch for OrdValBatch<L, TimelyStack<L::Target>>
where
<L as Layout>::Target: Columnation,
Self::Key: Columnation + 'static,
Self::Val: Columnation + 'static,
Self::Time: Columnation + 'static,
Self::R: Columnation + 'static,
{
type Batcher = ColumnatedMergeBatcher<Self>;
type Builder = OrdValBuilder<L>;
type Merger = OrdValMerger<L>;

fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self::Merger {
OrdValMerger::new(self, other, compaction_frontier)
}
}

/// State for an in-progress merge.
pub struct OrdValMerger<L: Layout> {
/// Key position to merge next in the first batch.
Expand All @@ -148,8 +174,11 @@ mod val_batch {
singletons: usize,
}

impl<L: Layout> Merger<OrdValBatch<L>> for OrdValMerger<L> {
fn new(batch1: &OrdValBatch<L>, batch2: &OrdValBatch<L>, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self {
impl<L: Layout, C> Merger<OrdValBatch<L, C>> for OrdValMerger<L>
where
OrdValBatch<L, C>: Batch<Time=<L::Target as Update>::Time>
{
fn new(batch1: &OrdValBatch<L, C>, batch2: &OrdValBatch<L, C>, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self {

assert!(batch1.upper() == batch2.lower());
use lattice::Lattice;
Expand Down Expand Up @@ -181,14 +210,15 @@ mod val_batch {
singletons: 0,
}
}
fn done(self) -> OrdValBatch<L> {
fn done(self) -> OrdValBatch<L, C> {
OrdValBatch {
updates: self.result.updates.len() + self.singletons,
storage: self.result,
description: self.description,
phantom: PhantomData
}
}
fn work(&mut self, source1: &OrdValBatch<L>, source2: &OrdValBatch<L>, fuel: &mut isize) {
fn work(&mut self, source1: &OrdValBatch<L, C>, source2: &OrdValBatch<L, C>, fuel: &mut isize) {

// An (incomplete) indication of the amount of work we've done so far.
let starting_updates = self.result.updates.len();
Expand Down Expand Up @@ -387,33 +417,33 @@ mod val_batch {
}

/// A cursor for navigating a single layer.
pub struct OrdValCursor<L: Layout> {
pub struct OrdValCursor<L: Layout, C> {
/// Absolute position of the current key.
key_cursor: usize,
/// Absolute position of the current value.
val_cursor: usize,
/// Phantom marker for Rust happiness.
phantom: std::marker::PhantomData<L>,
phantom: PhantomData<(L, C)>,
}

impl<L: Layout> Cursor<OrdValBatch<L>> for OrdValCursor<L> {
impl<L: Layout, C> Cursor<OrdValBatch<L, C>> for OrdValCursor<L, C> {
type Key = <L::Target as Update>::Key;
type Val = <L::Target as Update>::Val;
type Time = <L::Target as Update>::Time;
type R = <L::Target as Update>::Diff;

fn key<'a>(&self, storage: &'a OrdValBatch<L>) -> &'a Self::Key { &storage.storage.keys.index(self.key_cursor.try_into().ok().unwrap()) }
fn val<'a>(&self, storage: &'a OrdValBatch<L>) -> &'a Self::Val { &storage.storage.vals.index(self.val_cursor.try_into().ok().unwrap()) }
fn map_times<L2: FnMut(&Self::Time, &Self::R)>(&mut self, storage: &OrdValBatch<L>, mut logic: L2) {
fn key<'a>(&self, storage: &'a OrdValBatch<L, C>) -> &'a Self::Key { storage.storage.keys.index(self.key_cursor) }
fn val<'a>(&self, storage: &'a OrdValBatch<L, C>) -> &'a Self::Val { storage.storage.vals.index(self.val_cursor) }
fn map_times<L2: FnMut(&Self::Time, &Self::R)>(&mut self, storage: &OrdValBatch<L, C>, mut logic: L2) {
let (lower, upper) = storage.storage.updates_for_value(self.val_cursor);
for index in lower .. upper {
let (time, diff) = &storage.storage.updates.index(index);
logic(time, diff);
}
}
fn key_valid(&self, storage: &OrdValBatch<L>) -> bool { self.key_cursor < storage.storage.keys.len() }
fn val_valid(&self, storage: &OrdValBatch<L>) -> bool { self.val_cursor < storage.storage.values_for_key(self.key_cursor).1 }
fn step_key(&mut self, storage: &OrdValBatch<L>){
fn key_valid(&self, storage: &OrdValBatch<L, C>) -> bool { self.key_cursor < storage.storage.keys.len() }
fn val_valid(&self, storage: &OrdValBatch<L, C>) -> bool { self.val_cursor < storage.storage.values_for_key(self.key_cursor).1 }
fn step_key(&mut self, storage: &OrdValBatch<L, C>){
self.key_cursor += 1;
if self.key_valid(storage) {
self.rewind_vals(storage);
Expand All @@ -422,28 +452,28 @@ mod val_batch {
self.key_cursor = storage.storage.keys.len();
}
}
fn seek_key(&mut self, storage: &OrdValBatch<L>, key: &Self::Key) {
fn seek_key(&mut self, storage: &OrdValBatch<L, C>, key: &Self::Key) {
self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| x.lt(key));
if self.key_valid(storage) {
self.rewind_vals(storage);
}
}
fn step_val(&mut self, storage: &OrdValBatch<L>) {
fn step_val(&mut self, storage: &OrdValBatch<L, C>) {
self.val_cursor += 1;
if !self.val_valid(storage) {
self.val_cursor = storage.storage.values_for_key(self.key_cursor).1;
}
}
fn seek_val(&mut self, storage: &OrdValBatch<L>, val: &Self::Val) {
fn seek_val(&mut self, storage: &OrdValBatch<L, C>, val: &Self::Val) {
self.val_cursor += storage.storage.vals.advance(self.val_cursor, storage.storage.values_for_key(self.key_cursor).1, |x| x.lt(val));
}
fn rewind_keys(&mut self, storage: &OrdValBatch<L>) {
fn rewind_keys(&mut self, storage: &OrdValBatch<L, C>) {
self.key_cursor = 0;
if self.key_valid(storage) {
self.rewind_vals(storage)
}
}
fn rewind_vals(&mut self, storage: &OrdValBatch<L>) {
fn rewind_vals(&mut self, storage: &OrdValBatch<L, C>) {
self.val_cursor = storage.storage.values_for_key(self.key_cursor).0;
}
}
Expand Down Expand Up @@ -487,7 +517,10 @@ mod val_batch {
}
}

impl<L: Layout> Builder<OrdValBatch<L>> for OrdValBuilder<L> {
impl<L: Layout, C> Builder<OrdValBatch<L, C>> for OrdValBuilder<L>
where
OrdValBatch<L, C>: Batch<Key=<L::Target as Update>::Key, Val=<L::Target as Update>::Val, Time=<L::Target as Update>::Time, R=<L::Target as Update>::Diff>
{

type Item = ((<L::Target as Update>::Key, <L::Target as Update>::Val), <L::Target as Update>::Time, <L::Target as Update>::Diff);
type Time = <L::Target as Update>::Time;
Expand Down Expand Up @@ -565,7 +598,7 @@ mod val_batch {
}

#[inline(never)]
fn done(mut self, lower: Antichain<Self::Time>, upper: Antichain<Self::Time>, since: Antichain<Self::Time>) -> OrdValBatch<L> {
fn done(mut self, lower: Antichain<Self::Time>, upper: Antichain<Self::Time>, since: Antichain<Self::Time>) -> OrdValBatch<L, C> {
// Record the final offsets
self.result.vals_offs.push(self.result.updates.len().try_into().ok().unwrap());
// Remove any pending singleton, and if it was set increment our count.
Expand All @@ -575,6 +608,7 @@ mod val_batch {
updates: self.result.updates.len() + self.singletons,
storage: self.result,
description: Description::new(lower, upper, since),
phantom: PhantomData,
}
}
}
Expand Down

0 comments on commit f4c3dd1

Please sign in to comment.