Skip to content

Commit

Permalink
move to making the materailization event avialable
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Jan 2, 2024
1 parent 2b98100 commit c7d13f2
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1359,8 +1359,12 @@ def get() -> "AssetExecutionContext":
def get_op_execution_context(self) -> "OpExecutionContext":
return OpExecutionContext(self._step_execution_context)

def get_metadata_for_asset(self, key: CoercibleToAssetKey):
return self._step_execution_context._upstream_metadata.get(AssetKey.from_coercible(key), {}) # noqa: SLF001
def latest_materialization_event(
self, key: CoercibleToAssetKey
) -> Optional[AssetMaterialization]:
return self._step_execution_context.latest_materialization_event.get(
AssetKey.from_coercible(key)
)


@contextmanager
Expand Down
23 changes: 14 additions & 9 deletions python_modules/dagster/dagster/_core/execution/context/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
)

import dagster._check as check
from dagster import AssetMaterialization
from dagster._annotations import public
from dagster._core.definitions.data_version import (
DATA_VERSION_TAG,
Expand Down Expand Up @@ -77,7 +78,6 @@
DataVersion,
)
from dagster._core.definitions.dependency import NodeHandle
from dagster._core.definitions.metadata import MetadataValue
from dagster._core.definitions.resource_definition import Resources
from dagster._core.event_api import EventLogRecord
from dagster._core.execution.plan.plan import ExecutionPlan
Expand Down Expand Up @@ -572,7 +572,7 @@ def __init__(
self._output_metadata: Dict[str, Any] = {}
self._seen_outputs: Dict[str, Union[str, Set[str]]] = {}

self._upstream_metadata: Dict[AssetKey, Mapping[str, MetadataValue]] = {}
self.latest_materialization_event: Dict[AssetKey, Optional[AssetMaterialization]] = {}

self._input_asset_version_info: Dict[AssetKey, Optional["InputAssetVersionInfo"]] = {}
self._is_external_input_asset_version_info_loaded = False
Expand Down Expand Up @@ -958,11 +958,16 @@ def is_external_input_asset_version_info_loaded(self) -> bool:

def get_input_asset_version_info(self, key: AssetKey) -> Optional["InputAssetVersionInfo"]:
if key not in self._input_asset_version_info:
self._fetch_input_asset_metadata_and_version_info(key)
self._fetch_input_asset_materialization_and_version_info(key)
return self._input_asset_version_info[key]

# "external" refers to records for inputs generated outside of this step
def fetch_external_input_asset_version_info_and_metadata(self) -> None:
def fetch_external_input_asset_materialization_and_version_info(self) -> None:
"""Fetches the latest observation or materialization for each upstream dependency
in order to determine the version info. As a side effect we create a dictionary
of the materialization events so that the AssetContext can access the latest materialization
event for each upstream asset.
"""
output_keys = self.get_output_asset_keys()

all_dep_keys: List[AssetKey] = []
Expand All @@ -976,21 +981,21 @@ def fetch_external_input_asset_version_info_and_metadata(self) -> None:

self._input_asset_version_info = {}
for key in all_dep_keys:
self._fetch_input_asset_metadata_and_version_info(key)
self._fetch_input_asset_materialization_and_version_info(key)
self._is_external_input_asset_version_info_loaded = True

def _fetch_input_asset_metadata_and_version_info(self, key: AssetKey) -> None:
def _fetch_input_asset_materialization_and_version_info(self, key: AssetKey) -> None:
from dagster._core.definitions.data_version import (
extract_data_version_from_entry,
)

event = self._get_input_asset_event(key)
if event is None:
self._input_asset_version_info[key] = None
self._upstream_metadata[key] = {}
self.latest_materialization_event[key] = None
else:
self._upstream_metadata[key] = (
event.asset_materialization.metadata if event.asset_materialization else {}
self.latest_materialization_event[key] = (
event.asset_materialization if event.asset_materialization else None
)
storage_id = event.storage_id
# Input name will be none if this is an internal dep
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,8 @@ def core_dagster_event_sequence_for_step(
inputs = {}

if step_context.is_sda_step:
step_context.fetch_external_input_asset_version_info_and_metadata()
step_context.fetch_external_input_asset_materialization_and_version_info()
# step_context.fetch_external_input_asset_materializations_and_observations

for step_input in step_context.step.step_inputs:
input_def = step_context.op_def.input_def_named(step_input.name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,8 +437,9 @@ def upstream(context: AssetExecutionContext):

@asset
def downstream(context: AssetExecutionContext, upstream):
metadata = context.get_metadata_for_asset("upstream")
assert metadata["foo"].value == "bar"
mat = context.latest_materialization_event("upstream")
assert mat is not None
assert mat.metadata["foo"].value == "bar"

materialize([upstream, downstream])

Expand All @@ -451,7 +452,8 @@ def upstream():

@asset
def downstream(context: AssetExecutionContext, upstream):
metadata = context.get_metadata_for_asset("upstream")
assert metadata["foo"].value == "bar"
mat = context.latest_materialization_event("upstream")
assert mat is not None
assert mat.metadata["foo"].value == "bar"

materialize([upstream, downstream])

0 comments on commit c7d13f2

Please sign in to comment.