From 6474af18500a7a3ea52a5ecdf5f52201ddc89d2c Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Tue, 17 Dec 2024 15:29:53 -0800 Subject: [PATCH 1/7] fix(ingest/mode): Handle 204 response and invalid json --- .../src/datahub/ingestion/source/mode.py | 10 ++ .../tests/integration/mode/test_mode.py | 142 ++++++++++++++++-- 2 files changed, 136 insertions(+), 16 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/mode.py b/metadata-ingestion/src/datahub/ingestion/source/mode.py index c1ab9271ce13ae..6dfd936c66f606 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/mode.py +++ b/metadata-ingestion/src/datahub/ingestion/source/mode.py @@ -2,6 +2,8 @@ import logging import re import time +from json import JSONDecodeError + from dataclasses import dataclass from datetime import datetime, timezone from functools import lru_cache @@ -15,6 +17,7 @@ import yaml from liquid import Template, Undefined from pydantic import Field, validator +from requests import Response from requests.adapters import HTTPAdapter, Retry from requests.exceptions import ConnectionError from requests.models import HTTPBasicAuth, HTTPError @@ -1466,10 +1469,13 @@ def _get_request_json(self, url: str) -> Dict: @r.wraps def get_request(): + response: Optional[Response] = None try: response = self.session.get( url, timeout=self.config.api_options.timeout ) + if response.status_code == 204: + return {} return response.json() except HTTPError as http_error: error_response = http_error.response @@ -1481,6 +1487,10 @@ def get_request(): raise HTTPError429 raise http_error + except JSONDecodeError as json_error: + raise HTTPError( + f"{json_error.__class__.__name__}: {json_error}" + ) from json_error return get_request() diff --git a/metadata-ingestion/tests/integration/mode/test_mode.py b/metadata-ingestion/tests/integration/mode/test_mode.py index ce7533d5611e49..d44bf80662e647 100644 --- a/metadata-ingestion/tests/integration/mode/test_mode.py +++ b/metadata-ingestion/tests/integration/mode/test_mode.py @@ -1,11 +1,14 @@ import json import pathlib +from typing import Sequence from unittest.mock import patch +import pytest from freezegun import freeze_time from requests.models import HTTPError from datahub.configuration.common import PipelineExecutionError +from datahub.ingestion.api.source import StructuredLogEntry from datahub.ingestion.run.pipeline import Pipeline from tests.test_helpers import mce_helpers @@ -28,13 +31,14 @@ "https://app.mode.com/api/acryl/reports/24f66e1701b6/queries": "dataset_queries_24f66e1701b6.json", } -RESPONSE_ERROR_LIST = ["https://app.mode.com/api/acryl/spaces/75737b70402e/reports"] +ERROR_URL = "https://app.mode.com/api/acryl/spaces/75737b70402e/reports" test_resources_dir = pathlib.Path(__file__).parent class MockResponse: def __init__(self, error_list, status_code): + self.text = None self.json_data = None self.error_list = error_list self.status_code = status_code @@ -49,6 +53,14 @@ def mount(self, prefix, adaptor): return self def get(self, url, timeout=40): + if self.error_list is not None and self.url in self.error_list: + http_error_msg = "{} Client Error: {} for url: {}".format( + 400, + "Simulate error", + self.url, + ) + raise HTTPError(http_error_msg, response=self) + self.url = url self.timeout = timeout response_json_path = f"{test_resources_dir}/setup/{JSON_RESPONSE_MAP.get(url)}" @@ -57,29 +69,46 @@ def get(self, url, timeout=40): self.json_data = data return self - def raise_for_status(self): - if self.error_list is not None and self.url in self.error_list: - http_error_msg = "{} Client Error: {} for url: {}".format( - 400, - "Simulate error", - self.url, - ) - raise HTTPError(http_error_msg, response=self) + +class MockResponseJson(MockResponse): + def __init__( + self, + status_code=200, + *, + json_empty_list: Sequence[str] = (), + json_error_list: Sequence[str] = (), + ): + super().__init__(None, status_code) + self.json_empty_list = json_empty_list + self.json_error_list = json_error_list + + def json(self): + if self.url in self.json_empty_list: + return json.loads("") # Shouldn't be called + if self.url in self.json_error_list: + return json.loads("{") + return super().json() + + def get(self, url, timeout=40): + response = super().get(url, timeout) + if self.url in self.json_empty_list: + response.status_code = 204 + return response -def mocked_requests_sucess(*args, **kwargs): +def mocked_requests_success(*args, **kwargs): return MockResponse(None, 200) def mocked_requests_failure(*args, **kwargs): - return MockResponse(RESPONSE_ERROR_LIST, 200) + return MockResponse([ERROR_URL], 200) @freeze_time(FROZEN_TIME) def test_mode_ingest_success(pytestconfig, tmp_path): with patch( "datahub.ingestion.source.mode.requests.Session", - side_effect=mocked_requests_sucess, + side_effect=mocked_requests_success, ): pipeline = Pipeline.create( { @@ -142,8 +171,89 @@ def test_mode_ingest_failure(pytestconfig, tmp_path): } ) pipeline.run() - try: + with pytest.raises(PipelineExecutionError) as exec_error: pipeline.raise_from_status() - except PipelineExecutionError as exec_error: - assert exec_error.args[0] == "Source reported errors" - assert len(exec_error.args[1].failures) == 1 + assert exec_error.value.args[0] == "Source reported errors" + assert len(exec_error.value.args[1].failures) == 1 + error_dict: StructuredLogEntry + _level, error_dict = exec_error.value.args[1].failures[0] + error = next(iter(error_dict.context)) + assert "Simulate error" in error + assert ERROR_URL in error + + +@freeze_time(FROZEN_TIME) +def test_mode_ingest_json_empty(pytestconfig, tmp_path): + with patch( + "datahub.ingestion.source.mode.requests.Session", + side_effect=lambda *args, **kwargs: MockResponseJson( + json_empty_list=["https://app.mode.com/api/modeuser"] + ), + ): + global test_resources_dir + test_resources_dir = pytestconfig.rootpath / "tests/integration/mode" + + pipeline = Pipeline.create( + { + "run_id": "mode-test", + "source": { + "type": "mode", + "config": { + "token": "xxxx", + "password": "xxxx", + "connect_uri": "https://app.mode.com/", + "workspace": "acryl", + }, + }, + "sink": { + "type": "file", + "config": { + "filename": f"{tmp_path}/mode_mces.json", + }, + }, + } + ) + pipeline.run() + pipeline.raise_from_status(raise_warnings=True) + + +@freeze_time(FROZEN_TIME) +def test_mode_ingest_json_failure(pytestconfig, tmp_path): + with patch( + "datahub.ingestion.source.mode.requests.Session", + side_effect=lambda *args, **kwargs: MockResponseJson( + json_error_list=["https://app.mode.com/api/modeuser"] + ), + ): + global test_resources_dir + test_resources_dir = pytestconfig.rootpath / "tests/integration/mode" + + pipeline = Pipeline.create( + { + "run_id": "mode-test", + "source": { + "type": "mode", + "config": { + "token": "xxxx", + "password": "xxxx", + "connect_uri": "https://app.mode.com/", + "workspace": "acryl", + }, + }, + "sink": { + "type": "file", + "config": { + "filename": f"{tmp_path}/mode_mces.json", + }, + }, + } + ) + pipeline.run() + pipeline.raise_from_status(raise_warnings=False) + with pytest.raises(PipelineExecutionError) as exec_error: + pipeline.raise_from_status(raise_warnings=True) + assert len(exec_error.value.args[1].warnings) > 0 + error_dict: StructuredLogEntry + _level, error_dict = exec_error.value.args[1].warnings[0] + error = next(iter(error_dict.context)) + assert "JSONDecodeError" in error From d70b0464e7ce08a3a3b77eeb752fec4bd1c8c5c6 Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Tue, 17 Dec 2024 15:32:46 -0800 Subject: [PATCH 2/7] cleanup --- metadata-ingestion/src/datahub/ingestion/source/mode.py | 1 - 1 file changed, 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/mode.py b/metadata-ingestion/src/datahub/ingestion/source/mode.py index 6dfd936c66f606..793b9516c667eb 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/mode.py +++ b/metadata-ingestion/src/datahub/ingestion/source/mode.py @@ -1469,7 +1469,6 @@ def _get_request_json(self, url: str) -> Dict: @r.wraps def get_request(): - response: Optional[Response] = None try: response = self.session.get( url, timeout=self.config.api_options.timeout From 057064263df5fe6ca9ee1bfd4fa9dc91caeba8cf Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Tue, 17 Dec 2024 15:34:23 -0800 Subject: [PATCH 3/7] more cleanup --- metadata-ingestion/tests/integration/mode/test_mode.py | 1 - 1 file changed, 1 deletion(-) diff --git a/metadata-ingestion/tests/integration/mode/test_mode.py b/metadata-ingestion/tests/integration/mode/test_mode.py index d44bf80662e647..b32f20ee56a51b 100644 --- a/metadata-ingestion/tests/integration/mode/test_mode.py +++ b/metadata-ingestion/tests/integration/mode/test_mode.py @@ -38,7 +38,6 @@ class MockResponse: def __init__(self, error_list, status_code): - self.text = None self.json_data = None self.error_list = error_list self.status_code = status_code From f2ca543cb624d37c1150ab37967d1564f59c2907 Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Sat, 21 Dec 2024 05:51:19 -0800 Subject: [PATCH 4/7] lint --- metadata-ingestion/src/datahub/ingestion/source/mode.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/mode.py b/metadata-ingestion/src/datahub/ingestion/source/mode.py index 793b9516c667eb..a4df851c487a8c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/mode.py +++ b/metadata-ingestion/src/datahub/ingestion/source/mode.py @@ -2,11 +2,10 @@ import logging import re import time -from json import JSONDecodeError - from dataclasses import dataclass from datetime import datetime, timezone from functools import lru_cache +from json import JSONDecodeError from typing import Dict, Iterable, List, Optional, Set, Tuple, Union import dateutil.parser as dp From c7de7da8705a5def9733cffef969d071cdf603ba Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Sat, 21 Dec 2024 05:55:14 -0800 Subject: [PATCH 5/7] pr feedback --- .../src/datahub/ingestion/source/mode.py | 49 +++++++++---------- 1 file changed, 24 insertions(+), 25 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/mode.py b/metadata-ingestion/src/datahub/ingestion/source/mode.py index a4df851c487a8c..ce98d3e5c2716c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/mode.py +++ b/metadata-ingestion/src/datahub/ingestion/source/mode.py @@ -194,6 +194,9 @@ class HTTPError429(HTTPError): pass +ModeRequestError = (HTTPError, JSONDecodeError) + + @dataclass class ModeSourceReport(StaleEntityRemovalSourceReport): filtered_spaces: LossyList[str] = dataclasses.field(default_factory=LossyList) @@ -329,11 +332,11 @@ def __init__(self, ctx: PipelineContext, config: ModeConfig): # Test the connection try: self._get_request_json(f"{self.config.connect_uri}/api/verify") - except HTTPError as http_error: + except ModeRequestError as e: self.report.report_failure( title="Failed to Connect", message="Unable to verify connection to mode.", - context=f"Error: {str(http_error)}", + context=f"Error: {str(e)}", ) self.workspace_uri = f"{self.config.connect_uri}/api/{self.config.workspace}" @@ -522,11 +525,11 @@ def _get_creator(self, href: str) -> Optional[str]: if self.config.owner_username_instead_of_email else user_json.get("email") ) - except HTTPError as http_error: + except ModeRequestError as e: self.report.report_warning( title="Failed to retrieve Mode creator", message=f"Unable to retrieve user for {href}", - context=f"Reason: {str(http_error)}", + context=f"Reason: {str(e)}", ) return user @@ -572,11 +575,11 @@ def _get_space_name_and_tokens(self) -> dict: logging.debug(f"Skipping space {space_name} due to space pattern") continue space_info[s.get("token", "")] = s.get("name", "") - except HTTPError as http_error: + except ModeRequestError as e: self.report.report_failure( title="Failed to Retrieve Spaces", message="Unable to retrieve spaces / collections for workspace.", - context=f"Workspace: {self.workspace_uri}, Error: {str(http_error)}", + context=f"Workspace: {self.workspace_uri}, Error: {str(e)}", ) return space_info @@ -722,11 +725,11 @@ def _get_data_sources(self) -> List[dict]: try: ds_json = self._get_request_json(f"{self.workspace_uri}/data_sources") data_sources = ds_json.get("_embedded", {}).get("data_sources", []) - except HTTPError as http_error: + except ModeRequestError as e: self.report.report_failure( title="Failed to retrieve Data Sources", message="Unable to retrieve data sources from Mode.", - context=f"Error: {str(http_error)}", + context=f"Error: {str(e)}", ) return data_sources @@ -813,11 +816,11 @@ def _get_definition(self, definition_name): if definition.get("name", "") == definition_name: return definition.get("source", "") - except HTTPError as http_error: + except ModeRequestError as e: self.report.report_failure( title="Failed to Retrieve Definition", message="Unable to retrieve definition from Mode.", - context=f"Definition Name: {definition_name}, Error: {str(http_error)}", + context=f"Definition Name: {definition_name}, Error: {str(e)}", ) return None @@ -1377,11 +1380,11 @@ def _get_reports(self, space_token: str) -> List[dict]: f"{self.workspace_uri}/spaces/{space_token}/reports" ) reports = reports_json.get("_embedded", {}).get("reports", {}) - except HTTPError as http_error: + except ModeRequestError as e: self.report.report_failure( title="Failed to Retrieve Reports for Space", message="Unable to retrieve reports for space token.", - context=f"Space Token: {space_token}, Error: {str(http_error)}", + context=f"Space Token: {space_token}, Error: {str(e)}", ) return reports @@ -1395,11 +1398,11 @@ def _get_datasets(self, space_token: str) -> List[dict]: url = f"{self.workspace_uri}/spaces/{space_token}/datasets" datasets_json = self._get_request_json(url) datasets = datasets_json.get("_embedded", {}).get("reports", []) - except HTTPError as http_error: + except ModeRequestError as e: self.report.report_failure( title="Failed to Retrieve Datasets for Space", message=f"Unable to retrieve datasets for space token {space_token}.", - context=f"Error: {str(http_error)}", + context=f"Error: {str(e)}", ) return datasets @@ -1411,11 +1414,11 @@ def _get_queries(self, report_token: str) -> list: f"{self.workspace_uri}/reports/{report_token}/queries" ) queries = queries_json.get("_embedded", {}).get("queries", {}) - except HTTPError as http_error: + except ModeRequestError as e: self.report.report_failure( title="Failed to Retrieve Queries", message="Unable to retrieve queries for report token.", - context=f"Report Token: {report_token}, Error: {str(http_error)}", + context=f"Report Token: {report_token}, Error: {str(e)}", ) return queries @@ -1428,11 +1431,11 @@ def _get_last_query_run( f"{self.workspace_uri}/reports/{report_token}/runs/{report_run_id}/query_runs{query_run_id}" ) queries = queries_json.get("_embedded", {}).get("queries", {}) - except HTTPError as http_error: + except ModeRequestError as e: self.report.report_failure( title="Failed to Retrieve Queries for Report", message="Unable to retrieve queries for report token.", - context=f"Report Token:{report_token}, Error: {str(http_error)}", + context=f"Report Token:{report_token}, Error: {str(e)}", ) return {} return queries @@ -1446,13 +1449,13 @@ def _get_charts(self, report_token: str, query_token: str) -> list: f"/queries/{query_token}/charts" ) charts = charts_json.get("_embedded", {}).get("charts", {}) - except HTTPError as http_error: + except ModeRequestError as e: self.report.report_failure( title="Failed to Retrieve Charts", message="Unable to retrieve charts from Mode.", context=f"Report Token: {report_token}, " f"Query token: {query_token}, " - f"Error: {str(http_error)}", + f"Error: {str(e)}", ) return charts @@ -1472,7 +1475,7 @@ def get_request(): response = self.session.get( url, timeout=self.config.api_options.timeout ) - if response.status_code == 204: + if response.status_code == 204: # No content, don't parse json return {} return response.json() except HTTPError as http_error: @@ -1485,10 +1488,6 @@ def get_request(): raise HTTPError429 raise http_error - except JSONDecodeError as json_error: - raise HTTPError( - f"{json_error.__class__.__name__}: {json_error}" - ) from json_error return get_request() From 33e40c06552b927baf9d1145bd1c8ce6e13a0608 Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Mon, 23 Dec 2024 11:26:33 -0800 Subject: [PATCH 6/7] lint --- metadata-ingestion/src/datahub/ingestion/source/mode.py | 1 - metadata-ingestion/tests/integration/mode/test_mode.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/mode.py b/metadata-ingestion/src/datahub/ingestion/source/mode.py index ce98d3e5c2716c..1a8145afaa32ef 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/mode.py +++ b/metadata-ingestion/src/datahub/ingestion/source/mode.py @@ -16,7 +16,6 @@ import yaml from liquid import Template, Undefined from pydantic import Field, validator -from requests import Response from requests.adapters import HTTPAdapter, Retry from requests.exceptions import ConnectionError from requests.models import HTTPBasicAuth, HTTPError diff --git a/metadata-ingestion/tests/integration/mode/test_mode.py b/metadata-ingestion/tests/integration/mode/test_mode.py index b32f20ee56a51b..a492a68a2217a9 100644 --- a/metadata-ingestion/tests/integration/mode/test_mode.py +++ b/metadata-ingestion/tests/integration/mode/test_mode.py @@ -72,7 +72,7 @@ def get(self, url, timeout=40): class MockResponseJson(MockResponse): def __init__( self, - status_code=200, + status_code: int = 200, *, json_empty_list: Sequence[str] = (), json_error_list: Sequence[str] = (), From c05718f8d30cf113bcc9be5cdc11716e907925a9 Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Tue, 24 Dec 2024 00:25:56 -0800 Subject: [PATCH 7/7] fix test --- metadata-ingestion/tests/integration/mode/test_mode.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/tests/integration/mode/test_mode.py b/metadata-ingestion/tests/integration/mode/test_mode.py index a492a68a2217a9..7f1e3935aa0fa1 100644 --- a/metadata-ingestion/tests/integration/mode/test_mode.py +++ b/metadata-ingestion/tests/integration/mode/test_mode.py @@ -255,4 +255,4 @@ def test_mode_ingest_json_failure(pytestconfig, tmp_path): error_dict: StructuredLogEntry _level, error_dict = exec_error.value.args[1].warnings[0] error = next(iter(error_dict.context)) - assert "JSONDecodeError" in error + assert "Expecting property name enclosed in double quotes" in error