Skip to content

Commit

Permalink
Treat unhandled Monte Carlo anomalies as data quality errors
Browse files Browse the repository at this point in the history
  • Loading branch information
mars-lan committed Dec 23, 2024
1 parent 9b2b596 commit 26c5717
Show file tree
Hide file tree
Showing 8 changed files with 209 additions and 17 deletions.
8 changes: 8 additions & 0 deletions metaphor/monte_carlo/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ If some of the monitored data assets are Snowflake datasets, please provide the
snowflake_account: <account_name>
```
#### Treat Unhandled Anomalies as Errors
If set to true, the connector will treat unhandled [anomalies](https://docs.getmontecarlo.com/docs/detectors-overview) as data quality errors.
```yaml
treat_unhandled_anomalies_as_errors: true
```
## Testing
Follow the [Installation](../../README.md) instructions to install `metaphor-connectors` in your environment (or virtualenv). Make sure to include either `all` or `monte_carlo` extra.
Expand Down
3 changes: 3 additions & 0 deletions metaphor/monte_carlo/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ class MonteCarloRunConfig(BaseConfig):
# Snowflake data source account
snowflake_account: Optional[str] = None

# Treat unhandled anomalies as errors
treat_unhandled_anomalies_as_errors: bool = False

# Errors to be ignored, e.g. timeouts
ignored_errors: List[str] = field(
default_factory=lambda: [
Expand Down
151 changes: 139 additions & 12 deletions metaphor/monte_carlo/extractor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import re
from datetime import datetime, timedelta
from typing import Collection, Dict, List

from dateutil import parser
Expand Down Expand Up @@ -53,6 +54,13 @@
"P3": DataMonitorSeverity.LOW,
}

# Maps Monte Carlo alert SEV_* levels to DataMonitorSeverity values.
# Alerts use SEV_1..SEV_4 (unlike monitors, which use P* priorities);
# both SEV_3 and SEV_4 collapse to LOW.
alert_severity_map = {
    "SEV_1": DataMonitorSeverity.HIGH,
    "SEV_2": DataMonitorSeverity.MEDIUM,
    "SEV_3": DataMonitorSeverity.LOW,
    "SEV_4": DataMonitorSeverity.LOW,
}

connection_type_platform_map = {
"BIGQUERY": DataPlatform.BIGQUERY,
"REDSHIFT": DataPlatform.REDSHIFT,
Expand All @@ -79,6 +87,8 @@ def __init__(self, config: MonteCarloRunConfig):
)
self._ignored_errors = config.ignored_errors

self._treat_unhandled_anomalies_as_errors = config.treat_unhandled_anomalies_as_errors

self._client = Client(
session=Session(mcd_id=config.api_key_id, mcd_token=config.api_key_secret)
)
Expand All @@ -93,6 +103,9 @@ async def extract(self) -> Collection[ENTITY_TYPES]:
self._fetch_tables()
self._fetch_monitors()

if self._treat_unhandled_anomalies_as_errors:
self._fetch_alerts()

return self._datasets.values()

def _fetch_monitors(self) -> None:
Expand All @@ -113,7 +126,6 @@ def _fetch_monitors(self) -> None:
uuid
name
description
entities
entityMcons
priority
monitorStatus
Expand All @@ -138,6 +150,78 @@ def _fetch_monitors(self) -> None:
json_dump_to_debug_file(monitors, "getMonitors.json")
self._parse_monitors(monitors)

def _fetch_alerts(self) -> None:
    """Fetch all anomaly alerts created in the last 30 days.

    Pages through the getAlerts GraphQL query in batches, dumps the raw
    nodes to a debug file, then hands them to ``_parse_alerts``.

    See https://apidocs.getmontecarlo.com/#query-getAlerts
    """

    batch_size = 200
    end_cursor = None
    # Only look back 30 days; older anomalies are assumed stale.
    created_after = datetime.now() - timedelta(days=30)
    created_before = datetime.now()

    result: List[Dict] = []

    while True:
        # was "tables" — this loop counts alerts, not tables
        logger.info(f"Querying getAlerts after {end_cursor} ({len(result)} alerts)")
        resp = self._client(
            """
            query getAlerts($first: Int, $after: String, $createdAfter: DateTime!, $createdBefore: DateTime!) {
              getAlerts(first: $first
                after: $after
                filter: {
                  include:{
                    types: [ANOMALIES]
                  }
                }
                createdTime: {before: $createdBefore, after: $createdAfter}
              ) {
                edges {
                  node {
                    id
                    title
                    type
                    status
                    severity
                    priority
                    owner {
                      email
                    }
                    createdTime
                    tables {
                      mcon
                    }
                  }
                }
                pageInfo {
                  endCursor
                  hasNextPage
                }
              }
            }
            """,
            {
                "first": batch_size,
                "after": end_cursor,
                "createdAfter": created_after.isoformat(),
                "createdBefore": created_before.isoformat(),
            },
        )

        # pycarlo converts GraphQL camelCase fields to snake_case keys
        nodes = [edge["node"] for edge in resp["get_alerts"]["edges"]]
        result.extend(nodes)

        if not resp["get_alerts"]["page_info"]["has_next_page"]:
            break

        end_cursor = resp["get_alerts"]["page_info"]["end_cursor"]

    logger.info(f"Fetched {len(result)} alerts")
    json_dump_to_debug_file(result, "getAlerts.json")

    self._parse_alerts(result)

def _fetch_tables(self) -> None:
"""Fetch all tables
Expand Down Expand Up @@ -221,26 +305,65 @@ def _parse_monitors(self, monitors) -> None:
exceptions=exceptions,
)

if monitor["entities"] is None or monitor["entityMcons"] is None:
if monitor["entityMcons"] is None:
logger.info(f"Skipping monitors not linked to any entities: {uuid}")
continue

for index, entity in enumerate(monitor["entities"]):
if index > len(monitor["entityMcons"]) - 1:
logger.warning(f"Unmatched entity mcon in monitor {monitor}")
break

mcon = monitor["entityMcons"][index]
for mcon in monitor["entityMcons"]:
platform = self._mcon_platform_map.get(mcon)
if platform is None:
logger.warning(f"Unable to determine platform for {mcon}")
continue

name = self._convert_dataset_name(entity)
name = self._extract_dataset_name(mcon)
dataset = self._init_dataset(name, platform)
dataset.data_quality.url = f"{assets_base_url}/{mcon}/custom-monitors"
dataset.data_quality.monitors.append(data_monitor)

def _parse_alerts(self, alerts) -> None:
    """Convert unhandled anomaly alerts into error DataMonitors.

    Each qualifying alert becomes a DataMonitor with ERROR status attached
    to every dataset (table) the alert is linked to. Alerts that carry any
    status (e.g. "ACKNOWLEDGED", "EXPECTED") are considered handled by a
    user and are skipped.
    """
    for alert in alerts:
        # renamed from `id` to avoid shadowing the builtin
        alert_id = alert["id"]

        tables = alert["tables"]
        if tables is None or len(tables) == 0:
            logger.info(f"Skipping alert not linked to any tables: {alert_id}")
            continue

        # Filter out alerts with status (e.g. "ACKNOWLEDGED", "EXPECTED")
        if alert["status"] is not None:
            continue

        # Alerts triggered by anomalies do not have any inherited priority.
        # Derive it from assigned severity instead.
        alert_severity = alert_severity_map.get(
            alert["severity"], DataMonitorSeverity.UNKNOWN
        )

        owner = None
        if alert["owner"] is not None and alert["owner"]["email"] is not None:
            owner = alert["owner"]["email"]

        # No dedicated description on anomaly alerts; reuse the title.
        data_monitor = DataMonitor(
            title=alert["title"],
            description=alert["title"],
            owner=owner,
            status=DataMonitorStatus.ERROR,
            severity=alert_severity,
            last_run=parser.parse(alert["createdTime"]),
            targets=[],
        )

        # `tables` was already validated non-empty above, so iterate it
        # directly instead of re-fetching with alert.get("tables", []).
        for table in tables:
            mcon = table["mcon"]
            platform = self._mcon_platform_map.get(mcon)
            if platform is None:
                logger.warning(f"Unable to determine platform for {mcon}")
                continue

            name = self._extract_dataset_name(mcon)
            dataset = self._init_dataset(name, platform)
            dataset.data_quality.monitors.append(data_monitor)

def _parse_monitor_status(self, monitor: Dict):
status = monitor_status_map.get(
monitor["monitorStatus"], DataMonitorStatus.UNKNOWN
Expand All @@ -258,9 +381,13 @@ def _parse_monitor_status(self, monitor: Dict):
return status

@staticmethod
def _convert_dataset_name(entity: str) -> str:
"""entity name format is <db>:<schema>.<table>"""
return normalize_full_dataset_name(entity.replace(":", ".", 1))
def _extract_dataset_name(mcon: str) -> str:
    """Extract a normalized dataset name from an MCON (Monte Carlo Object Name)."""

    # MCON has the following format:
    # MCON++{account_uuid}++{resource_uuid}++table++{db:schema.table}
    # The last segment is converted from <db>:<schema>.<table> to
    # <db>.<schema>.<table> before normalization.
    _, _, _, _, obj_name = mcon.split("++")
    return normalize_full_dataset_name(obj_name.replace(":", ".", 1))

def _init_dataset(self, normalized_name: str, platform: DataPlatform) -> Dataset:
account = self._account if platform == DataPlatform.SNOWFLAKE else None
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "metaphor-connectors"
version = "0.14.162"
version = "0.14.163"
license = "Apache-2.0"
description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app."
authors = ["Metaphor <[email protected]>"]
Expand Down
1 change: 1 addition & 0 deletions tests/monte_carlo/config.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
---
api_key_id: key_id
api_key_secret: key_secret
treat_unhandled_anomalies_as_errors: true
output: {}
9 changes: 9 additions & 0 deletions tests/monte_carlo/expected.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,15 @@
"targets": [],
"title": "auto_monitor_name_cd5b69bd-e465-4545-b3f9-a5d507ea766c",
"url": "https://getmontecarlo.com/monitors/e0dc143e-dd8a-4cb9-b4cc-dedec715d955"
},
{
"description": "Anomaly 1",
"lastRun": "2023-06-23T03:54:35.817000+00:00",
"owner": "[email protected]",
"severity": "HIGH",
"status": "ERROR",
"targets": [],
"title": "Anomaly 1"
}
],
"provider": "MONTE_CARLO",
Expand Down
1 change: 1 addition & 0 deletions tests/monte_carlo/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ def test_yaml_config(test_root_dir):
assert config == MonteCarloRunConfig(
api_key_id="key_id",
api_key_secret="key_secret",
treat_unhandled_anomalies_as_errors=True,
output=OutputConfig(),
)
51 changes: 47 additions & 4 deletions tests/monte_carlo/test_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ def dummy_config():
api_key_id="key_id",
api_key_secret="key_secret",
snowflake_account="snow",
treat_unhandled_anomalies_as_errors=True,
output=OutputConfig(),
)

Expand Down Expand Up @@ -62,7 +63,6 @@ async def test_extractor(mock_pycarlo_client: MagicMock, test_root_dir: str):
"uuid": "e0dc143e-dd8a-4cb9-b4cc-dedec715d955",
"name": "auto_monitor_name_cd5b69bd-e465-4545-b3f9-a5d507ea766c",
"description": "Field Health for all fields in db:metaphor.test1",
"entities": ["db:metaphor.test1"],
"entityMcons": [
"MCON++6418a1e2-9718-4413-9d2b-6a354e01ddf8++a19e22b4-7659-4064-8fd4-8d6122fabe1c++table++db:metaphor.test1"
],
Expand All @@ -77,7 +77,6 @@ async def test_extractor(mock_pycarlo_client: MagicMock, test_root_dir: str):
"uuid": "ce4c4568-35f4-4365-a6fe-95f233fcf6c3",
"name": "auto_monitor_name_53c985e6-8f49-4af7-8ef1-7b402a27538b",
"description": "Field Health for all fields in db:metaphor.test2",
"entities": ["db:metaphor.test2"],
"entityMcons": [
"MCON++6418a1e2-9718-4413-9d2b-6a354e01ddf8++a19e22b4-7659-4064-8fd4-8d6122fabe1c++table++db:metaphor.test2"
],
Expand All @@ -92,7 +91,6 @@ async def test_extractor(mock_pycarlo_client: MagicMock, test_root_dir: str):
"uuid": "2c156c8d-ab4a-432f-b8bb-f9ea9f31ed3d",
"name": "auto_monitor_name_18637195-a3c4-416e-a3e2-a89cc10adbc8",
"description": "Field Health for all fields in db:metaphor.test3",
"entities": ["db:metaphor.test3"],
"entityMcons": [
"MCON++6418a1e2-9718-4413-9d2b-6a354e01ddf8++a19e22b4-7659-4064-8fd4-8d6122fabe1c++table++db:metaphor.test3"
],
Expand All @@ -107,7 +105,6 @@ async def test_extractor(mock_pycarlo_client: MagicMock, test_root_dir: str):
"uuid": "d14af7d8-6342-420a-bb09-5805fad677f1",
"name": "auto_monitor_name_693b98e3-950d-472b-83fe-8c8e5b5979f9",
"description": "Field Health for all fields in db:metaphor.test4",
"entities": ["db:metaphor.test4"],
"entityMcons": [
"MCON++6418a1e2-9718-4413-9d2b-6a354e01ddf8++a19e22b4-7659-4064-8fd4-8d6122fabe1c++table++db:metaphor.test4"
],
Expand All @@ -133,6 +130,52 @@ async def test_extractor(mock_pycarlo_client: MagicMock, test_root_dir: str):
},
]
},
{
"get_alerts": {
"edges": [
{
"node": {
"id": "1",
"title": "Anomaly 1",
"type": "ANOMALIES",
"status": None,
"severity": "SEV_1",
"priority": None,
"owner": {
"email": "[email protected]",
},
"createdTime": "2023-06-23T03:54:35.817000+00:00",
"tables": [
{
"mcon": "MCON++6418a1e2-9718-4413-9d2b-6a354e01ddf8++a19e22b4-7659-4064-8fd4-8d6122fabe1c++table++db:metaphor.test1",
}
]
}
},
{
"node": {
"id": "1",
"title": "Handled anomaly",
"type": "ANOMALIES",
"status": "EXPECTED",
"severity": "SEV_2",
"priority": None,
"owner": None,
"createdTime": "2023-06-23T03:54:35.817000+00:00",
"tables": [
{
"mcon": "MCON++6418a1e2-9718-4413-9d2b-6a354e01ddf8++a19e22b4-7659-4064-8fd4-8d6122fabe1c++table++db:metaphor.test1",
}
]
}
},
],
"page_info": {
"end_corsor": "cursor",
"has_next_page": False,
},
}
},
]

config = dummy_config()
Expand Down

0 comments on commit 26c5717

Please sign in to comment.