From d1e76fbaecb47f97182ffcc5b50e5c6825aaf032 Mon Sep 17 00:00:00 2001
From: Tsung-Ju Lii
Date: Wed, 24 Jul 2024 12:48:53 +0800
Subject: [PATCH] finish implementation

---
 .../discovery_api/graphql_client/__init__.py  |  10 +
 .../graphql_client/base_model.py              |   3 +-
 .../discovery_api/graphql_client/client.py    |  46 +++-
 .../graphql_client/get_job_run_metrics.py     |  44 ++++
 .../graphql_client/get_job_run_tests.py       |   3 +-
 .../dbt/cloud/discovery_api/queries.graphql   | 167 +++-----------
 metaphor/dbt/cloud/extractor.py               | 131 ++---------
 metaphor/dbt/cloud/parser/common.py           |  45 ++++
 metaphor/dbt/cloud/parser/metric_parser.py    |  66 ++++++
 metaphor/dbt/cloud/parser/node_parser.py      |  78 +++----
 metaphor/dbt/cloud/parser/parser.py           |  40 +++-
 metaphor/dbt/cloud/parser/test_parser.py      |  21 +-
 tests/dbt/cloud/test_client.py                |  27 +--
 tests/dbt/cloud/test_extractor.py             | 208 ------------------
 14 files changed, 322 insertions(+), 567 deletions(-)
 create mode 100644 metaphor/dbt/cloud/discovery_api/graphql_client/get_job_run_metrics.py
 create mode 100644 metaphor/dbt/cloud/parser/common.py
 create mode 100644 metaphor/dbt/cloud/parser/metric_parser.py
 delete mode 100644 tests/dbt/cloud/test_extractor.py

diff --git a/metaphor/dbt/cloud/discovery_api/graphql_client/__init__.py b/metaphor/dbt/cloud/discovery_api/graphql_client/__init__.py
index f366d68f..0e716d18 100644
--- a/metaphor/dbt/cloud/discovery_api/graphql_client/__init__.py
+++ b/metaphor/dbt/cloud/discovery_api/graphql_client/__init__.py
@@ -48,6 +48,12 @@
     GetJobRunMacrosJob,
     GetJobRunMacrosJobMacros,
 )
+from .get_job_run_metrics import (
+    GetJobRunMetrics,
+    GetJobRunMetricsJob,
+    GetJobRunMetricsJobMetrics,
+    GetJobRunMetricsJobMetricsFilters,
+)
 from .get_job_run_models import (
     GetJobRunModels,
     GetJobRunModelsJob,
@@ -125,6 +131,10 @@
     "GetJobRunMacros",
     "GetJobRunMacrosJob",
     "GetJobRunMacrosJobMacros",
+    "GetJobRunMetrics",
+    "GetJobRunMetricsJob",
+    "GetJobRunMetricsJobMetrics",
+    "GetJobRunMetricsJobMetricsFilters",
     "GetJobRunModels",
     "GetJobRunModelsJob",
     "GetJobRunModelsJobModels",
diff --git a/metaphor/dbt/cloud/discovery_api/graphql_client/base_model.py b/metaphor/dbt/cloud/discovery_api/graphql_client/base_model.py
index 76b84873..a93b416e 100644
--- a/metaphor/dbt/cloud/discovery_api/graphql_client/base_model.py
+++ b/metaphor/dbt/cloud/discovery_api/graphql_client/base_model.py
@@ -2,7 +2,8 @@
 
 from io import IOBase
 
-from pydantic import BaseModel as PydanticBaseModel, ConfigDict
+from pydantic import BaseModel as PydanticBaseModel
+from pydantic import ConfigDict
 
 
 class UnsetType:
diff --git a/metaphor/dbt/cloud/discovery_api/graphql_client/client.py b/metaphor/dbt/cloud/discovery_api/graphql_client/client.py
index cd3b2f3e..237926b7 100644
--- a/metaphor/dbt/cloud/discovery_api/graphql_client/client.py
+++ b/metaphor/dbt/cloud/discovery_api/graphql_client/client.py
@@ -9,6 +9,7 @@
 from .get_environment_model_file_path import GetEnvironmentModelFilePath
 from .get_environment_snapshot_file_path import GetEnvironmentSnapshotFilePath
 from .get_job_run_macros import GetJobRunMacros
+from .get_job_run_metrics import GetJobRunMetrics
 from .get_job_run_models import GetJobRunModels
 from .get_job_run_snapshots import GetJobRunSnapshots
 from .get_job_run_sources import GetJobRunSources
@@ -207,6 +208,50 @@ def get_job_run_sources(
         data = self.get_data(response)
         return GetJobRunSources.model_validate(data)
 
+    def get_job_run_metrics(
+        self,
+        job_id: Any,
+        run_id: Union[Optional[Any], UnsetType] = UNSET,
+        **kwargs: Any
+    ) -> GetJobRunMetrics:
+        query = gql(
+            """
+            query GetJobRunMetrics($jobId: BigInt!, $runId: BigInt) {
+              job(id: $jobId, runId: $runId) {
+                metrics {
+                  packageName
+                  label
+                  description
+                  dependsOn
+                  uniqueId
+                  timeGrains
+                  timestamp
+                  dimensions
+                  filters {
+                    field
+                    operator
+                    value
+                  }
+                  tags
+                  type
+                  sql
+                  expression
+                  calculation_method
+                }
+              }
+            }
+            """
+        )
+        variables: Dict[str, object] = {"jobId": job_id, "runId": run_id}
+        response = self.execute(
+            query=query,
+            operation_name="GetJobRunMetrics",
+            variables=variables,
+            **kwargs
+        )
+        data = self.get_data(response)
+        return GetJobRunMetrics.model_validate(data)
+
     def get_job_run_tests(
         self, job_id: Any, run_id: Any, **kwargs: Any
     ) -> GetJobRunTests:
@@ -220,7 +265,6 @@
                 compiledCode
                 dependsOn
                 name
-                runId
                 uniqueId
               }
             }
