Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest/mode): Handle 204 response and invalid json #12156

Merged
merged 9 commits into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 26 additions & 20 deletions metadata-ingestion/src/datahub/ingestion/source/mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from dataclasses import dataclass
from datetime import datetime, timezone
from functools import lru_cache
from json import JSONDecodeError
from typing import Dict, Iterable, List, Optional, Set, Tuple, Union

import dateutil.parser as dp
Expand Down Expand Up @@ -193,6 +194,9 @@
pass


ModeRequestError = (HTTPError, JSONDecodeError)


@dataclass
class ModeSourceReport(StaleEntityRemovalSourceReport):
filtered_spaces: LossyList[str] = dataclasses.field(default_factory=LossyList)
Expand Down Expand Up @@ -328,11 +332,11 @@
# Test the connection
try:
self._get_request_json(f"{self.config.connect_uri}/api/verify")
except HTTPError as http_error:
except ModeRequestError as e:

Check warning on line 335 in metadata-ingestion/src/datahub/ingestion/source/mode.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/mode.py#L335

Added line #L335 was not covered by tests
self.report.report_failure(
title="Failed to Connect",
message="Unable to verify connection to mode.",
context=f"Error: {str(http_error)}",
context=f"Error: {str(e)}",
)

self.workspace_uri = f"{self.config.connect_uri}/api/{self.config.workspace}"
Expand Down Expand Up @@ -521,11 +525,11 @@
if self.config.owner_username_instead_of_email
else user_json.get("email")
)
except HTTPError as http_error:
except ModeRequestError as e:
self.report.report_warning(
title="Failed to retrieve Mode creator",
message=f"Unable to retrieve user for {href}",
context=f"Reason: {str(http_error)}",
context=f"Reason: {str(e)}",
)
return user

Expand Down Expand Up @@ -571,11 +575,11 @@
logging.debug(f"Skipping space {space_name} due to space pattern")
continue
space_info[s.get("token", "")] = s.get("name", "")
except HTTPError as http_error:
except ModeRequestError as e:

Check warning on line 578 in metadata-ingestion/src/datahub/ingestion/source/mode.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/mode.py#L578

Added line #L578 was not covered by tests
self.report.report_failure(
title="Failed to Retrieve Spaces",
message="Unable to retrieve spaces / collections for workspace.",
context=f"Workspace: {self.workspace_uri}, Error: {str(http_error)}",
context=f"Workspace: {self.workspace_uri}, Error: {str(e)}",
)

return space_info
Expand Down Expand Up @@ -721,11 +725,11 @@
try:
ds_json = self._get_request_json(f"{self.workspace_uri}/data_sources")
data_sources = ds_json.get("_embedded", {}).get("data_sources", [])
except HTTPError as http_error:
except ModeRequestError as e:

Check warning on line 728 in metadata-ingestion/src/datahub/ingestion/source/mode.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/mode.py#L728

Added line #L728 was not covered by tests
self.report.report_failure(
title="Failed to retrieve Data Sources",
message="Unable to retrieve data sources from Mode.",
context=f"Error: {str(http_error)}",
context=f"Error: {str(e)}",
)

return data_sources
Expand Down Expand Up @@ -812,11 +816,11 @@
if definition.get("name", "") == definition_name:
return definition.get("source", "")

except HTTPError as http_error:
except ModeRequestError as e:

Check warning on line 819 in metadata-ingestion/src/datahub/ingestion/source/mode.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/mode.py#L819

Added line #L819 was not covered by tests
self.report.report_failure(
title="Failed to Retrieve Definition",
message="Unable to retrieve definition from Mode.",
context=f"Definition Name: {definition_name}, Error: {str(http_error)}",
context=f"Definition Name: {definition_name}, Error: {str(e)}",
)
return None

Expand Down Expand Up @@ -1382,11 +1386,11 @@
f"{self.workspace_uri}/spaces/{space_token}/reports"
)
reports = reports_json.get("_embedded", {}).get("reports", {})
except HTTPError as http_error:
except ModeRequestError as e:

Check warning on line 1389 in metadata-ingestion/src/datahub/ingestion/source/mode.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/mode.py#L1389

Added line #L1389 was not covered by tests
self.report.report_failure(
title="Failed to Retrieve Reports for Space",
message="Unable to retrieve reports for space token.",
context=f"Space Token: {space_token}, Error: {str(http_error)}",
context=f"Space Token: {space_token}, Error: {str(e)}",
)
return reports

