Skip to content

Commit

Permalink
change context conditions
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Sep 20, 2023
1 parent 9f6d595 commit 32d5c77
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 6 deletions.
20 changes: 20 additions & 0 deletions python_modules/dagster/dagster/_core/execution/context/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from dagster._core.definitions.step_launcher import StepLauncher
from dagster._core.definitions.time_window_partitions import TimeWindow
from dagster._core.errors import (
DagsterInvalidDefinitionError,
DagsterInvalidPropertyError,
DagsterInvariantViolationError,
)
Expand Down Expand Up @@ -1698,6 +1699,7 @@ def has_events(self) -> bool:
def asset_check_spec(self) -> AssetCheckSpec:
return self._op_execution_context.asset_check_spec


def build_execution_context(
step_context: StepExecutionContext,
) -> Union[OpExecutionContext, AssetExecutionContext]:
Expand All @@ -1708,6 +1710,13 @@ def build_execution_context(
asset AssetExecutionContext AssetExecutionContext
asset OpExecutionContext OpExecutionContext
asset None AssetExecutionContext
op AssetExecutionContext Error - we cannot init an AssetExecutionContext w/o an AssetsDefinition
op OpExecutionContext OpExecutionContext
op None OpExecutionContext
For ops in graph-backed assets
step type annotation result
op AssetExecutionContext AssetExecutionContext
op OpExecutionContext OpExecutionContext
op None OpExecutionContext
Expand All @@ -1726,6 +1735,17 @@ def build_execution_context(
context_param = compute_fn.get_context_arg()
context_annotation = context_param.annotation

# TODO - i dont know how to move this check to Definition time since we don't know if the op is
# part of a graph-backed asset until we have the step execution context, i think
if context_annotation is AssetExecutionContext and not is_sda_step:
# AssetExecutionContext requires an AssetsDefinition during init, so an op in an op job
# cannot be annotated with AssetExecutionContext
raise DagsterInvalidDefinitionError(
"Cannot annotate @op `context` parameter with type AssetExecutionContext unless the"
" op is part of a graph-backed asset. `context` must be annotated with"
" OpExecutionContext, or left blank."
)

op_context = OpExecutionContext(step_context)

if context_annotation is EmptyAnnotation:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,11 @@ def asset_annotation(context: AssetExecutionContext):
def asset_annotation_job():
asset_annotation()

assert asset_annotation_job.execute_in_process().success
with pytest.raises(
DagsterInvalidDefinitionError,
match="Cannot annotate @op `context` parameter with type AssetExecutionContext",
):
asset_annotation_job.execute_in_process()

@op
def op_annotation(context: OpExecutionContext):
Expand Down Expand Up @@ -209,15 +213,18 @@ def no_annotation(context, *args):
no_annotation_graph.to_job(name="no_annotation_job").execute_in_process()

def asset_annotation(context: AssetExecutionContext, *args):
assert isinstance(context, AssetExecutionContext)
yield Output(1)
assert False, "Test should error during context creation"

asset_annotation_op = OpDefinition(compute_fn=asset_annotation, name="asset_annotation_op")
asset_annotation_graph = GraphDefinition(
name="asset_annotation_graph", node_defs=[asset_annotation_op]
)

asset_annotation_graph.to_job(name="asset_annotation_job").execute_in_process()
with pytest.raises(
DagsterInvalidDefinitionError,
match="Cannot annotate @op `context` parameter with type AssetExecutionContext",
):
asset_annotation_graph.to_job(name="asset_annotation_job").execute_in_process()

def op_annotation(context: OpExecutionContext, *args):
assert isinstance(context, OpExecutionContext)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import dagster._check as check
import pytest
from dagster import (
AssetExecutionContext,
AssetKey,
DagsterExecutionStepNotFoundError,
DagsterInvalidConfigError,
DagsterInvariantViolationError,
Field,
OpExecutionContext,
Out,
Output,
ReexecutionOptions,
Expand Down Expand Up @@ -377,7 +377,7 @@ def echo(x):


@op
def fail_once(context: AssetExecutionContext, x):
def fail_once(context: OpExecutionContext, x):
key = context.op_handle.name
if context.instance.run_storage.get_cursor_values({key}).get(key):
return x
Expand Down

0 comments on commit 32d5c77

Please sign in to comment.