Skip to content

Commit

Permalink
Merge branch 'master' into confluent_cloud_kafka_connect_pr2
Browse files Browse the repository at this point in the history
  • Loading branch information
mayurinehate authored Dec 18, 2024
2 parents 1cde17e + 01a2c0c commit 0ba7b0b
Show file tree
Hide file tree
Showing 13 changed files with 426 additions and 179 deletions.
2 changes: 1 addition & 1 deletion metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
# now provide prebuilt wheels for most platforms, including M1 Macs and
# Linux aarch64 (e.g. Docker's linux/arm64). Installing confluent_kafka
# from source remains a pain.
"confluent_kafka>=1.9.0",
"confluent_kafka[schemaregistry]>=1.9.0",
# We currently require both Avro libraries. The codegen uses avro-python3 (above)
# schema parsers at runtime for generating and reading JSON into Python objects.
# At the same time, we use Kafka's AvroSerializer, which internally relies on
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,18 @@

from datahub.configuration.common import ConfigModel
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.global_context import get_graph_context, set_graph_context
from datahub.ingestion.graph.client import DataHubGraph, get_default_graph
from datahub.ingestion.graph.client import DataHubGraph
from datahub.metadata.schema_classes import (
PropertyValueClass,
StructuredPropertyDefinitionClass,
)
from datahub.utilities.urns.urn import Urn
from datahub.metadata.urns import StructuredPropertyUrn, Urn
from datahub.utilities.urns._urn_base import URN_TYPES

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class StructuredPropertiesConfig:
"""Configuration class to hold the graph client"""

@classmethod
def get_graph_required(cls) -> DataHubGraph:
"""Get the current graph, falling back to default if none set"""
return get_graph_context() or get_default_graph()


class AllowedTypes(Enum):
STRING = "string"
RICH_TEXT = "rich_text"
Expand All @@ -51,29 +42,28 @@ class AllowedValue(ConfigModel):
description: Optional[str] = None


VALID_ENTITY_TYPES_PREFIX_STRING = ", ".join(
[
f"urn:li:entityType:datahub.{x}"
for x in ["dataset", "dashboard", "dataFlow", "schemaField"]
]
)
VALID_ENTITY_TYPES_STRING = f"Valid entity type urns are {VALID_ENTITY_TYPES_PREFIX_STRING}, etc... Ensure that the entity type is valid."
VALID_ENTITY_TYPE_URNS = [
Urn.make_entity_type_urn(entity_type) for entity_type in URN_TYPES.keys()
]
_VALID_ENTITY_TYPES_STRING = f"Valid entity type urns are {', '.join(VALID_ENTITY_TYPE_URNS)}, etc... Ensure that the entity type is valid."


def _validate_entity_type_urn(v: str) -> str:
    """Normalize *v* into an entity-type urn string.

    Raises:
        ValueError: if the normalized urn is not one of the known
            entity-type urns in VALID_ENTITY_TYPE_URNS.
    """
    normalized = Urn.make_entity_type_urn(v)
    if normalized in VALID_ENTITY_TYPE_URNS:
        return str(normalized)
    raise ValueError(
        f"Input {v} is not a valid entity type urn. {_VALID_ENTITY_TYPES_STRING}"
    )


class TypeQualifierAllowedTypes(ConfigModel):
allowed_types: List[str]

@validator("allowed_types", each_item=True)
def validate_allowed_types(cls, v):
if v:
graph = StructuredPropertiesConfig.get_graph_required()
validated_urn = Urn.make_entity_type_urn(v)
if not graph.exists(validated_urn):
raise ValueError(
f"Input {v} is not a valid entity type urn. {VALID_ENTITY_TYPES_STRING}"
)
v = str(validated_urn)
return v
_check_allowed_types = validator("allowed_types", each_item=True, allow_reuse=True)(
_validate_entity_type_urn
)


class StructuredProperties(ConfigModel):
Expand All @@ -90,22 +80,30 @@ class StructuredProperties(ConfigModel):
type_qualifier: Optional[TypeQualifierAllowedTypes] = None
immutable: Optional[bool] = False

