Skip to content

Commit

Permalink
pyright
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Nov 28, 2023
1 parent 125be9c commit 65c73ce
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 11 deletions.
15 changes: 12 additions & 3 deletions python_modules/dagster/dagster/_core/execution/context/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class ExecutionProperties(
"_ExecutionProperties",
[
("step_description", PublicAttr[str]),
("node_type", PublicAttr[str]),
("op_execution_context", PublicAttr["OpExecutionContext"]),
],
)
Expand All @@ -73,9 +74,14 @@ class ExecutionProperties(
You should not need to access these attributes directly.
"""

def __new__(cls, step_description: str, op_execution_context: "OpExecutionContext"):
def __new__(
cls, step_description: str, node_type: str, op_execution_context: "OpExecutionContext"
):
return super(ExecutionProperties, cls).__new__(
cls, step_description=step_description, op_execution_context=op_execution_context
cls,
step_description=step_description,
node_type=node_type,
op_execution_context=op_execution_context,
)


Expand Down Expand Up @@ -195,7 +201,9 @@ def __init__(self, step_execution_context: StepExecutionContext):
self._output_metadata: Dict[str, Any] = {}

self._execution_props = ExecutionProperties(
step_description=self._step_execution_context.describe_op(), op_execution_context=self
step_description=self._step_execution_context.describe_op(),
node_type="op",
op_execution_context=self,
)

@property
Expand Down Expand Up @@ -1494,6 +1502,7 @@ def execution_properties(self) -> ExecutionProperties:
if self._execution_props is None:
self._execution_props = ExecutionProperties(
step_description=f"asset {self.op_execution_context.node_handle}",
node_type="asset",
op_execution_context=self._op_execution_context,
)
return self._execution_props
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,11 +289,9 @@ def bind(
resources=resources,
op_config=op_config,
)

self._execution_props = ExecutionProperties(
step_description=f'op "{op_def.name}"', op_execution_context=self
step_description=f'op "{op_def.name}"', node_type="op", op_execution_context=self
)

self._invocation_properties = InvocationProperties()
return self

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ def _yield_compute_results(
yield _validate_event(event, step_context)

if compute_context.execution_properties.op_execution_context.has_events():
yield from compute_context.consume_events()
yield from compute_context.execution_properties.op_execution_context.consume_events()


def execute_core_compute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,10 @@ def validate_and_coerce_op_result_to_iterator(
# results that will be registered in the instance, without additional fancy inference (like
# wrapping `None` in an `Output`). We therefore skip any return-specific validation for this
# mode and treat returned values as if they were yielded.
elif output_defs and context.requires_typed_event_stream:
elif (
output_defs
and context.execution_properties.op_execution_context.requires_typed_event_stream
):
# If nothing was returned, treat it as an empty tuple instead of a `(None,)`.
# This is important for delivering the correct error message when an output is missing.
if result is None:
Expand All @@ -294,7 +297,9 @@ def validate_and_coerce_op_result_to_iterator(
for position, output_def, element in _zip_and_iterate_op_result(
result, context, output_defs
):
annotation = _get_annotation_for_output_position(position, context.op_def, output_defs)
annotation = _get_annotation_for_output_position(
position, context.execution_properties.op_execution_context.op_def, output_defs
)
if output_def.is_dynamic:
if not isinstance(element, list):
raise DagsterInvariantViolationError(
Expand Down Expand Up @@ -352,11 +357,11 @@ def validate_and_coerce_op_result_to_iterator(
f"Received instead an object of type {type(element)}."
)
if result is None and output_def.is_required is False:
context.log.warning(
context.execution_properties.op_execution_context.log.warning(
'Value "None" returned for non-required output '
f'"{output_def.name}" of {context.execution_properties.step_description}. '
"This value will be passed to downstream "
f"{context.op_def.node_type_str}s. For conditional "
f"{context.execution_properties.node_type}s. For conditional "
"execution, results must be yielded: "
"https://docs.dagster.io/concepts/ops-jobs-graphs/graphs#with-conditional-branching"
)
Expand Down

0 comments on commit 65c73ce

Please sign in to comment.