diff --git a/datahub-frontend/conf/logback.xml b/datahub-frontend/conf/logback.xml
index 78da231b4a71c5..de37c56cba38a7 100644
--- a/datahub-frontend/conf/logback.xml
+++ b/datahub-frontend/conf/logback.xml
@@ -61,7 +61,7 @@
-
+
diff --git a/datahub-web-react/src/app/entity/shared/tabs/Properties/__tests__/useStructuredProperties.test.ts b/datahub-web-react/src/app/entity/shared/tabs/Properties/__tests__/useStructuredProperties.test.ts
new file mode 100644
index 00000000000000..ff7c6e51a04a00
--- /dev/null
+++ b/datahub-web-react/src/app/entity/shared/tabs/Properties/__tests__/useStructuredProperties.test.ts
@@ -0,0 +1,87 @@
+import { identifyAndAddParentRows } from '../useStructuredProperties';
+
+describe('identifyAndAddParentRows', () => {
+ it('should not return parent rows when there are none', () => {
+ const propertyRows = [
+ { displayName: 'test1', qualifiedName: 'test1' },
+ { displayName: 'test2', qualifiedName: 'test2' },
+ ];
+ expect(identifyAndAddParentRows(propertyRows)).toMatchObject([]);
+ });
+
+ it('should not return parent rows when another row starts with the same letters but is a different token', () => {
+ const propertyRows = [
+ { displayName: 'test1', qualifiedName: 'testing.one' },
+ { displayName: 'test2', qualifiedName: 'testingAgain.two' },
+ ];
+ expect(identifyAndAddParentRows(propertyRows)).toMatchObject([]);
+ });
+
+ it('should return parent rows properly', () => {
+ const propertyRows = [
+ { displayName: 'test1', qualifiedName: 'testing.one' },
+ { displayName: 'test2', qualifiedName: 'testing.two' },
+ { displayName: 'test3', qualifiedName: 'testing.three' },
+ ];
+ expect(identifyAndAddParentRows(propertyRows)).toMatchObject([
+ { displayName: 'testing', qualifiedName: 'testing', childrenCount: 3 },
+ ]);
+ });
+
+ it('should return parent rows properly with multiple layers of nesting', () => {
+ const propertyRows = [
+ { displayName: 'test1', qualifiedName: 'testing.one.two.a.1' },
+ { displayName: 'test1', qualifiedName: 'testing.one.two.a.2' },
+ { displayName: 'test1', qualifiedName: 'testing.one.two.b' },
+ { displayName: 'test1', qualifiedName: 'testing.one.three' },
+ { displayName: 'test2', qualifiedName: 'testing.two.c.d' },
+ { displayName: 'test3', qualifiedName: 'testing.three' },
+ { displayName: 'test3', qualifiedName: 'testParent' },
+ ];
+ expect(identifyAndAddParentRows(propertyRows)).toMatchObject([
+ { displayName: 'testing', qualifiedName: 'testing', isParentRow: true, childrenCount: 6 },
+ { displayName: 'testing.one', qualifiedName: 'testing.one', isParentRow: true, childrenCount: 4 },
+ { displayName: 'testing.one.two', qualifiedName: 'testing.one.two', isParentRow: true, childrenCount: 3 },
+ {
+ displayName: 'testing.one.two.a',
+ qualifiedName: 'testing.one.two.a',
+ isParentRow: true,
+ childrenCount: 2,
+ },
+ ]);
+ });
+
+ it('should return parent rows properly with multiple layers of nesting regardless of order', () => {
+ const propertyRows = [
+ { displayName: 'test1', qualifiedName: 'testing.one.two.a.1' },
+ { displayName: 'test3', qualifiedName: 'testParent' },
+ { displayName: 'test1', qualifiedName: 'testing.one.three' },
+ { displayName: 'test2', qualifiedName: 'testing.two.c.d' },
+ { displayName: 'test1', qualifiedName: 'testing.one.two.b' },
+ { displayName: 'test3', qualifiedName: 'testing.three' },
+ { displayName: 'test1', qualifiedName: 'testing.one.two.a.2' },
+ ];
+ expect(identifyAndAddParentRows(propertyRows)).toMatchObject([
+ { displayName: 'testing', qualifiedName: 'testing', isParentRow: true, childrenCount: 6 },
+ { displayName: 'testing.one', qualifiedName: 'testing.one', isParentRow: true, childrenCount: 4 },
+ { displayName: 'testing.one.two', qualifiedName: 'testing.one.two', isParentRow: true, childrenCount: 3 },
+ {
+ displayName: 'testing.one.two.a',
+ qualifiedName: 'testing.one.two.a',
+ isParentRow: true,
+ childrenCount: 2,
+ },
+ ]);
+ });
+
+ it('should return parent rows properly with simpler layers of nesting', () => {
+ const propertyRows = [
+ { displayName: 'test2', qualifiedName: 'testing.two.c.d' },
+ { displayName: 'test3', qualifiedName: 'testing.three' },
+ { displayName: 'test3', qualifiedName: 'testParent' },
+ ];
+ expect(identifyAndAddParentRows(propertyRows)).toMatchObject([
+ { displayName: 'testing', qualifiedName: 'testing', isParentRow: true, childrenCount: 2 },
+ ]);
+ });
+});
diff --git a/datahub-web-react/src/app/entity/shared/tabs/Properties/useStructuredProperties.tsx b/datahub-web-react/src/app/entity/shared/tabs/Properties/useStructuredProperties.tsx
index 18ee6bb18da3d3..60d0aac30eb4ce 100644
--- a/datahub-web-react/src/app/entity/shared/tabs/Properties/useStructuredProperties.tsx
+++ b/datahub-web-react/src/app/entity/shared/tabs/Properties/useStructuredProperties.tsx
@@ -122,10 +122,10 @@ export function identifyAndAddParentRows(rows?: Array<PropertyRow>): Array<PropertyRow> {
- const currentCount = qualifiedNames.filter((name) => name.startsWith(token)).length;
+ const currentCount = qualifiedNames.filter((name) => name.startsWith(`${token}.`)).length;
- // If we're at the beginning of the path and there is no nesting, break
- if (index === 0 && currentCount === 1) {
+ // If there's only one child, don't nest it
+ if (currentCount === 1) {
break;
}
diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py
index 31db711592eb14..6334b3abbb8a01 100644
--- a/metadata-ingestion/setup.py
+++ b/metadata-ingestion/setup.py
@@ -76,7 +76,7 @@
# now provide prebuilt wheels for most platforms, including M1 Macs and
# Linux aarch64 (e.g. Docker's linux/arm64). Installing confluent_kafka
# from source remains a pain.
- "confluent_kafka>=1.9.0",
+ "confluent_kafka[schemaregistry]>=1.9.0",
# We currently require both Avro libraries. The codegen uses avro-python3 (above)
# schema parsers at runtime for generating and reading JSON into Python objects.
# At the same time, we use Kafka's AvroSerializer, which internally relies on
diff --git a/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py b/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py
index fd3fe7ca098ecb..e37281dea86e1f 100644
--- a/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py
+++ b/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py
@@ -9,27 +9,18 @@
from datahub.configuration.common import ConfigModel
from datahub.emitter.mcp import MetadataChangeProposalWrapper
-from datahub.ingestion.api.global_context import get_graph_context, set_graph_context
-from datahub.ingestion.graph.client import DataHubGraph, get_default_graph
+from datahub.ingestion.graph.client import DataHubGraph
from datahub.metadata.schema_classes import (
PropertyValueClass,
StructuredPropertyDefinitionClass,
)
-from datahub.utilities.urns.urn import Urn
+from datahub.metadata.urns import StructuredPropertyUrn, Urn
+from datahub.utilities.urns._urn_base import URN_TYPES
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
-class StructuredPropertiesConfig:
- """Configuration class to hold the graph client"""
-
- @classmethod
- def get_graph_required(cls) -> DataHubGraph:
- """Get the current graph, falling back to default if none set"""
- return get_graph_context() or get_default_graph()
-
-
class AllowedTypes(Enum):
STRING = "string"
RICH_TEXT = "rich_text"
@@ -51,29 +42,28 @@ class AllowedValue(ConfigModel):
description: Optional[str] = None
-VALID_ENTITY_TYPES_PREFIX_STRING = ", ".join(
- [
- f"urn:li:entityType:datahub.{x}"
- for x in ["dataset", "dashboard", "dataFlow", "schemaField"]
- ]
-)
-VALID_ENTITY_TYPES_STRING = f"Valid entity type urns are {VALID_ENTITY_TYPES_PREFIX_STRING}, etc... Ensure that the entity type is valid."
+VALID_ENTITY_TYPE_URNS = [
+ Urn.make_entity_type_urn(entity_type) for entity_type in URN_TYPES.keys()
+]
+_VALID_ENTITY_TYPES_STRING = f"Valid entity type urns are {', '.join(VALID_ENTITY_TYPE_URNS)}, etc... Ensure that the entity type is valid."
+
+
+def _validate_entity_type_urn(v: str) -> str:
+ urn = Urn.make_entity_type_urn(v)
+ if urn not in VALID_ENTITY_TYPE_URNS:
+ raise ValueError(
+ f"Input {v} is not a valid entity type urn. {_VALID_ENTITY_TYPES_STRING}"
+ )
+ v = str(urn)
+ return v
class TypeQualifierAllowedTypes(ConfigModel):
allowed_types: List[str]
- @validator("allowed_types", each_item=True)
- def validate_allowed_types(cls, v):
- if v:
- graph = StructuredPropertiesConfig.get_graph_required()
- validated_urn = Urn.make_entity_type_urn(v)
- if not graph.exists(validated_urn):
- raise ValueError(
- f"Input {v} is not a valid entity type urn. {VALID_ENTITY_TYPES_STRING}"
- )
- v = str(validated_urn)
- return v
+ _check_allowed_types = validator("allowed_types", each_item=True, allow_reuse=True)(
+ _validate_entity_type_urn
+ )
class StructuredProperties(ConfigModel):
@@ -90,22 +80,30 @@ class StructuredProperties(ConfigModel):
type_qualifier: Optional[TypeQualifierAllowedTypes] = None
immutable: Optional[bool] = False
- @validator("entity_types", each_item=True)
- def validate_entity_types(cls, v):
- if v:
- graph = StructuredPropertiesConfig.get_graph_required()
- validated_urn = Urn.make_entity_type_urn(v)
- if not graph.exists(validated_urn):
- raise ValueError(
- f"Input {v} is not a valid entity type urn. {VALID_ENTITY_TYPES_STRING}"
- )
- v = str(validated_urn)
+ _check_entity_types = validator("entity_types", each_item=True, allow_reuse=True)(
+ _validate_entity_type_urn
+ )
+
+ @validator("type")
+ def validate_type(cls, v: str) -> str:
+ # Convert to lowercase if needed
+ if not v.islower():
+ logger.warning(
+ f"Structured property type should be lowercase. Updated to {v.lower()}"
+ )
+ v = v.lower()
+
+ # Check if type is allowed
+ if not AllowedTypes.check_allowed_type(v):
+ raise ValueError(
+ f"Type {v} is not allowed. Allowed types are {AllowedTypes.values()}"
+ )
return v
@property
def fqn(self) -> str:
assert self.urn is not None
- id = Urn.create_from_string(self.urn).get_entity_id()[0]
+ id = StructuredPropertyUrn.from_string(self.urn).id
if self.qualified_name is not None:
# ensure that qualified name and ID match
assert (
@@ -122,101 +120,90 @@ def urn_must_be_present(cls, v, values):
return v
@staticmethod
- def create(file: str, graph: Optional[DataHubGraph] = None) -> None:
- with set_graph_context(graph):
- graph = StructuredPropertiesConfig.get_graph_required()
-
- with open(file) as fp:
- structuredproperties: List[dict] = yaml.safe_load(fp)
- for structuredproperty_raw in structuredproperties:
- structuredproperty = StructuredProperties.parse_obj(
- structuredproperty_raw
- )
-
- if not structuredproperty.type.islower():
- structuredproperty.type = structuredproperty.type.lower()
- logger.warning(
- f"Structured property type should be lowercase. Updated to {structuredproperty.type}"
- )
- if not AllowedTypes.check_allowed_type(structuredproperty.type):
- raise ValueError(
- f"Type {structuredproperty.type} is not allowed. Allowed types are {AllowedTypes.values()}"
- )
- mcp = MetadataChangeProposalWrapper(
- entityUrn=structuredproperty.urn,
- aspect=StructuredPropertyDefinitionClass(
- qualifiedName=structuredproperty.fqn,
- valueType=Urn.make_data_type_urn(structuredproperty.type),
- displayName=structuredproperty.display_name,
- description=structuredproperty.description,
- entityTypes=[
- Urn.make_entity_type_urn(entity_type)
- for entity_type in structuredproperty.entity_types or []
- ],
- cardinality=structuredproperty.cardinality,
- immutable=structuredproperty.immutable,
- allowedValues=(
- [
- PropertyValueClass(
- value=v.value, description=v.description
- )
- for v in structuredproperty.allowed_values
- ]
- if structuredproperty.allowed_values
- else None
- ),
- typeQualifier=(
- {
- "allowedTypes": structuredproperty.type_qualifier.allowed_types
- }
- if structuredproperty.type_qualifier
- else None
- ),
- ),
- )
- graph.emit_mcp(mcp)
-
- logger.info(f"Created structured property {structuredproperty.urn}")
-
- @classmethod
- def from_datahub(cls, graph: DataHubGraph, urn: str) -> "StructuredProperties":
- with set_graph_context(graph):
- structured_property: Optional[
- StructuredPropertyDefinitionClass
- ] = graph.get_aspect(urn, StructuredPropertyDefinitionClass)
- if structured_property is None:
- raise Exception(
- "StructuredPropertyDefinition aspect is None. Unable to create structured property."
- )
- return StructuredProperties(
- urn=urn,
- qualified_name=structured_property.qualifiedName,
- display_name=structured_property.displayName,
- type=structured_property.valueType,
- description=structured_property.description,
- entity_types=structured_property.entityTypes,
- cardinality=structured_property.cardinality,
- allowed_values=(
+ def from_yaml(file: str) -> List["StructuredProperties"]:
+ with open(file) as fp:
+ structuredproperties: List[dict] = yaml.safe_load(fp)
+
+ result: List[StructuredProperties] = []
+ for structuredproperty_raw in structuredproperties:
+ result.append(StructuredProperties.parse_obj(structuredproperty_raw))
+ return result
+
+ def generate_mcps(self) -> List[MetadataChangeProposalWrapper]:
+ mcp = MetadataChangeProposalWrapper(
+ entityUrn=self.urn,
+ aspect=StructuredPropertyDefinitionClass(
+ qualifiedName=self.fqn,
+ valueType=Urn.make_data_type_urn(self.type),
+ displayName=self.display_name,
+ description=self.description,
+ entityTypes=[
+ Urn.make_entity_type_urn(entity_type)
+ for entity_type in self.entity_types or []
+ ],
+ cardinality=self.cardinality,
+ immutable=self.immutable,
+ allowedValues=(
[
- AllowedValue(
- value=av.value,
- description=av.description,
- )
- for av in structured_property.allowedValues or []
+ PropertyValueClass(value=v.value, description=v.description)
+ for v in self.allowed_values
]
- if structured_property.allowedValues is not None
+ if self.allowed_values
else None
),
- type_qualifier=(
- {
- "allowed_types": structured_property.typeQualifier.get(
- "allowedTypes"
- )
- }
- if structured_property.typeQualifier
+ typeQualifier=(
+ {"allowedTypes": self.type_qualifier.allowed_types}
+ if self.type_qualifier
else None
),
+ ),
+ )
+ return [mcp]
+
+ @staticmethod
+ def create(file: str, graph: DataHubGraph) -> None:
+ # TODO: Deprecate this method.
+ structuredproperties = StructuredProperties.from_yaml(file)
+ for structuredproperty in structuredproperties:
+ for mcp in structuredproperty.generate_mcps():
+ graph.emit_mcp(mcp)
+
+ logger.info(f"Created structured property {structuredproperty.urn}")
+
+ @classmethod
+ def from_datahub(cls, graph: DataHubGraph, urn: str) -> "StructuredProperties":
+ structured_property: Optional[
+ StructuredPropertyDefinitionClass
+ ] = graph.get_aspect(urn, StructuredPropertyDefinitionClass)
+ if structured_property is None:
+ raise Exception(
+ "StructuredPropertyDefinition aspect is None. Unable to create structured property."
)
+ return StructuredProperties(
+ urn=urn,
+ qualified_name=structured_property.qualifiedName,
+ display_name=structured_property.displayName,
+ type=structured_property.valueType,
+ description=structured_property.description,
+ entity_types=structured_property.entityTypes,
+ cardinality=structured_property.cardinality,
+ allowed_values=(
+ [
+ AllowedValue(
+ value=av.value,
+ description=av.description,
+ )
+ for av in structured_property.allowedValues or []
+ ]
+ if structured_property.allowedValues is not None
+ else None
+ ),
+ type_qualifier=(
+ {"allowed_types": structured_property.typeQualifier.get("allowedTypes")}
+ if structured_property.typeQualifier
+ else None
+ ),
+ )
def to_yaml(
self,
diff --git a/metadata-ingestion/src/datahub/cli/specific/structuredproperties_cli.py b/metadata-ingestion/src/datahub/cli/specific/structuredproperties_cli.py
index 4162d44b9b0ea8..42285cf13a5ddc 100644
--- a/metadata-ingestion/src/datahub/cli/specific/structuredproperties_cli.py
+++ b/metadata-ingestion/src/datahub/cli/specific/structuredproperties_cli.py
@@ -31,7 +31,8 @@ def properties() -> None:
def upsert(file: Path) -> None:
"""Upsert structured properties in DataHub."""
- StructuredProperties.create(str(file))
+ with get_default_graph() as graph:
+ StructuredProperties.create(str(file), graph)
@properties.command(
diff --git a/metadata-ingestion/src/datahub/configuration/git.py b/metadata-ingestion/src/datahub/configuration/git.py
index d237cd9ddd306c..e7e9bfd43adca5 100644
--- a/metadata-ingestion/src/datahub/configuration/git.py
+++ b/metadata-ingestion/src/datahub/configuration/git.py
@@ -24,7 +24,11 @@ class GitReference(ConfigModel):
"main",
description="Branch on which your files live by default. Typically main or master. This can also be a commit hash.",
)
-
+ url_subdir: Optional[str] = Field(
+ default=None,
+ description="Prefix to prepend when generating URLs for files - useful when files are in a subdirectory. "
+ "Only affects URL generation, not git operations.",
+ )
url_template: Optional[str] = Field(
None,
description=f"Template for generating a URL to a file in the repo e.g. '{_GITHUB_URL_TEMPLATE}'. We can infer this for GitHub and GitLab repos, and it is otherwise required."
@@ -68,6 +72,8 @@ def infer_url_template(cls, url_template: Optional[str], values: dict) -> str:
def get_url_for_file_path(self, file_path: str) -> str:
assert self.url_template
+ if self.url_subdir:
+ file_path = f"{self.url_subdir}/{file_path}"
return self.url_template.format(
repo_url=self.repo, branch=self.branch, file_path=file_path
)
diff --git a/metadata-ingestion/src/datahub/ingestion/api/source.py b/metadata-ingestion/src/datahub/ingestion/api/source.py
index c80da04e481a9f..c3638635b19aac 100644
--- a/metadata-ingestion/src/datahub/ingestion/api/source.py
+++ b/metadata-ingestion/src/datahub/ingestion/api/source.py
@@ -184,6 +184,7 @@ def infos(self) -> LossyList[StructuredLogEntry]:
@dataclass
class SourceReport(Report):
+ event_not_produced_warn: bool = True
events_produced: int = 0
events_produced_per_sec: int = 0
diff --git a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py
index 0c86e1cf47203f..7791ea2797be34 100644
--- a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py
+++ b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py
@@ -150,7 +150,7 @@ def auto_workunit_reporter(report: "SourceReport", stream: Iterable[T]) -> Itera
report.report_workunit(wu)
yield wu
- if report.events_produced == 0:
+ if report.event_not_produced_warn and report.events_produced == 0:
report.warning(
title="No metadata was produced by the source",
message="Please check the source configuration, filters, and permissions.",
diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py b/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py
index 814f65ecb45cf0..4eecbb4d9d7177 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py
@@ -65,18 +65,18 @@ class DataHubGcSourceConfig(ConfigModel):
description="Sleep between truncation monitoring.",
)
- dataprocess_cleanup: Optional[DataProcessCleanupConfig] = Field(
- default=None,
+ dataprocess_cleanup: DataProcessCleanupConfig = Field(
+ default_factory=DataProcessCleanupConfig,
description="Configuration for data process cleanup",
)
- soft_deleted_entities_cleanup: Optional[SoftDeletedEntitiesCleanupConfig] = Field(
- default=None,
+ soft_deleted_entities_cleanup: SoftDeletedEntitiesCleanupConfig = Field(
+ default_factory=SoftDeletedEntitiesCleanupConfig,
description="Configuration for soft deleted entities cleanup",
)
- execution_request_cleanup: Optional[DatahubExecutionRequestCleanupConfig] = Field(
- default=None,
+ execution_request_cleanup: DatahubExecutionRequestCleanupConfig = Field(
+ default_factory=DatahubExecutionRequestCleanupConfig,
description="Configuration for execution request cleanup",
)
@@ -108,28 +108,22 @@ def __init__(self, ctx: PipelineContext, config: DataHubGcSourceConfig):
self.ctx = ctx
self.config = config
self.report = DataHubGcSourceReport()
+ self.report.event_not_produced_warn = False
self.graph = ctx.require_graph("The DataHubGc source")
- self.dataprocess_cleanup: Optional[DataProcessCleanup] = None
- self.soft_deleted_entities_cleanup: Optional[SoftDeletedEntitiesCleanup] = None
- self.execution_request_cleanup: Optional[DatahubExecutionRequestCleanup] = None
-
- if self.config.dataprocess_cleanup:
- self.dataprocess_cleanup = DataProcessCleanup(
- ctx, self.config.dataprocess_cleanup, self.report, self.config.dry_run
- )
- if self.config.soft_deleted_entities_cleanup:
- self.soft_deleted_entities_cleanup = SoftDeletedEntitiesCleanup(
- ctx,
- self.config.soft_deleted_entities_cleanup,
- self.report,
- self.config.dry_run,
- )
- if self.config.execution_request_cleanup:
- self.execution_request_cleanup = DatahubExecutionRequestCleanup(
- config=self.config.execution_request_cleanup,
- graph=self.graph,
- report=self.report,
- )
+ self.dataprocess_cleanup = DataProcessCleanup(
+ ctx, self.config.dataprocess_cleanup, self.report, self.config.dry_run
+ )
+ self.soft_deleted_entities_cleanup = SoftDeletedEntitiesCleanup(
+ ctx,
+ self.config.soft_deleted_entities_cleanup,
+ self.report,
+ self.config.dry_run,
+ )
+ self.execution_request_cleanup = DatahubExecutionRequestCleanup(
+ config=self.config.execution_request_cleanup,
+ graph=self.graph,
+ report=self.report,
+ )
@classmethod
def create(cls, config_dict, ctx):
@@ -153,19 +147,19 @@ def get_workunits_internal(
self.truncate_indices()
except Exception as e:
self.report.failure("While trying to truncate indices ", exc=e)
- if self.soft_deleted_entities_cleanup:
+ if self.config.soft_deleted_entities_cleanup.enabled:
try:
self.soft_deleted_entities_cleanup.cleanup_soft_deleted_entities()
except Exception as e:
self.report.failure(
"While trying to cleanup soft deleted entities ", exc=e
)
- if self.execution_request_cleanup:
+ if self.config.execution_request_cleanup.enabled:
try:
self.execution_request_cleanup.run()
except Exception as e:
self.report.failure("While trying to cleanup execution request ", exc=e)
- if self.dataprocess_cleanup:
+ if self.config.dataprocess_cleanup.enabled:
try:
yield from self.dataprocess_cleanup.get_workunits_internal()
except Exception as e:
diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py b/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py
index 8aacf13cdb00fb..6d16aaab2d7980 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py
@@ -98,6 +98,9 @@
class DataProcessCleanupConfig(ConfigModel):
+ enabled: bool = Field(
+ default=True, description="Whether to do data process cleanup."
+ )
retention_days: Optional[int] = Field(
10,
description="Number of days to retain metadata in DataHub",
@@ -371,17 +374,26 @@ def get_data_flows(self) -> Iterable[DataFlowEntity]:
previous_scroll_id: Optional[str] = None
while True:
- result = self.ctx.graph.execute_graphql(
- DATAFLOW_QUERY,
- {
- "query": "*",
- "scrollId": scroll_id if scroll_id else None,
- "batchSize": self.config.batch_size,
- },
- )
+ result = None
+ try:
+ result = self.ctx.graph.execute_graphql(
+ DATAFLOW_QUERY,
+ {
+ "query": "*",
+ "scrollId": scroll_id if scroll_id else None,
+ "batchSize": self.config.batch_size,
+ },
+ )
+ except Exception as e:
+ self.report.failure(
+ f"While trying to get dataflows with {scroll_id}", exc=e
+ )
+ break
+
scrollAcrossEntities = result.get("scrollAcrossEntities")
if not scrollAcrossEntities:
raise ValueError("Missing scrollAcrossEntities in response")
+ logger.info(f"Got {scrollAcrossEntities.get('count')} DataFlow entities")
scroll_id = scrollAcrossEntities.get("nextScrollId")
for flow in scrollAcrossEntities.get("searchResults"):
@@ -398,6 +410,8 @@ def get_data_flows(self) -> Iterable[DataFlowEntity]:
previous_scroll_id = scroll_id
def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
+ if not self.config.enabled:
+ return []
assert self.ctx.graph
dataFlows: Dict[str, DataFlowEntity] = {}
@@ -411,14 +425,20 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
deleted_jobs: int = 0
while True:
- result = self.ctx.graph.execute_graphql(
- DATAJOB_QUERY,
- {
- "query": "*",
- "scrollId": scroll_id if scroll_id else None,
- "batchSize": self.config.batch_size,
- },
- )
+ try:
+ result = self.ctx.graph.execute_graphql(
+ DATAJOB_QUERY,
+ {
+ "query": "*",
+ "scrollId": scroll_id if scroll_id else None,
+ "batchSize": self.config.batch_size,
+ },
+ )
+ except Exception as e:
+ self.report.failure(
+ f"While trying to get data jobs with {scroll_id}", exc=e
+ )
+ break
scrollAcrossEntities = result.get("scrollAcrossEntities")
if not scrollAcrossEntities:
raise ValueError("Missing scrollAcrossEntities in response")
diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py b/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py
index bb4ab753543b7b..93f004ab675edc 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py
@@ -20,6 +20,9 @@
class SoftDeletedEntitiesCleanupConfig(ConfigModel):
+ enabled: bool = Field(
+ default=True, description="Whether to do soft deletion cleanup."
+ )
retention_days: Optional[int] = Field(
10,
description="Number of days to retain metadata in DataHub",
@@ -156,6 +159,8 @@ def delete_soft_deleted_entity(self, urn: str) -> None:
self.delete_entity(urn)
def cleanup_soft_deleted_entities(self) -> None:
+ if not self.config.enabled:
+ return
assert self.ctx.graph
start_time = time.time()
diff --git a/metadata-ingestion/src/datahub/ingestion/source/pulsar.py b/metadata-ingestion/src/datahub/ingestion/source/pulsar.py
index 15ee995b2d5fdc..f71949b9eb27f7 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/pulsar.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/pulsar.py
@@ -89,7 +89,16 @@ def __init__(self, schema):
logger.error(f"Invalid JSON schema: {schema_data}. Error: {str(e)}")
avro_schema = {}
- self.schema_name = avro_schema.get("namespace") + "." + avro_schema.get("name")
+ self.schema_name = "null"
+ if avro_schema.get("namespace") and avro_schema.get("name"):
+ self.schema_name = (
+ avro_schema.get("namespace") + "." + avro_schema.get("name")
+ )
+ elif avro_schema.get("namespace"):
+ self.schema_name = avro_schema.get("namespace")
+ elif avro_schema.get("name"):
+ self.schema_name = avro_schema.get("name")
+
self.schema_description = avro_schema.get("doc")
self.schema_type = schema.get("type")
self.schema_str = schema.get("data")
diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py
index 93d84d8b246e51..c769c6705ac3f6 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py
@@ -414,9 +414,13 @@ def _process_upstream_lineage_row(
except Exception as e:
self.report.num_upstream_lineage_edge_parsing_failed += 1
upstream_tables = db_row.get("UPSTREAM_TABLES")
+ downstream_table = db_row.get("DOWNSTREAM_TABLE_NAME")
self.structured_reporter.warning(
"Failed to parse lineage edge",
- context=f"Upstreams: {upstream_tables} Downstreams: {db_row.get('DOWNSTREAM_TABLE_NAME')}",
+ # Tricky: sometimes the full row data is too large, and so the context
+ # message gets truncated. By pulling out the upstream list and the downstream
+ # table name, we can at least keep the important fields if truncation does occur.
+ context=f"Upstreams: {upstream_tables} Downstream: {downstream_table} Full row: {db_row}",
exc=e,
)
return None
diff --git a/metadata-ingestion/tests/integration/git/test_git_clone.py b/metadata-ingestion/tests/integration/git/test_git_clone.py
index 60cf20fefcbdd1..01e075930998a4 100644
--- a/metadata-ingestion/tests/integration/git/test_git_clone.py
+++ b/metadata-ingestion/tests/integration/git/test_git_clone.py
@@ -1,4 +1,5 @@
import os
+import pathlib
import pytest
from pydantic import SecretStr
@@ -12,7 +13,7 @@
LOOKML_TEST_SSH_KEY = os.environ.get("DATAHUB_LOOKML_GIT_TEST_SSH_KEY")
-def test_base_url_guessing():
+def test_base_url_guessing() -> None:
# Basic GitHub repo.
config = GitInfo(repo="https://github.com/datahub-project/datahub", branch="master")
assert config.repo_ssh_locator == "git@github.com:datahub-project/datahub.git"
@@ -70,7 +71,7 @@ def test_base_url_guessing():
)
-def test_github_branch():
+def test_github_branch() -> None:
config = GitInfo(
repo="owner/repo",
)
@@ -83,11 +84,37 @@ def test_github_branch():
assert config.branch_for_clone == "main"
+def test_url_subdir() -> None:
+ git_ref = GitReference(repo="https://github.com/org/repo", url_subdir="dbt")
+ assert (
+ git_ref.get_url_for_file_path("model.sql")
+ == "https://github.com/org/repo/blob/main/dbt/model.sql"
+ )
+
+ git_ref = GitReference(repo="https://gitlab.com/org/repo", url_subdir="dbt")
+ assert (
+ git_ref.get_url_for_file_path("model.sql")
+ == "https://gitlab.com/org/repo/-/blob/main/dbt/model.sql"
+ )
+
+ git_ref = GitReference(repo="https://github.com/org/repo", url_subdir="")
+ assert (
+ git_ref.get_url_for_file_path("model.sql")
+ == "https://github.com/org/repo/blob/main/model.sql"
+ )
+
+ git_ref = GitReference(repo="https://github.com/org/repo", url_subdir="dbt/models")
+ assert (
+ git_ref.get_url_for_file_path("model.sql")
+ == "https://github.com/org/repo/blob/main/dbt/models/model.sql"
+ )
+
+
def test_sanitize_repo_url() -> None:
assert_doctest(datahub.ingestion.source.git.git_import)
-def test_git_clone_public(tmp_path):
+def test_git_clone_public(tmp_path: pathlib.Path) -> None:
git_clone = GitClone(str(tmp_path))
checkout_dir = git_clone.clone(
ssh_key=None,
@@ -107,7 +134,7 @@ def test_git_clone_public(tmp_path):
LOOKML_TEST_SSH_KEY is None,
reason="DATAHUB_LOOKML_GIT_TEST_SSH_KEY env variable is not configured",
)
-def test_git_clone_private(tmp_path):
+def test_git_clone_private(tmp_path: pathlib.Path) -> None:
git_clone = GitClone(str(tmp_path))
secret_key = SecretStr(LOOKML_TEST_SSH_KEY) if LOOKML_TEST_SSH_KEY else None
diff --git a/metadata-ingestion/tests/integration/kafka/test_kafka.py b/metadata-ingestion/tests/integration/kafka/test_kafka.py
index 0d9a714625e96b..648c4b26b20a76 100644
--- a/metadata-ingestion/tests/integration/kafka/test_kafka.py
+++ b/metadata-ingestion/tests/integration/kafka/test_kafka.py
@@ -102,7 +102,7 @@ def test_kafka_test_connection(mock_kafka_service, config_dict, is_success):
test_connection_helpers.assert_capability_report(
capability_report=report.capability_report,
failure_capabilities={
- SourceCapability.SCHEMA_METADATA: "Failed to establish a new connection"
+ SourceCapability.SCHEMA_METADATA: "[Errno 111] Connection refused"
},
)
diff --git a/metadata-ingestion/tests/unit/api/entities/structuredproperties/__init__.py b/metadata-ingestion/tests/unit/api/entities/structuredproperties/__init__.py
new file mode 100644
index 00000000000000..e69de29bb2d1d6
diff --git a/metadata-ingestion/tests/unit/api/entities/structuredproperties/example_structured_properties_golden.json b/metadata-ingestion/tests/unit/api/entities/structuredproperties/example_structured_properties_golden.json
new file mode 100644
index 00000000000000..29386ece7b0ca1
--- /dev/null
+++ b/metadata-ingestion/tests/unit/api/entities/structuredproperties/example_structured_properties_golden.json
@@ -0,0 +1,194 @@
+[
+{
+ "entityType": "structuredProperty",
+ "entityUrn": "urn:li:structuredProperty:io.acryl.privacy.retentionTime",
+ "changeType": "UPSERT",
+ "aspectName": "propertyDefinition",
+ "aspect": {
+ "json": {
+ "qualifiedName": "io.acryl.privacy.retentionTime",
+ "displayName": "Retention Time",
+ "valueType": "urn:li:dataType:datahub.number",
+ "allowedValues": [
+ {
+ "value": {
+ "string": "30"
+ },
+ "description": "30 days, usually reserved for datasets that are ephemeral and contain pii"
+ },
+ {
+ "value": {
+ "string": "90"
+ },
+ "description": "Use this for datasets that drive monthly reporting but contain pii"
+ },
+ {
+ "value": {
+ "string": "365"
+ },
+ "description": "Use this for non-sensitive data that can be retained for longer"
+ }
+ ],
+ "cardinality": "MULTIPLE",
+ "entityTypes": [
+ "urn:li:entityType:datahub.dataset",
+ "urn:li:entityType:datahub.dataFlow"
+ ],
+ "description": "Retention Time is used to figure out how long to retain records in a dataset",
+ "immutable": false
+ }
+ }
+},
+{
+ "entityType": "structuredProperty",
+ "entityUrn": "urn:li:structuredProperty:io.acryl.dataManagement.replicationSLA",
+ "changeType": "UPSERT",
+ "aspectName": "propertyDefinition",
+ "aspect": {
+ "json": {
+ "qualifiedName": "io.acryl.dataManagement.replicationSLA",
+ "displayName": "Replication SLA",
+ "valueType": "urn:li:dataType:datahub.number",
+ "cardinality": "SINGLE",
+ "entityTypes": [
+ "urn:li:entityType:datahub.dataset"
+ ],
+ "description": "SLA for how long data can be delayed before replicating to the destination cluster",
+ "immutable": false
+ }
+ }
+},
+{
+ "entityType": "structuredProperty",
+ "entityUrn": "urn:li:structuredProperty:io.acryl.dataManagement.deprecationDate",
+ "changeType": "UPSERT",
+ "aspectName": "propertyDefinition",
+ "aspect": {
+ "json": {
+ "qualifiedName": "io.acryl.dataManagement.deprecationDate",
+ "displayName": "Deprecation Date",
+ "valueType": "urn:li:dataType:datahub.date",
+ "cardinality": "SINGLE",
+ "entityTypes": [
+ "urn:li:entityType:datahub.dataset",
+ "urn:li:entityType:datahub.dataFlow",
+ "urn:li:entityType:datahub.dataJob"
+ ],
+ "immutable": false
+ }
+ }
+},
+{
+ "entityType": "structuredProperty",
+ "entityUrn": "urn:li:structuredProperty:io.acryl.dataManagement.steward",
+ "changeType": "UPSERT",
+ "aspectName": "propertyDefinition",
+ "aspect": {
+ "json": {
+ "qualifiedName": "io.acryl.dataManagement.steward",
+ "displayName": "Steward",
+ "valueType": "urn:li:dataType:datahub.urn",
+ "typeQualifier": {
+ "allowedTypes": [
+ "urn:li:entityType:datahub.corpuser",
+ "urn:li:entityType:datahub.corpGroup"
+ ]
+ },
+ "cardinality": "SINGLE",
+ "entityTypes": [
+ "urn:li:entityType:datahub.dataset",
+ "urn:li:entityType:datahub.dataFlow",
+ "urn:li:entityType:datahub.dataJob"
+ ],
+ "immutable": false
+ }
+ }
+},
+{
+ "entityType": "structuredProperty",
+ "entityUrn": "urn:li:structuredProperty:io.acryl.dataManagement.certifier",
+ "changeType": "UPSERT",
+ "aspectName": "propertyDefinition",
+ "aspect": {
+ "json": {
+ "qualifiedName": "io.acryl.dataManagement.certifier",
+ "displayName": "Person Certifying the asset",
+ "valueType": "urn:li:dataType:datahub.urn",
+ "cardinality": "SINGLE",
+ "entityTypes": [
+ "urn:li:entityType:datahub.dataset",
+ "urn:li:entityType:datahub.schemaField"
+ ],
+ "immutable": false
+ }
+ }
+},
+{
+ "entityType": "structuredProperty",
+ "entityUrn": "urn:li:structuredProperty:io.acryl.dataManagement.team",
+ "changeType": "UPSERT",
+ "aspectName": "propertyDefinition",
+ "aspect": {
+ "json": {
+ "qualifiedName": "io.acryl.dataManagement.team",
+ "displayName": "Management team",
+ "valueType": "urn:li:dataType:datahub.string",
+ "cardinality": "SINGLE",
+ "entityTypes": [
+ "urn:li:entityType:datahub.dataset"
+ ],
+ "immutable": false
+ }
+ }
+},
+{
+ "entityType": "structuredProperty",
+ "entityUrn": "urn:li:structuredProperty:projectNames",
+ "changeType": "UPSERT",
+ "aspectName": "propertyDefinition",
+ "aspect": {
+ "json": {
+ "qualifiedName": "projectNames",
+ "displayName": "Project names",
+ "valueType": "urn:li:dataType:datahub.string",
+ "allowedValues": [
+ {
+ "value": {
+ "string": "Tracking"
+ },
+ "description": "test value 1 for project"
+ },
+ {
+ "value": {
+ "string": "DataHub"
+ },
+ "description": "test value 2 for project"
+ }
+ ],
+ "cardinality": "MULTIPLE",
+ "entityTypes": [
+ "urn:li:entityType:datahub.dataset"
+ ],
+ "immutable": false
+ }
+ }
+},
+{
+ "entityType": "structuredProperty",
+ "entityUrn": "urn:li:structuredProperty:namespace",
+ "changeType": "UPSERT",
+ "aspectName": "propertyDefinition",
+ "aspect": {
+ "json": {
+ "qualifiedName": "namespace",
+ "displayName": "Namespace",
+ "valueType": "urn:li:dataType:datahub.string",
+ "cardinality": "SINGLE",
+ "entityTypes": [
+ "urn:li:entityType:datahub.dataset"
+ ],
+ "immutable": false
+ }
+ }
+}
+]
\ No newline at end of file
diff --git a/metadata-ingestion/tests/unit/api/entities/structuredproperties/test_structuredproperties.py b/metadata-ingestion/tests/unit/api/entities/structuredproperties/test_structuredproperties.py
new file mode 100644
index 00000000000000..e96b7c1f98437e
--- /dev/null
+++ b/metadata-ingestion/tests/unit/api/entities/structuredproperties/test_structuredproperties.py
@@ -0,0 +1,38 @@
+import pathlib
+
+import pydantic
+import pytest
+
+from datahub.api.entities.structuredproperties.structuredproperties import (
+ StructuredProperties,
+ TypeQualifierAllowedTypes,
+)
+from tests.test_helpers.mce_helpers import check_goldens_stream
+
+RESOURCE_DIR = pathlib.Path(__file__).parent
+
+
+def test_type_validation() -> None:
+ with pytest.raises(pydantic.ValidationError):
+ TypeQualifierAllowedTypes(allowed_types=["thisdoesnotexist"])
+
+ types = TypeQualifierAllowedTypes(allowed_types=["dataset"])
+ assert types.allowed_types == ["urn:li:entityType:datahub.dataset"]
+
+
+def test_structuredproperties_load(pytestconfig: pytest.Config) -> None:
+ example_properties_file = (
+ pytestconfig.rootpath
+ / "examples/structured_properties/structured_properties.yaml"
+ )
+
+ properties = StructuredProperties.from_yaml(str(example_properties_file))
+ mcps = []
+ for property in properties:
+ mcps.extend(property.generate_mcps())
+
+ check_goldens_stream(
+ pytestconfig,
+ mcps,
+ golden_path=RESOURCE_DIR / "example_structured_properties_golden.json",
+ )
diff --git a/metadata-ingestion/tests/unit/serde/test_codegen.py b/metadata-ingestion/tests/unit/serde/test_codegen.py
index 37ac35586950e1..98d62d5643ff2d 100644
--- a/metadata-ingestion/tests/unit/serde/test_codegen.py
+++ b/metadata-ingestion/tests/unit/serde/test_codegen.py
@@ -18,6 +18,7 @@
UpstreamClass,
_Aspect,
)
+from datahub.utilities.urns._urn_base import URN_TYPES
_UPDATE_ENTITY_REGISTRY = os.getenv("UPDATE_ENTITY_REGISTRY", "false").lower() == "true"
ENTITY_REGISTRY_PATH = pathlib.Path(
@@ -165,3 +166,9 @@ def test_enum_options():
# This is mainly a sanity check to ensure that it doesn't do anything too crazy.
env_options = get_enum_options(FabricTypeClass)
assert "PROD" in env_options
+
+
+def test_urn_types() -> None:
+ assert len(URN_TYPES) > 10
+ for checked_type in ["dataset", "dashboard", "dataFlow", "schemaField"]:
+ assert checked_type in URN_TYPES