diff --git a/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py b/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py index fd3fe7ca098ec..e37281dea86e1 100644 --- a/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py +++ b/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py @@ -9,27 +9,18 @@ from datahub.configuration.common import ConfigModel from datahub.emitter.mcp import MetadataChangeProposalWrapper -from datahub.ingestion.api.global_context import get_graph_context, set_graph_context -from datahub.ingestion.graph.client import DataHubGraph, get_default_graph +from datahub.ingestion.graph.client import DataHubGraph from datahub.metadata.schema_classes import ( PropertyValueClass, StructuredPropertyDefinitionClass, ) -from datahub.utilities.urns.urn import Urn +from datahub.metadata.urns import StructuredPropertyUrn, Urn +from datahub.utilities.urns._urn_base import URN_TYPES logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) -class StructuredPropertiesConfig: - """Configuration class to hold the graph client""" - - @classmethod - def get_graph_required(cls) -> DataHubGraph: - """Get the current graph, falling back to default if none set""" - return get_graph_context() or get_default_graph() - - class AllowedTypes(Enum): STRING = "string" RICH_TEXT = "rich_text" @@ -51,29 +42,28 @@ class AllowedValue(ConfigModel): description: Optional[str] = None -VALID_ENTITY_TYPES_PREFIX_STRING = ", ".join( - [ - f"urn:li:entityType:datahub.{x}" - for x in ["dataset", "dashboard", "dataFlow", "schemaField"] - ] -) -VALID_ENTITY_TYPES_STRING = f"Valid entity type urns are {VALID_ENTITY_TYPES_PREFIX_STRING}, etc... Ensure that the entity type is valid." +VALID_ENTITY_TYPE_URNS = [ + Urn.make_entity_type_urn(entity_type) for entity_type in URN_TYPES.keys() +] +_VALID_ENTITY_TYPES_STRING = f"Valid entity type urns are {', '.join(VALID_ENTITY_TYPE_URNS)}, etc... Ensure that the entity type is valid." + + +def _validate_entity_type_urn(v: str) -> str: + urn = Urn.make_entity_type_urn(v) + if urn not in VALID_ENTITY_TYPE_URNS: + raise ValueError( + f"Input {v} is not a valid entity type urn. {_VALID_ENTITY_TYPES_STRING}" + ) + v = str(urn) + return v class TypeQualifierAllowedTypes(ConfigModel): allowed_types: List[str] - @validator("allowed_types", each_item=True) - def validate_allowed_types(cls, v): - if v: - graph = StructuredPropertiesConfig.get_graph_required() - validated_urn = Urn.make_entity_type_urn(v) - if not graph.exists(validated_urn): - raise ValueError( - f"Input {v} is not a valid entity type urn. {VALID_ENTITY_TYPES_STRING}" - ) - v = str(validated_urn) - return v + _check_allowed_types = validator("allowed_types", each_item=True, allow_reuse=True)( + _validate_entity_type_urn + ) class StructuredProperties(ConfigModel): @@ -90,22 +80,30 @@ class StructuredProperties(ConfigModel): type_qualifier: Optional[TypeQualifierAllowedTypes] = None immutable: Optional[bool] = False - @validator("entity_types", each_item=True) - def validate_entity_types(cls, v): - if v: - graph = StructuredPropertiesConfig.get_graph_required() - validated_urn = Urn.make_entity_type_urn(v) - if not graph.exists(validated_urn): - raise ValueError( - f"Input {v} is not a valid entity type urn. {VALID_ENTITY_TYPES_STRING}" - ) - v = str(validated_urn) + _check_entity_types = validator("entity_types", each_item=True, allow_reuse=True)( + _validate_entity_type_urn + ) + + @validator("type") + def validate_type(cls, v: str) -> str: + # Convert to lowercase if needed + if not v.islower(): + logger.warning( + f"Structured property type should be lowercase. Updated to {v.lower()}" + ) + v = v.lower() + + # Check if type is allowed + if not AllowedTypes.check_allowed_type(v): + raise ValueError( + f"Type {v} is not allowed. Allowed types are {AllowedTypes.values()}" + ) return v @property def fqn(self) -> str: assert self.urn is not None - id = Urn.create_from_string(self.urn).get_entity_id()[0] + id = StructuredPropertyUrn.from_string(self.urn).id if self.qualified_name is not None: # ensure that qualified name and ID match assert ( @@ -122,101 +120,90 @@ def urn_must_be_present(cls, v, values): return v @staticmethod - def create(file: str, graph: Optional[DataHubGraph] = None) -> None: - with set_graph_context(graph): - graph = StructuredPropertiesConfig.get_graph_required() - - with open(file) as fp: - structuredproperties: List[dict] = yaml.safe_load(fp) - for structuredproperty_raw in structuredproperties: - structuredproperty = StructuredProperties.parse_obj( - structuredproperty_raw - ) - - if not structuredproperty.type.islower(): - structuredproperty.type = structuredproperty.type.lower() - logger.warning( - f"Structured property type should be lowercase. Updated to {structuredproperty.type}" - ) - if not AllowedTypes.check_allowed_type(structuredproperty.type): - raise ValueError( - f"Type {structuredproperty.type} is not allowed. Allowed types are {AllowedTypes.values()}" - ) - mcp = MetadataChangeProposalWrapper( - entityUrn=structuredproperty.urn, - aspect=StructuredPropertyDefinitionClass( - qualifiedName=structuredproperty.fqn, - valueType=Urn.make_data_type_urn(structuredproperty.type), - displayName=structuredproperty.display_name, - description=structuredproperty.description, - entityTypes=[ - Urn.make_entity_type_urn(entity_type) - for entity_type in structuredproperty.entity_types or [] - ], - cardinality=structuredproperty.cardinality, - immutable=structuredproperty.immutable, - allowedValues=( - [ - PropertyValueClass( - value=v.value, description=v.description - ) - for v in structuredproperty.allowed_values - ] - if structuredproperty.allowed_values - else None - ), - typeQualifier=( - { - "allowedTypes": structuredproperty.type_qualifier.allowed_types - } - if structuredproperty.type_qualifier - else None - ), - ), - ) - graph.emit_mcp(mcp) - - logger.info(f"Created structured property {structuredproperty.urn}") - - @classmethod - def from_datahub(cls, graph: DataHubGraph, urn: str) -> "StructuredProperties": - with set_graph_context(graph): - structured_property: Optional[ - StructuredPropertyDefinitionClass - ] = graph.get_aspect(urn, StructuredPropertyDefinitionClass) - if structured_property is None: - raise Exception( - "StructuredPropertyDefinition aspect is None. Unable to create structured property." - ) - return StructuredProperties( - urn=urn, - qualified_name=structured_property.qualifiedName, - display_name=structured_property.displayName, - type=structured_property.valueType, - description=structured_property.description, - entity_types=structured_property.entityTypes, - cardinality=structured_property.cardinality, - allowed_values=( + def from_yaml(file: str) -> List["StructuredProperties"]: + with open(file) as fp: + structuredproperties: List[dict] = yaml.safe_load(fp) + + result: List[StructuredProperties] = [] + for structuredproperty_raw in structuredproperties: + result.append(StructuredProperties.parse_obj(structuredproperty_raw)) + return result + + def generate_mcps(self) -> List[MetadataChangeProposalWrapper]: + mcp = MetadataChangeProposalWrapper( + entityUrn=self.urn, + aspect=StructuredPropertyDefinitionClass( + qualifiedName=self.fqn, + valueType=Urn.make_data_type_urn(self.type), + displayName=self.display_name, + description=self.description, + entityTypes=[ + Urn.make_entity_type_urn(entity_type) + for entity_type in self.entity_types or [] + ], + cardinality=self.cardinality, + immutable=self.immutable, + allowedValues=( [ - AllowedValue( - value=av.value, - description=av.description, - ) - for av in structured_property.allowedValues or [] + PropertyValueClass(value=v.value, description=v.description) + for v in self.allowed_values ] - if structured_property.allowedValues is not None + if self.allowed_values else None ), - type_qualifier=( - { - "allowed_types": structured_property.typeQualifier.get( - "allowedTypes" - ) - } - if structured_property.typeQualifier + typeQualifier=( + {"allowedTypes": self.type_qualifier.allowed_types} + if self.type_qualifier else None ), + ), + ) + return [mcp] + + @staticmethod + def create(file: str, graph: DataHubGraph) -> None: + # TODO: Deprecate this method. + structuredproperties = StructuredProperties.from_yaml(file) + for structuredproperty in structuredproperties: + for mcp in structuredproperty.generate_mcps(): + graph.emit_mcp(mcp) + + logger.info(f"Created structured property {structuredproperty.urn}") + + @classmethod + def from_datahub(cls, graph: DataHubGraph, urn: str) -> "StructuredProperties": + structured_property: Optional[ + StructuredPropertyDefinitionClass + ] = graph.get_aspect(urn, StructuredPropertyDefinitionClass) + if structured_property is None: + raise Exception( + "StructuredPropertyDefinition aspect is None. Unable to create structured property." ) + return StructuredProperties( + urn=urn, + qualified_name=structured_property.qualifiedName, + display_name=structured_property.displayName, + type=structured_property.valueType, + description=structured_property.description, + entity_types=structured_property.entityTypes, + cardinality=structured_property.cardinality, + allowed_values=( + [ + AllowedValue( + value=av.value, + description=av.description, + ) + for av in structured_property.allowedValues or [] + ] + if structured_property.allowedValues is not None + else None + ), + type_qualifier=( + {"allowed_types": structured_property.typeQualifier.get("allowedTypes")} + if structured_property.typeQualifier + else None + ), + ) def to_yaml( self, diff --git a/metadata-ingestion/src/datahub/cli/specific/structuredproperties_cli.py b/metadata-ingestion/src/datahub/cli/specific/structuredproperties_cli.py index 4162d44b9b0ea..42285cf13a5dd 100644 --- a/metadata-ingestion/src/datahub/cli/specific/structuredproperties_cli.py +++ b/metadata-ingestion/src/datahub/cli/specific/structuredproperties_cli.py @@ -31,7 +31,8 @@ def properties() -> None: def upsert(file: Path) -> None: """Upsert structured properties in DataHub.""" - StructuredProperties.create(str(file)) + with get_default_graph() as graph: + StructuredProperties.create(str(file), graph) @properties.command( diff --git a/metadata-ingestion/tests/unit/api/entities/structuredproperties/__init__.py b/metadata-ingestion/tests/unit/api/entities/structuredproperties/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/metadata-ingestion/tests/unit/api/entities/structuredproperties/example_structured_properties_golden.json b/metadata-ingestion/tests/unit/api/entities/structuredproperties/example_structured_properties_golden.json new file mode 100644 index 0000000000000..29386ece7b0ca --- /dev/null +++ b/metadata-ingestion/tests/unit/api/entities/structuredproperties/example_structured_properties_golden.json @@ -0,0 +1,194 @@ +[ +{ + "entityType": "structuredProperty", + "entityUrn": "urn:li:structuredProperty:io.acryl.privacy.retentionTime", + "changeType": "UPSERT", + "aspectName": "propertyDefinition", + "aspect": { + "json": { + "qualifiedName": "io.acryl.privacy.retentionTime", + "displayName": "Retention Time", + "valueType": "urn:li:dataType:datahub.number", + "allowedValues": [ + { + "value": { + "string": "30" + }, + "description": "30 days, usually reserved for datasets that are ephemeral and contain pii" + }, + { + "value": { + "string": "90" + }, + "description": "Use this for datasets that drive monthly reporting but contain pii" + }, + { + "value": { + "string": "365" + }, + "description": "Use this for non-sensitive data that can be retained for longer" + } + ], + "cardinality": "MULTIPLE", + "entityTypes": [ + "urn:li:entityType:datahub.dataset", + "urn:li:entityType:datahub.dataFlow" + ], + "description": "Retention Time is used to figure out how long to retain records in a dataset", + "immutable": false + } + } +}, +{ + "entityType": "structuredProperty", + "entityUrn": "urn:li:structuredProperty:io.acryl.dataManagement.replicationSLA", + "changeType": "UPSERT", + "aspectName": "propertyDefinition", + "aspect": { + "json": { + "qualifiedName": "io.acryl.dataManagement.replicationSLA", + "displayName": "Replication SLA", + "valueType": "urn:li:dataType:datahub.number", + "cardinality": "SINGLE", + "entityTypes": [ + "urn:li:entityType:datahub.dataset" + ], + "description": "SLA for how long data can be delayed before replicating to the destination cluster", + "immutable": false + } + } +}, +{ + "entityType": "structuredProperty", + "entityUrn": "urn:li:structuredProperty:io.acryl.dataManagement.deprecationDate", + "changeType": "UPSERT", + "aspectName": "propertyDefinition", + "aspect": { + "json": { + "qualifiedName": "io.acryl.dataManagement.deprecationDate", + "displayName": "Deprecation Date", + "valueType": "urn:li:dataType:datahub.date", + "cardinality": "SINGLE", + "entityTypes": [ + "urn:li:entityType:datahub.dataset", + "urn:li:entityType:datahub.dataFlow", + "urn:li:entityType:datahub.dataJob" + ], + "immutable": false + } + } +}, +{ + "entityType": "structuredProperty", + "entityUrn": "urn:li:structuredProperty:io.acryl.dataManagement.steward", + "changeType": "UPSERT", + "aspectName": "propertyDefinition", + "aspect": { + "json": { + "qualifiedName": "io.acryl.dataManagement.steward", + "displayName": "Steward", + "valueType": "urn:li:dataType:datahub.urn", + "typeQualifier": { + "allowedTypes": [ + "urn:li:entityType:datahub.corpuser", + "urn:li:entityType:datahub.corpGroup" + ] + }, + "cardinality": "SINGLE", + "entityTypes": [ + "urn:li:entityType:datahub.dataset", + "urn:li:entityType:datahub.dataFlow", + "urn:li:entityType:datahub.dataJob" + ], + "immutable": false + } + } +}, +{ + "entityType": "structuredProperty", + "entityUrn": "urn:li:structuredProperty:io.acryl.dataManagement.certifier", + "changeType": "UPSERT", + "aspectName": "propertyDefinition", + "aspect": { + "json": { + "qualifiedName": "io.acryl.dataManagement.certifier", + "displayName": "Person Certifying the asset", + "valueType": "urn:li:dataType:datahub.urn", + "cardinality": "SINGLE", + "entityTypes": [ + "urn:li:entityType:datahub.dataset", + "urn:li:entityType:datahub.schemaField" + ], + "immutable": false + } + } +}, +{ + "entityType": "structuredProperty", + "entityUrn": "urn:li:structuredProperty:io.acryl.dataManagement.team", + "changeType": "UPSERT", + "aspectName": "propertyDefinition", + "aspect": { + "json": { + "qualifiedName": "io.acryl.dataManagement.team", + "displayName": "Management team", + "valueType": "urn:li:dataType:datahub.string", + "cardinality": "SINGLE", + "entityTypes": [ + "urn:li:entityType:datahub.dataset" + ], + "immutable": false + } + } +}, +{ + "entityType": "structuredProperty", + "entityUrn": "urn:li:structuredProperty:projectNames", + "changeType": "UPSERT", + "aspectName": "propertyDefinition", + "aspect": { + "json": { + "qualifiedName": "projectNames", + "displayName": "Project names", + "valueType": "urn:li:dataType:datahub.string", + "allowedValues": [ + { + "value": { + "string": "Tracking" + }, + "description": "test value 1 for project" + }, + { + "value": { + "string": "DataHub" + }, + "description": "test value 2 for project" + } + ], + "cardinality": "MULTIPLE", + "entityTypes": [ + "urn:li:entityType:datahub.dataset" + ], + "immutable": false + } + } +}, +{ + "entityType": "structuredProperty", + "entityUrn": "urn:li:structuredProperty:namespace", + "changeType": "UPSERT", + "aspectName": "propertyDefinition", + "aspect": { + "json": { + "qualifiedName": "namespace", + "displayName": "Namespace", + "valueType": "urn:li:dataType:datahub.string", + "cardinality": "SINGLE", + "entityTypes": [ + "urn:li:entityType:datahub.dataset" + ], + "immutable": false + } + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/api/entities/structuredproperties/test_structuredproperties.py b/metadata-ingestion/tests/unit/api/entities/structuredproperties/test_structuredproperties.py new file mode 100644 index 0000000000000..e96b7c1f98437 --- /dev/null +++ b/metadata-ingestion/tests/unit/api/entities/structuredproperties/test_structuredproperties.py @@ -0,0 +1,38 @@ +import pathlib + +import pydantic +import pytest + +from datahub.api.entities.structuredproperties.structuredproperties import ( + StructuredProperties, + TypeQualifierAllowedTypes, +) +from tests.test_helpers.mce_helpers import check_goldens_stream + +RESOURCE_DIR = pathlib.Path(__file__).parent + + +def test_type_validation() -> None: + with pytest.raises(pydantic.ValidationError): + TypeQualifierAllowedTypes(allowed_types=["thisdoesnotexist"]) + + types = TypeQualifierAllowedTypes(allowed_types=["dataset"]) + assert types.allowed_types == ["urn:li:entityType:datahub.dataset"] + + +def test_structuredproperties_load(pytestconfig: pytest.Config) -> None: + example_properties_file = ( + pytestconfig.rootpath + / "examples/structured_properties/structured_properties.yaml" + ) + + properties = StructuredProperties.from_yaml(str(example_properties_file)) + mcps = [] + for property in properties: + mcps.extend(property.generate_mcps()) + + check_goldens_stream( + pytestconfig, + mcps, + golden_path=RESOURCE_DIR / "example_structured_properties_golden.json", + ) diff --git a/metadata-ingestion/tests/unit/serde/test_codegen.py b/metadata-ingestion/tests/unit/serde/test_codegen.py index 37ac35586950e..98d62d5643ff2 100644 --- a/metadata-ingestion/tests/unit/serde/test_codegen.py +++ b/metadata-ingestion/tests/unit/serde/test_codegen.py @@ -18,6 +18,7 @@ UpstreamClass, _Aspect, ) +from datahub.utilities.urns._urn_base import URN_TYPES _UPDATE_ENTITY_REGISTRY = os.getenv("UPDATE_ENTITY_REGISTRY", "false").lower() == "true" ENTITY_REGISTRY_PATH = pathlib.Path( @@ -165,3 +166,9 @@ def test_enum_options(): # This is mainly a sanity check to ensure that it doesn't do anything too crazy. env_options = get_enum_options(FabricTypeClass) assert "PROD" in env_options + + +def test_urn_types() -> None: + assert len(URN_TYPES) > 10 + for checked_type in ["dataset", "dashboard", "dataFlow", "schemaField"]: + assert checked_type in URN_TYPES