From 264b16796bf9bd82d5a72bb6ab78d6e2d9ea1420 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Thu, 9 Jan 2025 12:42:29 +0100 Subject: [PATCH] feat: by default, always use latest version of source per event Signed-off-by: F.N. Claessen --- flexmeasures/data/models/data_sources.py | 61 +++++++++++++++++++ .../data/models/reporting/aggregator.py | 16 ++++- .../data/models/reporting/tests/conftest.py | 13 +++- .../models/reporting/tests/test_aggregator.py | 13 ++++ flexmeasures/data/models/time_series.py | 5 ++ 5 files changed, 106 insertions(+), 2 deletions(-) diff --git a/flexmeasures/data/models/data_sources.py b/flexmeasures/data/models/data_sources.py index 7ace27edb..3b3060287 100644 --- a/flexmeasures/data/models/data_sources.py +++ b/flexmeasures/data/models/data_sources.py @@ -4,6 +4,7 @@ from typing import TYPE_CHECKING, Any, ClassVar from sqlalchemy.ext.mutable import MutableDict +import pandas as pd import timely_beliefs as tb from packaging.version import Version @@ -391,3 +392,63 @@ def keep_latest_version(data_sources: list[DataSource]) -> list[DataSource]: ) return list(sources.values()) + + +def keep_latest_version_in_bdf(bdf: tb.BeliefsDataFrame) -> tb.BeliefsDataFrame: + """Filters the BeliefsDataFrame to keep the latest version of each source, for each event. + + The function performs the following steps: + 1. Resets the index to flatten the DataFrame. + 2. Adds columns for the source's name, type, model, and version. + 3. Sorts the rows by event_start and source.version in descending order. + 4. Removes duplicates based on event_start, source.name, source.type, and source.model, keeping the latest version. + 5. Drops the temporary columns added for source attributes. + 6. Restores the original index. + + Parameters: + ----------- + bdf : tb.BeliefsDataFrame + The input BeliefsDataFrame containing event_start and source information. + + Returns: + -------- + tb.BeliefsDataFrame + A new BeliefsDataFrame containing only the latest version of each source + for each event_start, with the original index restored. 
+ """ + + # Remember the original index, then reset it + index_levels = bdf.index.names + bdf = bdf.reset_index() + + # Add source-related columns using vectorized operations for clarity + bdf[["source.name", "source.type", "source.model", "source.version"]] = bdf[ + "source" + ].apply( + lambda s: pd.Series( + { + "source.name": s.name, + "source.type": s.type, + "source.model": s.model, + "source.version": Version( + s.version if s.version is not None else "0.0.0" + ), + } + ) + ) + + # Sort by event_start and version, keeping only the latest version + bdf = bdf.sort_values(by=["event_start", "source.version"], ascending=[True, False]) + + # Drop duplicates based on event_start and source identifiers, keeping the latest version + bdf = bdf.drop_duplicates( + ["event_start", "source.name", "source.type", "source.model"] + ) + + # Remove temporary columns and restore the original index + bdf = bdf.drop( + columns=["source.name", "source.type", "source.model", "source.version"] + ) + bdf = bdf.set_index(index_levels) + + return bdf diff --git a/flexmeasures/data/models/reporting/aggregator.py b/flexmeasures/data/models/reporting/aggregator.py index 6aabe17d9..2502eb746 100644 --- a/flexmeasures/data/models/reporting/aggregator.py +++ b/flexmeasures/data/models/reporting/aggregator.py @@ -72,7 +72,21 @@ def _compute_report( ) # found multiple sources in the beliefs of df but no source is specified - if len(df.lineage.sources) > 1 and (source is None or len(source) == 0): + unique_sources = df.lineage.sources + properties = [ + "name", + "type", + "model", + ] # properties to identify different versions of the same source + if ( + len(unique_sources) > 1 + and not all( + getattr(source, prop) == getattr(unique_sources[0], prop) + for prop in properties + for source in unique_sources + ) + and (source is None or len(source) == 0) + ): raise ValueError( "Missing attribute source or sources. The fields `source` or `sources` is required when having multiple sources within the time window." 
) diff --git a/flexmeasures/data/models/reporting/tests/conftest.py b/flexmeasures/data/models/reporting/tests/conftest.py index 0f26276b2..4e5cbaef4 100644 --- a/flexmeasures/data/models/reporting/tests/conftest.py +++ b/flexmeasures/data/models/reporting/tests/conftest.py @@ -166,10 +166,11 @@ def setup_dummy_data(db, app, generic_report): db.session.add(daily_report_sensor) """ - Create 2 DataSources + Create 3 DataSources """ source1 = DataSource("source1") source2 = DataSource("source2") + source2v02 = DataSource("source2", version="0.2") """ Create TimedBeliefs @@ -260,6 +261,16 @@ def setup_dummy_data(db, app, generic_report): source=source1, ) ) + # add a belief belonging to version 0.2 of Source 2 around the end of the day + beliefs.append( + TimedBelief( + event_start=datetime(2023, 4, 24, tzinfo=utc) + timedelta(hours=23), + belief_horizon=timedelta(hours=24), + event_value=3, + sensor=sensor3, + source=source2v02, + ) + ) # add data for sensor 4 for t in range(24 * 3): diff --git a/flexmeasures/data/models/reporting/tests/test_aggregator.py b/flexmeasures/data/models/reporting/tests/test_aggregator.py index 08f15b93e..f52395171 100644 --- a/flexmeasures/data/models/reporting/tests/test_aggregator.py +++ b/flexmeasures/data/models/reporting/tests/test_aggregator.py @@ -225,3 +225,16 @@ def test_source_transition(setup_dummy_data, db): output=[dict(sensor=report_sensor)], belief_time=tz.localize(datetime(2023, 12, 1)), )[0]["data"] + + # the exception to the above is when a new version of the same source recorded a value, + # in which case the latest version takes precedence. This happened in the last hour of the day. + result = agg_reporter.compute( + start=tz.localize(datetime(2023, 4, 24, 18, 0)), + end=tz.localize(datetime(2023, 4, 25)), + input=[dict(sensor=s3)], + output=[dict(sensor=report_sensor)], + belief_time=tz.localize(datetime(2023, 12, 1)), + )[0]["data"] + + assert (result[:5] == -1).all().event_value # beliefs from the older version + assert (result[5:] == 3).all().event_value # belief from the latest version diff --git a/flexmeasures/data/models/time_series.py b/flexmeasures/data/models/time_series.py index 07767f978..6abf8fa03 100644 --- a/flexmeasures/data/models/time_series.py +++ b/flexmeasures/data/models/time_series.py @@ -17,6 +17,7 @@ from flexmeasures.auth.policy import AuthModelMixin from flexmeasures.data import db +from flexmeasures.data.models.data_sources import keep_latest_version_in_bdf from flexmeasures.data.models.parsing_utils import parse_source_arg from flexmeasures.data.services.annotations import prepare_annotations_for_chart from flexmeasures.data.services.timerange import get_timerange @@ -649,6 +650,7 @@ def search( user_source_ids: int | list[int] | None = None, source_types: list[str] | None = None, exclude_source_types: list[str] | None = None, + use_latest_version_per_event: bool = True, most_recent_beliefs_only: bool = True, most_recent_events_only: bool = False, most_recent_only: bool = False, @@ -672,6 +674,7 @@ def search( :param user_source_ids: Optional list of user source ids to query only specific user sources :param source_types: Optional list of source type names to query only specific source types * :param exclude_source_types: Optional list of source type names to exclude specific source types * + :param use_latest_version_per_event: only return the belief from the latest version of a source :param most_recent_beliefs_only: only return the most recent beliefs for each event from each source (minimum belief horizon). 
Defaults to True. :param most_recent_events_only: only return (post knowledge time) beliefs for the most recent event (maximum event start) :param most_recent_only: only return a single belief, the most recent from the most recent event. Fastest method if you only need one. @@ -732,6 +735,8 @@ def search( custom_filter_criteria=source_criteria, custom_join_targets=custom_join_targets, ) + if use_latest_version_per_event: + bdf = keep_latest_version_in_bdf(bdf) if one_deterministic_belief_per_event: # todo: compute median of collective belief instead of median of first belief (update expected test results accordingly) # todo: move to timely-beliefs: select mean/median belief
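
Note (illustrative, not part of the patch): the core idea of keep_latest_version_in_bdf is to sort so the highest source version comes first per event, then keep the first row per event and source identity. A minimal standalone sketch of that pattern on a plain pandas DataFrame with hypothetical data (the real function operates on a BeliefsDataFrame and also deduplicates on source.type and source.model, which are collapsed into source.name here for brevity):

    import pandas as pd
    from packaging.version import Version

    # Hypothetical beliefs: two versions of "source2" recorded a value for 23:00.
    df = pd.DataFrame(
        {
            "event_start": ["2023-04-24T22:00", "2023-04-24T23:00", "2023-04-24T23:00"],
            "source.name": ["source2", "source2", "source2"],
            "source.version": [Version("0.1"), Version("0.1"), Version("0.2")],
            "event_value": [-1, -1, 3],
        }
    )

    # Sort events chronologically and versions from newest to oldest,
    # then keep the first (i.e. newest) row per event and source name.
    latest = df.sort_values(
        by=["event_start", "source.version"], ascending=[True, False]
    ).drop_duplicates(["event_start", "source.name"])

    print(latest)  # 22:00 keeps -1 (its only belief); 23:00 keeps 3 (version 0.2 wins)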
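
The function wraps source versions in packaging.version.Version rather than comparing raw strings, and substitutes "0.0.0" for sources without a version. A quick check of why that matters:

    from packaging.version import Version

    assert Version("0.10") > Version("0.2")   # semantic comparison: 0.10 is newer
    assert not ("0.10" > "0.2")               # lexicographic comparison gets this wrong
    assert Version("0.2") > Version("0.0.0")  # unversioned sources sort below any release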
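
With this change, the AggregatorReporter only raises when the multiple sources it finds are genuinely different sources (differing in name, type or model), not merely different versions of one source. A standalone sketch of that check, using SimpleNamespace objects as hypothetical stand-ins for DataSource instances:

    from types import SimpleNamespace

    def all_same_source(sources) -> bool:
        """True if all sources share name, type and model, i.e. differ at most by version."""
        properties = ("name", "type", "model")
        return all(
            getattr(s, prop) == getattr(sources[0], prop)
            for prop in properties
            for s in sources
        )

    # Hypothetical stand-ins; real DataSource objects carry these attributes.
    v01 = SimpleNamespace(name="source2", type="demo script", model=None, version="0.1")
    v02 = SimpleNamespace(name="source2", type="demo script", model=None, version="0.2")
    other = SimpleNamespace(name="source1", type="demo script", model=None, version=None)

    assert all_same_source([v01, v02])        # versions of one source: no error is raised
    assert not all_same_source([v01, other])  # truly different sources: `source` must be specified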