diff --git a/metaphor/common/event_util.py b/metaphor/common/event_util.py index 9a77b479..e89f5adc 100644 --- a/metaphor/common/event_util.py +++ b/metaphor/common/event_util.py @@ -3,7 +3,8 @@ from importlib import resources from typing import Union -from jsonschema import ValidationError, validate +from jsonschema import ValidationError +from jsonschema.validators import validator_for from metaphor import models # type: ignore from metaphor.models.metadata_change_event import ( @@ -26,41 +27,43 @@ class EventUtil: """Event utilities""" - with resources.open_text(models, "metadata_change_event.json") as f: - schema = json.load(f) + def __init__(self): + with resources.open_text(models, "metadata_change_event.json") as f: + mce_schema = json.load(f) - def __init__(self, extractor_class="", server=""): - self._extractor_class = extractor_class - self._server = server + validator_class = validator_for(mce_schema) + validator_class.check_schema(mce_schema) + self._validator = validator_class(mce_schema) - def _build_event(self, **kwargs) -> MetadataChangeEvent: + @staticmethod + def _build_event(**kwargs) -> MetadataChangeEvent: """Create an MCE""" return MetadataChangeEvent(**kwargs) - def build_event(self, entity: ENTITY_TYPES): + @staticmethod + def build_event(entity: ENTITY_TYPES): """Build MCE given an entity""" if type(entity) is Dashboard: - return self._build_event(dashboard=entity) + return EventUtil._build_event(dashboard=entity) elif type(entity) is Dataset: - return self._build_event(dataset=entity) + return EventUtil._build_event(dataset=entity) elif type(entity) is Metric: - return self._build_event(metric=entity) + return EventUtil._build_event(metric=entity) elif type(entity) is Pipeline: - return self._build_event(pipeline=entity) + return EventUtil._build_event(pipeline=entity) elif type(entity) is KnowledgeCard: - return self._build_event(knowledge_card=entity) + return EventUtil._build_event(knowledge_card=entity) elif type(entity) is VirtualView: - return self._build_event(virtual_view=entity) + return EventUtil._build_event(virtual_view=entity) elif type(entity) is QueryLogs: - return self._build_event(query_logs=entity) + return EventUtil._build_event(query_logs=entity) else: raise TypeError(f"invalid entity type {type(entity)}") - @staticmethod - def validate_message(message: dict) -> bool: + def validate_message(self, message: dict) -> bool: """Validate message against json schema""" try: - validate(message, EventUtil.schema) + self._validator.validate(message) except ValidationError as e: logger.error(f"MCE validation error: {e}. Message: {message}") return False diff --git a/metaphor/common/runner.py b/metaphor/common/runner.py index dde5daa6..1fca3923 100644 --- a/metaphor/common/runner.py +++ b/metaphor/common/runner.py @@ -64,8 +64,7 @@ def run_connector( f"Ended running with {run_status} at {end_time}, fetched {entity_count} entities, took {format((end_time - start_time).total_seconds(), '.1f')}s" ) - event_util = EventUtil(name) - events = [event_util.build_event(entity) for entity in entities] + events = [EventUtil.build_event(entity) for entity in entities] run_metadata = CrawlerRunMetadata( crawler_name=name, diff --git a/metaphor/common/sink.py b/metaphor/common/sink.py index 817c9f85..94e6a3d2 100644 --- a/metaphor/common/sink.py +++ b/metaphor/common/sink.py @@ -15,8 +15,12 @@ class Sink(ABC): def sink(self, events: List[MetadataChangeEvent]) -> bool: """Sink MCE messages to the destination""" - records = [EventUtil.trim_event(e) for e in events] - valid_records = [r for r in records if EventUtil.validate_message(r)] + event_util = EventUtil() + records = [event_util.trim_event(e) for e in events] + + logger.info("validating MCE records") + valid_records = [r for r in records if event_util.validate_message(r)] + if len(valid_records) == 0: return False diff --git a/poetry.lock b/poetry.lock index de4e2d49..c859ac70 100644 --- a/poetry.lock +++ b/poetry.lock @@ -2611,13 +2611,13 @@ files = [ [[package]] name = "jsonschema" -version = "4.19.0" +version = "4.19.1" description = "An implementation of JSON Schema validation for Python" optional = false python-versions = ">=3.8" files = [ - {file = "jsonschema-4.19.0-py3-none-any.whl", hash = "sha256:043dc26a3845ff09d20e4420d6012a9c91c9aa8999fa184e7efcfeccb41e32cb"}, - {file = "jsonschema-4.19.0.tar.gz", hash = "sha256:6e1e7569ac13be8139b2dd2c21a55d350066ee3f80df06c608b398cdc6f30e8f"}, + {file = "jsonschema-4.19.1-py3-none-any.whl", hash = "sha256:cd5f1f9ed9444e554b38ba003af06c0a8c2868131e56bfbef0550fb450c0330e"}, + {file = "jsonschema-4.19.1.tar.gz", hash = "sha256:ec84cc37cfa703ef7cd4928db24f9cb31428a5d0fa77747b8b51a847458e0bbf"}, ] [package.dependencies] @@ -4730,13 +4730,13 @@ files = [ [[package]] name = "sql-metadata" -version = "2.8.0" +version = "2.9.0" description = "Uses tokenized query returned by python-sqlparse and generates query metadata" optional = true -python-versions = ">=3.7.2,<4.0.0" +python-versions = ">=3.8,<4.0" files = [ - {file = "sql_metadata-2.8.0-py3-none-any.whl", hash = "sha256:f0cadad6915ce99ac9869245399d8702536f17b1feba68a2b5763a0f2aa43352"}, - {file = "sql_metadata-2.8.0.tar.gz", hash = "sha256:3897dc881ae00b49d8efa1b0fa8270617fa4f6d818d2a945703f594cecfe93ad"}, + {file = "sql_metadata-2.9.0-py3-none-any.whl", hash = "sha256:82791389ca1b37829c75a7f4cff854352ec8ec19ebcdbf59aa7965698f29b15e"}, + {file = "sql_metadata-2.9.0.tar.gz", hash = "sha256:0243bfc80f93337ee89a438d416b28e5f301deb1e328245e2b974da14321dece"}, ] [package.dependencies] @@ -5405,4 +5405,4 @@ unity-catalog = ["databricks-cli"] [metadata] lock-version = "2.0" python-versions = ">=3.8.1,<4.0" -content-hash = "abffc210a0a82a32a3226752985ca0be75270e8ce74ee089c628d0a85165cb1b" +content-hash = "44d69fcaa536bdda15a6c8731222d6c44e6b533bd9c2e06bbba7eca4d311313a"