diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index b5a4fd0e7dd16..6a9f62c8f1cd3 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -39,6 +39,7 @@ from dagster._core.definitions.step_launcher import StepLauncher from dagster._core.definitions.time_window_partitions import TimeWindow from dagster._core.errors import ( + DagsterInvalidDefinitionError, DagsterInvalidPropertyError, DagsterInvariantViolationError, ) @@ -1695,6 +1696,7 @@ def has_events(self) -> bool: def asset_check_spec(self) -> AssetCheckSpec: return self._op_execution_context.asset_check_spec + def build_execution_context( step_context: StepExecutionContext, ) -> Union[OpExecutionContext, AssetExecutionContext]: @@ -1705,6 +1707,13 @@ def build_execution_context( asset AssetExecutionContext AssetExecutionContext asset OpExecutionContext OpExecutionContext asset None AssetExecutionContext + op AssetExecutionContext Error - we cannot init an AssetExecutionContext w/o an AssetsDefinition + op OpExecutionContext OpExecutionContext + op None OpExecutionContext + + + For ops in graph-backed assets + step type annotation result op AssetExecutionContext AssetExecutionContext op OpExecutionContext OpExecutionContext op None OpExecutionContext @@ -1723,6 +1732,17 @@ def build_execution_context( context_param = compute_fn.get_context_arg() context_annotation = context_param.annotation + # TODO - i dont know how to move this check to Definition time since we don't know if the op is + # part of a graph-backed asset until we have the step execution context, i think + if context_annotation is AssetExecutionContext and not is_sda_step: + # AssetExecutionContext requires an AssetsDefinition during init, so an op in an op job + # cannot be annotated with AssetExecutionContext + raise DagsterInvalidDefinitionError( + "Cannot annotate @op `context` parameter with type AssetExecutionContext unless the" + " op is part of a graph-backed asset. `context` must be annotated with" + " OpExecutionContext, or left blank." + ) + op_context = OpExecutionContext(step_context) if context_annotation is EmptyAnnotation: diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py index d51c350e6ea00..674314da0ff72 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py @@ -76,7 +76,11 @@ def asset_annotation(context: AssetExecutionContext): def asset_annotation_job(): asset_annotation() - assert asset_annotation_job.execute_in_process().success + with pytest.raises( + DagsterInvalidDefinitionError, + match="Cannot annotate @op `context` parameter with type AssetExecutionContext", + ): + asset_annotation_job.execute_in_process() @op def op_annotation(context: OpExecutionContext): @@ -209,15 +213,18 @@ def no_annotation(context, *args): no_annotation_graph.to_job(name="no_annotation_job").execute_in_process() def asset_annotation(context: AssetExecutionContext, *args): - assert isinstance(context, AssetExecutionContext) - yield Output(1) + assert False, "Test should error during context creation" asset_annotation_op = OpDefinition(compute_fn=asset_annotation, name="asset_annotation_op") asset_annotation_graph = GraphDefinition( name="asset_annotation_graph", node_defs=[asset_annotation_op] ) - asset_annotation_graph.to_job(name="asset_annotation_job").execute_in_process() + with pytest.raises( + DagsterInvalidDefinitionError, + match="Cannot annotate @op `context` parameter with type AssetExecutionContext", + ): + asset_annotation_graph.to_job(name="asset_annotation_job").execute_in_process() def op_annotation(context: OpExecutionContext, *args): assert isinstance(context, OpExecutionContext) diff --git a/python_modules/dagster/dagster_tests/execution_tests/test_execute_job.py b/python_modules/dagster/dagster_tests/execution_tests/test_execute_job.py index 878296ba5ff71..a755d62473a25 100644 --- a/python_modules/dagster/dagster_tests/execution_tests/test_execute_job.py +++ b/python_modules/dagster/dagster_tests/execution_tests/test_execute_job.py @@ -1,12 +1,12 @@ import dagster._check as check import pytest from dagster import ( - AssetExecutionContext, AssetKey, DagsterExecutionStepNotFoundError, DagsterInvalidConfigError, DagsterInvariantViolationError, Field, + OpExecutionContext, Out, Output, ReexecutionOptions, @@ -377,7 +377,7 @@ def echo(x): @op -def fail_once(context: AssetExecutionContext, x): +def fail_once(context: OpExecutionContext, x): key = context.op_handle.name if context.instance.run_storage.get_cursor_values({key}).get(key): return x