Skip to content

Commit

Permalink
Add explicit_mode to compute contexts
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Sep 22, 2023
1 parent c402db0 commit 7124a54
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 5 deletions.
14 changes: 13 additions & 1 deletion python_modules/dagster/dagster/_core/definitions/asset_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@
)
from dagster._core.selector.subset_selector import AssetSelectionData

from ..errors import DagsterInvalidSubsetError
from ..errors import (
DagsterInvalidSubsetError,
DagsterInvariantViolationError,
)
from .config import ConfigMapping
from .dependency import NodeHandle, NodeInputHandle, NodeOutput, NodeOutputHandle
from .events import AssetKey
Expand Down Expand Up @@ -606,6 +609,15 @@ def node_output_handle_for_asset(self, asset_key: AssetKey) -> NodeOutputHandle:
def assets_def_for_node(self, node_handle: NodeHandle) -> Optional["AssetsDefinition"]:
    """Look up the assets definition registered for ``node_handle``.

    Returns:
        The entry stored in ``assets_defs_by_node_handle`` for the handle, or
        ``None`` when the handle has no entry.
    """
    defs_by_handle = self.assets_defs_by_node_handle
    return defs_by_handle.get(node_handle)

def asset_key_for_node(self, node_handle: NodeHandle) -> AssetKey:
    """Return the single asset key produced by the node at ``node_handle``.

    Raises:
        DagsterInvariantViolationError: If no assets definition is associated
            with the node, or if that assets definition does not define
            exactly one asset key.
    """
    assets_def = self.assets_def_for_node(node_handle)
    # A node without an assets definition is a different failure from a
    # multi-asset node, so it gets its own message.
    if not assets_def:
        raise DagsterInvariantViolationError(
            f"Cannot call `asset_key_for_node` for node '{node_handle}': no assets"
            " definition is associated with this node."
        )

    asset_keys = list(assets_def.keys_by_output_name.values())
    if len(asset_keys) > 1:
        raise DagsterInvariantViolationError(
            "Cannot call `asset_key_for_node` in a multi_asset with more than one asset."
            " Multiple asset keys defined."
        )
    # Guard the empty case explicitly instead of letting `next(iter(...))`
    # surface a bare StopIteration.
    if not asset_keys:
        raise DagsterInvariantViolationError(
            f"Cannot call `asset_key_for_node` for node '{node_handle}': the assets"
            " definition defines no asset keys."
        )
    return asset_keys[0]

def asset_check_specs_for_node(self, node_handle: NodeHandle) -> Sequence[AssetCheckSpec]:
assets_def_for_node = self.assets_def_for_node(node_handle)
checks_def_for_node = self.asset_checks_def_for_node(node_handle)
Expand Down
13 changes: 13 additions & 0 deletions python_modules/dagster/dagster/_core/execution/context/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -1222,6 +1222,19 @@ def asset_check_spec(self) -> AssetCheckSpec:
)
return asset_checks_def.spec

# When a typed event stream is required, returned values are not converted and every expected
# output must be emitted; a missing output is an error.
@property
def has_require_typed_event_stream(self) -> bool:
    """Whether the wrapped step execution context requires a typed event stream."""
    step_context = self._step_execution_context
    return step_context.has_require_typed_event_stream

@property
def typed_event_stream_error_message(self) -> Optional[str]:
    """Extra error message held by the wrapped step execution context, if any."""
    step_context = self._step_execution_context
    return step_context.typed_event_stream_error_message

def set_require_typed_event_stream(self, *, error_message: Optional[str] = None) -> None:
    """Forward the request to require a typed event stream to the step execution context.

    Args:
        error_message: Optional extra message, passed through unchanged.
    """
    step_context = self._step_execution_context
    step_context.set_require_typed_event_stream(error_message=error_message)


# actually forking the object type for assets is tricky for users in the cases of:
# * manually constructing ops to make AssetsDefinitions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,8 @@ def __init__(
self._partition_key = partition_key
self._partition_key_range = partition_key_range
self._assets_def = assets_def
self._require_typed_event_stream = False
self._typed_event_stream_error_message = None

