Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest/redshift): patch instead of replace redshift custom properties #9293

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
c7c8560
add patch for custom properties for redshift ingest
ethan-cartwright Nov 21, 2023
61ce0c4
remove unnecessary imports
ethan-cartwright Nov 21, 2023
02ba8d7
remove unneeded dataset_properties mwu
ethan-cartwright Nov 22, 2023
4e19c6b
add unit test
ethan-cartwright Nov 22, 2023
314fb2d
clean up imports
ethan-cartwright Nov 22, 2023
5f4ccee
WIP: add back some fields, TODO: create set methods for qualified nam…
ethan-cartwright Nov 27, 2023
ec0be9d
add patch methods
ethan-cartwright Dec 7, 2023
c796133
tests passing
ethan-cartwright Dec 12, 2023
7074a29
add tests
ethan-cartwright Dec 12, 2023
a8e10fe
fix linting
ethan-cartwright Dec 13, 2023
0e98060
Merge branch 'master' into add_patch_custom_properties_redshift
ethan-cartwright Dec 26, 2023
ce36600
refactor to helper method
ethan-cartwright Dec 26, 2023
7d2b3e7
Merge branch 'master' into add_patch_custom_properties_redshift
ethan-cartwright Dec 28, 2023
763789a
generalize typing to avoid circular dependency
ethan-cartwright Dec 28, 2023
5954997
remove unnecessary logs and imports
ethan-cartwright Dec 28, 2023
8e583eb
Merge branch 'master' into add_patch_custom_properties_redshift
ethan-cartwright Jan 25, 2024
27e6bc0
make it configurable; use DatasetPropertiesClass; fix type hinting
ethan-cartwright Jan 25, 2024
a619ea6
remove type Any import
ethan-cartwright Jan 25, 2024
52e13e3
use assert isinstance instead of type casting
ethan-cartwright Jan 25, 2024
01a53b4
test refactor
ethan-cartwright Jan 25, 2024
8a0492c
improve smoketest code
ethan-cartwright Jan 25, 2024
9c4b05c
put BaseTable in quotes
ethan-cartwright Jan 25, 2024
f296516
fix linting
ethan-cartwright Jan 29, 2024
246f6d3
add missing return statement
ethan-cartwright Jan 29, 2024
34bd024
Merge branch 'master' into add_patch_custom_properties_redshift
ethan-cartwright Feb 2, 2024
a43cbef
update unit tests for patch flag
ethan-cartwright Feb 5, 2024
59016d8
add pytest setup fixture
ethan-cartwright Feb 5, 2024
4c2a9c0
fix lint source
anshbansal Feb 27, 2024
6ee9fe9
Merge branch 'master' into add_patch_custom_properties_redshift
anshbansal Feb 27, 2024
e2728e3
some lint fix
anshbansal Feb 27, 2024
01c5ad7
Merge branch 'master' into add_patch_custom_properties_redshift
ethan-cartwright Feb 28, 2024
d4868a3
Merge branch 'add_patch_custom_properties_redshift' of https://github…
ethan-cartwright Feb 28, 2024
efaa28c
reverted smoke tests
ethan-cartwright Feb 28, 2024
95ee34e
genericize dataset props patch builder
ethan-cartwright Mar 4, 2024
8a55ea7
address linting
ethan-cartwright Mar 4, 2024
738dd9a
remove log statement
ethan-cartwright Mar 4, 2024
e942648
black formatting
ethan-cartwright Mar 4, 2024
d7e64de
Merge branch 'master' into add_patch_custom_properties_redshift
anshbansal Mar 5, 2024
6a65d2a
enable patch by default
anshbansal Mar 5, 2024
9723412
address PR comments
ethan-cartwright Mar 7, 2024
e396c56
Merge branch 'add_patch_custom_properties_redshift' of https://github…
ethan-cartwright Mar 7, 2024
89cf4c6
address linter
ethan-cartwright Mar 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/api/source_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
BrowsePathsV2Class,
ChangeTypeClass,
ContainerClass,
DatasetPropertiesClass,
DatasetUsageStatisticsClass,
MetadataChangeEventClass,
MetadataChangeProposalClass,
StatusClass,
TimeWindowSizeClass,
)
from datahub.specific.dataset import DatasetPatchBuilder
from datahub.telemetry import telemetry
from datahub.utilities.urns.dataset_urn import DatasetUrn
from datahub.utilities.urns.tag_urn import TagUrn
Expand Down Expand Up @@ -58,6 +60,22 @@ def auto_workunit(
yield item.as_workunit()


def create_dataset_props_patch_builder(
dataset_urn: str,
dataset_properties: DatasetPropertiesClass,
) -> DatasetPatchBuilder:
"""Creates a patch builder with a table's or view's attributes and dataset properties"""
patch_builder = DatasetPatchBuilder(dataset_urn)
patch_builder.set_display_name(dataset_properties.name)
patch_builder.set_description(dataset_properties.description)
patch_builder.set_created(dataset_properties.created)
patch_builder.set_last_modified(dataset_properties.lastModified)
patch_builder.set_qualified_name(dataset_properties.qualifiedName)
patch_builder.add_custom_properties(dataset_properties.customProperties)

return patch_builder


def auto_status_aspect(
stream: Iterable[MetadataWorkUnit],
) -> Iterable[MetadataWorkUnit]:
Expand Down Expand Up @@ -371,9 +389,9 @@ def auto_empty_dataset_usage_statistics(
userCounts=[],
fieldCounts=[],
),
changeType=ChangeTypeClass.CREATE
if all_buckets
else ChangeTypeClass.UPSERT,
changeType=(
ChangeTypeClass.CREATE if all_buckets else ChangeTypeClass.UPSERT
),
).as_workunit()


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ class RedshiftConfig(
description="When enabled, emits lineage as incremental to existing lineage already in DataHub. When disabled, re-states lineage on each run. This config works with rest-sink only.",
)

