diff --git a/metaphor/common/event_util.py b/metaphor/common/event_util.py index ed0e3530..bafb187e 100644 --- a/metaphor/common/event_util.py +++ b/metaphor/common/event_util.py @@ -1,7 +1,7 @@ import json import logging from importlib import resources -from typing import Union +from typing import Optional, Union from jsonschema import ValidationError from jsonschema.validators import validator_for @@ -80,14 +80,14 @@ def build_event(entity: ENTITY_TYPES): else: raise TypeError(f"invalid entity type {type(entity)}") - def validate_message(self, message: dict) -> bool: + def validate_message(self, message: dict) -> Optional[dict]: """Validate message against json schema""" try: self._validator.validate(message) except ValidationError as e: logger.error(f"MCE validation error: {e}. Message: {message}") - return False - return True + return None + return message @staticmethod def clean_nones(value): diff --git a/metaphor/common/sink.py b/metaphor/common/sink.py index 94e6a3d2..859ca2cf 100644 --- a/metaphor/common/sink.py +++ b/metaphor/common/sink.py @@ -1,5 +1,6 @@ import logging from abc import ABC, abstractmethod +from concurrent.futures import ThreadPoolExecutor from typing import Generator, List from metaphor.models.metadata_change_event import MetadataChangeEvent @@ -19,7 +20,12 @@ def sink(self, events: List[MetadataChangeEvent]) -> bool: records = [event_util.trim_event(e) for e in events] logger.info("validating MCE records") - valid_records = [r for r in records if event_util.validate_message(r)] + with ThreadPoolExecutor() as tpe: + valid_records = [ + r + for r in tpe.map(event_util.validate_message, records) + if r is not None + ] if len(valid_records) == 0: return False diff --git a/pyproject.toml b/pyproject.toml index 3f632aff..0dc31c73 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "metaphor-connectors" -version = "0.13.61" +version = "0.13.62" license = "Apache-2.0" description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app." authors = ["Metaphor "]