Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add require_typed_event_stream to compute contexts #16706

Merged
merged 1 commit into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,14 @@ def to_asset_check_evaluation(
severity=self.severity,
)

def get_spec_python_identifier(self, asset_key: Optional[AssetKey]) -> str:
def get_spec_python_identifier(
self, *, asset_key: Optional[AssetKey] = None, check_name: Optional[str] = None
) -> str:
"""Returns a string uniquely identifying the asset check spec associated with this result.
This is used for the output name associated with an `AssetCheckResult`.
"""
asset_key = asset_key or self.asset_key
check_name = check_name or self.check_name
assert asset_key is not None, "Asset key must be provided if not set on spec"
assert asset_key is not None, "Asset key must be provided if not set on spec"
return f"{asset_key.to_python_identifier()}_{self.check_name}"
14 changes: 13 additions & 1 deletion python_modules/dagster/dagster/_core/definitions/asset_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@
)
from dagster._core.selector.subset_selector import AssetSelectionData

from ..errors import DagsterInvalidSubsetError
from ..errors import (
DagsterInvalidSubsetError,
DagsterInvariantViolationError,
)
from .config import ConfigMapping
from .dependency import NodeHandle, NodeInputHandle, NodeOutput, NodeOutputHandle
from .events import AssetKey
Expand Down Expand Up @@ -606,6 +609,15 @@ def node_output_handle_for_asset(self, asset_key: AssetKey) -> NodeOutputHandle:
def assets_def_for_node(self, node_handle: NodeHandle) -> Optional["AssetsDefinition"]:
return self.assets_defs_by_node_handle.get(node_handle)

def asset_key_for_node(self, node_handle: NodeHandle) -> AssetKey:
assets_def = self.assets_def_for_node(node_handle)
if not assets_def or len(assets_def.keys_by_output_name.keys()) > 1:
raise DagsterInvariantViolationError(
"Cannot call `asset_key_for_node` in a multi_asset with more than one asset."
" Multiple asset keys defined."
)
return next(iter(assets_def.keys_by_output_name.values()))

def asset_check_specs_for_node(self, node_handle: NodeHandle) -> Sequence[AssetCheckSpec]:
assets_def_for_node = self.assets_def_for_node(node_handle)
checks_def_for_node = self.asset_checks_def_for_node(node_handle)
Expand Down
13 changes: 13 additions & 0 deletions python_modules/dagster/dagster/_core/execution/context/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -1222,6 +1222,19 @@ def asset_check_spec(self) -> AssetCheckSpec:
)
return asset_checks_def.spec

# In this mode no conversion is done on returned values and missing but expected outputs are not
# allowed.
@property
def requires_typed_event_stream(self) -> bool:
return self._step_execution_context.requires_typed_event_stream

@property
def typed_event_stream_error_message(self) -> Optional[str]:
return self._step_execution_context.typed_event_stream_error_message

def set_requires_typed_event_stream(self, *, error_message: Optional[str] = None) -> None:
self._step_execution_context.set_requires_typed_event_stream(error_message=error_message)


# actually forking the object type for assets is tricky for users in the cases of:
# * manually constructing ops to make AssetsDefinitions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,8 @@ def __init__(
self._partition_key = partition_key
self._partition_key_range = partition_key_range
self._assets_def = assets_def
self._requires_typed_event_stream = False
self._typed_event_stream_error_message = None

@property
def op_config(self) -> Any:
Expand Down Expand Up @@ -714,6 +716,20 @@ def add_metadata_two_outputs(context) -> Tuple[str, int]:
else:
self._output_metadata[output_name] = metadata

# In this mode no conversion is done on returned values and missing but expected outputs are not
# allowed.
@property
def requires_typed_event_stream(self) -> bool:
return self._requires_typed_event_stream

@property
def typed_event_stream_error_message(self) -> Optional[str]:
return self._typed_event_stream_error_message

def set_requires_typed_event_stream(self, *, error_message: Optional[str]) -> None:
self._requires_typed_event_stream = True
self._typed_event_stream_error_message = error_message


