fix(ingest/databricks): Fix profiling (#12060)
skrydal authored Dec 20, 2024
1 parent 1570139 commit e52a4de
Showing 4 changed files with 462 additions and 1 deletion.
17 changes: 16 additions & 1 deletion metadata-ingestion/src/datahub/emitter/rest_emitter.py
@@ -291,6 +291,7 @@ def emit_mcps(
mcps: List[Union[MetadataChangeProposal, MetadataChangeProposalWrapper]],
async_flag: Optional[bool] = None,
) -> int:
logger.debug("Attempting to emit batch mcps")
url = f"{self._gms_server}/aspects?action=ingestProposalBatch"
for mcp in mcps:
ensure_has_system_metadata(mcp)
@@ -303,15 +304,22 @@ def emit_mcps(
current_chunk_size = INGEST_MAX_PAYLOAD_BYTES
for mcp_obj in mcp_objs:
mcp_obj_size = len(json.dumps(mcp_obj))
logger.debug(
f"Iterating through object with size {mcp_obj_size} (type: {mcp_obj.get('aspectName')}"
)

if (
mcp_obj_size + current_chunk_size > INGEST_MAX_PAYLOAD_BYTES
or len(mcp_obj_chunks[-1]) >= BATCH_INGEST_MAX_PAYLOAD_LENGTH
):
logger.debug("Decided to create new chunk")
mcp_obj_chunks.append([])
current_chunk_size = 0
mcp_obj_chunks[-1].append(mcp_obj)
current_chunk_size += mcp_obj_size
logger.debug(
f"Decided to send {len(mcps)} mcps in {len(mcp_obj_chunks)} chunks"
)

for mcp_obj_chunk in mcp_obj_chunks:
# TODO: We're calling json.dumps on each MCP object twice, once to estimate
@@ -338,8 +346,15 @@ def emit_usage(self, usageStats: UsageAggregation) -> None:

def _emit_generic(self, url: str, payload: str) -> None:
curl_command = make_curl_command(self._session, "POST", url, payload)
payload_size = len(payload)
if payload_size > INGEST_MAX_PAYLOAD_BYTES:
# Since we know the total payload size here, we could simply skip sending such a payload and report a warning instead; with the current approach the whole ingestion will fail.
logger.warning(
f"Apparent payload size exceeded {INGEST_MAX_PAYLOAD_BYTES}, might fail with an exception due to the size"
)
logger.debug(
"Attempting to emit to DataHub GMS; using curl equivalent to:\n%s",
"Attempting to emit aspect (size: %s) to DataHub GMS; using curl equivalent to:\n%s",
payload_size,
curl_command,
)
try:
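For context, a minimal standalone sketch of the chunking behavior that emit_mcps applies and that the new debug messages above report on: serialized MCP objects are grouped into chunks so that no chunk exceeds a byte budget or an item-count limit. The constants and the chunk_mcp_objs helper below are illustrative stand-ins, not the emitter's actual internals.

import json
from typing import Any, Dict, List

# Illustrative stand-ins; the real limits are INGEST_MAX_PAYLOAD_BYTES and
# BATCH_INGEST_MAX_PAYLOAD_LENGTH in datahub.emitter.rest_emitter.
MAX_PAYLOAD_BYTES = 15 * 1024 * 1024
MAX_ITEMS_PER_CHUNK = 200


def chunk_mcp_objs(mcp_objs: List[Dict[str, Any]]) -> List[List[Dict[str, Any]]]:
    """Group serialized MCP objects into chunks bounded by byte size and item count."""
    chunks: List[List[Dict[str, Any]]] = [[]]
    current_chunk_size = 0
    for mcp_obj in mcp_objs:
        mcp_obj_size = len(json.dumps(mcp_obj))
        # Start a new chunk if this object would exceed the byte budget or the
        # per-chunk item limit.
        if (
            current_chunk_size + mcp_obj_size > MAX_PAYLOAD_BYTES
            or len(chunks[-1]) >= MAX_ITEMS_PER_CHUNK
        ):
            chunks.append([])
            current_chunk_size = 0
        chunks[-1].append(mcp_obj)
        current_chunk_size += mcp_obj_size
    return chunks


# Three ~6 MB objects split into two chunks under the 15 MB budget.
objs = [{"aspectName": "datasetProfile", "value": "x" * (6 * 1024 * 1024)} for _ in range(3)]
print([len(chunk) for chunk in chunk_mcp_objs(objs)])  # -> [2, 1]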
96 changes: 96 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/api/auto_work_units/auto_ensure_aspect_size.py
@@ -0,0 +1,96 @@
import json
import logging
from typing import Iterable, List

from datahub.emitter.rest_emitter import INGEST_MAX_PAYLOAD_BYTES
from datahub.emitter.serialization_helper import pre_json_transform
from datahub.ingestion.api.source import SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.schema_classes import (
DatasetProfileClass,
SchemaFieldClass,
SchemaMetadataClass,
)

logger = logging.getLogger(__name__)


class EnsureAspectSizeProcessor:
def __init__(
self, report: SourceReport, payload_constraint: int = INGEST_MAX_PAYLOAD_BYTES
):
self.report = report
self.payload_constraint = payload_constraint

def ensure_dataset_profile_size(
self, dataset_urn: str, profile: DatasetProfileClass
) -> None:
"""
This is a rather arbitrary approach to ensuring that the dataset profile aspect does not exceed the allowed size;
it may be adjusted in the future.
"""
sample_fields_size = 0
if profile.fieldProfiles:
logger.debug(f"Length of field profiles: {len(profile.fieldProfiles)}")
for field in profile.fieldProfiles:
if field.sampleValues:
values_len = 0
for value in field.sampleValues:
if value:
values_len += len(value)
logger.debug(
f"Field {field.fieldPath} has {len(field.sampleValues)} sample values, taking total bytes {values_len}"
)
if sample_fields_size + values_len > self.payload_constraint:
field.sampleValues = []
self.report.warning(
title="Dataset profile truncated due to size constraint",
message="Dataset profile contained too much data and would have caused ingestion to fail",
context=f"Sample values for field {field.fieldPath} were removed from dataset profile for {dataset_urn} due to aspect size constraints",
)
else:
sample_fields_size += values_len
else:
logger.debug(f"Field {field.fieldPath} has no sample values")

def ensure_schema_metadata_size(
self, dataset_urn: str, schema: SchemaMetadataClass
) -> None:
"""
This is a rather arbitrary approach to ensuring that the schema metadata aspect does not exceed the allowed size;
it may be adjusted in the future.
"""
total_fields_size = 0
logger.debug(f"Amount of schema fields: {len(schema.fields)}")
accepted_fields: List[SchemaFieldClass] = []
for field in schema.fields:
field_size = len(json.dumps(pre_json_transform(field.to_obj())))
logger.debug(f"Field {field.fieldPath} takes total {field_size}")
if total_fields_size + field_size < self.payload_constraint:
accepted_fields.append(field)
total_fields_size += field_size
else:
self.report.warning(
title="Schema truncated due to size constraint",
message="Dataset schema contained too much data and would have caused ingestion to fail",
context=f"Field {field.fieldPath} was removed from schema for {dataset_urn} due to aspect size constraints",
)

schema.fields = accepted_fields

def ensure_aspect_size(
self,
stream: Iterable[MetadataWorkUnit],
) -> Iterable[MetadataWorkUnit]:
"""
We have a hard limit of 16 MB on aspect size. Some aspects can exceed that value, causing an exception
on the GMS side and failing the entire ingestion. This processor attempts to trim the offending aspects.
"""
for wu in stream:
logger.debug(f"Ensuring size of workunit: {wu.id}")

if schema := wu.get_aspect_of_type(SchemaMetadataClass):
self.ensure_schema_metadata_size(wu.get_urn(), schema)
elif profile := wu.get_aspect_of_type(DatasetProfileClass):
self.ensure_dataset_profile_size(wu.get_urn(), profile)
yield wu
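A hedged usage sketch of the new processor in isolation (not part of this commit): it assumes DatasetFieldProfileClass is available in datahub.metadata.schema_classes and passes an artificially small payload_constraint so the truncation path is easy to observe.

import time

from datahub.ingestion.api.auto_work_units.auto_ensure_aspect_size import (
    EnsureAspectSizeProcessor,
)
from datahub.ingestion.api.source import SourceReport
from datahub.metadata.schema_classes import (
    DatasetFieldProfileClass,  # assumed available alongside DatasetProfileClass
    DatasetProfileClass,
)

report = SourceReport()
# Tiny constraint (100 bytes) instead of INGEST_MAX_PAYLOAD_BYTES, purely for illustration.
processor = EnsureAspectSizeProcessor(report, payload_constraint=100)

profile = DatasetProfileClass(
    timestampMillis=int(time.time() * 1000),
    fieldProfiles=[
        DatasetFieldProfileClass(fieldPath="comment", sampleValues=["x" * 80]),
        DatasetFieldProfileClass(fieldPath="payload", sampleValues=["y" * 80]),
    ],
)

processor.ensure_dataset_profile_size(
    "urn:li:dataset:(urn:li:dataPlatform:databricks,example.table,PROD)", profile
)

# The first field's samples fit under the constraint; the second field's samples
# would push past it, so they are dropped and a warning lands in the report.
assert profile.fieldProfiles[0].sampleValues == ["x" * 80]
assert profile.fieldProfiles[1].sampleValues == []

The same pattern applies to ensure_schema_metadata_size, which drops whole schema fields once their cumulative serialized size would cross the constraint.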
@@ -26,6 +26,9 @@
gen_containers,
)
from datahub.emitter.sql_parsing_builder import SqlParsingBuilder
from datahub.ingestion.api.auto_work_units.auto_ensure_aspect_size import (
EnsureAspectSizeProcessor,
)
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SupportStatus,
@@ -260,6 +263,7 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
StaleEntityRemovalHandler.create(
self, self.config, self.ctx
).workunit_processor,
EnsureAspectSizeProcessor(self.get_report()).ensure_aspect_size,
]

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
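A sketch of how the processor composes at the pipeline level: ensure_aspect_size is a generator over MetadataWorkUnit objects, so a source's workunit stream can simply be passed through it, which is what registering it in get_workunit_processors achieves. The single-workunit stream below, built from a MetadataChangeProposalWrapper carrying a StatusClass aspect, is illustrative only.

from typing import Iterable

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.auto_work_units.auto_ensure_aspect_size import (
    EnsureAspectSizeProcessor,
)
from datahub.ingestion.api.source import SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.schema_classes import StatusClass


def example_stream() -> Iterable[MetadataWorkUnit]:
    # Illustrative single-workunit stream; a real source yields many workunits.
    mcp = MetadataChangeProposalWrapper(
        entityUrn="urn:li:dataset:(urn:li:dataPlatform:databricks,example.table,PROD)",
        aspect=StatusClass(removed=False),
    )
    yield mcp.as_workunit()


report = SourceReport()
processor = EnsureAspectSizeProcessor(report)

# Workunits without schemaMetadata or datasetProfile aspects pass through untouched;
# oversized ones are trimmed before they ever reach the REST emitter.
for wu in processor.ensure_aspect_size(example_stream()):
    print(wu.id)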
