Skip to content

Commit

Permalink
Merge pull request #60 from NREL/feat/convert-storage
Browse files Browse the repository at this point in the history
Feat/convert storage
  • Loading branch information
daniel-thom authored Dec 12, 2024
2 parents 60995ae + 2848f23 commit 581a786
Show file tree
Hide file tree
Showing 11 changed files with 263 additions and 61 deletions.
45 changes: 27 additions & 18 deletions src/infrasys/arrow_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
from uuid import UUID

import numpy as np
from numpy.typing import NDArray
import pyarrow as pa
import pint
from loguru import logger

from infrasys.exceptions import ISNotStored
Expand Down Expand Up @@ -55,20 +55,25 @@ def add_time_series(
metadata: TimeSeriesMetadata,
time_series: TimeSeriesData,
) -> None:
fpath = self._ts_directory.joinpath(f"{metadata.time_series_uuid}{EXTENSION}")
if isinstance(time_series, SingleTimeSeries):
self.add_raw_single_time_series(metadata.time_series_uuid, time_series.data_array)
else:
msg = f"Bug: need to implement add_time_series for {type(time_series)}"
raise NotImplementedError(msg)

def add_raw_single_time_series(
self, time_series_uuid: UUID, time_series_data: NDArray
) -> None:
fpath = self._ts_directory.joinpath(f"{time_series_uuid}{EXTENSION}")
if not fpath.exists():
if isinstance(time_series, SingleTimeSeries):
arrow_batch = self._convert_to_record_batch(time_series, metadata.variable_name)
with pa.OSFile(str(fpath), "wb") as sink: # type: ignore
with pa.ipc.new_file(sink, arrow_batch.schema) as writer:
writer.write(arrow_batch)
else:
msg = f"Bug: need to implement add_time_series for {type(time_series)}"
raise NotImplementedError(msg)
arrow_batch = self._convert_to_record_batch(time_series_data, str(time_series_uuid))
with pa.OSFile(str(fpath), "wb") as sink: # type: ignore
with pa.ipc.new_file(sink, arrow_batch.schema) as writer:
writer.write(arrow_batch)
logger.trace("Saving time series to {}", fpath)
logger.debug("Added {} to time series storage", time_series.summary)
logger.debug("Added {} to time series storage", time_series_uuid)
else:
logger.debug("{} was already stored", time_series.summary)
logger.debug("{} was already stored", time_series_uuid)

