Skip to content

Commit

Permalink
WIP - Accumulated z-scores with num_partitions
Browse files Browse the repository at this point in the history
  • Loading branch information
marcopeix committed Nov 28, 2024
1 parent 350ef80 commit 2de28b8
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 2 deletions.
5 changes: 4 additions & 1 deletion nbs/src/nixtla_client.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,8 @@
" )\n",
" if 'anomaly_score' in first_res:\n",
" resp['anomaly_score'] = np.hstack([res['anomaly_score'] for res in results])\n",
" if 'accumulated_anomaly_score' in first_res and first_res['accumulated_anomaly_score'] is not None:\n",
" resp['accumulated_anomaly_score'] = np.hstack([res['accumulated_anomaly_score'] for res in results])\n",
" if first_res[\"intervals\"] is None:\n",
" resp[\"intervals\"] = None\n",
" else:\n",
Expand Down Expand Up @@ -1811,7 +1813,7 @@
" .reset_index(drop=True))\n",
" out = ufp.assign_columns(out, 'anomaly', resp['anomaly'])\n",
" out = ufp.assign_columns(out, 'anomaly_score', resp['anomaly_score'])\n",
" if resp['accumulated_anomaly_score']is not None:\n",
" if threshold_method == 'multivariate':\n",
" out = ufp.assign_columns(out, 'accumulated_anomaly_score', resp['accumulated_anomaly_score'])\n",
" return _maybe_add_intervals(out, resp['intervals'])\n",
"\n",
Expand Down Expand Up @@ -2450,6 +2452,7 @@
" if method == 'detect_anomalies_realtime':\n",
" schema.append('anomaly:bool')\n",
" schema.append('anomaly_score:double')\n",
" schema.append('accumulated_anomaly_score:double')\n",
" elif method == 'cross_validation':\n",
" schema.append(('cutoff', schema[time_col].type))\n",
" if level is not None and quantiles is not None:\n",
Expand Down
10 changes: 9 additions & 1 deletion nixtla/nixtla_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,13 @@ def _make_partitioned_requests(
)
if "anomaly_score" in first_res:
resp["anomaly_score"] = np.hstack([res["anomaly_score"] for res in results])
if (
"accumulated_anomaly_score" in first_res
and first_res["accumulated_anomaly_score"] is not None
):
resp["accumulated_anomaly_score"] = np.hstack(
[res["accumulated_anomaly_score"] for res in results]
)
if first_res["intervals"] is None:
resp["intervals"] = None
else:
Expand Down Expand Up @@ -1752,7 +1759,7 @@ def detect_anomalies_realtime(
out = out_aggregated.groupby(id_col).tail(detection_size).reset_index(drop=True)
out = ufp.assign_columns(out, "anomaly", resp["anomaly"])
out = ufp.assign_columns(out, "anomaly_score", resp["anomaly_score"])
if resp["accumulated_anomaly_score"] is not None:
if threshold_method == "multivariate":
out = ufp.assign_columns(
out, "accumulated_anomaly_score", resp["accumulated_anomaly_score"]
)
Expand Down Expand Up @@ -2394,6 +2401,7 @@ def _get_schema(
if method == "detect_anomalies_realtime":
schema.append("anomaly:bool")
schema.append("anomaly_score:double")
schema.append("accumulated_anomaly_score:double")
elif method == "cross_validation":
schema.append(("cutoff", schema[time_col].type))
if level is not None and quantiles is not None:
Expand Down

0 comments on commit 2de28b8

Please sign in to comment.