Skip to content

Commit

Permalink
dbt cloud crawler does not extract test status (#962)
Browse files Browse the repository at this point in the history
* Extract test status from tests connection

Remove

* Bump version
  • Loading branch information
elic-eon authored Aug 23, 2024
1 parent 070b65d commit c350363
Show file tree
Hide file tree
Showing 11 changed files with 185 additions and 138 deletions.
2 changes: 2 additions & 0 deletions metaphor/dbt/cloud/discovery_api/generated/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
AncestorNodeType,
AppliedModelSortField,
FreshnessStatus,
OwnerResourceType,
PackageResourceType,
ReleaseVersion,
ResourceNodeType,
Expand Down Expand Up @@ -141,6 +142,7 @@
"MacroDefinitionFilter",
"ModelAppliedFilter",
"ModelDefinitionFilter",
"OwnerResourceType",
"PackageResourceType",
"ReleaseVersion",
"ResourceNodeType",
Expand Down
2 changes: 2 additions & 0 deletions metaphor/dbt/cloud/discovery_api/generated/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ def get_job_run_tests(
dependsOn
name
uniqueId
status
executeCompletedAt
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions metaphor/dbt/cloud/discovery_api/generated/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,15 @@ class AppliedModelSortField(str, Enum):
class FreshnessStatus(str, Enum):
    """Freshness status values exposed by the dbt Cloud discovery API.

    Mirrors the GraphQL ``FreshnessStatus`` enum one-to-one: every member's
    value is the enum name exactly as it is spelled in the schema. Because the
    class also derives from ``str``, members compare equal to those raw
    strings.
    """

    Error = "Error"
    Pass = "Pass"
    Unknown = "Unknown"
    Warn = "Warn"


class OwnerResourceType(str, Enum):
    """Resource kinds accepted by the ``owners`` field's ``resource`` filter.

    Mirrors the GraphQL ``OwnerResourceType`` enum ("the type of owner"); each
    member's value is the lowercase enum name used by the schema. Because the
    class also derives from ``str``, members compare equal to those raw
    strings.
    """

    exposure = "exposure"
    group = "group"


class PackageResourceType(str, Enum):
macro = "macro"
model = "model"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Generated by ariadne-codegen
# Source: queries.graphql

from typing import List, Optional
from typing import Any, List, Optional

from pydantic import Field

Expand All @@ -23,6 +23,8 @@ class GetJobRunTestsJobTests(BaseModel):
depends_on: List[str] = Field(alias="dependsOn")
name: Optional[str]
unique_id: str = Field(alias="uniqueId")
status: Optional[str]
execute_completed_at: Optional[Any] = Field(alias="executeCompletedAt")


GetJobRunTests.model_rebuild()
Expand Down
2 changes: 2 additions & 0 deletions metaphor/dbt/cloud/discovery_api/queries.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ query GetJobRunTests($jobId: BigInt!, $runId: BigInt!) {
dependsOn
name
uniqueId
status
executeCompletedAt
}
}
}
Expand Down
26 changes: 26 additions & 0 deletions metaphor/dbt/cloud/discovery_api/schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ type AppliedState {
"""
lastUpdatedAt: DateTime

"""The git sha of the project when the applied state was last updated."""
latestGitSha: String

"""Project Lineage."""
lineage(
"""Lineage Filter"""
Expand Down Expand Up @@ -122,6 +125,10 @@ type AppliedState {
"""Sort by"""
sort: AppliedModelSort
): ModelAppliedStateNodeConnection!
owners(
"""Filter by resource type"""
resource: OwnerResourceType!
): [ExposureOwner!]!

"""List of packages used in the environment"""
packages(
Expand Down Expand Up @@ -790,6 +797,9 @@ type ExposureDefinitionNode implements EnvironmentDefinitionNode {
"""The fully qualified name of this exposure"""
fqn: [String!]!

"""Freshness status of the source"""
freshnessStatus: FreshnessStatus

"""Exposure Label"""
label: String

Expand Down Expand Up @@ -1000,6 +1010,15 @@ type ExposureNode implements CloudArtifactInterface & NodeInterface {
url: String
}

"""The owners of exposure"""
type ExposureOwner {
"""The email of the owner"""
email: String!

"""The name of the owner"""
name: String!
}

"""Public model from another project"""
type ExternalModelNode implements EnvironmentAppliedNestedNode & EnvironmentDefinitionNestedNode {
"""The account ID of this node"""
Expand Down Expand Up @@ -1054,6 +1073,7 @@ type ExternalModelNode implements EnvironmentAppliedNestedNode & EnvironmentDefi
# Possible values of a `freshnessStatus` field.
enum FreshnessStatus {
Error
Pass
Unknown
Warn
}

Expand Down Expand Up @@ -2850,6 +2870,12 @@ interface NodeInterface {
uniqueId: String!
}

"""The type of owner"""
enum OwnerResourceType {
exposure
group
}

"""The type of package resource"""
enum PackageResourceType {
macro
Expand Down
53 changes: 11 additions & 42 deletions metaphor/dbt/cloud/parser/dbt_test_parser.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
from datetime import datetime
from typing import Dict, List, Optional
from typing import Any, Dict, Optional

from metaphor.common.logger import get_logger
from metaphor.dbt.cloud.discovery_api.generated.get_job_run_models import (
GetJobRunModelsJobModels as Model,
)
from metaphor.dbt.cloud.discovery_api.generated.get_job_run_models import (
GetJobRunModelsJobModelsRunResults as RunResult,
)
from metaphor.dbt.cloud.discovery_api.generated.get_job_run_tests import (
GetJobRunTestsJobTests as Test,
)
Expand Down Expand Up @@ -70,8 +67,6 @@ def parse(
if model_unique_id not in models:
return

model = models[model_unique_id]

dbt_test = DbtTest(
name=test.name,
unique_id=test.unique_id,
Expand All @@ -84,19 +79,16 @@ def parse(

init_dbt_tests(self._virtual_views, model_unique_id).append(dbt_test)

if model.run_results:
self._parse_test_run_result(
test, models[model_unique_id], model.run_results
)
self._parse_test_run_result(test, models[model_unique_id])

@staticmethod
def _get_run_result_executed_completed_at(
run_result: RunResult,
def _parse_date_time_from_result(
field: Optional[Any],
) -> Optional[datetime]:
if isinstance(run_result.execute_completed_at, datetime):
return run_result.execute_completed_at
if isinstance(run_result.execute_completed_at, str):
completed_at = run_result.execute_completed_at
if isinstance(field, datetime):
return field
if isinstance(field, str):
completed_at = field
if completed_at.endswith("Z"):
# Convert Zulu to +00:00
completed_at = f"{completed_at[:-1]}+00:00"
Expand All @@ -110,36 +102,13 @@ def _parse_test_run_result(
self,
test: Test,
model: Model,
run_results: List[RunResult],
) -> None:
model_name = model.alias or model.name
if model.database is None or model.schema_ is None or model_name is None:
logger.warning(f"Skipping model without name, {model.unique_id}")
return

if not test.name:
return

if not run_results:
logger.warning(f"Skipping test without run_results, {model.unique_id}")
return

def run_result_key(run_result: RunResult):
completed_at = self._get_run_result_executed_completed_at(run_result)
if not completed_at:
return 0
return completed_at.timestamp()

run_result = next(
(
n
for n in sorted(run_results, key=run_result_key, reverse=True)
if n.status
),
None,
)
if run_result is None or run_result.status is None:
logger.warning(f"No valid run_result found: {run_results}")
if not test.name or test.status is None or test.execute_completed_at is None:
return

dataset = init_dataset(
Expand All @@ -152,6 +121,6 @@ def run_result_key(run_result: RunResult):
model.unique_id,
)

status = dbt_run_result_output_data_monitor_status_map[run_result.status]
last_run = self._get_run_result_executed_completed_at(run_result)
status = dbt_run_result_output_data_monitor_status_map[test.status]
last_run = self._parse_date_time_from_result(test.execute_completed_at)
add_data_quality_monitor(dataset, test.name, test.column_name, status, last_run)
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "metaphor-connectors"
version = "0.14.85"
version = "0.14.86"
license = "Apache-2.0"
description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app."
authors = ["Metaphor <[email protected]>"]
Expand Down
Loading

0 comments on commit c350363

Please sign in to comment.