Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(cli/properties): allow structured properties without a graph instance #12144

Merged
merged 6 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,18 @@

from datahub.configuration.common import ConfigModel
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.global_context import get_graph_context, set_graph_context
from datahub.ingestion.graph.client import DataHubGraph, get_default_graph
from datahub.ingestion.graph.client import DataHubGraph
from datahub.metadata.schema_classes import (
PropertyValueClass,
StructuredPropertyDefinitionClass,
)
from datahub.utilities.urns.urn import Urn
from datahub.metadata.urns import StructuredPropertyUrn, Urn
from datahub.utilities.urns._urn_base import URN_TYPES

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class StructuredPropertiesConfig:
"""Configuration class to hold the graph client"""

@classmethod
def get_graph_required(cls) -> DataHubGraph:
"""Get the current graph, falling back to default if none set"""
return get_graph_context() or get_default_graph()


class AllowedTypes(Enum):
STRING = "string"
RICH_TEXT = "rich_text"
Expand All @@ -51,29 +42,28 @@ class AllowedValue(ConfigModel):
description: Optional[str] = None


VALID_ENTITY_TYPES_PREFIX_STRING = ", ".join(
[
f"urn:li:entityType:datahub.{x}"
for x in ["dataset", "dashboard", "dataFlow", "schemaField"]
]
)
VALID_ENTITY_TYPES_STRING = f"Valid entity type urns are {VALID_ENTITY_TYPES_PREFIX_STRING}, etc... Ensure that the entity type is valid."
VALID_ENTITY_TYPE_URNS = [
Urn.make_entity_type_urn(entity_type) for entity_type in URN_TYPES.keys()
]
_VALID_ENTITY_TYPES_STRING = f"Valid entity type urns are {', '.join(VALID_ENTITY_TYPE_URNS)}, etc... Ensure that the entity type is valid."


def _validate_entity_type_urn(v: str) -> str:
urn = Urn.make_entity_type_urn(v)
if urn not in VALID_ENTITY_TYPE_URNS:
raise ValueError(
f"Input {v} is not a valid entity type urn. {_VALID_ENTITY_TYPES_STRING}"
)
v = str(urn)
return v


class TypeQualifierAllowedTypes(ConfigModel):
allowed_types: List[str]

@validator("allowed_types", each_item=True)
def validate_allowed_types(cls, v):
if v:
graph = StructuredPropertiesConfig.get_graph_required()
validated_urn = Urn.make_entity_type_urn(v)
if not graph.exists(validated_urn):
raise ValueError(
f"Input {v} is not a valid entity type urn. {VALID_ENTITY_TYPES_STRING}"
)
v = str(validated_urn)
return v
_check_allowed_types = validator("allowed_types", each_item=True, allow_reuse=True)(
_validate_entity_type_urn
)


class StructuredProperties(ConfigModel):
Expand All @@ -90,22 +80,30 @@ class StructuredProperties(ConfigModel):
type_qualifier: Optional[TypeQualifierAllowedTypes] = None
immutable: Optional[bool] = False

@validator("entity_types", each_item=True)
def validate_entity_types(cls, v):
if v:
graph = StructuredPropertiesConfig.get_graph_required()
validated_urn = Urn.make_entity_type_urn(v)
if not graph.exists(validated_urn):
raise ValueError(
f"Input {v} is not a valid entity type urn. {VALID_ENTITY_TYPES_STRING}"
)
v = str(validated_urn)
_check_entity_types = validator("entity_types", each_item=True, allow_reuse=True)(
_validate_entity_type_urn
)

@validator("type")
def validate_type(cls, v: str) -> str:
# Convert to lowercase if needed
if not v.islower():
logger.warning(
f"Structured property type should be lowercase. Updated to {v.lower()}"
)
v = v.lower()

# Check if type is allowed
if not AllowedTypes.check_allowed_type(v):
raise ValueError(
f"Type {v} is not allowed. Allowed types are {AllowedTypes.values()}"
)
return v

