Skip to content

Commit

Permalink
feat: support since windows in collect aggregations (#583)
Browse files Browse the repository at this point in the history
Also fixes a bug where since windows were never actually used. The since-window
implementation is likely more efficient than the more complex two-stacks approach,
so it is probably better to swap.
  • Loading branch information
jordanrfrazier authored Aug 2, 2023
1 parent 71e8345 commit f1f079a
Show file tree
Hide file tree
Showing 10 changed files with 238 additions and 25 deletions.
16 changes: 8 additions & 8 deletions crates/sparrow-compiler/src/ast_to_dfg/window_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,20 @@ pub(crate) fn flatten_window_args(
window.args().len()
);

// Since aggregations have a single active window
let duration_id = dfg.add_literal(LiteralValue::Number(String::from("1")).to_scalar()?)?;
let duration = Located::new(
// Since aggregations use a null duration
let null_arg = dfg.add_literal(LiteralValue::Null.to_scalar()?)?;
let null_arg = Located::new(
add_literal(
dfg,
duration_id,
FenlType::Concrete(DataType::Int64),
name.location().clone(),
null_arg,
FenlType::Concrete(DataType::Null),
window.location().clone(),
)?,
name.location().clone(),
window.location().clone(),
);

let condition = crate::ast_to_dfg(data_context, dfg, diagnostics, &window.args()[0])?;
Ok((window.with_value(condition), duration))
Ok((window.with_value(condition), null_arg))
} else if name.inner() == "sliding" {
debug_assert!(
window.args().len() == 2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,11 @@ operations:
- arguments: []
result_type:
kind:
Primitive: 6
Primitive: 1
output: false
operator:
Literal:
literal:
Int64: 1
literal: ~
- arguments: []
result_type:
kind:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,11 @@ operations:
- arguments: []
result_type:
kind:
Primitive: 6
Primitive: 1
output: false
operator:
Literal:
literal:
Int64: 1
literal: ~
- arguments: []
result_type:
kind:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,11 @@ operations:
- arguments: []
result_type:
kind:
Primitive: 6
Primitive: 1
output: false
operator:
Literal:
literal:
Int64: 1
literal: ~
- arguments: []
result_type:
kind:
Expand Down Expand Up @@ -289,12 +288,11 @@ operations:
- arguments: []
result_type:
kind:
Primitive: 6
Primitive: 1
output: false
operator:
Literal:
literal:
Int64: 1
literal: ~
- arguments: []
result_type:
kind:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ where
/// Returns a shared reference to the values currently buffered in the state
/// slot at `index`.
///
/// The slot is accessed by direct indexing, so `index` must refer to an
/// existing slot (NOTE(review): presumably panics when out of range — confirm
/// against the type of `self.state`).
pub fn state(&self, index: usize) -> &VecDeque<Option<T>> {
&self.state[index]
}

/// Empties the state slot at `index`, discarding every value buffered in it.
///
/// The slot itself stays in place; only its contents are removed. The slot is
/// accessed by direct indexing, so `index` must refer to an existing slot.
pub fn reset(&mut self, index: usize) {
    let buffered = &mut self.state[index];
    buffered.clear();
}
}

impl<T> StateToken for CollectToken<T>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl Evaluator for CollectBooleanEvaluator {
fn evaluate(&mut self, info: &dyn RuntimeInfo) -> anyhow::Result<ArrayRef> {
match (self.tick.is_literal_null(), self.duration.is_literal_null()) {
(true, true) => self.evaluate_non_windowed(info),
(true, false) => unimplemented!("since window aggregation unsupported"),
(false, true) => self.evaluate_since_windowed(info),
(false, false) => panic!("sliding window aggregation should use other evaluator"),
(_, _) => anyhow::bail!("saw invalid combination of tick and duration"),
}
Expand Down Expand Up @@ -105,4 +105,45 @@ impl CollectBooleanEvaluator {

Ok(Arc::new(list_builder.finish()))
}

/// Evaluates the collect aggregation over a `since` window for boolean inputs.
///
/// Every row is processed in the order "update -> emit -> reset":
///
/// 1. the row's input is added to the buffer of the row's entity,
/// 2. the buffer is emitted as the output list for that row,
/// 3. if the row's tick is `true`, the entity's buffer is cleared.
///
/// As a consequence, an input that shares a row with a tick is still part of
/// the list emitted for that row. Note that ticks are generated with a maximum
/// subsort value, so an input is unlikely to naturally land in the same row as
/// a tick; it is more likely to appear at the same time with an earlier
/// subsort value.
fn evaluate_since_windowed(&mut self, info: &dyn RuntimeInfo) -> anyhow::Result<ArrayRef> {
    let input = info.value(&self.input)?.array_ref()?;
    let num_entities = info.grouping().num_groups();
    let group_indices = info.grouping().group_indices();
    assert_eq!(group_indices.len(), input.len());

    self.ensure_entity_capacity(num_entities);

    let values = input.as_boolean();
    let tick_array = info.value(&self.tick)?.array_ref()?;
    let tick_flags = tick_array.as_boolean();

    let mut lists = ListBuilder::new(BooleanBuilder::new());

    for (group, tick, value) in izip!(group_indices.values(), tick_flags, values) {
        let entity = *group as usize;

        // Update the entity's buffer, then emit it for this row.
        self.token.add_value(self.max, entity, value);
        lists.append_value(self.token.state(entity).iter().copied());

        // Only a `true` tick resets; a false or null tick leaves the buffer as is.
        if let Some(true) = tick {
            self.token.reset(entity);
        }
    }

    Ok(Arc::new(lists.finish()))
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use arrow::array::{ArrayRef, ListBuilder, PrimitiveBuilder};
use arrow::array::{ArrayRef, AsArray, ListBuilder, PrimitiveBuilder};
use arrow::datatypes::{ArrowPrimitiveType, DataType};

use itertools::izip;
Expand Down Expand Up @@ -81,7 +81,7 @@ where
fn evaluate(&mut self, info: &dyn RuntimeInfo) -> anyhow::Result<ArrayRef> {
match (self.tick.is_literal_null(), self.duration.is_literal_null()) {
(true, true) => self.evaluate_non_windowed(info),
(true, false) => unimplemented!("since window aggregation unsupported"),
(false, true) => self.evaluate_since_windowed(info),
(false, false) => panic!("sliding window aggregation should use other evaluator"),
(_, _) => anyhow::bail!("saw invalid combination of tick and duration"),
}
Expand Down Expand Up @@ -128,4 +128,45 @@ where

Ok(Arc::new(list_builder.finish()))
}

/// Evaluates the collect aggregation over a `since` window for primitive inputs.
///
/// Every row is processed in the order "update -> emit -> reset":
///
/// 1. the row's input is added to the buffer of the row's entity,
/// 2. the buffer is emitted as the output list for that row,
/// 3. if the row's tick is `true`, the entity's buffer is cleared.
///
/// As a consequence, an input that shares a row with a tick is still part of
/// the list emitted for that row. Note that ticks are generated with a maximum
/// subsort value, so an input is unlikely to naturally land in the same row as
/// a tick; it is more likely to appear at the same time with an earlier
/// subsort value.
fn evaluate_since_windowed(&mut self, info: &dyn RuntimeInfo) -> anyhow::Result<ArrayRef> {
    let input = info.value(&self.input)?.array_ref()?;
    let num_entities = info.grouping().num_groups();
    let group_indices = info.grouping().group_indices();
    assert_eq!(group_indices.len(), input.len());

    self.ensure_entity_capacity(num_entities);

    let values = downcast_primitive_array::<T>(input.as_ref())?;
    let tick_array = info.value(&self.tick)?.array_ref()?;
    let tick_flags = tick_array.as_boolean();

    let mut lists = ListBuilder::new(PrimitiveBuilder::<T>::new());

    for (group, tick, value) in izip!(group_indices.values(), tick_flags, values) {
        let entity = *group as usize;

        // Update the entity's buffer, then emit it for this row.
        self.token.add_value(self.max, entity, value);
        lists.append_value(self.token.state(entity).iter().copied());

        // Only a `true` tick resets; a false or null tick leaves the buffer as is.
        if let Some(true) = tick {
            self.token.reset(entity);
        }
    }

    Ok(Arc::new(lists.finish()))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl Evaluator for CollectStringEvaluator {
fn evaluate(&mut self, info: &dyn RuntimeInfo) -> anyhow::Result<ArrayRef> {
match (self.tick.is_literal_null(), self.duration.is_literal_null()) {
(true, true) => self.evaluate_non_windowed(info),
(true, false) => unimplemented!("since window aggregation unsupported"),
(false, true) => self.evaluate_since_windowed(info),
(false, false) => panic!("sliding window aggregation should use other evaluator"),
(_, _) => anyhow::bail!("saw invalid combination of tick and duration"),
}
Expand Down Expand Up @@ -106,4 +106,46 @@ impl CollectStringEvaluator {

Ok(Arc::new(list_builder.finish()))
}

/// Evaluates the collect aggregation over a `since` window for string inputs.
///
/// Every row is processed in the order "update -> emit -> reset":
///
/// 1. the row's input is copied into the buffer of the row's entity,
/// 2. the buffer is emitted as the output list for that row,
/// 3. if the row's tick is `true`, the entity's buffer is cleared.
///
/// As a consequence, an input that shares a row with a tick is still part of
/// the list emitted for that row. Note that ticks are generated with a maximum
/// subsort value, so an input is unlikely to naturally land in the same row as
/// a tick; it is more likely to appear at the same time with an earlier
/// subsort value.
fn evaluate_since_windowed(&mut self, info: &dyn RuntimeInfo) -> anyhow::Result<ArrayRef> {
    let input = info.value(&self.input)?.array_ref()?;
    let num_entities = info.grouping().num_groups();
    let group_indices = info.grouping().group_indices();
    assert_eq!(group_indices.len(), input.len());

    self.ensure_entity_capacity(num_entities);

    let values = input.as_string::<i32>();
    let tick_array = info.value(&self.tick)?.array_ref()?;
    let tick_flags = tick_array.as_boolean();

    let mut lists = ListBuilder::new(StringBuilder::new());

    for (group, tick, value) in izip!(group_indices.values(), tick_flags, values) {
        let entity = *group as usize;

        // Update: the buffer owns its strings, so the borrowed input is copied once.
        self.token
            .add_value(self.max, entity, value.map(str::to_owned));

        // Emit: hand the builder borrowed `Option<&str>` values instead of
        // cloning the whole buffer (and every `String` in it) for each row.
        lists.append_value(self.token.state(entity).iter().map(|s| s.as_deref()));

        // Only a `true` tick resets; a false or null tick leaves the buffer as is.
        if let Some(true) = tick {
            self.token.reset(entity);
        }
    }

    Ok(Arc::new(lists.finish()))
}
}
Loading

0 comments on commit f1f079a

Please sign in to comment.