def get_time_series(
self,
Expand Down Expand Up @@ -111,7 +116,7 @@ def _get_single_time_series(
base_ts = pa.ipc.open_file(source).get_record_batch(0)
logger.trace("Reading time series from {}", fpath)
index, length = metadata.get_range(start_time=start_time, length=length)
data = base_ts[metadata.variable_name][index : index + length]
data = base_ts[str(metadata.time_series_uuid)][index : index + length]
if metadata.quantity_metadata is not None:
np_array = metadata.quantity_metadata.quantity_type(
data, metadata.quantity_metadata.units
Expand All @@ -127,14 +132,18 @@ def _get_single_time_series(
normalization=metadata.normalization,
)

def get_raw_single_time_series(self, time_series_uuid: UUID) -> NDArray:
    """Read the raw array for one time series from its Arrow IPC file."""
    file_path = self._ts_directory.joinpath(f"{time_series_uuid}{EXTENSION}")
    with pa.OSFile(str(file_path), "r") as source:
        record_batch = pa.ipc.open_file(source).get_record_batch(0)
        logger.trace("Reading time series from {}", file_path)
        # The column is keyed by the UUID string, matching how the batch was written.
        return record_batch[str(time_series_uuid)].to_numpy()

def _convert_to_record_batch(
self, time_series: SingleTimeSeries, variable_name: str
self, time_series_array: NDArray, variable_name: str
) -> pa.RecordBatch:
"""Create record batch to save array to disk."""
if isinstance(time_series.data, pint.Quantity):
pa_array = pa.array(time_series.data.magnitude)
else:
pa_array = pa.array(time_series.data)
pa_array = pa.array(time_series_array)
schema = pa.schema([pa.field(variable_name, pa_array.type)])
return pa.record_batch([pa_array], schema=schema)

Expand Down
83 changes: 54 additions & 29 deletions src/infrasys/in_memory_time_series_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,53 +2,73 @@

from datetime import datetime
from pathlib import Path
from typing import Optional
import numpy as np
from numpy.typing import NDArray
from typing import Optional, TypeAlias
from uuid import UUID

from loguru import logger
from infrasys.arrow_storage import ArrowTimeSeriesStorage

from infrasys.exceptions import ISNotStored
from infrasys.exceptions import ISNotStored, ISOperationNotAllowed
from infrasys.time_series_models import (
SingleTimeSeries,
SingleTimeSeriesMetadataBase,
SingleTimeSeriesMetadata,
TimeSeriesData,
TimeSeriesMetadata,
)
from infrasys.time_series_storage_base import TimeSeriesStorageBase

DataStoreType: TypeAlias = NDArray


class InMemoryTimeSeriesStorage(TimeSeriesStorageBase):
"""Stores time series in memory."""

def __init__(self) -> None:
self._arrays: dict[UUID, TimeSeriesData] = {} # Time series UUID, not metadata UUID
self._arrays: dict[UUID, DataStoreType] = {} # Time series UUID, not metadata UUID

def get_time_series_directory(self) -> None:
return None

def add_time_series(self, metadata: TimeSeriesMetadata, time_series: TimeSeriesData) -> None:
if metadata.time_series_uuid not in self._arrays:
self._arrays[metadata.time_series_uuid] = time_series
logger.debug("Added {} to store", time_series.summary)
if isinstance(time_series, SingleTimeSeries):
if metadata.time_series_uuid not in self._arrays:
self._arrays[metadata.time_series_uuid] = time_series.data_array
logger.debug("Added {} to store", time_series.summary)
else:
logger.debug("{} was already stored", time_series.summary)

else:
msg = f"add_time_series not implemented for {type(time_series)}"
raise NotImplementedError(msg)

def add_raw_single_time_series(
self, time_series_uuid: UUID, time_series_data: DataStoreType
) -> None:
if time_series_uuid not in self._arrays:
self._arrays[time_series_uuid] = time_series_data
logger.debug("Added {} to store", time_series_uuid)
else:
logger.debug("{} was already stored", time_series.summary)
logger.debug("{} was already stored", time_series_uuid)

def get_time_series(
self,
metadata: TimeSeriesMetadata,
start_time: datetime | None = None,
length: int | None = None,
) -> TimeSeriesData:
time_series = self._arrays.get(metadata.time_series_uuid)
if time_series is None:
msg = f"No time series with {metadata.time_series_uuid} is stored"
raise ISNotStored(msg)

if isinstance(metadata, SingleTimeSeriesMetadataBase):
return self._get_single_time_series(metadata, start_time=start_time, length=length)
if isinstance(metadata, SingleTimeSeriesMetadata):
return self._get_single_time_series(metadata, start_time, length)
raise NotImplementedError(str(metadata.get_time_series_data_type()))

def get_raw_single_time_series(self, time_series_uuid: UUID) -> NDArray:
    """Return the stored numpy array for the given time series UUID.

    Raises KeyError if the UUID is unknown and ISOperationNotAllowed if the
    stored value is not a plain numpy array.
    """
    data_array = self._arrays[time_series_uuid]
    if isinstance(data_array, np.ndarray):
        return data_array
    msg = f"Can't retrieve type: {type(data_array)} as single_time_series"
    raise ISOperationNotAllowed(msg)

def remove_time_series(self, uuid: UUID) -> None:
time_series = self._arrays.pop(uuid, None)
if time_series is None:
Expand All @@ -58,28 +78,33 @@ def remove_time_series(self, uuid: UUID) -> None:
def serialize(self, dst: Path | str, _: Optional[Path | str] = None) -> None:
base_directory = dst if isinstance(dst, Path) else Path(dst)
storage = ArrowTimeSeriesStorage.create_with_permanent_directory(base_directory)
for ts in self._arrays.values():
metadata_type = ts.get_time_series_metadata_type()
metadata = metadata_type.from_data(ts)
storage.add_time_series(metadata, ts)
for ts_uuid, ts in self._arrays.items():
storage.add_raw_single_time_series(ts_uuid, ts)

def _get_single_time_series(
self,
metadata: SingleTimeSeriesMetadataBase,
metadata: SingleTimeSeriesMetadata,
start_time: datetime | None = None,
length: int | None = None,
) -> SingleTimeSeries:
base_ts = self._arrays[metadata.time_series_uuid]
assert isinstance(base_ts, SingleTimeSeries)
if start_time is None and length is None:
return base_ts
ts_data = self._arrays.get(metadata.time_series_uuid)
if ts_data is None:
msg = f"No time series with {metadata.time_series_uuid} is stored"
raise ISNotStored(msg)

if start_time or length:
index, length = metadata.get_range(start_time=start_time, length=length)
ts_data = ts_data[index : index + length]

index, length = metadata.get_range(start_time=start_time, length=length)
if metadata.quantity_metadata is not None:
ts_data = metadata.quantity_metadata.quantity_type(
ts_data, metadata.quantity_metadata.units
)
return SingleTimeSeries(
uuid=metadata.time_series_uuid,
variable_name=base_ts.variable_name,
resolution=base_ts.resolution,
initial_time=start_time or base_ts.initial_time,
data=base_ts.data[index : index + length],
variable_name=metadata.variable_name,
resolution=metadata.resolution,
initial_time=start_time or metadata.initial_time,
data=ts_data,
normalization=metadata.normalization,
)
1 change: 1 addition & 0 deletions src/infrasys/loggers.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Contains logging configuration data."""

import sys

# Logger printing formats
Expand Down
38 changes: 38 additions & 0 deletions src/infrasys/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -1094,6 +1094,44 @@ def merge_system(self, other: "System") -> None:

# TODO: add delete methods that (1) don't raise if not found and (2) don't return anything?

def convert_storage(self, **kwargs) -> None:
    """Convert the time series storage medium.

    Parameters
    ----------
    **kwargs:
        The same keys as TIME_SERIES_KWARGS in time_series_manager.py
        {
            "time_series_in_memory": bool = False,
            "time_series_read_only": bool = False,
            "time_series_directory": Path | None = None,
        }
        Only arguments that differ from the default TIME_SERIES_KWARGS
        need to be passed.

    Examples
    --------
    # Initialize the system (defaults to Arrow storage)
    >>> system = infrasys.System(auto_add_composed_components=True)
    # Add components and time series data
    >>> generator, bus, load_data = create_some_data()
    >>> system.add_components(generator, bus)
    >>> system.add_time_series(load_data, generator)
    # Convert the storage to in-memory
    >>> kwargs = {"time_series_in_memory": True}
    >>> system.convert_storage(**kwargs)
    # Check the time series storage type
    >>> isinstance(system._time_series_mgr._storage, InMemoryTimeSeriesStorage)
    True
    """
    return self._time_series_mgr.convert_storage(**kwargs)

@property
def _components(self) -> ComponentManager:
"""Return the component manager."""
Expand Down
52 changes: 40 additions & 12 deletions src/infrasys/time_series_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from infrasys.arrow_storage import ArrowTimeSeriesStorage
from infrasys import Component
from infrasys.exceptions import ISOperationNotAllowed
from infrasys.exceptions import ISInvalidParameter, ISOperationNotAllowed
from infrasys.in_memory_time_series_storage import InMemoryTimeSeriesStorage
from infrasys.time_series_metadata_store import TimeSeriesMetadataStore
from infrasys.time_series_models import (
Expand Down Expand Up @@ -40,17 +40,29 @@ def __init__(
initialize: bool = True,
**kwargs,
) -> None:
base_directory: Path | None = _process_time_series_kwarg("time_series_directory", **kwargs)
self._read_only = _process_time_series_kwarg("time_series_read_only", **kwargs)
self._storage = storage or (
InMemoryTimeSeriesStorage()
if _process_time_series_kwarg("time_series_in_memory", **kwargs)
else ArrowTimeSeriesStorage.create_with_temp_directory(base_directory=base_directory)
)
self._storage = storage or self.create_new_storage(**kwargs)
self._metadata_store = TimeSeriesMetadataStore(con, initialize=initialize)

# TODO: create parsing mechanism? CSV, CSV + JSON

@staticmethod
def create_new_storage(permanent: bool = False, **kwargs):
    """Create a new time series storage instance.

    Parameters
    ----------
    permanent : bool
        If True, create Arrow storage in a permanent directory; requires
        "time_series_directory" to be present in kwargs. Has no effect when
        in-memory storage is requested.
    **kwargs
        The same keys as TIME_SERIES_KWARGS in time_series_manager.py.

    Raises
    ------
    ISInvalidParameter
        Raised if permanent is True but no base directory was provided.
    """
    base_directory: Path | None = _process_time_series_kwarg("time_series_directory", **kwargs)

    if _process_time_series_kwarg("time_series_in_memory", **kwargs):
        return InMemoryTimeSeriesStorage()

    if permanent:
        if base_directory is None:
            # Fixed typo in the original message: "perminant" -> "permanent".
            msg = "Can't convert to permanent storage without a base directory"
            raise ISInvalidParameter(msg)
        return ArrowTimeSeriesStorage.create_with_permanent_directory(directory=base_directory)

    return ArrowTimeSeriesStorage.create_with_temp_directory(base_directory=base_directory)

@property
def metadata_store(self) -> TimeSeriesMetadataStore:
"""Return the time series metadata store."""
Expand Down Expand Up @@ -264,10 +276,6 @@ def deserialize(
"""Deserialize the class. Must also call add_reference_counts after deserializing
components.
"""
if _process_time_series_kwarg("time_series_in_memory", **kwargs):
msg = "De-serialization does not support time_series_in_memory"
raise ISOperationNotAllowed(msg)

time_series_dir = Path(parent_dir) / data["directory"]

if _process_time_series_kwarg("time_series_read_only", **kwargs):
Expand All @@ -276,9 +284,29 @@ def deserialize(
storage = ArrowTimeSeriesStorage.create_with_temp_directory()
storage.serialize(src=time_series_dir, dst=storage.get_time_series_directory())

return cls(con, storage=storage, initialize=False, **kwargs)
cls_instance = cls(con, storage=storage, initialize=False, **kwargs)

if _process_time_series_kwarg("time_series_in_memory", **kwargs):
cls_instance.convert_storage(**kwargs)

return cls_instance

def _handle_read_only(self) -> None:
    """Raise ISOperationNotAllowed if this manager was created read-only."""
    if self._read_only:
        msg = "Cannot modify time series in read-only mode."
        raise ISOperationNotAllowed(msg)

def convert_storage(self, **kwargs) -> None:
    """Create a new storage instance and copy every time series into it.

    Only SingleTimeSeries data is copied; the new storage replaces the
    current one when the copy completes.
    """
    target = self.create_new_storage(**kwargs)
    uuids = self.metadata_store.unique_uuids_by_type(SingleTimeSeries.__name__)
    for uuid in uuids:
        raw = self._storage.get_raw_single_time_series(uuid)
        target.add_raw_single_time_series(uuid, raw)
    self._storage = target
8 changes: 8 additions & 0 deletions src/infrasys/time_series_metadata_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,14 @@ def _try_has_time_series_metadata_by_full_params(
)
return text is not None

def unique_uuids_by_type(self, time_series_type: str) -> list[UUID]:
    """Return the distinct time series UUIDs stored for the given type name."""
    # TABLE_NAME is a class constant, not user input; only the type name is
    # bound as a query parameter.
    query = (
        f"SELECT DISTINCT time_series_uuid from {self.TABLE_NAME} where time_series_type = ?"
    )
    params = (time_series_type,)
    uuid_strings = self.sql(query, params)
    # Each row is a 1-tuple whose first element is the UUID as a string.
    return [UUID(ustr[0]) for ustr in uuid_strings]


@dataclass
class TimeSeriesCounts:
Expand Down
7 changes: 7 additions & 0 deletions src/infrasys/time_series_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,12 @@ def from_time_array(
def get_time_series_metadata_type() -> Type:
return SingleTimeSeriesMetadata

@property
def data_array(self) -> NDArray:
    """Return the underlying array, unwrapping a pint.Quantity if present."""
    values = self.data
    return values.magnitude if isinstance(values, pint.Quantity) else values


class SingleTimeSeriesScalingFactor(SingleTimeSeries):
"""Defines a time array with a single dimension of floats that are 0-1 scaling factors."""
Expand Down Expand Up @@ -252,6 +258,7 @@ class TimeSeriesMetadata(InfraSysBaseModel, abc.ABC):
user_attributes: dict[str, Any] = {}
quantity_metadata: Optional[QuantityMetadata] = None
normalization: NormalizationModel = None
# TODO: refactor to type_ to avoid overriding builtin?
type: Literal["SingleTimeSeries", "SingleTimeSeriesScalingFactor"]

@property
Expand Down
Loading

0 comments on commit 581a786

Please sign in to comment.