diff --git a/metaphor/monte_carlo/README.md b/metaphor/monte_carlo/README.md
index 0232ae91..76f0e711 100644
--- a/metaphor/monte_carlo/README.md
+++ b/metaphor/monte_carlo/README.md
@@ -32,6 +32,14 @@ If some of the monitored data assets are Snowflake datasets, please provide the
 snowflake_account:
 ```

+#### Treat Unhandled Anomalies as Errors
+
+If set to true, the connector will treat unhandled [anomalies](https://docs.getmontecarlo.com/docs/detectors-overview) as data quality errors.
+
+```yaml
+treat_unhandled_anomalies_as_errors: true
+```
+
 ## Testing

 Follow the [Installation](../../README.md) instructions to install `metaphor-connectors` in your environment (or virtualenv). Make sure to include either `all` or `monte_carlo` extra.
diff --git a/metaphor/monte_carlo/config.py b/metaphor/monte_carlo/config.py
index c1ddc3bd..9816140d 100644
--- a/metaphor/monte_carlo/config.py
+++ b/metaphor/monte_carlo/config.py
@@ -15,6 +15,9 @@ class MonteCarloRunConfig(BaseConfig):
     # Snowflake data source account
     snowflake_account: Optional[str] = None

+    # Treat unhandled anomalies as errors
+    treat_unhandled_anomalies_as_errors: bool = False
+
     # Errors to be ignored, e.g. timeouts
     ignored_errors: List[str] = field(
         default_factory=lambda: [
diff --git a/metaphor/monte_carlo/extractor.py b/metaphor/monte_carlo/extractor.py
index 2eea281c..c680bf6e 100644
--- a/metaphor/monte_carlo/extractor.py
+++ b/metaphor/monte_carlo/extractor.py
@@ -1,4 +1,5 @@
 import re
+from datetime import datetime, timedelta
 from typing import Collection, Dict, List

 from dateutil import parser
@@ -53,6 +54,13 @@
     "P3": DataMonitorSeverity.LOW,
 }

+alert_severity_map = {
+    "SEV_1": DataMonitorSeverity.HIGH,
+    "SEV_2": DataMonitorSeverity.MEDIUM,
+    "SEV_3": DataMonitorSeverity.LOW,
+    "SEV_4": DataMonitorSeverity.LOW,
+}
+
 connection_type_platform_map = {
     "BIGQUERY": DataPlatform.BIGQUERY,
     "REDSHIFT": DataPlatform.REDSHIFT,
@@ -79,6 +87,8 @@ def __init__(self, config: MonteCarloRunConfig):
         )
         self._ignored_errors = config.ignored_errors

+        self._treat_unhandled_anomalies_as_errors = config.treat_unhandled_anomalies_as_errors
+
         self._client = Client(
             session=Session(mcd_id=config.api_key_id, mcd_token=config.api_key_secret)
         )
@@ -93,6 +103,9 @@ async def extract(self) -> Collection[ENTITY_TYPES]:
         self._fetch_tables()
         self._fetch_monitors()

+        if self._treat_unhandled_anomalies_as_errors:
+            self._fetch_alerts()
+
         return self._datasets.values()

     def _fetch_monitors(self) -> None:
@@ -113,7 +126,6 @@ def _fetch_monitors(self) -> None:
                     uuid
                     name
                     description
-                    entities
                     entityMcons
                     priority
                     monitorStatus
@@ -138,6 +150,78 @@ def _fetch_monitors(self) -> None:
         json_dump_to_debug_file(monitors, "getMonitors.json")
         self._parse_monitors(monitors)

+    def _fetch_alerts(self) -> None:
+        """Fetch all alerts
+
+        See https://apidocs.getmontecarlo.com/#query-getAlerts
+        """
+
+        batch_size = 200
+        end_cursor = None
+        created_after = datetime.now() - timedelta(days=30)
+        created_before = datetime.now()
+
+        result: List[Dict] = []
+
+        while True:
+            logger.info(f"Querying getAlerts after {end_cursor} ({len(result)} alerts)")
+            resp = self._client(
+                """
+                query getAlerts($first: Int, $after: String, $createdAfter: DateTime!, $createdBefore: DateTime!) {
+                  getAlerts(first: $first
+                    after: $after
+                    filter: {
+                      include:{
+                        types: [ANOMALIES]
+                      }
+                    }
+                    createdTime: {before: $createdBefore, after: $createdAfter}
+                  ) {
+                    edges {
+                      node {
+                        id
+                        title
+                        type
+                        status
+                        severity
+                        priority
+                        owner {
+                          email
+                        }
+                        createdTime
+                        tables {
+                          mcon
+                        }
+                      }
+                    }
+                    pageInfo {
+                      endCursor
+                      hasNextPage
+                    }
+                  }
+                }
+                """,
+                {
+                    "first": batch_size,
+                    "after": end_cursor,
+                    "createdAfter": created_after.isoformat(),
+                    "createdBefore": created_before.isoformat(),
+                },
+            )
+
+            nodes = [edge["node"] for edge in resp["get_alerts"]["edges"]]
+            result.extend(nodes)
+
+            if not resp["get_alerts"]["page_info"]["has_next_page"]:
+                break
+
+            end_cursor = resp["get_alerts"]["page_info"]["end_cursor"]
+
+        logger.info(f"Fetched {len(result)} alerts")
+        json_dump_to_debug_file(result, "getAlerts.json")
+
+        self._parse_alerts(result)
+
     def _fetch_tables(self) -> None:
         """Fetch all tables

@@ -221,26 +305,65 @@ def _parse_monitors(self, monitors) -> None:
                 exceptions=exceptions,
             )

-            if monitor["entities"] is None or monitor["entityMcons"] is None:
+            if monitor["entityMcons"] is None:
                 logger.info(f"Skipping monitors not linked to any entities: {uuid}")
                 continue

-            for index, entity in enumerate(monitor["entities"]):
-                if index > len(monitor["entityMcons"]) - 1:
-                    logger.warning(f"Unmatched entity mcon in monitor {monitor}")
-                    break
-
-                mcon = monitor["entityMcons"][index]
+            for mcon in monitor["entityMcons"]:
                 platform = self._mcon_platform_map.get(mcon)
                 if platform is None:
                     logger.warning(f"Unable to determine platform for {mcon}")
                     continue

-                name = self._convert_dataset_name(entity)
+                name = self._extract_dataset_name(mcon)
                 dataset = self._init_dataset(name, platform)
                 dataset.data_quality.url = f"{assets_base_url}/{mcon}/custom-monitors"
                 dataset.data_quality.monitors.append(data_monitor)

+    def _parse_alerts(self, alerts) -> None:
+        for alert in alerts:
+            alert_id = alert["id"]
+
+            if alert["tables"] is None or len(alert["tables"]) == 0:
+                logger.info(f"Skipping alert not linked to any tables: {alert_id}")
+                continue
+
+            # Filter out alerts that already have a status (e.g. "ACKNOWLEDGED", "EXPECTED")
+            if alert["status"] is not None:
+                continue
+
+            # Alerts triggered by anomalies do not have an inherited priority.
+            # Derive it from the assigned severity instead.
+            alert_severity = alert_severity_map.get(
+                alert["severity"], DataMonitorSeverity.UNKNOWN
+            )
+
+            owner = None
+            if alert["owner"] is not None and alert["owner"]["email"] is not None:
+                owner = alert["owner"]["email"]
+
+            data_monitor = DataMonitor(
+                title=alert["title"],
+                description=alert["title"],
+                owner=owner,
+                status=DataMonitorStatus.ERROR,
+                severity=alert_severity,
+                last_run=parser.parse(alert["createdTime"]),
+                targets=[],
+            )
+
+            for table in alert.get("tables", []):
+
+                mcon = table["mcon"]
+                platform = self._mcon_platform_map.get(mcon)
+                if platform is None:
+                    logger.warning(f"Unable to determine platform for {mcon}")
+                    continue
+
+                name = self._extract_dataset_name(mcon)
+                dataset = self._init_dataset(name, platform)
+                dataset.data_quality.monitors.append(data_monitor)
+
     def _parse_monitor_status(self, monitor: Dict):
         status = monitor_status_map.get(
             monitor["monitorStatus"], DataMonitorStatus.UNKNOWN
@@ -258,9 +381,13 @@ def _parse_monitor_status(self, monitor: Dict):
         return status

     @staticmethod
-    def _convert_dataset_name(entity: str) -> str:
-        """entity name format is <db>:<schema>.<table>"""
-        return normalize_full_dataset_name(entity.replace(":", ".", 1))
+    def _extract_dataset_name(mcon: str) -> str:
+        """Extract dataset name from MCON"""
+
+        # MCON has the following format:
+        # MCON++{account_uuid}++{resource_uuid}++table++{db:schema.table}
+        _, _, _, _, obj_name = mcon.split("++")
+        return normalize_full_dataset_name(obj_name.replace(":", ".", 1))

     def _init_dataset(self, normalized_name: str, platform: DataPlatform) -> Dataset:
         account = self._account if platform == DataPlatform.SNOWFLAKE else None
diff --git a/pyproject.toml b/pyproject.toml
index f6561cfd..a92891e1 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "metaphor-connectors"
-version = "0.14.162"
+version = "0.14.163"
 license = "Apache-2.0"
 description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app."
 authors = ["Metaphor "]
diff --git a/tests/monte_carlo/config.yml b/tests/monte_carlo/config.yml
index 5f28e405..3e50f68c 100644
--- a/tests/monte_carlo/config.yml
+++ b/tests/monte_carlo/config.yml
@@ -1,4 +1,5 @@
 ---
 api_key_id: key_id
 api_key_secret: key_secret
+treat_unhandled_anomalies_as_errors: true
 output: {}
diff --git a/tests/monte_carlo/expected.json b/tests/monte_carlo/expected.json
index e518b314..8e649825 100644
--- a/tests/monte_carlo/expected.json
+++ b/tests/monte_carlo/expected.json
@@ -11,6 +11,15 @@
         "targets": [],
         "title": "auto_monitor_name_cd5b69bd-e465-4545-b3f9-a5d507ea766c",
         "url": "https://getmontecarlo.com/monitors/e0dc143e-dd8a-4cb9-b4cc-dedec715d955"
+      },
+      {
+        "description": "Anomaly 1",
+        "lastRun": "2023-06-23T03:54:35.817000+00:00",
+        "owner": "yi@metaphor.io",
+        "severity": "HIGH",
+        "status": "ERROR",
+        "targets": [],
+        "title": "Anomaly 1"
       }
     ],
     "provider": "MONTE_CARLO",
diff --git a/tests/monte_carlo/test_config.py b/tests/monte_carlo/test_config.py
index 84e402f8..b876046b 100644
--- a/tests/monte_carlo/test_config.py
+++ b/tests/monte_carlo/test_config.py
@@ -10,5 +10,6 @@ def test_yaml_config(test_root_dir):
     assert config == MonteCarloRunConfig(
         api_key_id="key_id",
         api_key_secret="key_secret",
+        treat_unhandled_anomalies_as_errors=True,
         output=OutputConfig(),
     )
diff --git a/tests/monte_carlo/test_extractor.py b/tests/monte_carlo/test_extractor.py
index 2e5be17f..2b599f84 100644
--- a/tests/monte_carlo/test_extractor.py
+++ b/tests/monte_carlo/test_extractor.py
@@ -14,6 +14,7 @@ def dummy_config():
         api_key_id="key_id",
         api_key_secret="key_secret",
         snowflake_account="snow",
+        treat_unhandled_anomalies_as_errors=True,
         output=OutputConfig(),
     )

@@ -62,7 +63,6 @@ async def test_extractor(mock_pycarlo_client: MagicMock, test_root_dir: str):
                 "uuid": "e0dc143e-dd8a-4cb9-b4cc-dedec715d955",
                 "name": "auto_monitor_name_cd5b69bd-e465-4545-b3f9-a5d507ea766c",
                 "description": "Field Health for all fields in db:metaphor.test1",
-                "entities": ["db:metaphor.test1"],
                 "entityMcons": [
                     "MCON++6418a1e2-9718-4413-9d2b-6a354e01ddf8++a19e22b4-7659-4064-8fd4-8d6122fabe1c++table++db:metaphor.test1"
                 ],
@@ -77,7 +77,6 @@ async def test_extractor(mock_pycarlo_client: MagicMock, test_root_dir: str):
                 "uuid": "ce4c4568-35f4-4365-a6fe-95f233fcf6c3",
                 "name": "auto_monitor_name_53c985e6-8f49-4af7-8ef1-7b402a27538b",
                 "description": "Field Health for all fields in db:metaphor.test2",
-                "entities": ["db:metaphor.test2"],
                 "entityMcons": [
                     "MCON++6418a1e2-9718-4413-9d2b-6a354e01ddf8++a19e22b4-7659-4064-8fd4-8d6122fabe1c++table++db:metaphor.test2"
                 ],
@@ -92,7 +91,6 @@ async def test_extractor(mock_pycarlo_client: MagicMock, test_root_dir: str):
                 "uuid": "2c156c8d-ab4a-432f-b8bb-f9ea9f31ed3d",
                 "name": "auto_monitor_name_18637195-a3c4-416e-a3e2-a89cc10adbc8",
                 "description": "Field Health for all fields in db:metaphor.test3",
-                "entities": ["db:metaphor.test3"],
                 "entityMcons": [
                     "MCON++6418a1e2-9718-4413-9d2b-6a354e01ddf8++a19e22b4-7659-4064-8fd4-8d6122fabe1c++table++db:metaphor.test3"
                 ],
@@ -107,7 +105,6 @@ async def test_extractor(mock_pycarlo_client: MagicMock, test_root_dir: str):
                 "uuid": "d14af7d8-6342-420a-bb09-5805fad677f1",
                 "name": "auto_monitor_name_693b98e3-950d-472b-83fe-8c8e5b5979f9",
                 "description": "Field Health for all fields in db:metaphor.test4",
-                "entities": ["db:metaphor.test4"],
                 "entityMcons": [
                     "MCON++6418a1e2-9718-4413-9d2b-6a354e01ddf8++a19e22b4-7659-4064-8fd4-8d6122fabe1c++table++db:metaphor.test4"
                 ],
@@ -133,6 +130,52 @@ async def test_extractor(mock_pycarlo_client: MagicMock, test_root_dir: str):
                 },
             ]
         },
+        {
+            "get_alerts": {
+                "edges": [
+                    {
+                        "node": {
+                            "id": "1",
+                            "title": "Anomaly 1",
+                            "type": "ANOMALIES",
+                            "status": None,
+                            "severity": "SEV_1",
+                            "priority": None,
+                            "owner": {
+                                "email": "yi@metaphor.io",
+                            },
+                            "createdTime": "2023-06-23T03:54:35.817000+00:00",
+                            "tables": [
+                                {
+                                    "mcon": "MCON++6418a1e2-9718-4413-9d2b-6a354e01ddf8++a19e22b4-7659-4064-8fd4-8d6122fabe1c++table++db:metaphor.test1",
+                                }
+                            ]
+                        }
+                    },
+                    {
+                        "node": {
+                            "id": "2",
+                            "title": "Handled anomaly",
+                            "type": "ANOMALIES",
+                            "status": "EXPECTED",
+                            "severity": "SEV_2",
+                            "priority": None,
+                            "owner": None,
+                            "createdTime": "2023-06-23T03:54:35.817000+00:00",
+                            "tables": [
+                                {
+                                    "mcon": "MCON++6418a1e2-9718-4413-9d2b-6a354e01ddf8++a19e22b4-7659-4064-8fd4-8d6122fabe1c++table++db:metaphor.test1",
+                                }
+                            ]
+                        }
+                    },
+                ],
+                "page_info": {
+                    "end_cursor": "cursor",
+                    "has_next_page": False,
+                },
+            }
+        },
     ]

     config = dummy_config()