From c7c85606c97841999088f6a273b650a6496098db Mon Sep 17 00:00:00 2001 From: Ethan Cartwright Date: Tue, 21 Nov 2023 17:01:52 -0500 Subject: [PATCH 01/33] add patch for custom properties for redshift ingest --- .../ingestion/source/redshift/redshift.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py index c7d01021773b1..ed2e1f43e99ad 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py @@ -104,6 +104,17 @@ from datahub.utilities.perf_timer import PerfTimer from datahub.utilities.registries.domain_registry import DomainRegistry + +import logging +from typing import Union + +from datahub.configuration.kafka import KafkaProducerConnectionConfig +from datahub.emitter.kafka_emitter import DatahubKafkaEmitter, KafkaEmitterConfig +from datahub.emitter.mce_builder import make_dataset_urn +from datahub.emitter.rest_emitter import DataHubRestEmitter +from datahub.specific.dataset import DatasetPatchBuilder + + logger: logging.Logger = logging.getLogger(__name__) @@ -738,7 +749,13 @@ def gen_dataset_workunits( ) if custom_properties: - dataset_properties.customProperties = custom_properties + patch_builder = DatasetPatchBuilder(dataset_urn) + for key, value in custom_properties.items(): + patch_builder.add_custom_property(key, value) + for patch_mcp in patch_builder.build(): + yield MetadataWorkUnit( + id=f"{dataset_urn}-{patch_mcp.aspectName}", mcp=patch_mcp + ) yield MetadataChangeProposalWrapper( entityUrn=dataset_urn, aspect=dataset_properties From 61ce0c4042ec7a07aa967d10c4a5dac075876841 Mon Sep 17 00:00:00 2001 From: Ethan Cartwright Date: Tue, 21 Nov 2023 17:48:55 -0500 Subject: [PATCH 02/33] remove unnecessary imports --- .../src/datahub/ingestion/source/redshift/redshift.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py index ed2e1f43e99ad..af1d0d4da9800 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py @@ -103,15 +103,6 @@ from datahub.utilities.mapping import Constants from datahub.utilities.perf_timer import PerfTimer from datahub.utilities.registries.domain_registry import DomainRegistry - - -import logging -from typing import Union - -from datahub.configuration.kafka import KafkaProducerConnectionConfig -from datahub.emitter.kafka_emitter import DatahubKafkaEmitter, KafkaEmitterConfig -from datahub.emitter.mce_builder import make_dataset_urn -from datahub.emitter.rest_emitter import DataHubRestEmitter from datahub.specific.dataset import DatasetPatchBuilder From 02ba8d760d13810b14b0caa349e63649b7db89c7 Mon Sep 17 00:00:00 2001 From: Ethan Cartwright Date: Wed, 22 Nov 2023 15:55:58 -0500 Subject: [PATCH 03/33] remove unneeded dataset_properties mwu --- .../ingestion/source/redshift/redshift.py | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py index af1d0d4da9800..c425126dabded 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py @@ -725,20 +725,6 @@ def gen_dataset_workunits( dataset_urn, table, str(datahub_dataset_name) ) - dataset_properties = DatasetProperties( - name=table.name, - created=TimeStamp(time=int(table.created.timestamp() * 1000)) - if table.created - else None, - lastModified=TimeStamp(time=int(table.last_altered.timestamp() * 1000)) - if table.last_altered - else TimeStamp(time=int(table.created.timestamp() * 1000)) - if table.created - else None, - description=table.comment, - qualifiedName=str(datahub_dataset_name), - ) - if custom_properties: patch_builder = DatasetPatchBuilder(dataset_urn) for key, value in custom_properties.items(): @@ -748,10 +734,6 @@ def gen_dataset_workunits( id=f"{dataset_urn}-{patch_mcp.aspectName}", mcp=patch_mcp ) - yield MetadataChangeProposalWrapper( - entityUrn=dataset_urn, aspect=dataset_properties - ).as_workunit() - # TODO: Check if needed # if tags_to_add: # yield gen_tags_aspect_workunit(dataset_urn, tags_to_add) From 4e19c6b3cfeea33a1c995f149b4358104e8e69eb Mon Sep 17 00:00:00 2001 From: Ethan Cartwright Date: Wed, 22 Nov 2023 17:38:16 -0500 Subject: [PATCH 04/33] add unit test --- .../tests/unit/test_redshift_source.py | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 metadata-ingestion/tests/unit/test_redshift_source.py diff --git a/metadata-ingestion/tests/unit/test_redshift_source.py b/metadata-ingestion/tests/unit/test_redshift_source.py new file mode 100644 index 0000000000000..0a6a7725757ec --- /dev/null +++ b/metadata-ingestion/tests/unit/test_redshift_source.py @@ -0,0 +1,35 @@ +from datahub.ingestion.source.redshift.config import RedshiftConfig +from datahub.ingestion.source.redshift.redshift import RedshiftSource +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.source.redshift.redshift_schema import ( + RedshiftTable, + RedshiftColumn, +) +from datahub.metadata.schema_classes import MetadataChangeProposalClass +from typing import cast + + +def test_gen_dataset_workunits_patch_custom_properties(): + config = RedshiftConfig(host_port="localhost:5439", database="test") + source: RedshiftSource = RedshiftSource(config, ctx=PipelineContext(run_id="test")) + gen = source.gen_dataset_workunits( + table=RedshiftTable( + name="category", + columns=[], + created=None, + comment="", + ), + database="dev", + schema="public", + sub_type="test_sub_type", + custom_properties={"my_key": "my_value"}, + ) + + custom_props_exist = False + for item in gen: + mcp = cast(MetadataChangeProposalClass, item.metadata) + if mcp.aspectName == "datasetProperties": + assert mcp.changeType == "PATCH" + custom_props_exist = True + + assert custom_props_exist From 314fb2d0e011a39c626e2a592f0ec20f585e735e Mon Sep 17 00:00:00 2001 From: Ethan Cartwright Date: Wed, 22 Nov 2023 17:40:57 -0500 Subject: [PATCH 05/33] clean up imports --- .../src/datahub/ingestion/source/redshift/redshift.py | 1 - metadata-ingestion/tests/unit/test_redshift_source.py | 1 - 2 files changed, 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py index c425126dabded..b2285ae2965ae 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py @@ -105,7 +105,6 @@ from datahub.utilities.registries.domain_registry import DomainRegistry from datahub.specific.dataset import DatasetPatchBuilder - logger: logging.Logger = logging.getLogger(__name__) diff --git a/metadata-ingestion/tests/unit/test_redshift_source.py b/metadata-ingestion/tests/unit/test_redshift_source.py index 0a6a7725757ec..338b66d01262e 100644 --- a/metadata-ingestion/tests/unit/test_redshift_source.py +++ b/metadata-ingestion/tests/unit/test_redshift_source.py @@ -3,7 +3,6 @@ from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.source.redshift.redshift_schema import ( RedshiftTable, - RedshiftColumn, ) from datahub.metadata.schema_classes import MetadataChangeProposalClass from typing import cast From 5f4ccee783667d2fbd535a886076216c1c0b63c2 Mon Sep 17 00:00:00 2001 From: Ethan Cartwright Date: Mon, 27 Nov 2023 10:34:15 -0500 Subject: [PATCH 06/33] WIP: add back some fields, TODO: create set methods for qualified name, created, lastModified --- .../src/datahub/ingestion/source/redshift/redshift.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py index b2285ae2965ae..f43f44bfcfcac 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py @@ -724,10 +724,13 @@ def gen_dataset_workunits( dataset_urn, table, str(datahub_dataset_name) ) + patch_builder = DatasetPatchBuilder(dataset_urn) + patch_builder.set_display_name(table.name) + patch_builder.set_description(table.comment) + # patch_builder.set_qualified_name(str(datahub_dataset_name)) + if custom_properties: - patch_builder = DatasetPatchBuilder(dataset_urn) - for key, value in custom_properties.items(): - patch_builder.add_custom_property(key, value) + patch_builder.set_custom_properties(custom_properties) for patch_mcp in patch_builder.build(): yield MetadataWorkUnit( id=f"{dataset_urn}-{patch_mcp.aspectName}", mcp=patch_mcp From ec0be9ddef8071337d1bae1465e29df58396cdce Mon Sep 17 00:00:00 2001 From: Ethan Cartwright Date: Thu, 7 Dec 2023 14:18:59 -0500 Subject: [PATCH 07/33] add patch methods --- .../ingestion/source/redshift/redshift.py | 27 +++++++++----- .../src/datahub/specific/dataset.py | 36 +++++++++++++++++++ 2 files changed, 54 insertions(+), 9 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py index f43f44bfcfcac..b3c5f761201e8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py @@ -727,18 +727,27 @@ def gen_dataset_workunits( patch_builder = DatasetPatchBuilder(dataset_urn) patch_builder.set_display_name(table.name) patch_builder.set_description(table.comment) - # patch_builder.set_qualified_name(str(datahub_dataset_name)) + patch_builder.set_created( + created=TimeStamp(time=int(table.created.timestamp() * 1000)) + if table.created + else None + ) + patch_builder.set_last_modified( + timestamp=TimeStamp(time=int(table.last_altered.timestamp() * 1000)) + if table.last_altered + else TimeStamp(time=int(table.created.timestamp() * 1000)) + if table.created + else None + ) + patch_builder.set_qualified_name(str(datahub_dataset_name)) if custom_properties: - patch_builder.set_custom_properties(custom_properties) - for patch_mcp in patch_builder.build(): - yield MetadataWorkUnit( - id=f"{dataset_urn}-{patch_mcp.aspectName}", mcp=patch_mcp - ) + patch_builder.add_custom_properties(custom_properties) - # TODO: Check if needed - # if tags_to_add: - # yield gen_tags_aspect_workunit(dataset_urn, tags_to_add) + for patch_mcp in patch_builder.build(): + yield MetadataWorkUnit( + id=f"{dataset_urn}-{patch_mcp.aspectName}", mcp=patch_mcp + ) schema_container_key = gen_schema_key( db_name=database, diff --git a/metadata-ingestion/src/datahub/specific/dataset.py b/metadata-ingestion/src/datahub/specific/dataset.py index fcfe049fb15cf..5f3d8d3f3e20c 100644 --- a/metadata-ingestion/src/datahub/specific/dataset.py +++ b/metadata-ingestion/src/datahub/specific/dataset.py @@ -18,6 +18,7 @@ UpstreamClass as Upstream, UpstreamLineageClass as UpstreamLineage, ) +from datahub.metadata.com.linkedin.pegasus2avro.common import TimeStamp from datahub.specific.custom_properties import CustomPropertiesPatchHelper from datahub.specific.ownership import OwnershipPatchHelper from datahub.utilities.urns.tag_urn import TagUrn @@ -213,6 +214,13 @@ def add_custom_property(self, key: str, value: str) -> "DatasetPatchBuilder": self.custom_properties_patch_helper.add_property(key, value) return self + def add_custom_properties( + self, custom_properties: Dict[str, str] + ) -> "DatasetPatchBuilder": + for key, value in custom_properties.items(): + self.custom_properties_patch_helper.add_property(key, value) + return self + def remove_custom_property(self, key: str) -> "DatasetPatchBuilder": self.custom_properties_patch_helper.remove_property(key) return self @@ -226,3 +234,31 @@ def set_display_name(self, display_name: str) -> "DatasetPatchBuilder": value=display_name, ) return self + + def set_qualified_name(self, qualified_name: str) -> "DatasetPatchBuilder": + if qualified_name is not None: + self._add_patch( + DatasetProperties.ASPECT_NAME, + "replace", + path="/qualifiedName", + value=qualified_name, + ) + return self + + def set_created(self, timestamp: TimeStamp) -> "DatasetPatchBuilder": + if timestamp is not None: + self._add_patch( + DatasetProperties.ASPECT_NAME, + "replace", + path="/created", + value=timestamp, + ) + + def set_last_modified(self, timestamp: TimeStamp) -> "DatasetPatchBuilder": + if timestamp is not None: + self._add_patch( + DatasetProperties.ASPECT_NAME, + "replace", + path="/lastModified", + value=timestamp, + ) From c796133d81cf6c5e69e590ea9128ef57abf7f056 Mon Sep 17 00:00:00 2001 From: Ethan Cartwright Date: Tue, 12 Dec 2023 12:29:30 -0500 Subject: [PATCH 08/33] tests passing --- .../src/datahub/ingestion/source/redshift/redshift.py | 2 +- metadata-ingestion/src/datahub/specific/dataset.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py index b3c5f761201e8..429b674388d73 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py @@ -728,7 +728,7 @@ def gen_dataset_workunits( patch_builder.set_display_name(table.name) patch_builder.set_description(table.comment) patch_builder.set_created( - created=TimeStamp(time=int(table.created.timestamp() * 1000)) + timestamp=TimeStamp(time=int(table.created.timestamp() * 1000)) if table.created else None ) diff --git a/metadata-ingestion/src/datahub/specific/dataset.py b/metadata-ingestion/src/datahub/specific/dataset.py index 5f3d8d3f3e20c..bfbf46cf1e145 100644 --- a/metadata-ingestion/src/datahub/specific/dataset.py +++ b/metadata-ingestion/src/datahub/specific/dataset.py @@ -249,10 +249,11 @@ def set_created(self, timestamp: TimeStamp) -> "DatasetPatchBuilder": if timestamp is not None: self._add_patch( DatasetProperties.ASPECT_NAME, - "replace", + "add", path="/created", value=timestamp, ) + return self def set_last_modified(self, timestamp: TimeStamp) -> "DatasetPatchBuilder": if timestamp is not None: @@ -262,3 +263,4 @@ def set_last_modified(self, timestamp: TimeStamp) -> "DatasetPatchBuilder": path="/lastModified", value=timestamp, ) + return self From 7074a29bd26e6b2bfb37af0e68b549bfae7370dc Mon Sep 17 00:00:00 2001 From: Ethan Cartwright Date: Tue, 12 Dec 2023 12:30:10 -0500 Subject: [PATCH 09/33] add tests --- smoke-test/tests/patch/common_patch_tests.py | 42 +++-- .../tests/patch/test_dataset_patches.py | 151 +++++++++++++++--- 2 files changed, 164 insertions(+), 29 deletions(-) diff --git a/smoke-test/tests/patch/common_patch_tests.py b/smoke-test/tests/patch/common_patch_tests.py index f1d6abf5da794..adfa56163bc33 100644 --- a/smoke-test/tests/patch/common_patch_tests.py +++ b/smoke-test/tests/patch/common_patch_tests.py @@ -2,17 +2,42 @@ import uuid from typing import Dict, Optional, Type -from datahub.emitter.mce_builder import (make_tag_urn, make_term_urn, - make_user_urn) +from datahub.emitter.mce_builder import make_tag_urn, make_term_urn, make_user_urn from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.mcp_patch_builder import MetadataPatchProposal from datahub.ingestion.graph.client import DataHubGraph, DataHubGraphConfig -from datahub.metadata.schema_classes import (AuditStampClass, GlobalTagsClass, - GlossaryTermAssociationClass, - GlossaryTermsClass, OwnerClass, - OwnershipClass, - OwnershipTypeClass, - TagAssociationClass, _Aspect) +from datahub.metadata.schema_classes import ( + AuditStampClass, + DatasetPropertiesClass, + GlobalTagsClass, + GlossaryTermAssociationClass, + GlossaryTermsClass, + OwnerClass, + OwnershipClass, + OwnershipTypeClass, + TagAssociationClass, + _Aspect, +) + + +def get_dataset_property( + graph: DataHubGraph, dataset_urn: str, property_name: str +) -> Optional[Dict[str, str]]: + """ + Generic function to get a specific property of a dataset. + + :param graph: Instance of DataHubGraph. + :param dataset_urn: URN of the dataset. + :param property_name: Name of the property to retrieve. + :return: Property value or None if the property doesn't exist. + """ + dataset_properties = graph.get_aspect( + entity_urn=dataset_urn, + aspect_type=DatasetPropertiesClass, + ) + assert dataset_properties + + return getattr(dataset_properties, property_name, None) def helper_test_entity_terms_patch( @@ -71,7 +96,6 @@ def get_terms(graph, entity_urn): def helper_test_dataset_tags_patch( test_entity_urn: str, patch_builder_class: Type[MetadataPatchProposal] ): - tag_urn = make_tag_urn(tag=f"testTag-{uuid.uuid4()}") tag_association = TagAssociationClass(tag=tag_urn, context="test") diff --git a/smoke-test/tests/patch/test_dataset_patches.py b/smoke-test/tests/patch/test_dataset_patches.py index 6704d19760fb9..c1f55436f9c09 100644 --- a/smoke-test/tests/patch/test_dataset_patches.py +++ b/smoke-test/tests/patch/test_dataset_patches.py @@ -1,29 +1,48 @@ import time +import logging import uuid from typing import Dict, Optional -from datahub.emitter.mce_builder import (make_dataset_urn, make_tag_urn, - make_term_urn, make_user_urn) +from datahub.utilities.time import datetime_to_ts_millis +from datetime import datetime as dt +from datahub.metadata.com.linkedin.pegasus2avro.common import TimeStamp +from datahub.emitter.mce_builder import ( + make_dataset_urn, + make_tag_urn, + make_term_urn, + make_user_urn, +) from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.graph.client import DataHubGraph, DataHubGraphConfig -from datahub.metadata.schema_classes import (AuditStampClass, - DatasetLineageTypeClass, - DatasetPropertiesClass, - EditableSchemaFieldInfoClass, - EditableSchemaMetadataClass, - GlobalTagsClass, - GlossaryTermAssociationClass, - GlossaryTermsClass, OwnerClass, - OwnershipClass, - OwnershipTypeClass, - TagAssociationClass, - UpstreamClass, - UpstreamLineageClass) +from datahub.metadata.schema_classes import ( + AuditStampClass, + DatasetLineageTypeClass, + DatasetPropertiesClass, + EditableSchemaFieldInfoClass, + EditableSchemaMetadataClass, + GlobalTagsClass, + GlossaryTermAssociationClass, + GlossaryTermsClass, + OwnerClass, + OwnershipClass, + OwnershipTypeClass, + TagAssociationClass, + UpstreamClass, + UpstreamLineageClass, +) from datahub.specific.dataset import DatasetPatchBuilder from tests.patch.common_patch_tests import ( - helper_test_custom_properties_patch, helper_test_dataset_tags_patch, - helper_test_entity_terms_patch, helper_test_ownership_patch) + get_dataset_property, + helper_test_custom_properties_patch, + helper_test_dataset_tags_patch, + helper_test_entity_terms_patch, + helper_test_ownership_patch, +) + + +log = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO) # Common Aspect Patch Tests @@ -135,7 +154,6 @@ def get_field_info( def test_field_terms_patch(wait_for_healthchecks): - dataset_urn = make_dataset_urn( platform="hive", name=f"SampleHiveDataset-{uuid.uuid4()}", env="PROD" ) @@ -195,7 +213,6 @@ def test_field_terms_patch(wait_for_healthchecks): def test_field_tags_patch(wait_for_healthchecks): - dataset_urn = make_dataset_urn( platform="hive", name=f"SampleHiveDataset-{uuid.uuid4()}", env="PROD" ) @@ -285,7 +302,6 @@ def get_custom_properties( def test_custom_properties_patch(wait_for_healthchecks): - dataset_urn = make_dataset_urn( platform="hive", name=f"SampleHiveDataset-{uuid.uuid4()}", env="PROD" ) @@ -324,3 +340,98 @@ def test_custom_properties_patch(wait_for_healthchecks): assert ( custom_properties["test_description_property"] == "test_description_value" ) + + +def test_qualified_name_patch(wait_for_healthchecks): + dataset_urn = make_dataset_urn( + platform="hive", name=f"SampleHiveDataset-{uuid.uuid4()}", env="PROD" + ) + orig_dataset_properties = DatasetPropertiesClass( + name="test_name", qualifiedName="to_be_replaced" + ) + + mcpw = MetadataChangeProposalWrapper( + entityUrn=dataset_urn, aspect=orig_dataset_properties + ) + + with DataHubGraph(DataHubGraphConfig()) as graph: + graph.emit(mcpw) + # assert qualfied name looks as expected + qualified_name = get_dataset_property(graph, dataset_urn, "qualifiedName") + assert qualified_name + assert qualified_name == "to_be_replaced" + + with DataHubGraph(DataHubGraphConfig()) as graph: + for patch_mcp in ( + DatasetPatchBuilder(dataset_urn) + .set_qualified_name("new_qualified_name") + .build() + ): + graph.emit_mcp(patch_mcp) + + get_dataset_property(graph, dataset_urn, "qualifiedName") == "new_qualified_name" + + +def test_created_patch(wait_for_healthchecks): + test_time = datetime_to_ts_millis(dt.now()) + + log.error("hello world above") + dataset_urn = make_dataset_urn( + platform="hive", name=f"SampleHiveDataset-{uuid.uuid4()}", env="PROD" + ) + orig_dataset_properties = DatasetPropertiesClass( + name="test_name", created=TimeStamp(test_time) + ) + + mcpw = MetadataChangeProposalWrapper( + entityUrn=dataset_urn, aspect=orig_dataset_properties + ) + + with DataHubGraph(DataHubGraphConfig()) as graph: + graph.emit(mcpw) + log.error("hello world above") + test_time = datetime_to_ts_millis(dt.now()) + with DataHubGraph(DataHubGraphConfig()) as graph: + for patch_mcp in ( + DatasetPatchBuilder(dataset_urn).set_created(TimeStamp(test_time)).build() + ): + graph.emit_mcp(patch_mcp) + + assert get_dataset_property(graph, dataset_urn, "created").time == test_time + + +def test_last_modified_patch(wait_for_healthchecks): + test_time = datetime_to_ts_millis(dt.now()) + + dataset_urn = make_dataset_urn( + platform="hive", name=f"SampleHiveDataset-{uuid.uuid4()}", env="PROD" + ) + orig_dataset_properties = DatasetPropertiesClass( + name="test_name", lastModified=TimeStamp(test_time) + ) + # STOPPED HERE -- also need to fix test_created_patch -- also need to run these to test! + + mcpw = MetadataChangeProposalWrapper( + entityUrn=dataset_urn, aspect=orig_dataset_properties + ) + + with DataHubGraph(DataHubGraphConfig()) as graph: + graph.emit(mcpw) + # assert qualfied name looks as expected + last_modified = get_dataset_property(graph, dataset_urn, "lastModified") + assert last_modified + assert last_modified.time == test_time + + new_test_time = datetime_to_ts_millis(dt.now()) + + with DataHubGraph(DataHubGraphConfig()) as graph: + for patch_mcp in ( + DatasetPatchBuilder(dataset_urn) + .set_last_modified(TimeStamp(new_test_time)) + .build() + ): + graph.emit_mcp(patch_mcp) + + assert ( + get_dataset_property(graph, dataset_urn, "lastModified").time == new_test_time + ) From a8e10fe4a01e9ba6905a8fe15aa527cc52cfc044 Mon Sep 17 00:00:00 2001 From: Ethan Cartwright Date: Tue, 12 Dec 2023 21:18:03 -0500 Subject: [PATCH 10/33] fix linting --- .../src/datahub/ingestion/source/redshift/redshift.py | 2 +- metadata-ingestion/src/datahub/specific/dataset.py | 2 +- metadata-ingestion/tests/unit/test_redshift_source.py | 5 +++-- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py index 429b674388d73..8385b9bb4d96e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py @@ -99,11 +99,11 @@ TimeType, ) from datahub.metadata.schema_classes import GlobalTagsClass, TagAssociationClass +from datahub.specific.dataset import DatasetPatchBuilder from datahub.utilities import memory_footprint from datahub.utilities.mapping import Constants from datahub.utilities.perf_timer import PerfTimer from datahub.utilities.registries.domain_registry import DomainRegistry -from datahub.specific.dataset import DatasetPatchBuilder logger: logging.Logger = logging.getLogger(__name__) diff --git a/metadata-ingestion/src/datahub/specific/dataset.py b/metadata-ingestion/src/datahub/specific/dataset.py index bfbf46cf1e145..cef531d1f3114 100644 --- a/metadata-ingestion/src/datahub/specific/dataset.py +++ b/metadata-ingestion/src/datahub/specific/dataset.py @@ -2,6 +2,7 @@ from urllib.parse import quote from datahub.emitter.mcp_patch_builder import MetadataPatchProposal +from datahub.metadata.com.linkedin.pegasus2avro.common import TimeStamp from datahub.metadata.schema_classes import ( DatasetPropertiesClass as DatasetProperties, EditableDatasetPropertiesClass as EditableDatasetProperties, @@ -18,7 +19,6 @@ UpstreamClass as Upstream, UpstreamLineageClass as UpstreamLineage, ) -from datahub.metadata.com.linkedin.pegasus2avro.common import TimeStamp from datahub.specific.custom_properties import CustomPropertiesPatchHelper from datahub.specific.ownership import OwnershipPatchHelper from datahub.utilities.urns.tag_urn import TagUrn diff --git a/metadata-ingestion/tests/unit/test_redshift_source.py b/metadata-ingestion/tests/unit/test_redshift_source.py index 338b66d01262e..a8f8092a23304 100644 --- a/metadata-ingestion/tests/unit/test_redshift_source.py +++ b/metadata-ingestion/tests/unit/test_redshift_source.py @@ -1,11 +1,12 @@ +from typing import cast + +from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.source.redshift.config import RedshiftConfig from datahub.ingestion.source.redshift.redshift import RedshiftSource -from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.source.redshift.redshift_schema import ( RedshiftTable, ) from datahub.metadata.schema_classes import MetadataChangeProposalClass -from typing import cast def test_gen_dataset_workunits_patch_custom_properties(): From ce3660004ce23dca9afd282804d2ee99cd8966dc Mon Sep 17 00:00:00 2001 From: Ethan Cartwright Date: Tue, 26 Dec 2023 12:54:18 -0500 Subject: [PATCH 11/33] refactor to helper method --- .../datahub/ingestion/api/source_helpers.py | 36 +++++++++++++++++++ .../ingestion/source/redshift/redshift.py | 32 ++++------------- .../src/datahub/specific/dataset.py | 4 +-- .../tests/unit/test_redshift_source.py | 4 +-- 4 files changed, 45 insertions(+), 31 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py index 66365ef0cdc45..33b538a867fd0 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py @@ -19,6 +19,8 @@ from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.mcp_builder import entity_supports_aspect from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.ingestion.source.sql.sql_generic import BaseTable +from datahub.metadata.com.linkedin.pegasus2avro.common import TimeStamp from datahub.metadata.schema_classes import ( BrowsePathEntryClass, BrowsePathsClass, @@ -32,6 +34,7 @@ TagKeyClass, TimeWindowSizeClass, ) +from datahub.specific.dataset import DatasetPatchBuilder from datahub.telemetry import telemetry from datahub.utilities.urns.dataset_urn import DatasetUrn from datahub.utilities.urns.tag_urn import TagUrn @@ -59,6 +62,39 @@ def auto_workunit( yield item.as_workunit() +def create_dataset_props_patch_builder( + datahub_dataset_name: str, + dataset_urn: str, + table: BaseTable, + custom_properties: Optional[Dict[str, str]] = None, +) -> DatasetPatchBuilder: + """Creates a patch builder with a table's or view's attributes and dataset properties""" + patch_builder = DatasetPatchBuilder(dataset_urn) + if table.name: + patch_builder.set_display_name(table.name) + if table.comment: + patch_builder.set_description(table.comment) + if table.created: + patch_builder.set_created( + timestamp=TimeStamp(time=int(table.created.timestamp() * 1000)) + ) + + if table.last_altered: + patch_builder.set_last_modified( + timestamp=TimeStamp(time=int(table.last_altered.timestamp() * 1000)) + ) + elif table.created: + patch_builder.set_last_modified( + timestamp=TimeStamp(time=int(table.created.timestamp() * 1000)) + ) + patch_builder.set_qualified_name(str(datahub_dataset_name)) + + if custom_properties: + patch_builder.add_custom_properties(custom_properties) + + return patch_builder + + def auto_status_aspect( stream: Iterable[MetadataWorkUnit], ) -> Iterable[MetadataWorkUnit]: diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py index ff3cd01694847..f4d8eacf1dacf 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py @@ -32,6 +32,7 @@ TestableSource, TestConnectionReport, ) +from datahub.ingestion.api.source_helpers import create_dataset_props_patch_builder from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.common.subtypes import ( DatasetContainerSubTypes, @@ -77,11 +78,8 @@ METADATA_EXTRACTION, PROFILING, ) -from datahub.metadata.com.linkedin.pegasus2avro.common import SubTypes, TimeStamp -from datahub.metadata.com.linkedin.pegasus2avro.dataset import ( - DatasetProperties, - ViewProperties, -) +from datahub.metadata.com.linkedin.pegasus2avro.common import SubTypes +from datahub.metadata.com.linkedin.pegasus2avro.dataset import ViewProperties from datahub.metadata.com.linkedin.pegasus2avro.schema import ( ArrayType, BooleanType, @@ -97,7 +95,6 @@ TimeType, ) from datahub.metadata.schema_classes import GlobalTagsClass, TagAssociationClass -from datahub.specific.dataset import DatasetPatchBuilder from datahub.utilities import memory_footprint from datahub.utilities.mapping import Constants from datahub.utilities.perf_timer import PerfTimer @@ -725,29 +722,12 @@ def gen_dataset_workunits( dataset_urn, table, str(datahub_dataset_name) ) - patch_builder = DatasetPatchBuilder(dataset_urn) - patch_builder.set_display_name(table.name) - patch_builder.set_description(table.comment) - patch_builder.set_created( - timestamp=TimeStamp(time=int(table.created.timestamp() * 1000)) - if table.created - else None - ) - patch_builder.set_last_modified( - timestamp=TimeStamp(time=int(table.last_altered.timestamp() * 1000)) - if table.last_altered - else TimeStamp(time=int(table.created.timestamp() * 1000)) - if table.created - else None + patch_builder = create_dataset_props_patch_builder( + datahub_dataset_name, dataset_urn, table, custom_properties ) - patch_builder.set_qualified_name(str(datahub_dataset_name)) - - if custom_properties: - patch_builder.add_custom_properties(custom_properties) - for patch_mcp in patch_builder.build(): yield MetadataWorkUnit( - id=f"{dataset_urn}-{patch_mcp.aspectName}", mcp=patch_mcp + id=f"{dataset_urn}-{patch_mcp.aspectName}", mcp_raw=patch_mcp ) schema_container_key = gen_schema_key( diff --git a/metadata-ingestion/src/datahub/specific/dataset.py b/metadata-ingestion/src/datahub/specific/dataset.py index 8d89d92b1ab87..dc1ac4c418b0b 100644 --- a/metadata-ingestion/src/datahub/specific/dataset.py +++ b/metadata-ingestion/src/datahub/specific/dataset.py @@ -344,7 +344,7 @@ def set_qualified_name(self, qualified_name: str) -> "DatasetPatchBuilder": if qualified_name is not None: self._add_patch( DatasetProperties.ASPECT_NAME, - "replace", + "add", path="/qualifiedName", value=qualified_name, ) @@ -364,7 +364,7 @@ def set_last_modified(self, timestamp: TimeStamp) -> "DatasetPatchBuilder": if timestamp is not None: self._add_patch( DatasetProperties.ASPECT_NAME, - "replace", + "add", path="/lastModified", value=timestamp, ) diff --git a/metadata-ingestion/tests/unit/test_redshift_source.py b/metadata-ingestion/tests/unit/test_redshift_source.py index a8f8092a23304..c76b6442ad607 100644 --- a/metadata-ingestion/tests/unit/test_redshift_source.py +++ b/metadata-ingestion/tests/unit/test_redshift_source.py @@ -3,9 +3,7 @@ from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.source.redshift.config import RedshiftConfig from datahub.ingestion.source.redshift.redshift import RedshiftSource -from datahub.ingestion.source.redshift.redshift_schema import ( - RedshiftTable, -) +from datahub.ingestion.source.redshift.redshift_schema import RedshiftTable from datahub.metadata.schema_classes import MetadataChangeProposalClass From 763789af6b7903e155c3b167bceb8f57123d3fa8 Mon Sep 17 00:00:00 2001 From: Ethan Cartwright Date: Thu, 28 Dec 2023 14:07:48 -0500 Subject: [PATCH 12/33] generalize typing to avoid circular dependency --- .../src/datahub/ingestion/api/source_helpers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py index 33b538a867fd0..c0877e57966f4 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py @@ -2,6 +2,7 @@ from datetime import datetime, timezone from typing import ( TYPE_CHECKING, + Any, Callable, Dict, Iterable, @@ -19,7 +20,6 @@ from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.mcp_builder import entity_supports_aspect from datahub.ingestion.api.workunit import MetadataWorkUnit -from datahub.ingestion.source.sql.sql_generic import BaseTable from datahub.metadata.com.linkedin.pegasus2avro.common import TimeStamp from datahub.metadata.schema_classes import ( BrowsePathEntryClass, @@ -65,7 +65,7 @@ def auto_workunit( def create_dataset_props_patch_builder( datahub_dataset_name: str, dataset_urn: str, - table: BaseTable, + table: Any, # TODO: refactor to avoid circular import while type-hinting with BaseTable custom_properties: Optional[Dict[str, str]] = None, ) -> DatasetPatchBuilder: """Creates a patch builder with a table's or view's attributes and dataset properties""" From 595499799f50e10655a58276c5692dab80c0848f Mon Sep 17 00:00:00 2001 From: Ethan Cartwright Date: Thu, 28 Dec 2023 14:13:11 -0500 Subject: [PATCH 13/33] remove unnecessary logs and imports --- .../tests/patch/test_dataset_patches.py | 56 ++++++------------- 1 file changed, 16 insertions(+), 40 deletions(-) diff --git a/smoke-test/tests/patch/test_dataset_patches.py b/smoke-test/tests/patch/test_dataset_patches.py index c1f55436f9c09..006fda97dbf50 100644 --- a/smoke-test/tests/patch/test_dataset_patches.py +++ b/smoke-test/tests/patch/test_dataset_patches.py @@ -1,48 +1,26 @@ -import time -import logging import uuid +from datetime import datetime as dt from typing import Dict, Optional -from datahub.utilities.time import datetime_to_ts_millis -from datetime import datetime as dt -from datahub.metadata.com.linkedin.pegasus2avro.common import TimeStamp -from datahub.emitter.mce_builder import ( - make_dataset_urn, - make_tag_urn, - make_term_urn, - make_user_urn, -) +from datahub.emitter.mce_builder import (make_dataset_urn, make_tag_urn, + make_term_urn) from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.graph.client import DataHubGraph, DataHubGraphConfig -from datahub.metadata.schema_classes import ( - AuditStampClass, - DatasetLineageTypeClass, - DatasetPropertiesClass, - EditableSchemaFieldInfoClass, - EditableSchemaMetadataClass, - GlobalTagsClass, - GlossaryTermAssociationClass, - GlossaryTermsClass, - OwnerClass, - OwnershipClass, - OwnershipTypeClass, - TagAssociationClass, - UpstreamClass, - UpstreamLineageClass, -) +from datahub.metadata.com.linkedin.pegasus2avro.common import TimeStamp +from datahub.metadata.schema_classes import (DatasetLineageTypeClass, + DatasetPropertiesClass, + EditableSchemaFieldInfoClass, + EditableSchemaMetadataClass, + GlossaryTermAssociationClass, + TagAssociationClass, + UpstreamClass, + UpstreamLineageClass) from datahub.specific.dataset import DatasetPatchBuilder - +from datahub.utilities.time import datetime_to_ts_millis from tests.patch.common_patch_tests import ( - get_dataset_property, - helper_test_custom_properties_patch, - helper_test_dataset_tags_patch, - helper_test_entity_terms_patch, - helper_test_ownership_patch, -) - - -log = logging.getLogger(__name__) -logging.basicConfig(level=logging.INFO) + get_dataset_property, helper_test_custom_properties_patch, + helper_test_dataset_tags_patch, helper_test_entity_terms_patch, + helper_test_ownership_patch) # Common Aspect Patch Tests @@ -375,7 +353,6 @@ def test_qualified_name_patch(wait_for_healthchecks): def test_created_patch(wait_for_healthchecks): test_time = datetime_to_ts_millis(dt.now()) - log.error("hello world above") dataset_urn = make_dataset_urn( platform="hive", name=f"SampleHiveDataset-{uuid.uuid4()}", env="PROD" ) @@ -389,7 +366,6 @@ def test_created_patch(wait_for_healthchecks): with DataHubGraph(DataHubGraphConfig()) as graph: graph.emit(mcpw) - log.error("hello world above") test_time = datetime_to_ts_millis(dt.now()) with DataHubGraph(DataHubGraphConfig()) as graph: for patch_mcp in ( From 27e6bc098e3f2d379f8ea5bc206652f83dcabecd Mon Sep 17 00:00:00 2001 From: Ethan Cartwright Date: Thu, 25 Jan 2024 11:05:16 -0500 Subject: [PATCH 14/33] make it configurable; use DatasetPropertiesClass; fix type hinting --- .../datahub/ingestion/api/source_helpers.py | 15 +++-- .../ingestion/source/redshift/config.py | 5 ++ .../ingestion/source/redshift/redshift.py | 57 +++++++++++++++---- 3 files changed, 61 insertions(+), 16 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py index 092801c3e939b..2bdf2cca81a07 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py @@ -20,6 +20,9 @@ from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.mcp_builder import entity_supports_aspect from datahub.ingestion.api.workunit import MetadataWorkUnit + +if TYPE_CHECKING: + from datahub.ingestion.source.sql.sql_generic import BaseTable from datahub.metadata.com.linkedin.pegasus2avro.common import TimeStamp from datahub.metadata.schema_classes import ( BrowsePathEntryClass, @@ -27,6 +30,7 @@ BrowsePathsV2Class, ChangeTypeClass, ContainerClass, + DatasetPropertiesClass, DatasetUsageStatisticsClass, MetadataChangeEventClass, MetadataChangeProposalClass, @@ -62,10 +66,9 @@ def auto_workunit( def create_dataset_props_patch_builder( - datahub_dataset_name: str, dataset_urn: str, - table: Any, # TODO: refactor to avoid circular import while type-hinting with BaseTable - custom_properties: Optional[Dict[str, str]] = None, + properties: DatasetPropertiesClass, + table: BaseTable, ) -> DatasetPatchBuilder: """Creates a patch builder with a table's or view's attributes and dataset properties""" patch_builder = DatasetPatchBuilder(dataset_urn) @@ -86,10 +89,10 @@ def create_dataset_props_patch_builder( patch_builder.set_last_modified( timestamp=TimeStamp(time=int(table.created.timestamp() * 1000)) ) - patch_builder.set_qualified_name(str(datahub_dataset_name)) + patch_builder.set_qualified_name(str(properties.qualifiedName)) - if custom_properties: - patch_builder.add_custom_properties(custom_properties) + if properties.customProperties: + patch_builder.add_custom_properties(properties.customProperties) return patch_builder diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py index 540adbf4bfd15..23f0199e825ae 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py @@ -139,6 +139,11 @@ class RedshiftConfig( description="When enabled, emits lineage as incremental to existing lineage already in DataHub. When disabled, re-states lineage on each run. This config works with rest-sink only.", ) + patch_custom_properties: bool = Field( + default=False, + description="Whether to patch custom properties on existing datasets rather than replace.", + ) + @root_validator(pre=True) def check_email_is_set_on_usage(cls, values): if values.get("include_usage_statistics"): diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py index 0733b3f7f6814..dc18fb0d94a0b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py @@ -78,8 +78,11 @@ METADATA_EXTRACTION, PROFILING, ) -from datahub.metadata.com.linkedin.pegasus2avro.common import SubTypes -from datahub.metadata.com.linkedin.pegasus2avro.dataset import ViewProperties +from datahub.metadata.com.linkedin.pegasus2avro.common import SubTypes, TimeStamp +from datahub.metadata.com.linkedin.pegasus2avro.dataset import ( + DatasetProperties, + ViewProperties, +) from datahub.metadata.com.linkedin.pegasus2avro.schema import ( ArrayType, BooleanType, @@ -94,7 +97,11 @@ StringType, TimeType, ) -from datahub.metadata.schema_classes import GlobalTagsClass, TagAssociationClass +from datahub.metadata.schema_classes import ( + DatasetPropertiesClass, + GlobalTagsClass, + TagAssociationClass, +) from datahub.utilities import memory_footprint from datahub.utilities.mapping import Constants from datahub.utilities.perf_timer import PerfTimer @@ -724,15 +731,45 @@ def gen_dataset_workunits( yield from self.gen_schema_metadata( dataset_urn, table, str(datahub_dataset_name) ) - - patch_builder = create_dataset_props_patch_builder( - datahub_dataset_name, dataset_urn, table, custom_properties - ) - for patch_mcp in patch_builder.build(): - yield MetadataWorkUnit( - id=f"{dataset_urn}-{patch_mcp.aspectName}", mcp_raw=patch_mcp + if self.config.patch_custom_properties: + patch_builder = create_dataset_props_patch_builder( + dataset_urn, + DatasetPropertiesClass( + customProperties=custom_properties, + qualifiedName=datahub_dataset_name, + ), + table, + ) + for patch_mcp in patch_builder.build(): + yield MetadataWorkUnit( + id=f"{dataset_urn}-{patch_mcp.aspectName}", mcp_raw=patch_mcp + ) + else: + dataset_properties = DatasetProperties( + name=table.name, + created=TimeStamp(time=int(table.created.timestamp() * 1000)) + if table.created + else None, + lastModified=TimeStamp(time=int(table.last_altered.timestamp() * 1000)) + if table.last_altered + else TimeStamp(time=int(table.created.timestamp() * 1000)) + if table.created + else None, + description=table.comment, + qualifiedName=str(datahub_dataset_name), ) + if custom_properties: + dataset_properties.customProperties = custom_properties + + yield MetadataChangeProposalWrapper( + entityUrn=dataset_urn, aspect=dataset_properties + ).as_workunit() + + # TODO: Check if needed + # if tags_to_add: + # yield gen_tags_aspect_workunit(dataset_urn, tags_to_add) + schema_container_key = gen_schema_key( db_name=database, schema=schema, From a619ea6459d280136838f899e2dfb1f4aa574b6a Mon Sep 17 00:00:00 2001 From: Ethan Cartwright Date: Thu, 25 Jan 2024 11:06:34 -0500 Subject: [PATCH 15/33] remove type Any import --- metadata-ingestion/src/datahub/ingestion/api/source_helpers.py | 1 - 1 file changed, 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py index 2bdf2cca81a07..a3adc61903b87 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py @@ -2,7 +2,6 @@ from datetime import datetime, timezone from typing import ( TYPE_CHECKING, - Any, Callable, Dict, Iterable, From 52e13e375f382fcdaf84f7ee0b216a7671f23fb6 Mon Sep 17 00:00:00 2001 From: Ethan Cartwright Date: Thu, 25 Jan 2024 11:11:26 -0500 Subject: [PATCH 16/33] use assert isinstance instead of type casting --- metadata-ingestion/tests/unit/test_redshift_source.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/tests/unit/test_redshift_source.py b/metadata-ingestion/tests/unit/test_redshift_source.py index c76b6442ad607..a8ddd0066bcae 100644 --- a/metadata-ingestion/tests/unit/test_redshift_source.py +++ b/metadata-ingestion/tests/unit/test_redshift_source.py @@ -25,7 +25,8 @@ def test_gen_dataset_workunits_patch_custom_properties(): custom_props_exist = False for item in gen: - mcp = cast(MetadataChangeProposalClass, item.metadata) + assert isinstance(item.metadata, MetadataChangeProposalClass) + mcp = item.metadata if mcp.aspectName == "datasetProperties": assert mcp.changeType == "PATCH" custom_props_exist = True From 01a53b4d7b14a6b3069634c0edaf04ad0985b93e Mon Sep 17 00:00:00 2001 From: Ethan Cartwright Date: Thu, 25 Jan 2024 16:25:06 -0500 Subject: [PATCH 17/33] test refactor --- .../tests/patch/test_dataset_patches.py | 172 +++++++----------- 1 file changed, 69 insertions(+), 103 deletions(-) diff --git a/smoke-test/tests/patch/test_dataset_patches.py b/smoke-test/tests/patch/test_dataset_patches.py index 006fda97dbf50..db5f4d2f53618 100644 --- a/smoke-test/tests/patch/test_dataset_patches.py +++ b/smoke-test/tests/patch/test_dataset_patches.py @@ -2,64 +2,69 @@ from datetime import datetime as dt from typing import Dict, Optional -from datahub.emitter.mce_builder import (make_dataset_urn, make_tag_urn, - make_term_urn) +from datahub.emitter.mce_builder import make_dataset_urn, make_tag_urn, make_term_urn from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.graph.client import DataHubGraph, DataHubGraphConfig from datahub.metadata.com.linkedin.pegasus2avro.common import TimeStamp -from datahub.metadata.schema_classes import (DatasetLineageTypeClass, - DatasetPropertiesClass, - EditableSchemaFieldInfoClass, - EditableSchemaMetadataClass, - GlossaryTermAssociationClass, - TagAssociationClass, - UpstreamClass, - UpstreamLineageClass) +from datahub.metadata.schema_classes import ( + DatasetLineageTypeClass, + DatasetPropertiesClass, + EditableSchemaFieldInfoClass, + EditableSchemaMetadataClass, + GlossaryTermAssociationClass, + TagAssociationClass, + UpstreamClass, + UpstreamLineageClass, +) from datahub.specific.dataset import DatasetPatchBuilder from datahub.utilities.time import datetime_to_ts_millis from tests.patch.common_patch_tests import ( - get_dataset_property, helper_test_custom_properties_patch, - helper_test_dataset_tags_patch, helper_test_entity_terms_patch, - helper_test_ownership_patch) + get_dataset_property, + helper_test_custom_properties_patch, + helper_test_dataset_tags_patch, + helper_test_entity_terms_patch, + helper_test_ownership_patch, +) + + +def make_dataset_urn_helper(suffix=""): + return make_dataset_urn( + platform="hive", name=f"SampleHiveDataset{suffix}{uuid.uuid4()}", env="PROD" + ) + + +def apply_patch_helper(graph: DataHubGraph, dataset_urn: str, patch_actions): + for patch_mcp in patch_actions: + graph.emit_mcp(patch_mcp) + pass # Common Aspect Patch Tests # Ownership def test_dataset_ownership_patch(wait_for_healthchecks): - dataset_urn = make_dataset_urn( - platform="hive", name=f"SampleHiveDataset{uuid.uuid4()}", env="PROD" - ) + dataset_urn = make_dataset_urn_helper() + helper_test_ownership_patch(dataset_urn, DatasetPatchBuilder) # Tags def test_dataset_tags_patch(wait_for_healthchecks): - dataset_urn = make_dataset_urn( - platform="hive", name=f"SampleHiveDataset-{uuid.uuid4()}", env="PROD" - ) + dataset_urn = make_dataset_urn_helper("-") helper_test_dataset_tags_patch(dataset_urn, DatasetPatchBuilder) # Terms def test_dataset_terms_patch(wait_for_healthchecks): - dataset_urn = make_dataset_urn( - platform="hive", name=f"SampleHiveDataset-{uuid.uuid4()}", env="PROD" - ) + dataset_urn = make_dataset_urn_helper("-") helper_test_entity_terms_patch(dataset_urn, DatasetPatchBuilder) def test_dataset_upstream_lineage_patch(wait_for_healthchecks): - dataset_urn = make_dataset_urn( - platform="hive", name=f"SampleHiveDataset-{uuid.uuid4()}", env="PROD" - ) + dataset_urn = make_dataset_urn_helper("-") - other_dataset_urn = make_dataset_urn( - platform="hive", name=f"SampleHiveDataset2-{uuid.uuid4()}", env="PROD" - ) + other_dataset_urn = make_dataset_urn_helper("2-") - patch_dataset_urn = make_dataset_urn( - platform="hive", name=f"SampleHiveDataset3-{uuid.uuid4()}", env="PROD" - ) + patch_dataset_urn = make_dataset_urn_helper("3-") upstream_lineage = UpstreamLineageClass( upstreams=[ @@ -80,13 +85,13 @@ def test_dataset_upstream_lineage_patch(wait_for_healthchecks): ) assert upstream_lineage_read.upstreams[0].dataset == other_dataset_urn - for patch_mcp in ( + apply_patch_helper( + graph, + dataset_urn, DatasetPatchBuilder(dataset_urn) .add_upstream_lineage(upstream_lineage_to_add) - .build() - ): - graph.emit_mcp(patch_mcp) - pass + .build(), + ) upstream_lineage_read = graph.get_aspect_v2( entity_urn=dataset_urn, @@ -132,9 +137,7 @@ def get_field_info( def test_field_terms_patch(wait_for_healthchecks): - dataset_urn = make_dataset_urn( - platform="hive", name=f"SampleHiveDataset-{uuid.uuid4()}", env="PROD" - ) + dataset_urn = make_dataset_urn_helper("-") field_path = "foo.bar" @@ -191,9 +194,7 @@ def test_field_terms_patch(wait_for_healthchecks): def test_field_tags_patch(wait_for_healthchecks): - dataset_urn = make_dataset_urn( - platform="hive", name=f"SampleHiveDataset-{uuid.uuid4()}", env="PROD" - ) + dataset_urn = make_dataset_urn_helper("-") field_path = "foo.bar" @@ -280,9 +281,7 @@ def get_custom_properties( def test_custom_properties_patch(wait_for_healthchecks): - dataset_urn = make_dataset_urn( - platform="hive", name=f"SampleHiveDataset-{uuid.uuid4()}", env="PROD" - ) + dataset_urn = make_dataset_urn_helper("-") orig_dataset_properties = DatasetPropertiesClass( name="test_name", description="test_description" ) @@ -321,9 +320,7 @@ def test_custom_properties_patch(wait_for_healthchecks): def test_qualified_name_patch(wait_for_healthchecks): - dataset_urn = make_dataset_urn( - platform="hive", name=f"SampleHiveDataset-{uuid.uuid4()}", env="PROD" - ) + dataset_urn = make_dataset_urn_helper("-") orig_dataset_properties = DatasetPropertiesClass( name="test_name", qualifiedName="to_be_replaced" ) @@ -350,64 +347,33 @@ def test_qualified_name_patch(wait_for_healthchecks): get_dataset_property(graph, dataset_urn, "qualifiedName") == "new_qualified_name" -def test_created_patch(wait_for_healthchecks): - test_time = datetime_to_ts_millis(dt.now()) - - dataset_urn = make_dataset_urn( - platform="hive", name=f"SampleHiveDataset-{uuid.uuid4()}", env="PROD" - ) - orig_dataset_properties = DatasetPropertiesClass( - name="test_name", created=TimeStamp(test_time) - ) - - mcpw = MetadataChangeProposalWrapper( - entityUrn=dataset_urn, aspect=orig_dataset_properties - ) - - with DataHubGraph(DataHubGraphConfig()) as graph: - graph.emit(mcpw) - test_time = datetime_to_ts_millis(dt.now()) - with DataHubGraph(DataHubGraphConfig()) as graph: - for patch_mcp in ( - DatasetPatchBuilder(dataset_urn).set_created(TimeStamp(test_time)).build() - ): - graph.emit_mcp(patch_mcp) - - assert get_dataset_property(graph, dataset_urn, "created").time == test_time +def test_timestamp_patch_types(wait_for_healthchecks): + for patch_type in ["created", "lastModified"]: + test_time = datetime_to_ts_millis(dt.now()) + dataset_urn = make_dataset_urn_helper("-") + orig_dataset_properties = DatasetPropertiesClass( + name="test_name", **{patch_type: TimeStamp(test_time)} + ) -def test_last_modified_patch(wait_for_healthchecks): - test_time = datetime_to_ts_millis(dt.now()) - - dataset_urn = make_dataset_urn( - platform="hive", name=f"SampleHiveDataset-{uuid.uuid4()}", env="PROD" - ) - orig_dataset_properties = DatasetPropertiesClass( - name="test_name", lastModified=TimeStamp(test_time) - ) - # STOPPED HERE -- also need to fix test_created_patch -- also need to run these to test! - - mcpw = MetadataChangeProposalWrapper( - entityUrn=dataset_urn, aspect=orig_dataset_properties - ) + mcpw = MetadataChangeProposalWrapper( + entityUrn=dataset_urn, aspect=orig_dataset_properties + ) - with DataHubGraph(DataHubGraphConfig()) as graph: - graph.emit(mcpw) - # assert qualfied name looks as expected - last_modified = get_dataset_property(graph, dataset_urn, "lastModified") - assert last_modified - assert last_modified.time == test_time + with DataHubGraph(DataHubGraphConfig()) as graph: + graph.emit(mcpw) + dataset_property = get_dataset_property(graph, dataset_urn, patch_type) + assert dataset_property + assert dataset_property.time == test_time - new_test_time = datetime_to_ts_millis(dt.now()) + new_test_time = datetime_to_ts_millis(dt.now()) - with DataHubGraph(DataHubGraphConfig()) as graph: - for patch_mcp in ( - DatasetPatchBuilder(dataset_urn) - .set_last_modified(TimeStamp(new_test_time)) - .build() - ): - graph.emit_mcp(patch_mcp) + with DataHubGraph(DataHubGraphConfig()) as graph: + patch_builder = DatasetPatchBuilder(dataset_urn) + patch_method = getattr(patch_builder, f"set_{patch_type}") + for patch_mcp in patch_method(TimeStamp(new_test_time)).build(): + graph.emit_mcp(patch_mcp) - assert ( - get_dataset_property(graph, dataset_urn, "lastModified").time == new_test_time - ) + assert ( + get_dataset_property(graph, dataset_urn, patch_type).time == new_test_time + ) From 8a0492c47f825e22f8f07438c702a72bf0c66f72 Mon Sep 17 00:00:00 2001 From: Ethan Cartwright Date: Thu, 25 Jan 2024 17:02:48 -0500 Subject: [PATCH 18/33] improve smoketest code --- .../tests/patch/test_dataset_patches.py | 62 ++++++++++--------- 1 file changed, 34 insertions(+), 28 deletions(-) diff --git a/smoke-test/tests/patch/test_dataset_patches.py b/smoke-test/tests/patch/test_dataset_patches.py index db5f4d2f53618..613114d60344b 100644 --- a/smoke-test/tests/patch/test_dataset_patches.py +++ b/smoke-test/tests/patch/test_dataset_patches.py @@ -33,10 +33,19 @@ def make_dataset_urn_helper(suffix=""): ) -def apply_patch_helper(graph: DataHubGraph, dataset_urn: str, patch_actions): - for patch_mcp in patch_actions: - graph.emit_mcp(patch_mcp) - pass +def create_dataset_properties_helper(name: str, patch_type: str, patch_type_value: str): + if patch_type == "created" or patch_type == "lastModified": + return DatasetPropertiesClass( + name=name, **{patch_type: TimeStamp(datetime_to_ts_millis(dt.now()))} + ) + else: + return DatasetPropertiesClass(name=name, **{patch_type: patch_type_value}) + + +def setup(urn_suffix: str, property_details: Dict[str, str]): + dataset_urn = make_dataset_urn_helper(urn_suffix) + orig_dataset_properties = create_dataset_properties_helper(**property_details) + return (dataset_urn, orig_dataset_properties) # Common Aspect Patch Tests @@ -61,9 +70,7 @@ def test_dataset_terms_patch(wait_for_healthchecks): def test_dataset_upstream_lineage_patch(wait_for_healthchecks): dataset_urn = make_dataset_urn_helper("-") - other_dataset_urn = make_dataset_urn_helper("2-") - patch_dataset_urn = make_dataset_urn_helper("3-") upstream_lineage = UpstreamLineageClass( @@ -85,13 +92,13 @@ def test_dataset_upstream_lineage_patch(wait_for_healthchecks): ) assert upstream_lineage_read.upstreams[0].dataset == other_dataset_urn - apply_patch_helper( - graph, - dataset_urn, + for patch_mcp in ( DatasetPatchBuilder(dataset_urn) .add_upstream_lineage(upstream_lineage_to_add) - .build(), - ) + .build() + ): + graph.emit_mcp(patch_mcp) + pass upstream_lineage_read = graph.get_aspect_v2( entity_urn=dataset_urn, @@ -281,10 +288,10 @@ def get_custom_properties( def test_custom_properties_patch(wait_for_healthchecks): - dataset_urn = make_dataset_urn_helper("-") - orig_dataset_properties = DatasetPropertiesClass( - name="test_name", description="test_description" + dataset_urn, orig_dataset_properties = setup( + "-", {"name": "test_name", "description": "test_description"} ) + helper_test_custom_properties_patch( test_entity_urn=dataset_urn, patch_builder_class=DatasetPatchBuilder, @@ -320,9 +327,8 @@ def test_custom_properties_patch(wait_for_healthchecks): def test_qualified_name_patch(wait_for_healthchecks): - dataset_urn = make_dataset_urn_helper("-") - orig_dataset_properties = DatasetPropertiesClass( - name="test_name", qualifiedName="to_be_replaced" + dataset_urn, orig_dataset_properties = setup( + "-", {"name": "test_name", "qualifiedName": "to_be_replaced"} ) mcpw = MetadataChangeProposalWrapper( @@ -336,7 +342,6 @@ def test_qualified_name_patch(wait_for_healthchecks): assert qualified_name assert qualified_name == "to_be_replaced" - with DataHubGraph(DataHubGraphConfig()) as graph: for patch_mcp in ( DatasetPatchBuilder(dataset_urn) .set_qualified_name("new_qualified_name") @@ -344,16 +349,17 @@ def test_qualified_name_patch(wait_for_healthchecks): ): graph.emit_mcp(patch_mcp) - get_dataset_property(graph, dataset_urn, "qualifiedName") == "new_qualified_name" + assert ( + get_dataset_property(graph, dataset_urn, "qualifiedName") + == "new_qualified_name" + ) def test_timestamp_patch_types(wait_for_healthchecks): for patch_type in ["created", "lastModified"]: test_time = datetime_to_ts_millis(dt.now()) - - dataset_urn = make_dataset_urn_helper("-") - orig_dataset_properties = DatasetPropertiesClass( - name="test_name", **{patch_type: TimeStamp(test_time)} + dataset_urn, orig_dataset_properties = setup( + "-", {"name": "test_name", patch_type: test_time} ) mcpw = MetadataChangeProposalWrapper( @@ -366,14 +372,14 @@ def test_timestamp_patch_types(wait_for_healthchecks): assert dataset_property assert dataset_property.time == test_time - new_test_time = datetime_to_ts_millis(dt.now()) + new_test_time = datetime_to_ts_millis(dt.now()) - with DataHubGraph(DataHubGraphConfig()) as graph: patch_builder = DatasetPatchBuilder(dataset_urn) patch_method = getattr(patch_builder, f"set_{patch_type}") for patch_mcp in patch_method(TimeStamp(new_test_time)).build(): graph.emit_mcp(patch_mcp) - assert ( - get_dataset_property(graph, dataset_urn, patch_type).time == new_test_time - ) + assert ( + get_dataset_property(graph, dataset_urn, patch_type).time + == new_test_time + ) From 9c4b05c4c8ab7fddfd9ed39aaeb8444418914de7 Mon Sep 17 00:00:00 2001 From: Ethan Cartwright Date: Thu, 25 Jan 2024 17:07:17 -0500 Subject: [PATCH 19/33] put BaseTable in quotes --- metadata-ingestion/src/datahub/ingestion/api/source_helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py index a3adc61903b87..90c89f79dda2d 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py @@ -67,7 +67,7 @@ def auto_workunit( def create_dataset_props_patch_builder( dataset_urn: str, properties: DatasetPropertiesClass, - table: BaseTable, + table: "BaseTable", ) -> DatasetPatchBuilder: """Creates a patch builder with a table's or view's attributes and dataset properties""" patch_builder = DatasetPatchBuilder(dataset_urn) From f29651671187dc294ffd5987dbd163afbb7ab387 Mon Sep 17 00:00:00 2001 From: Ethan Cartwright Date: Mon, 29 Jan 2024 10:39:39 -0500 Subject: [PATCH 20/33] fix linting --- metadata-ingestion/src/datahub/specific/dataset.py | 1 - metadata-ingestion/tests/unit/test_redshift_source.py | 2 -- 2 files changed, 3 deletions(-) diff --git a/metadata-ingestion/src/datahub/specific/dataset.py b/metadata-ingestion/src/datahub/specific/dataset.py index cdbe4753264dc..6bd79ed222741 100644 --- a/metadata-ingestion/src/datahub/specific/dataset.py +++ b/metadata-ingestion/src/datahub/specific/dataset.py @@ -342,7 +342,6 @@ def set_display_name(self, display_name: str) -> "DatasetPatchBuilder": ) return self - def set_qualified_name(self, qualified_name: str) -> "DatasetPatchBuilder": if qualified_name is not None: self._add_patch( diff --git a/metadata-ingestion/tests/unit/test_redshift_source.py b/metadata-ingestion/tests/unit/test_redshift_source.py index a8ddd0066bcae..2fc290005187c 100644 --- a/metadata-ingestion/tests/unit/test_redshift_source.py +++ b/metadata-ingestion/tests/unit/test_redshift_source.py @@ -1,5 +1,3 @@ -from typing import cast - from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.source.redshift.config import RedshiftConfig from datahub.ingestion.source.redshift.redshift import RedshiftSource From 246f6d3f7c6048234f3887080e2c04c34ca9adb3 Mon Sep 17 00:00:00 2001 From: Ethan Cartwright Date: Mon, 29 Jan 2024 13:16:11 -0500 Subject: [PATCH 21/33] add missing return statement --- metadata-ingestion/src/datahub/specific/dataset.py | 1 + 1 file changed, 1 insertion(+) diff --git a/metadata-ingestion/src/datahub/specific/dataset.py b/metadata-ingestion/src/datahub/specific/dataset.py index 6bd79ed222741..b5a668f93695d 100644 --- a/metadata-ingestion/src/datahub/specific/dataset.py +++ b/metadata-ingestion/src/datahub/specific/dataset.py @@ -370,6 +370,7 @@ def set_last_modified(self, timestamp: TimeStamp) -> "DatasetPatchBuilder": path="/lastModified", value=timestamp, ) + return self def set_structured_property( self, property_name: str, value: Union[str, float, List[Union[str, float]]] From a43cbefd73635a73defae5b9171c83d2d3898bd8 Mon Sep 17 00:00:00 2001 From: Ethan Cartwright Date: Mon, 5 Feb 2024 11:06:26 -0500 Subject: [PATCH 22/33] update unit tests for patch flag --- .../tests/unit/test_redshift_source.py | 40 +++++++++++++++++-- 1 file changed, 36 insertions(+), 4 deletions(-) diff --git a/metadata-ingestion/tests/unit/test_redshift_source.py b/metadata-ingestion/tests/unit/test_redshift_source.py index 2fc290005187c..819aaabe2a044 100644 --- a/metadata-ingestion/tests/unit/test_redshift_source.py +++ b/metadata-ingestion/tests/unit/test_redshift_source.py @@ -1,12 +1,15 @@ +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.emitter.mcp import MetadataChangeProposalClass from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.source.redshift.config import RedshiftConfig from datahub.ingestion.source.redshift.redshift import RedshiftSource from datahub.ingestion.source.redshift.redshift_schema import RedshiftTable -from datahub.metadata.schema_classes import MetadataChangeProposalClass -def test_gen_dataset_workunits_patch_custom_properties(): - config = RedshiftConfig(host_port="localhost:5439", database="test") +def test_gen_dataset_workunits_patch_custom_properties_patch(): + config = RedshiftConfig( + host_port="localhost:5439", database="test", patch_custom_properties=True + ) source: RedshiftSource = RedshiftSource(config, ctx=PipelineContext(run_id="test")) gen = source.gen_dataset_workunits( table=RedshiftTable( @@ -23,10 +26,39 @@ def test_gen_dataset_workunits_patch_custom_properties(): custom_props_exist = False for item in gen: - assert isinstance(item.metadata, MetadataChangeProposalClass) mcp = item.metadata if mcp.aspectName == "datasetProperties": + assert isinstance(item.metadata, MetadataChangeProposalClass) assert mcp.changeType == "PATCH" custom_props_exist = True + else: + assert isinstance(item.metadata, MetadataChangeProposalWrapper) + + assert custom_props_exist + + +def test_gen_dataset_workunits_patch_custom_properties_upsert(): + config = RedshiftConfig(host_port="localhost:5439", database="test") + source: RedshiftSource = RedshiftSource(config, ctx=PipelineContext(run_id="test")) + gen = source.gen_dataset_workunits( + table=RedshiftTable( + name="category", + columns=[], + created=None, + comment="", + ), + database="dev", + schema="public", + sub_type="test_sub_type", + custom_properties={"my_key": "my_value"}, + ) + + custom_props_exist = False + for item in gen: + assert isinstance(item.metadata, MetadataChangeProposalWrapper) + mcp = item.metadata + if mcp.aspectName == "datasetProperties": + assert mcp.changeType == "UPSERT" + custom_props_exist = True assert custom_props_exist From 59016d8caba829317cb29cb0fb9fdc4b88811962 Mon Sep 17 00:00:00 2001 From: Ethan Cartwright Date: Mon, 5 Feb 2024 14:24:12 -0500 Subject: [PATCH 23/33] add pytest setup fixture --- .../tests/unit/test_redshift_source.py | 36 +++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/metadata-ingestion/tests/unit/test_redshift_source.py b/metadata-ingestion/tests/unit/test_redshift_source.py index 819aaabe2a044..510c223b49a70 100644 --- a/metadata-ingestion/tests/unit/test_redshift_source.py +++ b/metadata-ingestion/tests/unit/test_redshift_source.py @@ -1,3 +1,4 @@ +import pytest from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.mcp import MetadataChangeProposalClass from datahub.ingestion.api.common import PipelineContext @@ -5,10 +6,17 @@ from datahub.ingestion.source.redshift.redshift import RedshiftSource from datahub.ingestion.source.redshift.redshift_schema import RedshiftTable +from typing import Iterable +from datahub.ingestion.api.workunit import MetadataWorkUnit -def test_gen_dataset_workunits_patch_custom_properties_patch(): + +@pytest.fixture +def redshift_source_setup(request) -> Iterable[MetadataWorkUnit]: + custom_props_flag = request.param config = RedshiftConfig( - host_port="localhost:5439", database="test", patch_custom_properties=True + host_port="localhost:5439", + database="test", + patch_custom_properties=custom_props_flag, ) source: RedshiftSource = RedshiftSource(config, ctx=PipelineContext(run_id="test")) gen = source.gen_dataset_workunits( @@ -23,7 +31,12 @@ def test_gen_dataset_workunits_patch_custom_properties_patch(): sub_type="test_sub_type", custom_properties={"my_key": "my_value"}, ) + return gen + +@pytest.mark.parametrize("redshift_source_setup", [True], indirect=True) +def test_gen_dataset_workunits_patch_custom_properties_patch(redshift_source_setup): + gen = redshift_source_setup custom_props_exist = False for item in gen: mcp = item.metadata @@ -37,22 +50,9 @@ def test_gen_dataset_workunits_patch_custom_properties_patch(): assert custom_props_exist -def test_gen_dataset_workunits_patch_custom_properties_upsert(): - config = RedshiftConfig(host_port="localhost:5439", database="test") - source: RedshiftSource = RedshiftSource(config, ctx=PipelineContext(run_id="test")) - gen = source.gen_dataset_workunits( - table=RedshiftTable( - name="category", - columns=[], - created=None, - comment="", - ), - database="dev", - schema="public", - sub_type="test_sub_type", - custom_properties={"my_key": "my_value"}, - ) - +@pytest.mark.parametrize("redshift_source_setup", [False], indirect=True) +def test_gen_dataset_workunits_patch_custom_properties_upsert(redshift_source_setup): + gen = redshift_source_setup custom_props_exist = False for item in gen: assert isinstance(item.metadata, MetadataChangeProposalWrapper) From 4c2a9c0889a5bba4b96f42f9c07767ac4d0b2587 Mon Sep 17 00:00:00 2001 From: Aseem Bansal Date: Tue, 27 Feb 2024 18:07:55 +0530 Subject: [PATCH 24/33] fix lint source --- .../tests/unit/test_redshift_source.py | 15 +++++++++------ smoke-test/tests/patch/test_dataset_patches.py | 1 + 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/metadata-ingestion/tests/unit/test_redshift_source.py b/metadata-ingestion/tests/unit/test_redshift_source.py index 510c223b49a70..aafd079cf26cd 100644 --- a/metadata-ingestion/tests/unit/test_redshift_source.py +++ b/metadata-ingestion/tests/unit/test_redshift_source.py @@ -1,17 +1,20 @@ +from typing import Iterable + import pytest -from datahub.emitter.mcp import MetadataChangeProposalWrapper -from datahub.emitter.mcp import MetadataChangeProposalClass + +from datahub.emitter.mcp import ( + MetadataChangeProposalClass, + MetadataChangeProposalWrapper, +) from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.redshift.config import RedshiftConfig from datahub.ingestion.source.redshift.redshift import RedshiftSource from datahub.ingestion.source.redshift.redshift_schema import RedshiftTable -from typing import Iterable -from datahub.ingestion.api.workunit import MetadataWorkUnit - @pytest.fixture -def redshift_source_setup(request) -> Iterable[MetadataWorkUnit]: +def redshift_source_setup(request: pytest.FixtureRequest) -> Iterable[MetadataWorkUnit]: custom_props_flag = request.param config = RedshiftConfig( host_port="localhost:5439", diff --git a/smoke-test/tests/patch/test_dataset_patches.py b/smoke-test/tests/patch/test_dataset_patches.py index c502a6bc0d61e..9feef1ea3114c 100644 --- a/smoke-test/tests/patch/test_dataset_patches.py +++ b/smoke-test/tests/patch/test_dataset_patches.py @@ -18,6 +18,7 @@ ) from datahub.specific.dataset import DatasetPatchBuilder from datahub.utilities.time import datetime_to_ts_millis + from tests.patch.common_patch_tests import ( get_dataset_property, helper_test_custom_properties_patch, From e2728e351f5acb81b91ff6e4a2ade1c926bf457c Mon Sep 17 00:00:00 2001 From: Aseem Bansal Date: Tue, 27 Feb 2024 18:14:54 +0530 Subject: [PATCH 25/33] some lint fix --- smoke-test/tests/patch/test_dataset_patches.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/smoke-test/tests/patch/test_dataset_patches.py b/smoke-test/tests/patch/test_dataset_patches.py index 9feef1ea3114c..0807ba9de5f79 100644 --- a/smoke-test/tests/patch/test_dataset_patches.py +++ b/smoke-test/tests/patch/test_dataset_patches.py @@ -385,7 +385,6 @@ def test_timestamp_patch_types(wait_for_healthchecks): for patch_mcp in patch_method(TimeStamp(new_test_time)).build(): graph.emit_mcp(patch_mcp) - assert ( - get_dataset_property(graph, dataset_urn, patch_type).time - == new_test_time - ) + dataset_property = get_dataset_property(graph, dataset_urn, patch_type) + assert dataset_property + assert dataset_property.time == new_test_time From efaa28c822e531bb119fc8903d438809b7f3fb27 Mon Sep 17 00:00:00 2001 From: Ethan Cartwright Date: Wed, 28 Feb 2024 01:10:59 -0500 Subject: [PATCH 26/33] reverted smoke tests --- smoke-test/tests/patch/common_patch_tests.py | 21 --- .../tests/patch/test_dataset_patches.py | 126 +++++------------- 2 files changed, 31 insertions(+), 116 deletions(-) diff --git a/smoke-test/tests/patch/common_patch_tests.py b/smoke-test/tests/patch/common_patch_tests.py index bd05e3e594f9f..9530edb760c13 100644 --- a/smoke-test/tests/patch/common_patch_tests.py +++ b/smoke-test/tests/patch/common_patch_tests.py @@ -8,7 +8,6 @@ from datahub.ingestion.graph.client import DataHubGraph, DataHubGraphConfig from datahub.metadata.schema_classes import ( AuditStampClass, - DatasetPropertiesClass, GlobalTagsClass, GlossaryTermAssociationClass, GlossaryTermsClass, @@ -20,26 +19,6 @@ ) -def get_dataset_property( - graph: DataHubGraph, dataset_urn: str, property_name: str -) -> Optional[Dict[str, str]]: - """ - Generic function to get a specific property of a dataset. - - :param graph: Instance of DataHubGraph. - :param dataset_urn: URN of the dataset. - :param property_name: Name of the property to retrieve. - :return: Property value or None if the property doesn't exist. - """ - dataset_properties = graph.get_aspect( - entity_urn=dataset_urn, - aspect_type=DatasetPropertiesClass, - ) - assert dataset_properties - - return getattr(dataset_properties, property_name, None) - - def helper_test_entity_terms_patch( test_entity_urn: str, patch_builder_class: Type[MetadataPatchProposal], diff --git a/smoke-test/tests/patch/test_dataset_patches.py b/smoke-test/tests/patch/test_dataset_patches.py index 0807ba9de5f79..ec6b4a91fa6be 100644 --- a/smoke-test/tests/patch/test_dataset_patches.py +++ b/smoke-test/tests/patch/test_dataset_patches.py @@ -1,11 +1,9 @@ import uuid -from datetime import datetime as dt from typing import Dict, Optional from datahub.emitter.mce_builder import make_dataset_urn, make_tag_urn, make_term_urn from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.graph.client import DataHubGraph, DataHubGraphConfig -from datahub.metadata.com.linkedin.pegasus2avro.common import TimeStamp from datahub.metadata.schema_classes import ( DatasetLineageTypeClass, DatasetPropertiesClass, @@ -17,10 +15,8 @@ UpstreamLineageClass, ) from datahub.specific.dataset import DatasetPatchBuilder -from datahub.utilities.time import datetime_to_ts_millis from tests.patch.common_patch_tests import ( - get_dataset_property, helper_test_custom_properties_patch, helper_test_dataset_tags_patch, helper_test_entity_terms_patch, @@ -28,51 +24,43 @@ ) -def make_dataset_urn_helper(suffix=""): - return make_dataset_urn( - platform="hive", name=f"SampleHiveDataset{suffix}{uuid.uuid4()}", env="PROD" - ) - - -def create_dataset_properties_helper(name: str, patch_type: str, patch_type_value: str): - if patch_type == "created" or patch_type == "lastModified": - return DatasetPropertiesClass( - name=name, **{patch_type: TimeStamp(datetime_to_ts_millis(dt.now()))} - ) - else: - return DatasetPropertiesClass(name=name, **{patch_type: patch_type_value}) - - -def setup(urn_suffix: str, property_details: Dict[str, str]): - dataset_urn = make_dataset_urn_helper(urn_suffix) - orig_dataset_properties = create_dataset_properties_helper(**property_details) - return (dataset_urn, orig_dataset_properties) - - # Common Aspect Patch Tests # Ownership def test_dataset_ownership_patch(wait_for_healthchecks): - dataset_urn = make_dataset_urn_helper() - + dataset_urn = make_dataset_urn( + platform="hive", name=f"SampleHiveDataset{uuid.uuid4()}", env="PROD" + ) helper_test_ownership_patch(dataset_urn, DatasetPatchBuilder) # Tags def test_dataset_tags_patch(wait_for_healthchecks): - dataset_urn = make_dataset_urn_helper("-") + dataset_urn = make_dataset_urn( + platform="hive", name=f"SampleHiveDataset-{uuid.uuid4()}", env="PROD" + ) helper_test_dataset_tags_patch(dataset_urn, DatasetPatchBuilder) # Terms def test_dataset_terms_patch(wait_for_healthchecks): - dataset_urn = make_dataset_urn_helper("-") + dataset_urn = make_dataset_urn( + platform="hive", name=f"SampleHiveDataset-{uuid.uuid4()}", env="PROD" + ) helper_test_entity_terms_patch(dataset_urn, DatasetPatchBuilder) def test_dataset_upstream_lineage_patch(wait_for_healthchecks): - dataset_urn = make_dataset_urn_helper("-") - other_dataset_urn = make_dataset_urn_helper("2-") - patch_dataset_urn = make_dataset_urn_helper("3-") + dataset_urn = make_dataset_urn( + platform="hive", name=f"SampleHiveDataset-{uuid.uuid4()}", env="PROD" + ) + + other_dataset_urn = make_dataset_urn( + platform="hive", name=f"SampleHiveDataset2-{uuid.uuid4()}", env="PROD" + ) + + patch_dataset_urn = make_dataset_urn( + platform="hive", name=f"SampleHiveDataset3-{uuid.uuid4()}", env="PROD" + ) upstream_lineage = UpstreamLineageClass( upstreams=[ @@ -145,7 +133,9 @@ def get_field_info( def test_field_terms_patch(wait_for_healthchecks): - dataset_urn = make_dataset_urn_helper("-") + dataset_urn = make_dataset_urn( + platform="hive", name=f"SampleHiveDataset-{uuid.uuid4()}", env="PROD" + ) field_path = "foo.bar" @@ -204,7 +194,9 @@ def test_field_terms_patch(wait_for_healthchecks): def test_field_tags_patch(wait_for_healthchecks): - dataset_urn = make_dataset_urn_helper("-") + dataset_urn = make_dataset_urn( + platform="hive", name=f"SampleHiveDataset-{uuid.uuid4()}", env="PROD" + ) field_path = "foo.bar" @@ -294,10 +286,12 @@ def get_custom_properties( def test_custom_properties_patch(wait_for_healthchecks): - dataset_urn, orig_dataset_properties = setup( - "-", {"name": "test_name", "description": "test_description"} + dataset_urn = make_dataset_urn( + platform="hive", name=f"SampleHiveDataset-{uuid.uuid4()}", env="PROD" + ) + orig_dataset_properties = DatasetPropertiesClass( + name="test_name", description="test_description" ) - helper_test_custom_properties_patch( test_entity_urn=dataset_urn, patch_builder_class=DatasetPatchBuilder, @@ -330,61 +324,3 @@ def test_custom_properties_patch(wait_for_healthchecks): assert ( custom_properties["test_description_property"] == "test_description_value" ) - - -def test_qualified_name_patch(wait_for_healthchecks): - dataset_urn, orig_dataset_properties = setup( - "-", {"name": "test_name", "qualifiedName": "to_be_replaced"} - ) - - mcpw = MetadataChangeProposalWrapper( - entityUrn=dataset_urn, aspect=orig_dataset_properties - ) - - with DataHubGraph(DataHubGraphConfig()) as graph: - graph.emit(mcpw) - # assert qualfied name looks as expected - qualified_name = get_dataset_property(graph, dataset_urn, "qualifiedName") - assert qualified_name - assert qualified_name == "to_be_replaced" - - for patch_mcp in ( - DatasetPatchBuilder(dataset_urn) - .set_qualified_name("new_qualified_name") - .build() - ): - graph.emit_mcp(patch_mcp) - - assert ( - get_dataset_property(graph, dataset_urn, "qualifiedName") - == "new_qualified_name" - ) - - -def test_timestamp_patch_types(wait_for_healthchecks): - for patch_type in ["created", "lastModified"]: - test_time = datetime_to_ts_millis(dt.now()) - dataset_urn, orig_dataset_properties = setup( - "-", {"name": "test_name", patch_type: test_time} - ) - - mcpw = MetadataChangeProposalWrapper( - entityUrn=dataset_urn, aspect=orig_dataset_properties - ) - - with DataHubGraph(DataHubGraphConfig()) as graph: - graph.emit(mcpw) - dataset_property = get_dataset_property(graph, dataset_urn, patch_type) - assert dataset_property - assert dataset_property.time == test_time - - new_test_time = datetime_to_ts_millis(dt.now()) - - patch_builder = DatasetPatchBuilder(dataset_urn) - patch_method = getattr(patch_builder, f"set_{patch_type}") - for patch_mcp in patch_method(TimeStamp(new_test_time)).build(): - graph.emit_mcp(patch_mcp) - - dataset_property = get_dataset_property(graph, dataset_urn, patch_type) - assert dataset_property - assert dataset_property.time == new_test_time From 95ee34ef604806b8c248db09df330f29a8efb573 Mon Sep 17 00:00:00 2001 From: Ethan Cartwright Date: Sun, 3 Mar 2024 19:26:29 -0500 Subject: [PATCH 27/33] genericize dataset props patch builder --- .../datahub/ingestion/api/source_helpers.py | 36 +++------ .../ingestion/source/redshift/redshift.py | 74 +++++++++---------- .../src/datahub/specific/dataset.py | 24 +++--- 3 files changed, 60 insertions(+), 74 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py index 90c89f79dda2d..811fab01c8e69 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py @@ -66,32 +66,16 @@ def auto_workunit( def create_dataset_props_patch_builder( dataset_urn: str, - properties: DatasetPropertiesClass, - table: "BaseTable", + dataset_properties: DatasetPropertiesClass, ) -> DatasetPatchBuilder: """Creates a patch builder with a table's or view's attributes and dataset properties""" patch_builder = DatasetPatchBuilder(dataset_urn) - if table.name: - patch_builder.set_display_name(table.name) - if table.comment: - patch_builder.set_description(table.comment) - if table.created: - patch_builder.set_created( - timestamp=TimeStamp(time=int(table.created.timestamp() * 1000)) - ) - - if table.last_altered: - patch_builder.set_last_modified( - timestamp=TimeStamp(time=int(table.last_altered.timestamp() * 1000)) - ) - elif table.created: - patch_builder.set_last_modified( - timestamp=TimeStamp(time=int(table.created.timestamp() * 1000)) - ) - patch_builder.set_qualified_name(str(properties.qualifiedName)) - - if properties.customProperties: - patch_builder.add_custom_properties(properties.customProperties) + patch_builder.set_display_name(dataset_properties.name) + patch_builder.set_description(dataset_properties.description) + patch_builder.set_created(dataset_properties.created) + patch_builder.set_last_modified(dataset_properties.lastModified) + patch_builder.set_qualified_name(dataset_properties.qualifiedName) + patch_builder.add_custom_properties(dataset_properties.customProperties) return patch_builder @@ -409,9 +393,9 @@ def auto_empty_dataset_usage_statistics( userCounts=[], fieldCounts=[], ), - changeType=ChangeTypeClass.CREATE - if all_buckets - else ChangeTypeClass.UPSERT, + changeType=( + ChangeTypeClass.CREATE if all_buckets else ChangeTypeClass.UPSERT + ), ).as_workunit() diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py index 9bb362b587f9a..8a8132d946690 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py @@ -292,13 +292,13 @@ def test_connection(config_dict: dict) -> TestConnectionReport: test_report.capability_report = {} try: RedshiftDataDictionary.get_schemas(connection, database=config.database) - test_report.capability_report[ - SourceCapability.SCHEMA_METADATA - ] = CapabilityReport(capable=True) + test_report.capability_report[SourceCapability.SCHEMA_METADATA] = ( + CapabilityReport(capable=True) + ) except Exception as e: - test_report.capability_report[ - SourceCapability.SCHEMA_METADATA - ] = CapabilityReport(capable=False, failure_reason=str(e)) + test_report.capability_report[SourceCapability.SCHEMA_METADATA] = ( + CapabilityReport(capable=False, failure_reason=str(e)) + ) except Exception as e: test_report.basic_connectivity = CapabilityReport( @@ -758,34 +758,32 @@ def gen_dataset_workunits( yield from self.gen_schema_metadata( dataset_urn, table, str(datahub_dataset_name) ) - if self.config.patch_custom_properties: - patch_builder = create_dataset_props_patch_builder( - dataset_urn, - DatasetPropertiesClass( - customProperties=custom_properties, - qualifiedName=datahub_dataset_name, - ), - table, - ) - for patch_mcp in patch_builder.build(): - yield MetadataWorkUnit( - id=f"{dataset_urn}-{patch_mcp.aspectName}", mcp_raw=patch_mcp - ) - else: - dataset_properties = DatasetProperties( - name=table.name, - created=TimeStamp(time=int(table.created.timestamp() * 1000)) + + dataset_properties = DatasetProperties( + name=table.name, + created=( + TimeStamp(time=int(table.created.timestamp() * 1000)) if table.created - else None, - lastModified=TimeStamp(time=int(table.last_altered.timestamp() * 1000)) + else None + ), + lastModified=( + TimeStamp(time=int(table.last_altered.timestamp() * 1000)) if table.last_altered - else TimeStamp(time=int(table.created.timestamp() * 1000)) - if table.created - else None, - description=table.comment, - qualifiedName=str(datahub_dataset_name), - ) - + else ( + TimeStamp(time=int(table.created.timestamp() * 1000)) + if table.created + else None + ) + ), + description=table.comment, + qualifiedName=str(datahub_dataset_name), + customProperties=custom_properties, + ) + if self.config.patch_custom_properties: + yield from create_dataset_props_patch_builder( + dataset_urn, dataset_properties + ).build() + else: if custom_properties: dataset_properties.customProperties = custom_properties @@ -891,9 +889,9 @@ def cache_tables_and_views(self, connection, database): def get_all_tables( self, ) -> Dict[str, Dict[str, List[Union[RedshiftView, RedshiftTable]]]]: - all_tables: Dict[ - str, Dict[str, List[Union[RedshiftView, RedshiftTable]]] - ] = defaultdict(dict) + all_tables: Dict[str, Dict[str, List[Union[RedshiftView, RedshiftTable]]]] = ( + defaultdict(dict) + ) for db in set().union(self.db_tables, self.db_views): tables = self.db_tables.get(db, {}) views = self.db_views.get(db, {}) @@ -911,9 +909,9 @@ def extract_usage( all_tables: Dict[str, Dict[str, List[Union[RedshiftView, RedshiftTable]]]], ) -> Iterable[MetadataWorkUnit]: with PerfTimer() as timer: - redundant_usage_run_skip_handler: Optional[ - RedundantUsageRunSkipHandler - ] = None + redundant_usage_run_skip_handler: Optional[RedundantUsageRunSkipHandler] = ( + None + ) if self.config.enable_stateful_usage_ingestion: redundant_usage_run_skip_handler = RedundantUsageRunSkipHandler( source=self, diff --git a/metadata-ingestion/src/datahub/specific/dataset.py b/metadata-ingestion/src/datahub/specific/dataset.py index 0deb1dbfa0c4b..fcaefacd207ee 100644 --- a/metadata-ingestion/src/datahub/specific/dataset.py +++ b/metadata-ingestion/src/datahub/specific/dataset.py @@ -261,14 +261,17 @@ def for_field( def set_description( self, description: str, editable: bool = False ) -> "DatasetPatchBuilder": - self._add_patch( - DatasetProperties.ASPECT_NAME - if not editable - else EditableDatasetProperties.ASPECT_NAME, - "add", - path="/description", - value=description, - ) + if description is not None: + self._add_patch( + ( + DatasetProperties.ASPECT_NAME + if not editable + else EditableDatasetProperties.ASPECT_NAME + ), + "add", + path="/description", + value=description, + ) return self def set_custom_properties( @@ -289,8 +292,9 @@ def add_custom_property(self, key: str, value: str) -> "DatasetPatchBuilder": def add_custom_properties( self, custom_properties: Dict[str, str] ) -> "DatasetPatchBuilder": - for key, value in custom_properties.items(): - self.custom_properties_patch_helper.add_property(key, value) + if custom_properties is not None: + for key, value in custom_properties.items(): + self.custom_properties_patch_helper.add_property(key, value) return self def remove_custom_property(self, key: str) -> "DatasetPatchBuilder": From 8a55ea7abfa47649b99004970e8ca1c96446999c Mon Sep 17 00:00:00 2001 From: Ethan Cartwright Date: Sun, 3 Mar 2024 20:11:52 -0500 Subject: [PATCH 28/33] address linting --- .../datahub/ingestion/api/source_helpers.py | 4 ---- .../ingestion/source/redshift/redshift.py | 15 +++++++------- .../src/datahub/specific/dataset.py | 20 +++++++++++++------ 3 files changed, 22 insertions(+), 17 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py index 811fab01c8e69..fd197fa0c750d 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py @@ -19,10 +19,6 @@ from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.mcp_builder import entity_supports_aspect from datahub.ingestion.api.workunit import MetadataWorkUnit - -if TYPE_CHECKING: - from datahub.ingestion.source.sql.sql_generic import BaseTable -from datahub.metadata.com.linkedin.pegasus2avro.common import TimeStamp from datahub.metadata.schema_classes import ( BrowsePathEntryClass, BrowsePathsClass, diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py index 8a8132d946690..1b3da6595cb56 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py @@ -99,11 +99,7 @@ StringType, TimeType, ) -from datahub.metadata.schema_classes import ( - DatasetPropertiesClass, - GlobalTagsClass, - TagAssociationClass, -) +from datahub.metadata.schema_classes import GlobalTagsClass, TagAssociationClass from datahub.utilities import memory_footprint from datahub.utilities.dedup_list import deduplicate_list from datahub.utilities.mapping import Constants @@ -779,10 +775,15 @@ def gen_dataset_workunits( qualifiedName=str(datahub_dataset_name), customProperties=custom_properties, ) + logger.error(f"Dataset properties: {dataset_properties}") if self.config.patch_custom_properties: - yield from create_dataset_props_patch_builder( + patch_builder = create_dataset_props_patch_builder( dataset_urn, dataset_properties - ).build() + ) + for patch_mcp in patch_builder.build(): + yield MetadataWorkUnit( + id=f"{dataset_urn}-{patch_mcp.aspectName}", mcp_raw=patch_mcp + ) else: if custom_properties: dataset_properties.customProperties = custom_properties diff --git a/metadata-ingestion/src/datahub/specific/dataset.py b/metadata-ingestion/src/datahub/specific/dataset.py index fcaefacd207ee..05731c4c72267 100644 --- a/metadata-ingestion/src/datahub/specific/dataset.py +++ b/metadata-ingestion/src/datahub/specific/dataset.py @@ -259,7 +259,7 @@ def for_field( ) def set_description( - self, description: str, editable: bool = False + self, description: Optional[str] = None, editable: bool = False ) -> "DatasetPatchBuilder": if description is not None: self._add_patch( @@ -290,7 +290,7 @@ def add_custom_property(self, key: str, value: str) -> "DatasetPatchBuilder": return self def add_custom_properties( - self, custom_properties: Dict[str, str] + self, custom_properties: Optional[Dict[str, str]] = None ) -> "DatasetPatchBuilder": if custom_properties is not None: for key, value in custom_properties.items(): @@ -301,7 +301,9 @@ def remove_custom_property(self, key: str) -> "DatasetPatchBuilder": self.custom_properties_patch_helper.remove_property(key) return self - def set_display_name(self, display_name: str) -> "DatasetPatchBuilder": + def set_display_name( + self, display_name: Optional[str] = None + ) -> "DatasetPatchBuilder": if display_name is not None: self._add_patch( DatasetProperties.ASPECT_NAME, @@ -311,7 +313,9 @@ def set_display_name(self, display_name: str) -> "DatasetPatchBuilder": ) return self - def set_qualified_name(self, qualified_name: str) -> "DatasetPatchBuilder": + def set_qualified_name( + self, qualified_name: Optional[str] = None + ) -> "DatasetPatchBuilder": if qualified_name is not None: self._add_patch( DatasetProperties.ASPECT_NAME, @@ -321,7 +325,9 @@ def set_qualified_name(self, qualified_name: str) -> "DatasetPatchBuilder": ) return self - def set_created(self, timestamp: TimeStamp) -> "DatasetPatchBuilder": + def set_created( + self, timestamp: Optional[TimeStamp] = None + ) -> "DatasetPatchBuilder": if timestamp is not None: self._add_patch( DatasetProperties.ASPECT_NAME, @@ -331,7 +337,9 @@ def set_created(self, timestamp: TimeStamp) -> "DatasetPatchBuilder": ) return self - def set_last_modified(self, timestamp: TimeStamp) -> "DatasetPatchBuilder": + def set_last_modified( + self, timestamp: Optional[TimeStamp] = None + ) -> "DatasetPatchBuilder": if timestamp is not None: self._add_patch( DatasetProperties.ASPECT_NAME, From 738dd9afcc40a0481d0404be7e5f05808672caff Mon Sep 17 00:00:00 2001 From: Ethan Cartwright Date: Sun, 3 Mar 2024 20:14:12 -0500 Subject: [PATCH 29/33] remove log statement --- .../src/datahub/ingestion/source/redshift/redshift.py | 1 - 1 file changed, 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py index 1b3da6595cb56..1b34c6277a1a7 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py @@ -775,7 +775,6 @@ def gen_dataset_workunits( qualifiedName=str(datahub_dataset_name), customProperties=custom_properties, ) - logger.error(f"Dataset properties: {dataset_properties}") if self.config.patch_custom_properties: patch_builder = create_dataset_props_patch_builder( dataset_urn, dataset_properties From e942648eed8a712eaf4553403cf49e4433e100e9 Mon Sep 17 00:00:00 2001 From: Ethan Cartwright Date: Mon, 4 Mar 2024 08:15:12 -0500 Subject: [PATCH 30/33] black formatting --- .../ingestion/source/redshift/redshift.py | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py index 1b34c6277a1a7..16d631e600ce0 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py @@ -288,13 +288,13 @@ def test_connection(config_dict: dict) -> TestConnectionReport: test_report.capability_report = {} try: RedshiftDataDictionary.get_schemas(connection, database=config.database) - test_report.capability_report[SourceCapability.SCHEMA_METADATA] = ( - CapabilityReport(capable=True) - ) + test_report.capability_report[ + SourceCapability.SCHEMA_METADATA + ] = CapabilityReport(capable=True) except Exception as e: - test_report.capability_report[SourceCapability.SCHEMA_METADATA] = ( - CapabilityReport(capable=False, failure_reason=str(e)) - ) + test_report.capability_report[ + SourceCapability.SCHEMA_METADATA + ] = CapabilityReport(capable=False, failure_reason=str(e)) except Exception as e: test_report.basic_connectivity = CapabilityReport( @@ -889,9 +889,9 @@ def cache_tables_and_views(self, connection, database): def get_all_tables( self, ) -> Dict[str, Dict[str, List[Union[RedshiftView, RedshiftTable]]]]: - all_tables: Dict[str, Dict[str, List[Union[RedshiftView, RedshiftTable]]]] = ( - defaultdict(dict) - ) + all_tables: Dict[ + str, Dict[str, List[Union[RedshiftView, RedshiftTable]]] + ] = defaultdict(dict) for db in set().union(self.db_tables, self.db_views): tables = self.db_tables.get(db, {}) views = self.db_views.get(db, {}) @@ -909,9 +909,9 @@ def extract_usage( all_tables: Dict[str, Dict[str, List[Union[RedshiftView, RedshiftTable]]]], ) -> Iterable[MetadataWorkUnit]: with PerfTimer() as timer: - redundant_usage_run_skip_handler: Optional[RedundantUsageRunSkipHandler] = ( - None - ) + redundant_usage_run_skip_handler: Optional[ + RedundantUsageRunSkipHandler + ] = None if self.config.enable_stateful_usage_ingestion: redundant_usage_run_skip_handler = RedundantUsageRunSkipHandler( source=self, From 6a65d2ab540a8931e6f44ed7227182eae030f96f Mon Sep 17 00:00:00 2001 From: Aseem Bansal Date: Tue, 5 Mar 2024 16:57:24 +0530 Subject: [PATCH 31/33] enable patch by default --- .../src/datahub/ingestion/source/redshift/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py index c0e8d04f5d3d2..0f5305c609e45 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py @@ -143,7 +143,7 @@ class RedshiftConfig( ) patch_custom_properties: bool = Field( - default=False, + default=True, description="Whether to patch custom properties on existing datasets rather than replace.", ) From 9723412f5031caac235e2f438af546e7d72d186a Mon Sep 17 00:00:00 2001 From: Ethan Cartwright Date: Thu, 7 Mar 2024 09:28:14 -0500 Subject: [PATCH 32/33] address PR comments --- .../ingestion/source/redshift/redshift.py | 3 +-- .../tests/unit/test_redshift_source.py | 16 +++++----------- 2 files changed, 6 insertions(+), 13 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py index 16d631e600ce0..7a64a95013465 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py @@ -784,8 +784,7 @@ def gen_dataset_workunits( id=f"{dataset_urn}-{patch_mcp.aspectName}", mcp_raw=patch_mcp ) else: - if custom_properties: - dataset_properties.customProperties = custom_properties + dataset_properties.customProperties = custom_properties yield MetadataChangeProposalWrapper( entityUrn=dataset_urn, aspect=dataset_properties diff --git a/metadata-ingestion/tests/unit/test_redshift_source.py b/metadata-ingestion/tests/unit/test_redshift_source.py index aafd079cf26cd..d1da509995058 100644 --- a/metadata-ingestion/tests/unit/test_redshift_source.py +++ b/metadata-ingestion/tests/unit/test_redshift_source.py @@ -1,7 +1,5 @@ from typing import Iterable -import pytest - from datahub.emitter.mcp import ( MetadataChangeProposalClass, MetadataChangeProposalWrapper, @@ -13,9 +11,7 @@ from datahub.ingestion.source.redshift.redshift_schema import RedshiftTable -@pytest.fixture -def redshift_source_setup(request: pytest.FixtureRequest) -> Iterable[MetadataWorkUnit]: - custom_props_flag = request.param +def redshift_source_setup(custom_props_flag: bool) -> Iterable[MetadataWorkUnit]: config = RedshiftConfig( host_port="localhost:5439", database="test", @@ -37,9 +33,8 @@ def redshift_source_setup(request: pytest.FixtureRequest) -> Iterable[MetadataWo return gen -@pytest.mark.parametrize("redshift_source_setup", [True], indirect=True) -def test_gen_dataset_workunits_patch_custom_properties_patch(redshift_source_setup): - gen = redshift_source_setup +def test_gen_dataset_workunits_patch_custom_properties_patch(): + gen = redshift_source_setup(True) custom_props_exist = False for item in gen: mcp = item.metadata @@ -53,9 +48,8 @@ def test_gen_dataset_workunits_patch_custom_properties_patch(redshift_source_set assert custom_props_exist -@pytest.mark.parametrize("redshift_source_setup", [False], indirect=True) -def test_gen_dataset_workunits_patch_custom_properties_upsert(redshift_source_setup): - gen = redshift_source_setup +def test_gen_dataset_workunits_patch_custom_properties_upsert(): + gen = redshift_source_setup(False) custom_props_exist = False for item in gen: assert isinstance(item.metadata, MetadataChangeProposalWrapper) From 89cf4c6f022cf87d0f83a01e8c0617dd57e10f3f Mon Sep 17 00:00:00 2001 From: Ethan Cartwright Date: Thu, 7 Mar 2024 11:14:14 -0500 Subject: [PATCH 33/33] address linter --- .../src/datahub/ingestion/source/redshift/redshift.py | 2 -- metadata-ingestion/tests/unit/test_redshift_source.py | 6 ++++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py index 7a64a95013465..46dbfc2a219de 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py @@ -784,8 +784,6 @@ def gen_dataset_workunits( id=f"{dataset_urn}-{patch_mcp.aspectName}", mcp_raw=patch_mcp ) else: - dataset_properties.customProperties = custom_properties - yield MetadataChangeProposalWrapper( entityUrn=dataset_urn, aspect=dataset_properties ).as_workunit() diff --git a/metadata-ingestion/tests/unit/test_redshift_source.py b/metadata-ingestion/tests/unit/test_redshift_source.py index d1da509995058..8198caf50df7f 100644 --- a/metadata-ingestion/tests/unit/test_redshift_source.py +++ b/metadata-ingestion/tests/unit/test_redshift_source.py @@ -9,6 +9,7 @@ from datahub.ingestion.source.redshift.config import RedshiftConfig from datahub.ingestion.source.redshift.redshift import RedshiftSource from datahub.ingestion.source.redshift.redshift_schema import RedshiftTable +from datahub.metadata.schema_classes import MetadataChangeEventClass def redshift_source_setup(custom_props_flag: bool) -> Iterable[MetadataWorkUnit]: @@ -38,12 +39,13 @@ def test_gen_dataset_workunits_patch_custom_properties_patch(): custom_props_exist = False for item in gen: mcp = item.metadata + assert not isinstance(mcp, MetadataChangeEventClass) if mcp.aspectName == "datasetProperties": - assert isinstance(item.metadata, MetadataChangeProposalClass) + assert isinstance(mcp, MetadataChangeProposalClass) assert mcp.changeType == "PATCH" custom_props_exist = True else: - assert isinstance(item.metadata, MetadataChangeProposalWrapper) + assert isinstance(mcp, MetadataChangeProposalWrapper) assert custom_props_exist