patch_custom_properties: bool = Field(
default=True,
description="Whether to patch custom properties on existing datasets rather than replace.",
)

resolve_temp_table_in_lineage: bool = Field(
default=True,
description="Whether to resolve temp table appear in lineage to upstream permanent tables.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
TestableSource,
TestConnectionReport,
)
from datahub.ingestion.api.source_helpers import create_dataset_props_patch_builder
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.common.subtypes import (
DatasetContainerSubTypes,
Expand Down Expand Up @@ -756,24 +757,36 @@ def gen_dataset_workunits(

dataset_properties = DatasetProperties(
name=table.name,
created=TimeStamp(time=int(table.created.timestamp() * 1000))
if table.created
else None,
lastModified=TimeStamp(time=int(table.last_altered.timestamp() * 1000))
if table.last_altered
else TimeStamp(time=int(table.created.timestamp() * 1000))
if table.created
else None,
created=(
TimeStamp(time=int(table.created.timestamp() * 1000))
if table.created
else None
),
lastModified=(
TimeStamp(time=int(table.last_altered.timestamp() * 1000))
if table.last_altered
else (
TimeStamp(time=int(table.created.timestamp() * 1000))
if table.created
else None
)
),
description=table.comment,
qualifiedName=str(datahub_dataset_name),
customProperties=custom_properties,
)

if custom_properties:
dataset_properties.customProperties = custom_properties

yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn, aspect=dataset_properties
).as_workunit()
if self.config.patch_custom_properties:
patch_builder = create_dataset_props_patch_builder(
dataset_urn, dataset_properties
)
for patch_mcp in patch_builder.build():
yield MetadataWorkUnit(
id=f"{dataset_urn}-{patch_mcp.aspectName}", mcp_raw=patch_mcp
)
else:
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn, aspect=dataset_properties
).as_workunit()

# TODO: Check if needed
# if tags_to_add:
Expand Down
70 changes: 60 additions & 10 deletions metadata-ingestion/src/datahub/specific/dataset.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Dict, Generic, List, Optional, Tuple, TypeVar, Union

