diff --git a/metadata-ingestion/src/datahub/ingestion/source/mode.py b/metadata-ingestion/src/datahub/ingestion/source/mode.py index ef0b499129f97..68ecc5d8694ac 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/mode.py +++ b/metadata-ingestion/src/datahub/ingestion/source/mode.py @@ -5,6 +5,7 @@ from dataclasses import dataclass from datetime import datetime, timezone from functools import lru_cache +from json import JSONDecodeError from typing import Dict, Iterable, List, Optional, Set, Tuple, Union import dateutil.parser as dp @@ -193,6 +194,9 @@ class HTTPError429(HTTPError): pass +ModeRequestError = (HTTPError, JSONDecodeError) + + @dataclass class ModeSourceReport(StaleEntityRemovalSourceReport): filtered_spaces: LossyList[str] = dataclasses.field(default_factory=LossyList) @@ -328,11 +332,11 @@ def __init__(self, ctx: PipelineContext, config: ModeConfig): # Test the connection try: self._get_request_json(f"{self.config.connect_uri}/api/verify") - except HTTPError as http_error: + except ModeRequestError as e: self.report.report_failure( title="Failed to Connect", message="Unable to verify connection to mode.", - context=f"Error: {str(http_error)}", + context=f"Error: {str(e)}", ) self.workspace_uri = f"{self.config.connect_uri}/api/{self.config.workspace}" @@ -521,11 +525,11 @@ def _get_creator(self, href: str) -> Optional[str]: if self.config.owner_username_instead_of_email else user_json.get("email") ) - except HTTPError as http_error: + except ModeRequestError as e: self.report.report_warning( title="Failed to retrieve Mode creator", message=f"Unable to retrieve user for {href}", - context=f"Reason: {str(http_error)}", + context=f"Reason: {str(e)}", ) return user @@ -571,11 +575,11 @@ def _get_space_name_and_tokens(self) -> dict: logging.debug(f"Skipping space {space_name} due to space pattern") continue space_info[s.get("token", "")] = s.get("name", "") - except HTTPError as http_error: + except ModeRequestError as e: self.report.report_failure( title="Failed to Retrieve Spaces", message="Unable to retrieve spaces / collections for workspace.", - context=f"Workspace: {self.workspace_uri}, Error: {str(http_error)}", + context=f"Workspace: {self.workspace_uri}, Error: {str(e)}", ) return space_info @@ -721,11 +725,11 @@ def _get_data_sources(self) -> List[dict]: try: ds_json = self._get_request_json(f"{self.workspace_uri}/data_sources") data_sources = ds_json.get("_embedded", {}).get("data_sources", []) - except HTTPError as http_error: + except ModeRequestError as e: self.report.report_failure( title="Failed to retrieve Data Sources", message="Unable to retrieve data sources from Mode.", - context=f"Error: {str(http_error)}", + context=f"Error: {str(e)}", ) return data_sources @@ -812,11 +816,11 @@ def _get_definition(self, definition_name): if definition.get("name", "") == definition_name: return definition.get("source", "") - except HTTPError as http_error: + except ModeRequestError as e: self.report.report_failure( title="Failed to Retrieve Definition", message="Unable to retrieve definition from Mode.", - context=f"Definition Name: {definition_name}, Error: {str(http_error)}", + context=f"Definition Name: {definition_name}, Error: {str(e)}", ) return None @@ -1382,11 +1386,11 @@ def _get_reports(self, space_token: str) -> List[dict]: f"{self.workspace_uri}/spaces/{space_token}/reports" ) reports = reports_json.get("_embedded", {}).get("reports", {}) - except HTTPError as http_error: + except ModeRequestError as e: self.report.report_failure( title="Failed to Retrieve Reports for Space", message="Unable to retrieve reports for space token.", - context=f"Space Token: {space_token}, Error: {str(http_error)}", + context=f"Space Token: {space_token}, Error: {str(e)}", ) return reports @@ -1400,11 +1404,11 @@ def _get_datasets(self, space_token: str) -> List[dict]: url = f"{self.workspace_uri}/spaces/{space_token}/datasets" datasets_json = self._get_request_json(url) datasets = datasets_json.get("_embedded", {}).get("reports", []) - except HTTPError as http_error: + except ModeRequestError as e: self.report.report_failure( title="Failed to Retrieve Datasets for Space", message=f"Unable to retrieve datasets for space token {space_token}.", - context=f"Error: {str(http_error)}", + context=f"Error: {str(e)}", ) return datasets @@ -1416,11 +1420,11 @@ def _get_queries(self, report_token: str) -> list: f"{self.workspace_uri}/reports/{report_token}/queries" ) queries = queries_json.get("_embedded", {}).get("queries", {}) - except HTTPError as http_error: + except ModeRequestError as e: self.report.report_failure( title="Failed to Retrieve Queries", message="Unable to retrieve queries for report token.", - context=f"Report Token: {report_token}, Error: {str(http_error)}", + context=f"Report Token: {report_token}, Error: {str(e)}", ) return queries @@ -1433,11 +1437,11 @@ def _get_last_query_run( f"{self.workspace_uri}/reports/{report_token}/runs/{report_run_id}/query_runs{query_run_id}" ) queries = queries_json.get("_embedded", {}).get("queries", {}) - except HTTPError as http_error: + except ModeRequestError as e: self.report.report_failure( title="Failed to Retrieve Queries for Report", message="Unable to retrieve queries for report token.", - context=f"Report Token:{report_token}, Error: {str(http_error)}", + context=f"Report Token:{report_token}, Error: {str(e)}", ) return {} return queries @@ -1451,13 +1455,13 @@ def _get_charts(self, report_token: str, query_token: str) -> list: f"/queries/{query_token}/charts" ) charts = charts_json.get("_embedded", {}).get("charts", {}) - except HTTPError as http_error: + except ModeRequestError as e: self.report.report_failure( title="Failed to Retrieve Charts", message="Unable to retrieve charts from Mode.", context=f"Report Token: {report_token}, " f"Query token: {query_token}, " - f"Error: {str(http_error)}", + f"Error: {str(e)}", ) return charts @@ -1477,6 +1481,8 @@ def get_request(): response = self.session.get( url, timeout=self.config.api_options.timeout ) + if response.status_code == 204: # No content, don't parse json + return {} return response.json() except HTTPError as http_error: error_response = http_error.response diff --git a/metadata-ingestion/tests/integration/mode/test_mode.py b/metadata-ingestion/tests/integration/mode/test_mode.py index ce7533d5611e4..7f1e3935aa0fa 100644 --- a/metadata-ingestion/tests/integration/mode/test_mode.py +++ b/metadata-ingestion/tests/integration/mode/test_mode.py @@ -1,11 +1,14 @@ import json import pathlib +from typing import Sequence from unittest.mock import patch +import pytest from freezegun import freeze_time from requests.models import HTTPError from datahub.configuration.common import PipelineExecutionError +from datahub.ingestion.api.source import StructuredLogEntry from datahub.ingestion.run.pipeline import Pipeline from tests.test_helpers import mce_helpers @@ -28,7 +31,7 @@ "https://app.mode.com/api/acryl/reports/24f66e1701b6/queries": "dataset_queries_24f66e1701b6.json", } -RESPONSE_ERROR_LIST = ["https://app.mode.com/api/acryl/spaces/75737b70402e/reports"] +ERROR_URL = "https://app.mode.com/api/acryl/spaces/75737b70402e/reports" test_resources_dir = pathlib.Path(__file__).parent @@ -49,6 +52,14 @@ def mount(self, prefix, adaptor): return self def get(self, url, timeout=40): + if self.error_list is not None and self.url in self.error_list: + http_error_msg = "{} Client Error: {} for url: {}".format( + 400, + "Simulate error", + self.url, + ) + raise HTTPError(http_error_msg, response=self) + self.url = url self.timeout = timeout response_json_path = f"{test_resources_dir}/setup/{JSON_RESPONSE_MAP.get(url)}" @@ -57,29 +68,46 @@ def get(self, url, timeout=40): self.json_data = data return self - def raise_for_status(self): - if self.error_list is not None and self.url in self.error_list: - http_error_msg = "{} Client Error: {} for url: {}".format( - 400, - "Simulate error", - self.url, - ) - raise HTTPError(http_error_msg, response=self) + +class MockResponseJson(MockResponse): + def __init__( + self, + status_code: int = 200, + *, + json_empty_list: Sequence[str] = (), + json_error_list: Sequence[str] = (), + ): + super().__init__(None, status_code) + self.json_empty_list = json_empty_list + self.json_error_list = json_error_list + + def json(self): + if self.url in self.json_empty_list: + return json.loads("") # Shouldn't be called + if self.url in self.json_error_list: + return json.loads("{") + return super().json() + + def get(self, url, timeout=40): + response = super().get(url, timeout) + if self.url in self.json_empty_list: + response.status_code = 204 + return response -def mocked_requests_sucess(*args, **kwargs): +def mocked_requests_success(*args, **kwargs): return MockResponse(None, 200) def mocked_requests_failure(*args, **kwargs): - return MockResponse(RESPONSE_ERROR_LIST, 200) + return MockResponse([ERROR_URL], 200) @freeze_time(FROZEN_TIME) def test_mode_ingest_success(pytestconfig, tmp_path): with patch( "datahub.ingestion.source.mode.requests.Session", - side_effect=mocked_requests_sucess, + side_effect=mocked_requests_success, ): pipeline = Pipeline.create( { @@ -142,8 +170,89 @@ def test_mode_ingest_failure(pytestconfig, tmp_path): } ) pipeline.run() - try: + with pytest.raises(PipelineExecutionError) as exec_error: pipeline.raise_from_status() - except PipelineExecutionError as exec_error: - assert exec_error.args[0] == "Source reported errors" - assert len(exec_error.args[1].failures) == 1 + assert exec_error.value.args[0] == "Source reported errors" + assert len(exec_error.value.args[1].failures) == 1 + error_dict: StructuredLogEntry + _level, error_dict = exec_error.value.args[1].failures[0] + error = next(iter(error_dict.context)) + assert "Simulate error" in error + assert ERROR_URL in error + + +@freeze_time(FROZEN_TIME) +def test_mode_ingest_json_empty(pytestconfig, tmp_path): + with patch( + "datahub.ingestion.source.mode.requests.Session", + side_effect=lambda *args, **kwargs: MockResponseJson( + json_empty_list=["https://app.mode.com/api/modeuser"] + ), + ): + global test_resources_dir + test_resources_dir = pytestconfig.rootpath / "tests/integration/mode" + + pipeline = Pipeline.create( + { + "run_id": "mode-test", + "source": { + "type": "mode", + "config": { + "token": "xxxx", + "password": "xxxx", + "connect_uri": "https://app.mode.com/", + "workspace": "acryl", + }, + }, + "sink": { + "type": "file", + "config": { + "filename": f"{tmp_path}/mode_mces.json", + }, + }, + } + ) + pipeline.run() + pipeline.raise_from_status(raise_warnings=True) + + +@freeze_time(FROZEN_TIME) +def test_mode_ingest_json_failure(pytestconfig, tmp_path): + with patch( + "datahub.ingestion.source.mode.requests.Session", + side_effect=lambda *args, **kwargs: MockResponseJson( + json_error_list=["https://app.mode.com/api/modeuser"] + ), + ): + global test_resources_dir + test_resources_dir = pytestconfig.rootpath / "tests/integration/mode" + + pipeline = Pipeline.create( + { + "run_id": "mode-test", + "source": { + "type": "mode", + "config": { + "token": "xxxx", + "password": "xxxx", + "connect_uri": "https://app.mode.com/", + "workspace": "acryl", + }, + }, + "sink": { + "type": "file", + "config": { + "filename": f"{tmp_path}/mode_mces.json", + }, + }, + } + ) + pipeline.run() + pipeline.raise_from_status(raise_warnings=False) + with pytest.raises(PipelineExecutionError) as exec_error: + pipeline.raise_from_status(raise_warnings=True) + assert len(exec_error.value.args[1].warnings) > 0 + error_dict: StructuredLogEntry + _level, error_dict = exec_error.value.args[1].warnings[0] + error = next(iter(error_dict.context)) + assert "Expecting property name enclosed in double quotes" in error