Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Nov 15, 2023
1 parent d0ecb6a commit fd482bd
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 20 deletions.
38 changes: 21 additions & 17 deletions python_modules/dagster/dagster/_core/execution/context/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@
from .system import StepExecutionContext


class ExecutionInfo(
class ExecutionProperties(
NamedTuple(
"_ExecutionInfo",
"_ExecutionProperties",
[
("step_description", PublicAttr[str]),
("op_execution_context", PublicAttr["OpExecutionContext"]),
Expand All @@ -74,7 +74,7 @@ class ExecutionInfo(
"""

def __new__(cls, step_description: str, op_execution_context: "OpExecutionContext"):
return super(ExecutionInfo, cls).__new__(
return super(ExecutionProperties, cls).__new__(
cls, step_description=step_description, op_execution_context=op_execution_context
)

Expand Down Expand Up @@ -135,14 +135,14 @@ def op_config(self) -> Any:
"""The parsed config specific to this op."""


class HasExecutionInfo(ABC):
class HasExecutionProperties(ABC):
@property
@abstractmethod
def execution_info(self) -> ExecutionInfo:
def execution_properties(self) -> ExecutionProperties:
"""Implement this method to check if a logging tag is set."""


class OpExecutionContextMetaClass(AbstractComputeMetaclass, HasExecutionInfo):
class OpExecutionContextMetaClass(AbstractComputeMetaclass):
def __instancecheck__(cls, instance) -> bool:
# This makes isinstance(context, OpExecutionContext) throw a deprecation warning when
# context is an AssetExecutionContext. This metaclass can be deleted once AssetExecutionContext
Expand All @@ -160,7 +160,9 @@ def __instancecheck__(cls, instance) -> bool:
return super().__instancecheck__(instance)


class OpExecutionContext(AbstractComputeExecutionContext, metaclass=OpExecutionContextMetaClass):
class OpExecutionContext(
AbstractComputeExecutionContext, HasExecutionProperties, metaclass=OpExecutionContextMetaClass
):
"""The ``context`` object that can be made available as the first argument to the function
used for computing an op or asset.
Expand Down Expand Up @@ -190,13 +192,13 @@ def __init__(self, step_execution_context: StepExecutionContext):
self._events: List[DagsterEvent] = []
self._output_metadata: Dict[str, Any] = {}

self._execution_info = ExecutionInfo(
self._execution_props = ExecutionProperties(
step_description=self._step_execution_context.describe_op(), op_execution_context=self
)

@property
def execution_info(self) -> ExecutionInfo:
return self._execution_info
def execution_properties(self) -> ExecutionProperties:
return self._execution_props

@public
@property
Expand Down Expand Up @@ -1459,11 +1461,7 @@ def __init__(self, op_execution_context: OpExecutionContext) -> None:
op_execution_context, "op_execution_context", OpExecutionContext
)
self._run_props = None

self._execution_info = ExecutionInfo(
step_description=f"asset {self._op_execution_context.node_handle}",
op_execution_context=self._op_execution_context,
)
self._execution_props = None

@staticmethod
def get() -> "AssetExecutionContext":
Expand All @@ -1489,8 +1487,14 @@ def run_properties(self) -> RunProperties:
return self._run_props

@property
def execution_info(self) -> ExecutionInfo:
return self._execution_info
def execution_properties(self) -> ExecutionProperties:
if self._execution_props is None:
self._execution_props = ExecutionProperties(
step_description=f"asset {self.assets_def.node_def.name}",
op_execution_context=self._op_execution_context,
)

return self._execution_props

######## Deprecated methods

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
from dagster._utils.forked_pdb import ForkedPdb
from dagster._utils.merger import merge_dicts

from .compute import ExecutionInfo, OpExecutionContext
from .compute import ExecutionProperties, OpExecutionContext
from .system import StepExecutionContext, TypeCheckContext


Expand Down Expand Up @@ -125,7 +125,7 @@ def __init__(
self._hook_defs = None
self._tags = {}
self._seen_outputs = {}
self._execution_info = None
self._execution_props = None

# maintain init time versions of these values so we can unbind the context
self._init_op_config = op_config
Expand Down Expand Up @@ -240,7 +240,7 @@ def bind(
raise DagsterInvalidInvocationError("Cannot provide config in both context and kwargs")
self._op_config = resolve_bound_config(config_from_args or self.op_config, op_def)

self._execution_info = ExecutionInfo(
self._execution_props = ExecutionProperties(
step_description=f'op "{self._alias}"', op_execution_context=self
)

Expand All @@ -260,6 +260,7 @@ def unbind(self):
self._op_config = self._init_op_config
self._user_events = []
self._output_metadata = {}
self._execution_props = None

self._bound = False

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ def test_deprecation_warnings():
"is_subset",
"partition_keys",
"get",
"execution_info",
"_execution_info",
]

other_ignores = [
Expand Down

0 comments on commit fd482bd

Please sign in to comment.