Refactor LoadInfo metrics layout schema #1046
Conversation
```python
load_info = pipeline.run(data, table_name="users")
pipeline.run([load_info], table_name="_load_info")
```
- please add load_info, normalize_info and extract_info.
- please load to a separate schema (you have the schema argument to run())
- use this second schema to compare hashes (mind that it won't be the default)
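A minimal sketch of what this asks for, assuming `pipeline.last_trace` exposes the extract and normalize infos and that they can be loaded the same way `load_info` is; the `pipeline_info` schema name and the sample data are illustrative:

```python
import dlt

data = [{"id": 1, "name": "alice"}]

pipeline = dlt.pipeline(pipeline_name="users_pipeline", destination="duckdb")
load_info = pipeline.run(data, table_name="users")

# load all three step infos into a dedicated schema, not the default one
info_schema = dlt.Schema("pipeline_info")
pipeline.run([load_info], table_name="_load_info", schema=info_schema)
pipeline.run([pipeline.last_trace.last_extract_info], table_name="_extract_info", schema=info_schema)
pipeline.run([pipeline.last_trace.last_normalize_info], table_name="_normalize_info", schema=info_schema)

# compare hashes on this second schema (it won't be the default)
info_version_hash = pipeline.schemas["pipeline_info"].version_hash
```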
```diff
@@ -61,8 +61,12 @@ class _StepInfo(NamedTuple):
 class StepMetrics(TypedDict):
     """Metrics for particular package processed in particular pipeline step"""

     load_id: str
```
this is OK and overall even better than a dictionary. But in general the shape of the data is changed in asdict(), like here:
```python
def asdict(self) -> DictStrAny:
    # to be mixed with NamedTuple
    d: DictStrAny = self._asdict()  # type: ignore
    d["pipeline"] = {"pipeline_name": self.pipeline.pipeline_name}
    d["load_packages"] = [package.asdict() for package in self.load_packages]
    if self.metrics:
        d["started_at"] = self.started_at
        d["finished_at"] = self.finished_at
    return d
```
and the problem was that we didn't reformat the metrics to convert from dict to list.
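A hypothetical helper illustrating that reshaping: the metrics dictionary keyed by `load_id` flattens into a list whose entries each carry their `load_id`:

```python
from typing import Any, Dict, List

def flatten_metrics(metrics_by_load_id: Dict[str, List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
    # {"123": [{"started_at": ...}]} -> [{"load_id": "123", "started_at": ...}]
    flat: List[Dict[str, Any]] = []
    for load_id, metrics_list in metrics_by_load_id.items():
        for metrics in metrics_list:
            flat.append({"load_id": load_id, **metrics})
    return flat
```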
```python
    dataset_name="mydata",
)

load_info = pipeline.run(data, table_name="users")
```
please load something more complicated than this, i.e. a source with a resource that has several hints
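One way to build such an input, sketched with illustrative names (`users_source`, `users`) and hints:

```python
import dlt

# a source whose resource carries several hints: table name,
# write disposition, primary key and a column-level type hint
@dlt.resource(
    table_name="users",
    write_disposition="merge",
    primary_key="id",
    columns={"created_at": {"data_type": "timestamp"}},
)
def users():
    yield [{"id": 1, "name": "alice", "created_at": "2024-01-01T00:00:00Z"}]

@dlt.source
def users_source():
    return users

load_info = pipeline.run(users_source())
```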
```python
pipeline.run([load_info], table_name="_load_info")
first_version_hash = pipeline.default_schema.version_hash

load_info = pipeline.run(data, table_name="users")
```
here let's load again, but we should add another source with a different resource and some schema hints
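A sketch of that second load, reusing the `pipeline` from the earlier snippets; source, resource and hints are again illustrative:

```python
# a different source with its own resource and extra schema hints
@dlt.resource(
    table_name="orders",
    primary_key="order_id",
    columns={"amount": {"data_type": "decimal"}},
)
def orders():
    yield [{"order_id": 10, "amount": "99.90"}]

@dlt.source
def orders_source():
    return orders

pipeline.run(orders_source())
```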
```python
first_version_hash = pipeline.default_schema.version_hash

load_info = pipeline.run(data, table_name="users")
pipeline.run([load_info], table_name="_load_info")
```
you may have a schema difference when loading extract_info and the new resource has a new hint type; then indeed we may add a column dynamically.
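A hedged sketch of checking for that difference; the `_extract_info` table name and the trace attribute are assumptions carried over from the earlier sketch:

```python
# compare schema version hashes before and after loading extract_info
hash_before = pipeline.default_schema.version_hash
pipeline.run([pipeline.last_trace.last_extract_info], table_name="_extract_info")
hash_after = pipeline.default_schema.version_hash

if hash_before != hash_after:
    # a column was added dynamically for the new hint type
    print(pipeline.default_schema.tables["_extract_info"]["columns"].keys())
```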
closing in favor of #1051
This issue was reported by a community member in Slack and in related issue #1043.
When we capture load_info data in the destination database, the following occurs:
TODO

- Change `_ExtractInfo.metrics` from `Dict[str, List[ExtractMetrics]]` to just `List[ExtractMetrics]`
- Add `load_id` field to `StepMetrics`
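A sketch of the metrics layout after these TODO items, with non-essential fields elided:

```python
from typing import List, NamedTuple, TypedDict

class StepMetrics(TypedDict):
    """Metrics for particular package processed in particular pipeline step"""
    load_id: str  # new field: each entry in the flat list identifies its package

class ExtractMetrics(StepMetrics):
    pass  # extract-specific fields elided

class _ExtractInfo(NamedTuple):
    # was: metrics: Dict[str, List[ExtractMetrics]]
    metrics: List[ExtractMetrics]
```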