Skip to content

Commit

Permalink
Enrich metrics with load_id in StepInfo.asdict
Browse files Browse the repository at this point in the history
  • Loading branch information
sultaniman committed Mar 5, 2024
1 parent b5a5f84 commit 1848d4d
Showing 1 changed file with 15 additions and 15 deletions.
30 changes: 15 additions & 15 deletions dlt/common/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,16 +102,19 @@ def finished_at(self) -> datetime.datetime:

def asdict(self) -> DictStrAny:
# to be mixed with NamedTuple
d: DictStrAny = self._asdict() # type: ignore
d["pipeline"] = {"pipeline_name": self.pipeline.pipeline_name}
d["load_packages"] = [package.asdict() for package in self.load_packages]
step_info: DictStrAny = self._asdict() # type: ignore
step_info["pipeline"] = {"pipeline_name": self.pipeline.pipeline_name}
step_info["load_packages"] = [package.asdict() for package in self.load_packages]
if self.metrics:
d["started_at"] = self.started_at
d["finished_at"] = self.finished_at
d["metrics"] = [
{**dict(metric), "load_id": load_id} for load_id, metric in self.metrics.items()
]
return d
step_info["started_at"] = self.started_at
step_info["finished_at"] = self.finished_at
all_metrics = []
for load_id, metrics in step_info["metrics"].items():
for metric in metrics:
all_metrics.append({**dict(metric), "load_id": load_id})

step_info["metrics"] = all_metrics
return step_info

def __str__(self) -> str:
return self.asstr(verbosity=0)
Expand Down Expand Up @@ -527,8 +530,7 @@ def run(
schema: Schema = None,
loader_file_format: TLoaderFileFormat = None,
schema_contract: TSchemaContract = None,
) -> LoadInfo:
...
) -> LoadInfo: ...

def _set_context(self, is_active: bool) -> None:
"""Called when pipeline context activated or deactivate"""
Expand All @@ -550,8 +552,7 @@ def __call__(
schema: Schema = None,
loader_file_format: TLoaderFileFormat = None,
schema_contract: TSchemaContract = None,
) -> LoadInfo:
...
) -> LoadInfo: ...


@configspec
Expand Down Expand Up @@ -601,8 +602,7 @@ class StateInjectableContext(ContainerInjectableContext):

if TYPE_CHECKING:

def __init__(self, state: TPipelineState = None) -> None:
...
def __init__(self, state: TPipelineState = None) -> None: ...


def pipeline_state(
Expand Down

0 comments on commit 1848d4d

Please sign in to comment.