Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sc-24431] Replace FileSink with StreamSink #776

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/custom.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,11 @@ run_connector(
name="connector_name", # name should contain only alphanumeric characters plus underscores
platform=Platform.OTHER, # use other unless there's a matching platform
description="connector description", # user-facing description for the connector
file_sink_config=local_file_sink_config("/path/to/output"),
sink_config=local_sink_config("/path/to/output"),
)
```

Once the output content has been verified, you can change [`local_file_sink_config`](../metaphor/common/runner.py) to [`metaphor_file_sink_config`](../metaphor/common/runner.py) to publish to S3 buckets.
Once the output content has been verified, you can change [`local_sink_config`](../metaphor/common/runner.py) to [`metaphor_sink_config`](../metaphor/common/runner.py) to publish to S3 buckets.

## Logging

Expand Down
6 changes: 3 additions & 3 deletions examples/custom_dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from metaphor.common.dataclass import ConnectorConfig
from metaphor.common.entity_id import dataset_normalized_name, to_dataset_entity_id
from metaphor.common.event_util import ENTITY_TYPES
from metaphor.common.file_sink import S3StorageConfig
from metaphor.common.runner import metaphor_file_sink_config, run_connector
from metaphor.common.runner import metaphor_sink_config, run_connector
from metaphor.common.sink import S3StorageConfig
from metaphor.models.crawler_run_metadata import Platform
from metaphor.models.metadata_change_event import (
AssetStructure,
Expand Down Expand Up @@ -85,7 +85,7 @@ async def extract(self) -> Collection[ENTITY_TYPES]:
name=connector_name,
platform=Platform.CUSTOM_DASHBOARD,
description="This is a custom connector made by Acme, Inc.",
file_sink_config=metaphor_file_sink_config(
sink_config=metaphor_sink_config(
tenant_name,
connector_name,
is_metaphor_cloud=False,
Expand Down
4 changes: 2 additions & 2 deletions examples/custom_dq.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from metaphor.common.dataclass import ConnectorConfig
from metaphor.common.entity_id import dataset_normalized_name
from metaphor.common.event_util import ENTITY_TYPES
from metaphor.common.runner import metaphor_file_sink_config, run_connector
from metaphor.common.runner import metaphor_sink_config, run_connector
from metaphor.models.crawler_run_metadata import Platform
from metaphor.models.metadata_change_event import (
DataPlatform,
Expand Down Expand Up @@ -60,5 +60,5 @@ async def extract(self) -> Collection[ENTITY_TYPES]:
name=connector_name,
platform=Platform.BIGQUERY,
description="This is a custom connector made by Acme, Inc.",
file_sink_config=metaphor_file_sink_config(tenant_name, connector_name),
sink_config=metaphor_sink_config(tenant_name, connector_name),
)
4 changes: 2 additions & 2 deletions examples/custom_lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from metaphor.common.dataclass import ConnectorConfig
from metaphor.common.entity_id import dataset_normalized_name, to_dataset_entity_id
from metaphor.common.event_util import ENTITY_TYPES
from metaphor.common.runner import metaphor_file_sink_config, run_connector
from metaphor.common.runner import metaphor_sink_config, run_connector
from metaphor.models.crawler_run_metadata import Platform
from metaphor.models.metadata_change_event import (
DataPlatform,
Expand Down Expand Up @@ -70,5 +70,5 @@ async def extract(self) -> Collection[ENTITY_TYPES]:
name=connector_name,
platform=Platform.BIGQUERY,
description="This is a custom connector made by Acme, Inc.",
file_sink_config=metaphor_file_sink_config(tenant_name, connector_name),
sink_config=metaphor_sink_config(tenant_name, connector_name),
)
4 changes: 2 additions & 2 deletions metaphor/common/base_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from smart_open import open

from metaphor.common.dataclass import ConnectorConfig
from metaphor.common.file_sink import FileSinkConfig
from metaphor.common.sink import SinkConfig
from metaphor.common.variable import variable_substitution

# Create a generic variable that can be 'BaseConfig', or any subclass.
Expand All @@ -18,7 +18,7 @@
class OutputConfig:
"""Config for where to output the data"""

file: Optional[FileSinkConfig] = None
file: Optional[SinkConfig] = None


@dataclass()
Expand Down
2 changes: 1 addition & 1 deletion metaphor/common/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ def cli_main(extractor_cls: Type[BaseExtractor], config_file: str):
name=EventUtil.class_fqcn(extractor_cls),
description=extractor_cls._description,
platform=extractor_cls._platform,
file_sink_config=base_config.output.file,
sink_config=base_config.output.file,
)
140 changes: 0 additions & 140 deletions metaphor/common/file_sink.py

This file was deleted.

34 changes: 17 additions & 17 deletions metaphor/common/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@

from metaphor.common.base_extractor import BaseExtractor
from metaphor.common.event_util import ENTITY_TYPES, EventUtil
from metaphor.common.file_sink import FileSink, FileSinkConfig, S3StorageConfig
from metaphor.common.logger import get_logger
from metaphor.common.sink import SinkConfig, StreamSink
from metaphor.common.storage import S3StorageConfig
from metaphor.models.crawler_run_metadata import CrawlerRunMetadata, Platform, RunStatus
from metaphor.models.metadata_change_event import MetadataChangeEvent

Expand All @@ -17,7 +18,7 @@
name: str,
description: str,
platform: Optional[Platform] = None,
file_sink_config: Optional[FileSinkConfig] = None,
sink_config: Optional[SinkConfig] = None,
) -> Tuple[List[MetadataChangeEvent], CrawlerRunMetadata]:
"""Run a connector and write the resulting events to files and/or API.

Expand All @@ -31,7 +32,7 @@
Textual description of the connector
platform : Optional[Platform]
Platform of the connector
file_sink_config : Optional[FileSinkConfig]
sink_config : Optional[SinkConfig]
Optional configuration for outputting events to files or cloud storage

Returns
Expand Down Expand Up @@ -80,22 +81,21 @@
stack_trace=stacktrace,
)

if file_sink_config is not None:
file_sink = FileSink(file_sink_config)
file_sink.sink(events)
file_sink.sink_metadata(run_metadata)
file_sink.sink_logs()
if sink_config is not None:
with StreamSink(sink_config, run_metadata) as sink:
for event in events:
sink.write_event(event)

Check warning on line 87 in metaphor/common/runner.py

View check run for this annotation

Codecov / codecov/patch

metaphor/common/runner.py#L87

Added line #L87 was not covered by tests

return events, run_metadata


def metaphor_file_sink_config(
def metaphor_sink_config(
tenant: str,
connector_name: str,
is_metaphor_cloud=False,
s3_auth_config=S3StorageConfig(),
) -> FileSinkConfig:
"""Create a FileSinkConfig for outputting events to a Metaphor tenant's cloud storage
) -> SinkConfig:
"""Create a SinkConfig for outputting events to a Metaphor tenant's cloud storage

Parameters
----------
Expand All @@ -108,7 +108,7 @@

Returns
-------
FileSinkConfig
SinkConfig
the config created
"""

Expand All @@ -117,13 +117,13 @@
if is_metaphor_cloud
else f"metaphor-mce-{tenant}"
)
return FileSinkConfig(
return SinkConfig(

Check warning on line 120 in metaphor/common/runner.py

View check run for this annotation

Codecov / codecov/patch

metaphor/common/runner.py#L120

Added line #L120 was not covered by tests
directory=f"s3://{bucket}/{connector_name}", s3_auth_config=s3_auth_config
)


def local_file_sink_config(directory: str) -> FileSinkConfig:
"""Create a FileSinkConfig for outputting events to a local directory
def local_sink_config(directory: str) -> SinkConfig:
"""Create a SinkConfig for outputting events to a local directory

Parameters
----------
Expand All @@ -132,8 +132,8 @@

Returns
-------
FileSinkConfig
SinkConfig
the config created
"""

return FileSinkConfig(directory=directory)
return SinkConfig(directory=directory)

Check warning on line 139 in metaphor/common/runner.py

View check run for this annotation

Codecov / codecov/patch

metaphor/common/runner.py#L139

Added line #L139 was not covered by tests
Loading
Loading