From 2de28b8835764e0e0594f4e69404ca78ea516e9b Mon Sep 17 00:00:00 2001
From: marcopeix
Date: Thu, 28 Nov 2024 13:13:53 -0500
Subject: [PATCH] WIP - Accumulated z-scores with num_partitions

---
 nbs/src/nixtla_client.ipynb |  5 ++++-
 nixtla/nixtla_client.py     | 10 +++++++++-
 2 files changed, 13 insertions(+), 2 deletions(-)

diff --git a/nbs/src/nixtla_client.ipynb b/nbs/src/nixtla_client.ipynb
index f0312050..dc1309c5 100644
--- a/nbs/src/nixtla_client.ipynb
+++ b/nbs/src/nixtla_client.ipynb
@@ -812,6 +812,8 @@
     "        )\n",
     "    if 'anomaly_score' in first_res:\n",
     "        resp['anomaly_score'] = np.hstack([res['anomaly_score'] for res in results])\n",
+    "    if 'accumulated_anomaly_score' in first_res and first_res['accumulated_anomaly_score'] is not None:\n",
+    "        resp['accumulated_anomaly_score'] = np.hstack([res['accumulated_anomaly_score'] for res in results])\n",
     "    if first_res[\"intervals\"] is None:\n",
     "        resp[\"intervals\"] = None\n",
     "    else:\n",
@@ -1811,7 +1813,7 @@
     "               .reset_index(drop=True))\n",
     "        out = ufp.assign_columns(out, 'anomaly', resp['anomaly'])\n",
     "        out = ufp.assign_columns(out, 'anomaly_score', resp['anomaly_score'])\n",
-    "        if resp['accumulated_anomaly_score']is not None:\n",
+    "        if threshold_method == 'multivariate':\n",
     "            out = ufp.assign_columns(out, 'accumulated_anomaly_score', resp['accumulated_anomaly_score'])\n",
     "        return _maybe_add_intervals(out, resp['intervals'])\n",
     "\n",
@@ -2450,6 +2452,7 @@
     "    if method == 'detect_anomalies_realtime':\n",
     "        schema.append('anomaly:bool')\n",
     "        schema.append('anomaly_score:double')\n",
+    "        schema.append('accumulated_anomaly_score:double')\n",
     "    elif method == 'cross_validation':\n",
     "        schema.append(('cutoff', schema[time_col].type))\n",
     "    if level is not None and quantiles is not None:\n",
diff --git a/nixtla/nixtla_client.py b/nixtla/nixtla_client.py
index 298086b8..8331e2cb 100644
--- a/nixtla/nixtla_client.py
+++ b/nixtla/nixtla_client.py
@@ -741,6 +741,13 @@ def _make_partitioned_requests(
         )
     if "anomaly_score" in first_res:
         resp["anomaly_score"] = np.hstack([res["anomaly_score"] for res in results])
+    if (
+        "accumulated_anomaly_score" in first_res
+        and first_res["accumulated_anomaly_score"] is not None
+    ):
+        resp["accumulated_anomaly_score"] = np.hstack(
+            [res["accumulated_anomaly_score"] for res in results]
+        )
     if first_res["intervals"] is None:
         resp["intervals"] = None
     else:
@@ -1752,7 +1759,7 @@ def detect_anomalies_realtime(
         out = out_aggregated.groupby(id_col).tail(detection_size).reset_index(drop=True)
         out = ufp.assign_columns(out, "anomaly", resp["anomaly"])
         out = ufp.assign_columns(out, "anomaly_score", resp["anomaly_score"])
-        if resp["accumulated_anomaly_score"] is not None:
+        if threshold_method == "multivariate":
             out = ufp.assign_columns(
                 out, "accumulated_anomaly_score", resp["accumulated_anomaly_score"]
             )
         return _maybe_add_intervals(out, resp["intervals"])
@@ -2394,6 +2401,7 @@ def _get_schema(
     if method == "detect_anomalies_realtime":
         schema.append("anomaly:bool")
         schema.append("anomaly_score:double")
+        schema.append("accumulated_anomaly_score:double")
     elif method == "cross_validation":
         schema.append(("cutoff", schema[time_col].type))
     if level is not None and quantiles is not None:
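
Note on the _make_partitioned_requests change: when a request is split across num_partitions, each partition returns its own score arrays, and this patch stitches the accumulated z-scores back together with np.hstack exactly as is already done for anomaly_score, guarded by a check that the backend actually returned them. The following is a minimal standalone sketch of that combination step; the toy results list and its values are invented for illustration and are not taken from the library.

import numpy as np

# Hypothetical per-partition responses, mimicking the shape of `results`
# inside _make_partitioned_requests after this patch.
results = [
    {
        "anomaly_score": np.array([0.1, 2.3]),
        "accumulated_anomaly_score": np.array([0.1, 2.4]),
    },
    {
        "anomaly_score": np.array([0.4]),
        "accumulated_anomaly_score": np.array([0.5]),
    },
]
first_res = results[0]

resp = {}
if "anomaly_score" in first_res:
    resp["anomaly_score"] = np.hstack([res["anomaly_score"] for res in results])
# New in this patch: only concatenate accumulated scores when the key is
# present and not None (i.e. the backend actually returned them).
if (
    "accumulated_anomaly_score" in first_res
    and first_res["accumulated_anomaly_score"] is not None
):
    resp["accumulated_anomaly_score"] = np.hstack(
        [res["accumulated_anomaly_score"] for res in results]
    )

print(resp["accumulated_anomaly_score"])  # [0.1 2.4 0.5]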