diff --git a/metaphor/dbt/cloud/discovery_api/graphql_client/get_job_run_metrics.py b/metaphor/dbt/cloud/discovery_api/graphql_client/get_job_run_metrics.py
new file mode 100644
index 00000000..1603b8ff
--- /dev/null
+++ b/metaphor/dbt/cloud/discovery_api/graphql_client/get_job_run_metrics.py
@@ -0,0 +1,44 @@
+# Generated by ariadne-codegen
+# Source: queries.graphql
+
+from typing import List, Optional
+
+from pydantic import Field
+
+from .base_model import BaseModel
+
+
+class GetJobRunMetrics(BaseModel):
+    job: Optional["GetJobRunMetricsJob"]
+
+
+class GetJobRunMetricsJob(BaseModel):
+    metrics: List["GetJobRunMetricsJobMetrics"]
+
+
+class GetJobRunMetricsJobMetrics(BaseModel):
+    package_name: Optional[str] = Field(alias="packageName")
+    label: Optional[str]
+    description: Optional[str]
+    depends_on: List[str] = Field(alias="dependsOn")
+    unique_id: str = Field(alias="uniqueId")
+    time_grains: Optional[List[str]] = Field(alias="timeGrains")
+    timestamp: Optional[str]
+    dimensions: List[str]
+    filters: List["GetJobRunMetricsJobMetricsFilters"]
+    tags: Optional[List[str]]
+    type: Optional[str]
+    sql: Optional[str]
+    expression: Optional[str]
+    calculation_method: Optional[str]
+
+
+class GetJobRunMetricsJobMetricsFilters(BaseModel):
+    field: Optional[str]
+    operator: Optional[str]
+    value: Optional[str]
+
+
+GetJobRunMetrics.model_rebuild()
+GetJobRunMetricsJob.model_rebuild()
+GetJobRunMetricsJobMetrics.model_rebuild()
diff --git a/metaphor/dbt/cloud/discovery_api/graphql_client/get_job_run_tests.py b/metaphor/dbt/cloud/discovery_api/graphql_client/get_job_run_tests.py
index 9fbb61a0..146407d6 100644
--- a/metaphor/dbt/cloud/discovery_api/graphql_client/get_job_run_tests.py
+++ b/metaphor/dbt/cloud/discovery_api/graphql_client/get_job_run_tests.py
@@ -1,7 +1,7 @@
 # Generated by ariadne-codegen
 # Source: queries.graphql
 
-from typing import Any, List, Optional
+from typing import List, Optional
 
 from pydantic import Field
 
@@ -22,7 +22,6 @@ class GetJobRunTestsJobTests(BaseModel):
     compiled_code: Optional[str] = Field(alias="compiledCode")
     depends_on: List[str] = Field(alias="dependsOn")
     name: Optional[str]
-    run_id: Any = Field(alias="runId")
     unique_id: str = Field(alias="uniqueId")
 
 
diff --git a/metaphor/dbt/cloud/discovery_api/queries.graphql b/metaphor/dbt/cloud/discovery_api/queries.graphql
index 44a4d775..76d73ef9 100644
--- a/metaphor/dbt/cloud/discovery_api/queries.graphql
+++ b/metaphor/dbt/cloud/discovery_api/queries.graphql
@@ -108,148 +108,31 @@ query GetJobRunSources($jobId: BigInt!, $runId: BigInt!) {
     }
   }
 }
