Skip to content

Commit

Permalink
improve MCE validator performance (#611)
Browse files Browse the repository at this point in the history
  • Loading branch information
alyiwang authored Oct 3, 2023
1 parent c06b8de commit 305f2e6
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 30 deletions.
39 changes: 21 additions & 18 deletions metaphor/common/event_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
from importlib import resources
from typing import Union

from jsonschema import ValidationError, validate
from jsonschema import ValidationError
from jsonschema.validators import validator_for

from metaphor import models # type: ignore
from metaphor.models.metadata_change_event import (
Expand All @@ -26,41 +27,43 @@
class EventUtil:
"""Event utilities"""

with resources.open_text(models, "metadata_change_event.json") as f:
schema = json.load(f)
def __init__(self):
with resources.open_text(models, "metadata_change_event.json") as f:
mce_schema = json.load(f)

def __init__(self, extractor_class="", server=""):
self._extractor_class = extractor_class
self._server = server
validator_class = validator_for(mce_schema)
validator_class.check_schema(mce_schema)
self._validator = validator_class(mce_schema)

def _build_event(self, **kwargs) -> MetadataChangeEvent:
@staticmethod
def _build_event(**kwargs) -> MetadataChangeEvent:
"""Create an MCE"""
return MetadataChangeEvent(**kwargs)

def build_event(self, entity: ENTITY_TYPES):
@staticmethod
def build_event(entity: ENTITY_TYPES):
"""Build MCE given an entity"""
if type(entity) is Dashboard:
return self._build_event(dashboard=entity)
return EventUtil._build_event(dashboard=entity)
elif type(entity) is Dataset:
return self._build_event(dataset=entity)
return EventUtil._build_event(dataset=entity)
elif type(entity) is Metric:
return self._build_event(metric=entity)
return EventUtil._build_event(metric=entity)
elif type(entity) is Pipeline:
return self._build_event(pipeline=entity)
return EventUtil._build_event(pipeline=entity)
elif type(entity) is KnowledgeCard:
return self._build_event(knowledge_card=entity)
return EventUtil._build_event(knowledge_card=entity)
elif type(entity) is VirtualView:
return self._build_event(virtual_view=entity)
return EventUtil._build_event(virtual_view=entity)
elif type(entity) is QueryLogs:
return self._build_event(query_logs=entity)
return EventUtil._build_event(query_logs=entity)
else:
raise TypeError(f"invalid entity type {type(entity)}")

@staticmethod
def validate_message(message: dict) -> bool:
def validate_message(self, message: dict) -> bool:
"""Validate message against json schema"""
try:
validate(message, EventUtil.schema)
self._validator.validate(message)
except ValidationError as e:
logger.error(f"MCE validation error: {e}. Message: {message}")
return False
Expand Down
3 changes: 1 addition & 2 deletions metaphor/common/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ def run_connector(
f"Ended running with {run_status} at {end_time}, fetched {entity_count} entities, took {format((end_time - start_time).total_seconds(), '.1f')}s"
)

event_util = EventUtil(name)
events = [event_util.build_event(entity) for entity in entities]
events = [EventUtil.build_event(entity) for entity in entities]

run_metadata = CrawlerRunMetadata(
crawler_name=name,
Expand Down
8 changes: 6 additions & 2 deletions metaphor/common/sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@ class Sink(ABC):

def sink(self, events: List[MetadataChangeEvent]) -> bool:
"""Sink MCE messages to the destination"""
records = [EventUtil.trim_event(e) for e in events]
valid_records = [r for r in records if EventUtil.validate_message(r)]
event_util = EventUtil()
records = [event_util.trim_event(e) for e in events]

logger.info("validating MCE records")
valid_records = [r for r in records if event_util.validate_message(r)]

if len(valid_records) == 0:
return False

Expand Down
16 changes: 8 additions & 8 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 305f2e6

Please sign in to comment.