Skip to content

Commit

Permalink
feat: by default, always use latest version of source per event
Browse files Browse the repository at this point in the history
Signed-off-by: F.N. Claessen <[email protected]>
  • Loading branch information
Flix6x committed Jan 9, 2025
1 parent 1911669 commit 264b167
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 2 deletions.
61 changes: 61 additions & 0 deletions flexmeasures/data/models/data_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import TYPE_CHECKING, Any, ClassVar
from sqlalchemy.ext.mutable import MutableDict

import pandas as pd
import timely_beliefs as tb

from packaging.version import Version
Expand Down Expand Up @@ -391,3 +392,63 @@ def keep_latest_version(data_sources: list[DataSource]) -> list[DataSource]:
)

return list(sources.values())


def keep_latest_version_in_bdf(bdf: tb.BeliefsDataFrame) -> tb.BeliefsDataFrame:
    """Filter the BeliefsDataFrame to keep, per event, only the latest version of each source.

    Sources are considered versions of the same source when their name, type and
    model match. A source without a version is treated as version "0.0.0".
    All rows belonging to the winning version are kept, so multiple belief times
    and probabilistic beliefs of that version survive the filter.

    The function performs the following steps:
    1. Returns early for an empty frame (nothing to filter).
    2. Resets the index to flatten the DataFrame.
    3. Adds temporary columns for the source's name, type, model and parsed version.
    4. Stably sorts the rows by event_start (ascending) and version (descending).
    5. Keeps only rows whose version equals the highest version within each
       (event_start, source.name, source.type, source.model) group.
    6. Drops the temporary columns and restores the original index.

    Parameters:
    -----------
    bdf : tb.BeliefsDataFrame
        The input BeliefsDataFrame containing event_start and source information.

    Returns:
    --------
    tb.BeliefsDataFrame
        A new BeliefsDataFrame containing only the latest version of each source
        for each event_start, with the original index restored.
    """

    # Early return: an empty frame has nothing to filter, and assigning columns
    # from an empty Series.apply result would raise a ValueError.
    if bdf.empty:
        return bdf

    # Remember the original index, then reset it
    index_levels = bdf.index.names
    bdf = bdf.reset_index()

    # Add source-related columns. Versions are parsed with packaging's Version
    # so that e.g. "0.10" correctly sorts after "0.9" (unlike string comparison).
    bdf[["source.name", "source.type", "source.model", "source.version"]] = bdf[
        "source"
    ].apply(
        lambda s: pd.Series(
            {
                "source.name": s.name,
                "source.type": s.type,
                "source.model": s.model,
                "source.version": Version(
                    s.version if s.version is not None else "0.0.0"
                ),
            }
        )
    )

    # Stable sort by event_start and version, so ties keep their original
    # relative order and the highest version comes first within each event.
    bdf = bdf.sort_values(
        by=["event_start", "source.version"],
        ascending=[True, False],
        kind="stable",
    )

    # Keep every row belonging to the highest version within each
    # (event, source identity) group. Unlike drop_duplicates, this does not
    # collapse multiple belief times or probabilistic beliefs of that version.
    # dropna=False keeps groups whose source.model is None.
    group_columns = ["event_start", "source.name", "source.type", "source.model"]
    latest_version = bdf.groupby(group_columns, sort=False, dropna=False)[
        "source.version"
    ].transform("first")
    bdf = bdf[bdf["source.version"] == latest_version]

    # Remove temporary columns and restore the original index
    bdf = bdf.drop(
        columns=["source.name", "source.type", "source.model", "source.version"]
    )
    bdf = bdf.set_index(index_levels)

    return bdf
16 changes: 15 additions & 1 deletion flexmeasures/data/models/reporting/aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,21 @@ def _compute_report(
)

