From 6b2d22213d9104812beb7bf806c41bf8b0116cdf Mon Sep 17 00:00:00 2001 From: treff7es Date: Mon, 2 Oct 2023 11:58:45 +0200 Subject: [PATCH 01/12] Adding flag to auto lowercase dataset urns --- .../src/datahub/ingestion/api/source.py | 16 ++++++++ .../datahub/ingestion/api/source_helpers.py | 32 ++++++++++++++- .../datahub/ingestion/run/pipeline_config.py | 5 +++ .../tests/unit/test_source_helpers.py | 41 +++++++++++++++++++ 4 files changed, 93 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/api/source.py b/metadata-ingestion/src/datahub/ingestion/api/source.py index 0bcc220cad49b..4e9b431f3dc06 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source.py @@ -29,6 +29,7 @@ from datahub.ingestion.api.report import Report from datahub.ingestion.api.source_helpers import ( auto_browse_path_v2, + auto_lowercase_urns, auto_materialize_referenced_tags, auto_status_aspect, auto_workunit_reporter, @@ -192,7 +193,17 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: self.ctx.pipeline_config.flags.generate_browse_path_v2_dry_run ) + auto_lowercase_urns: Optional[MetadataWorkUnitProcessor] = None + if ( + self.ctx.pipeline_config + and self.ctx.pipeline_config.flags.auto_lowercase_urns + ): + auto_lowercase_urns = self._get_auto_lowercase_urn_processor( + enabled=self.ctx.pipeline_config.flags.auto_lowercase_urns + ) + return [ + auto_lowercase_urns, auto_status_aspect, auto_materialize_referenced_tags, browse_path_processor, @@ -238,6 +249,11 @@ def get_report(self) -> SourceReport: def close(self) -> None: pass + def _get_auto_lowercase_urn_processor( + self, enabled: bool + ) -> MetadataWorkUnitProcessor: + return partial(auto_lowercase_urns, enabled=enabled) + def _get_browse_path_processor(self, dry_run: bool) -> MetadataWorkUnitProcessor: config = self.get_config() platform = getattr(self, "platform", None) or getattr(config, "platform", None) diff --git a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py index 7fc15cf829678..cdef161c215e3 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py @@ -33,8 +33,9 @@ ) from datahub.telemetry import telemetry from datahub.utilities.urns.dataset_urn import DatasetUrn +from datahub.utilities.urns.error import InvalidUrnError from datahub.utilities.urns.tag_urn import TagUrn -from datahub.utilities.urns.urn import guess_entity_type +from datahub.utilities.urns.urn import Urn, guess_entity_type from datahub.utilities.urns.urn_iter import list_urns if TYPE_CHECKING: @@ -173,6 +174,35 @@ def auto_materialize_referenced_tags( ).as_workunit() +def auto_lowercase_urns( + stream: Iterable[MetadataWorkUnit], enabled: bool = False +) -> Iterable[MetadataWorkUnit]: + """Lowercase all dataset urns""" + + if not enabled: + return stream + + for wu in stream: + try: + urn = Urn.create_from_string(wu.metadata.entityUrn) + if urn.get_type() == DatasetUrn.ENTITY_TYPE: + dataset_urn = DatasetUrn.create_from_string(wu.metadata.entityUrn) + lowercased_urn = DatasetUrn.create_from_ids( + dataset_urn.get_data_platform_urn().get_platform_name(), + dataset_urn.get_dataset_name().lower(), + dataset_urn.get_env(), + ) + wu.metadata.entityUrn = str(lowercased_urn) + wu.id = wu.id.replace( + dataset_urn.get_dataset_name(), lowercased_urn.get_dataset_name() + ) + yield wu + else: + 
yield wu + except InvalidUrnError: + yield wu + + def auto_browse_path_v2( stream: Iterable[MetadataWorkUnit], *, diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py index ff9a7a6f3d146..767288f76057d 100644 --- a/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py +++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py @@ -57,6 +57,11 @@ class FlagsConfig(ConfigModel): ), ) + auto_lowercase_urns: bool = Field( + default=False, + description="Wether to Lowercase entity urns.", + ) + class PipelineConfig(ConfigModel): # Once support for discriminated unions gets merged into Pydantic, we can diff --git a/metadata-ingestion/tests/unit/test_source_helpers.py b/metadata-ingestion/tests/unit/test_source_helpers.py index b6ec6ebce240c..e7147f8a837df 100644 --- a/metadata-ingestion/tests/unit/test_source_helpers.py +++ b/metadata-ingestion/tests/unit/test_source_helpers.py @@ -16,6 +16,7 @@ from datahub.ingestion.api.source_helpers import ( auto_browse_path_v2, auto_empty_dataset_usage_statistics, + auto_lowercase_urns, auto_status_aspect, auto_workunit, ) @@ -275,6 +276,46 @@ def test_auto_browse_path_v2_legacy_browse_path(telemetry_ping_mock): assert paths["platform,dataset-2,PROD)"] == _make_browse_path_entries(["something"]) +def test_auto_lowercase_aspects(): + mcws = [ + MetadataChangeProposalWrapper( + entityUrn=make_dataset_urn( + "bigquery", "myProject.mySchema.myTable", "PROD" + ), + aspect=models.DatasetKeyClass( + "bigquery", "myProject.mySchema.myTable", "PROD" + ), + ).as_workunit(), + MetadataChangeProposalWrapper( + entityUrn="urn:li:container:008e111aa1d250dd52e0fd5d4b307b1a", + aspect=models.ContainerPropertiesClass( + name="test", + ), + ).as_workunit(), + ] + expected = [ + *list( + auto_workunit( + [ + MetadataChangeProposalWrapper( + entityUrn="urn:li:dataset:(urn:li:dataPlatform:bigquery,myproject.myschema.mytable,PROD)", + aspect=models.DatasetKeyClass( + "bigquery", "myProject.mySchema.myTable", "PROD" + ), + ), + MetadataChangeProposalWrapper( + entityUrn="urn:li:container:008e111aa1d250dd52e0fd5d4b307b1a", + aspect=models.ContainerPropertiesClass( + name="test", + ), + ), + ] + ) + ), + ] + assert list(auto_lowercase_urns(mcws, True)) == expected + + @patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") def test_auto_browse_path_v2_container_over_legacy_browse_path(telemetry_ping_mock): structure = {"a": {"b": ["c"]}} From 70de90587c2dfc45386c00addb0871bf6a807ea6 Mon Sep 17 00:00:00 2001 From: treff7es Date: Mon, 2 Oct 2023 12:44:00 +0200 Subject: [PATCH 02/12] Fixing linter issues --- .../src/datahub/ingestion/api/source_helpers.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py index cdef161c215e3..38a039532dcda 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py @@ -18,6 +18,7 @@ from datahub.emitter.mce_builder import make_dataplatform_instance_urn from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent from datahub.metadata.schema_classes import ( BrowsePathEntryClass, BrowsePathsClass, @@ -184,15 +185,20 @@ def auto_lowercase_urns( 
for wu in stream: try: - urn = Urn.create_from_string(wu.metadata.entityUrn) + urn = Urn.create_from_string(wu.get_urn()) if urn.get_type() == DatasetUrn.ENTITY_TYPE: - dataset_urn = DatasetUrn.create_from_string(wu.metadata.entityUrn) + dataset_urn = DatasetUrn.create_from_string(str(urn)) lowercased_urn = DatasetUrn.create_from_ids( dataset_urn.get_data_platform_urn().get_platform_name(), dataset_urn.get_dataset_name().lower(), dataset_urn.get_env(), ) - wu.metadata.entityUrn = str(lowercased_urn) + + if isinstance(wu.metadata, MetadataChangeEvent): + wu.metadata.proposedSnapshot.urn = str(lowercased_urn) + else: + wu.metadata.entityUrn = str(lowercased_urn) + wu.id = wu.id.replace( dataset_urn.get_dataset_name(), lowercased_urn.get_dataset_name() ) From 2727ee76660430fabf7d797b56e4688a0b142c44 Mon Sep 17 00:00:00 2001 From: treff7es Date: Mon, 2 Oct 2023 13:49:17 +0200 Subject: [PATCH 03/12] Fixing typo --- metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py index 767288f76057d..abb4af439109e 100644 --- a/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py +++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py @@ -59,7 +59,7 @@ class FlagsConfig(ConfigModel): auto_lowercase_urns: bool = Field( default=False, - description="Wether to Lowercase entity urns.", + description="Wether to lowercase dataset entity urns.", ) From 89801537e41f446ff059b669ad03d0c99fef7a9c Mon Sep 17 00:00:00 2001 From: treff7es Date: Tue, 3 Oct 2023 00:12:41 +0200 Subject: [PATCH 04/12] Using urn iter's lowercase_dataset_urns --- .../datahub/ingestion/api/source_helpers.py | 38 ++++--------- .../src/datahub/utilities/urns/urn_iter.py | 33 +++++++++-- .../tests/unit/test_source_helpers.py | 57 ++++++++++++++----- 3 files changed, 83 insertions(+), 45 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py index 38a039532dcda..1571051a842cd 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py @@ -1,4 +1,5 @@ import logging +import traceback from datetime import datetime, timezone from typing import ( TYPE_CHECKING, @@ -18,7 +19,6 @@ from datahub.emitter.mce_builder import make_dataplatform_instance_urn from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.workunit import MetadataWorkUnit -from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent from datahub.metadata.schema_classes import ( BrowsePathEntryClass, BrowsePathsClass, @@ -34,10 +34,9 @@ ) from datahub.telemetry import telemetry from datahub.utilities.urns.dataset_urn import DatasetUrn -from datahub.utilities.urns.error import InvalidUrnError from datahub.utilities.urns.tag_urn import TagUrn -from datahub.utilities.urns.urn import Urn, guess_entity_type -from datahub.utilities.urns.urn_iter import list_urns +from datahub.utilities.urns.urn import guess_entity_type +from datahub.utilities.urns.urn_iter import list_urns, lowercase_dataset_urns if TYPE_CHECKING: from datahub.ingestion.api.source import SourceReport @@ -72,7 +71,6 @@ def auto_status_aspect( for wu in stream: urn = wu.get_urn() all_urns.add(urn) - if not wu.is_primary_source: # If this is a non-primary source, we pretend 
like we've seen the status # aspect so that we don't try to emit a removal for it. @@ -185,27 +183,15 @@ def auto_lowercase_urns( for wu in stream: try: - urn = Urn.create_from_string(wu.get_urn()) - if urn.get_type() == DatasetUrn.ENTITY_TYPE: - dataset_urn = DatasetUrn.create_from_string(str(urn)) - lowercased_urn = DatasetUrn.create_from_ids( - dataset_urn.get_data_platform_urn().get_platform_name(), - dataset_urn.get_dataset_name().lower(), - dataset_urn.get_env(), - ) - - if isinstance(wu.metadata, MetadataChangeEvent): - wu.metadata.proposedSnapshot.urn = str(lowercased_urn) - else: - wu.metadata.entityUrn = str(lowercased_urn) - - wu.id = wu.id.replace( - dataset_urn.get_dataset_name(), lowercased_urn.get_dataset_name() - ) - yield wu - else: - yield wu - except InvalidUrnError: + old_urn = wu.get_urn() + lowercase_dataset_urns(wu.metadata) + wu.id = wu.id.replace(old_urn, wu.get_urn()) + + yield wu + except Exception: + logger.warning( + f"Failed to lowercase urns for {wu} the exception was: {traceback.format_exc()}" + ) yield wu diff --git a/metadata-ingestion/src/datahub/utilities/urns/urn_iter.py b/metadata-ingestion/src/datahub/utilities/urns/urn_iter.py index 261f95331af61..e13d439161064 100644 --- a/metadata-ingestion/src/datahub/utilities/urns/urn_iter.py +++ b/metadata-ingestion/src/datahub/utilities/urns/urn_iter.py @@ -3,7 +3,11 @@ from avro.schema import Field, RecordSchema from datahub.emitter.mcp import MetadataChangeProposalWrapper -from datahub.metadata.schema_classes import DictWrapper +from datahub.metadata.schema_classes import ( + DictWrapper, + MetadataChangeEventClass, + MetadataChangeProposalClass, +) from datahub.utilities.urns.dataset_urn import DatasetUrn from datahub.utilities.urns.urn import Urn, guess_entity_type @@ -32,7 +36,7 @@ def list_urns_with_path( if isinstance(model, MetadataChangeProposalWrapper): if model.entityUrn: - urns.append((model.entityUrn, ["urn"])) + urns.append((model.entityUrn, ["entityUrn"])) if model.entityKeyAspect: urns.extend( _add_prefix_to_paths( @@ -83,7 +87,15 @@ def list_urns(model: Union[DictWrapper, MetadataChangeProposalWrapper]) -> List[ return [urn for urn, _ in list_urns_with_path(model)] -def transform_urns(model: DictWrapper, func: Callable[[str], str]) -> None: +def transform_urns( + model: Union[ + DictWrapper, + MetadataChangeEventClass, + MetadataChangeProposalClass, + MetadataChangeProposalWrapper, + ], + func: Callable[[str], str], +) -> None: """ Rewrites all URNs in the given object according to the given function. 
""" @@ -95,7 +107,9 @@ def transform_urns(model: DictWrapper, func: Callable[[str], str]) -> None: def _modify_at_path( - model: Union[DictWrapper, list], path: _Path, new_value: str + model: Union[DictWrapper, MetadataChangeProposalWrapper, list], + path: _Path, + new_value: str, ) -> None: assert len(path) > 0 @@ -103,6 +117,8 @@ def _modify_at_path( if isinstance(path[0], int): assert isinstance(model, list) model[path[0]] = new_value + elif isinstance(model, MetadataChangeProposalWrapper): + setattr(model, path[0], new_value) else: assert isinstance(model, DictWrapper) model._inner_dict[path[0]] = new_value @@ -120,7 +136,14 @@ def _lowercase_dataset_urn(dataset_urn: str) -> str: return str(cur_urn) -def lowercase_dataset_urns(model: DictWrapper) -> None: +def lowercase_dataset_urns( + model: Union[ + DictWrapper, + MetadataChangeEventClass, + MetadataChangeProposalClass, + MetadataChangeProposalWrapper, + ] +) -> None: def modify_urn(urn: str) -> str: if guess_entity_type(urn) == "dataset": return _lowercase_dataset_urn(urn) diff --git a/metadata-ingestion/tests/unit/test_source_helpers.py b/metadata-ingestion/tests/unit/test_source_helpers.py index e7147f8a837df..00f11e4edb683 100644 --- a/metadata-ingestion/tests/unit/test_source_helpers.py +++ b/metadata-ingestion/tests/unit/test_source_helpers.py @@ -277,22 +277,37 @@ def test_auto_browse_path_v2_legacy_browse_path(telemetry_ping_mock): def test_auto_lowercase_aspects(): - mcws = [ - MetadataChangeProposalWrapper( - entityUrn=make_dataset_urn( - "bigquery", "myProject.mySchema.myTable", "PROD" + mcws = auto_workunit( + [ + MetadataChangeProposalWrapper( + entityUrn=make_dataset_urn( + "bigquery", "myProject.mySchema.myTable", "PROD" + ), + aspect=models.DatasetKeyClass( + "urn:li:dataPlatform:bigquery", "myProject.mySchema.myTable", "PROD" + ), ), - aspect=models.DatasetKeyClass( - "bigquery", "myProject.mySchema.myTable", "PROD" + MetadataChangeProposalWrapper( + entityUrn="urn:li:container:008e111aa1d250dd52e0fd5d4b307b1a", + aspect=models.ContainerPropertiesClass( + name="test", + ), ), - ).as_workunit(), - MetadataChangeProposalWrapper( - entityUrn="urn:li:container:008e111aa1d250dd52e0fd5d4b307b1a", - aspect=models.ContainerPropertiesClass( - name="test", + models.MetadataChangeEventClass( + proposedSnapshot=models.DatasetSnapshotClass( + urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,bigquery-Public-Data.Covid19_Aha.staffing,PROD)", + aspects=[ + models.DatasetPropertiesClass( + customProperties={ + "key": "value", + }, + ), + ], + ), ), - ).as_workunit(), - ] + ] + ) + expected = [ *list( auto_workunit( @@ -300,7 +315,9 @@ def test_auto_lowercase_aspects(): MetadataChangeProposalWrapper( entityUrn="urn:li:dataset:(urn:li:dataPlatform:bigquery,myproject.myschema.mytable,PROD)", aspect=models.DatasetKeyClass( - "bigquery", "myProject.mySchema.myTable", "PROD" + "urn:li:dataPlatform:bigquery", + "myProject.mySchema.myTable", + "PROD", ), ), MetadataChangeProposalWrapper( @@ -309,6 +326,18 @@ def test_auto_lowercase_aspects(): name="test", ), ), + models.MetadataChangeEventClass( + proposedSnapshot=models.DatasetSnapshotClass( + urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,bigquery-public-data.covid19_aha.staffing,PROD)", + aspects=[ + models.DatasetPropertiesClass( + customProperties={ + "key": "value", + }, + ), + ], + ), + ), ] ) ), From 241a676c9b4fd0d0f3764deaf5ab0af8b9942d23 Mon Sep 17 00:00:00 2001 From: treff7es Date: Tue, 3 Oct 2023 00:15:39 +0200 Subject: [PATCH 05/12] Fixing typo --- 
metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py index abb4af439109e..29c3e0afeade8 100644 --- a/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py +++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py @@ -59,7 +59,7 @@ class FlagsConfig(ConfigModel): auto_lowercase_urns: bool = Field( default=False, - description="Wether to lowercase dataset entity urns.", + description="Whether to lowercase dataset entity urns.", ) From 461e8d8295613acaec2c8c8d0c2ec472c7f7ba01 Mon Sep 17 00:00:00 2001 From: treff7es Date: Tue, 3 Oct 2023 08:58:28 +0200 Subject: [PATCH 06/12] Adding config option to lowercase dataset urns --- .../src/datahub/configuration/source_common.py | 11 +++++++++++ .../src/datahub/ingestion/api/source.py | 14 ++++++++------ .../ingestion/source/bigquery_v2/bigquery.py | 3 --- .../source/bigquery_v2/bigquery_config.py | 5 ----- .../src/datahub/ingestion/source/kafka.py | 11 +++++++++-- .../src/datahub/ingestion/source/sql/sql_config.py | 11 +++++++++-- .../src/datahub/ingestion/source/unity/config.py | 6 +++++- 7 files changed, 42 insertions(+), 19 deletions(-) diff --git a/metadata-ingestion/src/datahub/configuration/source_common.py b/metadata-ingestion/src/datahub/configuration/source_common.py index 37b93f3e598e1..705cc0197ec62 100644 --- a/metadata-ingestion/src/datahub/configuration/source_common.py +++ b/metadata-ingestion/src/datahub/configuration/source_common.py @@ -54,6 +54,17 @@ class DatasetSourceConfigMixin(PlatformInstanceConfigMixin, EnvConfigMixin): """ +LOWER_CASE_URN_CONFIG_KEY = "convert_urns_to_lowercase" + + +class LowerCaseDatasetUrnConfigMixin(ConfigModel): + convert_urns_to_lowercase2: bool = Field( + default=False, + alias=LOWER_CASE_URN_CONFIG_KEY, + description="Whether to convert dataset urns to lowercase.", + ) + + class DatasetLineageProviderConfigBase(EnvConfigMixin): """ Any non-Dataset source that produces lineage to Datasets should inherit this class. 
diff --git a/metadata-ingestion/src/datahub/ingestion/api/source.py b/metadata-ingestion/src/datahub/ingestion/api/source.py index 4e9b431f3dc06..9edc697f817c2 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source.py @@ -22,7 +22,10 @@ from pydantic import BaseModel from datahub.configuration.common import ConfigModel -from datahub.configuration.source_common import PlatformInstanceConfigMixin +from datahub.configuration.source_common import ( + LOWER_CASE_URN_CONFIG_KEY, + PlatformInstanceConfigMixin, +) from datahub.emitter.mcp_builder import mcps_from_mce from datahub.ingestion.api.closeable import Closeable from datahub.ingestion.api.common import PipelineContext, RecordEnvelope, WorkUnit @@ -194,12 +197,11 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: ) auto_lowercase_urns: Optional[MetadataWorkUnitProcessor] = None - if ( - self.ctx.pipeline_config - and self.ctx.pipeline_config.flags.auto_lowercase_urns - ): + if self.ctx.pipeline_config and self.ctx.pipeline_config.source.config: auto_lowercase_urns = self._get_auto_lowercase_urn_processor( - enabled=self.ctx.pipeline_config.flags.auto_lowercase_urns + enabled=self.ctx.pipeline_config.source.config.get( + LOWER_CASE_URN_CONFIG_KEY, False + ) ) return [ diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index 8a16b1a4a5f6b..d1b12160c1ca2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -16,7 +16,6 @@ make_dataplatform_instance_urn, make_dataset_urn, make_tag_urn, - set_dataset_urn_to_lower, ) from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.mcp_builder import BigQueryDatasetKey, ContainerKey, ProjectIdKey @@ -218,8 +217,6 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config): if self.config.enable_legacy_sharded_table_support: BigqueryTableIdentifier._BQ_SHARDED_TABLE_SUFFIX = "" - set_dataset_urn_to_lower(self.config.convert_urns_to_lowercase) - self.bigquery_data_dictionary = BigQuerySchemaApi( self.report.schema_api_perf, self.config.get_bigquery_client() ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py index 3b06a4699c566..49ec3b31f6f27 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py @@ -206,11 +206,6 @@ def validate_column_lineage(cls, v: bool, values: Dict[str, Any]) -> bool: description="This flag enables the data lineage extraction from Data Lineage API exposed by Google Data Catalog. NOTE: This extractor can't build views lineage. It's recommended to enable the view's DDL parsing. 
Read the docs to have more information about: https://cloud.google.com/data-catalog/docs/concepts/about-data-lineage", ) - convert_urns_to_lowercase: bool = Field( - default=False, - description="Convert urns to lowercase.", - ) - enable_legacy_sharded_table_support: bool = Field( default=True, description="Use the legacy sharded table urn suffix added.", diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka.py b/metadata-ingestion/src/datahub/ingestion/source/kafka.py index 566304e1999b7..d5039360da567 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka.py @@ -18,7 +18,10 @@ from datahub.configuration.common import AllowDenyPattern from datahub.configuration.kafka import KafkaConsumerConnectionConfig -from datahub.configuration.source_common import DatasetSourceConfigMixin +from datahub.configuration.source_common import ( + DatasetSourceConfigMixin, + LowerCaseDatasetUrnConfigMixin, +) from datahub.emitter import mce_builder from datahub.emitter.mce_builder import ( make_data_platform_urn, @@ -76,7 +79,11 @@ class KafkaTopicConfigKeys(str, Enum): UNCLEAN_LEADER_ELECTION_CONFIG = "unclean.leader.election.enable" -class KafkaSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin): +class KafkaSourceConfig( + StatefulIngestionConfigBase, + DatasetSourceConfigMixin, + LowerCaseDatasetUrnConfigMixin, +): connection: KafkaConsumerConnectionConfig = KafkaConsumerConnectionConfig() topic_patterns: AllowDenyPattern = AllowDenyPattern(allow=[".*"], deny=["^_.*"]) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py index 8f1e04b915f3b..06b748c5a2a89 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py @@ -8,7 +8,10 @@ from datahub.configuration.common import AllowDenyPattern, ConfigModel from datahub.configuration.pydantic_field_deprecation import pydantic_field_deprecated -from datahub.configuration.source_common import DatasetSourceConfigMixin +from datahub.configuration.source_common import ( + DatasetSourceConfigMixin, + LowerCaseDatasetUrnConfigMixin, +) from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig from datahub.ingestion.source.state.stale_entity_removal_handler import ( StatefulStaleMetadataRemovalConfig, @@ -21,7 +24,11 @@ logger: logging.Logger = logging.getLogger(__name__) -class SQLCommonConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin): +class SQLCommonConfig( + StatefulIngestionConfigBase, + DatasetSourceConfigMixin, + LowerCaseDatasetUrnConfigMixin, +): options: dict = pydantic.Field( default_factory=dict, description="Any options specified here will be passed to [SQLAlchemy.create_engine](https://docs.sqlalchemy.org/en/14/core/engines.html#sqlalchemy.create_engine) as kwargs.", diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/config.py b/metadata-ingestion/src/datahub/ingestion/source/unity/config.py index 94ff755e3b254..40a3dbd8f9f14 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/config.py @@ -6,7 +6,10 @@ from pydantic import Field from datahub.configuration.common import AllowDenyPattern, ConfigModel -from datahub.configuration.source_common import DatasetSourceConfigMixin +from datahub.configuration.source_common import ( + 
DatasetSourceConfigMixin, + LowerCaseDatasetUrnConfigMixin, +) from datahub.configuration.validate_field_removal import pydantic_removed_field from datahub.configuration.validate_field_rename import pydantic_renamed_field from datahub.ingestion.source.state.stale_entity_removal_handler import ( @@ -87,6 +90,7 @@ class UnityCatalogSourceConfig( BaseUsageConfig, DatasetSourceConfigMixin, StatefulProfilingConfigMixin, + LowerCaseDatasetUrnConfigMixin, ): token: str = pydantic.Field(description="Databricks personal access token") workspace_url: str = pydantic.Field( From c52ea076c5f881c2bef7898f9acd25a0f770138d Mon Sep 17 00:00:00 2001 From: treff7es Date: Tue, 3 Oct 2023 23:34:13 +0200 Subject: [PATCH 07/12] Fixing tests --- .../src/datahub/ingestion/api/source.py | 22 +++++++------------ .../datahub/ingestion/api/source_helpers.py | 5 +---- .../tests/unit/test_source_helpers.py | 2 +- 3 files changed, 10 insertions(+), 19 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/api/source.py b/metadata-ingestion/src/datahub/ingestion/api/source.py index 9edc697f817c2..79d939ea071b3 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source.py @@ -196,16 +196,15 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: self.ctx.pipeline_config.flags.generate_browse_path_v2_dry_run ) - auto_lowercase_urns: Optional[MetadataWorkUnitProcessor] = None - if self.ctx.pipeline_config and self.ctx.pipeline_config.source.config: - auto_lowercase_urns = self._get_auto_lowercase_urn_processor( - enabled=self.ctx.pipeline_config.source.config.get( - LOWER_CASE_URN_CONFIG_KEY, False - ) - ) - + auto_lowercase_dataset_urns: Optional[MetadataWorkUnitProcessor] = None + if ( + self.ctx.pipeline_config + and self.ctx.pipeline_config.source.config + and self.ctx.pipeline_config.source.config.get(LOWER_CASE_URN_CONFIG_KEY) + ): + auto_lowercase_dataset_urns = auto_lowercase_urns return [ - auto_lowercase_urns, + auto_lowercase_dataset_urns, auto_status_aspect, auto_materialize_referenced_tags, browse_path_processor, @@ -251,11 +250,6 @@ def get_report(self) -> SourceReport: def close(self) -> None: pass - def _get_auto_lowercase_urn_processor( - self, enabled: bool - ) -> MetadataWorkUnitProcessor: - return partial(auto_lowercase_urns, enabled=enabled) - def _get_browse_path_processor(self, dry_run: bool) -> MetadataWorkUnitProcessor: config = self.get_config() platform = getattr(self, "platform", None) or getattr(config, "platform", None) diff --git a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py index 1571051a842cd..520543b82180c 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py @@ -174,13 +174,10 @@ def auto_materialize_referenced_tags( def auto_lowercase_urns( - stream: Iterable[MetadataWorkUnit], enabled: bool = False + stream: Iterable[MetadataWorkUnit], ) -> Iterable[MetadataWorkUnit]: """Lowercase all dataset urns""" - if not enabled: - return stream - for wu in stream: try: old_urn = wu.get_urn() diff --git a/metadata-ingestion/tests/unit/test_source_helpers.py b/metadata-ingestion/tests/unit/test_source_helpers.py index 00f11e4edb683..b667af8bb41e9 100644 --- a/metadata-ingestion/tests/unit/test_source_helpers.py +++ b/metadata-ingestion/tests/unit/test_source_helpers.py @@ -342,7 +342,7 @@ def test_auto_lowercase_aspects(): ) 
), ] - assert list(auto_lowercase_urns(mcws, True)) == expected + assert list(auto_lowercase_urns(mcws)) == expected @patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") From e507f60046b5ea875d933953fcbed60e2fc341cd Mon Sep 17 00:00:00 2001 From: treff7es Date: Wed, 4 Oct 2023 00:28:01 +0200 Subject: [PATCH 08/12] Fixing tests --- .../src/datahub/configuration/source_common.py | 6 +----- metadata-ingestion/src/datahub/ingestion/api/source.py | 7 ++----- 2 files changed, 3 insertions(+), 10 deletions(-) diff --git a/metadata-ingestion/src/datahub/configuration/source_common.py b/metadata-ingestion/src/datahub/configuration/source_common.py index 705cc0197ec62..636fcb96c43f5 100644 --- a/metadata-ingestion/src/datahub/configuration/source_common.py +++ b/metadata-ingestion/src/datahub/configuration/source_common.py @@ -54,13 +54,9 @@ class DatasetSourceConfigMixin(PlatformInstanceConfigMixin, EnvConfigMixin): """ -LOWER_CASE_URN_CONFIG_KEY = "convert_urns_to_lowercase" - - class LowerCaseDatasetUrnConfigMixin(ConfigModel): - convert_urns_to_lowercase2: bool = Field( + convert_urns_to_lowercase: bool = Field( default=False, - alias=LOWER_CASE_URN_CONFIG_KEY, description="Whether to convert dataset urns to lowercase.", ) diff --git a/metadata-ingestion/src/datahub/ingestion/api/source.py b/metadata-ingestion/src/datahub/ingestion/api/source.py index 79d939ea071b3..4946dd93e81c9 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source.py @@ -22,10 +22,7 @@ from pydantic import BaseModel from datahub.configuration.common import ConfigModel -from datahub.configuration.source_common import ( - LOWER_CASE_URN_CONFIG_KEY, - PlatformInstanceConfigMixin, -) +from datahub.configuration.source_common import PlatformInstanceConfigMixin from datahub.emitter.mcp_builder import mcps_from_mce from datahub.ingestion.api.closeable import Closeable from datahub.ingestion.api.common import PipelineContext, RecordEnvelope, WorkUnit @@ -200,7 +197,7 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: if ( self.ctx.pipeline_config and self.ctx.pipeline_config.source.config - and self.ctx.pipeline_config.source.config.get(LOWER_CASE_URN_CONFIG_KEY) + and self.ctx.pipeline_config.source.config.convert_urns_to_lowercase ): auto_lowercase_dataset_urns = auto_lowercase_urns return [ From 672c4e4aa525263e98fea1862984577f472ef54d Mon Sep 17 00:00:00 2001 From: treff7es Date: Wed, 4 Oct 2023 08:20:52 +0200 Subject: [PATCH 09/12] Fixing pr review comments --- metadata-ingestion/src/datahub/ingestion/api/source.py | 3 +++ .../src/datahub/ingestion/api/source_helpers.py | 7 ++----- .../src/datahub/ingestion/run/pipeline_config.py | 5 ----- 3 files changed, 5 insertions(+), 10 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/api/source.py b/metadata-ingestion/src/datahub/ingestion/api/source.py index 4946dd93e81c9..ae8cbd179ac68 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source.py @@ -197,6 +197,9 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: if ( self.ctx.pipeline_config and self.ctx.pipeline_config.source.config + and hasattr( + self.ctx.pipeline_config.source.config, "convert_urns_to_lowercase" + ) and self.ctx.pipeline_config.source.config.convert_urns_to_lowercase ): auto_lowercase_dataset_urns = auto_lowercase_urns diff --git 
a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py index 520543b82180c..2ce9e07bc57bc 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py @@ -1,5 +1,4 @@ import logging -import traceback from datetime import datetime, timezone from typing import ( TYPE_CHECKING, @@ -185,10 +184,8 @@ def auto_lowercase_urns( wu.id = wu.id.replace(old_urn, wu.get_urn()) yield wu - except Exception: - logger.warning( - f"Failed to lowercase urns for {wu} the exception was: {traceback.format_exc()}" - ) + except Exception as e: + logger.warning(f"Failed to lowercase urns for {wu}: {e}", exc_info=True) yield wu diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py index 29c3e0afeade8..ff9a7a6f3d146 100644 --- a/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py +++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py @@ -57,11 +57,6 @@ class FlagsConfig(ConfigModel): ), ) - auto_lowercase_urns: bool = Field( - default=False, - description="Whether to lowercase dataset entity urns.", - ) - class PipelineConfig(ConfigModel): # Once support for discriminated unions gets merged into Pydantic, we can From 624f956063dd8f0e37636b0c9691eb047b351dd0 Mon Sep 17 00:00:00 2001 From: treff7es Date: Wed, 4 Oct 2023 12:10:51 +0200 Subject: [PATCH 10/12] Fixing config check --- metadata-ingestion/src/datahub/ingestion/api/source.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/api/source.py b/metadata-ingestion/src/datahub/ingestion/api/source.py index ae8cbd179ac68..419fbdcc0c464 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source.py @@ -196,11 +196,9 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: auto_lowercase_dataset_urns: Optional[MetadataWorkUnitProcessor] = None if ( self.ctx.pipeline_config + and self.ctx.pipeline_config.source and self.ctx.pipeline_config.source.config - and hasattr( - self.ctx.pipeline_config.source.config, "convert_urns_to_lowercase" - ) - and self.ctx.pipeline_config.source.config.convert_urns_to_lowercase + and self.ctx.pipeline_config.source.config.get("convert_urns_to_lowercase") ): auto_lowercase_dataset_urns = auto_lowercase_urns return [ From eb4346e1bf8b621b9996a0aeaf408283fa9a1375 Mon Sep 17 00:00:00 2001 From: treff7es Date: Thu, 5 Oct 2023 07:48:49 +0200 Subject: [PATCH 11/12] fixing tests --- metadata-ingestion/src/datahub/ingestion/api/source.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/api/source.py b/metadata-ingestion/src/datahub/ingestion/api/source.py index 419fbdcc0c464..f113eaab20c83 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source.py @@ -198,7 +198,10 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: self.ctx.pipeline_config and self.ctx.pipeline_config.source and self.ctx.pipeline_config.source.config - and self.ctx.pipeline_config.source.config.get("convert_urns_to_lowercase") + and hasattr( + self.ctx.pipeline_config.source.config, "convert_urns_to_lowercase" + ) + and self.ctx.pipeline_config.source.config.convert_urns_to_lowercase ): 
auto_lowercase_dataset_urns = auto_lowercase_urns return [ From b7313442655717bb4b981bcfb90a933143d09316 Mon Sep 17 00:00:00 2001 From: treff7es Date: Fri, 6 Oct 2023 10:09:49 +0200 Subject: [PATCH 12/12] Fixing to work with Pydantic config and dict as well --- .../src/datahub/ingestion/api/source.py | 17 ++++++++++++++--- .../datahub/ingestion/source/sql/sql_config.py | 1 - 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/api/source.py b/metadata-ingestion/src/datahub/ingestion/api/source.py index f113eaab20c83..b86844b1c4c83 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source.py @@ -198,10 +198,21 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: self.ctx.pipeline_config and self.ctx.pipeline_config.source and self.ctx.pipeline_config.source.config - and hasattr( - self.ctx.pipeline_config.source.config, "convert_urns_to_lowercase" + and ( + ( + hasattr( + self.ctx.pipeline_config.source.config, + "convert_urns_to_lowercase", + ) + and self.ctx.pipeline_config.source.config.convert_urns_to_lowercase + ) + or ( + hasattr(self.ctx.pipeline_config.source.config, "get") + and self.ctx.pipeline_config.source.config.get( + "convert_urns_to_lowercase" + ) + ) ) - and self.ctx.pipeline_config.source.config.convert_urns_to_lowercase ): auto_lowercase_dataset_urns = auto_lowercase_urns return [ diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py index 0a0973d1117e0..08cc74aec3977 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py @@ -7,7 +7,6 @@ from pydantic import Field from datahub.configuration.common import AllowDenyPattern, ConfigModel -from datahub.configuration.pydantic_field_deprecation import pydantic_field_deprecated from datahub.configuration.source_common import ( DatasetSourceConfigMixin, LowerCaseDatasetUrnConfigMixin,
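Usage note: after PATCH 12/12, lowercasing is no longer driven by a pipeline-level `flags.auto_lowercase_urns` setting; it is opt-in per source through the `convert_urns_to_lowercase` option contributed by `LowerCaseDatasetUrnConfigMixin` (mixed into the Kafka, SQL-common, and Unity Catalog configs above), and the `auto_lowercase_urns` work-unit processor rewrites dataset urns only. The snippet below is a minimal sketch of what that processor does to a single work unit; it mirrors the unit test touched in PATCH 04/12, and the import paths and the `DatasetPropertiesClass` aspect are illustrative assumptions rather than part of these diffs.

    # Sketch: feed one dataset MCP through auto_lowercase_urns and inspect the urn.
    from datahub.emitter.mce_builder import make_dataset_urn  # assumed import path
    from datahub.emitter.mcp import MetadataChangeProposalWrapper
    from datahub.ingestion.api.source_helpers import auto_lowercase_urns, auto_workunit
    from datahub.metadata.schema_classes import DatasetPropertiesClass

    # Build a work-unit stream from a single mixed-case dataset aspect.
    work_units = auto_workunit(
        [
            MetadataChangeProposalWrapper(
                entityUrn=make_dataset_urn("bigquery", "myProject.mySchema.myTable", "PROD"),
                aspect=DatasetPropertiesClass(customProperties={"key": "value"}),
            )
        ]
    )

    for wu in auto_lowercase_urns(work_units):
        # Only the dataset name is lowercased; the platform and env are untouched:
        # urn:li:dataset:(urn:li:dataPlatform:bigquery,myproject.myschema.mytable,PROD)
        print(wu.get_urn())

Non-dataset urns (for example the container urn in the test) pass through unchanged, and any exception raised while rewriting a work unit is logged via `logger.warning(..., exc_info=True)` while the work unit is still yielded.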