-# metrics {
-#   accountId
-#   calculation_method
-#   dbtVersion
-#   dependsOn
-#   description
-#   dimensions
-#   environmentId
-#   environmentName
-#   expression
-#   filters {
-#     field
-#     operator
-#     value
-#   }
-#   jobId
-#   label
-#   meta
-#   model {
-#     access
-#     accountId
-#     alias
-#     args
-#     childrenL1
-#     columns {
-#       comment
-#       description
-#       index
-#       meta
-#       name
-#       tags
-#       type
-#     }
-#     comment
-#     compileCompletedAt
-#     compileStartedAt
-#     compiledCode
-#     compiledSql
-#     database
-#     dbtGroup
-#     dbtVersion
-#     dependsOn
-#     description
-#     environmentId
-#     error
-#     executeCompletedAt
-#     executeStartedAt
-#     executionTime
-#     invocationId
-#     jobId
-#     language
-#     materializedType
-#     meta
-#     name
-#     owner
-#     packageName
-#     packages
-#     projectId
-#     rawCode
-#     rawSql
-#     resourceType
-#     runElapsedTime
-#     runGeneratedAt
-#     runId
-#     runResults {
-#       args
-#       compileCompletedAt
-#       compileStartedAt
-#       error
-#       executeCompletedAt
-#       executeStartedAt
-#       executionTime
-#       invocationId
-#       runElapsedTime
-#       runGeneratedAt
-#       skip
-#       status
-#       threadId
-#     }
-#     schema
-#     skip
-#     stats {
-#       description
-#       id
-#       include
-#       label
-#       value
-#     }
-#     status
-#     tags
-#     tests {
-#       accountId
-#       columnName
-#       compileCompletedAt
-#       compileStartedAt
-#       compiledCode
-#       compiledSql
-#       dbtVersion
-#       dependsOn
-#       description
-#       environmentId
-#       error
-#       executeCompletedAt
-#       executeStartedAt
-#       executionTime
-#       fail
-#       invocationId
-#       jobId
-#       language
-#       meta
-#       name
-#       projectId
-#       rawCode
-#       rawSql
-#       resourceType
-#       runElapsedTime
-#       runGeneratedAt
-#       runId
-#       skip
-#       state
-#       status
-#       tags
-#       threadId
-#       uniqueId
-#       warn
-#     }
-#     threadId
-#     type
-#     uniqueId
-#   }
-#   name
-#   packageName
-#   projectId
-#   resourceType
-#   runId
-#   sql
-#   tags
-#   timeGrains
-#   timestamp
-#   type
-#   uniqueId
-# }
+
+query GetJobRunMetrics($jobId: BigInt!, $runId: BigInt) {
+  job(id: $jobId, runId: $runId) {
+    metrics {
+      packageName
+      label
+      description
+      dependsOn
+      uniqueId
+      timeGrains
+      timestamp
+      dimensions
+      filters {
+        field
+        operator
+        value
+      }
+      tags
+      type
+      sql
+      expression
+      calculation_method
+    }
+  }
+}
 
 query GetJobRunTests($jobId: BigInt!, $runId: BigInt!) {
   job(id: $jobId, runId: $runId) {
diff --git a/metaphor/dbt/cloud/extractor.py b/metaphor/dbt/cloud/extractor.py
index 5b062954..01425233 100644
--- a/metaphor/dbt/cloud/extractor.py
+++ b/metaphor/dbt/cloud/extractor.py
@@ -1,5 +1,4 @@
-from collections import defaultdict
-from typing import Collection, Dict, List, Optional, Set
+from typing import Collection, Dict, List
 
 import httpx
 
@@ -9,19 +8,9 @@
 from metaphor.dbt.cloud.client import DbtAdminAPIClient
 from metaphor.dbt.cloud.config import DbtCloudConfig
 from metaphor.dbt.cloud.discovery_api import DiscoveryAPIClient
-from metaphor.dbt.cloud.discovery_api.graphql_client.get_job_tests import (
-    GetJobTestsJobTests,
-)
 from metaphor.dbt.cloud.parser.parser import Parser
 from metaphor.dbt.cloud.utils import parse_environment
-from metaphor.dbt.config import DbtRunConfig
 from metaphor.models.crawler_run_metadata import Platform
-from metaphor.models.metadata_change_event import (
-    DataPlatform,
-    Dataset,
-    Metric,
-    VirtualView,
-)
 
 logger = get_logger()
 
@@ -40,26 +29,23 @@
     def __init__(self, config: DbtCloudConfig):
         super().__init__(config)
+        self._config = config
         self._account_id = config.account_id
         self._job_ids = config.job_ids
         self._project_ids = config.project_ids
-        self._service_token = config.service_token
-        self._meta_ownerships = config.meta_ownerships
-        self._meta_tags = config.meta_tags
-        self._meta_key_tags = config.meta_key_tags
         self._base_url = config.base_url
         self._discovery_api_url = config.discovery_api_url
         self._project_accounts: Dict[int, str] = {}
-        self._entities: Dict[int, Collection[ENTITY_TYPES]] = {}
+        self._entities: List[ENTITY_TYPES] = []
 
         self._client = DbtAdminAPIClient(
             base_url=self._base_url,
            account_id=self._account_id,
-            service_token=self._service_token,
+            service_token=config.service_token,
            included_env_ids=config.environment_ids,
         )
 
         headers = {
-            "Authorization": f"Bearer {self._service_token}",
+            "Authorization": f"Bearer {config.service_token}",
             "Content-Type": "application/json",
         }
         self._discovery_api_client = DiscoveryAPIClient(
@@ -68,11 +54,6 @@ def __init__(self, config: DbtCloudConfig):
             http_client=httpx.Client(timeout=None, headers=headers),
         )
 
-        self._datasets: Dict[str, Dataset] = {}
-        self._virtual_views: Dict[str, VirtualView] = {}
-        self._metrics: Dict[str, Metric] = {}
-        self._referenced_virtual_views: Set[str] = set()
-
     async def extract(self) -> Collection[ENTITY_TYPES]:
         logger.info("Fetching metadata from DBT cloud")
 
@@ -80,11 +61,11 @@ async def extract(self) -> Collection[ENTITY_TYPES]:
             self._job_ids.update(self._client.get_project_jobs(project_id))
 
         for job_id in self._job_ids:
-            await self._extract_last_run(job_id)
+            await self._extract_job(job_id)
 
-        return [item for ls in self._entities.values() for item in ls]
+        return self._entities
 
-    async def _extract_last_run(self, job_id: int):
+    async def _extract_job(self, job_id: int):
         if not self._client.is_job_included(job_id):
             logger.info(f"Ignoring job ID: {job_id}")
             return
@@ -115,97 +96,13 @@ async def _extract_last_run(self, job_id: int):
         ).environment
         platform, project_name = parse_environment(environment)
 
-        conf = DbtRunConfig(
-            manifest="",
-            run_results=None,  # Instead of getting test results from `run_results.json`, we get them from discovery API after we parse the manifest
-            account=account,
-            docs_base_url=docs_base_url,
-            output=self._output,
-            meta_ownerships=self._meta_ownerships,
-            meta_tags=self._meta_tags,
-            meta_key_tags=self._meta_key_tags,
-        )
-
-        Parser(
+        job_run_parser = Parser(
             self._discovery_api_client,
-            conf,
+            self._config,
             platform,
+            account,
             project_name,
-        ).parse_run(run)
-
-    def _extend_test_run_results_entities(
-        self,
-        platform: DataPlatform,
-        account: Optional[str],
-        job_id: int,
-        entities: Collection[ENTITY_TYPES],
-    ):
-        logger.info("Parsing test run results")
-
-        new_monitor_datasets: List[Dataset] = list()
-
-        # Get all test nodes from discovery API
-        test_nodes_by_model_uid: Dict[str, List[GetJobTestsJobTests]] = defaultdict(
-            list
+            docs_base_url,
+            project_source_url=None,
         )
-        job = self._discovery_api_client.get_job_tests(job_id).job
-        assert job is not None
-        for test in job.tests:
-            for model in [x for x in test.depends_on if x.startswith("model.")]:
-                test_nodes_by_model_uid[model].append(test)
-
-        # job = self._discovery_api_client.get_job_models(job_id).job
-        # assert job is not None
-        # model_names = {
-        #     model.unique_id: dataset_normalized_name(
-        #         model.database, model.schema_, model.name
-        #     )
-        #     for model in job.models
-        # }
-
-        # # Go thru the virtual views
-        # for entity in entities:
-        #     if not isinstance(entity, VirtualView):
-        #         continue
-        #     if not entity.logical_id or not entity.logical_id.name:
-        #         continue
-
-        #     model_unique_id = f"model.{entity.logical_id.name}"
-
-        #     if (
-        #         model_unique_id not in test_nodes_by_model_uid
-        #         or model_unique_id not in model_names
-        #     ):
-        #         continue
-
-        #     dataset_logical_id = DatasetLogicalID(
-        #         name=model_names[model_unique_id],
-        #         platform=platform,
-        #         account=account,
-        #     )
-
-        #     dataset = Dataset(
-        #         logical_id=dataset_logical_id,
-        #     )
-
-        #     # Go thru the tests in this dbt model
-        #     for test in test_nodes_by_model_uid[model_unique_id]:
-        #         if not test.name:
-        #             continue
-
-        #         status = dbt_run_result_output_data_monitor_status_map[
-        #             test.status or "skipped"
-        #         ]
-
-        #         add_data_quality_monitor(
-        #             dataset,
-        #             test.name,
-        #             test.column_name,
-        #             status,
-        #             test.execute_completed_at,
-        #         )
-
-        #     if dataset.data_quality and dataset.data_quality.monitors:
-        #         new_monitor_datasets.append(dataset)
-
-        # return list(entities) + new_monitor_datasets
+        self._entities.extend(job_run_parser.parse_run(run))
diff --git a/metaphor/dbt/cloud/parser/common.py b/metaphor/dbt/cloud/parser/common.py
new file mode 100644
index 00000000..94ca3c94
--- /dev/null
+++ b/metaphor/dbt/cloud/parser/common.py
@@ -0,0 +1,45 @@
+from typing import Dict, List, Union
+
+from metaphor.common.entity_id import EntityId
+from metaphor.common.utils import unique_list
+from metaphor.dbt.util import get_virtual_view_id
+from metaphor.models.metadata_change_event import (
+    DbtMacro,
+    DbtMetric,
+    DbtModel,
+    VirtualView,
+)
+
+
+def parse_depends_on(
+    virtual_views: Dict[str, VirtualView],
+    depends_on: List[str],
+    source_map: Dict[str, EntityId],
+    macro_map: Dict[str, DbtMacro],
+    target: Union[DbtModel, DbtMetric],
+):
+    if not depends_on:
+        return
+
+    datasets, models, macros = None, None, None
+
+    datasets = unique_list(
+        [str(source_map[n]) for n in depends_on if n.startswith("source.")]
+    )
+
+    models = unique_list(
+        [
+            get_virtual_view_id(virtual_views[n].logical_id)  # type: ignore
+            for n in depends_on
+            if n.startswith("model.") or n.startswith("snapshot.")
+        ]
+    )
+
+    macros = [
+        macro_map[n] for n in depends_on if n.startswith("macro.") and n in macro_map
+    ]
+
+    target.source_datasets = datasets if datasets else None
+    target.source_models = models if models else None
+    if isinstance(target, DbtModel):
+        target.macros = macros if macros else None
diff --git a/metaphor/dbt/cloud/parser/metric_parser.py b/metaphor/dbt/cloud/parser/metric_parser.py
new file mode 100644
index 00000000..e4a31979
--- /dev/null
+++ b/metaphor/dbt/cloud/parser/metric_parser.py
@@ -0,0 +1,66 @@
+from typing import Dict, Optional
+
+from metaphor.common.entity_id import EntityId
+from metaphor.dbt.cloud.discovery_api.graphql_client.get_job_run_metrics import (
+    GetJobRunMetricsJobMetrics as JobMetric,
+)
+from metaphor.dbt.cloud.parser.common import parse_depends_on
+from metaphor.dbt.util import build_metric_docs_url, build_system_tags, init_metric
+from metaphor.models.metadata_change_event import (
+    DbtMacro,
+    DbtMetric,
+    EntityUpstream,
+    Metric,
+    MetricFilter,
+    VirtualView,
+)
+
+
+class MetricParser:
+    def __init__(
+        self,
+        metrics: Dict[str, Metric],
+        virtual_views: Dict[str, VirtualView],
+        docs_base_url: Optional[str],
+    ) -> None:
+        self._metrics = metrics
+        self._virtual_views = virtual_views
+        self._docs_base_url = docs_base_url
+
+    def parse(
+        self,
+        metric: JobMetric,
+        source_map: Dict[str, EntityId],
+        macro_map: Dict[str, DbtMacro],
+    ) -> None:
+
+        metric_entity = init_metric(self._metrics, metric.unique_id)
+        metric_entity.dbt_metric = DbtMetric(
+            package_name=metric.package_name,
+            description=metric.description or None,
+            label=metric.label,
+            timestamp=metric.timestamp,
+            time_grains=metric.time_grains,
+            dimensions=metric.dimensions,
+            filters=[
+                MetricFilter(field=f.field, operator=f.operator, value=f.value)
+                for f in metric.filters
+            ],
+            url=build_metric_docs_url(self._docs_base_url, metric.unique_id),
+            sql=metric.expression or metric.sql,
+            type=metric.calculation_method or metric.type,
+        )
+        if metric.tags:
+            metric_entity.system_tags = build_system_tags(metric.tags)
+
+        parse_depends_on(
+            self._virtual_views,
+            metric.depends_on,
+            source_map,
+            macro_map,
+            metric_entity.dbt_metric,
+        )
+
+        metric_entity.entity_upstream = EntityUpstream(
+            source_entities=metric_entity.dbt_metric.source_models,
+        )
diff --git a/metaphor/dbt/cloud/parser/node_parser.py b/metaphor/dbt/cloud/parser/node_parser.py
index 19625984..e061b11a 100644
--- a/metaphor/dbt/cloud/parser/node_parser.py
+++ b/metaphor/dbt/cloud/parser/node_parser.py
@@ -1,8 +1,9 @@
 import json
-from typing import Any, Dict, List, Optional, Set, Tuple, Union, cast
+from typing import Any, Dict, List, Optional, Union, cast
 
 from metaphor.common.entity_id import EntityId, parts_to_dataset_entity_id
 from metaphor.common.utils import unique_list
+from metaphor.dbt.cloud.config import DbtCloudConfig
 from metaphor.dbt.cloud.discovery_api import DiscoveryAPIClient
 from metaphor.dbt.cloud.discovery_api.graphql_client.get_job_run_models import (
     GetJobRunModelsJobModels,
@@ -10,7 +11,7 @@
 from metaphor.dbt.cloud.discovery_api.graphql_client.get_job_run_snapshots import (
     GetJobRunSnapshotsJobSnapshots,
 )
-from metaphor.dbt.config import DbtRunConfig
+from metaphor.dbt.cloud.parser.common import parse_depends_on
 from metaphor.dbt.util import (
     build_model_docs_url,
     build_source_code_url,
@@ -20,7 +21,6 @@
     get_model_name_from_unique_id,
     get_ownerships_from_meta,
     get_snapshot_name_from_unique_id,
-    get_virtual_view_id,
     init_dataset,
     init_field,
     init_virtual_view,
@@ -49,9 +49,11 @@ class NodeParser:
     def __init__(
         self,
         discovery_api: DiscoveryAPIClient,
-        config: DbtRunConfig,
+        config: DbtCloudConfig,
         platform: DataPlatform,
         account: Optional[str],
+        docs_base_url: Optional[str],
+        project_source_url: Optional[str],
         datasets: Dict[str, Dataset],
         virtual_views: Dict[str, VirtualView],
         metrics: Dict[str, Metric],
@@ -59,8 +61,8 @@
         self._discovery_api = discovery_api
         self._platform = platform
         self._account = account
-        self._docs_base_url = config.docs_base_url
-        self._project_source_url = config.project_source_url
+        self._docs_base_url = docs_base_url
+        self._project_source_url = project_source_url
         self._meta_ownerships = config.meta_ownerships
         self._meta_tags = config.meta_tags
         self._meta_key_tags = config.meta_key_tags
@@ -199,18 +201,20 @@ def _parse_column_meta(self, node: NODE_TYPE, column_name: str, meta: Dict) -> N
 
     def _parse_node_file_path(self, node: NODE_TYPE):
         if isinstance(node, GetJobRunModelsJobModels):
-            definition = self._discovery_api.get_environment_model_file_path(
+            model_definition = self._discovery_api.get_environment_model_file_path(
                 node.environment_id
             ).environment.definition
-            assert definition
-            return definition.models.edges[0].node.file_path
+            assert model_definition
+            return model_definition.models.edges[0].node.file_path
         elif isinstance(node, GetJobRunSnapshotsJobSnapshots):
-            definition = self._discovery_api.get_environment_snapshot_file_path(
-                node.environment_id
-            ).environment.definition
-            assert definition
-            return definition.snapshots.edges[0].node.file_path
+            snapshot_definition = (
+                self._discovery_api.get_environment_snapshot_file_path(
+                    node.environment_id
+                ).environment.definition
+            )
+            assert snapshot_definition
+            return snapshot_definition.snapshots.edges[0].node.file_path
 
     def _init_dbt_model(self, node: NODE_TYPE, virtual_view: VirtualView):
         virtual_view.dbt_model = DbtModel(
@@ -226,10 +230,10 @@
 
     def _set_system_tags(self, node: NODE_TYPE, virtual_view: VirtualView):
         # Treat dbt tags as system tags
-        tags = get_dbt_tags_from_meta(node.meta, self._meta_key_tags)
-        if node.tags:
-            tags.extend(node.tags)
-        tags: List[str] = unique_list(tags)
+        tags: List[str] = unique_list(
+            get_dbt_tags_from_meta(node.meta, self._meta_key_tags)
+            + (node.tags if node.tags else [])
+        )
 
         if len(tags) > 0:
             virtual_view.system_tags = build_system_tags(tags)
@@ -274,41 +278,9 @@
         if isinstance(node, GetJobRunModelsJobModels):
             self._parse_model_meta(node, virtual_view)
             self._parse_model_materialization(node, dbt_model)
-        self._parse_depends_on(node.depends_on, source_map, macro_map, dbt_model)
+        parse_depends_on(
+            self._virtual_views, node.depends_on, source_map, macro_map, dbt_model
+        )
         self._parse_node_columns(node, dbt_model)
 
         self._set_entity_upstream(virtual_view, dbt_model)
-
-    def _parse_depends_on(
-        self,
-        depends_on: List[str],
-        source_map: Dict[str, EntityId],
-        macro_map: Dict[str, DbtMacro],
-        dbt_model: DbtModel,
-    ):
-        if not depends_on:
-            return
-
-        datasets, models, macros = None, None, None
-
-        datasets = unique_list(
-            [str(source_map[n]) for n in depends_on if n.startswith("source.")]
-        )
-
-        models = unique_list(
-            [
-                get_virtual_view_id(self._virtual_views[n].logical_id)  # type: ignore
-                for n in depends_on
-                if n.startswith("model.") or n.startswith("snapshot.")
-            ]
-        )
-
-        macros = [
-            macro_map[n]
-            for n in depends_on
-            if n.startswith("macro.") and n in macro_map
-        ]
-
-        dbt_model.source_datasets = datasets if datasets else None
-        dbt_model.source_models = models if models else None
-        dbt_model.macros = macros if macros else None
diff --git a/metaphor/dbt/cloud/parser/parser.py b/metaphor/dbt/cloud/parser/parser.py
index ffcd53be..34a9f76f 100644
--- a/metaphor/dbt/cloud/parser/parser.py
+++ b/metaphor/dbt/cloud/parser/parser.py
@@ -1,15 +1,20 @@
+import time
 from typing import Dict, List, Optional, Set
 
 from metaphor.common.event_util import ENTITY_TYPES
+from metaphor.common.logger import get_logger
 from metaphor.common.snowflake import normalize_snowflake_account
 from metaphor.dbt.cloud.client import DbtRun
+from metaphor.dbt.cloud.config import DbtCloudConfig
 from metaphor.dbt.cloud.discovery_api import DiscoveryAPIClient
-from metaphor.dbt.cloud.discovery_api.graphql_client.get_job_run_models import GetJobRunModelsJobModels
+from metaphor.dbt.cloud.discovery_api.graphql_client.get_job_run_models import (
+    GetJobRunModelsJobModels,
+)
 from metaphor.dbt.cloud.parser.macro_parser import MacroParser
+from metaphor.dbt.cloud.parser.metric_parser import MetricParser
 from metaphor.dbt.cloud.parser.node_parser import NodeParser
 from metaphor.dbt.cloud.parser.source_parser import SourceParser
 from metaphor.dbt.cloud.parser.test_parser import TestParser
-from metaphor.dbt.config import DbtRunConfig
 from metaphor.dbt.util import init_virtual_view
 from metaphor.models.metadata_change_event import (
     DataPlatform,
@@ -18,17 +23,23 @@
     VirtualView,
 )
 
+logger = get_logger()
+
 
 class Parser:
     def __init__(
         self,
         discovery_api: DiscoveryAPIClient,
-        config: DbtRunConfig,
+        config: DbtCloudConfig,
         platform: DataPlatform,
+        account: Optional[str],
         project_name: Optional[str],
+        docs_base_url: str,
+        project_source_url: Optional[str],
     ) -> None:
         self._discovery_api = discovery_api
         self._platform = platform
+        self._account = account
         self._config = config
 
         self._datasets: Dict[str, Dataset] = {}
@@ -37,7 +48,6 @@
         self._referenced_virtual_views: Set[str] = set()
 
         self._project_name = project_name
-        self._account = config.account
         if self._account and platform == DataPlatform.SNOWFLAKE:
             self._account = normalize_snowflake_account(self._account)
 
@@ -50,6 +60,8 @@
             self._config,
             self._platform,
             self._account,
+            docs_base_url,
+            project_source_url,
             self._datasets,
             self._virtual_views,
             self._metrics,
@@ -60,6 +72,11 @@
             self._virtual_views,
             self._datasets,
         )
+        self._metric_parser = MetricParser(
+            self._metrics,
+            self._virtual_views,
+            docs_base_url,
+        )
 
     def _get_source_map(self, run: DbtRun):
         job_run_sources = self._discovery_api.get_job_run_sources(
@@ -88,10 +105,18 @@
         assert job_run_tests.job
         return job_run_tests.job.tests
 
+    def _get_metrics(self, run: DbtRun):
+        job_run_metrics = self._discovery_api.get_job_run_metrics(
+            run.job_id, run.run_id
+        )
+        assert job_run_metrics.job
+        return job_run_metrics.job.metrics
+
     def parse_run(self, run: DbtRun):
         """
         Parses a single job run.
         """
+        start = time.time()
 
         source_map = self._get_source_map(run)
         macro_map = self._get_macro_map(run)
@@ -115,8 +140,8 @@
         for test in self._get_tests(run):
             self._test_parser.parse(test, models)
 
-        # for _, metric in job.metrics:
-        #     self._parse_metric(metric, source_map, macro_map)
+        for metric in self._get_metrics(run):
+            self._metric_parser.parse(metric, source_map, macro_map)
 
         entities: List[ENTITY_TYPES] = []
         entities.extend(self._datasets.values())
@@ -126,4 +151,7 @@
             if k not in self._referenced_virtual_views
         )
         entities.extend(self._metrics.values())
+        logger.info(
+            f"Fetched job ID: {run.job_id}, elapsed time: {time.time() - start} secs."
+        )
         return entities
diff --git a/metaphor/dbt/cloud/parser/test_parser.py b/metaphor/dbt/cloud/parser/test_parser.py
index c391b7af..2cd4d1d0 100644
--- a/metaphor/dbt/cloud/parser/test_parser.py
+++ b/metaphor/dbt/cloud/parser/test_parser.py
@@ -1,22 +1,24 @@
 from datetime import datetime
 from typing import Dict, List, Optional
 
-from metaphor.models.metadata_change_event import (
-    VirtualView,
-    Dataset,
-    DbtTest,
-    DataPlatform,
-    DataMonitorStatus,
-)
 from metaphor.common.logger import get_logger
 from metaphor.dbt.cloud.discovery_api.graphql_client.get_job_run_models import (
     GetJobRunModelsJobModels as Model,
+)
+from metaphor.dbt.cloud.discovery_api.graphql_client.get_job_run_models import (
     GetJobRunModelsJobModelsRunResults as RunResult,
 )
 from metaphor.dbt.cloud.discovery_api.graphql_client.get_job_run_tests import (
     GetJobRunTestsJobTests as Test,
 )
 from metaphor.dbt.util import add_data_quality_monitor, init_dataset, init_dbt_tests
+from metaphor.models.metadata_change_event import (
+    DataMonitorStatus,
+    DataPlatform,
+    Dataset,
+    DbtTest,
+    VirtualView,
+)
 
 logger = get_logger()
 dbt_run_result_output_data_monitor_status_map: Dict[str, DataMonitorStatus] = {
@@ -118,10 +120,7 @@ def run_result_key(run_result: RunResult):
             ),
             None,
         )
-        if (
-            run_result is None
-            or run_result.status is None
-        ):
+        if run_result is None or run_result.status is None:
             logger.warning(f"No valid run_result found: {run_results}")
             return
diff --git a/tests/dbt/cloud/test_client.py b/tests/dbt/cloud/test_client.py
index 8ecf91a6..c7fc8c4f 100644
--- a/tests/dbt/cloud/test_client.py
+++ b/tests/dbt/cloud/test_client.py
@@ -1,7 +1,7 @@
 from typing import Dict
 from unittest.mock import patch
 
-from metaphor.dbt.cloud.client import DbtAdminAPIClient, DbtRun
+from metaphor.dbt.cloud.client import DbtAdminAPIClient
 
 
 class Response:
@@ -198,31 +198,6 @@ def test_get_project_jobs(mock_requests):
     assert jobs == [3333, 2222]
 
 
-@patch("metaphor.dbt.cloud.client.requests")
-def test_get_run_artifact(mock_requests):
-    client = DbtAdminAPIClient(
-        base_url="http://base.url",
-        account_id=1111,
-        service_token="service_token",
-    )
-
-    mock_requests.get.return_value = Response(200, {"artifact": "json"})
-
-    run = DbtRun(run_id=2222, project_id=3333, job_id=4444)
-    path = client.get_run_artifact(run, "manifest.json")
-    assert path.endswith("/3333-4444-manifest.json")
-
-    mock_requests.get.assert_called_once_with(
-        "http://base.url/api/v2/accounts/1111/runs/2222/artifacts/manifest.json",
-        params=None,
-        headers={
-            "Content-Type": "application/json",
-            "Authorization": "Token service_token",
-        },
-        timeout=600,
-    )
-
-
 @patch("metaphor.dbt.cloud.client.requests")
 def test_job_is_included(mock_requests):
     client = DbtAdminAPIClient(
diff --git a/tests/dbt/cloud/test_extractor.py b/tests/dbt/cloud/test_extractor.py
deleted file mode 100644
index 4b1b6208..00000000
--- a/tests/dbt/cloud/test_extractor.py
+++ /dev/null
@@ -1,208 +0,0 @@
-from datetime import datetime
-from unittest.mock import MagicMock, patch
-
-import pytest
-
-from metaphor.common.base_config import OutputConfig
-from metaphor.dbt.cloud.client import DbtRun
-from metaphor.dbt.cloud.config import DbtCloudConfig
-from metaphor.dbt.cloud.discovery_api.queries.get_job_tests import GetJobTests
-from metaphor.dbt.cloud.extractor import DbtCloudExtractor
-from metaphor.models.metadata_change_event import (
-    DataMonitorStatus,
-    DataMonitorTarget,
-    DataPlatform,
-    Dataset,
-    DatasetLogicalID,
-    VirtualView,
-    VirtualViewLogicalID,
-    VirtualViewType,
-)
-
-
-@patch("metaphor.dbt.cloud.extractor.DiscoveryAPI")
-@patch("metaphor.dbt.cloud.extractor.get_data_platform_from_manifest")
-@patch("metaphor.dbt.cloud.extractor.DbtAdminAPIClient")
-@patch("metaphor.dbt.cloud.extractor.DbtExtractor")
-@pytest.mark.asyncio
-async def test_extractor(
-    mock_dbt_extractor_class: MagicMock,
-    mock_client_class: MagicMock,
-    mock_get_data_platform_from_manifest: MagicMock,
-    mock_discovery_api_class: MagicMock,
-):
-    mock_client = MagicMock()
-    mock_client.get_last_successful_run = MagicMock(
-        side_effect=(
-            DbtRun(run_id=3333, project_id=4444, job_id=2222),
-            DbtRun(run_id=7777, project_id=6666, job_id=8888),
-            DbtRun(run_id=3333, project_id=4444, job_id=2222),
-        )
-    )
-    mock_client.get_project_jobs = MagicMock(side_effect=[[8888], [2222]])
-
-    def mock_is_job_included(job_id: int) -> bool:
-        return job_id != 3333
-
-    mock_client.is_job_included = mock_is_job_included
-    mock_client.get_snowflake_account = MagicMock(return_value="snowflake_account")
-    mock_client.get_run_artifact = MagicMock(return_value="tempfile")
-
-    mock_get_data_platform_from_manifest.return_value = DataPlatform.UNKNOWN
-
-    mock_dbt_extractor = MagicMock()
-
-    async def fake_extract():
-        return []
-
-    mock_dbt_extractor.extract.side_effect = fake_extract
-
-    mock_client_class.return_value = mock_client
-    mock_dbt_extractor_class.return_value = mock_dbt_extractor
-
-    mock_discovery_api = MagicMock()
-    mock_discovery_api.get_all_job_tests.return_value = []
-
-    config = DbtCloudConfig(
-        output=OutputConfig(),
-        account_id=1111,
-        job_ids={2222, 3333},
-        project_ids={6666, 4444},
-        environment_ids={1},
-        base_url="https://cloud.metaphor.getdbt.com",
-        service_token="service_token",
-    )
-    extractor = DbtCloudExtractor(config)
-    await extractor.extract()
-    assert sorted(extractor._entities.keys()) == [3333, 7777]
-
-
-@patch("metaphor.dbt.cloud.extractor.get_data_platform_from_manifest")
-@patch("metaphor.dbt.cloud.extractor.DbtAdminAPIClient")
-@patch("metaphor.dbt.cloud.extractor.DbtExtractor")
-@pytest.mark.asyncio
-async def test_extractor_bad_source(
-    mock_dbt_extractor_class: MagicMock,
-    mock_client_class: MagicMock,
-    mock_get_data_platform_from_manifest: MagicMock,
-):
-    mock_client = MagicMock()
-    mock_client.get_last_successful_run = MagicMock(
-        side_effect=(
-            DbtRun(run_id=3333, project_id=4444, job_id=2222),
-            DbtRun(run_id=7777, project_id=6666, job_id=8888),
-            DbtRun(run_id=3333, project_id=4444, job_id=2222),
-        )
-    )
-    mock_client.get_project_jobs = MagicMock(side_effect=[[8888], [2222]])
-    mock_client.get_snowflake_account = MagicMock(return_value="snowflake_account")
-    mock_client.get_run_artifact = MagicMock(return_value="tempfile")
-
-    mock_get_data_platform_from_manifest.return_value = DataPlatform.UNKNOWN
-
-    mock_dbt_extractor = MagicMock()
-
-    async def fake_extract():
-        raise ValueError()
-
-    mock_dbt_extractor.extract.side_effect = fake_extract
-
-    mock_client_class.return_value = mock_client
-    mock_dbt_extractor_class.return_value = mock_dbt_extractor
-
-    config = DbtCloudConfig(
-        output=OutputConfig(),
-        account_id=1111,
-        job_ids={2222},
-        project_ids={6666, 4444},
-        base_url="https://cloud.metaphor.getdbt.com",
-        service_token="service_token",
-    )
-    extractor = DbtCloudExtractor(config)
-    await extractor.extract()
-    assert not extractor._entities
-
-
-@patch("metaphor.dbt.cloud.extractor.DiscoveryAPI")
-def test_extend_test_run_results_entities(mock_discovery_api_class: MagicMock):
-    config = DbtCloudConfig(
-        output=OutputConfig(),
-        account_id=1111,
-        job_ids={2222},
-        project_ids={6666, 4444},
-        base_url="https://cloud.metaphor.getdbt.com",
-        service_token="service_token",
-    )
-    extractor = DbtCloudExtractor(config)
-    mock_discovery_api = MagicMock()
-    mock_discovery_api.get_all_job_model_names.return_value = {
-        "model.foo.bar": "db.sch.tab"
-    }
-
-    def fake_get_all_job_tests(job_id: int):
-        return GetJobTests.model_validate(
-            {
-                "job": {
-                    "tests": [
-                        {
-                            "uniqueId": "1",
-                            "name": "test1",
-                            "columnName": "col1",
-                            "status": "pass",
-                            "executeCompletedAt": datetime.now(),
-                            "dependsOn": [
-                                "model.foo.bar",
-                            ],
-                        },
-                        {
-                            "uniqueId": "2",
-                            "name": "test2",
-                            "columnName": "col2",
-                            "status": "error",
-                            "executeCompletedAt": datetime.now(),
-                            "dependsOn": [
-                                "model.foo.bar",
-                            ],
-                        },
-                    ]
-                }
-            }
-        )
-
-    mock_discovery_api.get_all_job_tests.side_effect = fake_get_all_job_tests
-    mock_discovery_api_class.return_value = mock_discovery_api
-    entities = [
-        VirtualView(
-            logical_id=VirtualViewLogicalID(
-                name="foo.bar",
-                type=VirtualViewType.DBT_MODEL,
-            ),
-        ),
-        Dataset(
-            logical_id=DatasetLogicalID(
-                name="a.b.c",
-                platform=DataPlatform.UNKNOWN,
-            )
-        ),
-    ]
-
-    res = extractor._extend_test_run_results_entities(
-        DataPlatform.UNKNOWN, None, 2222, entities
-    )
-    assert len(res) == 3
-    dataset = next(
-        x for x in res if isinstance(x, Dataset) and x.data_quality is not None
-    )
-    assert dataset.data_quality and dataset.data_quality.monitors
-    assert dataset.data_quality.monitors[0].status == DataMonitorStatus.PASSED
-    assert dataset.data_quality.monitors[0].targets == [
-        DataMonitorTarget(
-            column="col1", dataset="DATASET~083603875008F6D0B4981A524F67A549"
-        )
-    ]
-    assert dataset.data_quality.monitors[1].status == DataMonitorStatus.ERROR
-    assert dataset.data_quality.monitors[1].targets == [
-        DataMonitorTarget(
-            column="col2", dataset="DATASET~083603875008F6D0B4981A524F67A549"
-        )
-    ]