diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index bf80172441405f..f14c080df644a1 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -722,6 +722,7 @@ "snowflake-summary = datahub.ingestion.source.snowflake.snowflake_summary:SnowflakeSummarySource", "snowflake-queries = datahub.ingestion.source.snowflake.snowflake_queries:SnowflakeQueriesSource", "superset = datahub.ingestion.source.superset:SupersetSource", + "preset = datahub.ingestion.source.preset:PresetSource", "tableau = datahub.ingestion.source.tableau.tableau:TableauSource", "openapi = datahub.ingestion.source.openapi:OpenApiSource", "metabase = datahub.ingestion.source.metabase:MetabaseSource", diff --git a/metadata-ingestion/src/datahub/ingestion/source/preset.py b/metadata-ingestion/src/datahub/ingestion/source/preset.py new file mode 100644 index 00000000000000..e51520898103d6 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/preset.py @@ -0,0 +1,114 @@ +import logging +from typing import Dict, Optional + +import requests +from pydantic.class_validators import root_validator, validator +from pydantic.fields import Field + +from datahub.emitter.mce_builder import DEFAULT_ENV +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.api.decorators import ( + SourceCapability, + SupportStatus, + capability, + config_class, + platform_name, + support_status, +) +from datahub.ingestion.source.state.stale_entity_removal_handler import ( + StaleEntityRemovalSourceReport, + StatefulStaleMetadataRemovalConfig, +) +from datahub.ingestion.source.superset import SupersetConfig, SupersetSource +from datahub.utilities import config_clean + +logger = logging.getLogger(__name__) + + +class PresetConfig(SupersetConfig): + manager_uri: str = Field( + default="https://api.app.preset.io", description="Preset.io API URL" + ) + connect_uri: str = Field(default="", description="Preset workspace URL.") + display_uri: Optional[str] = Field( + default=None, + description="optional URL to use in links (if `connect_uri` is only for ingestion)", + ) + api_key: Optional[str] = Field(default=None, description="Preset.io API key.") + api_secret: Optional[str] = Field(default=None, description="Preset.io API secret.") + + # Configuration for stateful ingestion + stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field( + default=None, description="Preset Stateful Ingestion Config." + ) + + options: Dict = Field(default={}, description="") + env: str = Field( + default=DEFAULT_ENV, + description="Environment to use in namespace when constructing URNs", + ) + database_alias: Dict[str, str] = Field( + default={}, + description="Can be used to change mapping for database names in superset to what you have in datahub", + ) + + @validator("connect_uri", "display_uri") + def remove_trailing_slash(cls, v): + return config_clean.remove_trailing_slashes(v) + + @root_validator + def default_display_uri_to_connect_uri(cls, values): + base = values.get("display_uri") + if base is None: + values["display_uri"] = values.get("connect_uri") + return values + + +@platform_name("Preset") +@config_class(PresetConfig) +@support_status(SupportStatus.TESTING) +@capability( + SourceCapability.DELETION_DETECTION, "Optionally enabled via stateful_ingestion" +) +class PresetSource(SupersetSource): + """ + Variation of the Superset plugin that works with Preset.io (Apache Superset SaaS). + """ + + config: PresetConfig + report: StaleEntityRemovalSourceReport + platform = "preset" + + def __init__(self, ctx: PipelineContext, config: PresetConfig): + logger.info(f"ctx is {ctx}") + + super().__init__(ctx, config) + self.config = config + self.report = StaleEntityRemovalSourceReport() + + def login(self): + try: + login_response = requests.post( + f"{self.config.manager_uri}/v1/auth/", + json={"name": self.config.api_key, "secret": self.config.api_secret}, + ) + except requests.exceptions.RequestException as e: + logger.error(f"Failed to authenticate with Preset: {e}") + raise e + + self.access_token = login_response.json()["payload"]["access_token"] + logger.debug("Got access token from Preset") + + requests_session = requests.Session() + requests_session.headers.update( + { + "Authorization": f"Bearer {self.access_token}", + "Content-Type": "application/json", + "Accept": "*/*", + } + ) + # Test the connection + test_response = requests_session.get(f"{self.config.connect_uri}/version") + if not test_response.ok: + logger.error("Unable to connect to workspace") + return requests_session diff --git a/metadata-ingestion/src/datahub/ingestion/source/superset.py b/metadata-ingestion/src/datahub/ingestion/source/superset.py index e563a806446c43..858281f880359a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/superset.py +++ b/metadata-ingestion/src/datahub/ingestion/source/superset.py @@ -101,7 +101,11 @@ class SupersetConfig( ) username: Optional[str] = Field(default=None, description="Superset username.") password: Optional[str] = Field(default=None, description="Superset password.") - + api_key: Optional[str] = Field(default=None, description="Preset.io API key.") + api_secret: Optional[str] = Field(default=None, description="Preset.io API secret.") + manager_uri: str = Field( + default="https://api.app.preset.io", description="Preset.io API URL" + ) # Configuration for stateful ingestion stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field( default=None, description="Superset Stateful Ingestion Config." @@ -179,7 +183,14 @@ def __init__(self, ctx: PipelineContext, config: SupersetConfig): super().__init__(config, ctx) self.config = config self.report = StaleEntityRemovalSourceReport() + if self.config.domain: + self.domain_registry = DomainRegistry( + cached_domains=[domain_id for domain_id in self.config.domain], + graph=self.ctx.graph, + ) + self.session = self.login() + def login(self) -> requests.Session: login_response = requests.post( f"{self.config.connect_uri}/api/v1/security/login", json={ @@ -193,8 +204,8 @@ def __init__(self, ctx: PipelineContext, config: SupersetConfig): self.access_token = login_response.json()["access_token"] logger.debug("Got access token from superset") - self.session = requests.Session() - self.session.headers.update( + requests_session = requests.Session() + requests_session.headers.update( { "Authorization": f"Bearer {self.access_token}", "Content-Type": "application/json", @@ -202,17 +213,14 @@ def __init__(self, ctx: PipelineContext, config: SupersetConfig): } ) - if self.config.domain: - self.domain_registry = DomainRegistry( - cached_domains=[domain_id for domain_id in self.config.domain], - graph=self.ctx.graph, - ) - # Test the connection - test_response = self.session.get(f"{self.config.connect_uri}/api/v1/dashboard/") + test_response = requests_session.get( + f"{self.config.connect_uri}/api/v1/dashboard/" + ) if test_response.status_code == 200: pass # TODO(Gabe): how should we message about this error? + return requests_session @classmethod def create(cls, config_dict: dict, ctx: PipelineContext) -> Source: diff --git a/metadata-ingestion/tests/integration/preset/golden_test_ingest.json b/metadata-ingestion/tests/integration/preset/golden_test_ingest.json new file mode 100644 index 00000000000000..5aca7f3e5bd14a --- /dev/null +++ b/metadata-ingestion/tests/integration/preset/golden_test_ingest.json @@ -0,0 +1,286 @@ +[ +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DashboardSnapshot": { + "urn": "urn:li:dashboard:(preset,1)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.dashboard.DashboardInfo": { + "customProperties": { + "Status": "published", + "IsPublished": "true", + "Owners": "test_username_1, test_username_2", + "IsCertified": "true", + "CertifiedBy": "Certification team", + "CertificationDetails": "Approved" + }, + "title": "test_dashboard_title_1", + "description": "", + "charts": [ + "urn:li:chart:(preset,10)", + "urn:li:chart:(preset,11)" + ], + "datasets": [], + "lastModified": { + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 1720594800000, + "actor": "urn:li:corpuser:test_username_1" + } + }, + "dashboardUrl": "mock://mock-domain.preset.io/dashboard/test_dashboard_url_1" + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1720594800000, + "runId": "preset-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DashboardSnapshot": { + "urn": "urn:li:dashboard:(preset,2)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.dashboard.DashboardInfo": { + "customProperties": { + "Status": "draft", + "IsPublished": "false", + "Owners": "unknown", + "IsCertified": "false" + }, + "title": "test_dashboard_title_2", + "description": "", + "charts": [ + "urn:li:chart:(preset,12)", + "urn:li:chart:(preset,13)" + ], + "datasets": [], + "lastModified": { + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 1720594800000, + "actor": "urn:li:corpuser:test_username_2" + } + }, + "dashboardUrl": "mock://mock-domain.preset.io/dashboard/test_dashboard_url_2" + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1720594800000, + "runId": "preset-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.ChartSnapshot": { + "urn": "urn:li:chart:(preset,10)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.chart.ChartInfo": { + "customProperties": { + "Metrics": "", + "Filters": "", + "Dimensions": "" + }, + "title": "test_chart_title_1", + "description": "", + "lastModified": { + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 1720594800000, + "actor": "urn:li:corpuser:test_username_1" + } + }, + "chartUrl": "mock://mock-domain.preset.io/explore/test_chart_url_10", + "inputs": [ + { + "string": "urn:li:dataset:(urn:li:dataPlatform:external,test_database_name.test_schema_name.test_table_name,PROD)" + } + ], + "type": "BAR" + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1720594800000, + "runId": "preset-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.ChartSnapshot": { + "urn": "urn:li:chart:(preset,11)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.chart.ChartInfo": { + "customProperties": { + "Metrics": "", + "Filters": "", + "Dimensions": "" + }, + "title": "test_chart_title_2", + "description": "", + "lastModified": { + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 1720594800000, + "actor": "urn:li:corpuser:test_username_1" + } + }, + "chartUrl": "mock://mock-domain.preset.io/explore/test_chart_url_11", + "inputs": [ + { + "string": "urn:li:dataset:(urn:li:dataPlatform:external,test_database_name.test_schema_name.test_table_name,PROD)" + } + ], + "type": "PIE" + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1720594800000, + "runId": "preset-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.ChartSnapshot": { + "urn": "urn:li:chart:(preset,12)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.chart.ChartInfo": { + "customProperties": { + "Metrics": "", + "Filters": "", + "Dimensions": "" + }, + "title": "test_chart_title_3", + "description": "", + "lastModified": { + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 1720594800000, + "actor": "urn:li:corpuser:test_username_2" + } + }, + "chartUrl": "mock://mock-domain.preset.io/explore/test_chart_url_12", + "inputs": [ + { + "string": "urn:li:dataset:(urn:li:dataPlatform:external,test_database_name.test_schema_name.test_table_name,PROD)" + } + ], + "type": "AREA" + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1720594800000, + "runId": "preset-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.ChartSnapshot": { + "urn": "urn:li:chart:(preset,13)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.chart.ChartInfo": { + "customProperties": { + "Metrics": "", + "Filters": "", + "Dimensions": "" + }, + "title": "test_chart_title_4", + "description": "", + "lastModified": { + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 1720594800000, + "actor": "urn:li:corpuser:test_username_2" + } + }, + "chartUrl": "mock://mock-domain.preset.io/explore/test_chart_url_13", + "inputs": [ + { + "string": "urn:li:dataset:(urn:li:dataPlatform:external,test_database_name.test_schema_name.test_table_name,PROD)" + } + ], + "type": "HISTOGRAM" + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1720594800000, + "runId": "preset-test", + "lastRunId": "no-run-id-provided" + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/preset/golden_test_stateful_ingest.json b/metadata-ingestion/tests/integration/preset/golden_test_stateful_ingest.json new file mode 100644 index 00000000000000..719f0a78fb7d72 --- /dev/null +++ b/metadata-ingestion/tests/integration/preset/golden_test_stateful_ingest.json @@ -0,0 +1,261 @@ +[ +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DashboardSnapshot": { + "urn": "urn:li:dashboard:(preset,1)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.dashboard.DashboardInfo": { + "customProperties": { + "Status": "published", + "IsPublished": "true", + "Owners": "test_username_1, test_username_2", + "IsCertified": "true", + "CertifiedBy": "Certification team", + "CertificationDetails": "Approved" + }, + "title": "test_dashboard_title_1", + "description": "", + "charts": [ + "urn:li:chart:(preset,10)", + "urn:li:chart:(preset,11)" + ], + "datasets": [], + "lastModified": { + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 1720594800000, + "actor": "urn:li:corpuser:test_username_1" + } + }, + "dashboardUrl": "mock://mock-domain.preset.io/dashboard/test_dashboard_url_1" + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1720594800000, + "runId": "preset-2024_07_10-07_00_00", + "lastRunId": "no-run-id-provided", + "pipelineName": "test_pipeline" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.ChartSnapshot": { + "urn": "urn:li:chart:(preset,10)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.chart.ChartInfo": { + "customProperties": { + "Metrics": "", + "Filters": "", + "Dimensions": "" + }, + "title": "test_chart_title_1", + "description": "", + "lastModified": { + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 1720594800000, + "actor": "urn:li:corpuser:test_username_1" + } + }, + "chartUrl": "mock://mock-domain.preset.io/explore/test_chart_url_10", + "inputs": [ + { + "string": "urn:li:dataset:(urn:li:dataPlatform:external,test_database_name.test_schema_name.test_table_name,PROD)" + } + ], + "type": "BAR" + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1720594800000, + "runId": "preset-2024_07_10-07_00_00", + "lastRunId": "no-run-id-provided", + "pipelineName": "test_pipeline" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.ChartSnapshot": { + "urn": "urn:li:chart:(preset,11)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.chart.ChartInfo": { + "customProperties": { + "Metrics": "", + "Filters": "", + "Dimensions": "" + }, + "title": "test_chart_title_2", + "description": "", + "lastModified": { + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 1720594800000, + "actor": "urn:li:corpuser:test_username_1" + } + }, + "chartUrl": "mock://mock-domain.preset.io/explore/test_chart_url_11", + "inputs": [ + { + "string": "urn:li:dataset:(urn:li:dataPlatform:external,test_database_name.test_schema_name.test_table_name,PROD)" + } + ], + "type": "PIE" + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1720594800000, + "runId": "preset-2024_07_10-07_00_00", + "lastRunId": "no-run-id-provided", + "pipelineName": "test_pipeline" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.ChartSnapshot": { + "urn": "urn:li:chart:(preset,12)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.chart.ChartInfo": { + "customProperties": { + "Metrics": "", + "Filters": "", + "Dimensions": "" + }, + "title": "test_chart_title_3", + "description": "", + "lastModified": { + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 1720594800000, + "actor": "urn:li:corpuser:test_username_2" + } + }, + "chartUrl": "mock://mock-domain.preset.io/explore/test_chart_url_12", + "inputs": [ + { + "string": "urn:li:dataset:(urn:li:dataPlatform:external,test_database_name.test_schema_name.test_table_name,PROD)" + } + ], + "type": "AREA" + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1720594800000, + "runId": "preset-2024_07_10-07_00_00", + "lastRunId": "no-run-id-provided", + "pipelineName": "test_pipeline" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.ChartSnapshot": { + "urn": "urn:li:chart:(preset,13)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.chart.ChartInfo": { + "customProperties": { + "Metrics": "", + "Filters": "", + "Dimensions": "" + }, + "title": "test_chart_title_4", + "description": "", + "lastModified": { + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 1720594800000, + "actor": "urn:li:corpuser:test_username_2" + } + }, + "chartUrl": "mock://mock-domain.preset.io/explore/test_chart_url_13", + "inputs": [ + { + "string": "urn:li:dataset:(urn:li:dataPlatform:external,test_database_name.test_schema_name.test_table_name,PROD)" + } + ], + "type": "HISTOGRAM" + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1720594800000, + "runId": "preset-2024_07_10-07_00_00", + "lastRunId": "no-run-id-provided", + "pipelineName": "test_pipeline" + } +}, +{ + "entityType": "dashboard", + "entityUrn": "urn:li:dashboard:(preset,2)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": true + } + }, + "systemMetadata": { + "lastObserved": 1720594800000, + "runId": "preset-2024_07_10-07_00_00", + "lastRunId": "no-run-id-provided", + "pipelineName": "test_pipeline" + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/preset/test_preset.py b/metadata-ingestion/tests/integration/preset/test_preset.py new file mode 100644 index 00000000000000..f926a762e6a078 --- /dev/null +++ b/metadata-ingestion/tests/integration/preset/test_preset.py @@ -0,0 +1,366 @@ +from typing import Any, Dict, Optional +from unittest.mock import patch + +import pytest +from freezegun import freeze_time + +from datahub.ingestion.run.pipeline import Pipeline +from tests.test_helpers import mce_helpers +from tests.test_helpers.state_helpers import ( + get_current_checkpoint_from_pipeline, + run_and_get_pipeline, + validate_all_providers_have_committed_successfully, +) + +FROZEN_TIME = "2024-07-10 07:00:00" +GMS_PORT = 8080 +GMS_SERVER = f"http://localhost:{GMS_PORT}" + + +def register_mock_api(request_mock: Any, override_data: Optional[dict] = None) -> None: + if override_data is None: + override_data = {} + + api_vs_response = { + "mock://mock-domain.preset.io/v1/auth/": { + "method": "POST", + "status_code": 200, + "json": { + "payload": { + "access_token": "test_token", + } + }, + }, + "mock://mock-domain.preset.io/version": { + "method": "GET", + "status_code": 200, + "json": { + "ci": { + "built_at": "Tue Jul 10 00:00:00 UTC 2024", + "build_num": "1", + "triggered_by": "Not triggered by a user", + }, + "git": { + "branch": "4.0.1.6", + "sha": "test_sha", + "sha_superset": "test_sha_superset", + "release_name": "test_release_name", + }, + "chart_version": "1.16.1", + "start_time": "2024-07-10 00:00:00", + "mt_deployment": True, + }, + }, + "mock://mock-domain.preset.io/api/v1/dashboard/": { + "method": "GET", + "status_code": 200, + "json": { + "count": 2, + "result": [ + { + "id": "1", + "changed_by": { + "username": "test_username_1", + }, + "changed_on_utc": "2024-07-10T07:00:00.000000+0000", + "dashboard_title": "test_dashboard_title_1", + "url": "/dashboard/test_dashboard_url_1", + "position_json": '{"CHART-test-1": {"meta": { "chartId": "10" }}, "CHART-test-2": {"meta": { "chartId": "11" }}}', + "status": "published", + "published": True, + "owners": [ + { + "username": "test_username_1", + }, + { + "username": "test_username_2", + }, + ], + "certified_by": "Certification team", + "certification_details": "Approved", + }, + { + "id": "2", + "changed_by": { + "username": "test_username_2", + }, + "changed_on_utc": "2024-07-10T07:00:00.000000+0000", + "dashboard_title": "test_dashboard_title_2", + "url": "/dashboard/test_dashboard_url_2", + "position_json": '{"CHART-test-3": {"meta": { "chartId": "12" }}, "CHART-test-4": {"meta": { "chartId": "13" }}}', + "status": "draft", + "published": False, + "owners": [ + { + "first_name": "name", + }, + ], + "certified_by": "", + "certification_details": "", + }, + ], + }, + }, + "mock://mock-domain.preset.io/api/v1/chart/": { + "method": "GET", + "status_code": 200, + "json": { + "count": 4, + "result": [ + { + "id": "10", + "changed_by": { + "username": "test_username_1", + }, + "changed_on_utc": "2024-07-10T07:00:00.000000+0000", + "slice_name": "test_chart_title_1", + "viz_type": "box_plot", + "url": "/explore/test_chart_url_10", + "datasource_id": "20", + "params": '{"metrics": [], "adhoc_filters": []}', + }, + { + "id": "11", + "changed_by": { + "username": "test_username_1", + }, + "changed_on_utc": "2024-07-10T07:00:00.000000+0000", + "slice_name": "test_chart_title_2", + "viz_type": "pie", + "url": "/explore/test_chart_url_11", + "datasource_id": "20", + "params": '{"metrics": [], "adhoc_filters": []}', + }, + { + "id": "12", + "changed_by": { + "username": "test_username_2", + }, + "changed_on_utc": "2024-07-10T07:00:00.000000+0000", + "slice_name": "test_chart_title_3", + "viz_type": "treemap", + "url": "/explore/test_chart_url_12", + "datasource_id": "20", + "params": '{"metrics": [], "adhoc_filters": []}', + }, + { + "id": "13", + "changed_by": { + "username": "test_username_2", + }, + "changed_on_utc": "2024-07-10T07:00:00.000000+0000", + "slice_name": "test_chart_title_4", + "viz_type": "histogram", + "url": "/explore/test_chart_url_13", + "datasource_id": "20", + "params": '{"metrics": [], "adhoc_filters": []}', + }, + ], + }, + }, + "mock://mock-domain.preset.io/api/v1/dataset/20": { + "method": "GET", + "status_code": 200, + "json": { + "result": { + "schema": "test_schema_name", + "table_name": "test_table_name", + "database": { + "id": "30", + "database_name": "test_database_name", + }, + }, + }, + }, + "mock://mock-domain.preset.io/api/v1/database/30": { + "method": "GET", + "status_code": 200, + "json": { + "result": { + "sqlalchemy_uri": "test_sqlalchemy_uri", + }, + }, + }, + } + + api_vs_response.update(override_data) + + for url in api_vs_response: + request_mock.register_uri( + api_vs_response[url]["method"], + url, + json=api_vs_response[url]["json"], + status_code=api_vs_response[url]["status_code"], + ) + + +@freeze_time(FROZEN_TIME) +@pytest.mark.integration +def test_preset_ingest(pytestconfig, tmp_path, mock_time, requests_mock): + test_resources_dir = pytestconfig.rootpath / "tests/integration/preset" + + register_mock_api(request_mock=requests_mock) + + pipeline = Pipeline.create( + { + "run_id": "preset-test", + "source": { + "type": "preset", + "config": { + "connect_uri": "mock://mock-domain.preset.io/", + "manager_uri": "mock://mock-domain.preset.io", + "api_key": "test_key", + "api_secret": "test_secret", + "provider": "db", + }, + }, + "sink": { + "type": "file", + "config": { + "filename": f"{tmp_path}/preset_mces.json", + }, + }, + } + ) + + pipeline.run() + pipeline.raise_from_status() + golden_file = "golden_test_ingest.json" + + mce_helpers.check_golden_file( + pytestconfig, + output_path=tmp_path / "preset_mces.json", + golden_path=f"{test_resources_dir}/{golden_file}", + ) + + +@freeze_time(FROZEN_TIME) +@pytest.mark.integration +def test_preset_stateful_ingest( + pytestconfig, tmp_path, mock_time, requests_mock, mock_datahub_graph +): + test_resources_dir = pytestconfig.rootpath / "tests/integration/preset" + + register_mock_api(request_mock=requests_mock) + + pipeline_config_dict: Dict[str, Any] = { + "source": { + "type": "preset", + "config": { + "connect_uri": "mock://mock-domain.preset.io/", + "manager_uri": "mock://mock-domain.preset.io", + "api_key": "test_key", + "api_secret": "test_secret", + "provider": "db", + # enable stateful ingestion + "stateful_ingestion": { + "enabled": True, + "remove_stale_metadata": True, + "fail_safe_threshold": 100.0, + "state_provider": { + "type": "datahub", + "config": {"datahub_api": {"server": GMS_SERVER}}, + }, + }, + }, + }, + "sink": { + # we are not really interested in the resulting events for this test + "type": "console" + }, + "pipeline_name": "test_pipeline", + } + + dashboard_endpoint_override = { + "mock://mock-domain.preset.io/api/v1/dashboard/": { + "method": "GET", + "status_code": 200, + "json": { + "count": 1, + "result": [ + { + "id": "1", + "changed_by": { + "username": "test_username_1", + }, + "changed_on_utc": "2024-07-10T07:00:00.000000+0000", + "dashboard_title": "test_dashboard_title_1", + "url": "/dashboard/test_dashboard_url_1", + "position_json": '{"CHART-test-1": {"meta": { "chartId": "10" }}, "CHART-test-2": {"meta": { "chartId": "11" }}}', + "status": "published", + "published": True, + "owners": [ + { + "username": "test_username_1", + }, + { + "username": "test_username_2", + }, + ], + "certified_by": "Certification team", + "certification_details": "Approved", + }, + ], + }, + }, + } + + with patch( + "datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph", + mock_datahub_graph, + ) as mock_checkpoint: + # Both checkpoint and reporting will use the same mocked graph instance. + mock_checkpoint.return_value = mock_datahub_graph + + # Do the first run of the pipeline and get the default job's checkpoint. + pipeline_run1 = run_and_get_pipeline(pipeline_config_dict) + checkpoint1 = get_current_checkpoint_from_pipeline(pipeline_run1) + + assert checkpoint1 + assert checkpoint1.state + + # Remove one dashboard from the preset config. + register_mock_api( + request_mock=requests_mock, override_data=dashboard_endpoint_override + ) + + # Capture MCEs of second run to validate Status(removed=true) + deleted_mces_path = f"{tmp_path}/preset_deleted_mces.json" + pipeline_config_dict["sink"]["type"] = "file" + pipeline_config_dict["sink"]["config"] = {"filename": deleted_mces_path} + + # Do the second run of the pipeline. + pipeline_run2 = run_and_get_pipeline(pipeline_config_dict) + checkpoint2 = get_current_checkpoint_from_pipeline(pipeline_run2) + + assert checkpoint2 + assert checkpoint2.state + + # Perform all assertions on the states. The deleted dashboard should not be + # part of the second state + state1 = checkpoint1.state + state2 = checkpoint2.state + difference_urns = list( + state1.get_urns_not_in(type="dashboard", other_checkpoint_state=state2) + ) + + assert len(difference_urns) == 1 + + urn1 = "urn:li:dashboard:(preset,2)" + + assert urn1 in difference_urns + + # Validate that all providers have committed successfully. + validate_all_providers_have_committed_successfully( + pipeline=pipeline_run1, expected_providers=1 + ) + validate_all_providers_have_committed_successfully( + pipeline=pipeline_run2, expected_providers=1 + ) + + # Verify the output. + mce_helpers.check_golden_file( + pytestconfig, + output_path=deleted_mces_path, + golden_path=test_resources_dir / "golden_test_stateful_ingest.json", + ) diff --git a/metadata-ingestion/tests/unit/test_preset_source.py b/metadata-ingestion/tests/unit/test_preset_source.py new file mode 100644 index 00000000000000..d97db651f4c795 --- /dev/null +++ b/metadata-ingestion/tests/unit/test_preset_source.py @@ -0,0 +1,22 @@ +from datahub.ingestion.source.preset import PresetConfig + + +def test_default_values(): + config = PresetConfig.parse_obj({}) + + assert config.connect_uri == "" + assert config.manager_uri == "https://api.app.preset.io" + assert config.display_uri == "" + assert config.env == "PROD" + assert config.api_key is None + assert config.api_secret is None + + +def test_set_display_uri(): + display_uri = "some_host:1234" + + config = PresetConfig.parse_obj({"display_uri": display_uri}) + + assert config.connect_uri == "" + assert config.manager_uri == "https://api.app.preset.io" + assert config.display_uri == display_uri