@validator("entity_types", each_item=True)
def validate_entity_types(cls, v):
if v:
graph = StructuredPropertiesConfig.get_graph_required()
validated_urn = Urn.make_entity_type_urn(v)
if not graph.exists(validated_urn):
raise ValueError(
f"Input {v} is not a valid entity type urn. {VALID_ENTITY_TYPES_STRING}"
)
v = str(validated_urn)
_check_entity_types = validator("entity_types", each_item=True, allow_reuse=True)(
_validate_entity_type_urn
)

@validator("type")
def validate_type(cls, v: str) -> str:
    """Normalize the declared value type to lowercase and reject unknown types."""
    if not v.islower():
        # Normalize casing rather than failing; surface the change to the user.
        logger.warning(
            f"Structured property type should be lowercase. Updated to {v.lower()}"
        )
        v = v.lower()

    # Only types enumerated in AllowedTypes are accepted.
    if AllowedTypes.check_allowed_type(v):
        return v
    raise ValueError(
        f"Type {v} is not allowed. Allowed types are {AllowedTypes.values()}"
    )

@property
def fqn(self) -> str:
assert self.urn is not None
id = Urn.create_from_string(self.urn).get_entity_id()[0]
id = StructuredPropertyUrn.from_string(self.urn).id
if self.qualified_name is not None:
# ensure that qualified name and ID match
assert (
Expand All @@ -122,101 +120,90 @@ def urn_must_be_present(cls, v, values):
return v

@staticmethod
def create(file: str, graph: Optional[DataHubGraph] = None) -> None:
with set_graph_context(graph):
graph = StructuredPropertiesConfig.get_graph_required()

with open(file) as fp:
structuredproperties: List[dict] = yaml.safe_load(fp)
for structuredproperty_raw in structuredproperties:
structuredproperty = StructuredProperties.parse_obj(
structuredproperty_raw
)

if not structuredproperty.type.islower():
structuredproperty.type = structuredproperty.type.lower()
logger.warning(
f"Structured property type should be lowercase. Updated to {structuredproperty.type}"
)
if not AllowedTypes.check_allowed_type(structuredproperty.type):
raise ValueError(
f"Type {structuredproperty.type} is not allowed. Allowed types are {AllowedTypes.values()}"
)
mcp = MetadataChangeProposalWrapper(
entityUrn=structuredproperty.urn,
aspect=StructuredPropertyDefinitionClass(
qualifiedName=structuredproperty.fqn,
valueType=Urn.make_data_type_urn(structuredproperty.type),
displayName=structuredproperty.display_name,
description=structuredproperty.description,
entityTypes=[
Urn.make_entity_type_urn(entity_type)
for entity_type in structuredproperty.entity_types or []
],
cardinality=structuredproperty.cardinality,
immutable=structuredproperty.immutable,
allowedValues=(
[
PropertyValueClass(
value=v.value, description=v.description
)
for v in structuredproperty.allowed_values
]
if structuredproperty.allowed_values
else None
),
typeQualifier=(
{
"allowedTypes": structuredproperty.type_qualifier.allowed_types
}
if structuredproperty.type_qualifier
else None
),
),
)
graph.emit_mcp(mcp)

logger.info(f"Created structured property {structuredproperty.urn}")

@classmethod
def from_datahub(cls, graph: DataHubGraph, urn: str) -> "StructuredProperties":
with set_graph_context(graph):
structured_property: Optional[
StructuredPropertyDefinitionClass
] = graph.get_aspect(urn, StructuredPropertyDefinitionClass)
if structured_property is None:
raise Exception(
"StructuredPropertyDefinition aspect is None. Unable to create structured property."
)
return StructuredProperties(
urn=urn,
qualified_name=structured_property.qualifiedName,
display_name=structured_property.displayName,
type=structured_property.valueType,
description=structured_property.description,
entity_types=structured_property.entityTypes,
cardinality=structured_property.cardinality,
allowed_values=(
def from_yaml(file: str) -> List["StructuredProperties"]:
    """Load structured property definitions from a YAML file.

    The file must contain a YAML list of property dicts; each entry is
    parsed into a StructuredProperties model (pydantic validation applies).
    """
    with open(file) as fp:
        raw_entries: List[dict] = yaml.safe_load(fp)
    return [StructuredProperties.parse_obj(entry) for entry in raw_entries]

def generate_mcps(self) -> List[MetadataChangeProposalWrapper]:
mcp = MetadataChangeProposalWrapper(
entityUrn=self.urn,
aspect=StructuredPropertyDefinitionClass(
qualifiedName=self.fqn,
valueType=Urn.make_data_type_urn(self.type),
displayName=self.display_name,
description=self.description,
entityTypes=[
Urn.make_entity_type_urn(entity_type)
for entity_type in self.entity_types or []
],
cardinality=self.cardinality,
immutable=self.immutable,
allowedValues=(
[
AllowedValue(
value=av.value,
description=av.description,
)
for av in structured_property.allowedValues or []
PropertyValueClass(value=v.value, description=v.description)
for v in self.allowed_values
]
if structured_property.allowedValues is not None
if self.allowed_values
else None
),
type_qualifier=(
{
"allowed_types": structured_property.typeQualifier.get(
"allowedTypes"
)
}
if structured_property.typeQualifier
typeQualifier=(
{"allowedTypes": self.type_qualifier.allowed_types}
if self.type_qualifier
else None
),
),
)
return [mcp]

@staticmethod
def create(file: str, graph: DataHubGraph) -> None:
    """Read property definitions from *file* and emit them to DataHub via *graph*.

    TODO: Deprecate this method (compose from_yaml + generate_mcps instead).
    """
    for prop in StructuredProperties.from_yaml(file):
        for proposal in prop.generate_mcps():
            graph.emit_mcp(proposal)

        logger.info(f"Created structured property {prop.urn}")

@classmethod
def from_datahub(cls, graph: DataHubGraph, urn: str) -> "StructuredProperties":
    """Hydrate a StructuredProperties model from the definition aspect stored in DataHub.

    Fetches the StructuredPropertyDefinition aspect for *urn* and maps its
    fields onto this model. Raises if the aspect does not exist.
    """
    structured_property: Optional[
        StructuredPropertyDefinitionClass
    ] = graph.get_aspect(urn, StructuredPropertyDefinitionClass)
    if structured_property is None:
        raise Exception(
            "StructuredPropertyDefinition aspect is None. Unable to create structured property."
        )
    return StructuredProperties(
        urn=urn,
        qualified_name=structured_property.qualifiedName,
        display_name=structured_property.displayName,
        type=structured_property.valueType,
        description=structured_property.description,
        entity_types=structured_property.entityTypes,
        cardinality=structured_property.cardinality,
        allowed_values=(
            # Preserve the aspect's None-vs-list distinction: only build the
            # list when allowedValues is explicitly present.
            [
                AllowedValue(
                    value=av.value,
                    description=av.description,
                )
                for av in structured_property.allowedValues or []
            ]
            if structured_property.allowedValues is not None
            else None
        ),
        type_qualifier=(
            # typeQualifier is stored as a generic map; extract allowedTypes.
            {"allowed_types": structured_property.typeQualifier.get("allowedTypes")}
            if structured_property.typeQualifier
            else None
        ),
    )

def to_yaml(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ def properties() -> None:
def upsert(file: Path) -> None:
"""Upsert structured properties in DataHub."""

StructuredProperties.create(str(file))
with get_default_graph() as graph:
StructuredProperties.create(str(file), graph)


@properties.command(
Expand Down
1 change: 1 addition & 0 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ def infos(self) -> LossyList[StructuredLogEntry]:

@dataclass
class SourceReport(Report):
event_not_produced_warn: bool = True
events_produced: int = 0
events_produced_per_sec: int = 0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def auto_workunit_reporter(report: "SourceReport", stream: Iterable[T]) -> Itera
report.report_workunit(wu)
yield wu

if report.events_produced == 0:
if report.event_not_produced_warn and report.events_produced == 0:
report.warning(
title="No metadata was produced by the source",
message="Please check the source configuration, filters, and permissions.",
Expand Down
Loading

0 comments on commit 0ba7b0b

Please sign in to comment.