@property
def fqn(self) -> str:
assert self.urn is not None
id = Urn.create_from_string(self.urn).get_entity_id()[0]
id = StructuredPropertyUrn.from_string(self.urn).id
if self.qualified_name is not None:
# ensure that qualified name and ID match
assert (
Expand All @@ -122,101 +120,90 @@ def urn_must_be_present(cls, v, values):
return v

@staticmethod
def create(file: str, graph: Optional[DataHubGraph] = None) -> None:
with set_graph_context(graph):
graph = StructuredPropertiesConfig.get_graph_required()

with open(file) as fp:
structuredproperties: List[dict] = yaml.safe_load(fp)
for structuredproperty_raw in structuredproperties:
structuredproperty = StructuredProperties.parse_obj(
structuredproperty_raw
)

if not structuredproperty.type.islower():
structuredproperty.type = structuredproperty.type.lower()
logger.warning(
f"Structured property type should be lowercase. Updated to {structuredproperty.type}"
)
if not AllowedTypes.check_allowed_type(structuredproperty.type):
raise ValueError(
f"Type {structuredproperty.type} is not allowed. Allowed types are {AllowedTypes.values()}"
)
mcp = MetadataChangeProposalWrapper(
entityUrn=structuredproperty.urn,
aspect=StructuredPropertyDefinitionClass(
qualifiedName=structuredproperty.fqn,
valueType=Urn.make_data_type_urn(structuredproperty.type),
displayName=structuredproperty.display_name,
description=structuredproperty.description,
entityTypes=[
Urn.make_entity_type_urn(entity_type)
for entity_type in structuredproperty.entity_types or []
],
cardinality=structuredproperty.cardinality,
immutable=structuredproperty.immutable,
allowedValues=(
[
PropertyValueClass(
value=v.value, description=v.description
)
for v in structuredproperty.allowed_values
]
if structuredproperty.allowed_values
else None
),
typeQualifier=(
{
"allowedTypes": structuredproperty.type_qualifier.allowed_types
}
if structuredproperty.type_qualifier
else None
),
),
)
graph.emit_mcp(mcp)

logger.info(f"Created structured property {structuredproperty.urn}")

@classmethod
def from_datahub(cls, graph: DataHubGraph, urn: str) -> "StructuredProperties":
with set_graph_context(graph):
structured_property: Optional[
StructuredPropertyDefinitionClass
] = graph.get_aspect(urn, StructuredPropertyDefinitionClass)
if structured_property is None:
raise Exception(
"StructuredPropertyDefinition aspect is None. Unable to create structured property."
)
return StructuredProperties(
urn=urn,
qualified_name=structured_property.qualifiedName,
display_name=structured_property.displayName,
type=structured_property.valueType,
description=structured_property.description,
entity_types=structured_property.entityTypes,
cardinality=structured_property.cardinality,
allowed_values=(
def from_yaml(file: str) -> List["StructuredProperties"]:
with open(file) as fp:
structuredproperties: List[dict] = yaml.safe_load(fp)

result: List[StructuredProperties] = []
for structuredproperty_raw in structuredproperties:
result.append(StructuredProperties.parse_obj(structuredproperty_raw))
return result

def generate_mcps(self) -> List[MetadataChangeProposalWrapper]:
mcp = MetadataChangeProposalWrapper(
entityUrn=self.urn,
aspect=StructuredPropertyDefinitionClass(
qualifiedName=self.fqn,
valueType=Urn.make_data_type_urn(self.type),
displayName=self.display_name,
description=self.description,
entityTypes=[
Urn.make_entity_type_urn(entity_type)
for entity_type in self.entity_types or []
],
cardinality=self.cardinality,
immutable=self.immutable,
allowedValues=(
[
AllowedValue(
value=av.value,
description=av.description,
)
for av in structured_property.allowedValues or []
PropertyValueClass(value=v.value, description=v.description)
for v in self.allowed_values
]
if structured_property.allowedValues is not None
if self.allowed_values
else None
),
type_qualifier=(
{
"allowed_types": structured_property.typeQualifier.get(
"allowedTypes"
)
}
if structured_property.typeQualifier
typeQualifier=(
{"allowedTypes": self.type_qualifier.allowed_types}
if self.type_qualifier
else None
),
),
)
return [mcp]

@staticmethod
def create(file: str, graph: DataHubGraph) -> None:
# TODO: Deprecate this method.
structuredproperties = StructuredProperties.from_yaml(file)
for structuredproperty in structuredproperties:
for mcp in structuredproperty.generate_mcps():
graph.emit_mcp(mcp)

logger.info(f"Created structured property {structuredproperty.urn}")

@classmethod
def from_datahub(cls, graph: DataHubGraph, urn: str) -> "StructuredProperties":
structured_property: Optional[
StructuredPropertyDefinitionClass
] = graph.get_aspect(urn, StructuredPropertyDefinitionClass)
if structured_property is None:
raise Exception(
"StructuredPropertyDefinition aspect is None. Unable to create structured property."
)
return StructuredProperties(
urn=urn,
qualified_name=structured_property.qualifiedName,
display_name=structured_property.displayName,
type=structured_property.valueType,
description=structured_property.description,
entity_types=structured_property.entityTypes,
cardinality=structured_property.cardinality,
allowed_values=(
[
AllowedValue(
value=av.value,
description=av.description,
)
for av in structured_property.allowedValues or []
]
if structured_property.allowedValues is not None
else None
),
type_qualifier=(
{"allowed_types": structured_property.typeQualifier.get("allowedTypes")}
if structured_property.typeQualifier
else None
),
)

def to_yaml(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ def properties() -> None:
def upsert(file: Path) -> None:
"""Upsert structured properties in DataHub."""

StructuredProperties.create(str(file))
with get_default_graph() as graph:
StructuredProperties.create(str(file), graph)


@properties.command(
Expand Down
Loading
Loading