diff --git a/metadata-ingestion/src/datahub/configuration/source_common.py b/metadata-ingestion/src/datahub/configuration/source_common.py index a9f891ddb7b1e..80b6ceb576c1c 100644 --- a/metadata-ingestion/src/datahub/configuration/source_common.py +++ b/metadata-ingestion/src/datahub/configuration/source_common.py @@ -54,6 +54,13 @@ class DatasetSourceConfigMixin(PlatformInstanceConfigMixin, EnvConfigMixin): """ +class LowerCaseDatasetUrnConfigMixin(ConfigModel): + convert_urns_to_lowercase: bool = Field( + default=False, + description="Whether to convert dataset urns to lowercase.", + ) + + class DatasetLineageProviderConfigBase(EnvConfigMixin): """ Any non-Dataset source that produces lineage to Datasets should inherit this class. diff --git a/metadata-ingestion/src/datahub/ingestion/api/source.py b/metadata-ingestion/src/datahub/ingestion/api/source.py index 0bcc220cad49b..b86844b1c4c83 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source.py @@ -29,6 +29,7 @@ from datahub.ingestion.api.report import Report from datahub.ingestion.api.source_helpers import ( auto_browse_path_v2, + auto_lowercase_urns, auto_materialize_referenced_tags, auto_status_aspect, auto_workunit_reporter, @@ -192,7 +193,30 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: self.ctx.pipeline_config.flags.generate_browse_path_v2_dry_run ) + auto_lowercase_dataset_urns: Optional[MetadataWorkUnitProcessor] = None + if ( + self.ctx.pipeline_config + and self.ctx.pipeline_config.source + and self.ctx.pipeline_config.source.config + and ( + ( + hasattr( + self.ctx.pipeline_config.source.config, + "convert_urns_to_lowercase", + ) + and self.ctx.pipeline_config.source.config.convert_urns_to_lowercase + ) + or ( + hasattr(self.ctx.pipeline_config.source.config, "get") + and self.ctx.pipeline_config.source.config.get( + "convert_urns_to_lowercase" + ) + ) + ) + ): + auto_lowercase_dataset_urns = auto_lowercase_urns return [ + auto_lowercase_dataset_urns, auto_status_aspect, auto_materialize_referenced_tags, browse_path_processor, diff --git a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py index 7fc15cf829678..2ce9e07bc57bc 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py @@ -35,7 +35,7 @@ from datahub.utilities.urns.dataset_urn import DatasetUrn from datahub.utilities.urns.tag_urn import TagUrn from datahub.utilities.urns.urn import guess_entity_type -from datahub.utilities.urns.urn_iter import list_urns +from datahub.utilities.urns.urn_iter import list_urns, lowercase_dataset_urns if TYPE_CHECKING: from datahub.ingestion.api.source import SourceReport @@ -70,7 +70,6 @@ def auto_status_aspect( for wu in stream: urn = wu.get_urn() all_urns.add(urn) - if not wu.is_primary_source: # If this is a non-primary source, we pretend like we've seen the status # aspect so that we don't try to emit a removal for it. @@ -173,6 +172,23 @@ def auto_materialize_referenced_tags( ).as_workunit() +def auto_lowercase_urns( + stream: Iterable[MetadataWorkUnit], +) -> Iterable[MetadataWorkUnit]: + """Lowercase all dataset urns""" + + for wu in stream: + try: + old_urn = wu.get_urn() + lowercase_dataset_urns(wu.metadata) + wu.id = wu.id.replace(old_urn, wu.get_urn()) + + yield wu + except Exception as e: + logger.warning(f"Failed to lowercase urns for {wu}: {e}", exc_info=True) + yield wu + + def auto_browse_path_v2( stream: Iterable[MetadataWorkUnit], *, diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index b4a04d96b532b..648aad9112fb3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -16,7 +16,6 @@ make_dataplatform_instance_urn, make_dataset_urn, make_tag_urn, - set_dataset_urn_to_lower, ) from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.mcp_builder import BigQueryDatasetKey, ContainerKey, ProjectIdKey @@ -218,8 +217,6 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config): if self.config.enable_legacy_sharded_table_support: BigqueryTableIdentifier._BQ_SHARDED_TABLE_SUFFIX = "" - set_dataset_urn_to_lower(self.config.convert_urns_to_lowercase) - self.bigquery_data_dictionary = BigQuerySchemaApi( self.report.schema_api_perf, self.config.get_bigquery_client() ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py index 483355a85ac05..944814b6936a4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py @@ -206,11 +206,6 @@ def validate_column_lineage(cls, v: bool, values: Dict[str, Any]) -> bool: description="This flag enables the data lineage extraction from Data Lineage API exposed by Google Data Catalog. NOTE: This extractor can't build views lineage. It's recommended to enable the view's DDL parsing. Read the docs to have more information about: https://cloud.google.com/data-catalog/docs/concepts/about-data-lineage", ) - convert_urns_to_lowercase: bool = Field( - default=False, - description="Convert urns to lowercase.", - ) - enable_legacy_sharded_table_support: bool = Field( default=True, description="Use the legacy sharded table urn suffix added.", diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka.py b/metadata-ingestion/src/datahub/ingestion/source/kafka.py index 566304e1999b7..d5039360da567 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka.py @@ -18,7 +18,10 @@ from datahub.configuration.common import AllowDenyPattern from datahub.configuration.kafka import KafkaConsumerConnectionConfig -from datahub.configuration.source_common import DatasetSourceConfigMixin +from datahub.configuration.source_common import ( + DatasetSourceConfigMixin, + LowerCaseDatasetUrnConfigMixin, +) from datahub.emitter import mce_builder from datahub.emitter.mce_builder import ( make_data_platform_urn, @@ -76,7 +79,11 @@ class KafkaTopicConfigKeys(str, Enum): UNCLEAN_LEADER_ELECTION_CONFIG = "unclean.leader.election.enable" -class KafkaSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin): +class KafkaSourceConfig( + StatefulIngestionConfigBase, + DatasetSourceConfigMixin, + LowerCaseDatasetUrnConfigMixin, +): connection: KafkaConsumerConnectionConfig = KafkaConsumerConnectionConfig() topic_patterns: AllowDenyPattern = AllowDenyPattern(allow=[".*"], deny=["^_.*"]) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py index 677d32c8bac08..08cc74aec3977 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py @@ -7,7 +7,10 @@ from pydantic import Field from datahub.configuration.common import AllowDenyPattern, ConfigModel -from datahub.configuration.source_common import DatasetSourceConfigMixin +from datahub.configuration.source_common import ( + DatasetSourceConfigMixin, + LowerCaseDatasetUrnConfigMixin, +) from datahub.configuration.validate_field_deprecation import pydantic_field_deprecated from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig from datahub.ingestion.source.state.stale_entity_removal_handler import ( @@ -21,7 +24,11 @@ logger: logging.Logger = logging.getLogger(__name__) -class SQLCommonConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin): +class SQLCommonConfig( + StatefulIngestionConfigBase, + DatasetSourceConfigMixin, + LowerCaseDatasetUrnConfigMixin, +): options: dict = pydantic.Field( default_factory=dict, description="Any options specified here will be passed to [SQLAlchemy.create_engine](https://docs.sqlalchemy.org/en/14/core/engines.html#sqlalchemy.create_engine) as kwargs.", diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/config.py b/metadata-ingestion/src/datahub/ingestion/source/unity/config.py index 51390873712d3..a57ee39848855 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/config.py @@ -7,7 +7,10 @@ from pydantic import Field from datahub.configuration.common import AllowDenyPattern, ConfigModel -from datahub.configuration.source_common import DatasetSourceConfigMixin +from datahub.configuration.source_common import ( + DatasetSourceConfigMixin, + LowerCaseDatasetUrnConfigMixin, +) from datahub.configuration.validate_field_removal import pydantic_removed_field from datahub.configuration.validate_field_rename import pydantic_renamed_field from datahub.ingestion.source.state.stale_entity_removal_handler import ( @@ -91,6 +94,7 @@ class UnityCatalogSourceConfig( BaseUsageConfig, DatasetSourceConfigMixin, StatefulProfilingConfigMixin, + LowerCaseDatasetUrnConfigMixin, ): token: str = pydantic.Field(description="Databricks personal access token") workspace_url: str = pydantic.Field( diff --git a/metadata-ingestion/src/datahub/utilities/urns/urn_iter.py b/metadata-ingestion/src/datahub/utilities/urns/urn_iter.py index 261f95331af61..e13d439161064 100644 --- a/metadata-ingestion/src/datahub/utilities/urns/urn_iter.py +++ b/metadata-ingestion/src/datahub/utilities/urns/urn_iter.py @@ -3,7 +3,11 @@ from avro.schema import Field, RecordSchema from datahub.emitter.mcp import MetadataChangeProposalWrapper -from datahub.metadata.schema_classes import DictWrapper +from datahub.metadata.schema_classes import ( + DictWrapper, + MetadataChangeEventClass, + MetadataChangeProposalClass, +) from datahub.utilities.urns.dataset_urn import DatasetUrn from datahub.utilities.urns.urn import Urn, guess_entity_type @@ -32,7 +36,7 @@ def list_urns_with_path( if isinstance(model, MetadataChangeProposalWrapper): if model.entityUrn: - urns.append((model.entityUrn, ["urn"])) + urns.append((model.entityUrn, ["entityUrn"])) if model.entityKeyAspect: urns.extend( _add_prefix_to_paths( @@ -83,7 +87,15 @@ def list_urns(model: Union[DictWrapper, MetadataChangeProposalWrapper]) -> List[ return [urn for urn, _ in list_urns_with_path(model)] -def transform_urns(model: DictWrapper, func: Callable[[str], str]) -> None: +def transform_urns( + model: Union[ + DictWrapper, + MetadataChangeEventClass, + MetadataChangeProposalClass, + MetadataChangeProposalWrapper, + ], + func: Callable[[str], str], +) -> None: """ Rewrites all URNs in the given object according to the given function. """ @@ -95,7 +107,9 @@ def transform_urns(model: DictWrapper, func: Callable[[str], str]) -> None: def _modify_at_path( - model: Union[DictWrapper, list], path: _Path, new_value: str + model: Union[DictWrapper, MetadataChangeProposalWrapper, list], + path: _Path, + new_value: str, ) -> None: assert len(path) > 0 @@ -103,6 +117,8 @@ def _modify_at_path( if isinstance(path[0], int): assert isinstance(model, list) model[path[0]] = new_value + elif isinstance(model, MetadataChangeProposalWrapper): + setattr(model, path[0], new_value) else: assert isinstance(model, DictWrapper) model._inner_dict[path[0]] = new_value @@ -120,7 +136,14 @@ def _lowercase_dataset_urn(dataset_urn: str) -> str: return str(cur_urn) -def lowercase_dataset_urns(model: DictWrapper) -> None: +def lowercase_dataset_urns( + model: Union[ + DictWrapper, + MetadataChangeEventClass, + MetadataChangeProposalClass, + MetadataChangeProposalWrapper, + ] +) -> None: def modify_urn(urn: str) -> str: if guess_entity_type(urn) == "dataset": return _lowercase_dataset_urn(urn) diff --git a/metadata-ingestion/tests/unit/api/source_helpers/test_source_helpers.py b/metadata-ingestion/tests/unit/api/source_helpers/test_source_helpers.py index b6ec6ebce240c..b667af8bb41e9 100644 --- a/metadata-ingestion/tests/unit/api/source_helpers/test_source_helpers.py +++ b/metadata-ingestion/tests/unit/api/source_helpers/test_source_helpers.py @@ -16,6 +16,7 @@ from datahub.ingestion.api.source_helpers import ( auto_browse_path_v2, auto_empty_dataset_usage_statistics, + auto_lowercase_urns, auto_status_aspect, auto_workunit, ) @@ -275,6 +276,75 @@ def test_auto_browse_path_v2_legacy_browse_path(telemetry_ping_mock): assert paths["platform,dataset-2,PROD)"] == _make_browse_path_entries(["something"]) +def test_auto_lowercase_aspects(): + mcws = auto_workunit( + [ + MetadataChangeProposalWrapper( + entityUrn=make_dataset_urn( + "bigquery", "myProject.mySchema.myTable", "PROD" + ), + aspect=models.DatasetKeyClass( + "urn:li:dataPlatform:bigquery", "myProject.mySchema.myTable", "PROD" + ), + ), + MetadataChangeProposalWrapper( + entityUrn="urn:li:container:008e111aa1d250dd52e0fd5d4b307b1a", + aspect=models.ContainerPropertiesClass( + name="test", + ), + ), + models.MetadataChangeEventClass( + proposedSnapshot=models.DatasetSnapshotClass( + urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,bigquery-Public-Data.Covid19_Aha.staffing,PROD)", + aspects=[ + models.DatasetPropertiesClass( + customProperties={ + "key": "value", + }, + ), + ], + ), + ), + ] + ) + + expected = [ + *list( + auto_workunit( + [ + MetadataChangeProposalWrapper( + entityUrn="urn:li:dataset:(urn:li:dataPlatform:bigquery,myproject.myschema.mytable,PROD)", + aspect=models.DatasetKeyClass( + "urn:li:dataPlatform:bigquery", + "myProject.mySchema.myTable", + "PROD", + ), + ), + MetadataChangeProposalWrapper( + entityUrn="urn:li:container:008e111aa1d250dd52e0fd5d4b307b1a", + aspect=models.ContainerPropertiesClass( + name="test", + ), + ), + models.MetadataChangeEventClass( + proposedSnapshot=models.DatasetSnapshotClass( + urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,bigquery-public-data.covid19_aha.staffing,PROD)", + aspects=[ + models.DatasetPropertiesClass( + customProperties={ + "key": "value", + }, + ), + ], + ), + ), + ] + ) + ), + ] + assert list(auto_lowercase_urns(mcws)) == expected + + @patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") def test_auto_browse_path_v2_container_over_legacy_browse_path(telemetry_ping_mock): structure = {"a": {"b": ["c"]}}