from datahub.emitter.mcp_patch_builder import MetadataPatchProposal
from datahub.metadata.com.linkedin.pegasus2avro.common import TimeStamp
from datahub.metadata.schema_classes import (
DatasetPropertiesClass as DatasetProperties,
EditableDatasetPropertiesClass as EditableDatasetProperties,
Expand Down Expand Up @@ -258,16 +259,19 @@ def for_field(
)

def set_description(
self, description: str, editable: bool = False
self, description: Optional[str] = None, editable: bool = False
) -> "DatasetPatchBuilder":
self._add_patch(
DatasetProperties.ASPECT_NAME
if not editable
else EditableDatasetProperties.ASPECT_NAME,
"add",
path="/description",
value=description,
)
if description is not None:
self._add_patch(
(
DatasetProperties.ASPECT_NAME
if not editable
else EditableDatasetProperties.ASPECT_NAME
),
"add",
path="/description",
value=description,
)
return self

def set_custom_properties(
Expand All @@ -285,11 +289,21 @@ def add_custom_property(self, key: str, value: str) -> "DatasetPatchBuilder":
self.custom_properties_patch_helper.add_property(key, value)
return self

def add_custom_properties(
self, custom_properties: Optional[Dict[str, str]] = None
) -> "DatasetPatchBuilder":
if custom_properties is not None:
for key, value in custom_properties.items():
self.custom_properties_patch_helper.add_property(key, value)
return self

def remove_custom_property(self, key: str) -> "DatasetPatchBuilder":
self.custom_properties_patch_helper.remove_property(key)
return self

def set_display_name(self, display_name: str) -> "DatasetPatchBuilder":
def set_display_name(
self, display_name: Optional[str] = None
) -> "DatasetPatchBuilder":
if display_name is not None:
self._add_patch(
DatasetProperties.ASPECT_NAME,
Expand All @@ -299,6 +313,42 @@ def set_display_name(self, display_name: str) -> "DatasetPatchBuilder":
)
return self

def set_qualified_name(
self, qualified_name: Optional[str] = None
) -> "DatasetPatchBuilder":
if qualified_name is not None:
self._add_patch(
DatasetProperties.ASPECT_NAME,
"add",
path="/qualifiedName",
value=qualified_name,
)
return self

def set_created(
self, timestamp: Optional[TimeStamp] = None
) -> "DatasetPatchBuilder":
if timestamp is not None:
self._add_patch(
DatasetProperties.ASPECT_NAME,
"add",
path="/created",
value=timestamp,
)
return self

def set_last_modified(
self, timestamp: Optional[TimeStamp] = None
) -> "DatasetPatchBuilder":
if timestamp is not None:
self._add_patch(
DatasetProperties.ASPECT_NAME,
"add",
path="/lastModified",
value=timestamp,
)
return self

def set_structured_property(
self, property_name: str, value: Union[str, float, List[Union[str, float]]]
) -> "DatasetPatchBuilder":
Expand Down
63 changes: 63 additions & 0 deletions metadata-ingestion/tests/unit/test_redshift_source.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from typing import Iterable

from datahub.emitter.mcp import (
MetadataChangeProposalClass,
MetadataChangeProposalWrapper,
)
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.redshift.config import RedshiftConfig
from datahub.ingestion.source.redshift.redshift import RedshiftSource
from datahub.ingestion.source.redshift.redshift_schema import RedshiftTable
from datahub.metadata.schema_classes import MetadataChangeEventClass


def redshift_source_setup(custom_props_flag: bool) -> Iterable[MetadataWorkUnit]:
config = RedshiftConfig(
host_port="localhost:5439",
database="test",
patch_custom_properties=custom_props_flag,
)
source: RedshiftSource = RedshiftSource(config, ctx=PipelineContext(run_id="test"))
gen = source.gen_dataset_workunits(
table=RedshiftTable(
name="category",
columns=[],
created=None,
comment="",
),
database="dev",
schema="public",
sub_type="test_sub_type",
custom_properties={"my_key": "my_value"},
)
return gen


def test_gen_dataset_workunits_patch_custom_properties_patch():
gen = redshift_source_setup(True)
custom_props_exist = False
for item in gen:
mcp = item.metadata
assert not isinstance(mcp, MetadataChangeEventClass)
if mcp.aspectName == "datasetProperties":
assert isinstance(mcp, MetadataChangeProposalClass)
assert mcp.changeType == "PATCH"
custom_props_exist = True
else:
assert isinstance(mcp, MetadataChangeProposalWrapper)

assert custom_props_exist


def test_gen_dataset_workunits_patch_custom_properties_upsert():
gen = redshift_source_setup(False)
custom_props_exist = False
for item in gen:
assert isinstance(item.metadata, MetadataChangeProposalWrapper)
mcp = item.metadata
if mcp.aspectName == "datasetProperties":
assert mcp.changeType == "UPSERT"
custom_props_exist = True

assert custom_props_exist
Loading