Expand All @@ -1400,11 +1404,11 @@
url = f"{self.workspace_uri}/spaces/{space_token}/datasets"
datasets_json = self._get_request_json(url)
datasets = datasets_json.get("_embedded", {}).get("reports", [])
except HTTPError as http_error:
except ModeRequestError as e:

Check warning on line 1407 in metadata-ingestion/src/datahub/ingestion/source/mode.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/mode.py#L1407

Added line #L1407 was not covered by tests
self.report.report_failure(
title="Failed to Retrieve Datasets for Space",
message=f"Unable to retrieve datasets for space token {space_token}.",
context=f"Error: {str(http_error)}",
context=f"Error: {str(e)}",
)
return datasets

Expand All @@ -1416,11 +1420,11 @@
f"{self.workspace_uri}/reports/{report_token}/queries"
)
queries = queries_json.get("_embedded", {}).get("queries", {})
except HTTPError as http_error:
except ModeRequestError as e:

Check warning on line 1423 in metadata-ingestion/src/datahub/ingestion/source/mode.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/mode.py#L1423

Added line #L1423 was not covered by tests
self.report.report_failure(
title="Failed to Retrieve Queries",
message="Unable to retrieve queries for report token.",
context=f"Report Token: {report_token}, Error: {str(http_error)}",
context=f"Report Token: {report_token}, Error: {str(e)}",
)
return queries

Expand All @@ -1433,11 +1437,11 @@
f"{self.workspace_uri}/reports/{report_token}/runs/{report_run_id}/query_runs{query_run_id}"
)
queries = queries_json.get("_embedded", {}).get("queries", {})
except HTTPError as http_error:
except ModeRequestError as e:

Check warning on line 1440 in metadata-ingestion/src/datahub/ingestion/source/mode.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/mode.py#L1440

Added line #L1440 was not covered by tests
self.report.report_failure(
title="Failed to Retrieve Queries for Report",
message="Unable to retrieve queries for report token.",
context=f"Report Token:{report_token}, Error: {str(http_error)}",
context=f"Report Token:{report_token}, Error: {str(e)}",
)
return {}
return queries
Expand All @@ -1451,13 +1455,13 @@
f"/queries/{query_token}/charts"
)
charts = charts_json.get("_embedded", {}).get("charts", {})
except HTTPError as http_error:
except ModeRequestError as e:

Check warning on line 1458 in metadata-ingestion/src/datahub/ingestion/source/mode.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/mode.py#L1458

Added line #L1458 was not covered by tests
self.report.report_failure(
title="Failed to Retrieve Charts",
message="Unable to retrieve charts from Mode.",
context=f"Report Token: {report_token}, "
f"Query token: {query_token}, "
f"Error: {str(http_error)}",
f"Error: {str(e)}",
)
return charts

Expand All @@ -1477,6 +1481,8 @@
response = self.session.get(
url, timeout=self.config.api_options.timeout
)
if response.status_code == 204: # No content, don't parse json
return {}
return response.json()
except HTTPError as http_error:
error_response = http_error.response
Expand Down
141 changes: 125 additions & 16 deletions metadata-ingestion/tests/integration/mode/test_mode.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import json
import pathlib
from typing import Sequence
from unittest.mock import patch

import pytest
from freezegun import freeze_time
from requests.models import HTTPError

from datahub.configuration.common import PipelineExecutionError
from datahub.ingestion.api.source import StructuredLogEntry
from datahub.ingestion.run.pipeline import Pipeline
from tests.test_helpers import mce_helpers

Expand All @@ -28,7 +31,7 @@
"https://app.mode.com/api/acryl/reports/24f66e1701b6/queries": "dataset_queries_24f66e1701b6.json",
}

RESPONSE_ERROR_LIST = ["https://app.mode.com/api/acryl/spaces/75737b70402e/reports"]
ERROR_URL = "https://app.mode.com/api/acryl/spaces/75737b70402e/reports"

test_resources_dir = pathlib.Path(__file__).parent

Expand All @@ -49,6 +52,14 @@ def mount(self, prefix, adaptor):
return self

def get(self, url, timeout=40):
if self.error_list is not None and self.url in self.error_list:
http_error_msg = "{} Client Error: {} for url: {}".format(
400,
"Simulate error",
self.url,
)
raise HTTPError(http_error_msg, response=self)

self.url = url
self.timeout = timeout
response_json_path = f"{test_resources_dir}/setup/{JSON_RESPONSE_MAP.get(url)}"
Expand All @@ -57,29 +68,46 @@ def get(self, url, timeout=40):
self.json_data = data
return self

