-
Notifications
You must be signed in to change notification settings - Fork 4
/
custom_dq.py
64 lines (51 loc) · 2.01 KB
/
custom_dq.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
from typing import Collection, Optional
from pydantic.dataclasses import dataclass
from metaphor.common.base_config import BaseConfig
from metaphor.common.base_extractor import BaseExtractor
from metaphor.common.dataclass import ConnectorConfig
from metaphor.common.entity_id import dataset_normalized_name
from metaphor.common.event_util import ENTITY_TYPES
from metaphor.common.runner import metaphor_file_sink_config, run_connector
from metaphor.models.crawler_run_metadata import Platform
from metaphor.models.metadata_change_event import (
DataPlatform,
Dataset,
DatasetDataQuality,
DatasetLogicalID,
)
@dataclass(config=ConnectorConfig)
class CustomDQRunConfig(BaseConfig):
    """Run configuration for the custom data-quality connector.

    Declares no fields of its own; everything comes from ``BaseConfig``.
    """
class CustomDQConnector(BaseExtractor):
    """Connector that emits a single dataset entity carrying data-quality info."""

    @staticmethod
    def from_config_file(config_file: str) -> "CustomDQConnector":
        """Build a connector from the config that
        ``CustomDQRunConfig.from_yaml_file`` produces for ``config_file``.
        """
        run_config = CustomDQRunConfig.from_yaml_file(config_file)
        return CustomDQConnector(run_config)

    def __init__(self, config: CustomDQRunConfig) -> None:
        """Hand the run configuration to the base extractor."""
        super().__init__(config)

    def run_dq_tests(self) -> Optional[DatasetDataQuality]:
        """Run the actual DQ tests and fill out a ``DatasetDataQuality``.

        Placeholder: no tests are implemented here, so this returns ``None``.
        """
        return None

    async def extract(self) -> Collection[ENTITY_TYPES]:
        """Return a one-element list holding the dataset with its DQ aspect."""
        # Identify the target dataset by normalized name and platform.
        logical_id = DatasetLogicalID(
            name=dataset_normalized_name("db", "schema", "dest"),
            platform=DataPlatform.BIGQUERY,
        )

        # Attach whatever run_dq_tests() yields as the data-quality aspect.
        dataset = Dataset(
            logical_id=logical_id,
            data_quality=self.run_dq_tests(),
        )

        return [dataset]
# Hand the connector to the runner, which outputs the resulting events to the
# tenant's S3 bucket.
# NOTE(review): this executes at import time, and the empty config path passed
# to from_config_file looks like a placeholder -- confirm before real use.
connector_name = "custom_dq_connector"
tenant_name = "tenant"

run_connector(
    name=connector_name,
    description="This is a custom connector made by Acme, Inc.",
    platform=Platform.BIGQUERY,
    make_connector=lambda: CustomDQConnector.from_config_file(""),
    file_sink_config=metaphor_file_sink_config(tenant_name, connector_name),
)