From f1f079afd17c8767ddfefe5d15f6f9086d892b5a Mon Sep 17 00:00:00 2001 From: jordanrfrazier <122494242+jordanrfrazier@users.noreply.github.com> Date: Wed, 2 Aug 2023 10:54:54 -0700 Subject: [PATCH] feat: support since windows in collect aggregations (#583) Also fixes bug where since windows were...never used. The since window impl is likely more efficient than the more complex two-stacks, so it's likely better to swap. --- .../src/ast_to_dfg/window_args.rs | 16 ++-- ...iler_golden_tests__count_since_window.snap | 5 +- .../compiler_golden_tests__since_daily.snap | 5 +- ...en_tests__since_daily_multiple_passes.snap | 10 +-- .../aggregation/token/collect_token.rs | 4 + .../src/evaluators/list/collect_boolean.rs | 43 ++++++++- .../src/evaluators/list/collect_primitive.rs | 45 +++++++++- .../src/evaluators/list/collect_string.rs | 44 +++++++++- .../sparrow-main/tests/e2e/collect_tests.rs | 87 +++++++++++++++++++ .../src/execute/operation/tick.rs | 4 +- 10 files changed, 238 insertions(+), 25 deletions(-) diff --git a/crates/sparrow-compiler/src/ast_to_dfg/window_args.rs b/crates/sparrow-compiler/src/ast_to_dfg/window_args.rs index ddb5ed951..337b49c71 100644 --- a/crates/sparrow-compiler/src/ast_to_dfg/window_args.rs +++ b/crates/sparrow-compiler/src/ast_to_dfg/window_args.rs @@ -25,20 +25,20 @@ pub(crate) fn flatten_window_args( window.args().len() ); - // Since aggregations have a single active window - let duration_id = dfg.add_literal(LiteralValue::Number(String::from("1")).to_scalar()?)?; - let duration = Located::new( + // Since aggregations use a null duration + let null_arg = dfg.add_literal(LiteralValue::Null.to_scalar()?)?; + let null_arg = Located::new( add_literal( dfg, - duration_id, - FenlType::Concrete(DataType::Int64), - name.location().clone(), + null_arg, + FenlType::Concrete(DataType::Null), + window.location().clone(), )?, - name.location().clone(), + window.location().clone(), ); let condition = crate::ast_to_dfg(data_context, dfg, diagnostics, &window.args()[0])?; - Ok((window.with_value(condition), duration)) + Ok((window.with_value(condition), null_arg)) } else if name.inner() == "sliding" { debug_assert!( window.args().len() == 2, diff --git a/crates/sparrow-compiler/tests/snapshots/compiler_golden_tests__count_since_window.snap b/crates/sparrow-compiler/tests/snapshots/compiler_golden_tests__count_since_window.snap index e539af234..79515b038 100644 --- a/crates/sparrow-compiler/tests/snapshots/compiler_golden_tests__count_since_window.snap +++ b/crates/sparrow-compiler/tests/snapshots/compiler_golden_tests__count_since_window.snap @@ -48,12 +48,11 @@ operations: - arguments: [] result_type: kind: - Primitive: 6 + Primitive: 1 output: false operator: Literal: - literal: - Int64: 1 + literal: ~ - arguments: [] result_type: kind: diff --git a/crates/sparrow-compiler/tests/snapshots/compiler_golden_tests__since_daily.snap b/crates/sparrow-compiler/tests/snapshots/compiler_golden_tests__since_daily.snap index 9c96c1950..69dd3dd18 100644 --- a/crates/sparrow-compiler/tests/snapshots/compiler_golden_tests__since_daily.snap +++ b/crates/sparrow-compiler/tests/snapshots/compiler_golden_tests__since_daily.snap @@ -122,12 +122,11 @@ operations: - arguments: [] result_type: kind: - Primitive: 6 + Primitive: 1 output: false operator: Literal: - literal: - Int64: 1 + literal: ~ - arguments: [] result_type: kind: diff --git a/crates/sparrow-compiler/tests/snapshots/compiler_golden_tests__since_daily_multiple_passes.snap b/crates/sparrow-compiler/tests/snapshots/compiler_golden_tests__since_daily_multiple_passes.snap index 04051408a..09e53dc22 100644 --- a/crates/sparrow-compiler/tests/snapshots/compiler_golden_tests__since_daily_multiple_passes.snap +++ b/crates/sparrow-compiler/tests/snapshots/compiler_golden_tests__since_daily_multiple_passes.snap @@ -153,12 +153,11 @@ operations: - arguments: [] result_type: kind: - Primitive: 6 + Primitive: 1 output: false operator: Literal: - literal: - Int64: 1 + literal: ~ - arguments: [] result_type: kind: @@ -289,12 +288,11 @@ operations: - arguments: [] result_type: kind: - Primitive: 6 + Primitive: 1 output: false operator: Literal: - literal: - Int64: 1 + literal: ~ - arguments: [] result_type: kind: diff --git a/crates/sparrow-instructions/src/evaluators/aggregation/token/collect_token.rs b/crates/sparrow-instructions/src/evaluators/aggregation/token/collect_token.rs index ad0dcb73d..6de14d642 100644 --- a/crates/sparrow-instructions/src/evaluators/aggregation/token/collect_token.rs +++ b/crates/sparrow-instructions/src/evaluators/aggregation/token/collect_token.rs @@ -37,6 +37,10 @@ where pub fn state(&self, index: usize) -> &VecDeque> { &self.state[index] } + + pub fn reset(&mut self, index: usize) { + self.state[index].clear(); + } } impl StateToken for CollectToken diff --git a/crates/sparrow-instructions/src/evaluators/list/collect_boolean.rs b/crates/sparrow-instructions/src/evaluators/list/collect_boolean.rs index 7e0945dd5..84c46d5d3 100644 --- a/crates/sparrow-instructions/src/evaluators/list/collect_boolean.rs +++ b/crates/sparrow-instructions/src/evaluators/list/collect_boolean.rs @@ -62,7 +62,7 @@ impl Evaluator for CollectBooleanEvaluator { fn evaluate(&mut self, info: &dyn RuntimeInfo) -> anyhow::Result { match (self.tick.is_literal_null(), self.duration.is_literal_null()) { (true, true) => self.evaluate_non_windowed(info), - (true, false) => unimplemented!("since window aggregation unsupported"), + (false, true) => self.evaluate_since_windowed(info), (false, false) => panic!("sliding window aggregation should use other evaluator"), (_, _) => anyhow::bail!("saw invalid combination of tick and duration"), } @@ -105,4 +105,45 @@ impl CollectBooleanEvaluator { Ok(Arc::new(list_builder.finish())) } + + /// Since windows follow the pattern "update -> emit -> reset". + /// + /// i.e. if an input appears in the same row as a tick, then that value will + /// be included in the output before the tick causes the state to be cleared. + /// However, note that ticks are generated with a maximum subsort value, so it is + /// unlikely an input naturally appears in the same row as a tick. It is more likely + /// that an input may appear at the same time, but an earlier subsort value. + fn evaluate_since_windowed(&mut self, info: &dyn RuntimeInfo) -> anyhow::Result { + let input = info.value(&self.input)?.array_ref()?; + let key_capacity = info.grouping().num_groups(); + let entity_indices = info.grouping().group_indices(); + assert_eq!(entity_indices.len(), input.len()); + + self.ensure_entity_capacity(key_capacity); + + let input = input.as_boolean(); + let ticks = info.value(&self.tick)?.array_ref()?; + let ticks = ticks.as_boolean(); + + let builder = BooleanBuilder::new(); + let mut list_builder = ListBuilder::new(builder); + + izip!(entity_indices.values(), ticks, input).for_each(|(entity_index, tick, input)| { + let entity_index = *entity_index as usize; + + self.token.add_value(self.max, entity_index, input); + let cur_list = self.token.state(entity_index); + + list_builder.append_value(cur_list.iter().copied()); + + match tick { + Some(t) if t => { + self.token.reset(entity_index); + } + _ => (), // Tick is false or null, so do nothing. + } + }); + + Ok(Arc::new(list_builder.finish())) + } } diff --git a/crates/sparrow-instructions/src/evaluators/list/collect_primitive.rs b/crates/sparrow-instructions/src/evaluators/list/collect_primitive.rs index 3dcf31b57..176cb6a9f 100644 --- a/crates/sparrow-instructions/src/evaluators/list/collect_primitive.rs +++ b/crates/sparrow-instructions/src/evaluators/list/collect_primitive.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use arrow::array::{ArrayRef, ListBuilder, PrimitiveBuilder}; +use arrow::array::{ArrayRef, AsArray, ListBuilder, PrimitiveBuilder}; use arrow::datatypes::{ArrowPrimitiveType, DataType}; use itertools::izip; @@ -81,7 +81,7 @@ where fn evaluate(&mut self, info: &dyn RuntimeInfo) -> anyhow::Result { match (self.tick.is_literal_null(), self.duration.is_literal_null()) { (true, true) => self.evaluate_non_windowed(info), - (true, false) => unimplemented!("since window aggregation unsupported"), + (false, true) => self.evaluate_since_windowed(info), (false, false) => panic!("sliding window aggregation should use other evaluator"), (_, _) => anyhow::bail!("saw invalid combination of tick and duration"), } @@ -128,4 +128,45 @@ where Ok(Arc::new(list_builder.finish())) } + + /// Since windows follow the pattern "update -> emit -> reset". + /// + /// i.e. if an input appears in the same row as a tick, then that value will + /// be included in the output before the tick causes the state to be cleared. + /// However, note that ticks are generated with a maximum subsort value, so it is + /// unlikely an input naturally appears in the same row as a tick. It is more likely + /// that an input may appear at the same time, but an earlier subsort value. + fn evaluate_since_windowed(&mut self, info: &dyn RuntimeInfo) -> anyhow::Result { + let input = info.value(&self.input)?.array_ref()?; + let key_capacity = info.grouping().num_groups(); + let entity_indices = info.grouping().group_indices(); + assert_eq!(entity_indices.len(), input.len()); + + self.ensure_entity_capacity(key_capacity); + + let input = downcast_primitive_array::(input.as_ref())?; + let ticks = info.value(&self.tick)?.array_ref()?; + let ticks = ticks.as_boolean(); + + let builder = PrimitiveBuilder::::new(); + let mut list_builder = ListBuilder::new(builder); + + izip!(entity_indices.values(), ticks, input).for_each(|(entity_index, tick, input)| { + let entity_index = *entity_index as usize; + + self.token.add_value(self.max, entity_index, input); + let cur_list = self.token.state(entity_index); + + list_builder.append_value(cur_list.iter().copied()); + + match tick { + Some(t) if t => { + self.token.reset(entity_index); + } + _ => (), // Tick is false or null, so do nothing. + } + }); + + Ok(Arc::new(list_builder.finish())) + } } diff --git a/crates/sparrow-instructions/src/evaluators/list/collect_string.rs b/crates/sparrow-instructions/src/evaluators/list/collect_string.rs index b60e0aa34..3886929b0 100644 --- a/crates/sparrow-instructions/src/evaluators/list/collect_string.rs +++ b/crates/sparrow-instructions/src/evaluators/list/collect_string.rs @@ -62,7 +62,7 @@ impl Evaluator for CollectStringEvaluator { fn evaluate(&mut self, info: &dyn RuntimeInfo) -> anyhow::Result { match (self.tick.is_literal_null(), self.duration.is_literal_null()) { (true, true) => self.evaluate_non_windowed(info), - (true, false) => unimplemented!("since window aggregation unsupported"), + (false, true) => self.evaluate_since_windowed(info), (false, false) => panic!("sliding window aggregation should use other evaluator"), (_, _) => anyhow::bail!("saw invalid combination of tick and duration"), } @@ -106,4 +106,46 @@ impl CollectStringEvaluator { Ok(Arc::new(list_builder.finish())) } + + /// Since windows follow the pattern "update -> emit -> reset". + /// + /// i.e. if an input appears in the same row as a tick, then that value will + /// be included in the output before the tick causes the state to be cleared. + /// However, note that ticks are generated with a maximum subsort value, so it is + /// unlikely an input naturally appears in the same row as a tick. It is more likely + /// that an input may appear at the same time, but an earlier subsort value. + fn evaluate_since_windowed(&mut self, info: &dyn RuntimeInfo) -> anyhow::Result { + let input = info.value(&self.input)?.array_ref()?; + let key_capacity = info.grouping().num_groups(); + let entity_indices = info.grouping().group_indices(); + assert_eq!(entity_indices.len(), input.len()); + + self.ensure_entity_capacity(key_capacity); + + let input = input.as_string::(); + let ticks = info.value(&self.tick)?.array_ref()?; + let ticks = ticks.as_boolean(); + + let builder = StringBuilder::new(); + let mut list_builder = ListBuilder::new(builder); + + izip!(entity_indices.values(), ticks, input).for_each(|(entity_index, tick, input)| { + let entity_index = *entity_index as usize; + + self.token + .add_value(self.max, entity_index, input.map(|s| s.to_owned())); + let cur_list = self.token.state(entity_index); + + list_builder.append_value(cur_list.clone()); + + match tick { + Some(t) if t => { + self.token.reset(entity_index); + } + _ => (), // Tick is false or null, so do nothing. + } + }); + + Ok(Arc::new(list_builder.finish())) + } } diff --git a/crates/sparrow-main/tests/e2e/collect_tests.rs b/crates/sparrow-main/tests/e2e/collect_tests.rs index f1ffafa08..989a4fc76 100644 --- a/crates/sparrow-main/tests/e2e/collect_tests.rs +++ b/crates/sparrow-main/tests/e2e/collect_tests.rs @@ -22,6 +22,7 @@ pub(crate) async fn collect_data_fixture() -> DataFixture { 1996-12-19T16:39:57-08:00,0,A,hEllo,0,true,0 1996-12-19T16:40:57-08:00,0,A,hi,2,false,1 1996-12-19T16:41:57-08:00,0,A,hey,9,,2 + 1996-12-19T16:42:00-08:00,0,A,heylo,-7,false,2 1996-12-19T16:42:57-08:00,0,A,ay,-1,true,1 1996-12-19T16:43:57-08:00,0,A,hIlo,10,true, 1996-12-20T16:40:57-08:00,0,B,h,5,false,0 @@ -46,6 +47,7 @@ async fn test_collect_with_null_max() { 1996-12-20T00:39:57.000000000,9223372036854775808,12960666915911099378,A,0,true,hEllo 1996-12-20T00:40:57.000000000,9223372036854775808,12960666915911099378,A,0,true,hEllo 1996-12-20T00:41:57.000000000,9223372036854775808,12960666915911099378,A,0,true,hEllo + 1996-12-20T00:42:00.000000000,9223372036854775808,12960666915911099378,A,0,true,hEllo 1996-12-20T00:42:57.000000000,9223372036854775808,12960666915911099378,A,0,true,hEllo 1996-12-20T00:43:57.000000000,9223372036854775808,12960666915911099378,A,0,true,hEllo 1996-12-21T00:40:57.000000000,9223372036854775808,2867199309159137213,B,5,false,h @@ -67,6 +69,7 @@ async fn test_collect_to_list_i64() { 1996-12-20T00:39:57.000000000,9223372036854775808,12960666915911099378,A,0 1996-12-20T00:40:57.000000000,9223372036854775808,12960666915911099378,A,0 1996-12-20T00:41:57.000000000,9223372036854775808,12960666915911099378,A,0 + 1996-12-20T00:42:00.000000000,9223372036854775808,12960666915911099378,A,0 1996-12-20T00:42:57.000000000,9223372036854775808,12960666915911099378,A,0 1996-12-20T00:43:57.000000000,9223372036854775808,12960666915911099378,A,0 1996-12-21T00:40:57.000000000,9223372036854775808,2867199309159137213,B,5 @@ -88,6 +91,7 @@ async fn test_collect_to_list_i64_dynamic() { 1996-12-20T00:39:57.000000000,9223372036854775808,12960666915911099378,A,0 1996-12-20T00:40:57.000000000,9223372036854775808,12960666915911099378,A,2 1996-12-20T00:41:57.000000000,9223372036854775808,12960666915911099378,A,9 + 1996-12-20T00:42:00.000000000,9223372036854775808,12960666915911099378,A,9 1996-12-20T00:42:57.000000000,9223372036854775808,12960666915911099378,A,2 1996-12-20T00:43:57.000000000,9223372036854775808,12960666915911099378,A, 1996-12-21T00:40:57.000000000,9223372036854775808,2867199309159137213,B,5 @@ -109,6 +113,7 @@ async fn test_collect_to_small_list_i64() { 1996-12-20T00:39:57.000000000,9223372036854775808,12960666915911099378,A,0 1996-12-20T00:40:57.000000000,9223372036854775808,12960666915911099378,A,2 1996-12-20T00:41:57.000000000,9223372036854775808,12960666915911099378,A, + 1996-12-20T00:42:00.000000000,9223372036854775808,12960666915911099378,A, 1996-12-20T00:42:57.000000000,9223372036854775808,12960666915911099378,A,-1 1996-12-20T00:43:57.000000000,9223372036854775808,12960666915911099378,A, 1996-12-21T00:40:57.000000000,9223372036854775808,2867199309159137213,B,5 @@ -130,6 +135,7 @@ async fn test_collect_to_list_string() { 1996-12-20T00:39:57.000000000,9223372036854775808,12960666915911099378,A,hEllo 1996-12-20T00:40:57.000000000,9223372036854775808,12960666915911099378,A,hEllo 1996-12-20T00:41:57.000000000,9223372036854775808,12960666915911099378,A,hEllo + 1996-12-20T00:42:00.000000000,9223372036854775808,12960666915911099378,A,hEllo 1996-12-20T00:42:57.000000000,9223372036854775808,12960666915911099378,A,hEllo 1996-12-20T00:43:57.000000000,9223372036854775808,12960666915911099378,A,hEllo 1996-12-21T00:40:57.000000000,9223372036854775808,2867199309159137213,B,h @@ -151,6 +157,7 @@ async fn test_collect_to_list_string_dynamic() { 1996-12-20T00:39:57.000000000,9223372036854775808,12960666915911099378,A,hEllo 1996-12-20T00:40:57.000000000,9223372036854775808,12960666915911099378,A,hi 1996-12-20T00:41:57.000000000,9223372036854775808,12960666915911099378,A,hey + 1996-12-20T00:42:00.000000000,9223372036854775808,12960666915911099378,A,hey 1996-12-20T00:42:57.000000000,9223372036854775808,12960666915911099378,A,hi 1996-12-20T00:43:57.000000000,9223372036854775808,12960666915911099378,A, 1996-12-21T00:40:57.000000000,9223372036854775808,2867199309159137213,B,h @@ -172,6 +179,7 @@ async fn test_collect_to_small_list_string() { 1996-12-20T00:39:57.000000000,9223372036854775808,12960666915911099378,A,hEllo 1996-12-20T00:40:57.000000000,9223372036854775808,12960666915911099378,A,hi 1996-12-20T00:41:57.000000000,9223372036854775808,12960666915911099378,A, + 1996-12-20T00:42:00.000000000,9223372036854775808,12960666915911099378,A, 1996-12-20T00:42:57.000000000,9223372036854775808,12960666915911099378,A,ay 1996-12-20T00:43:57.000000000,9223372036854775808,12960666915911099378,A, 1996-12-21T00:40:57.000000000,9223372036854775808,2867199309159137213,B,h @@ -193,6 +201,7 @@ async fn test_collect_to_list_boolean() { 1996-12-20T00:39:57.000000000,9223372036854775808,12960666915911099378,A,true 1996-12-20T00:40:57.000000000,9223372036854775808,12960666915911099378,A,true 1996-12-20T00:41:57.000000000,9223372036854775808,12960666915911099378,A,true + 1996-12-20T00:42:00.000000000,9223372036854775808,12960666915911099378,A,true 1996-12-20T00:42:57.000000000,9223372036854775808,12960666915911099378,A,true 1996-12-20T00:43:57.000000000,9223372036854775808,12960666915911099378,A,true 1996-12-21T00:40:57.000000000,9223372036854775808,2867199309159137213,B,false @@ -214,6 +223,7 @@ async fn test_collect_to_list_boolean_dynamic() { 1996-12-20T00:39:57.000000000,9223372036854775808,12960666915911099378,A,true 1996-12-20T00:40:57.000000000,9223372036854775808,12960666915911099378,A,false 1996-12-20T00:41:57.000000000,9223372036854775808,12960666915911099378,A, + 1996-12-20T00:42:00.000000000,9223372036854775808,12960666915911099378,A, 1996-12-20T00:42:57.000000000,9223372036854775808,12960666915911099378,A,false 1996-12-20T00:43:57.000000000,9223372036854775808,12960666915911099378,A, 1996-12-21T00:40:57.000000000,9223372036854775808,2867199309159137213,B,false @@ -235,6 +245,7 @@ async fn test_collect_to_small_list_boolean() { 1996-12-20T00:39:57.000000000,9223372036854775808,12960666915911099378,A,true 1996-12-20T00:40:57.000000000,9223372036854775808,12960666915911099378,A,false 1996-12-20T00:41:57.000000000,9223372036854775808,12960666915911099378,A, + 1996-12-20T00:42:00.000000000,9223372036854775808,12960666915911099378,A, 1996-12-20T00:42:57.000000000,9223372036854775808,12960666915911099378,A,true 1996-12-20T00:43:57.000000000,9223372036854775808,12960666915911099378,A, 1996-12-21T00:40:57.000000000,9223372036854775808,2867199309159137213,B,false @@ -249,6 +260,82 @@ async fn test_collect_to_small_list_boolean() { "###); } +#[tokio::test] +async fn test_collect_primitive_since_minutely() { + insta::assert_snapshot!(QueryFixture::new("{ f1: Collect.n | collect(10, window=since(minutely())) | index(0) | when(is_valid($input)) }").run_to_csv(&collect_data_fixture().await).await.unwrap(), @r###" + _time,_subsort,_key_hash,_key,f1 + 1996-12-20T00:39:57.000000000,9223372036854775808,12960666915911099378,A,0 + 1996-12-20T00:40:00.000000000,18446744073709551615,12960666915911099378,A,0 + 1996-12-20T00:40:57.000000000,9223372036854775808,12960666915911099378,A,2 + 1996-12-20T00:41:00.000000000,18446744073709551615,12960666915911099378,A,2 + 1996-12-20T00:41:57.000000000,9223372036854775808,12960666915911099378,A,9 + 1996-12-20T00:42:00.000000000,9223372036854775808,12960666915911099378,A,9 + 1996-12-20T00:42:00.000000000,18446744073709551615,12960666915911099378,A,9 + 1996-12-20T00:42:57.000000000,9223372036854775808,12960666915911099378,A,-1 + 1996-12-20T00:43:00.000000000,18446744073709551615,12960666915911099378,A,-1 + 1996-12-20T00:43:57.000000000,9223372036854775808,12960666915911099378,A,10 + 1996-12-20T00:44:00.000000000,18446744073709551615,12960666915911099378,A,10 + 1996-12-21T00:40:57.000000000,9223372036854775808,2867199309159137213,B,5 + 1996-12-21T00:41:00.000000000,18446744073709551615,2867199309159137213,B,5 + 1996-12-21T00:41:57.000000000,9223372036854775808,2867199309159137213,B,-2 + 1996-12-21T00:42:00.000000000,18446744073709551615,2867199309159137213,B,-2 + 1996-12-21T00:43:57.000000000,9223372036854775808,2867199309159137213,B,2 + 1996-12-21T00:44:00.000000000,18446744073709551615,2867199309159137213,B,2 + 1996-12-22T00:44:57.000000000,9223372036854775808,2521269998124177631,C,1 + 1996-12-22T00:45:00.000000000,18446744073709551615,2521269998124177631,C,1 + 1996-12-22T00:45:57.000000000,9223372036854775808,2521269998124177631,C,2 + 1996-12-22T00:46:00.000000000,18446744073709551615,2521269998124177631,C,2 + 1996-12-22T00:46:57.000000000,9223372036854775808,2521269998124177631,C,3 + 1996-12-22T00:47:00.000000000,18446744073709551615,2521269998124177631,C,3 + 1996-12-22T00:47:57.000000000,9223372036854775808,2521269998124177631,C,4 + "###); +} + +#[tokio::test] +async fn test_collect_primitive_since_minutely_1() { + // Only two rows in this set exist within the same minute, hence these results when + // getting the second item. + insta::assert_snapshot!(QueryFixture::new("{ f1: Collect.n | collect(10, window=since(minutely())) | index(1) | when(is_valid($input)) }").run_to_csv(&collect_data_fixture().await).await.unwrap(), @r###" + _time,_subsort,_key_hash,_key,f1 + 1996-12-20T00:42:00.000000000,9223372036854775808,12960666915911099378,A,-7 + 1996-12-20T00:42:00.000000000,18446744073709551615,12960666915911099378,A,-7 + "###); +} + +#[tokio::test] +async fn test_collect_string_since_hourly() { + // note that `B` is empty because we collect `null` as a valid value in a list currently + insta::assert_snapshot!(QueryFixture::new("{ f1: Collect.s | collect(10, window=since(hourly())) | index(2) | when(is_valid($input)) }").run_to_csv(&collect_data_fixture().await).await.unwrap(), @r###" + _time,_subsort,_key_hash,_key,f1 + 1996-12-20T00:41:57.000000000,9223372036854775808,12960666915911099378,A,hey + 1996-12-20T00:42:00.000000000,9223372036854775808,12960666915911099378,A,hey + 1996-12-20T00:42:57.000000000,9223372036854775808,12960666915911099378,A,hey + 1996-12-20T00:43:57.000000000,9223372036854775808,12960666915911099378,A,hey + 1996-12-20T01:00:00.000000000,18446744073709551615,12960666915911099378,A,hey + 1996-12-21T00:42:57.000000000,9223372036854775808,2867199309159137213,B, + 1996-12-21T00:43:57.000000000,9223372036854775808,2867199309159137213,B, + 1996-12-21T00:44:57.000000000,9223372036854775808,2867199309159137213,B, + 1996-12-21T01:00:00.000000000,18446744073709551615,2867199309159137213,B, + 1996-12-22T00:46:57.000000000,9223372036854775808,2521269998124177631,C,goo + 1996-12-22T00:47:57.000000000,9223372036854775808,2521269998124177631,C,goo + "###); +} + +#[tokio::test] +async fn test_collect_boolean_since_hourly() { + insta::assert_snapshot!(QueryFixture::new("{ f1: Collect.b | collect(10, window=since(hourly())) | index(3) | when(is_valid($input)) }").run_to_csv(&collect_data_fixture().await).await.unwrap(), @r###" + _time,_subsort,_key_hash,_key,f1 + 1996-12-20T00:42:00.000000000,9223372036854775808,12960666915911099378,A,false + 1996-12-20T00:42:57.000000000,9223372036854775808,12960666915911099378,A,false + 1996-12-20T00:43:57.000000000,9223372036854775808,12960666915911099378,A,false + 1996-12-20T01:00:00.000000000,18446744073709551615,12960666915911099378,A,false + 1996-12-21T00:43:57.000000000,9223372036854775808,2867199309159137213,B,false + 1996-12-21T00:44:57.000000000,9223372036854775808,2867199309159137213,B,false + 1996-12-21T01:00:00.000000000,18446744073709551615,2867199309159137213,B,false + 1996-12-22T00:47:57.000000000,9223372036854775808,2521269998124177631,C,true + "###); +} + #[tokio::test] async fn test_require_literal_max() { // TODO: We should figure out how to not report the second error -- type variables with diff --git a/crates/sparrow-runtime/src/execute/operation/tick.rs b/crates/sparrow-runtime/src/execute/operation/tick.rs index 367285d86..23d71a26d 100644 --- a/crates/sparrow-runtime/src/execute/operation/tick.rs +++ b/crates/sparrow-runtime/src/execute/operation/tick.rs @@ -453,8 +453,10 @@ async fn send_tick_batch( // The subsort value is set to `u64::MAX` in order to ensure ticks are // processed after all other rows at the same time. let subsort_column = - // SAFETY: We create the iterator with a known / fixed length. + + // SAFETY: We create the iterator with a known / fixed length. unsafe { UInt64Array::from_trusted_len_iter(std::iter::repeat(Some(u64::MAX)).take(len)) }; + let subsort_column: ArrayRef = Arc::new(subsort_column); // Create a tick column consisting of booleans set to `true`.