diff --git a/metaphor/common/file_sink.py b/metaphor/common/file_sink.py index 61f6e634..6d51a901 100644 --- a/metaphor/common/file_sink.py +++ b/metaphor/common/file_sink.py @@ -63,11 +63,17 @@ def __init__( self.mces_per_batch = batch_size_count self.bytes_per_batch = batch_size_bytes # FIXME our logic for handling this isn't that precise, but should be close enough - self.logs: List[QueryLog] = [] - self.mces: List[dict] = [] - self.logs_count: int = 0 - self.mces_count: int = 0 self.completed_batches = 0 + self.total_mces_wrote: int = 0 + """ + Total number of QueryLogs MCEs written. + """ + + # INTERNALS + self._logs: List[QueryLog] = [] + self._mces: List[dict] = [] + self._logs_count: int = 0 + self._mces_count: int = 0 self.logs_per_mce = logs_per_mce self._entered = False @@ -78,30 +84,33 @@ def __enter__(self): return self def __exit__(self, _exception_type, _exception_value, _traceback): - if self.logs: + if self._logs: self._finalize_current_mce() - if self.mces: + if self._mces: self._finalize_current_batch() self._entered = False if self.completed_batches: - logger.info(f"Wrote {self.completed_batches} batches of QueryLogs MCE") + logger.info( + f"Wrote {self.total_mces_wrote} QueryLogs MCEs into {self.completed_batches} batches" + ) def _finalize_current_mce(self) -> None: - self.mces.append(EventUtil.build_then_trim(QueryLogs(logs=self.logs))) - self.logs_count = 0 - self.logs.clear() - self.mces_count += 1 + self._mces.append(EventUtil.build_then_trim(QueryLogs(logs=self._logs))) + self._logs_count = 0 + self._logs.clear() + self._mces_count += 1 + self.total_mces_wrote += 1 def _finalize_current_batch(self) -> None: # No need to validate mce self.storage.write_file( f"{self.path}/query_logs-{self.completed_batches}.json", - json.dumps(self.mces), + json.dumps(self._mces), ) self.completed_batches += 1 - self.mces_count = 0 - self.mces.clear() + self._mces_count = 0 + self._mces.clear() self.batch_bytes = 0 def write_query_log(self, query_log: 
QueryLog) -> None: @@ -110,17 +119,17 @@ def write_query_log(self, query_log: QueryLog) -> None: "This method can only be called when QueryLogSink is in a managed context" ) if ( - self.logs_count >= self.logs_per_mce + self._logs_count >= self.logs_per_mce or self.batch_bytes >= self.bytes_per_batch ): self._finalize_current_mce() if ( - self.mces_count >= self.mces_per_batch + self._mces_count >= self.mces_per_batch or self.batch_bytes >= self.bytes_per_batch ): self._finalize_current_batch() - self.logs.append(query_log) - self.logs_count += 1 + self._logs.append(query_log) + self._logs_count += 1 self.batch_bytes += len(json.dumps(query_log.to_dict())) diff --git a/metaphor/common/runner.py b/metaphor/common/runner.py index 74d3e86f..3cc623f6 100644 --- a/metaphor/common/runner.py +++ b/metaphor/common/runner.py @@ -83,13 +83,15 @@ def run_connector( if file_sink_config is not None: file_sink = FileSink(file_sink_config) file_sink.write_events(events) - file_sink.write_metadata(run_metadata) file_sink.write_execution_logs() with file_sink.get_query_log_sink() as query_log_sink: for query_log in connector.collect_query_logs(): query_log_sink.write_query_log(query_log) + if run_metadata.entity_count is not None: + run_metadata.entity_count += float(query_log_sink.total_mces_wrote) + file_sink.write_metadata(run_metadata) return events, run_metadata diff --git a/pyproject.toml b/pyproject.toml index d570987b..afba6b90 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "metaphor-connectors" -version = "0.13.124" +version = "0.13.125" license = "Apache-2.0" description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app." 
authors = ["Metaphor <dev@metaphor.io>"] diff --git a/tests/common/test_runner.py b/tests/common/test_runner.py index f03976a8..3cda6373 100644 --- a/tests/common/test_runner.py +++ b/tests/common/test_runner.py @@ -1,14 +1,18 @@ -from typing import Collection, List +import tempfile +from typing import Collection, Iterator, List from metaphor.common.base_config import BaseConfig, OutputConfig from metaphor.common.base_extractor import BaseExtractor from metaphor.common.event_util import ENTITY_TYPES +from metaphor.common.file_sink import FileSinkConfig from metaphor.common.runner import run_connector +from metaphor.common.utils import md5_digest from metaphor.models.crawler_run_metadata import RunStatus from metaphor.models.metadata_change_event import ( DataPlatform, Dataset, DatasetLogicalID, + QueryLog, ) @@ -36,9 +40,31 @@ async def extract(self) -> Collection[ENTITY_TYPES]: self.extend_errors(e) return entities + def collect_query_logs(self) -> Iterator[QueryLog]: + query_logs = [ + QueryLog( + id=f"{DataPlatform.SNOWFLAKE.name}:{query_id}", + query_id=str(query_id), + platform=DataPlatform.SNOWFLAKE, + account="account", + sql=query_text, + sql_hash=md5_digest(query_text.encode("utf-8")), + ) + for query_id, query_text in enumerate( + f"this is query no. {x}" for x in range(5) + ) + ] + for query_log in query_logs: + yield query_log + + directory = tempfile.mkdtemp() + file_sink_config = FileSinkConfig(directory=directory, batch_size_bytes=1000000) dummy_connector = DummyConnector(BaseConfig(output=OutputConfig())) events, run_metadata = run_connector( - dummy_connector, "dummy_connector", "dummy connector" + dummy_connector, + "dummy_connector", + "dummy connector", + file_sink_config=file_sink_config, ) assert run_metadata.status is RunStatus.FAILURE assert sorted(