Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingestion): Adding config option to auto lowercase dataset urns #8928

Merged
merged 15 commits into from
Oct 12, 2023
7 changes: 7 additions & 0 deletions metadata-ingestion/src/datahub/configuration/source_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ class DatasetSourceConfigMixin(PlatformInstanceConfigMixin, EnvConfigMixin):
"""


class LowerCaseDatasetUrnConfigMixin(ConfigModel):
convert_urns_to_lowercase: bool = Field(
default=False,
description="Whether to convert dataset urns to lowercase.",
Copy link
Collaborator

@pedro93 pedro93 Oct 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: This property isn't specific to just dataset urns, is it?
We should ideally have a consistent property across all sources that lower cases any urns it generates or references to ensure we have clean lineage across all sources.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, it only lowercases dataset urns.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that a design decision? What is the reasoning behind it?

)


class DatasetLineageProviderConfigBase(EnvConfigMixin):
"""
Any non-Dataset source that produces lineage to Datasets should inherit this class.
Expand Down
17 changes: 5 additions & 12 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,17 +193,15 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
self.ctx.pipeline_config.flags.generate_browse_path_v2_dry_run
)

auto_lowercase_urns: Optional[MetadataWorkUnitProcessor] = None
auto_lowercase_dataset_urns: Optional[MetadataWorkUnitProcessor] = None
if (
self.ctx.pipeline_config
and self.ctx.pipeline_config.flags.auto_lowercase_urns
and self.ctx.pipeline_config.source.config
and self.ctx.pipeline_config.source.config.convert_urns_to_lowercase
hsheth2 marked this conversation as resolved.
Show resolved Hide resolved
):
auto_lowercase_urns = self._get_auto_lowercase_urn_processor(
enabled=self.ctx.pipeline_config.flags.auto_lowercase_urns
)

auto_lowercase_dataset_urns = auto_lowercase_urns
return [
auto_lowercase_urns,
auto_lowercase_dataset_urns,
auto_status_aspect,
auto_materialize_referenced_tags,
browse_path_processor,
Expand Down Expand Up @@ -249,11 +247,6 @@ def get_report(self) -> SourceReport:
def close(self) -> None:
pass

def _get_auto_lowercase_urn_processor(
self, enabled: bool
) -> MetadataWorkUnitProcessor:
return partial(auto_lowercase_urns, enabled=enabled)

def _get_browse_path_processor(self, dry_run: bool) -> MetadataWorkUnitProcessor:
config = self.get_config()
platform = getattr(self, "platform", None) or getattr(config, "platform", None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,10 @@ def auto_materialize_referenced_tags(


def auto_lowercase_urns(
stream: Iterable[MetadataWorkUnit], enabled: bool = False
stream: Iterable[MetadataWorkUnit],
) -> Iterable[MetadataWorkUnit]:
"""Lowercase all dataset urns"""

if not enabled:
return stream

for wu in stream:
try:
old_urn = wu.get_urn()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class FlagsConfig(ConfigModel):

auto_lowercase_urns: bool = Field(
hsheth2 marked this conversation as resolved.
Show resolved Hide resolved
default=False,
description="Wether to lowercase dataset entity urns.",
description="Whether to lowercase dataset entity urns.",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this

)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
make_dataplatform_instance_urn,
make_dataset_urn,
make_tag_urn,
set_dataset_urn_to_lower,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import BigQueryDatasetKey, ContainerKey, ProjectIdKey
Expand Down Expand Up @@ -218,8 +217,6 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):
if self.config.enable_legacy_sharded_table_support:
BigqueryTableIdentifier._BQ_SHARDED_TABLE_SUFFIX = ""

set_dataset_urn_to_lower(self.config.convert_urns_to_lowercase)

self.bigquery_data_dictionary = BigQuerySchemaApi(
self.report.schema_api_perf, self.config.get_bigquery_client()
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,6 @@ def validate_column_lineage(cls, v: bool, values: Dict[str, Any]) -> bool:
description="This flag enables the data lineage extraction from Data Lineage API exposed by Google Data Catalog. NOTE: This extractor can't build views lineage. It's recommended to enable the view's DDL parsing. Read the docs to have more information about: https://cloud.google.com/data-catalog/docs/concepts/about-data-lineage",
)

convert_urns_to_lowercase: bool = Field(
default=False,
description="Convert urns to lowercase.",
)

enable_legacy_sharded_table_support: bool = Field(
default=True,
description="Use the legacy sharded table urn suffix added.",
Expand Down
11 changes: 9 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@

from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.kafka import KafkaConsumerConnectionConfig
from datahub.configuration.source_common import DatasetSourceConfigMixin
from datahub.configuration.source_common import (
DatasetSourceConfigMixin,
LowerCaseDatasetUrnConfigMixin,
)
from datahub.emitter import mce_builder
from datahub.emitter.mce_builder import (
make_data_platform_urn,
Expand Down Expand Up @@ -76,7 +79,11 @@ class KafkaTopicConfigKeys(str, Enum):
UNCLEAN_LEADER_ELECTION_CONFIG = "unclean.leader.election.enable"


class KafkaSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin):
class KafkaSourceConfig(
StatefulIngestionConfigBase,
DatasetSourceConfigMixin,
LowerCaseDatasetUrnConfigMixin,
):
connection: KafkaConsumerConnectionConfig = KafkaConsumerConnectionConfig()

topic_patterns: AllowDenyPattern = AllowDenyPattern(allow=[".*"], deny=["^_.*"])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@

from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.configuration.pydantic_field_deprecation import pydantic_field_deprecated
from datahub.configuration.source_common import DatasetSourceConfigMixin
from datahub.configuration.source_common import (
DatasetSourceConfigMixin,
LowerCaseDatasetUrnConfigMixin,
)
from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StatefulStaleMetadataRemovalConfig,
Expand All @@ -21,7 +24,11 @@
logger: logging.Logger = logging.getLogger(__name__)


class SQLCommonConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin):
class SQLCommonConfig(
StatefulIngestionConfigBase,
DatasetSourceConfigMixin,
LowerCaseDatasetUrnConfigMixin,
):
options: dict = pydantic.Field(
default_factory=dict,
description="Any options specified here will be passed to [SQLAlchemy.create_engine](https://docs.sqlalchemy.org/en/14/core/engines.html#sqlalchemy.create_engine) as kwargs.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
from pydantic import Field

from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.configuration.source_common import DatasetSourceConfigMixin
from datahub.configuration.source_common import (
DatasetSourceConfigMixin,
LowerCaseDatasetUrnConfigMixin,
)
from datahub.configuration.validate_field_removal import pydantic_removed_field
from datahub.configuration.validate_field_rename import pydantic_renamed_field
from datahub.ingestion.source.state.stale_entity_removal_handler import (
Expand Down Expand Up @@ -87,6 +90,7 @@ class UnityCatalogSourceConfig(
BaseUsageConfig,
DatasetSourceConfigMixin,
StatefulProfilingConfigMixin,
LowerCaseDatasetUrnConfigMixin,
):
token: str = pydantic.Field(description="Databricks personal access token")
workspace_url: str = pydantic.Field(
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/tests/unit/test_source_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ def test_auto_lowercase_aspects():
)
),
]
assert list(auto_lowercase_urns(mcws, True)) == expected
assert list(auto_lowercase_urns(mcws)) == expected


@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping")
Expand Down