# found multiple sources in the beliefs of df but no source is specified
if len(df.lineage.sources) > 1 and (source is None or len(source) == 0):
unique_sources = df.lineage.sources
properties = [
"name",
"type",
"model",
] # properties to identify different versions of the same source
if (
len(unique_sources) > 1
and not all(
getattr(source, prop) == getattr(unique_sources[0], prop)
for prop in properties
for source in unique_sources
)
and (source is None or len(source) == 0)
):
raise ValueError(
"Missing attribute source or sources. The fields `source` or `sources` is required when having multiple sources within the time window."
)
Expand Down
13 changes: 12 additions & 1 deletion flexmeasures/data/models/reporting/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,11 @@ def setup_dummy_data(db, app, generic_report):
db.session.add(daily_report_sensor)

"""
Create 2 DataSources
Create 3 DataSources
"""
source1 = DataSource("source1")
source2 = DataSource("source2")
source2v02 = DataSource("source2", version="0.2")

"""
Create TimedBeliefs
Expand Down Expand Up @@ -260,6 +261,16 @@ def setup_dummy_data(db, app, generic_report):
source=source1,
)
)
# add a belief belonging to version 0.2 of Source 2 around the end of the day
beliefs.append(
TimedBelief(
event_start=datetime(2023, 4, 24, tzinfo=utc) + timedelta(hours=23),
belief_horizon=timedelta(hours=24),
event_value=3,
sensor=sensor3,
source=source2v02,
)
)

# add data for sensor 4
for t in range(24 * 3):
Expand Down
13 changes: 13 additions & 0 deletions flexmeasures/data/models/reporting/tests/test_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,3 +225,16 @@ def test_source_transition(setup_dummy_data, db):
output=[dict(sensor=report_sensor)],
belief_time=tz.localize(datetime(2023, 12, 1)),
)[0]["data"]

# the exception to the above is when a new version of the same source recorded a value,
# in which case the latest version takes precedence. This happened in the last hour of the day.
result = agg_reporter.compute(
start=tz.localize(datetime(2023, 4, 24, 18, 0)),
end=tz.localize(datetime(2023, 4, 25)),
input=[dict(sensor=s3)],
output=[dict(sensor=report_sensor)],
belief_time=tz.localize(datetime(2023, 12, 1)),
)[0]["data"]

assert (result[:5] == -1).all().event_value # beliefs from the older version
assert (result[5:] == 3).all().event_value # belief from the latest version
5 changes: 5 additions & 0 deletions flexmeasures/data/models/time_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from flexmeasures.auth.policy import AuthModelMixin
from flexmeasures.data import db
from flexmeasures.data.models.data_sources import keep_latest_version_in_bdf
from flexmeasures.data.models.parsing_utils import parse_source_arg
from flexmeasures.data.services.annotations import prepare_annotations_for_chart
from flexmeasures.data.services.timerange import get_timerange
Expand Down Expand Up @@ -649,6 +650,7 @@ def search(
user_source_ids: int | list[int] | None = None,
source_types: list[str] | None = None,
exclude_source_types: list[str] | None = None,
use_latest_version_per_event: bool = True,
most_recent_beliefs_only: bool = True,
most_recent_events_only: bool = False,
most_recent_only: bool = False,
Expand All @@ -672,6 +674,7 @@ def search(
:param user_source_ids: Optional list of user source ids to query only specific user sources
:param source_types: Optional list of source type names to query only specific source types *
:param exclude_source_types: Optional list of source type names to exclude specific source types *
:param use_latest_version_per_event: only return the belief from the latest version of a source
:param most_recent_beliefs_only: only return the most recent beliefs for each event from each source (minimum belief horizon). Defaults to True.
:param most_recent_events_only: only return (post knowledge time) beliefs for the most recent event (maximum event start)
:param most_recent_only: only return a single belief, the most recent from the most recent event. Fastest method if you only need one.
Expand Down Expand Up @@ -732,6 +735,8 @@ def search(
custom_filter_criteria=source_criteria,
custom_join_targets=custom_join_targets,
)
if use_latest_version_per_event:
bdf = keep_latest_version_in_bdf(bdf)
if one_deterministic_belief_per_event:
# todo: compute median of collective belief instead of median of first belief (update expected test results accordingly)
# todo: move to timely-beliefs: select mean/median belief
Expand Down

0 comments on commit 264b167

Please sign in to comment.