@property
def op_config(self) -> Any:
Expand Down Expand Up @@ -714,6 +716,20 @@ def add_metadata_two_outputs(context) -> Tuple[str, int]:
else:
self._output_metadata[output_name] = metadata

# When a typed event stream is required, returned values are not converted and every expected
# output must be emitted; a missing output is an error.
@property
def has_require_typed_event_stream(self) -> bool:
    """Whether ``set_require_typed_event_stream`` has been called on this context."""
    required = self._require_typed_event_stream
    return required

@property
def typed_event_stream_error_message(self) -> Optional[str]:
    """Extra error message recorded on this context, or ``None`` if none was set."""
    extra_message = self._typed_event_stream_error_message
    return extra_message

def set_require_typed_event_stream(self, *, error_message: Optional[str] = None) -> None:
    """Mark this context as requiring a typed event stream.

    Args:
        error_message: Optional extra message to store alongside the flag.
            Defaults to ``None`` so the signature matches the other
            ``set_require_typed_event_stream`` definitions.
    """
    self._require_typed_event_stream = True
    self._typed_event_stream_error_message = error_message


def build_op_context(
resources: Optional[Mapping[str, Any]] = None,
Expand Down
18 changes: 18 additions & 0 deletions python_modules/dagster/dagster/_core/execution/context/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,24 @@ def __init__(
self._is_external_input_asset_version_info_loaded = False
self._data_version_cache: Dict[AssetKey, "DataVersion"] = {}

self._require_typed_event_stream = False
self._typed_event_stream_error_message = None

# When a typed event stream is required, returned values are not converted and every expected
# output must be emitted; a missing output is an error.
@property
def has_require_typed_event_stream(self) -> bool:
    """Whether ``set_require_typed_event_stream`` has been called on this step context."""
    required = self._require_typed_event_stream
    return required

@property
def typed_event_stream_error_message(self) -> Optional[str]:
    """Extra error message recorded on this step context, or ``None`` if none was set."""
    extra_message = self._typed_event_stream_error_message
    return extra_message

# Error message will be appended to the default error message.
def set_require_typed_event_stream(self, *, error_message: Optional[str] = None) -> None:
    """Mark this step context as requiring a typed event stream.

    Args:
        error_message: Optional extra text stored with the flag; it is appended
            to the default error message when an expected output is missing.
    """
    self._require_typed_event_stream = True
    self._typed_event_stream_error_message = error_message

@property
def step(self) -> ExecutionStep:
return self._step
Expand Down
38 changes: 34 additions & 4 deletions python_modules/dagster/dagster/_core/execution/plan/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
NodeHandle,
Output,
)
from dagster._core.definitions.asset_check_spec import AssetCheckHandle
from dagster._core.definitions.asset_layer import AssetLayer
from dagster._core.definitions.op_definition import OpComputeFunction
from dagster._core.definitions.result import MaterializeResult
Expand Down Expand Up @@ -203,14 +204,43 @@ def execute_core_compute(
for step_output in _yield_compute_results(step_context, inputs, compute_fn):
yield step_output
if isinstance(step_output, (DynamicOutput, Output)):
emitted_result_names.add(step_output.output_name)
output_name = step_output.output_name
elif isinstance(step_output, MaterializeResult):
asset_key = (
step_output.asset_key
or step_context.job_def.asset_layer.asset_key_for_node(step_context.node_handle)
)
output_name = step_context.job_def.asset_layer.node_output_handle_for_asset(
asset_key
).output_name
elif isinstance(step_output, AssetCheckEvaluation):
output_name = step_context.job_def.asset_layer.get_output_name_for_asset_check(
step_output.asset_check_handle
)
elif isinstance(step_output, AssetCheckResult):
if step_output.asset_key and step_output.check_name:
handle = AssetCheckHandle(step_output.asset_key, step_output.check_name)
else:
handle = step_output.to_asset_check_evaluation(step_context).asset_check_handle
output_name = step_context.job_def.asset_layer.get_output_name_for_asset_check(handle)
else:
output_name = None
if output_name:
emitted_result_names.add(output_name)

expected_op_output_names = {
output.name for output in step.step_outputs if not output.properties.asset_check_handle
}
omitted_outputs = expected_op_output_names.difference(emitted_result_names)
if omitted_outputs:
step_context.log.info(
f"{step_context.op_def.node_type_str} '{step.node_handle}' did not fire "
f"outputs {omitted_outputs!r}"
message = (
f"{step_context.op_def.node_type_str} '{step.node_handle}' did not yield or return "
f"expected outputs {omitted_outputs!r}."
)

if step_context.has_require_typed_event_stream:
if step_context.typed_event_stream_error_message:
message += " " + step_context.typed_event_stream_error_message
raise DagsterInvariantViolationError(message)
else:
step_context.log.info(message)
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,17 @@ def validate_and_coerce_op_result_to_iterator(
f" {type(result)}. {context.op_def.node_type_str.capitalize()} is explicitly defined to"
" return no results."
)
# Skip any return-specific validation and treat it like a generator op
elif output_defs and context.has_require_typed_event_stream:
# If nothing was returned, treat it as an empty tuple instead of a `(None,)`.
# This is important for delivering the correct error message when an output is missing.
if result is None:
result_tuple = tuple()
elif not isinstance(result, tuple) or is_named_tuple_instance(result):
result_tuple = (result,)
else:
result_tuple = result
yield from result_tuple
elif output_defs:
for position, output_def, element in _zip_and_iterate_op_result(
result, context, output_defs
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from contextlib import contextmanager
from typing import Iterator

import pytest
from dagster import OpExecutionContext, Out, asset, multi_asset, op
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.events import Output
from dagster._core.definitions.materialize import materialize
from dagster._core.errors import DagsterInvariantViolationError
from dagster._utils.test import wrap_op_in_graph_and_execute

EXTRA_ERROR_MESSAGE = "Hello"


@contextmanager
def raises_missing_output_error() -> Iterator[None]:
    """Expect the wrapped code to raise the missing-output invariant error.

    The error message must match the "did not yield or return expected outputs"
    text and end with ``EXTRA_ERROR_MESSAGE``.
    """
    expected_pattern = f"did not yield or return expected outputs.*{EXTRA_ERROR_MESSAGE}$"
    with pytest.raises(DagsterInvariantViolationError, match=expected_pattern):
        yield


def test_explicit_mode_op():
    """An op declaring outputs "a" and "b" that emits neither must raise once a typed event stream is required."""

    @op(out={"a": Out(int), "b": Out(int)})
    def explicit_mode_op(context: OpExecutionContext):
        # Only the requirement is set; no outputs are emitted, on purpose.
        context.set_require_typed_event_stream(error_message=EXTRA_ERROR_MESSAGE)

    with raises_missing_output_error():
        wrap_op_in_graph_and_execute(explicit_mode_op)


def test_explicit_mode_asset():
    """An asset that requires a typed event stream but returns nothing must fail to materialize."""

    @asset
    def explicit_mode_asset(context: OpExecutionContext):
        # Deliberately returns nothing so the expected output is missing.
        context.set_require_typed_event_stream(error_message=EXTRA_ERROR_MESSAGE)

    with raises_missing_output_error():
        materialize([explicit_mode_asset])


def test_explicit_mode_multi_asset():
    """A multi-asset that yields only one of its two outputs must fail when a typed event stream is required."""

    @multi_asset(specs=[AssetSpec("foo"), AssetSpec("bar")])
    def explicit_mode_multi_asset(context: OpExecutionContext):
        context.set_require_typed_event_stream(error_message=EXTRA_ERROR_MESSAGE)
        # Only "foo" is yielded; "bar" is left out on purpose.
        yield Output(None, output_name="foo")

    with raises_missing_output_error():
        materialize([explicit_mode_multi_asset])

0 comments on commit 7124a54

Please sign in to comment.