diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index e530009d2523e..c029d781b410f 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -71,13 +71,16 @@ class ExecutionProperties: You should not need to access these attributes directly. """ - def __init__(self, step_description: str, node_type: str, op_def: "OpDefinition"): + def __init__( + self, step_description: str, node_type: str, op_def: "OpDefinition", op_config: Any + ): self._step_description = step_description self._node_type = node_type self._op_def = op_def self._events: List[DagsterEvent] = [] self._requires_typed_event_stream = False self._typed_event_stream_error_message = None + self._op_config = op_config @property def step_description(self) -> str: @@ -91,6 +94,10 @@ def node_type(self) -> str: def op_def(self) -> "OpDefinition": return self._op_def + @property + def op_config(self) -> Any: + return self._op_config + def consume_events(self) -> Iterator[DagsterEvent]: events = self._events self._events = [] @@ -168,11 +175,20 @@ def op_config(self) -> Any: class ContextHasExecutionProperties(ABC): + """Base class that any context that can be used for execution or invocation of an op or asset + must implement. + """ + @property @abstractmethod def execution_properties(self) -> ExecutionProperties: """Context classes must contain an instance of ExecutionProperties.""" + @property + @abstractmethod + def resources(self) -> Any: + """Context classes must be able to provide currently available resources.""" + class OpExecutionContextMetaClass(AbstractComputeMetaclass): def __instancecheck__(cls, instance) -> bool: @@ -233,6 +249,7 @@ def __init__(self, step_execution_context: StepExecutionContext): OpDefinition, self._step_execution_context.job_def.get_node(self.node_handle).definition, ), + op_config=self._step_execution_context.op_config, ) @property @@ -243,7 +260,7 @@ def execution_properties(self) -> ExecutionProperties: @property def op_config(self) -> Any: """Any: The parsed config specific to this op.""" - return self._step_execution_context.op_config + return self.execution_properties.op_config @property def dagster_run(self) -> DagsterRun: @@ -1517,6 +1534,7 @@ def execution_properties(self) -> ExecutionProperties: step_description=f"asset {self.op_execution_context.node_handle}", node_type="asset", op_def=self.op_execution_context.op_def, + op_config=self.op_execution_context.op_config, ) return self._execution_props diff --git a/python_modules/dagster/dagster/_core/execution/context/invocation.py b/python_modules/dagster/dagster/_core/execution/context/invocation.py index 2fe58e73cb7a7..3ba23e8202766 100644 --- a/python_modules/dagster/dagster/_core/execution/context/invocation.py +++ b/python_modules/dagster/dagster/_core/execution/context/invocation.py @@ -56,7 +56,7 @@ from dagster._utils.merger import merge_dicts from dagster._utils.warnings import deprecation_warning -from .compute import OpExecutionContext +from .compute import ExecutionProperties, OpExecutionContext from .system import StepExecutionContext, TypeCheckContext @@ -106,18 +106,26 @@ def __new__( ) -class RunlessExecutionProperties: - """Maintains information about the invocation that is updated during execution time. This information - needs to be available to the user once invocation is complete, so that they can assert on events and - outputs. It needs to be cleared before the context is used for another invocation. +class RunlessExecutionProperties(ExecutionProperties): + """Maintains properties that need to be available to the execution code. To support runless execution + (direct invocation) this class also maintains information about the invocation that is updated + during execution time. This information needs to be available to the user once invocation is + complete, so that they can assert on events and outputs. It needs to be cleared before the + context is used for another invocation. """ - def __init__(self): + def __init__( + self, step_description: str, node_type: str, op_def: "OpDefinition", op_config: Any + ): + self._step_description = step_description + self._node_type = node_type + self._op_def = op_def self._events: List[UserEvent] = [] self._seen_outputs = {} self._output_metadata = {} self._requires_typed_event_stream = False self._typed_event_stream_error_message = None + self._op_config = op_config @property def user_events(self): @@ -131,14 +139,6 @@ def seen_outputs(self): def output_metadata(self): return self._output_metadata - @property - def requires_typed_event_stream(self) -> bool: - return self._requires_typed_event_stream - - @property - def typed_event_stream_error_message(self) -> Optional[str]: - return self._typed_event_stream_error_message - def log_event(self, event: UserEvent) -> None: check.inst_param( event, @@ -292,7 +292,7 @@ def __init__( # my_op(ctx) # ctx._execution_properties.output_metadata # information is retained after invocation # my_op(ctx) # ctx._execution_properties is cleared at the beginning of the next invocation - self._execution_properties = RunlessExecutionProperties() + self._execution_properties = None def __enter__(self): self._cm_scope_entered = True @@ -326,9 +326,6 @@ def bind( f"This context is currently being used to execute {self.alias}. The context cannot be used to execute another op until {self.alias} has finished executing." ) - # reset execution_properties - self._execution_properties = RunlessExecutionProperties() - # update the bound context with properties relevant to the execution of the op invocation_tags = ( @@ -403,6 +400,11 @@ def bind( step_description=step_description, ) + # reset execution_properties + self._execution_properties = RunlessExecutionProperties( + step_description=step_description, node_type="op", op_def=op_def, op_config=op_config + ) + return self def unbind(self): @@ -414,6 +416,11 @@ def is_bound(self) -> bool: @property def execution_properties(self) -> RunlessExecutionProperties: + if self._execution_properties is None: + raise DagsterInvalidPropertyError( + "Cannot access execution_properties until after the context has been used to" + " invoke an op" + ) return self._execution_properties @property diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py index 4cf2a837c683a..e5fba715e937e 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py +++ b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py @@ -120,10 +120,12 @@ def invoke_compute_fn( if config_arg_cls: # config_arg_cls is either a Config class or a primitive type if issubclass(config_arg_cls, Config): - to_pass = config_arg_cls._get_non_default_public_field_values_cls(context.op_config) # noqa: SLF001 + to_pass = config_arg_cls._get_non_default_public_field_values_cls( # noqa: SLF001 + context.execution_properties.op_config + ) args_to_pass["config"] = config_arg_cls(**to_pass) else: - args_to_pass["config"] = context.op_config + args_to_pass["config"] = context.execution_properties.op_config if resource_args: for resource_name, arg_name in resource_args.items(): args_to_pass[arg_name] = context.resources.original_resource_dict[resource_name]