[sc-24626] Add query logs MCE count to CrawlerRunMetadata #782

Merged
45 changes: 27 additions & 18 deletions metaphor/common/file_sink.py
@@ -63,11 +63,17 @@ def __init__(
         self.mces_per_batch = batch_size_count
         self.bytes_per_batch = batch_size_bytes  # FIXME our logic for handling this isn't that precise, but should be close enough

-        self.logs: List[QueryLog] = []
-        self.mces: List[dict] = []
-        self.logs_count: int = 0
-        self.mces_count: int = 0
         self.completed_batches = 0
+        self.total_mces_wrote: int = 0
+        """
+        Total number of QueryLogs MCEs written.
+        """
+
+        # INTERNALS
+        self._logs: List[QueryLog] = []
+        self._mces: List[dict] = []
+        self._logs_count: int = 0
+        self._mces_count: int = 0
         self.logs_per_mce = logs_per_mce
         self._entered = False
@@ -78,30 +84,33 @@ def __enter__(self):
         return self

     def __exit__(self, _exception_type, _exception_value, _traceback):
-        if self.logs:
+        if self._logs:
             self._finalize_current_mce()
-        if self.mces:
+        if self._mces:
             self._finalize_current_batch()

         self._entered = False
         if self.completed_batches:
-            logger.info(f"Wrote {self.completed_batches} batches of QueryLogs MCE")
+            logger.info(
+                f"Wrote {self.total_mces_wrote} QueryLogs MCEs into {self.completed_batches} batches"
+            )

     def _finalize_current_mce(self) -> None:
-        self.mces.append(EventUtil.build_then_trim(QueryLogs(logs=self.logs)))
-        self.logs_count = 0
-        self.logs.clear()
-        self.mces_count += 1
+        self._mces.append(EventUtil.build_then_trim(QueryLogs(logs=self._logs)))
+        self._logs_count = 0
+        self._logs.clear()
+        self._mces_count += 1
+        self.total_mces_wrote += 1

     def _finalize_current_batch(self) -> None:
         # No need to validate mce
         self.storage.write_file(
             f"{self.path}/query_logs-{self.completed_batches}.json",
-            json.dumps(self.mces),
+            json.dumps(self._mces),
         )
         self.completed_batches += 1
-        self.mces_count = 0
-        self.mces.clear()
+        self._mces_count = 0
+        self._mces.clear()
         self.batch_bytes = 0

     def write_query_log(self, query_log: QueryLog) -> None:
@@ -110,17 +119,17 @@ def write_query_log(self, query_log: QueryLog) -> None:
                 "This method can only be called when QueryLogSink is in a managed context"
             )
         if (
-            self.logs_count >= self.logs_per_mce
+            self._logs_count >= self.logs_per_mce
             or self.batch_bytes >= self.bytes_per_batch
         ):
             self._finalize_current_mce()
         if (
-            self.mces_count >= self.mces_per_batch
+            self._mces_count >= self.mces_per_batch
             or self.batch_bytes >= self.bytes_per_batch
         ):
             self._finalize_current_batch()
-        self.logs.append(query_log)
-        self.logs_count += 1
+        self._logs.append(query_log)
+        self._logs_count += 1
         self.batch_bytes += len(json.dumps(query_log.to_dict()))


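Taken together, QueryLogSink now rolls every logs_per_mce query logs into one QueryLogs MCE and every mces_per_batch MCEs into one query_logs-<n>.json batch file, and total_mces_wrote counts the MCEs, including the final partial MCE flushed by __exit__. A minimal usage sketch, illustrative only (some_query_logs is a hypothetical iterable of QueryLog objects; the sink comes from FileSink.get_query_log_sink() exactly as in runner.py below):

# Sketch, not part of the diff.
with file_sink.get_query_log_sink() as query_log_sink:
    for query_log in some_query_logs:  # hypothetical iterable of QueryLog
        query_log_sink.write_query_log(query_log)

# __exit__ has flushed the last partial MCE and batch, so the count is final here.
# For example, 250 logs with logs_per_mce=100 would give total_mces_wrote == 3.
print(query_log_sink.total_mces_wrote)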
4 changes: 3 additions & 1 deletion metaphor/common/runner.py
@@ -83,13 +83,15 @@ def run_connector(
     if file_sink_config is not None:
         file_sink = FileSink(file_sink_config)
         file_sink.write_events(events)
-        file_sink.write_metadata(run_metadata)
         file_sink.write_execution_logs()

         with file_sink.get_query_log_sink() as query_log_sink:
             for query_log in connector.collect_query_logs():
                 query_log_sink.write_query_log(query_log)
+        if run_metadata.entity_count is not None:
+            run_metadata.entity_count += float(query_log_sink.total_mces_wrote)

+        file_sink.write_metadata(run_metadata)
     return events, run_metadata


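The net change in runner.py is an ordering one: write_metadata now runs after the query-log sink is closed, so the number of QueryLogs MCEs is already folded into run_metadata.entity_count when the metadata file is written. A worked example with assumed numbers, illustrative only:

# Assumed values for illustration.
entity_count = 120.0     # regular events already counted by the runner
total_mces_wrote = 3     # QueryLogs MCEs written by the sink
entity_count += float(total_mces_wrote)
assert entity_count == 123.0  # the value file_sink.write_metadata then records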
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "metaphor-connectors"
-version = "0.13.124"
+version = "0.13.125"
 license = "Apache-2.0"
 description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app."
 authors = ["Metaphor <[email protected]>"]
30 changes: 28 additions & 2 deletions tests/common/test_runner.py
@@ -1,14 +1,18 @@
-from typing import Collection, List
+import tempfile
+from typing import Collection, Iterator, List

 from metaphor.common.base_config import BaseConfig, OutputConfig
 from metaphor.common.base_extractor import BaseExtractor
 from metaphor.common.event_util import ENTITY_TYPES
+from metaphor.common.file_sink import FileSinkConfig
 from metaphor.common.runner import run_connector
+from metaphor.common.utils import md5_digest
 from metaphor.models.crawler_run_metadata import RunStatus
 from metaphor.models.metadata_change_event import (
     DataPlatform,
     Dataset,
     DatasetLogicalID,
+    QueryLog,
 )


@@ -36,9 +40,31 @@ async def extract(self) -> Collection[ENTITY_TYPES]:
                 self.extend_errors(e)
             return entities

+        def collect_query_logs(self) -> Iterator[QueryLog]:
+            query_logs = [
+                QueryLog(
+                    id=f"{DataPlatform.SNOWFLAKE.name}:{query_id}",
+                    query_id=str(query_id),
+                    platform=DataPlatform.SNOWFLAKE,
+                    account="account",
+                    sql=query_text,
+                    sql_hash=md5_digest(query_text.encode("utf-8")),
+                )
+                for query_id, query_text in enumerate(
+                    f"this is query no. {x}" for x in range(5)
+                )
+            ]
+            for query_log in query_logs:
+                yield query_log
+
+    directory = tempfile.mkdtemp()
+    file_sink_config = FileSinkConfig(directory=directory, batch_size_bytes=1000000)
     dummy_connector = DummyConnector(BaseConfig(output=OutputConfig()))
     events, run_metadata = run_connector(
-        dummy_connector, "dummy_connector", "dummy connector"
+        dummy_connector,
+        "dummy_connector",
+        "dummy connector",
+        file_sink_config=file_sink_config,
     )
     assert run_metadata.status is RunStatus.FAILURE
     assert sorted(
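The test builds the full list of query logs and then yields from it; since the runner streams each log straight into the sink, a connector's collect_query_logs can just as well yield lazily. A sketch of the equivalent generator, illustrative only and reusing the same fields as the test above:

def collect_query_logs(self) -> Iterator[QueryLog]:
    # Yield logs one at a time instead of materializing the whole list first.
    for query_id in range(5):
        query_text = f"this is query no. {query_id}"
        yield QueryLog(
            id=f"{DataPlatform.SNOWFLAKE.name}:{query_id}",
            query_id=str(query_id),
            platform=DataPlatform.SNOWFLAKE,
            account="account",
            sql=query_text,
            sql_hash=md5_digest(query_text.encode("utf-8")),
        )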