[core][fix] move slotted data to previous slot, not next (#2226)
aquamatthias authored Oct 8, 2024
1 parent e0f8435 · commit 7f41f80
Showing 3 changed files with 16 additions and 16 deletions.
fixcore/fixcore/db/arango_query.py (2 additions, 2 deletions)

@@ -1157,9 +1157,9 @@ def load_time_series(
     time_slot = ctx.next_crs()
     slotter = int(granularity.total_seconds())
     gran = ctx.add_bind_var(slotter)
-    offset = int(start.timestamp() - ((start.timestamp() // slotter) * slotter))
+    offset = ctx.add_bind_var(slotter - int(start.timestamp() - ((start.timestamp() // slotter) * slotter)))
     # slot the time by averaging each single group
-    query += f" LET {time_slot} = (FLOOR(d.at / @{gran}) * @{gran}) + @{ctx.add_bind_var(offset)}"
+    query += f" LET {time_slot} = (FLOOR((d.at + @{offset}) / @{gran}) * @{gran}) - @{offset}"
     query += f" COLLECT group_slot={time_slot}, complete_group=d.group"
     if avg_factor:  # Required as long as https://github.com/arangodb/arangodb/issues/21096 is not fixed
         assert avg_factor > 0, "Given average factor must be greater than 0!"
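
The change is easiest to see with concrete numbers. Below is a minimal Python sketch of both slotting formulas; GRAN, START, and the helper names are illustrative choices (the timestamps mirror the test fixtures below), not identifiers from the repository:

    GRAN = 3600                       # slot granularity in seconds (@gran)
    START = 1699913600                # series start; START % GRAN == 800

    OLD_OFFSET = START % GRAN         # 800: what the old code bound
    NEW_OFFSET = GRAN - START % GRAN  # 2800: what the fixed code binds

    def old_slot(at: int) -> int:
        # old formula: floor to the granularity, then add the start remainder
        return (at // GRAN) * GRAN + OLD_OFFSET

    def new_slot(at: int) -> int:
        # fixed formula: shift before flooring, shift back afterwards
        return ((at + NEW_OFFSET) // GRAN) * GRAN - NEW_OFFSET

    # a sample near the end of the first slot [START, START + GRAN)
    at = START + 3599
    assert old_slot(at) == START + GRAN  # bug: pushed into the next slot
    assert new_slot(at) == START         # fix: stays in its own slot

With the old formula, any sample falling in the last START % GRAN seconds of a slot was labeled with the next slot's start; the fixed formula keeps every sample in the slot that begins at or before it.
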
fixcore/tests/fixcore/db/arango_query_test.py (8 additions, 8 deletions)

@@ -331,29 +331,29 @@ def test_load_time_series() -> None:
     q, bv = load_time_series("ts", "foo", now - (24 * one_hour), now, one_hour, group_by=[])
     assert (
         q == "LET m1 = ( FOR d in `ts` FILTER d.ts==@b0 AND d.at>=@b1 AND d.at<@b2 "
-        "LET m0 = (FLOOR(d.at / @b3) * @b3) + @b4 "
+        "LET m0 = (FLOOR((d.at + @b4) / @b3) * @b3) - @b4 "
         "COLLECT group_slot=m0, complete_group=d.group "
         "AGGREGATE slot_avg = AVG(d.v) "
         "RETURN {at: group_slot, group: complete_group, v: slot_avg} )\n "
         "FOR d in m1 COLLECT group_slot=d.at AGGREGATE agg_val=avg(d.v) "
         "SORT group_slot RETURN {at: group_slot, v: agg_val}"
     )
-    assert bv == {"b0": "foo", "b1": 1699913600, "b2": 1700000000, "b3": 3600, "b4": 800}
+    assert bv == {"b0": "foo", "b1": 1699913600, "b2": 1700000000, "b3": 3600, "b4": 2800}
     # no group by defined --> group by all values
     q, bv = load_time_series("ts", "foo", now - (24 * one_hour), now, one_hour)
     assert (
         q == "FOR d in `ts` FILTER d.ts==@b0 AND d.at>=@b1 AND d.at<@b2 "
-        "LET m0 = (FLOOR(d.at / @b3) * @b3) + @b4 "
+        "LET m0 = (FLOOR((d.at + @b4) / @b3) * @b3) - @b4 "
         "COLLECT group_slot=m0, complete_group=d.group "
         "AGGREGATE slot_avg = AVG(d.v) "
         "RETURN {at: group_slot, group: complete_group, v: slot_avg}"
     )
-    assert bv == {"b0": "foo", "b1": 1699913600, "b2": 1700000000, "b3": 3600, "b4": 800}
+    assert bv == {"b0": "foo", "b1": 1699913600, "b2": 1700000000, "b3": 3600, "b4": 2800}
     # group by specific group variables
     q, bv = load_time_series("ts", "foo", now - (24 * one_hour), now, one_hour, group_by=["a", "b"])
     assert (
         q == "LET m1 = ( FOR d in `ts` FILTER d.ts==@b0 AND d.at>=@b1 AND d.at<@b2 "
-        "LET m0 = (FLOOR(d.at / @b3) * @b3) + @b4 "
+        "LET m0 = (FLOOR((d.at + @b4) / @b3) * @b3) - @b4 "
         "COLLECT group_slot=m0, complete_group=d.group "
         "AGGREGATE slot_avg = AVG(d.v) "
         "RETURN {at: group_slot, group: complete_group, v: slot_avg} )\n "
@@ -362,22 +362,22 @@ def test_load_time_series() -> None:
         "AGGREGATE agg_val=avg(d.v) "
         "SORT group_slot RETURN {at: group_slot,group: { a: group_a, b: group_b }, v: agg_val}"
     )
-    assert bv == {"b0": "foo", "b1": 1699913600, "b2": 1700000000, "b3": 3600, "b4": 800}
+    assert bv == {"b0": "foo", "b1": 1699913600, "b2": 1700000000, "b3": 3600, "b4": 2800}
     # group by specific group variables and filter by group variables
     q, bv = load_time_series(
         "ts", "foo", now - (24 * one_hour), now, one_hour, group_by=["a", "b"], group_filter=[P("a").eq("a")]
     )
     assert (
         q == "LET m1 = ( FOR d in `ts` FILTER d.ts==@b0 AND d.at>=@b1 AND d.at<@b2 FILTER d.group.a == @b3 "
-        "LET m0 = (FLOOR(d.at / @b4) * @b4) + @b5 "
+        "LET m0 = (FLOOR((d.at + @b5) / @b4) * @b4) - @b5 "
         "COLLECT group_slot=m0, complete_group=d.group "
         "AGGREGATE slot_avg = AVG(d.v) RETURN {at: group_slot, group: complete_group, v: slot_avg} )\n "
         "FOR d in m1 "
         "COLLECT group_slot=d.at, group_a=d.group.a, group_b=d.group.b "
         "AGGREGATE agg_val=avg(d.v) "
         "SORT group_slot RETURN {at: group_slot,group: { a: group_a, b: group_b }, v: agg_val}"
     )
-    assert bv == {"b0": "foo", "b1": 1699913600, "b2": 1700000000, "b3": "a", "b4": 3600, "b5": 800}
+    assert bv == {"b0": "foo", "b1": 1699913600, "b2": 1700000000, "b3": "a", "b4": 3600, "b5": 2800}
     # use avg-factor
     q, _ = load_time_series("ts", "foo", now - (24 * one_hour), now, one_hour, avg_factor=1000)
     assert "slot_avg = AVG(d.v / @b" in q  # factor divides average
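
The updated @b4 expectation follows directly from the fixture timestamps in the assertions above; a quick sanity check (a sketch, with now inferred from @b2):

    now = 1700000000                    # window end, bound as @b2
    start = now - 24 * 3600             # 1699913600, bound as @b1
    assert start % 3600 == 800          # the old @b4: start remainder
    assert 3600 - start % 3600 == 2800  # the new @b4 after this commit
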
fixcore/tests/fixcore/db/timeseriesdb_test.py (6 additions, 6 deletions)

@@ -91,13 +91,13 @@ async def create_ts(before_now: timedelta, granularity: timedelta, number_of_ent
                 "bucket": "Bucket(start=5mo25d, end=2yr, resolution=3d)",
                 "start": "2021-12-01T00:00:00Z",
                 "end": "2023-06-04T00:00:00Z",
-                "data_points": 90,  # 24 days (1d->3d) --> 0,3,6,9,12,15,18,21,24 ==> 9 with 10 entries each ==> 90
+                "data_points": 80,  # 24 days (1d->3d) --> 0,3,6,9,12,15,18,21 ==> 8 with 10 entries each ==> 80
             },
         ]
     }
-    assert timeseries_db.db.collection(timeseries_db.collection_name).count() == 450
+    assert timeseries_db.db.collection(timeseries_db.collection_name).count() == 440
     assert await timeseries_db.downsample(now) == "No changes since last downsample run"
-    assert timeseries_db.db.collection(timeseries_db.collection_name).count() == 450
+    assert timeseries_db.db.collection(timeseries_db.collection_name).count() == 440
     assert await timeseries_db.downsample(now=now + timedelta(days=27)) == {
         "test": [
             {
@@ -114,7 +114,7 @@ async def create_ts(before_now: timedelta, granularity: timedelta, number_of_ent
             },
         ]
     }
-    assert timeseries_db.db.collection(timeseries_db.collection_name).count() == 230
+    assert timeseries_db.db.collection(timeseries_db.collection_name).count() == 220
     assert await timeseries_db.downsample(now=now + timedelta(days=200)) == {
         "test": [
             {
@@ -125,9 +125,9 @@ async def create_ts(before_now: timedelta, granularity: timedelta, number_of_ent
             }
         ]
     }
-    assert timeseries_db.db.collection(timeseries_db.collection_name).count() == 140
+    assert timeseries_db.db.collection(timeseries_db.collection_name).count() == 130
     assert await timeseries_db.downsample(now=now + timedelta(days=400)) == {}
-    assert timeseries_db.db.collection(timeseries_db.collection_name).count() == 140
+    assert timeseries_db.db.collection(timeseries_db.collection_name).count() == 130


 async def test_acquire_lock(timeseries_db: TimeSeriesDB) -> None:
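
The reduced counts all follow from the same effect: samples are no longer pushed into a slot starting at the right edge of the downsampled window. A sketch of the arithmetic behind the updated data_points comment (variable names are mine):

    from datetime import timedelta

    window = timedelta(days=24)     # span downsampled from 1d to 3d resolution
    resolution = timedelta(days=3)

    # with previous-slot assignment, no sample can be labeled with the right
    # edge of the window, so the slot starts are days 0, 3, ..., 21
    slots = range(0, int(window.total_seconds()), int(resolution.total_seconds()))
    assert len(slots) == 8
    assert len(slots) * 10 == 80    # 10 entries per slot -> 80 data points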
