From 1848d4d44b395b5e870f5d79d178d9e2d09603d6 Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Tue, 5 Mar 2024 13:27:27 +0100 Subject: [PATCH] Enrich metrics with load_id in StepInfo.asdict --- dlt/common/pipeline.py | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/dlt/common/pipeline.py b/dlt/common/pipeline.py index a215ff2de6..df221ec703 100644 --- a/dlt/common/pipeline.py +++ b/dlt/common/pipeline.py @@ -102,16 +102,19 @@ def finished_at(self) -> datetime.datetime: def asdict(self) -> DictStrAny: # to be mixed with NamedTuple - d: DictStrAny = self._asdict() # type: ignore - d["pipeline"] = {"pipeline_name": self.pipeline.pipeline_name} - d["load_packages"] = [package.asdict() for package in self.load_packages] + step_info: DictStrAny = self._asdict() # type: ignore + step_info["pipeline"] = {"pipeline_name": self.pipeline.pipeline_name} + step_info["load_packages"] = [package.asdict() for package in self.load_packages] if self.metrics: - d["started_at"] = self.started_at - d["finished_at"] = self.finished_at - d["metrics"] = [ - {**dict(metric), "load_id": load_id} for load_id, metric in self.metrics.items() - ] - return d + step_info["started_at"] = self.started_at + step_info["finished_at"] = self.finished_at + all_metrics = [] + for load_id, metrics in step_info["metrics"].items(): + for metric in metrics: + all_metrics.append({**dict(metric), "load_id": load_id}) + + step_info["metrics"] = all_metrics + return step_info def __str__(self) -> str: return self.asstr(verbosity=0) @@ -527,8 +530,7 @@ def run( schema: Schema = None, loader_file_format: TLoaderFileFormat = None, schema_contract: TSchemaContract = None, - ) -> LoadInfo: - ... + ) -> LoadInfo: ... def _set_context(self, is_active: bool) -> None: """Called when pipeline context activated or deactivate""" @@ -550,8 +552,7 @@ def __call__( schema: Schema = None, loader_file_format: TLoaderFileFormat = None, schema_contract: TSchemaContract = None, - ) -> LoadInfo: - ... + ) -> LoadInfo: ... @configspec @@ -601,8 +602,7 @@ class StateInjectableContext(ContainerInjectableContext): if TYPE_CHECKING: - def __init__(self, state: TPipelineState = None) -> None: - ... + def __init__(self, state: TPipelineState = None) -> None: ... def pipeline_state(