def build_op_context(
resources: Optional[Mapping[str, Any]] = None,
Expand Down
18 changes: 18 additions & 0 deletions python_modules/dagster/dagster/_core/execution/context/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,24 @@ def __init__(
self._is_external_input_asset_version_info_loaded = False
self._data_version_cache: Dict[AssetKey, "DataVersion"] = {}

self._requires_typed_event_stream = False
self._typed_event_stream_error_message = None

# In this mode no conversion is done on returned values and missing but expected outputs are not
# allowed.
@property
def requires_typed_event_stream(self) -> bool:
return self._requires_typed_event_stream

@property
def typed_event_stream_error_message(self) -> Optional[str]:
return self._typed_event_stream_error_message

# Error message will be appended to the default error message.
def set_requires_typed_event_stream(self, *, error_message: Optional[str] = None):
self._requires_typed_event_stream = True
self._typed_event_stream_error_message = error_message

@property
def step(self) -> ExecutionStep:
return self._step
Expand Down
46 changes: 42 additions & 4 deletions python_modules/dagster/dagster/_core/execution/plan/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
NodeHandle,
Output,
)
from dagster._core.definitions.asset_check_spec import AssetCheckHandle
from dagster._core.definitions.asset_layer import AssetLayer
from dagster._core.definitions.op_definition import OpComputeFunction
from dagster._core.definitions.result import MaterializeResult
Expand Down Expand Up @@ -204,13 +205,50 @@ def execute_core_compute(
yield step_output
if isinstance(step_output, (DynamicOutput, Output)):
emitted_result_names.add(step_output.output_name)
elif isinstance(step_output, MaterializeResult):
asset_key = (
step_output.asset_key
or step_context.job_def.asset_layer.asset_key_for_node(step_context.node_handle)
)
emitted_result_names.add(
step_context.job_def.asset_layer.node_output_handle_for_asset(asset_key).output_name
)
# Check results embedded in MaterializeResult are counted
for check_result in step_output.check_results or []:
handle = check_result.to_asset_check_evaluation(step_context).asset_check_handle
output_name = step_context.job_def.asset_layer.get_output_name_for_asset_check(
handle
)
emitted_result_names.add(output_name)
elif isinstance(step_output, AssetCheckEvaluation):
output_name = step_context.job_def.asset_layer.get_output_name_for_asset_check(
step_output.asset_check_handle
)
emitted_result_names.add(output_name)
elif isinstance(step_output, AssetCheckResult):
if step_output.asset_key and step_output.check_name:
handle = AssetCheckHandle(step_output.asset_key, step_output.check_name)
else:
handle = step_output.to_asset_check_evaluation(step_context).asset_check_handle
output_name = step_context.job_def.asset_layer.get_output_name_for_asset_check(handle)
Comment on lines +228 to +233
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

theres custom logic to bypass check handle outputs on 232 to avoid printing the warning message. I think that means currently asset checks would bypass would not trigger this has_require_typed_event_stream check? Needs test

Copy link
Collaborator Author

@smackesey smackesey Sep 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch, they were indeed not being caught, fixed and tests added

emitted_result_names.add(output_name)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be very nice to have a universal function you can call to get output name from result object somewhere, not sure how to cover all cases or where to put it at this time.


expected_op_output_names = {
output.name for output in step.step_outputs if not output.properties.asset_check_handle
output.name
for output in step.step_outputs
# checks are required if we're in requires_typed_event_stream mode
if step_context.requires_typed_event_stream or output.properties.asset_check_handle
}
omitted_outputs = expected_op_output_names.difference(emitted_result_names)
if omitted_outputs:
step_context.log.info(
f"{step_context.op_def.node_type_str} '{step.node_handle}' did not fire "
f"outputs {omitted_outputs!r}"
message = (
f"{step_context.op_def.node_type_str} '{step.node_handle}' did not yield or return "
f"expected outputs {omitted_outputs!r}."
)

if step_context.requires_typed_event_stream:
if step_context.typed_event_stream_error_message:
message += " " + step_context.typed_event_stream_error_message
raise DagsterInvariantViolationError(message)
else:
step_context.log.info(message)
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def _filter_expected_output_defs(
)
materialize_results = [x for x in result_tuple if isinstance(x, MaterializeResult)]
remove_outputs = [
r.get_spec_python_identifier(x.asset_key or context.asset_key)
r.get_spec_python_identifier(asset_key=x.asset_key or context.asset_key)
for x in materialize_results
for r in x.check_results or []
]
Expand Down Expand Up @@ -267,6 +267,20 @@ def validate_and_coerce_op_result_to_iterator(
f" {type(result)}. {context.op_def.node_type_str.capitalize()} is explicitly defined to"
" return no results."
)
# `requires_typed_event_stream` is a mode where we require users to return/yield exactly the
# results that will be registered in the instance, without additional fancy inference (like
# wrapping `None` in an `Output`). We therefore skip any return-specific validation for this
# mode and treat returned values as if they were yielded.
elif output_defs and context.requires_typed_event_stream:
# If nothing was returned, treat it as an empty tuple instead of a `(None,)`.
# This is important for delivering the correct error message when an output is missing.
if result is None:
result_tuple = tuple()
elif not isinstance(result, tuple) or is_named_tuple_instance(result):
result_tuple = (result,)
else:
result_tuple = result
yield from result_tuple
elif output_defs:
for position, output_def, element in _zip_and_iterate_op_result(
result, context, output_defs
Expand Down
Loading