Skip to content

Commit

Permalink
feat(ingest/datahub): support dropping duplicate schema fields (#12308)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Jan 9, 2025
1 parent 210e2c1 commit 0c31d9a
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ class DataHubSourceConfig(StatefulIngestionConfigBase):

urn_pattern: AllowDenyPattern = Field(default=AllowDenyPattern())

drop_duplicate_schema_fields: bool = Field(
default=False,
description="Whether to drop duplicate schema fields in the schemaMetadata aspect. "
"Useful if the source system has duplicate field paths in the db, but we're pushing to a system with server-side duplicate checking.",
)

@root_validator(skip_on_failure=True)
def check_ingesting_data(cls, values):
if (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
support_status,
)
from datahub.ingestion.api.source import MetadataWorkUnitProcessor, SourceReport
from datahub.ingestion.api.source_helpers import auto_workunit_reporter
from datahub.ingestion.api.source_helpers import (
auto_fix_duplicate_schema_field_paths,
auto_workunit_reporter,
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.datahub.config import DataHubSourceConfig
from datahub.ingestion.source.datahub.datahub_api_reader import DataHubApiReader
Expand Down Expand Up @@ -57,7 +60,14 @@ def get_report(self) -> SourceReport:

def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
# Exactly replicate data from DataHub source
return [partial(auto_workunit_reporter, self.get_report())]
return [
(
auto_fix_duplicate_schema_field_paths
if self.config.drop_duplicate_schema_fields
else None
),
partial(auto_workunit_reporter, self.get_report()),
]

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
self.report.stop_time = datetime.now(tz=timezone.utc)
Expand Down

0 comments on commit 0c31d9a

Please sign in to comment.