Skip to content

Commit

Permalink
move tags to _store_output
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Jan 30, 2024
1 parent 3b4b73f commit ef85e4f
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,6 @@ def _get_output_asset_materializations(
output_def: OutputDefinition,
io_manager_metadata: Mapping[str, MetadataValue],
step_context: StepExecutionContext,
handled_none_output: bool = False,
) -> Iterator[AssetMaterialization]:
all_metadata = {**output.metadata, **io_manager_metadata}

Expand Down Expand Up @@ -622,7 +621,7 @@ def _get_output_asset_materializations(
if backfill_id:
tags[BACKFILL_ID_TAG] = backfill_id

if handled_none_output:
if output.value is None:
tags[NONE_OUTPUT_TAG] = "True"

if asset_partitions:
Expand Down Expand Up @@ -715,7 +714,6 @@ def _store_output(

manager_materializations = []
manager_metadata: Dict[str, MetadataValue] = {}
handled_none_output = False

# don't store asset check outputs, asset observation outputs, or Nothing type outputs
step_output = step_context.step.step_output_named(step_output_handle.output_name)
Expand All @@ -730,7 +728,6 @@ def _store_output(
output=output,
output_def=output_def,
manager_metadata={},
handled_none_output=True,
)
# otherwise invoke the I/O manager
else:
Expand Down Expand Up @@ -786,9 +783,6 @@ def _gen_fn():
yield event

manager_metadata = {**manager_metadata, **output_context.consume_logged_metadata()}
if manager_metadata.get(NONE_OUTPUT_TAG):
handled_none_output = True
del manager_metadata[NONE_OUTPUT_TAG]

# do not alter explicitly created AssetMaterializations
for mgr_materialization in manager_materializations:
Expand Down Expand Up @@ -820,7 +814,6 @@ def _gen_fn():
output=output,
output_def=output_def,
manager_metadata=manager_metadata,
handled_none_output=handled_none_output,
)

yield DagsterEvent.handled_output(
Expand All @@ -832,7 +825,7 @@ def _gen_fn():


def _log_asset_materialization_events_for_asset(
step_context, output_context, output, output_def, manager_metadata, handled_none_output
step_context, output_context, output, output_def, manager_metadata
):
asset_key, partitions = _materializing_asset_key_and_partitions_for_output(output_context)
if asset_key:
Expand Down Expand Up @@ -863,7 +856,6 @@ def _log_asset_materialization_events_for_asset(
output_def,
manager_metadata,
step_context,
handled_none_output=handled_none_output,
)
)
if execution_type == AssetExecutionType.MATERIALIZATION
Expand Down
18 changes: 4 additions & 14 deletions python_modules/dagster/dagster/_core/storage/upath_io_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ def _load_partition_from_path_or_none(
def _load_none_based_on_tags(
self, context: InputContext, partition_key: Optional[str] = None
) -> bool:
if context.has_asset_key:
if context.has_asset_key and context.step_context is not None:
# If the upstream step is an asset and the output value was None, then there will
# be a tag marking that. If that tag exists, we want to provide None to the
# materializing asset.
Expand Down Expand Up @@ -466,19 +466,9 @@ def load_input(self, context: InputContext) -> Union[Any, Dict[str, Any]]:

def handle_output(self, context: OutputContext, obj: Any):
if obj is None and context.has_asset_key:
# If the step returns None and is an asset, mark via tag on the AssetMaterialization so
# that we can check this tag at load time to know to provide None
# context.add_output_metadata({"output_is_none": True})

# Adding the tags here does nothing since we manually make the AssetMaterialization for the
# asset in execute_step line 846. We need to find a way to tell execute step to add this tag.

# options:
# special metadata that we add here and then remove in execute step, then convert to tag

# return AssetMaterialization(asset_key=context.asset_key, tags={NONE_OUTPUT_TAG: "True"})

context.add_output_metadata({NONE_OUTPUT_TAG: True})
# If the step returns None and is an asset, do not store the None in the file system.
# The main execution path will add a tag to the AssetMaterialization that we can read
# at load time to provide a None.
return None
if context.has_asset_partitions:
paths = self._get_paths_for_partitions(context)
Expand Down

0 comments on commit ef85e4f

Please sign in to comment.