Skip to content

Commit

Permalink
make it work for partitions?
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Jan 19, 2024
1 parent 5895144 commit c842b92
Showing 1 changed file with 47 additions and 12 deletions.
59 changes: 47 additions & 12 deletions python_modules/dagster/dagster/_core/execution/context/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from inspect import (
_empty as EmptyAnnotation,
)
import pstats
from typing import (
AbstractSet,
Any,
Expand Down Expand Up @@ -1407,6 +1406,8 @@ def __init__(self, op_execution_context: OpExecutionContext) -> None:
op_execution_context, "op_execution_context", OpExecutionContext
)

self._step_execution_context = op_execution_context._step_execution_context # noqa: SLF001

@staticmethod
def get() -> "AssetExecutionContext":
ctx = _current_asset_execution_context.get()
Expand Down Expand Up @@ -1470,31 +1471,65 @@ def job_def(self) -> JobDefinition:
return self.op_execution_context.job_def

@public
def latest_materialization_for_upstream_asset(
self, key: CoercibleToAssetKey
def fetch_latest_materialization_for_upstream_asset(
self, key: CoercibleToAssetKey, partition_key: Optional[str] = None
) -> Optional[AssetMaterialization]:
"""Get the most recent AssetMaterialization event for the key. The key must be an upstream
"""Get the most recent AssetMaterialization event for the key. The key must be a direct upstream
asset for the currently materializing asset. Information like metadata and tags can be found
on the AssetMaterialization. If the key is not an upstream asset of the currently
materializing asset, an error will be raised. If no AssetMaterialization exists for key, None
will be returned.
For partitioned assets, fetching the latest AssetMaterialization for the partition requires a
call to the event log. Additionally, if the currently materializing asset depends on multiple
partitions of the upstream asset, ``partition_key`` must be provided so that the latest AssetMaterialization
is fetched for the correct partition.
Returns: Optional[AssetMaterialization]
"""
asset_key = AssetKey.from_coercible(key)

if (
self.job_def.asset_layer.input_for_asset_key(
self.op_execution_context.node_handle, AssetKey.from_coercible(asset_key)
)
is None
):
raise DagsterInvariantViolationError(
f"Cannot fetch AssetMaterialization for asset {asset_key}. {asset_key} must be a direct upstream dependency"
"in order to call fetch_latest_materialization_for_upstream_asset."
)

materialization_events = (
self.op_execution_context._step_execution_context.upstream_asset_materialization_events # noqa: SLF001
)
if self.has_partition_key:
# if the asset is par
pass
if not self._step_execution_context.has_asset_partitions_for_upstream_asset(asset_key):
return materialization_events.get(asset_key)
else:
if AssetKey.from_coercible(key) in materialization_events.keys():
return materialization_events.get(AssetKey.from_coercible(key))
# if the asset is partitioned, the AssetMaterialization in 'materialization_events' may not correspond to the
# latest AssetMaterialization of the particular partition that is relevant to this materialization. It will
# always be the latest AssetMaterialization regardless of partition. So we need to fetch the latest
# AssetMaterialization for the specific partition key that is relevant to this materialization
if partition_key is None:
partition_keys_for_asset = list(
self._step_execution_context.asset_partitions_subset_for_upstream_asset(
asset_key
).get_partition_keys()
)
if len(partition_keys_for_asset) > 1:
raise DagsterInvariantViolationError(
f"Asset {self.asset_key} depends on multiple partitions of asset {asset_key}."
" fetch_latest_materialization_for_upstream asset must be given"
" a specific partition_key to fetch the AssetMaterialization for."
)
partition_key = partition_keys_for_asset[0]

raise DagsterInvariantViolationError(
f"Cannot fetch AssetMaterialization for asset {key}. {key} must be an upstream dependency"
"in order to call latest_materialization_for_upstream_asset."
event = self.instance.get_latest_data_version_record(
asset_key, partition_key=partition_key
)
if event is None:
return event
return event.asset_materialization if event.asset_materialization else None

######## Deprecated methods

Expand Down

0 comments on commit c842b92

Please sign in to comment.