def raise_for_status(self):
if self.error_list is not None and self.url in self.error_list:
http_error_msg = "{} Client Error: {} for url: {}".format(
400,
"Simulate error",
self.url,
)
raise HTTPError(http_error_msg, response=self)

class MockResponseJson(MockResponse):
def __init__(
self,
status_code: int = 200,
*,
json_empty_list: Sequence[str] = (),
json_error_list: Sequence[str] = (),
):
super().__init__(None, status_code)
self.json_empty_list = json_empty_list
self.json_error_list = json_error_list

def json(self):
if self.url in self.json_empty_list:
return json.loads("") # Shouldn't be called
if self.url in self.json_error_list:
return json.loads("{")
return super().json()

def get(self, url, timeout=40):
response = super().get(url, timeout)
if self.url in self.json_empty_list:
response.status_code = 204
return response


def mocked_requests_sucess(*args, **kwargs):
def mocked_requests_success(*args, **kwargs):
return MockResponse(None, 200)


def mocked_requests_failure(*args, **kwargs):
return MockResponse(RESPONSE_ERROR_LIST, 200)
return MockResponse([ERROR_URL], 200)


@freeze_time(FROZEN_TIME)
def test_mode_ingest_success(pytestconfig, tmp_path):
with patch(
"datahub.ingestion.source.mode.requests.Session",
side_effect=mocked_requests_sucess,
side_effect=mocked_requests_success,
):
pipeline = Pipeline.create(
{
Expand Down Expand Up @@ -142,8 +170,89 @@ def test_mode_ingest_failure(pytestconfig, tmp_path):
}
)
pipeline.run()
try:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously this wasn't actually testing anything... raise_from_status wasn't raising exceptions (we were overriding the implementation on the response object...) so we wouldn't assert anything. Changed up this file a bit so this test is actually testing stuff

with pytest.raises(PipelineExecutionError) as exec_error:
pipeline.raise_from_status()
except PipelineExecutionError as exec_error:
assert exec_error.args[0] == "Source reported errors"
assert len(exec_error.args[1].failures) == 1
assert exec_error.value.args[0] == "Source reported errors"
assert len(exec_error.value.args[1].failures) == 1
error_dict: StructuredLogEntry
_level, error_dict = exec_error.value.args[1].failures[0]
error = next(iter(error_dict.context))
assert "Simulate error" in error
assert ERROR_URL in error


@freeze_time(FROZEN_TIME)
def test_mode_ingest_json_empty(pytestconfig, tmp_path):
with patch(
"datahub.ingestion.source.mode.requests.Session",
side_effect=lambda *args, **kwargs: MockResponseJson(
json_empty_list=["https://app.mode.com/api/modeuser"]
),
):
global test_resources_dir
test_resources_dir = pytestconfig.rootpath / "tests/integration/mode"

pipeline = Pipeline.create(
{
"run_id": "mode-test",
"source": {
"type": "mode",
"config": {
"token": "xxxx",
"password": "xxxx",
"connect_uri": "https://app.mode.com/",
"workspace": "acryl",
},
},
"sink": {
"type": "file",
"config": {
"filename": f"{tmp_path}/mode_mces.json",
},
},
}
)
pipeline.run()
pipeline.raise_from_status(raise_warnings=True)


@freeze_time(FROZEN_TIME)
def test_mode_ingest_json_failure(pytestconfig, tmp_path):
with patch(
"datahub.ingestion.source.mode.requests.Session",
side_effect=lambda *args, **kwargs: MockResponseJson(
json_error_list=["https://app.mode.com/api/modeuser"]
),
):
global test_resources_dir
test_resources_dir = pytestconfig.rootpath / "tests/integration/mode"

pipeline = Pipeline.create(
{
"run_id": "mode-test",
"source": {
"type": "mode",
"config": {
"token": "xxxx",
"password": "xxxx",
"connect_uri": "https://app.mode.com/",
"workspace": "acryl",
},
},
"sink": {
"type": "file",
"config": {
"filename": f"{tmp_path}/mode_mces.json",
},
},
}
)
pipeline.run()
pipeline.raise_from_status(raise_warnings=False)
with pytest.raises(PipelineExecutionError) as exec_error:
pipeline.raise_from_status(raise_warnings=True)
assert len(exec_error.value.args[1].warnings) > 0
error_dict: StructuredLogEntry
_level, error_dict = exec_error.value.args[1].warnings[0]
error = next(iter(error_dict.context))
assert "Expecting property name enclosed in double quotes" in error
Loading