Skip to content

Commit

Permalink
Use ThreadPoolExecutor to speed up MCE validation time (#703)
Browse files Browse the repository at this point in the history
* Use ThreadPoolExecutor to speed up MCE validation time

* bump version
  • Loading branch information
usefulalgorithm authored Nov 22, 2023
1 parent 23cc220 commit 52612e8
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 6 deletions.
8 changes: 4 additions & 4 deletions metaphor/common/event_util.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json
import logging
from importlib import resources
from typing import Union
from typing import Optional, Union

from jsonschema import ValidationError
from jsonschema.validators import validator_for
Expand Down Expand Up @@ -80,14 +80,14 @@ def build_event(entity: ENTITY_TYPES):
else:
raise TypeError(f"invalid entity type {type(entity)}")

def validate_message(self, message: dict) -> bool:
def validate_message(self, message: dict) -> Optional[dict]:
"""Validate message against json schema"""
try:
self._validator.validate(message)
except ValidationError as e:
logger.error(f"MCE validation error: {e}. Message: {message}")
return False
return True
return None
return message

@staticmethod
def clean_nones(value):
Expand Down
8 changes: 7 additions & 1 deletion metaphor/common/sink.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
from typing import Generator, List

from metaphor.models.metadata_change_event import MetadataChangeEvent
Expand All @@ -19,7 +20,12 @@ def sink(self, events: List[MetadataChangeEvent]) -> bool:
records = [event_util.trim_event(e) for e in events]

logger.info("validating MCE records")
valid_records = [r for r in records if event_util.validate_message(r)]
with ThreadPoolExecutor() as tpe:
valid_records = [
r
for r in tpe.map(event_util.validate_message, records)
if r is not None
]

if len(valid_records) == 0:
return False
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "metaphor-connectors"
version = "0.13.61"
version = "0.13.62"
license = "Apache-2.0"
description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app."
authors = ["Metaphor <[email protected]>"]
Expand Down

0 comments on commit 52612e8

Please sign in to comment.