diff --git a/docs/custom.md b/docs/custom.md index 684969d1..e035bbc8 100644 --- a/docs/custom.md +++ b/docs/custom.md @@ -101,11 +101,11 @@ run_connector( name="connector_name", # name should contain only alphanumeric characters plus underscores platform=Platform.OTHER, # use other unless there's a matching platform description="connector description", # user-facing description for the connector - file_sink_config=local_file_sink_config("/path/to/output"), + sink_config=local_sink_config("/path/to/output"), ) ``` -Once the output content has been verified, you can change [`local_file_sink_config`](../metaphor/common/runner.py) to [`metaphor_file_sink_config`](../metaphor/common/runner.py) to publish to S3 buckets. +Once the output content has been verified, you can change [`local_sink_config`](../metaphor/common/runner.py) to [`metaphor_sink_config`](../metaphor/common/runner.py) to publish to S3 buckets. ## Logging diff --git a/examples/custom_dashboard.py b/examples/custom_dashboard.py index 13a88cf3..8f54e26e 100644 --- a/examples/custom_dashboard.py +++ b/examples/custom_dashboard.py @@ -7,8 +7,8 @@ from metaphor.common.dataclass import ConnectorConfig from metaphor.common.entity_id import dataset_normalized_name, to_dataset_entity_id from metaphor.common.event_util import ENTITY_TYPES -from metaphor.common.file_sink import S3StorageConfig -from metaphor.common.runner import metaphor_file_sink_config, run_connector +from metaphor.common.runner import metaphor_sink_config, run_connector +from metaphor.common.sink import S3StorageConfig from metaphor.models.crawler_run_metadata import Platform from metaphor.models.metadata_change_event import ( AssetStructure, @@ -85,7 +85,7 @@ async def extract(self) -> Collection[ENTITY_TYPES]: name=connector_name, platform=Platform.CUSTOM_DASHBOARD, description="This is a custom connector made by Acme, Inc.", - file_sink_config=metaphor_file_sink_config( + sink_config=metaphor_sink_config( tenant_name, connector_name, is_metaphor_cloud=False, diff --git a/examples/custom_dq.py b/examples/custom_dq.py index 9ffeaf7f..f94c04ea 100644 --- a/examples/custom_dq.py +++ b/examples/custom_dq.py @@ -7,7 +7,7 @@ from metaphor.common.dataclass import ConnectorConfig from metaphor.common.entity_id import dataset_normalized_name from metaphor.common.event_util import ENTITY_TYPES -from metaphor.common.runner import metaphor_file_sink_config, run_connector +from metaphor.common.runner import metaphor_sink_config, run_connector from metaphor.models.crawler_run_metadata import Platform from metaphor.models.metadata_change_event import ( DataPlatform, @@ -60,5 +60,5 @@ async def extract(self) -> Collection[ENTITY_TYPES]: name=connector_name, platform=Platform.BIGQUERY, description="This is a custom connector made by Acme, Inc.", - file_sink_config=metaphor_file_sink_config(tenant_name, connector_name), + sink_config=metaphor_sink_config(tenant_name, connector_name), ) diff --git a/examples/custom_lineage.py b/examples/custom_lineage.py index 40d7869c..53981f97 100644 --- a/examples/custom_lineage.py +++ b/examples/custom_lineage.py @@ -7,7 +7,7 @@ from metaphor.common.dataclass import ConnectorConfig from metaphor.common.entity_id import dataset_normalized_name, to_dataset_entity_id from metaphor.common.event_util import ENTITY_TYPES -from metaphor.common.runner import metaphor_file_sink_config, run_connector +from metaphor.common.runner import metaphor_sink_config, run_connector from metaphor.models.crawler_run_metadata import Platform from 
metaphor.models.metadata_change_event import ( DataPlatform, @@ -70,5 +70,5 @@ async def extract(self) -> Collection[ENTITY_TYPES]: name=connector_name, platform=Platform.BIGQUERY, description="This is a custom connector made by Acme, Inc.", - file_sink_config=metaphor_file_sink_config(tenant_name, connector_name), + sink_config=metaphor_sink_config(tenant_name, connector_name), ) diff --git a/metaphor/common/base_config.py b/metaphor/common/base_config.py index b1c0a874..a842b803 100644 --- a/metaphor/common/base_config.py +++ b/metaphor/common/base_config.py @@ -7,7 +7,7 @@ from smart_open import open from metaphor.common.dataclass import ConnectorConfig -from metaphor.common.file_sink import FileSinkConfig +from metaphor.common.sink import SinkConfig from metaphor.common.variable import variable_substitution # Create a generic variable that can be 'BaseConfig', or any subclass. @@ -18,7 +18,7 @@ class OutputConfig: """Config for where to output the data""" - file: Optional[FileSinkConfig] = None + file: Optional[SinkConfig] = None @dataclass() diff --git a/metaphor/common/cli.py b/metaphor/common/cli.py index 33244e33..5b14af1a 100644 --- a/metaphor/common/cli.py +++ b/metaphor/common/cli.py @@ -14,5 +14,5 @@ def cli_main(extractor_cls: Type[BaseExtractor], config_file: str): name=EventUtil.class_fqcn(extractor_cls), description=extractor_cls._description, platform=extractor_cls._platform, - file_sink_config=base_config.output.file, + sink_config=base_config.output.file, ) diff --git a/metaphor/common/file_sink.py b/metaphor/common/file_sink.py deleted file mode 100644 index f5d0d4e1..00000000 --- a/metaphor/common/file_sink.py +++ /dev/null @@ -1,140 +0,0 @@ -import json -import logging -import tempfile -from dataclasses import field -from datetime import datetime, timezone -from os import path -from typing import List, Optional -from zipfile import ZIP_DEFLATED, ZipFile - -from pydantic.dataclasses import dataclass - -from metaphor.common.dataclass import ConnectorConfig -from metaphor.common.event_util import EventUtil -from metaphor.common.logger import LOG_FILE, debug_files, get_logger -from metaphor.common.sink import Sink -from metaphor.common.storage import ( - BaseStorage, - LocalStorage, - S3Storage, - S3StorageConfig, -) -from metaphor.common.utils import chunk_by_size -from metaphor.models.crawler_run_metadata import CrawlerRunMetadata - -logger = get_logger() - - -@dataclass(config=ConnectorConfig) -class FileSinkConfig: - # Location of the sink directory, where the MCE file and logs will be output to. 
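A note on the `base_config.py` and `cli.py` changes above: the `output.file` section of a connector's YAML config is now parsed into the new `SinkConfig` and forwarded to `run_connector` as `sink_config`. Below is a minimal sketch of the equivalent in-code construction, mirroring the updated `tests/common/test_config.py` near the end of this diff; the directory value is a placeholder.

```python
from metaphor.common.base_config import BaseConfig, OutputConfig
from metaphor.common.sink import SinkConfig

# What cli_main() ends up passing to run_connector(sink_config=...) after
# parsing the YAML config file.
config = BaseConfig(
    output=OutputConfig(
        file=SinkConfig(
            directory="/path/to/output",  # placeholder: local path or s3://bucket/prefix
            write_logs=False,
        )
    )
)
assert isinstance(config.output.file, SinkConfig)
```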
- # Can be local file directory, s3://bucket/ or s3://bucket/path/ - directory: str - - # Output logs - write_logs: bool = True - - # Limit each file to have at most 200 items - batch_size_count: int = 200 - - # Limit each file to < 100 MB in size - batch_size_bytes: int = 100 * 1000 * 1000 - - # IAM role to assume before writing to file - assume_role_arn: Optional[str] = None - - # IAM credential to access S3 bucket - s3_auth_config: S3StorageConfig = field(default_factory=lambda: S3StorageConfig()) - - -class FileSink(Sink): - """File sink functions""" - - def __init__(self, config: FileSinkConfig): - self.path = f'{config.directory.rstrip("/")}/{int(datetime.now().timestamp())}' - self.write_logs = config.write_logs - self.batch_size_count = config.batch_size_count - self.batch_size_bytes = config.batch_size_bytes - logger.info(f"Write files to {self.path}") - - if config.directory.startswith("s3://"): - self._storage: BaseStorage = S3Storage( - config.assume_role_arn, config.s3_auth_config - ) - else: - self._storage = LocalStorage() - - def _sink(self, messages: List[dict]) -> bool: - """Write records to file with auto-splitting""" - - logger.info("Split MCE records into chunks") - slices = chunk_by_size( - messages, - self.batch_size_count, - self.batch_size_bytes, - lambda item: len(json.dumps(item)), - ) - - for part, slice in enumerate(slices): - file_name = f"{part+1}-of-{len(slices)}.json" - logger.info(f"Writing {file_name} ({slice.stop - slice.start} records)") - self._storage.write_file( - f"{self.path}/{file_name}", - json.dumps(messages[slice]), - ) - - logger.info(f"Written {len(slices)} MCE files") - - return True - - def sink_logs(self): - if not self.write_logs: - logger.info("Skip writing logs") - return - - logging.shutdown() - - _, zip_file = tempfile.mkstemp(suffix=".zip") - dir_name = datetime.now(timezone.utc).strftime("%Y-%m-%d %H-%M-%S") - - with ZipFile(zip_file, "w", ZIP_DEFLATED) as file: - arcname = f"{dir_name}/run.log" - file.write(LOG_FILE, arcname=arcname) - - for debug_file in debug_files: - arcname = f"{dir_name}/{path.basename(debug_file)}" - file.write(debug_file, arcname=arcname) - - with open(zip_file, "rb") as file: - self._storage.write_file(f"{self.path}/log.zip", file.read(), True) - - def sink_metadata(self, metadata: CrawlerRunMetadata): - if not self.write_logs: - logger.info("Skip writing metadata") - return - - content = json.dumps(EventUtil.clean_nones(metadata.to_dict())).encode() - - self._storage.write_file(f"{self.path}/run.metadata", content, True) - - def write_file(self, filename: str, content: str): - """Write content into a file in the output sink - - Parameters - ------- - filename : str - The filename to store the content under the output sink - content : str - The content to write to the file - """ - self._storage.write_file(f"{self.path}/{filename}", content.encode(), True) - - def remove_file(self, filename: str): - """Remove a file in the output sink - - Parameters - ------- - filename : str - The file to remove - """ - self._storage.delete_files([f"{self.path}/{filename}"]) diff --git a/metaphor/common/runner.py b/metaphor/common/runner.py index 58382650..7d8301b3 100644 --- a/metaphor/common/runner.py +++ b/metaphor/common/runner.py @@ -4,8 +4,9 @@ from metaphor.common.base_extractor import BaseExtractor from metaphor.common.event_util import ENTITY_TYPES, EventUtil -from metaphor.common.file_sink import FileSink, FileSinkConfig, S3StorageConfig from metaphor.common.logger import get_logger +from metaphor.common.sink 
import SinkConfig, StreamSink +from metaphor.common.storage import S3StorageConfig from metaphor.models.crawler_run_metadata import CrawlerRunMetadata, Platform, RunStatus from metaphor.models.metadata_change_event import MetadataChangeEvent @@ -17,7 +18,7 @@ def run_connector( name: str, description: str, platform: Optional[Platform] = None, - file_sink_config: Optional[FileSinkConfig] = None, + sink_config: Optional[SinkConfig] = None, ) -> Tuple[List[MetadataChangeEvent], CrawlerRunMetadata]: """Run a connector and write the resulting events to files and/or API. @@ -31,7 +32,7 @@ def run_connector( Textual description of the connector platform : Optional[Platform] Platform of the connector - file_sink_config : Optional[FileSinkConfig] + sink_config : Optional[SinkConfig] Optional configuration for outputting events to files or cloud storage Returns @@ -80,22 +81,21 @@ def run_connector( stack_trace=stacktrace, ) - if file_sink_config is not None: - file_sink = FileSink(file_sink_config) - file_sink.sink(events) - file_sink.sink_metadata(run_metadata) - file_sink.sink_logs() + if sink_config is not None: + with StreamSink(sink_config, run_metadata) as sink: + for event in events: + sink.write_event(event) return events, run_metadata -def metaphor_file_sink_config( +def metaphor_sink_config( tenant: str, connector_name: str, is_metaphor_cloud=False, s3_auth_config=S3StorageConfig(), -) -> FileSinkConfig: - """Create a FileSinkConfig for outputting events to a Metaphor tenant's cloud storage +) -> SinkConfig: + """Create a SinkConfig for outputting events to a Metaphor tenant's cloud storage Parameters ---------- @@ -108,7 +108,7 @@ def metaphor_file_sink_config( Returns ------- - FileSinkConfig + SinkConfig the config created """ @@ -117,13 +117,13 @@ def metaphor_file_sink_config( if is_metaphor_cloud else f"metaphor-mce-{tenant}" ) - return FileSinkConfig( + return SinkConfig( directory=f"s3://{bucket}/{connector_name}", s3_auth_config=s3_auth_config ) -def local_file_sink_config(directory: str) -> FileSinkConfig: - """Create a FileSinkConfig for outputting events to a local directory +def local_sink_config(directory: str) -> SinkConfig: + """Create a SinkConfig for outputting events to a local directory Parameters ---------- @@ -132,8 +132,8 @@ def local_file_sink_config(directory: str) -> FileSinkConfig: Returns ------- - FileSinkConfig + SinkConfig the config created """ - return FileSinkConfig(directory=directory) + return SinkConfig(directory=directory) diff --git a/metaphor/common/sink.py b/metaphor/common/sink.py index 859ca2cf..324d3ceb 100644 --- a/metaphor/common/sink.py +++ b/metaphor/common/sink.py @@ -1,43 +1,249 @@ +import json import logging -from abc import ABC, abstractmethod -from concurrent.futures import ThreadPoolExecutor -from typing import Generator, List +import tempfile +from dataclasses import field +from datetime import datetime, timezone +from os import path +from typing import Optional +from zipfile import ZIP_DEFLATED, ZipFile +from pydantic.dataclasses import dataclass + +from metaphor.common.dataclass import ConnectorConfig +from metaphor.common.event_util import EventUtil +from metaphor.common.logger import LOG_FILE, debug_files, get_logger +from metaphor.common.storage import ( + BaseStorage, + LocalStorage, + S3Storage, + S3StorageConfig, +) +from metaphor.models.crawler_run_metadata import CrawlerRunMetadata from metaphor.models.metadata_change_event import MetadataChangeEvent -from .event_util import EventUtil +logger = get_logger() + + 
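The `runner.py` hunks above rename the sink helpers and switch `run_connector` to stream events through `StreamSink`. A hedged sketch of building the two kinds of sink config a connector author would pass as `sink_config`; the tenant and connector names are placeholders.

```python
from metaphor.common.runner import local_sink_config, metaphor_sink_config
from metaphor.common.storage import S3StorageConfig

# While developing, write batches to a local directory and inspect them:
dev_sink = local_sink_config("/path/to/output")

# Once verified, publish to the tenant's bucket (s3://metaphor-mce-<tenant>/<connector>):
prod_sink = metaphor_sink_config(
    "acme",                    # placeholder tenant name
    "acme_custom_connector",   # placeholder connector name
    is_metaphor_cloud=False,
    s3_auth_config=S3StorageConfig(),  # same default the helper itself uses
)
```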
+@dataclass(config=ConnectorConfig) +class SinkConfig: + # Location of the sink directory, where the MCE file and logs will be output to. + # Can be local file directory, s3://bucket/ or s3://bucket/path/ + directory: str + + # Output logs + write_logs: bool = True + + # Limit each file to have at most 200 items + batch_size_count: int = 200 + + # Limit each file to < 100 MB in size + batch_size_bytes: int = 100 * 1000 * 1000 + + # IAM role to assume before writing to file + assume_role_arn: Optional[str] = None + + # IAM credential to access S3 bucket + s3_auth_config: S3StorageConfig = field(default_factory=lambda: S3StorageConfig()) + + +class StreamSink: + def __init__( + self, config: SinkConfig, metadata: Optional[CrawlerRunMetadata] = None + ): + self.path = f'{config.directory.rstrip("/")}/{int(datetime.now().timestamp())}' + self.metadata = metadata + self.log_execution = config.write_logs + """ + Whether to log the execution process. + """ + + self.items_per_batch = config.batch_size_count + """ + Maximum number of MCE items per batch. + """ + + self.bytes_per_batch = config.batch_size_bytes + """ + Maximum bytesize for a batch. + """ + + self._event_util = EventUtil() + + self._current_batch: int = -1 + """ + The batch currently being processed. Starts from -1. + """ + self._items: int = 0 + """ + How many items we've processed so far in the current batch. + """ + self._bytes: int = 0 + """ + How many bytes we've processed so far in the current batch. + """ + + self._completed_batches: int = 0 + """ + Number of finished batches. + """ + + logger.info(f"Write files to {self.path}") + if config.directory.startswith("s3://"): + self._storage: BaseStorage = S3Storage( + config.assume_role_arn, config.s3_auth_config + ) + else: + self._storage = LocalStorage() + + self._entered = False + + def __enter__(self): + self._entered = True + return self -logger = logging.getLogger(__name__) -logger.setLevel(logging.INFO) + def __exit__(self, exception_type, exception_value, traceback): + # TODO do something with exceptions here? + self._exit() + def _get_batch_file_path(self, batch: int) -> str: + return f"{self.path}/{batch}.json" -class Sink(ABC): - """Base class for metadata sinks""" + def _get_current_batch_file_path(self) -> str: + return self._get_batch_file_path(self._current_batch) - def sink(self, events: List[MetadataChangeEvent]) -> bool: - """Sink MCE messages to the destination""" - event_util = EventUtil() - records = [event_util.trim_event(e) for e in events] + def _should_start_new_batch( + self, + payload_len: int, + ) -> bool: + """ + We should move on from the current batch if the item we're working with + is going to make the current batch exceed the count or bytesize limit. - logger.info("validating MCE records") - with ThreadPoolExecutor() as tpe: - valid_records = [ - r - for r in tpe.map(event_util.validate_message, records) - if r is not None - ] + If the current batch is less than 0, it means we need to create the first + batch. + """ + return self._current_batch < 0 or ( + self._bytes + payload_len + > self.bytes_per_batch # If we're gonna exceed the batch byte count limit + or self._items + 1 + > self.items_per_batch # If we're gonna exceed the batch item count limit + ) - if len(valid_records) == 0: + def _finalize_current_batch(self) -> None: + """ + Inserts a `']'` character to the current batch file, then increments `self._completed_batched` + by 1. 
+ """ + self._storage.write_file(self._get_current_batch_file_path(), "]", append=True) + logger.info( + f"Finished batch #{self._current_batch}, bytesize = {self._bytes}, item count = {self._items}" + ) + self._completed_batches += 1 + + def write_event(self, event: MetadataChangeEvent) -> bool: + """ + Writes a single `MetadataChangeEvent` to the output folder. Automatically splits the stream + into multiple batches. + + This method should be called only context managed, i.e. called within a `with` context: + + ```python + with StreamSink(config, metadata) as sink: + for event in events: + sink.write_event(event) + ``` + """ + + if not self._entered: + raise ValueError( + "Cannot call this method when StreamSink isn't context managed" + ) + + message = self._event_util.trim_event(event) + validated_message = self._event_util.validate_message(message) + if validated_message is None: return False - return self._sink(valid_records) + payload = json.dumps(validated_message) + payload_size = len(payload) + + if self._should_start_new_batch( + payload_size + 1 + ): # The actual payload is always prefixed by a single character. + if self._current_batch >= 0: + self._finalize_current_batch() + + # ... then reset the counters... + self._bytes = 0 + self._items = 0 + self._current_batch += 1 + + # For the new batch, we insert a single '[' character to the file's start. + payload_prefix = "[" + else: + # Otherwise we are inserting to the current batch file. + # We separate the current event with the previously inserted ones with a ','. + payload_prefix = "," + + self._storage.write_file( + self._get_current_batch_file_path(), payload_prefix + payload, append=True + ) + self._items += 1 + self._bytes += payload_size + 1 + return True + + def _exit(self) -> None: + if not self._entered: + raise ValueError( + "Cannot call this method when StreamSink isn't context managed" + ) + self._finalize_batches() + self._write_execution_logs() + self._write_run_metadata() + self._entered = False + + def _finalize_batches(self) -> None: + """ + Finalizes the entire event stream by finishing the last batch file, and renaming the + batch files. 
+ """ + if self._current_batch != -1: + self._finalize_current_batch() + for batch in range(self._completed_batches): + self._storage.rename_file( + self._get_batch_file_path(batch), + f"{self.path}/{batch+1}-of-{self._completed_batches}.json", + ) + logger.info(f"Wrote {self._completed_batches} MCE files") + + def _write_execution_logs(self): + if not self.log_execution: + logger.info("Skip writing logs") + return + + logging.shutdown() + + _, zip_file = tempfile.mkstemp(suffix=".zip") + dir_name = datetime.now(timezone.utc).strftime("%Y-%m-%d %H-%M-%S") + + with ZipFile(zip_file, "w", ZIP_DEFLATED) as file: + arcname = f"{dir_name}/run.log" + file.write(LOG_FILE, arcname=arcname) + + for debug_file in debug_files: + arcname = f"{dir_name}/{path.basename(debug_file)}" + file.write(debug_file, arcname=arcname) + + with open(zip_file, "rb") as file: + self._storage.write_file(f"{self.path}/log.zip", file.read(), True) - @staticmethod - def _chunks(records: List, n: int) -> Generator[List, None, None]: - """Yield successive n-sized chunks from list.""" - for i in range(0, len(records), n): - yield records[i : i + n] + def _write_run_metadata(self): + if not self.log_execution: + logger.info("Skip writing metadata") + return - @abstractmethod - def _sink(self, messages: List[dict]) -> bool: - """Sink metadata records to the destination, should be overridden""" + if self.metadata: + content = json.dumps( + EventUtil.clean_nones(self.metadata.to_dict()) + ).encode() + self._storage.write_file(f"{self.path}/run.metadata", content, True) diff --git a/metaphor/common/storage.py b/metaphor/common/storage.py index 757e838f..c46f44b4 100644 --- a/metaphor/common/storage.py +++ b/metaphor/common/storage.py @@ -8,6 +8,12 @@ from pydantic.dataclasses import dataclass from smart_open import open +try: + from mypy_boto3_s3 import S3Client +except ImportError: + # Ignore this since mypy plugins are dev dependencies + pass + from metaphor.common.dataclass import ConnectorConfig from metaphor.common.logger import get_logger @@ -28,7 +34,7 @@ class BaseStorage(ABC): @abstractmethod def write_file( - self, path: str, payload: Union[str, bytes], binary_mode=False + self, path: str, payload: Union[str, bytes], binary_mode=False, append=False ) -> None: """write a file to the given path""" @@ -40,18 +46,25 @@ def list_files(self, path: str, suffix: Optional[str]) -> List[str]: def delete_files(self, paths: List[str]) -> None: """delete the given file(s)""" + @abstractmethod + def rename_file(self, old_path: str, new_path: str) -> None: + """rename a file""" + class LocalStorage(BaseStorage): """Storage implementation for local file system""" def write_file( - self, path: str, payload: Union[str, bytes], binary_mode=False + self, path: str, payload: Union[str, bytes], binary_mode=False, append=False ) -> None: os.makedirs(os.path.expanduser(os.path.dirname(path)), exist_ok=True) - mode = "wb" if binary_mode else "w" + mode = "a" if append else "w" + if binary_mode: + mode += "b" + with open(path, mode) as fp: - fp.write(payload) + fp.write(payload) # type: ignore def list_files(self, path: str, suffix: Optional[str]) -> List[str]: directory = os.path.expanduser(path) @@ -69,6 +82,9 @@ def delete_files(self, paths: List[str]) -> None: for path in paths: os.remove(path) + def rename_file(self, old_path: str, new_path: str) -> None: + os.rename(old_path, new_path) + @dataclass(config=ConnectorConfig) class S3StorageConfig: @@ -98,11 +114,11 @@ def __init__( else: self._session = session - self._client = 
self._session.client("s3") + self._client: S3Client = self._session.client("s3") # type: ignore logger.info("Created S3 client") def write_file( - self, path: str, payload: Union[str, bytes], binary_mode=False + self, path: str, payload: Union[str, bytes], binary_mode=False, append=False ) -> None: transport_params = { **OWNER_FULL_CONTROL_ACL, @@ -111,9 +127,11 @@ def write_file( "client": self._client, } - mode = "wb" if binary_mode else "w" + mode = "a" if append else "w" + if binary_mode: + mode += "b" with open(path, mode, transport_params=transport_params) as fp: - fp.write(payload) + fp.write(payload) # type: ignore def list_files(self, path: str, suffix: Optional[str]) -> List[str]: bucket, key = S3Storage.parse_s3_uri(path) @@ -133,9 +151,9 @@ def list_files(self, path: str, suffix: Optional[str]) -> List[str]: objects.extend(resp.get("Contents", [])) return [ - f"s3://{bucket}/{file['Key']}" + f"s3://{bucket}/{file.get('Key', '')}" for file in objects - if suffix is None or file.get("Key").endswith(suffix) + if suffix is None or file.get("Key", "").endswith(suffix) ] def delete_files(self, paths: List[str]) -> None: @@ -146,6 +164,16 @@ def delete_files(self, paths: List[str]) -> None: Key=key, ) + def rename_file(self, old_path: str, new_path: str) -> None: + old_bucket, old_key = S3Storage.parse_s3_uri(old_path) + new_bucket, new_key = S3Storage.parse_s3_uri(new_path) + self._client.copy_object( + Bucket=new_bucket, + CopySource=f"{old_bucket}/{old_key}", + Key=new_key, + ) + self._client.delete_object(Bucket=old_bucket, Key=old_key) + @staticmethod def parse_s3_uri(uri: str) -> Tuple[str, str]: """parse S3 URI and return (bucket, key)""" diff --git a/pyproject.toml b/pyproject.toml index bb92d554..55061359 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "metaphor-connectors" -version = "0.13.118" +version = "0.13.119" license = "Apache-2.0" description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app." 
authors = ["Metaphor "] diff --git a/tests/common/test_config.py b/tests/common/test_config.py index 308348df..15ad5f98 100644 --- a/tests/common/test_config.py +++ b/tests/common/test_config.py @@ -4,7 +4,7 @@ from metaphor.common.base_config import BaseConfig, OutputConfig from metaphor.common.dataclass import ConnectorConfig -from metaphor.common.file_sink import FileSinkConfig +from metaphor.common.sink import SinkConfig def test_yaml_config(test_root_dir): @@ -12,7 +12,7 @@ def test_yaml_config(test_root_dir): assert config == BaseConfig( output=OutputConfig( - file=FileSinkConfig( + file=SinkConfig( directory="path", assume_role_arn="arn", write_logs=False, diff --git a/tests/common/test_file_sink.py b/tests/common/test_sink.py similarity index 75% rename from tests/common/test_file_sink.py rename to tests/common/test_sink.py index a8872c3d..fbb7b624 100644 --- a/tests/common/test_file_sink.py +++ b/tests/common/test_sink.py @@ -3,11 +3,12 @@ from os import path from zipfile import ZipFile +import pytest from freezegun import freeze_time from metaphor.common.event_util import EventUtil -from metaphor.common.file_sink import FileSink, FileSinkConfig from metaphor.common.logger import add_debug_file +from metaphor.common.sink import SinkConfig, StreamSink from metaphor.models.crawler_run_metadata import CrawlerRunMetadata, RunStatus from metaphor.models.metadata_change_event import ( DataPlatform, @@ -23,7 +24,7 @@ def events_from_json(file): @freeze_time("2000-01-01") -def test_file_sink_no_split(test_root_dir): +def test_file_sink_no_split(test_root_dir: str): directory = tempfile.mkdtemp() messages = [ @@ -40,13 +41,14 @@ def test_file_sink_no_split(test_root_dir): ] # Set batch_size_bytes so large that all messages can fit in the same file - sink = FileSink(FileSinkConfig(directory=directory, batch_size_bytes=1000000)) - assert sink.sink(messages) is True + with StreamSink(SinkConfig(directory=directory, batch_size_bytes=1000000)) as sink: + for event in messages: + assert sink.write_event(event) assert messages == events_from_json(f"{directory}/946684800/1-of-1.json") @freeze_time("2000-01-01") -def test_file_sink_split(test_root_dir): +def test_file_sink_split(test_root_dir: str): directory = tempfile.mkdtemp() messages = [ @@ -78,8 +80,9 @@ def test_file_sink_split(test_root_dir): ] # Set batch_size_bytes so small that only one message can be fit in each file - sink = FileSink(FileSinkConfig(directory=directory, batch_size_bytes=10)) - assert sink.sink(messages) is True + with StreamSink(SinkConfig(directory=directory, batch_size_bytes=10)) as sink: + for message in messages: + assert sink.write_event(message) is True assert messages[0:1] == events_from_json(f"{directory}/946684800/1-of-5.json") assert messages[1:2] == events_from_json(f"{directory}/946684800/2-of-5.json") assert messages[2:3] == events_from_json(f"{directory}/946684800/3-of-5.json") @@ -100,8 +103,9 @@ def test_sink_metadata(test_root_dir): entity_count=1.0, ) - sink = FileSink(FileSinkConfig(directory=directory)) - sink.sink_metadata(metadata) + with StreamSink(SinkConfig(directory=directory), metadata): + # It's gonna write it when exiting the context + pass assert EventUtil.clean_nones(metadata.to_dict()) == load_json( f"{directory}/946684800/run.metadata" @@ -115,8 +119,9 @@ def test_sink_logs(test_root_dir): directory = tempfile.mkdtemp() - sink = FileSink(FileSinkConfig(directory=directory)) - sink.sink_logs() + with StreamSink(SinkConfig(directory=directory)): + # It's gonna write it when exiting the 
context + pass zip_file = f"{directory}/946684800/log.zip" @@ -128,20 +133,16 @@ def test_sink_logs(test_root_dir): assert path.basename(debug_file) in base_names -@freeze_time("2000-01-01") -def test_sink_file(test_root_dir): +def test_stream_sink_not_in_context_manager(): directory = tempfile.mkdtemp() - - sink = FileSink(FileSinkConfig(directory=directory)) - filename = "test.txt" - sink.write_file(filename, "the content") - - full_path = f"{directory}/946684800/{filename}" - assert path.exists(full_path) - - with open(full_path) as f: - content = f.read() - assert content == "the content" - - sink.remove_file(filename) - assert path.exists(full_path) is False + with pytest.raises(ValueError): + sink = StreamSink(SinkConfig(directory=directory)) + sink.write_event( + MetadataChangeEvent( + dataset=Dataset( + logical_id=DatasetLogicalID( + name="foo1", platform=DataPlatform.BIGQUERY + ) + ) + ) + )
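Finally, as the tests above assert, a finished run leaves one timestamped directory containing `N-of-M.json` batch files, each a plain JSON array, plus `run.metadata` and `log.zip` when logging is enabled. A small standard-library helper for local verification (the function name is ours, not part of the codebase):

```python
import json
from pathlib import Path


def load_all_events(run_dir: str) -> list:
    """Collect every MCE dict written by one StreamSink run,
    e.g. /path/to/output/946684800/ containing 1-of-3.json ... 3-of-3.json."""
    events = []
    batch_files = sorted(
        Path(run_dir).glob("*-of-*.json"),
        key=lambda p: int(p.name.split("-")[0]),  # numeric, not lexicographic, order
    )
    for batch_file in batch_files:
        with batch_file.open() as fp:
            events.extend(json.load(fp))  # each batch file is one JSON array
    return events
```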