feat(ingestion/iceberg): Improve iceberg connector logging (#12317)
skrydal authored Jan 13, 2025
1 parent 8d48622 commit 457f96e
Showing 3 changed files with 62 additions and 13 deletions.
metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py (13 changes: 10 additions & 3 deletions)
@@ -203,7 +203,9 @@ def _process_dataset(dataset_path: Identifier) -> Iterable[MetadataWorkUnit]:
         with PerfTimer() as timer:
             table = thread_local.local_catalog.load_table(dataset_path)
         time_taken = timer.elapsed_seconds()
-        self.report.report_table_load_time(time_taken)
+        self.report.report_table_load_time(
+            time_taken, dataset_name, table.metadata_location
+        )
         LOGGER.debug(f"Loaded table: {table.name()}, time taken: {time_taken}")
         yield from self._create_iceberg_workunit(dataset_name, table)
     except NoSuchPropertyException as e:
@@ -247,7 +249,10 @@ def _process_dataset(dataset_path: Identifier) -> Iterable[MetadataWorkUnit]:
f"Iceberg Rest Catalog server error (500 status) encountered when processing table {dataset_path}, skipping it."
)
except Exception as e:
self.report.report_failure("general", f"Failed to create workunit: {e}")
self.report.report_failure(
"general",
f"Failed to create workunit for dataset {dataset_name}: {e}",
)
LOGGER.exception(
f"Exception while processing table {dataset_path}, skipping it.",
)
@@ -312,7 +317,9 @@ def _create_iceberg_workunit(
         dataset_snapshot.aspects.append(schema_metadata)

         mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
-        self.report.report_table_processing_time(timer.elapsed_seconds())
+        self.report.report_table_processing_time(
+            timer.elapsed_seconds(), dataset_name, table.metadata_location
+        )
         yield MetadataWorkUnit(id=dataset_name, mce=mce)

         dpi_aspect = self._get_dataplatform_instance_aspect(dataset_urn=dataset_urn)
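For orientation, here is a minimal standalone sketch (not part of the commit) of the timing-and-report pattern these hunks follow. StubTimer is a hypothetical stand-in for DataHub's PerfTimer, whose context-manager usage and elapsed_seconds() call are taken from the diff above; the workload is simulated.

import time


class StubTimer:  # hypothetical stand-in for DataHub's PerfTimer
    def __enter__(self):
        self._start = time.perf_counter()
        return self

    def __exit__(self, *exc):
        self._elapsed = time.perf_counter() - self._start
        return False

    def elapsed_seconds(self) -> float:
        return self._elapsed


with StubTimer() as timer:
    time.sleep(0.01)  # stand-in for catalog.load_table(dataset_path)
time_taken = timer.elapsed_seconds()
print(f"Loaded table in {time_taken:.3f}s")
# In the connector, this value now flows into
# report.report_table_load_time(time_taken, dataset_name, table.metadata_location)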
Second changed file (filename and per-file change counts not shown in this capture)
@@ -5,6 +5,7 @@
 from humanfriendly import format_timespan
 from pydantic import Field, validator
 from pyiceberg.catalog import Catalog, load_catalog
+from sortedcontainers import SortedList

 from datahub.configuration.common import AllowDenyPattern, ConfigModel
 from datahub.configuration.source_common import DatasetSourceConfigMixin
@@ -146,19 +147,40 @@ def get_catalog(self) -> Catalog:
         return load_catalog(name=catalog_name, **catalog_config)


+class TopTableTimings:
+    _VALUE_FIELD: str = "timing"
+    top_entites: SortedList
+    _size: int
+
+    def __init__(self, size: int = 10):
+        self._size = size
+        self.top_entites = SortedList(key=lambda x: -x.get(self._VALUE_FIELD, 0))
+
+    def add(self, entity: Dict[str, Any]) -> None:
+        if self._VALUE_FIELD not in entity:
+            return
+        self.top_entites.add(entity)
+        if len(self.top_entites) > self._size:
+            self.top_entites.pop()
+
+    def __str__(self) -> str:
+        if len(self.top_entites) == 0:
+            return "no timings reported"
+        return str(list(self.top_entites))
+
+
 class TimingClass:
-    times: List[int]
+    times: SortedList

     def __init__(self):
-        self.times = []
+        self.times = SortedList()

-    def add_timing(self, t):
-        self.times.append(t)
+    def add_timing(self, t: float) -> None:
+        self.times.add(t)

-    def __str__(self):
+    def __str__(self) -> str:
         if len(self.times) == 0:
             return "no timings reported"
-        self.times.sort()
         total = sum(self.times)
         avg = total / len(self.times)
         return str(
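A minimal standalone sketch (not part of the commit) of the eviction trick the new TopTableTimings class relies on: a SortedList keyed on the negated timing keeps the largest timings first, so pop() evicts the smallest retained entry once the list grows past its size bound.

from sortedcontainers import SortedList

# keep the 3 slowest entries; key negation sorts largest timing first
top = SortedList(key=lambda x: -x.get("timing", 0))
for name, timing in [("a", 1.0), ("b", 5.0), ("c", 3.0), ("d", 0.5)]:
    top.add({"table": name, "timing": timing})
    if len(top) > 3:
        top.pop()  # drops the entry with the smallest timing

print(list(top))
# [{'table': 'b', 'timing': 5.0}, {'table': 'c', 'timing': 3.0}, {'table': 'a', 'timing': 1.0}]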
@@ -180,6 +202,9 @@ class IcebergSourceReport(StaleEntityRemovalSourceReport):
     load_table_timings: TimingClass = field(default_factory=TimingClass)
     processing_table_timings: TimingClass = field(default_factory=TimingClass)
     profiling_table_timings: TimingClass = field(default_factory=TimingClass)
+    tables_load_timings: TopTableTimings = field(default_factory=TopTableTimings)
+    tables_profile_timings: TopTableTimings = field(default_factory=TopTableTimings)
+    tables_process_timings: TopTableTimings = field(default_factory=TopTableTimings)
     listed_namespaces: int = 0
     total_listed_tables: int = 0
     tables_listed_per_namespace: TopKDict[str, int] = field(
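Why the new fields use field(default_factory=TopTableTimings): a directly assigned instance would become a single object shared by every report instance, whereas default_factory builds a fresh one each time. A standalone sketch with stand-in classes (names hypothetical):

from dataclasses import dataclass, field


class Timings:  # stand-in for TopTableTimings
    def __init__(self) -> None:
        self.entries: list = []


@dataclass
class Report:  # stand-in for IcebergSourceReport
    tables_load_timings: Timings = field(default_factory=Timings)


a, b = Report(), Report()
a.tables_load_timings.entries.append({"table": "x", "timing": 1.0})
assert b.tables_load_timings.entries == []  # state is per-instance, not shared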
@@ -201,11 +226,26 @@ def report_table_scanned(self, name: str) -> None:
     def report_dropped(self, ent_name: str) -> None:
         self.filtered.append(ent_name)

-    def report_table_load_time(self, t: float) -> None:
+    def report_table_load_time(
+        self, t: float, table_name: str, table_metadata_location: str
+    ) -> None:
         self.load_table_timings.add_timing(t)
+        self.tables_load_timings.add(
+            {"table": table_name, "timing": t, "metadata_file": table_metadata_location}
+        )

-    def report_table_processing_time(self, t: float) -> None:
+    def report_table_processing_time(
+        self, t: float, table_name: str, table_metadata_location: str
+    ) -> None:
         self.processing_table_timings.add_timing(t)
+        self.tables_process_timings.add(
+            {"table": table_name, "timing": t, "metadata_file": table_metadata_location}
+        )

-    def report_table_profiling_time(self, t: float) -> None:
+    def report_table_profiling_time(
+        self, t: float, table_name: str, table_metadata_location: str
+    ) -> None:
         self.profiling_table_timings.add_timing(t)
+        self.tables_profile_timings.add(
+            {"table": table_name, "timing": t, "metadata_file": table_metadata_location}
+        )
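A standalone sketch of what the reworked report_table_load_time now does internally, using only the two classes shown in the hunks above (assumed in scope); each call records the timing twice, once into the aggregate TimingClass stats and once, tagged with table name and metadata file, into the top-N TopTableTimings. The dataset name and S3 path are made up for illustration.

load_table_timings = TimingClass()
tables_load_timings = TopTableTimings()

t = 1.82  # seconds, hypothetical
table_name = "db.sales"  # hypothetical
metadata = "s3://warehouse/db/sales/metadata/00001.metadata.json"  # hypothetical

load_table_timings.add_timing(t)  # aggregate stats over all loads
tables_load_timings.add(
    {"table": table_name, "timing": t, "metadata_file": metadata}
)  # per-table top-N entry
print(tables_load_timings)
# [{'table': 'db.sales', 'timing': 1.82, 'metadata_file': 's3://warehouse/...'}]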
Third changed file (filename and per-file change counts not shown in this capture)
@@ -204,7 +204,9 @@ def profile_table(
         )
         dataset_profile.fieldProfiles.append(column_profile)
         time_taken = timer.elapsed_seconds()
-        self.report.report_table_profiling_time(time_taken)
+        self.report.report_table_profiling_time(
+            time_taken, dataset_name, table.metadata_location
+        )
         LOGGER.debug(
             f"Finished profiling of dataset: {dataset_name} in {time_taken}"
         )