Skip to content

Commit

Permalink
Crawler should generate run metadata if output is valid (#724)
Browse files Browse the repository at this point in the history
* Gen runMetadata even passing an invalid config

* Add tests

* Bump version

* Add tests

* Address reviewer comments
  • Loading branch information
elic-eon authored Dec 13, 2023
1 parent 085aa80 commit c2c8f31
Show file tree
Hide file tree
Showing 47 changed files with 304 additions and 125 deletions.
10 changes: 5 additions & 5 deletions metaphor/azure_data_factory/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,17 @@ class Factory:
class AzureDataFactoryExtractor(BaseExtractor):
"""Azure Data Factory metadata extractor"""

_description = "Azure Data Factory metadata crawler"
_platform = Platform.GLUE

@staticmethod
def from_config_file(config_file: str) -> "AzureDataFactoryExtractor":
return AzureDataFactoryExtractor(
AzureDataFactoryRunConfig.from_yaml_file(config_file)
)

def __init__(
self,
config: AzureDataFactoryRunConfig,
):
super().__init__(config, "Azure Data Factory metadata crawler", Platform.GLUE)
def __init__(self, config: AzureDataFactoryRunConfig):
super().__init__(config)
self._config = config

self._datasets: Dict[str, Dataset] = {}
Expand Down
7 changes: 5 additions & 2 deletions metaphor/bigquery/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,15 @@
class BigQueryExtractor(BaseExtractor):
"""BigQuery metadata extractor"""

_description = "BigQuery metadata crawler"
_platform = Platform.BIGQUERY

@staticmethod
def from_config_file(config_file: str) -> "BigQueryExtractor":
return BigQueryExtractor(BigQueryRunConfig.from_yaml_file(config_file))

def __init__(self, config: BigQueryRunConfig) -> None:
super().__init__(config, "BigQuery metadata crawler", Platform.BIGQUERY)
super().__init__(config)
self._config = config
self._credentials = get_credentials(config)
self._project_ids = config.project_ids
Expand All @@ -92,7 +95,7 @@ async def extract(self) -> Collection[ENTITY_TYPES]:

return [*self._datasets, *self._query_logs]

def _extract_project(self, project_id):
def _extract_project(self, project_id) -> None:
logger.info(f"Fetching metadata from BigQuery project {project_id}")

client = build_client(project_id, self._credentials)
Expand Down
5 changes: 4 additions & 1 deletion metaphor/bigquery/lineage/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,17 @@
class BigQueryLineageExtractor(BaseExtractor):
"""BigQuery lineage metadata extractor"""

_description = "BigQuery data lineage crawler"
_platform = Platform.BIGQUERY

@staticmethod
def from_config_file(config_file: str) -> "BigQueryLineageExtractor":
return BigQueryLineageExtractor(
BigQueryLineageRunConfig.from_yaml_file(config_file)
)

def __init__(self, config: BigQueryLineageRunConfig):
super().__init__(config, "BigQuery data lineage crawler", Platform.BIGQUERY)
super().__init__(config)
self._config = config
self._credentials = get_credentials(config)
self._project_ids = config.project_ids
Expand Down
5 changes: 4 additions & 1 deletion metaphor/bigquery/profile/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,17 @@
class BigQueryProfileExtractor(BaseExtractor):
"""BigQuery data profile extractor"""

_description = "BigQuery data profile crawler"
_platform = Platform.BIGQUERY

@staticmethod
def from_config_file(config_file: str) -> "BigQueryProfileExtractor":
return BigQueryProfileExtractor(
BigQueryProfileRunConfig.from_yaml_file(config_file)
)

def __init__(self, config: BigQueryProfileRunConfig):
super().__init__(config, "BigQuery data profile crawler", Platform.BIGQUERY)
super().__init__(config)
self._config = config
self._credentials = get_credentials(config)
self._project_ids = config.project_ids
Expand Down
2 changes: 1 addition & 1 deletion metaphor/common/base_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class OutputConfig:
file: Optional[FileSinkConfig] = None


@dataclass(config=ConnectorConfig)
@dataclass()
class BaseConfig:
"""Base class for runtime parameters
Expand Down
12 changes: 6 additions & 6 deletions metaphor/common/base_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
class BaseExtractor(ABC):
"""Base class for metadata extractors"""

_platform: Optional[Platform]
_description: str

@staticmethod
@abstractmethod
def from_config_file(config_file: str) -> "BaseExtractor":
Expand All @@ -21,23 +24,20 @@ def from_config_file(config_file: str) -> "BaseExtractor":
async def extract(self) -> Collection[ENTITY_TYPES]:
"""Extract metadata and build messages, should be overridden"""

def __init__(
self, config: BaseConfig, description: str, platform: Optional[Platform]
) -> None:
def __init__(self, config: BaseConfig) -> None:
self._output = config.output
self._description = description
self._platform = platform

def run_async(self) -> Collection[ENTITY_TYPES]:
return asyncio.run(self.extract())

def run(self) -> List[MetadataChangeEvent]:
"""Callable function to extract metadata and send/post messages"""

return run_connector(
(events, _) = run_connector(
connector_func=self.run_async,
name=EventUtil.class_fqcn(self.__class__),
platform=self._platform,
description=self._description,
file_sink_config=self._output.file,
)
return events
18 changes: 16 additions & 2 deletions metaphor/common/cli.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,22 @@
from typing import Type

from metaphor.common.base_config import BaseConfig
from metaphor.common.base_extractor import BaseExtractor
from metaphor.common.event_util import EventUtil
from metaphor.common.runner import run_connector


def cli_main(extractor_cls: Type[BaseExtractor], config_file: str):
extractor = extractor_cls.from_config_file(config_file)
extractor.run()
base_config = BaseConfig.from_yaml_file(config_file)

def connector_func():
extractor = extractor_cls.from_config_file(config_file)
return extractor.run_async()

return run_connector(
connector_func=connector_func,
name=EventUtil.class_fqcn(extractor_cls),
description=extractor_cls._description,
platform=extractor_cls._platform,
file_sink_config=base_config.output.file,
)
12 changes: 6 additions & 6 deletions metaphor/common/runner.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import traceback
from datetime import datetime
from typing import Callable, Collection, List, Optional
from typing import Callable, Collection, List, Optional, Tuple

from metaphor.common.event_util import ENTITY_TYPES, EventUtil
from metaphor.common.file_sink import FileSink, FileSinkConfig, S3StorageConfig
Expand All @@ -14,10 +14,10 @@
def run_connector(
connector_func: Callable[[], Collection[ENTITY_TYPES]],
name: str,
platform: Platform,
description: str,
platform: Optional[Platform] = None,
file_sink_config: Optional[FileSinkConfig] = None,
) -> List[MetadataChangeEvent]:
) -> Tuple[List[MetadataChangeEvent], CrawlerRunMetadata]:
"""Run a connector and write the resulting events to files and/or API.
Parameters
Expand All @@ -26,10 +26,10 @@ def run_connector(
The connector function to run
name : str
Name of the connector
platform : Platform
Platform of the connector
description : str
Textual description of the connector
platform : Optional[Platform]
Platform of the connector
file_sink_config : Optional[FileSinkConfig]
Optional configuration for outputting events to files or cloud storage
Expand Down Expand Up @@ -80,7 +80,7 @@ def run_connector(
file_sink.sink_metadata(run_metadata)
file_sink.sink_logs()

return events
return events, run_metadata


def metaphor_file_sink_config(
Expand Down
5 changes: 4 additions & 1 deletion metaphor/custom/data_quality/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,17 @@
class CustomDataQualityExtractor(BaseExtractor):
"""Custom data quality extractor"""

_description = "Custom data quality connector"
_platform = None

@staticmethod
def from_config_file(config_file: str) -> "CustomDataQualityExtractor":
return CustomDataQualityExtractor(
CustomDataQualityConfig.from_yaml_file(config_file)
)

def __init__(self, config: CustomDataQualityConfig) -> None:
super().__init__(config, "Custom data quality connector", None)
super().__init__(config)
self._datasets = config.datasets

async def extract(self) -> List[MetadataChangeEvent]:
Expand Down
5 changes: 4 additions & 1 deletion metaphor/custom/governance/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,17 @@
class CustomGovernanceExtractor(BaseExtractor):
"""Custom governance extractor"""

_description = "Custom governance connector"
_platform = None

@staticmethod
def from_config_file(config_file: str) -> "CustomGovernanceExtractor":
return CustomGovernanceExtractor(
CustomGovernanceConfig.from_yaml_file(config_file)
)

def __init__(self, config: CustomGovernanceConfig) -> None:
super().__init__(config, "Custom governance connector", None)
super().__init__(config)
self._datasets = config.datasets

async def extract(self) -> List[MetadataChangeEvent]:
Expand Down
5 changes: 4 additions & 1 deletion metaphor/custom/lineage/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@
class CustomLineageExtractor(BaseExtractor):
"""Custom lineage extractor"""

_description = "Custom data lineage connector"
_platform = None

@staticmethod
def from_config_file(config_file: str) -> "CustomLineageExtractor":
return CustomLineageExtractor(CustomLineageConfig.from_yaml_file(config_file))

def __init__(self, config: CustomLineageConfig) -> None:
super().__init__(config, "Custom data lineage connector", None)
super().__init__(config)
self._lineages = config.lineages

async def extract(self) -> List[MetadataChangeEvent]:
Expand Down
5 changes: 4 additions & 1 deletion metaphor/custom/metadata/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@
class CustomMetadataExtractor(BaseExtractor):
"""Custom metadata extractor"""

_description = "Custom metadata connector"
_platform = None

@staticmethod
def from_config_file(config_file: str) -> "CustomMetadataExtractor":
return CustomMetadataExtractor(CustomMetadataConfig.from_yaml_file(config_file))

def __init__(self, config: CustomMetadataConfig) -> None:
super().__init__(config, "Custom metadata connector", None)
super().__init__(config)
self._datasets = config.datasets

async def extract(self) -> List[MetadataChangeEvent]:
Expand Down
5 changes: 4 additions & 1 deletion metaphor/custom/query_attributions/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,17 @@


class CustomQueryAttributionsExtractor(BaseExtractor):
_description = "Custom query attribution connector"
_platform = None

@staticmethod
def from_config_file(config_file: str) -> "CustomQueryAttributionsExtractor":
return CustomQueryAttributionsExtractor(
CustomQueryAttributionsConfig.from_yaml_file(config_file)
)

def __init__(self, config: CustomQueryAttributionsConfig) -> None:
super().__init__(config, "Custom query attribution connector", None)
super().__init__(config)
self._config = config

async def extract(self) -> List[MetadataChangeEvent]:
Expand Down
5 changes: 4 additions & 1 deletion metaphor/dbt/cloud/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@ class DbtCloudExtractor(BaseExtractor):
dbt cloud metadata extractor
"""

_description = "dbt cloud metadata crawler"
_platform = Platform.DBT_MODEL

@staticmethod
def from_config_file(config_file: str) -> "DbtCloudExtractor":
return DbtCloudExtractor(DbtCloudConfig.from_yaml_file(config_file))

def __init__(self, config: DbtCloudConfig):
super().__init__(config, "dbt cloud metadata crawler", Platform.DBT_MODEL)
super().__init__(config)
self._account_id = config.account_id
self._job_ids = config.job_ids
self._service_token = config.service_token
Expand Down
5 changes: 4 additions & 1 deletion metaphor/dbt/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@ class DbtExtractor(BaseExtractor):
Using manifest v3 and catalog v1 schema, but backward-compatible with older schema versions
"""

_description = "dbt metadata crawler"
_platform = Platform.DBT_MODEL

@staticmethod
def from_config_file(config_file: str) -> "DbtExtractor":
return DbtExtractor(DbtRunConfig.from_yaml_file(config_file))

def __init__(self, config: DbtRunConfig):
super().__init__(config, "dbt metadata crawler", Platform.DBT_MODEL)
super().__init__(config)
self._manifest = config.manifest
self._run_results = config.run_results
self._config = config
Expand Down
8 changes: 6 additions & 2 deletions metaphor/fivetran/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,15 @@ class SchemaMetadata:
class FivetranExtractor(BaseExtractor):
"""Fivetran metadata extractor"""

_description = "Fivetran metadata crawler"
_platform = Platform.FIVETRAN

@staticmethod
def from_config_file(config_file: str) -> "FivetranExtractor":
return FivetranExtractor(FivetranRunConfig.from_yaml_file(config_file))

def __init__(self, config: FivetranRunConfig) -> None:
super().__init__(config, "Fivetran metadata crawler", Platform.GLUE)

super().__init__(config)
self._auth = HTTPBasicAuth(username=config.api_key, password=config.api_secret)
self._datasets: Dict[str, Dataset] = {}
self._source_datasets: Dict[str, Dataset] = {}
Expand Down Expand Up @@ -536,6 +538,8 @@ def _get_all(self, url: str, type_: Type[DataT]) -> List[DataT]:
params=query,
)
except ApiError as error:
if error.status_code == 401:
raise error
logger.error(error)
return result

Expand Down
4 changes: 3 additions & 1 deletion metaphor/glue/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,15 @@ class GlueExtractor(BaseExtractor):
"""Glue metadata extractor"""

BYTES_PER_MEGABYTES = 1024 * 1024
_description = "Glue metadata crawler"
_platform = Platform.GLUE

@staticmethod
def from_config_file(config_file: str) -> "GlueExtractor":
return GlueExtractor(GlueRunConfig.from_yaml_file(config_file))

def __init__(self, config: GlueRunConfig) -> None:
super().__init__(config, "Glue metadata crawler", Platform.GLUE)
super().__init__(config)
self._datasets: Dict[str, Dataset] = {}
self._aws_config = config.aws

Expand Down
6 changes: 4 additions & 2 deletions metaphor/kafka/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@
class KafkaExtractor(BaseExtractor):
"""Kafka metadata extractor"""

_description = "Kafka metadata crawler"
_platform = Platform.KAFKA

@staticmethod
def from_config_file(config_file: str) -> "KafkaExtractor":
return KafkaExtractor(KafkaConfig.from_yaml_file(config_file))

def __init__(self, config: KafkaConfig) -> None:
super().__init__(config, "Kafka metadata crawler", Platform.KAFKA)

super().__init__(config)
self._config = config
self._filter = config.filter.normalize().merge(DEFAULT_FILTER)
self._admin_client = KafkaExtractor.init_admin_client(self._config)
Expand Down
Loading

0 comments on commit c2c8f31

Please sign in to comment.