Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support partitioned assets for fetching latest AssetMaterializations #19286

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 49 additions & 9 deletions python_modules/dagster/dagster/_core/execution/context/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -1408,6 +1408,8 @@ def __init__(self, op_execution_context: OpExecutionContext) -> None:
)
self._step_execution_context = self._op_execution_context._step_execution_context # noqa: SLF001

self._step_execution_context = op_execution_context._step_execution_context # noqa: SLF001

@staticmethod
def get() -> "AssetExecutionContext":
ctx = _current_asset_execution_context.get()
Expand Down Expand Up @@ -1471,27 +1473,65 @@ def job_def(self) -> JobDefinition:
return self.op_execution_context.job_def

@public
def latest_materialization_for_upstream_asset(
self, key: CoercibleToAssetKey
def fetch_latest_materialization_for_upstream_asset(
self, key: CoercibleToAssetKey, partition_key: Optional[str] = None
) -> Optional[AssetMaterialization]:
"""Get the most recent AssetMaterialization event for the key. The key must be an upstream
"""Get the most recent AssetMaterialization event for the key. The key must be a direct upstream
asset for the currently materializing asset. Information like metadata and tags can be found
on the AssetMaterialization. If the key is not an upstream asset of the currently
materializing asset, an error will be raised. If no AssetMaterialization exists for key, None
will be returned.

For partitioned assets, fetching the latest AssetMaterialization for the partition requires a
call to the event log. Additionally, if the currently materializing asset depends on multiple
partitions of the upstream asset, ``partition_key`` must be provided so that the latest AssetMaterialization
is fetched for the correct partition.

Returns: Optional[AssetMaterialization]
"""
asset_key = AssetKey.from_coercible(key)

if (
self.job_def.asset_layer.input_for_asset_key(
self.op_execution_context.node_handle, AssetKey.from_coercible(asset_key)
)
is None
):
raise DagsterInvariantViolationError(
f"Cannot fetch AssetMaterialization for asset {asset_key}. {asset_key} must be a direct upstream dependency"
" in order to call fetch_latest_materialization_for_upstream_asset."
)

materialization_events = (
self.op_execution_context._step_execution_context.upstream_asset_materialization_events # noqa: SLF001
)
if AssetKey.from_coercible(key) in materialization_events.keys():
return materialization_events.get(AssetKey.from_coercible(key))
if not self._step_execution_context.has_asset_partitions_for_upstream_asset(asset_key):
return materialization_events.get(asset_key)
else:
# if the asset is partitioned, the AssetMaterialization in 'materialization_events' may not correspond to the
# latest AssetMaterialization of the particular partition that is relevant to this materialization. It will
# always be the latest AssetMaterialization regardless of partition. So we need to fetch the latest
# AssetMaterialization for the specific partition key that is relevant to this materialization
if partition_key is None:
partition_keys_for_asset = list(
self._step_execution_context.asset_partitions_subset_for_upstream_asset(
asset_key
).get_partition_keys()
)
if len(partition_keys_for_asset) > 1:
raise DagsterInvariantViolationError(
f"Asset {self.asset_key} depends on multiple partitions of asset {asset_key}."
" fetch_latest_materialization_for_upstream_asset must be given"
" a specific partition_key to fetch the AssetMaterialization for."
)
partition_key = partition_keys_for_asset[0]

raise DagsterInvariantViolationError(
f"Cannot fetch AssetMaterialization for asset {key}. {key} must be an upstream dependency"
"in order to call latest_materialization_for_upstream_asset."
)
event = self.instance.get_latest_data_version_record(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Starting a thread to discuss adding this DB access path.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My major concern with not adding this access is that we could be providing inaccurate information to users who are working with partitioned assets.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Conversely, a concern with adding it is that a user could iterate through a big list of assets, fetch the latest materialization for each, and break something.

asset_key, partition_key=partition_key
)
if event is None:
return event
return event.asset_materialization if event.asset_materialization else None

######## Deprecated methods

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,23 @@
import pytest
from dagster import (
AssetCheckResult,
AssetDep,
AssetExecutionContext,
AssetKey,
AssetOut,
DagsterInstance,
DailyPartitionsDefinition,
Definitions,
GraphDefinition,
MaterializeResult,
OpExecutionContext,
Output,
TimeWindowPartitionMapping,
asset,
asset_check,
graph_asset,
graph_multi_asset,
instance_for_test,
job,
materialize,
multi_asset,
Expand Down Expand Up @@ -437,7 +442,7 @@ def upstream(context: AssetExecutionContext):

@asset
def downstream(context: AssetExecutionContext, upstream):
mat = context.latest_materialization_for_upstream_asset("upstream")
mat = context.fetch_latest_materialization_for_upstream_asset("upstream")
assert mat is not None
assert mat.metadata["foo"].value == "bar"

Expand All @@ -452,8 +457,80 @@ def upstream():

@asset
def downstream(context: AssetExecutionContext, upstream):
mat = context.latest_materialization_for_upstream_asset("upstream")
mat = context.fetch_latest_materialization_for_upstream_asset("upstream")
assert mat is not None
assert mat.metadata["foo"].value == "bar"

materialize([upstream, downstream])


def test_upstream_metadata_with_partitions():
    """Check that the fetched upstream materialization is scoped to the running partition.

    Two partitions of ``upstream`` are materialized in order, then ``downstream`` is
    materialized for the *first* partition. Inside ``downstream`` the fetch helper must
    return the materialization of that first partition, while the step context's
    ``upstream_asset_materialization_events`` still holds the most recent materialization
    (the second partition), which is not partition-scoped.
    """
    daily = DailyPartitionsDefinition(start_date="2024-01-01")
    older_key, newer_key = "2024-01-01", "2024-01-02"

    @asset(partitions_def=daily)
    def upstream(context):
        return MaterializeResult(metadata={"partition_key": context.partition_key})

    @asset(partitions_def=daily, deps=[upstream])
    def downstream(context: AssetExecutionContext):
        scoped = context.fetch_latest_materialization_for_upstream_asset("upstream")
        assert scoped is not None
        assert scoped.metadata["partition_key"].value == context.partition_key

        # upstream_asset_materialization_events has the latest AssetMaterialization and is
        # not scoped to a specific partition
        cached_events = context._step_execution_context.upstream_asset_materialization_events  # noqa: SLF001
        unscoped = cached_events.get(AssetKey.from_coercible("upstream"))
        assert unscoped is not None
        unscoped_partition = unscoped.metadata["partition_key"].value
        assert unscoped_partition != context.partition_key
        assert unscoped_partition == newer_key

    with instance_for_test() as instance:
        # Materialize the older partition first; the metadata from the newer one will be
        # what ends up in upstream_asset_materialization_events.
        for upstream_partition in (older_key, newer_key):
            materialize([upstream], partition_key=upstream_partition, instance=instance)

        # asserts that fetch_latest_materialization_for_upstream_asset gets the
        # AssetMaterialization scoped to the correct partition, not just the latest one
        materialize([downstream], partition_key=older_key, instance=instance)


def test_upstream_metadata_with_partition_mapping():
    """Check that ``partition_key`` is required when several upstream partitions are mapped.

    ``downstream`` depends on ``upstream`` through a ``TimeWindowPartitionMapping`` with
    ``start_offset=-2``; the test expects that to map one downstream partition onto more
    than one upstream partition. Calling the fetch helper without ``partition_key`` must
    then raise, and calling it with an explicit key must return that partition's
    materialization.
    """
    partitions_def = DailyPartitionsDefinition(start_date="2024-01-01")

    @asset(partitions_def=partitions_def)
    def upstream(context):
        return MaterializeResult(metadata={"partition_key": context.partition_key})

    @asset(
        partitions_def=partitions_def,
        deps=[
            AssetDep(asset=upstream, partition_mapping=TimeWindowPartitionMapping(start_offset=-2))
        ],
    )
    def downstream(context: AssetExecutionContext):
        # The "not a direct upstream dependency" failure raises the same exception type,
        # so match on the message to make sure the multiple-partitions invariant fired.
        with pytest.raises(
            DagsterInvariantViolationError, match="depends on multiple partitions"
        ):
            context.fetch_latest_materialization_for_upstream_asset("upstream")

        mat = context.fetch_latest_materialization_for_upstream_asset(
            "upstream", partition_key="2024-01-01"
        )
        assert mat is not None
        assert mat.metadata["partition_key"].value == "2024-01-01"

    with instance_for_test() as instance:
        # Materialize every upstream partition that the downstream partition is mapped to.
        for upstream_partition in ("2024-01-01", "2024-01-02", "2024-01-03"):
            materialize([upstream], partition_key=upstream_partition, instance=instance)

        materialize([downstream], partition_key="2024-01-03", instance=instance)