Skip to content

Commit

Permalink
splitted functions in time_series_api
Browse files Browse the repository at this point in the history
  • Loading branch information
kilianp14 committed Oct 16, 2023
1 parent f2d1d54 commit 5a224e6
Showing 1 changed file with 67 additions and 48 deletions.
115 changes: 67 additions & 48 deletions vessim/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class TimeSeriesApi:
The data should contain two indices. One is the "Request Timestamp", marking
the time when the forecast was made. One is the "Forecast Timestamp",
indicating the time the forecast is made for.
- If there is no "Request Timestamp", the data is treated as a global
- If data does not include a "Request Timestamp", it is treated as a global
forecast that does not change over time.
- If `forecast` is not provided, predictions are derived from the
actual data when requesting forecasts.
Expand Down Expand Up @@ -91,7 +91,7 @@ def __init__(
self._fill_method = fill_method

def zones(self) -> List:
"""Returns a list of all available zones."""
"""Returns a list of all zones, where actual data is available."""
return list(self._actual.columns)

def actual(self, dt: DatetimeLike, zone: Optional[str] = None) -> Any:
Expand All @@ -108,15 +108,8 @@ def actual(self, dt: DatetimeLike, zone: Optional[str] = None) -> Any:
Raises:
ValueError: If there is no available data at apecified zone or time.
"""
if zone is None:
if len(self.zones()) == 1:
zone = self.zones()[0]
else:
raise ValueError("Zone needs to be specified.")
try:
zone_actual = self._actual[zone]
except KeyError:
raise ValueError(f"Cannot retrieve actual data at zone '{zone}'.")
zone_index = self._get_zone_index_from_dataframe(self._actual, zone)
zone_actual = self._actual.iloc[:, zone_index]

data_point = zone_actual.reindex(index=[dt], method=self._fill_method).iloc[0]
if pd.isna(data_point):
Expand Down Expand Up @@ -186,7 +179,24 @@ def forecast(
2020-01-01 02:00:00 2.0
Freq: 30T, Name: zone_a, dtype: float64
"""
# Determine which data source to use
data_src = self._get_forecast_data_source(start_time)

zone_index = self._get_zone_index_from_dataframe(data_src, zone)

# Get data of zone and convert to pd.Series
forecast: pd.Series = data_src.iloc[:, zone_index].squeeze()

# Resample the data to get the data to specified frequency
if frequency is not None:
forecast = self._resample_to_frequency(
forecast, start_time, end_time, frequency, resample_method=resample_method
)
return forecast.iloc[1:]

return forecast.loc[(forecast.index > start_time) & (forecast.index <= end_time)]

def _get_forecast_data_source(self, start_time: DatetimeLike):
"""Returns dataframe used to derive forecast prediction."""
data_src: pd.DataFrame
if self._forecast is None:
# Get all data points beginning at the nearest existing timestamp
Expand All @@ -207,54 +217,63 @@ def forecast(
if data_src.empty:
raise ValueError(f"Cannot retrieve forecast data at '{start_time}'.")

return data_src

def _get_zone_index_from_dataframe(self, df: pd.DataFrame, zone: Optional[str]):
"""Return column index of zone from given dataframe.
If zone is not specified, but there is only one column in the dataframe, the
first index is returned.
Raises:
ValueError: If zone index can not be determined.
"""
if zone is None:
if len(self.zones()) == 1:
if len(df.columns) == 1:
zone_index = 0
else:
raise ValueError("Zone needs to be specified.")
else:
try:
zone_index = data_src.columns.get_loc(zone)
zone_index = df.columns.get_loc(zone)
except KeyError:
raise ValueError(f"Cannot retrieve forecast data for zone '{zone}'.")

# Get data of specified zone and convert to pd.Series
forecast: pd.Series = data_src.iloc[:, zone_index].squeeze()
return zone_index

# Resample the data to get the data to specified frequency
if frequency is not None:
frequency = pd.tseries.frequencies.to_offset(frequency)
if frequency is None:
raise ValueError(f"Frequency '{frequency}' invalid.")

# Add NaN values in the specified frequency
new_index = pd.date_range(start=start_time, end=end_time, freq=frequency)
combined_index = forecast.index.union(new_index).sort_values()
forecast = forecast.reindex(combined_index)

# Use specific resample method if specified to fill NaN values
if resample_method == "ffill":
forecast.ffill(inplace=True)
elif resample_method == "bfill":
forecast.bfill(inplace=True)
elif resample_method is not None:
forecast.interpolate(method=resample_method, inplace=True) # type: ignore
elif forecast.isnull().values.any(): # type: ignore
# Check if there are NaN values if resample method is not specified
raise ValueError(
f"Not enough data at frequency '{frequency}'. Specify resample_method"
)

# Get the data to the desired frequency after interpolation
return (
forecast.loc[
(forecast.index >= start_time) & (forecast.index <= end_time)
]
.asfreq(frequency)
.iloc[1:]
def _resample_to_frequency(
self,
df: pd.DataFrame,
start_time: DatetimeLike,
end_time: DatetimeLike,
frequency: Optional[Union[str, pd.DateOffset, timedelta]] = None,
resample_method: Optional[str] = None,
):
"""Transform dataframe into the desired frequency between start and end time."""
frequency = pd.tseries.frequencies.to_offset(frequency)
if frequency is None:
raise ValueError(f"Frequency '{frequency}' invalid.")

# Add NaN values in the specified frequency
new_index = pd.date_range(start=start_time, end=end_time, freq=frequency)
combined_index = df.index.union(new_index).sort_values()
df = df.reindex(combined_index)

# Use specific resample method if specified to fill NaN values
if resample_method == "ffill":
df.ffill(inplace=True)
elif resample_method == "bfill":
df.bfill(inplace=True)
elif resample_method is not None:
df.interpolate(method=resample_method, inplace=True) # type: ignore
elif df.isnull().values.any(): # type: ignore
# Check if there are NaN values if resample method is not specified
raise ValueError(
f"Not enough data at frequency '{frequency}'. Specify resample_method"
)

return forecast.loc[(forecast.index > start_time) & (forecast.index <= end_time)]
# Get the data to the desired frequency after interpolation
return df.loc[(df.index >= start_time) & (df.index <= end_time)].asfreq(frequency)

def next_update(self, dt: DatetimeLike) -> datetime:
"""Returns the next time of when the trace will change.
Expand Down

0 comments on commit 5a224e6

Please sign in to comment.