
feat(ingestion): Adding config option to auto lowercase dataset urns #8928

Merged: 15 commits, Oct 12, 2023
7 changes: 7 additions & 0 deletions metadata-ingestion/src/datahub/configuration/source_common.py
@@ -54,6 +54,13 @@ class DatasetSourceConfigMixin(PlatformInstanceConfigMixin, EnvConfigMixin):
"""


class LowerCaseDatasetUrnConfigMixin(ConfigModel):
convert_urns_to_lowercase: bool = Field(
default=False,
description="Whether to convert dataset urns to lowercase.",
pedro93 (Collaborator) commented on Oct 4, 2023:
Question: This property isn't specific to just dataset urns, is it?
We should ideally have a consistent property across all sources that lowercases any urns a source generates or references, so that we get clean lineage everywhere.

Contributor (Author) replied:

Currently, it only lowercases dataset urns.

Collaborator replied:
Is that a design decision? What is the reasoning behind it?

)


class DatasetLineageProviderConfigBase(EnvConfigMixin):
"""
Any non-Dataset source that produces lineage to Datasets should inherit this class.
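For context, a minimal sketch of how a source picks the flag up, assuming the mixin is importable from datahub.configuration.source_common as added above. ExampleSourceConfig is hypothetical; the real sources changed below (Kafka, SQLCommonConfig, Unity Catalog) inherit the mixin alongside their other config bases in the same way.

    from datahub.configuration.source_common import LowerCaseDatasetUrnConfigMixin

    class ExampleSourceConfig(LowerCaseDatasetUrnConfigMixin):
        """Hypothetical config; real sources also inherit their other bases."""

    # The flag defaults to False and parses straight from a recipe's
    # source.config block.
    config = ExampleSourceConfig.parse_obj({"convert_urns_to_lowercase": True})
    assert config.convert_urns_to_lowercase is True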
24 changes: 24 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
@@ -29,6 +29,7 @@
from datahub.ingestion.api.report import Report
from datahub.ingestion.api.source_helpers import (
auto_browse_path_v2,
auto_lowercase_urns,
auto_materialize_referenced_tags,
auto_status_aspect,
auto_workunit_reporter,
@@ -192,7 +193,30 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
self.ctx.pipeline_config.flags.generate_browse_path_v2_dry_run
)

auto_lowercase_dataset_urns: Optional[MetadataWorkUnitProcessor] = None
if (
self.ctx.pipeline_config
and self.ctx.pipeline_config.source
and self.ctx.pipeline_config.source.config
and (
(
hasattr(
self.ctx.pipeline_config.source.config,
"convert_urns_to_lowercase",
)
and self.ctx.pipeline_config.source.config.convert_urns_to_lowercase
)
or (
hasattr(self.ctx.pipeline_config.source.config, "get")
and self.ctx.pipeline_config.source.config.get(
"convert_urns_to_lowercase"
)
)
)
):
auto_lowercase_dataset_urns = auto_lowercase_urns
return [
auto_lowercase_dataset_urns,
auto_status_aspect,
auto_materialize_referenced_tags,
browse_path_processor,
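The nested condition reads densely because source.config may be either a typed pydantic model (attribute access) or a plain dict (.get() access). A condensed, standalone sketch of the same check; wants_lowercase_urns is a hypothetical helper name, not part of this PR:

    from typing import Any, Optional

    def wants_lowercase_urns(config: Optional[Any]) -> bool:
        if config is None:
            return False
        # Typed pydantic configs expose the flag as an attribute ...
        if getattr(config, "convert_urns_to_lowercase", False):
            return True
        # ... while raw dict configs expose it via .get().
        return bool(hasattr(config, "get") and config.get("convert_urns_to_lowercase"))

    assert wants_lowercase_urns({"convert_urns_to_lowercase": True})
    assert not wants_lowercase_urns(None)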
20 changes: 18 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/api/source_helpers.py
@@ -35,7 +35,7 @@
from datahub.utilities.urns.dataset_urn import DatasetUrn
from datahub.utilities.urns.tag_urn import TagUrn
from datahub.utilities.urns.urn import guess_entity_type
from datahub.utilities.urns.urn_iter import list_urns
from datahub.utilities.urns.urn_iter import list_urns, lowercase_dataset_urns

if TYPE_CHECKING:
from datahub.ingestion.api.source import SourceReport
@@ -70,7 +70,6 @@ def auto_status_aspect(
for wu in stream:
urn = wu.get_urn()
all_urns.add(urn)

if not wu.is_primary_source:
# If this is a non-primary source, we pretend like we've seen the status
# aspect so that we don't try to emit a removal for it.
@@ -173,6 +172,23 @@ def auto_materialize_referenced_tags(
).as_workunit()


def auto_lowercase_urns(
stream: Iterable[MetadataWorkUnit],
) -> Iterable[MetadataWorkUnit]:
"""Lowercase all dataset urns"""

for wu in stream:
try:
old_urn = wu.get_urn()
lowercase_dataset_urns(wu.metadata)
wu.id = wu.id.replace(old_urn, wu.get_urn())

yield wu
except Exception as e:
logger.warning(f"Failed to lowercase urns for {wu}: {e}", exc_info=True)
yield wu


def auto_browse_path_v2(
stream: Iterable[MetadataWorkUnit],
*,
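To illustrate the id rewrite above with a made-up urn: lowercase_dataset_urns mutates wu.metadata in place, so wu.get_urn() returns the lowercased urn afterwards, and the old urn embedded in the workunit id is swapped for it.

    old_urn = "urn:li:dataset:(urn:li:dataPlatform:bigquery,My.Table,PROD)"
    new_urn = "urn:li:dataset:(urn:li:dataPlatform:bigquery,my.table,PROD)"
    # A workunit id typically embeds the urn; after the in-place mutation,
    # the id is patched so it stays consistent with the new urn.
    wu_id = f"{old_urn}-datasetProperties"
    assert wu_id.replace(old_urn, new_urn) == f"{new_urn}-datasetProperties"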
@@ -16,7 +16,6 @@
make_dataplatform_instance_urn,
make_dataset_urn,
make_tag_urn,
set_dataset_urn_to_lower,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import BigQueryDatasetKey, ContainerKey, ProjectIdKey
@@ -218,8 +217,6 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):
if self.config.enable_legacy_sharded_table_support:
BigqueryTableIdentifier._BQ_SHARDED_TABLE_SUFFIX = ""

set_dataset_urn_to_lower(self.config.convert_urns_to_lowercase)

self.bigquery_data_dictionary = BigQuerySchemaApi(
self.report.schema_api_perf, self.config.get_bigquery_client()
)
@@ -206,11 +206,6 @@ def validate_column_lineage(cls, v: bool, values: Dict[str, Any]) -> bool:
description="This flag enables the data lineage extraction from Data Lineage API exposed by Google Data Catalog. NOTE: This extractor can't build views lineage. It's recommended to enable the view's DDL parsing. Read the docs to have more information about: https://cloud.google.com/data-catalog/docs/concepts/about-data-lineage",
)

convert_urns_to_lowercase: bool = Field(
default=False,
description="Convert urns to lowercase.",
)

enable_legacy_sharded_table_support: bool = Field(
default=True,
description="Use the legacy sharded table urn suffix added.",
11 changes: 9 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/kafka.py
@@ -18,7 +18,10 @@

from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.kafka import KafkaConsumerConnectionConfig
from datahub.configuration.source_common import DatasetSourceConfigMixin
from datahub.configuration.source_common import (
DatasetSourceConfigMixin,
LowerCaseDatasetUrnConfigMixin,
)
from datahub.emitter import mce_builder
from datahub.emitter.mce_builder import (
make_data_platform_urn,
@@ -76,7 +79,11 @@ class KafkaTopicConfigKeys(str, Enum):
UNCLEAN_LEADER_ELECTION_CONFIG = "unclean.leader.election.enable"


class KafkaSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin):
class KafkaSourceConfig(
StatefulIngestionConfigBase,
DatasetSourceConfigMixin,
LowerCaseDatasetUrnConfigMixin,
):
connection: KafkaConsumerConnectionConfig = KafkaConsumerConnectionConfig()

topic_patterns: AllowDenyPattern = AllowDenyPattern(allow=[".*"], deny=["^_.*"])
@@ -7,7 +7,10 @@
from pydantic import Field

from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.configuration.source_common import DatasetSourceConfigMixin
from datahub.configuration.source_common import (
DatasetSourceConfigMixin,
LowerCaseDatasetUrnConfigMixin,
)
from datahub.configuration.validate_field_deprecation import pydantic_field_deprecated
from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig
from datahub.ingestion.source.state.stale_entity_removal_handler import (
@@ -21,7 +24,11 @@
logger: logging.Logger = logging.getLogger(__name__)


class SQLCommonConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin):
class SQLCommonConfig(
StatefulIngestionConfigBase,
DatasetSourceConfigMixin,
LowerCaseDatasetUrnConfigMixin,
):
options: dict = pydantic.Field(
default_factory=dict,
description="Any options specified here will be passed to [SQLAlchemy.create_engine](https://docs.sqlalchemy.org/en/14/core/engines.html#sqlalchemy.create_engine) as kwargs.",
@@ -7,7 +7,10 @@
from pydantic import Field

from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.configuration.source_common import DatasetSourceConfigMixin
from datahub.configuration.source_common import (
DatasetSourceConfigMixin,
LowerCaseDatasetUrnConfigMixin,
)
from datahub.configuration.validate_field_removal import pydantic_removed_field
from datahub.configuration.validate_field_rename import pydantic_renamed_field
from datahub.ingestion.source.state.stale_entity_removal_handler import (
@@ -91,6 +94,7 @@ class UnityCatalogSourceConfig(
BaseUsageConfig,
DatasetSourceConfigMixin,
StatefulProfilingConfigMixin,
LowerCaseDatasetUrnConfigMixin,
):
token: str = pydantic.Field(description="Databricks personal access token")
workspace_url: str = pydantic.Field(
33 changes: 28 additions & 5 deletions metadata-ingestion/src/datahub/utilities/urns/urn_iter.py
@@ -3,7 +3,11 @@
from avro.schema import Field, RecordSchema

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import DictWrapper
from datahub.metadata.schema_classes import (
DictWrapper,
MetadataChangeEventClass,
MetadataChangeProposalClass,
)
from datahub.utilities.urns.dataset_urn import DatasetUrn
from datahub.utilities.urns.urn import Urn, guess_entity_type

@@ -32,7 +36,7 @@ def list_urns_with_path(

if isinstance(model, MetadataChangeProposalWrapper):
if model.entityUrn:
urns.append((model.entityUrn, ["urn"]))
urns.append((model.entityUrn, ["entityUrn"]))
Collaborator commented:

nice catch

if model.entityKeyAspect:
urns.extend(
_add_prefix_to_paths(
@@ -83,7 +87,15 @@ def list_urns(model: Union[DictWrapper, MetadataChangeProposalWrapper]) -> List[
return [urn for urn, _ in list_urns_with_path(model)]


def transform_urns(model: DictWrapper, func: Callable[[str], str]) -> None:
def transform_urns(
model: Union[
DictWrapper,
MetadataChangeEventClass,
MetadataChangeProposalClass,
MetadataChangeProposalWrapper,
],
func: Callable[[str], str],
) -> None:
"""
Rewrites all URNs in the given object according to the given function.
"""
@@ -95,14 +107,18 @@ def _modify_at_path(


def _modify_at_path(
model: Union[DictWrapper, list], path: _Path, new_value: str
model: Union[DictWrapper, MetadataChangeProposalWrapper, list],
path: _Path,
new_value: str,
) -> None:
assert len(path) > 0

if len(path) == 1:
if isinstance(path[0], int):
assert isinstance(model, list)
model[path[0]] = new_value
elif isinstance(model, MetadataChangeProposalWrapper):
setattr(model, path[0], new_value)
else:
assert isinstance(model, DictWrapper)
model._inner_dict[path[0]] = new_value
@@ -120,7 +136,14 @@ def _lowercase_dataset_urn(dataset_urn: str) -> str:
return str(cur_urn)


def lowercase_dataset_urns(model: DictWrapper) -> None:
def lowercase_dataset_urns(
model: Union[
DictWrapper,
MetadataChangeEventClass,
MetadataChangeProposalClass,
MetadataChangeProposalWrapper,
]
) -> None:
def modify_urn(urn: str) -> str:
if guess_entity_type(urn) == "dataset":
return _lowercase_dataset_urn(urn)
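A minimal sketch of the end-to-end effect on an MCPW, assuming this diff's urn_iter changes; the snowflake urn is invented. Only the dataset-name component of the urn is lowercased, and non-dataset urns (e.g. containers) pass through untouched, which matches the dataset-only scope discussed in the review thread above.

    from datahub.emitter.mcp import MetadataChangeProposalWrapper
    from datahub.metadata.schema_classes import StatusClass
    from datahub.utilities.urns.urn_iter import lowercase_dataset_urns

    mcpw = MetadataChangeProposalWrapper(
        entityUrn="urn:li:dataset:(urn:li:dataPlatform:snowflake,Db.Schema.Table,PROD)",
        aspect=StatusClass(removed=False),
    )
    lowercase_dataset_urns(mcpw)  # mutates in place via transform_urns
    # Only the dataset name is lowercased; the platform urn and env are preserved.
    assert (
        mcpw.entityUrn
        == "urn:li:dataset:(urn:li:dataPlatform:snowflake,db.schema.table,PROD)"
    )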
@@ -16,6 +16,7 @@
from datahub.ingestion.api.source_helpers import (
auto_browse_path_v2,
auto_empty_dataset_usage_statistics,
auto_lowercase_urns,
auto_status_aspect,
auto_workunit,
)
@@ -275,6 +276,75 @@ def test_auto_browse_path_v2_legacy_browse_path(telemetry_ping_mock):
assert paths["platform,dataset-2,PROD)"] == _make_browse_path_entries(["something"])


def test_auto_lowercase_aspects():
mcws = auto_workunit(
[
MetadataChangeProposalWrapper(
entityUrn=make_dataset_urn(
"bigquery", "myProject.mySchema.myTable", "PROD"
),
aspect=models.DatasetKeyClass(
"urn:li:dataPlatform:bigquery", "myProject.mySchema.myTable", "PROD"
),
),
MetadataChangeProposalWrapper(
entityUrn="urn:li:container:008e111aa1d250dd52e0fd5d4b307b1a",
aspect=models.ContainerPropertiesClass(
name="test",
),
),
models.MetadataChangeEventClass(
proposedSnapshot=models.DatasetSnapshotClass(
urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,bigquery-Public-Data.Covid19_Aha.staffing,PROD)",
aspects=[
models.DatasetPropertiesClass(
customProperties={
"key": "value",
},
),
],
),
),
]
)

expected = [
*list(
auto_workunit(
[
MetadataChangeProposalWrapper(
entityUrn="urn:li:dataset:(urn:li:dataPlatform:bigquery,myproject.myschema.mytable,PROD)",
aspect=models.DatasetKeyClass(
"urn:li:dataPlatform:bigquery",
"myProject.mySchema.myTable",
"PROD",
),
),
MetadataChangeProposalWrapper(
entityUrn="urn:li:container:008e111aa1d250dd52e0fd5d4b307b1a",
aspect=models.ContainerPropertiesClass(
name="test",
),
),
models.MetadataChangeEventClass(
proposedSnapshot=models.DatasetSnapshotClass(
urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,bigquery-public-data.covid19_aha.staffing,PROD)",
aspects=[
models.DatasetPropertiesClass(
customProperties={
"key": "value",
},
),
],
),
),
]
)
),
]
assert list(auto_lowercase_urns(mcws)) == expected


@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping")
def test_auto_browse_path_v2_container_over_legacy_browse_path(telemetry_ping_mock):
structure = {"a": {"b": ["c"]}}