From 5a224e647faf5c23986ac04e3916679f26abecef Mon Sep 17 00:00:00 2001 From: kilianp14 Date: Mon, 16 Oct 2023 15:11:28 +0200 Subject: [PATCH] splitted functions in time_series_api --- vessim/__init__.py | 115 ++++++++++++++++++++++++++------------------- 1 file changed, 67 insertions(+), 48 deletions(-) diff --git a/vessim/__init__.py b/vessim/__init__.py index 2e4bdae4..c590cbb2 100644 --- a/vessim/__init__.py +++ b/vessim/__init__.py @@ -23,7 +23,7 @@ class TimeSeriesApi: The data should contain two indices. One is the "Request Timestamp", marking the time when the forecast was made. One is the "Forecast Timestamp", indicating the time the forecast is made for. - - If there is no "Request Timestamp", the data is treated as a global + - If data does not include a "Request Timestamp", it is treated as a global forecast that does not change over time. - If `forecast` is not provided, predictions are derived from the actual data when requesting forecasts. @@ -91,7 +91,7 @@ def __init__( self._fill_method = fill_method def zones(self) -> List: - """Returns a list of all available zones.""" + """Returns a list of all zones, where actual data is available.""" return list(self._actual.columns) def actual(self, dt: DatetimeLike, zone: Optional[str] = None) -> Any: @@ -108,15 +108,8 @@ def actual(self, dt: DatetimeLike, zone: Optional[str] = None) -> Any: Raises: ValueError: If there is no available data at apecified zone or time. """ - if zone is None: - if len(self.zones()) == 1: - zone = self.zones()[0] - else: - raise ValueError("Zone needs to be specified.") - try: - zone_actual = self._actual[zone] - except KeyError: - raise ValueError(f"Cannot retrieve actual data at zone '{zone}'.") + zone_index = self._get_zone_index_from_dataframe(self._actual, zone) + zone_actual = self._actual.iloc[:, zone_index] data_point = zone_actual.reindex(index=[dt], method=self._fill_method).iloc[0] if pd.isna(data_point): @@ -186,7 +179,24 @@ def forecast( 2020-01-01 02:00:00 2.0 Freq: 30T, Name: zone_a, dtype: float64 """ - # Determine which data source to use + data_src = self._get_forecast_data_source(start_time) + + zone_index = self._get_zone_index_from_dataframe(data_src, zone) + + # Get data of zone and convert to pd.Series + forecast: pd.Series = data_src.iloc[:, zone_index].squeeze() + + # Resample the data to get the data to specified frequency + if frequency is not None: + forecast = self._resample_to_frequency( + forecast, start_time, end_time, frequency, resample_method=resample_method + ) + return forecast.iloc[1:] + + return forecast.loc[(forecast.index > start_time) & (forecast.index <= end_time)] + + def _get_forecast_data_source(self, start_time: DatetimeLike): + """Returns dataframe used to derive forecast prediction.""" data_src: pd.DataFrame if self._forecast is None: # Get all data points beginning at the nearest existing timestamp @@ -207,54 +217,63 @@ def forecast( if data_src.empty: raise ValueError(f"Cannot retrieve forecast data at '{start_time}'.") + return data_src + + def _get_zone_index_from_dataframe(self, df: pd.DataFrame, zone: Optional[str]): + """Return column index of zone from given dataframe. + + If zone is not specified, but there is only one column in the dataframe, the + first index is returned. + + Raises: + ValueError: If zone index can not be determined. + """ if zone is None: - if len(self.zones()) == 1: + if len(df.columns) == 1: zone_index = 0 else: raise ValueError("Zone needs to be specified.") else: try: - zone_index = data_src.columns.get_loc(zone) + zone_index = df.columns.get_loc(zone) except KeyError: raise ValueError(f"Cannot retrieve forecast data for zone '{zone}'.") - # Get data of specified zone and convert to pd.Series - forecast: pd.Series = data_src.iloc[:, zone_index].squeeze() + return zone_index - # Resample the data to get the data to specified frequency - if frequency is not None: - frequency = pd.tseries.frequencies.to_offset(frequency) - if frequency is None: - raise ValueError(f"Frequency '{frequency}' invalid.") - - # Add NaN values in the specified frequency - new_index = pd.date_range(start=start_time, end=end_time, freq=frequency) - combined_index = forecast.index.union(new_index).sort_values() - forecast = forecast.reindex(combined_index) - - # Use specific resample method if specified to fill NaN values - if resample_method == "ffill": - forecast.ffill(inplace=True) - elif resample_method == "bfill": - forecast.bfill(inplace=True) - elif resample_method is not None: - forecast.interpolate(method=resample_method, inplace=True) # type: ignore - elif forecast.isnull().values.any(): # type: ignore - # Check if there are NaN values if resample method is not specified - raise ValueError( - f"Not enough data at frequency '{frequency}'. Specify resample_method" - ) - - # Get the data to the desired frequency after interpolation - return ( - forecast.loc[ - (forecast.index >= start_time) & (forecast.index <= end_time) - ] - .asfreq(frequency) - .iloc[1:] + def _resample_to_frequency( + self, + df: pd.DataFrame, + start_time: DatetimeLike, + end_time: DatetimeLike, + frequency: Optional[Union[str, pd.DateOffset, timedelta]] = None, + resample_method: Optional[str] = None, + ): + """Transform dataframe into the desired frequency between start and end time.""" + frequency = pd.tseries.frequencies.to_offset(frequency) + if frequency is None: + raise ValueError(f"Frequency '{frequency}' invalid.") + + # Add NaN values in the specified frequency + new_index = pd.date_range(start=start_time, end=end_time, freq=frequency) + combined_index = df.index.union(new_index).sort_values() + df = df.reindex(combined_index) + + # Use specific resample method if specified to fill NaN values + if resample_method == "ffill": + df.ffill(inplace=True) + elif resample_method == "bfill": + df.bfill(inplace=True) + elif resample_method is not None: + df.interpolate(method=resample_method, inplace=True) # type: ignore + elif df.isnull().values.any(): # type: ignore + # Check if there are NaN values if resample method is not specified + raise ValueError( + f"Not enough data at frequency '{frequency}'. Specify resample_method" ) - return forecast.loc[(forecast.index > start_time) & (forecast.index <= end_time)] + # Get the data to the desired frequency after interpolation + return df.loc[(df.index >= start_time) & (df.index <= end_time)].asfreq(frequency) def next_update(self, dt: DatetimeLike) -> datetime: """Returns the next time of when the trace will change.