Skip to content

Commit

Permalink
[resotocore][feat] Add timeseries downsampling functionality (#1861)
Browse files Browse the repository at this point in the history
  • Loading branch information
aquamatthias authored Dec 20, 2023
1 parent feaf489 commit 54b5f52
Show file tree
Hide file tree
Showing 9 changed files with 345 additions and 31 deletions.
34 changes: 34 additions & 0 deletions resotocore/resotocore/cli/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -6038,6 +6038,8 @@ class TimeSeriesCommand(CLICommand):
timeseries snapshot --name <time series> <aggregate search>
timeseries get --name <time series> --start <time> --end <time> --granularity <duration|integer>
--group <group> --filter <var><op><value>
timeseries list
timeseries downsample
```
The timeseries command can be used to add data to a time series or to retrieve data from a time series.
Expand All @@ -6051,6 +6053,13 @@ class TimeSeriesCommand(CLICommand):
To retrieve data from a time series, you need to specify the name of the time series, as well as a time range.
It is possible to group the data by a certain variable, and to filter the data by a certain variable.
All available timeseries can be retrieved with the `timeseries list` command.
The `timeseries downsample` command can be used to downsample all time series.
See resoto.core db configuration to configure the buckets that are used for downsampling.
General idea of downsample: higher resolution and more datapoints for recent data,
lower resolution and less datapoints for older data.
## Parameters
- `--name` - The name of the time series.
Expand All @@ -6069,6 +6078,12 @@ class TimeSeriesCommand(CLICommand):
# retrieve the number of instances per instance type since yesterday
> timeseries get --name instances --start @yesterday@ --end @now@
# downsample existing time series
> timeseries downsample
# list all time series
> timeseries list
```
"""

Expand All @@ -6081,6 +6096,7 @@ def info(self) -> str:

def args_info(self) -> ArgsInfo:
return {
"list": [],
"snapshot": [
ArgInfo("--name", help_text="<time series>."),
ArgInfo(expects_value=True, value_hint="search"),
Expand All @@ -6093,6 +6109,7 @@ def args_info(self) -> ArgsInfo:
ArgInfo("--filter", expects_value=True, can_occur_multiple_times=True, help_text="<var><op><value>"),
ArgInfo("--granularity", expects_value=True, help_text="<duration|integer>"),
],
"downsample": [],
}

def parse(self, arg: Optional[str] = None, ctx: CLIContext = EmptyContext, **kwargs: Any) -> CLIAction:
Expand Down Expand Up @@ -6128,13 +6145,30 @@ def parse_duration_or_int(s: str) -> Union[int, timedelta]:
)
return CLISourceContext(cursor.count(), cursor.full_count()), cursor

async def list_ts() -> Tuple[int, JsGen]:
ts = await self.dependencies.db_access.time_series_db.list_time_series()
return len(ts), stream.iterate([to_js(a) for a in ts])

async def downsample() -> Tuple[int, JsGen]:
ts = await self.dependencies.db_access.time_series_db.downsample()
if isinstance(ts, str):
return 1, stream.just(ts)
elif ts:
return len(ts), stream.iterate([{k: v} for k, v in ts.items()])
else:
return 1, stream.just("No time series to downsample.")

args = re.split("\\s+", arg, maxsplit=1) if arg else []
if arg and len(args) == 2 and args[0] == "snapshot":
return CLISource.single(
partial(snapshot_time_series, args[1].strip()), required_permissions={Permission.read}
)
elif arg and len(args) == 2 and args[0] == "get":
return CLISource(partial(load_time_series, args[1].strip()), required_permissions={Permission.read})
elif arg and len(args) == 1 and args[0] == "list":
return CLISource.only_count(list_ts, required_permissions={Permission.read})
elif arg and len(args) == 1 and args[0] == "downsample":
return CLISource.only_count(downsample, required_permissions={Permission.read})
else:
return CLISource.single(
lambda: stream.just(self.rendered_help(ctx)), required_permissions={Permission.read}
Expand Down
38 changes: 34 additions & 4 deletions resotocore/resotocore/core_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,13 @@ def access_token_expiration(self) -> timedelta:
)


@define()
class TimeSeriesBucketConfig(ConfigObject):
kind: ClassVar[str] = f"{ResotoCoreRoot}_timeseries_bucket_config"
start: int = field(metadata={"description": "Start of the bucket in seconds."})
resolution: int = field(metadata={"description": "Resolution of the bucket in seconds."})


@define()
class DatabaseConfig(ConfigObject):
kind: ClassVar[str] = f"{ResotoCoreRoot}_database_config"
Expand All @@ -192,9 +199,6 @@ class DatabaseConfig(ConfigObject):
default=False, metadata={"description": "If the connection should not be verified (default: False)"}
)
request_timeout: int = field(default=900, metadata={"description": "Request timeout in seconds (default: 900)"})
time_series_ttl: timedelta = field(
default=timedelta(days=90), metadata={"description": "Time series TTL (default: 90d)"}
)


@define(order=True, hash=True, frozen=True)
Expand Down Expand Up @@ -572,6 +576,26 @@ class RunConfig(ConfigObject):
verify: Union[bool, str, None] = None


@define
class TimeSeriesConfig(ConfigObject):
kind: ClassVar[str] = f"{ResotoCoreRoot}_timeseries_config"

buckets: List[TimeSeriesBucketConfig] = field(
factory=lambda: [
TimeSeriesBucketConfig(
start=int(timedelta(days=2).total_seconds()), resolution=int(timedelta(hours=4).total_seconds())
),
TimeSeriesBucketConfig(
start=int(timedelta(days=30).total_seconds()), resolution=int(timedelta(days=1).total_seconds())
),
TimeSeriesBucketConfig(
start=int(timedelta(days=180).total_seconds()), resolution=int(timedelta(days=3).total_seconds())
),
],
metadata={"description": "List of time series buckets."},
)


@define()
class CoreConfig(ConfigObject):
api: ApiConfig
Expand All @@ -584,6 +608,7 @@ class CoreConfig(ConfigObject):
snapshots: SnapshotsScheduleConfig
args: Namespace
run: RunConfig
timeseries: TimeSeriesConfig

@property
def multi_tenant_setup(self) -> bool:
Expand All @@ -595,7 +620,7 @@ def no_scheduling(self) -> bool:

@property
def editable(self) -> "EditableConfig":
return EditableConfig(self.api, self.cli, self.graph_update, self.runtime, self.workflows)
return EditableConfig(self.api, self.cli, self.graph_update, self.runtime, self.workflows, self.timeseries)

def json(self) -> Json:
return {ResotoCoreRoot: to_js(self.editable, strip_attr="kind")}
Expand Down Expand Up @@ -627,6 +652,10 @@ class EditableConfig(ConfigObject):
factory=lambda: {"collect_and_cleanup": WorkflowConfig(schedule="0 * * * *")},
metadata={"description": "Workflow related properties."},
)
timeseries: TimeSeriesConfig = field(
factory=TimeSeriesConfig,
metadata={"description": "Time series related properties."},
)


def config_model() -> List[Json]:
Expand Down Expand Up @@ -740,6 +769,7 @@ def parse_config(
runtime=ed.runtime,
workflows=ed.workflows,
run=RunConfig(), # overridden for each run
timeseries=ed.timeseries,
)


Expand Down
2 changes: 1 addition & 1 deletion resotocore/resotocore/db/arango_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,7 @@ def load_time_series(
bv_start = ctx.add_bind_var(int(start.timestamp()))
bv_end = ctx.add_bind_var(int(end.timestamp()))

query = f"FOR d in `{time_series_collection}` FILTER d.ts==@{bv_name} AND d.at>=@{bv_start} AND d.at<=@{bv_end}"
query = f"FOR d in `{time_series_collection}` FILTER d.ts==@{bv_name} AND d.at>=@{bv_start} AND d.at<@{bv_end}"
if group_filter:
parts = []
for f in group_filter:
Expand Down
2 changes: 1 addition & 1 deletion resotocore/resotocore/db/db_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def __init__(
self.configs_model_db = model_db(self.db, configs_model)
self.template_entity_db = template_entity_db(self.db, template_entity)
self.package_entity_db = app_package_entity_db(self.db, infra_app_packages)
self.time_series_db = TimeSeriesDB(self.db, time_series, config.db.time_series_ttl)
self.time_series_db = TimeSeriesDB(self.db, time_series, config)
self.graph_dbs: Dict[str, GraphDB] = {}
self.config = config
self.cleaner = Periodic("outdated_updates_cleaner", self.check_outdated_updates, timedelta(seconds=60))
Expand Down
Loading

0 comments on commit 54b5f52

Please sign in to comment.