From 175ba7bc977d633b418701377397bb1647b67e2b Mon Sep 17 00:00:00 2001 From: Nicholas Schrock Date: Sat, 9 Sep 2023 06:56:09 -0400 Subject: [PATCH 01/19] cp --- .../dagster/_core/execution/context/compute.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 20faa736c8b45..a8b7ec3c2503e 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -692,4 +692,13 @@ def asset_check_spec(self) -> AssetCheckSpec: # * manually constructing ops to make AssetsDefinitions # * having ops in a graph that form a graph backed asset # so we have a single type that users can call by their preferred name where appropriate -AssetExecutionContext: TypeAlias = OpExecutionContext +# AssetExecutionContext: TypeAlias = OpExecutionContext + + +class AssetExecutionContext(OpExecutionContext): + pass + # def __init__(self, op_execution_context: OpExecutionContext): + # self._op_execution_context = op_execution_context + + # def __call__(self, *args, **kwargs): + # return self._op_execution_context.__call__(*args, **kwargs) From 019000ce2b831ff354120ce959953090c893a5ed Mon Sep 17 00:00:00 2001 From: Nicholas Schrock Date: Sat, 9 Sep 2023 07:29:29 -0400 Subject: [PATCH 02/19] appease ruff --- .../dagster/dagster/_core/execution/context/compute.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index a8b7ec3c2503e..88d68bd976b83 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -12,8 +12,6 @@ cast, ) -from typing_extensions import TypeAlias - import dagster._check as check from dagster._annotations import deprecated, experimental, public from dagster._core.definitions.asset_check_spec import AssetCheckSpec From 635bd19068854832f706c06b3e6f70bf9e342607 Mon Sep 17 00:00:00 2001 From: Nicholas Schrock Date: Sat, 9 Sep 2023 07:33:42 -0400 Subject: [PATCH 03/19] make asset execution context wrap op execution context --- .../dagster/_core/execution/context/compute.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 88d68bd976b83..c611ab0740bf6 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -693,10 +693,23 @@ def asset_check_spec(self) -> AssetCheckSpec: # AssetExecutionContext: TypeAlias = OpExecutionContext -class AssetExecutionContext(OpExecutionContext): +class AssetExecutionContext: + def __init__(self, op_execution_context) -> None: + self._op_execution_context = op_execution_context + pass # def __init__(self, op_execution_context: OpExecutionContext): # self._op_execution_context = op_execution_context # def __call__(self, *args, **kwargs): # return self._op_execution_context.__call__(*args, **kwargs) + + def __getattr__(self, attr) -> Any: + # see if this object has attr + # NOTE do not use hasattr, it goes into + # infinite recurrsion + if attr in self.__dict__: + # this object has it + return getattr(self, attr) + # proxy to the wrapped object + return getattr(self._op_execution_context, attr) \ No newline at end of file From 6c5b1f65317faf258306724dce37304232dc0ea6 Mon Sep 17 00:00:00 2001 From: Nicholas Schrock Date: Sat, 9 Sep 2023 07:44:23 -0400 Subject: [PATCH 04/19] black --- .../dagster/dagster/_core/execution/context/compute.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index c611ab0740bf6..d969acc639a9b 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -712,4 +712,4 @@ def __getattr__(self, attr) -> Any: # this object has it return getattr(self, attr) # proxy to the wrapped object - return getattr(self._op_execution_context, attr) \ No newline at end of file + return getattr(self._op_execution_context, attr) From 0eb58171a9b800d727c805b1506b38771444b8f1 Mon Sep 17 00:00:00 2001 From: Nicholas Schrock Date: Sat, 9 Sep 2023 07:54:20 -0400 Subject: [PATCH 05/19] actually wrap --- .../dagster/dagster/_core/execution/plan/compute.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute.py b/python_modules/dagster/dagster/_core/execution/plan/compute.py index 549c091f655a2..fb430775b6fed 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/compute.py +++ b/python_modules/dagster/dagster/_core/execution/plan/compute.py @@ -32,7 +32,7 @@ from dagster._core.definitions.result import MaterializeResult from dagster._core.errors import DagsterExecutionStepExecutionError, DagsterInvariantViolationError from dagster._core.events import DagsterEvent -from dagster._core.execution.context.compute import OpExecutionContext +from dagster._core.execution.context.compute import AssetExecutionContext, OpExecutionContext from dagster._core.execution.context.system import StepExecutionContext from dagster._core.system_config.objects import ResolvedRunConfig from dagster._utils import iterate_with_context @@ -146,7 +146,11 @@ def _yield_compute_results( ) -> Iterator[OpOutputUnion]: check.inst_param(step_context, "step_context", StepExecutionContext) - context = OpExecutionContext(step_context) + context = ( + AssetExecutionContext(OpExecutionContext(step_context)) + if step_context.is_sda_step + else OpExecutionContext(step_context) + ) user_event_generator = compute_fn(context, inputs) if isinstance(user_event_generator, Output): From b376ea774b8d76a352ac337efea6bba8778d63ac Mon Sep 17 00:00:00 2001 From: Nicholas Schrock Date: Sun, 10 Sep 2023 05:50:48 -0400 Subject: [PATCH 06/19] seems to work --- .../_core/execution/context/compute.py | 40 +++++++++++++++++-- .../test_asset_execution_context.py | 26 ++++++++++++ 2 files changed, 63 insertions(+), 3 deletions(-) create mode 100644 python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index d969acc639a9b..4dc710698ccb0 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1,4 +1,4 @@ -from abc import ABC, abstractmethod +from abc import ABC, ABCMeta, abstractmethod from typing import ( AbstractSet, Any, @@ -48,7 +48,18 @@ from .system import StepExecutionContext -class AbstractComputeExecutionContext(ABC): +class AbstractComputeMetaclass(ABCMeta): + pass + # def __instancecheck__(cls, instance) -> bool: + # # Check if the instance is an instance of both MyClass and AdditionalClass + # # return isinstance(instance, MyClass) and isinstance(instance, AdditionalClass) + # if isinstance(instance, "AssetExecutionContext"): + # return True + # return super().__instancecheck__(instance) + # pass + + +class AbstractComputeExecutionContext(ABC, metaclass=AbstractComputeMetaclass): """Base class for op context implemented by OpExecutionContext and DagstermillExecutionContext.""" @abstractmethod @@ -95,7 +106,16 @@ def op_config(self) -> Any: """The parsed config specific to this op.""" -class OpExecutionContext(AbstractComputeExecutionContext): +class OpExecutionContextMetaClass(AbstractComputeMetaclass): + def __instancecheck__(cls, instance) -> bool: + # Check if the instance is an instance of both MyClass and AdditionalClass + # return isinstance(instance, MyClass) and isinstance(instance, AdditionalClass) + if isinstance(instance, AssetExecutionContext): + return True + return super().__instancecheck__(instance) + + +class OpExecutionContext(AbstractComputeExecutionContext, metaclass=OpExecutionContextMetaClass): """The ``context`` object that can be made available as the first argument to the function used for computing an op or asset. @@ -692,11 +712,25 @@ def asset_check_spec(self) -> AssetCheckSpec: # so we have a single type that users can call by their preferred name where appropriate # AssetExecutionContext: TypeAlias = OpExecutionContext +# class AssetExecutionContextMetaClass(type): +# def __instancecheck__(cls, instance) -> bool: +# # Check if the instance is an instance of both MyClass and AdditionalClass +# # return isinstance(instance, MyClass) and isinstance(instance, AdditionalClass) +# if isinstance(instance, AdditionalClass): +# return True +# return super().__instancecheck__(instance) +# pass + +# class AssetExecutionContext(metaclass=AssetExecutionContextMetaClass): class AssetExecutionContext: def __init__(self, op_execution_context) -> None: self._op_execution_context = op_execution_context + # @property + # def __class__(self) -> Type: + # return OpExecutionContext + pass # def __init__(self, op_execution_context: OpExecutionContext): # self._op_execution_context = op_execution_context diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py new file mode 100644 index 0000000000000..69390e5fa1061 --- /dev/null +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py @@ -0,0 +1,26 @@ +from dagster import AssetExecutionContext, OpExecutionContext, asset, materialize + + +def test_base_asset_execution_context() -> None: + passed = {"called": False} + + @asset + def an_asset(context: AssetExecutionContext): + assert isinstance(context, AssetExecutionContext) + passed["called"] = True + + assert materialize([an_asset]).success + assert passed["called"] + + +def test_isinstance_op_execution_context_asset_execution_context() -> None: + passed = {"called": False} + + @asset + def an_asset(context: AssetExecutionContext): + # we make this work for backwards compat + assert isinstance(context, OpExecutionContext) + passed["called"] = True + + assert materialize([an_asset]).success + assert passed["called"] From 6fce33439f0b080bf1447c93b33022f7c8ea5b6c Mon Sep 17 00:00:00 2001 From: Nicholas Schrock Date: Sun, 10 Sep 2023 10:52:04 -0400 Subject: [PATCH 07/19] cp --- .../_core/execution/context/compute.py | 187 +++++++++++++++--- .../dagster/_core/storage/dagster_run.py | 6 + .../test_asset_execution_context.py | 48 ++++- 3 files changed, 202 insertions(+), 39 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 4dc710698ccb0..89d2da0c5a5fe 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -44,6 +44,7 @@ from dagster._core.log_manager import DagsterLogManager from dagster._core.storage.dagster_run import DagsterRun from dagster._utils.forked_pdb import ForkedPdb +from dagster._utils.warnings import deprecation_warning from .system import StepExecutionContext @@ -706,44 +707,166 @@ def asset_check_spec(self) -> AssetCheckSpec: return asset_checks_def.spec -# actually forking the object type for assets is tricky for users in the cases of: -# * manually constructing ops to make AssetsDefinitions -# * having ops in a graph that form a graph backed asset -# so we have a single type that users can call by their preferred name where appropriate -# AssetExecutionContext: TypeAlias = OpExecutionContext - -# class AssetExecutionContextMetaClass(type): -# def __instancecheck__(cls, instance) -> bool: -# # Check if the instance is an instance of both MyClass and AdditionalClass -# # return isinstance(instance, MyClass) and isinstance(instance, AdditionalClass) -# if isinstance(instance, AdditionalClass): -# return True -# return super().__instancecheck__(instance) -# pass - +OP_EXECUTION_CONTEXT_ONLY_METHODS = set( + [ + "describe_op", + "file_manager", + "has_assets_def", + "get_mapping_key", + "get_step_execution_context", + "job_def", + "job_name", + "node_handle", + "op", + "op_config", + "op_def", + "op_handle", + "retry_number", + "resources", + "step_launcher", + "has_events", + "consumer_events", + ] +) -# class AssetExecutionContext(metaclass=AssetExecutionContextMetaClass): -class AssetExecutionContext: - def __init__(self, op_execution_context) -> None: - self._op_execution_context = op_execution_context - # @property - # def __class__(self) -> Type: - # return OpExecutionContext +PARTITION_KEY_RANGE_AS_ALT = "use partition_key_range instead" +INPUT_OUTPUT_ALT = "not use input or output names and instead use asset keys directly" +OUTPUT_METADATA_ALT = "use MaterializationResult instead" + +DEPRECATED_IO_MANAGER_CENTRIC_CONTEXT_METHODS = { + "add_output_metadata": OUTPUT_METADATA_ALT, + "asset_key_for_input": INPUT_OUTPUT_ALT, + "asset_key_for_output": INPUT_OUTPUT_ALT, + "asset_partition_key_for_input": PARTITION_KEY_RANGE_AS_ALT, + "asset_partition_key_for_output": PARTITION_KEY_RANGE_AS_ALT, + "asset_partition_key_range_for_input": PARTITION_KEY_RANGE_AS_ALT, + "asset_partition_key_range_for_output": PARTITION_KEY_RANGE_AS_ALT, + "asset_partition_keys_for_input": PARTITION_KEY_RANGE_AS_ALT, + "asset_partition_keys_for_output": PARTITION_KEY_RANGE_AS_ALT, + "asset_partitions_time_window_for_input": PARTITION_KEY_RANGE_AS_ALT, + "asset_partitions_time_window_for_output": PARTITION_KEY_RANGE_AS_ALT, + "asset_partitions_def_for_input": PARTITION_KEY_RANGE_AS_ALT, + "asset_partitions_def_for_output": PARTITION_KEY_RANGE_AS_ALT, + "get_output_metadata": "use op_execution_context.op_def.get_output(...).metadata", + "merge_output_metadata": OUTPUT_METADATA_ALT, + "output_for_asset_key": INPUT_OUTPUT_ALT, + "selected_output_names": INPUT_OUTPUT_ALT, +} + +ALTERNATE_AVAILABLE_METHODS = { + "has_tag": "use dagster_run.has_tag instead", + "get_tag": "use dagster_run.get_tag instead", + "run_tags": "use dagster_run.tags instead", + "set_data_version": "use MaterializationResult instead", +} - pass - # def __init__(self, op_execution_context: OpExecutionContext): - # self._op_execution_context = op_execution_context - # def __call__(self, *args, **kwargs): - # return self._op_execution_context.__call__(*args, **kwargs) +class AssetExecutionContext: + def __init__(self, op_execution_context: OpExecutionContext) -> None: + self._op_execution_context = check.inst_param( + op_execution_context, "op_execution_context", OpExecutionContext + ) def __getattr__(self, attr) -> Any: - # see if this object has attr - # NOTE do not use hasattr, it goes into - # infinite recurrsion + check.str_param(attr, "attr") + if attr in self.__dict__: - # this object has it return getattr(self, attr) - # proxy to the wrapped object + + if attr in OP_EXECUTION_CONTEXT_ONLY_METHODS: + deprecation_warning( + f"Calling deprecated method {attr} on AssetExecutionContext. Use underlying" + f" OpExecutionContext instead by calling op_execution_context.{attr}", + "1.7", + ) + + if attr in DEPRECATED_IO_MANAGER_CENTRIC_CONTEXT_METHODS: + alt = DEPRECATED_IO_MANAGER_CENTRIC_CONTEXT_METHODS[attr] + deprecation_warning( + f"Calling method {attr} on AssetExecutionContext oriented around I/O managers. " + f"If you not using I/O managers we suggest you {alt}. If you are using " + "I/O managers the method still exists at op_execution_context.{attr}.", + "1.7", + ) + + if attr in ALTERNATE_AVAILABLE_METHODS: + deprecation_warning( + f"Calling method {attr} on AssetExecutionContext. Instead," + f" {ALTERNATE_AVAILABLE_METHODS[attr]}", + "1.7", + ) + return getattr(self._op_execution_context, attr) + + # include all supported methods below + + @public + @property + def op_execution_context(self) -> OpExecutionContext: + return self._op_execution_context + + @public + @property + def run_id(self) -> str: + return self._op_execution_context.run_id + + @public + @property + def dagster_run(self) -> DagsterRun: + """PipelineRun: The current pipeline run.""" + return self._step_execution_context.dagster_run + + @public + @property + def asset_key(self) -> AssetKey: + return self._op_execution_context.asset_key + + @public + @property + def pdb(self) -> ForkedPdb: + return self._op_execution_context.pdb + + @public + @property + def log(self) -> DagsterLogManager: + """DagsterLogManager: The log manager available in the execution context.""" + return self._op_execution_context.log + + @public + def is_partitioned_execution(self) -> bool: + return self._op_execution_context.has_partition_key + + @public + def log_event(self, event: UserEvent) -> None: + return self._op_execution_context.log_event(event) + + @public + def assets_def(self) -> AssetsDefinition: + return self._op_execution_context.assets_def + + @public + # TODO confirm semantics + def selected_asset_keys(self) -> AbstractSet[AssetKey]: + return self._op_execution_context.selected_asset_keys + + @public + @experimental + def get_asset_provenance(self, asset_key: AssetKey) -> Optional[DataProvenance]: + return self._op_execution_context.get_asset_provenance(asset_key) + + @property + def asset_check_spec(self) -> AssetCheckSpec: + return self._op_execution_context.asset_check_spec + + @public + def partition_key_range(self, asset_key: Optional[AssetKey] = None) -> PartitionKeyRange: + # "asset_partition_key_for_input", + # "asset_partition_key_for_output", + # "asset_partition_key_range_for_input", + # "asset_partition_key_range_for_output", + # "asset_partition_keys_for_input", + # "asset_partition_keys_for_output", + # "asset_partitions_time_window_for_input", + # "asset_partitions_time_window_for_output", + raise NotImplementedError() diff --git a/python_modules/dagster/dagster/_core/storage/dagster_run.py b/python_modules/dagster/dagster/_core/storage/dagster_run.py index 2b177e5df5790..be2920b42b8a9 100644 --- a/python_modules/dagster/dagster/_core/storage/dagster_run.py +++ b/python_modules/dagster/dagster/_core/storage/dagster_run.py @@ -428,6 +428,12 @@ def tags_for_sensor(sensor) -> Mapping[str, str]: def tags_for_backfill_id(backfill_id: str) -> Mapping[str, str]: return {BACKFILL_ID_TAG: backfill_id} + def get_tag(self, key: str) -> Optional[str]: + return self.tags.get(key) + + def has_tag(self, key: str) -> bool: + return key in self.tags + class RunsFilterSerializer(NamedTupleSerializer["RunsFilter"]): def before_unpack( diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py index 69390e5fa1061..46924f6cd3cc7 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py @@ -1,26 +1,60 @@ -from dagster import AssetExecutionContext, OpExecutionContext, asset, materialize +from dagster import AssetExecutionContext, OpExecutionContext, asset, job, materialize, op def test_base_asset_execution_context() -> None: - passed = {"called": False} + called = {"yup": False} @asset def an_asset(context: AssetExecutionContext): assert isinstance(context, AssetExecutionContext) - passed["called"] = True + called["yup"] = True assert materialize([an_asset]).success - assert passed["called"] + assert called["yup"] def test_isinstance_op_execution_context_asset_execution_context() -> None: - passed = {"called": False} + called = {"yup": False} @asset def an_asset(context: AssetExecutionContext): # we make this work for backwards compat assert isinstance(context, OpExecutionContext) - passed["called"] = True + assert type(context) is AssetExecutionContext + called["yup"] = True assert materialize([an_asset]).success - assert passed["called"] + assert called["yup"] + + +def test_op_gets_actual_op_execution_context() -> None: + called = {"yup": False} + + @op + def an_op(context: OpExecutionContext): + # we make this work for backwards compat + assert isinstance(context, OpExecutionContext) + assert type(context) is OpExecutionContext + called["yup"] = True + + @job + def a_job(): + an_op() + + assert a_job.execute_in_process().success + assert called["yup"] + + +def test_run_id_in_asset_execution_context() -> None: + called = {"yup": False} + + @asset + def an_asset(context: AssetExecutionContext): + # we make this work for backwards compat + assert isinstance(context, OpExecutionContext) + assert type(context) is AssetExecutionContext + assert context.run_id + called["yup"] = True + + assert materialize([an_asset]).success + assert called["yup"] From 557a398337bbaf8be2eaa15d6b02cdd117f1d482 Mon Sep 17 00:00:00 2001 From: Nicholas Schrock Date: Sun, 10 Sep 2023 11:07:15 -0400 Subject: [PATCH 08/19] disable warnings as failure where appropriate --- .../dagster/_core/execution/context/compute.py | 16 +++++++--------- .../asset_defs_tests/test_assets_job.py | 12 +++++------- .../asset_defs_tests/test_decorators.py | 12 +++++------- .../asset_defs_tests/test_materialize.py | 12 +++++------- .../asset_defs_tests/test_partitioned_assets.py | 12 +++++------- 5 files changed, 27 insertions(+), 37 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 89d2da0c5a5fe..1d3f74396c266 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -834,6 +834,8 @@ def log(self) -> DagsterLogManager: return self._op_execution_context.log @public + # renaming to avoid ambiguity in single run and multi-partition case + @property def is_partitioned_execution(self) -> bool: return self._op_execution_context.has_partition_key @@ -842,11 +844,14 @@ def log_event(self, event: UserEvent) -> None: return self._op_execution_context.log_event(event) @public + @property def assets_def(self) -> AssetsDefinition: return self._op_execution_context.assets_def @public - # TODO confirm semantics + # TODO confirm semantics in the presense of asset subsetting + # seems like there should be both "asset_keys" and "selected_asset_keys" + @property def selected_asset_keys(self) -> AbstractSet[AssetKey]: return self._op_execution_context.selected_asset_keys @@ -861,12 +866,5 @@ def asset_check_spec(self) -> AssetCheckSpec: @public def partition_key_range(self, asset_key: Optional[AssetKey] = None) -> PartitionKeyRange: - # "asset_partition_key_for_input", - # "asset_partition_key_for_output", - # "asset_partition_key_range_for_input", - # "asset_partition_key_range_for_output", - # "asset_partition_keys_for_input", - # "asset_partition_keys_for_output", - # "asset_partitions_time_window_for_input", - # "asset_partitions_time_window_for_output", + # TODO, refactor guts of step execution context to get this cleanly raise NotImplementedError() diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py index 0698c1b807846..3e877cb8756c5 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py @@ -1,6 +1,5 @@ import hashlib import os -import warnings import pytest from dagster import ( @@ -56,13 +55,12 @@ disable_dagster_warnings, ) +# @pytest.fixture(autouse=True) +# def error_on_warning(): +# # turn off any outer warnings filters, e.g. ignores that are set in pyproject.toml +# warnings.resetwarnings() -@pytest.fixture(autouse=True) -def error_on_warning(): - # turn off any outer warnings filters, e.g. ignores that are set in pyproject.toml - warnings.resetwarnings() - - warnings.filterwarnings("error") +# warnings.filterwarnings("error") def _all_asset_keys(result): diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_decorators.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_decorators.py index 15b413e443bfb..d4b9dd5c98cf2 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_decorators.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_decorators.py @@ -1,4 +1,3 @@ -import warnings from typing import Any import pytest @@ -48,13 +47,12 @@ from dagster._core.test_utils import ignore_warning from dagster._core.types.dagster_type import resolve_dagster_type +# @pytest.fixture(autouse=True) +# def error_on_warning(): +# # turn off any outer warnings filters, e.g. ignores that are set in pyproject.toml +# warnings.resetwarnings() -@pytest.fixture(autouse=True) -def error_on_warning(): - # turn off any outer warnings filters, e.g. ignores that are set in pyproject.toml - warnings.resetwarnings() - - warnings.filterwarnings("error") +# warnings.filterwarnings("error") def test_asset_no_decorator_args(): diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_materialize.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_materialize.py index e2063762f5c7c..6d27045f98612 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_materialize.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_materialize.py @@ -1,6 +1,5 @@ import os import pickle -import warnings from tempfile import TemporaryDirectory import pytest @@ -30,13 +29,12 @@ ) from dagster._core.test_utils import ignore_warning, instance_for_test +# @pytest.fixture(autouse=True) +# def error_on_warning(): +# # turn off any outer warnings filters, e.g. ignores that are set in pyproject.toml +# warnings.resetwarnings() -@pytest.fixture(autouse=True) -def error_on_warning(): - # turn off any outer warnings filters, e.g. ignores that are set in pyproject.toml - warnings.resetwarnings() - - warnings.filterwarnings("error") +# warnings.filterwarnings("error") def test_basic_materialize(): diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py index e7f694598b76e..4d693347d8dc1 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py @@ -1,4 +1,3 @@ -import warnings from typing import Optional import dagster._check as check @@ -39,13 +38,12 @@ from dagster._core.test_utils import assert_namedtuple_lists_equal from dagster._seven.compat.pendulum import create_pendulum_time +# @pytest.fixture(autouse=True) +# def error_on_warning(): +# # turn off any outer warnings filters, e.g. ignores that are set in pyproject.toml +# warnings.resetwarnings() -@pytest.fixture(autouse=True) -def error_on_warning(): - # turn off any outer warnings filters, e.g. ignores that are set in pyproject.toml - warnings.resetwarnings() - - warnings.filterwarnings("error") +# warnings.filterwarnings("error") def get_upstream_partitions_for_partition_range( From 6087098c03e0326371984e22722aa3b44292e158 Mon Sep 17 00:00:00 2001 From: Nicholas Schrock Date: Sun, 10 Sep 2023 11:18:07 -0400 Subject: [PATCH 09/19] cleanup --- .../dagster/_core/execution/context/compute.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 1d3f74396c266..7d2c5cbb165e8 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -51,13 +51,6 @@ class AbstractComputeMetaclass(ABCMeta): pass - # def __instancecheck__(cls, instance) -> bool: - # # Check if the instance is an instance of both MyClass and AdditionalClass - # # return isinstance(instance, MyClass) and isinstance(instance, AdditionalClass) - # if isinstance(instance, "AssetExecutionContext"): - # return True - # return super().__instancecheck__(instance) - # pass class AbstractComputeExecutionContext(ABC, metaclass=AbstractComputeMetaclass): @@ -109,8 +102,9 @@ def op_config(self) -> Any: class OpExecutionContextMetaClass(AbstractComputeMetaclass): def __instancecheck__(cls, instance) -> bool: - # Check if the instance is an instance of both MyClass and AdditionalClass - # return isinstance(instance, MyClass) and isinstance(instance, AdditionalClass) + # This makes isinstance(context, OpExecutionContext) return True when + # the instance is an AssetExecutionContext. This makes the new AssetExecutionContext + # backwards compatible with the old OpExecutionContext codepaths. if isinstance(instance, AssetExecutionContext): return True return super().__instancecheck__(instance) From 683edccca1a81d4bce183a1d04877c6c9203e240 Mon Sep 17 00:00:00 2001 From: Nicholas Schrock Date: Sun, 10 Sep 2023 11:33:45 -0400 Subject: [PATCH 10/19] remove indent --- .../dagster/_core/execution/context/system.py | 81 ++++++++++--------- 1 file changed, 44 insertions(+), 37 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/system.py b/python_modules/dagster/dagster/_core/execution/context/system.py index b7261458f0e18..ff67a48ca701e 100644 --- a/python_modules/dagster/dagster/_core/execution/context/system.py +++ b/python_modules/dagster/dagster/_core/execution/context/system.py @@ -1075,47 +1075,54 @@ def asset_partitions_subset_for_input( assets_def = asset_layer.assets_def_for_node(self.node_handle) upstream_asset_key = asset_layer.asset_key_for_input(self.node_handle, input_name) - if upstream_asset_key is not None: - upstream_asset_partitions_def = asset_layer.partitions_def_for_asset(upstream_asset_key) + if upstream_asset_key is None: + check.failed("The input has no asset partitions") - if upstream_asset_partitions_def is not None: - partitions_def = assets_def.partitions_def if assets_def else None - partitions_subset = ( - partitions_def.empty_subset().with_partition_key_range( - self.asset_partition_key_range, dynamic_partitions_store=self.instance - ) - if partitions_def - else None - ) - partition_mapping = infer_partition_mapping( - asset_layer.partition_mapping_for_node_input( - self.node_handle, upstream_asset_key - ), - partitions_def, - upstream_asset_partitions_def, - ) - mapped_partitions_result = ( - partition_mapping.get_upstream_mapped_partitions_result_for_partitions( - partitions_subset, - upstream_asset_partitions_def, - dynamic_partitions_store=self.instance, - ) - ) + assert upstream_asset_key, "The input has no asset partitions" - if ( - require_valid_partitions - and mapped_partitions_result.required_but_nonexistent_partition_keys - ): - raise DagsterInvariantViolationError( - f"Partition key range {self.asset_partition_key_range} in" - f" {self.node_handle.name} depends on invalid partition keys" - f" {mapped_partitions_result.required_but_nonexistent_partition_keys} in" - f" upstream asset {upstream_asset_key}" - ) + upstream_asset_partitions_def = asset_layer.partitions_def_for_asset(upstream_asset_key) + + if upstream_asset_partitions_def is None: + check.failed("The input has no asset partitions") + + assert upstream_asset_partitions_def, "The input has no asset partitions" + + partitions_def = assets_def.partitions_def if assets_def else None + partitions_subset = ( + partitions_def.empty_subset().with_partition_key_range( + self.asset_partition_key_range, dynamic_partitions_store=self.instance + ) + if partitions_def + else None + ) + partition_mapping = infer_partition_mapping( + asset_layer.partition_mapping_for_node_input( + self.node_handle, upstream_asset_key + ), + partitions_def, + upstream_asset_partitions_def, + ) + mapped_partitions_result = ( + partition_mapping.get_upstream_mapped_partitions_result_for_partitions( + partitions_subset, + upstream_asset_partitions_def, + dynamic_partitions_store=self.instance, + ) + ) + + if ( + require_valid_partitions + and mapped_partitions_result.required_but_nonexistent_partition_keys + ): + raise DagsterInvariantViolationError( + f"Partition key range {self.asset_partition_key_range} in" + f" {self.node_handle.name} depends on invalid partition keys" + f" {mapped_partitions_result.required_but_nonexistent_partition_keys} in" + f" upstream asset {upstream_asset_key}" + ) - return mapped_partitions_result.partitions_subset + return mapped_partitions_result.partitions_subset - check.failed("The input has no asset partitions") def asset_partition_key_for_input(self, input_name: str) -> str: start, end = self.asset_partition_key_range_for_input(input_name) From 90b2f65c4949cfd474e62288c4b751c7ed24bd16 Mon Sep 17 00:00:00 2001 From: Nicholas Schrock Date: Sun, 10 Sep 2023 11:34:11 -0400 Subject: [PATCH 11/19] ruff --- .../dagster/dagster/_core/execution/context/system.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/system.py b/python_modules/dagster/dagster/_core/execution/context/system.py index ff67a48ca701e..856c4dadeb3e2 100644 --- a/python_modules/dagster/dagster/_core/execution/context/system.py +++ b/python_modules/dagster/dagster/_core/execution/context/system.py @@ -1096,9 +1096,7 @@ def asset_partitions_subset_for_input( else None ) partition_mapping = infer_partition_mapping( - asset_layer.partition_mapping_for_node_input( - self.node_handle, upstream_asset_key - ), + asset_layer.partition_mapping_for_node_input(self.node_handle, upstream_asset_key), partitions_def, upstream_asset_partitions_def, ) @@ -1123,7 +1121,6 @@ def asset_partitions_subset_for_input( return mapped_partitions_result.partitions_subset - def asset_partition_key_for_input(self, input_name: str) -> str: start, end = self.asset_partition_key_range_for_input(input_name) if start == end: From 2a9556256532b9a6a46b08fcf7d8f0f7637950da Mon Sep 17 00:00:00 2001 From: Nicholas Schrock Date: Sun, 10 Sep 2023 11:49:08 -0400 Subject: [PATCH 12/19] implement partition_key_range --- .../_core/execution/context/compute.py | 15 +++++++- .../dagster/_core/execution/context/system.py | 37 ++++++++++--------- 2 files changed, 33 insertions(+), 19 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 7d2c5cbb165e8..a586750fdb1a7 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -860,5 +860,16 @@ def asset_check_spec(self) -> AssetCheckSpec: @public def partition_key_range(self, asset_key: Optional[AssetKey] = None) -> PartitionKeyRange: - # TODO, refactor guts of step execution context to get this cleanly - raise NotImplementedError() + asset_key = asset_key if asset_key else self.asset_key + subset = self._op_execution_context.get_step_execution_context().asset_partitions_subset_for_asset_key( + asset_key + ) + partition_key_ranges = subset.get_partition_key_ranges( + dynamic_partitions_store=self._op_execution_context.instance + ) + if len(partition_key_ranges) != 1: + check.failed( + "Tried to access asset partition key range, but there are " + f"({len(partition_key_ranges)}) key ranges associated with this asset key.", + ) + return partition_key_ranges[0] diff --git a/python_modules/dagster/dagster/_core/execution/context/system.py b/python_modules/dagster/dagster/_core/execution/context/system.py index 856c4dadeb3e2..525f18cd1ab15 100644 --- a/python_modules/dagster/dagster/_core/execution/context/system.py +++ b/python_modules/dagster/dagster/_core/execution/context/system.py @@ -1072,22 +1072,25 @@ def asset_partitions_subset_for_input( self, input_name: str, *, require_valid_partitions: bool = True ) -> PartitionsSubset: asset_layer = self.job_def.asset_layer - assets_def = asset_layer.assets_def_for_node(self.node_handle) - upstream_asset_key = asset_layer.asset_key_for_input(self.node_handle, input_name) - - if upstream_asset_key is None: - check.failed("The input has no asset partitions") - - assert upstream_asset_key, "The input has no asset partitions" - - upstream_asset_partitions_def = asset_layer.partitions_def_for_asset(upstream_asset_key) + asset_key = check.not_none( + asset_layer.asset_key_for_input(self.node_handle, input_name), + "The input has no asset key", + ) - if upstream_asset_partitions_def is None: - check.failed("The input has no asset partitions") + return self.asset_partitions_subset_for_asset_key(asset_key, require_valid_partitions) - assert upstream_asset_partitions_def, "The input has no asset partitions" + def asset_partitions_subset_for_asset_key( + self, asset_key: AssetKey, require_valid_partitions: bool = True + ): + asset_layer = self.job_def.asset_layer + assets_def = check.not_none( + asset_layer.assets_def_for_node(self.node_handle), "must have assets def" + ) + asset_partitions_def = check.not_none( + asset_layer.partitions_def_for_asset(asset_key), "The input has no asset partitions" + ) - partitions_def = assets_def.partitions_def if assets_def else None + partitions_def = assets_def.partitions_def partitions_subset = ( partitions_def.empty_subset().with_partition_key_range( self.asset_partition_key_range, dynamic_partitions_store=self.instance @@ -1096,14 +1099,14 @@ def asset_partitions_subset_for_input( else None ) partition_mapping = infer_partition_mapping( - asset_layer.partition_mapping_for_node_input(self.node_handle, upstream_asset_key), + asset_layer.partition_mapping_for_node_input(self.node_handle, asset_key), partitions_def, - upstream_asset_partitions_def, + asset_partitions_def, ) mapped_partitions_result = ( partition_mapping.get_upstream_mapped_partitions_result_for_partitions( partitions_subset, - upstream_asset_partitions_def, + asset_partitions_def, dynamic_partitions_store=self.instance, ) ) @@ -1116,7 +1119,7 @@ def asset_partitions_subset_for_input( f"Partition key range {self.asset_partition_key_range} in" f" {self.node_handle.name} depends on invalid partition keys" f" {mapped_partitions_result.required_but_nonexistent_partition_keys} in" - f" upstream asset {upstream_asset_key}" + f" upstream asset {asset_key}" ) return mapped_partitions_result.partitions_subset From 73b5c18592495199f1526e46c9b5ac88c15a995c Mon Sep 17 00:00:00 2001 From: Nicholas Schrock Date: Sun, 10 Sep 2023 12:31:32 -0400 Subject: [PATCH 13/19] cp --- .../_core/execution/context/compute.py | 8 +- .../test_asset_execution_context.py | 116 +++++++++++++++++- 2 files changed, 121 insertions(+), 3 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index a586750fdb1a7..32df8f3c6f0ef 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -859,8 +859,12 @@ def asset_check_spec(self) -> AssetCheckSpec: return self._op_execution_context.asset_check_spec @public - def partition_key_range(self, asset_key: Optional[AssetKey] = None) -> PartitionKeyRange: - asset_key = asset_key if asset_key else self.asset_key + @property + def partition_key_range(self) -> PartitionKeyRange: + return self._op_execution_context.asset_partition_key_range + + @public + def partition_key_range_for_asset_key(self, asset_key: AssetKey) -> PartitionKeyRange: subset = self._op_execution_context.get_step_execution_context().asset_partitions_subset_for_asset_key( asset_key ) diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py index 46924f6cd3cc7..e758219956bd8 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py @@ -1,4 +1,24 @@ -from dagster import AssetExecutionContext, OpExecutionContext, asset, job, materialize, op +from typing import List, Union + +from dagster import ( + AssetExecutionContext, + AssetKey, + AssetsDefinition, + DailyPartitionsDefinition, + OpExecutionContext, + asset, + job, + materialize, + op, +) +from dagster._core.definitions.asset_spec import AssetSpec +from dagster._core.definitions.decorators.asset_decorator import multi_asset +from dagster._core.definitions.partition import StaticPartitionsDefinition +from dagster._core.storage.io_manager import IOManager +from dagster._core.storage.tags import ( + ASSET_PARTITION_RANGE_END_TAG, + ASSET_PARTITION_RANGE_START_TAG, +) def test_base_asset_execution_context() -> None: @@ -58,3 +78,97 @@ def an_asset(context: AssetExecutionContext): assert materialize([an_asset]).success assert called["yup"] + + +def test_basic_static_partitioning() -> None: + called = {"yup": False} + + @asset(partitions_def=StaticPartitionsDefinition(["foo", "bar"])) + def a_partitioned_asset(context: AssetExecutionContext): + assert context.partition_key_range.start == "bar" + assert context.partition_key_range.end == "bar" + called["yup"] = True + + assert materialize([a_partitioned_asset], partition_key="bar").success + assert called["yup"] + + +# neither our python apis nor our default i/o manager support support partition ranges +# so I am forced to write this helper 😐 +def materialize_single_run_with_partition_key_range( + assets_def: Union[AssetsDefinition, List[AssetsDefinition]], start: str, end: str +): + # our default io manager does not handle partition ranges + class DevNullIOManager(IOManager): + def handle_output(self, context, obj) -> None: + ... + + def load_input(self, context) -> None: + ... + + return materialize( + [assets_def] if isinstance(assets_def, AssetsDefinition) else assets_def, + tags={ + ASSET_PARTITION_RANGE_START_TAG: start, + ASSET_PARTITION_RANGE_END_TAG: end, + }, + resources={"io_manager": DevNullIOManager()}, + ) + + +def test_basic_daily_partitioning() -> None: + called = {"yup": False} + + @asset(partitions_def=DailyPartitionsDefinition(start_date="2020-01-01", end_date="2020-01-03")) + def a_partitioned_asset(context: AssetExecutionContext): + assert context.partition_key_range.start == "2020-01-01" + assert context.partition_key_range.end == "2020-01-02" + called["yup"] = True + + assert materialize_single_run_with_partition_key_range( + a_partitioned_asset, start="2020-01-01", end="2020-01-02" + ).success + assert called["yup"] + + +def test_basic_daily_partitioning_two_assets() -> None: + called = {"upstream": False, "downstream": False} + partitions_def = DailyPartitionsDefinition(start_date="2020-01-01", end_date="2020-01-03") + + @asset(partitions_def=partitions_def) + def upstream(context: AssetExecutionContext): + assert context.partition_key_range.start == "2020-01-01" + assert context.partition_key_range.end == "2020-01-02" + called[context.asset_key.to_user_string()] = True + + @asset(deps=[upstream], partitions_def=partitions_def) + def downstream(context: AssetExecutionContext): + assert context.partition_key_range.start == "2020-01-01" + assert context.partition_key_range.end == "2020-01-02" + called[context.asset_key.to_user_string()] = True + + assert materialize_single_run_with_partition_key_range( + [upstream, downstream], start="2020-01-01", end="2020-01-02" + ).success + + assert called["upstream"] + assert called["downstream"] + + +def test_basic_daily_partitioning_multi_asset() -> None: + partitions_def = DailyPartitionsDefinition(start_date="2020-01-01", end_date="2020-01-03") + called = {"yup": False} + + @multi_asset( + specs=[AssetSpec("asset_one"), AssetSpec("asset_two")], partitions_def=partitions_def + ) + def a_multi_asset(context: AssetExecutionContext): + assert context.selected_asset_keys == {AssetKey("asset_one"), AssetKey("asset_two")} + assert context.partition_key_range.start == "2020-01-01" + assert context.partition_key_range.end == "2020-01-02" + called["yup"] = True + + assert materialize_single_run_with_partition_key_range( + a_multi_asset, start="2020-01-01", end="2020-01-02" + ).success + assert called["yup"] From d09862207253d61197b3ae2e26b3492baa7580ae Mon Sep 17 00:00:00 2001 From: Nicholas Schrock Date: Sun, 10 Sep 2023 12:42:44 -0400 Subject: [PATCH 14/19] more tests --- .../test_asset_execution_context.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py index e758219956bd8..aa1ee44722edd 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py @@ -172,3 +172,22 @@ def a_multi_asset(context: AssetExecutionContext): a_multi_asset, start="2020-01-01", end="2020-01-02" ).success assert called["yup"] + + +def test_handle_partition_mapping() -> None: + ... + # TODO + # daily_partitions_def = DailyPartitionsDefinition(start_date="2020-01-01", end_date="2020-03-01") + + # @asset(partitions_def=daily_partitions_def) + # def daily_partitioned_asset(context: AssetExecutionContext): + # assert context.partition_key_range.start == "2020-01-01" + # assert context.partition_key_range.end == "2020-01-02" + + # monthly_partitions_def = MonthlyPartitionsDefinition(start_date="2020-01-01", end_date="2020-03-01") + + # @asset(partitions_def=monthly_partitions_def, deps=[AssetDep(daily_partitioned_asset)]) + # def downstream_monthly_partitioned_asset(context: AssetExecutionContext): + # pass + + # materialize_single_run_with_partition_key_range([daily_partitioned_asset, downstream_monthly_partitioned_asset], start="2020-01-01", end="2020-02-01") From db166a6b738bd8555468f0f9c2474ab9ab3adf17 Mon Sep 17 00:00:00 2001 From: Nicholas Schrock Date: Sun, 10 Sep 2023 13:30:45 -0400 Subject: [PATCH 15/19] cp --- .../dagster/_core/execution/context/system.py | 2 +- .../test_asset_execution_context.py | 28 +++++++++++++++++-- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/system.py b/python_modules/dagster/dagster/_core/execution/context/system.py index 525f18cd1ab15..86815c8636195 100644 --- a/python_modules/dagster/dagster/_core/execution/context/system.py +++ b/python_modules/dagster/dagster/_core/execution/context/system.py @@ -1087,7 +1087,7 @@ def asset_partitions_subset_for_asset_key( asset_layer.assets_def_for_node(self.node_handle), "must have assets def" ) asset_partitions_def = check.not_none( - asset_layer.partitions_def_for_asset(asset_key), "The input has no asset partitions" + asset_layer.partitions_def_for_asset(asset_key), "The asset key does not have a partitions definition" ) partitions_def = assets_def.partitions_def diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py index aa1ee44722edd..df1b5ff29732b 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py @@ -1,6 +1,7 @@ from typing import List, Union from dagster import ( + MonthlyPartitionsDefinition, AssetExecutionContext, AssetKey, AssetsDefinition, @@ -174,20 +175,41 @@ def a_multi_asset(context: AssetExecutionContext): assert called["yup"] + def test_handle_partition_mapping() -> None: ... # TODO # daily_partitions_def = DailyPartitionsDefinition(start_date="2020-01-01", end_date="2020-03-01") + # @asset(partitions_def=daily_partitions_def) # def daily_partitioned_asset(context: AssetExecutionContext): + # raise Exception("not executed") # assert context.partition_key_range.start == "2020-01-01" # assert context.partition_key_range.end == "2020-01-02" # monthly_partitions_def = MonthlyPartitionsDefinition(start_date="2020-01-01", end_date="2020-03-01") - # @asset(partitions_def=monthly_partitions_def, deps=[AssetDep(daily_partitioned_asset)]) + # @asset(partitions_def=monthly_partitions_def, deps=[daily_partitioned_asset]) # def downstream_monthly_partitioned_asset(context: AssetExecutionContext): - # pass + # print(context.partition_key_range_for_asset_key(AssetKey("daily_partitioned_asset"))) + + # materialize_single_run_with_partition_key_range([downstream_monthly_partitioned_asset], start="2020-01-01", end="2020-02-01") + + +def test_time_window_methods() -> None: + called = {"yup": False} + + @asset(partitions_def=DailyPartitionsDefinition(start_date="2020-01-01", end_date="2020-01-03")) + def a_partitioned_asset(context: AssetExecutionContext): + ptw = context.op_execution_context.partition_time_window + assert ptw.start.day == 1 + assert ptw.end.day == 2 + called["yup"] = True - # materialize_single_run_with_partition_key_range([daily_partitioned_asset, downstream_monthly_partitioned_asset], start="2020-01-01", end="2020-02-01") + # time windows do not work on ranges + # assert materialize_single_run_with_partition_key_range( + # a_partitioned_asset, start="2020-01-01", end="2020-01-01" + # ).success + assert materialize([a_partitioned_asset], partition_key="2020-01-01").success + assert called["yup"] \ No newline at end of file From 61b720d0c193ef34405c8c5ee5c8fe320dafd1a0 Mon Sep 17 00:00:00 2001 From: Nicholas Schrock Date: Sun, 10 Sep 2023 14:13:12 -0400 Subject: [PATCH 16/19] cp --- .../_core/execution/context/compute.py | 40 ++++++--- .../dagster/_core/execution/context/system.py | 3 +- .../test_asset_execution_context.py | 89 ++++++++++++++++++- raw_script.py | 22 +++++ 4 files changed, 136 insertions(+), 18 deletions(-) create mode 100644 raw_script.py diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 32df8f3c6f0ef..55bed4aaeb50c 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -713,12 +713,12 @@ def asset_check_spec(self) -> AssetCheckSpec: "node_handle", "op", "op_config", - "op_def", + # "op_def", # used by internals "op_handle", "retry_number", "resources", "step_launcher", - "has_events", + # "has_events", # used by internals "consumer_events", ] ) @@ -726,7 +726,7 @@ def asset_check_spec(self) -> AssetCheckSpec: PARTITION_KEY_RANGE_AS_ALT = "use partition_key_range instead" INPUT_OUTPUT_ALT = "not use input or output names and instead use asset keys directly" -OUTPUT_METADATA_ALT = "use MaterializationResult instead" +OUTPUT_METADATA_ALT = "return MaterializationResult from the asset instead" DEPRECATED_IO_MANAGER_CENTRIC_CONTEXT_METHODS = { "add_output_metadata": OUTPUT_METADATA_ALT, @@ -770,25 +770,39 @@ def __getattr__(self, attr) -> Any: if attr in OP_EXECUTION_CONTEXT_ONLY_METHODS: deprecation_warning( - f"Calling deprecated method {attr} on AssetExecutionContext. Use underlying" - f" OpExecutionContext instead by calling op_execution_context.{attr}", - "1.7", + subject=f"AssetExecutionContext.{attr}", + additional_warn_text=( + f"You have called the deprecated method {attr} on AssetExecutionContext. Use the" + " underlying OpExecutionContext instead by calling" + f" op_execution_context.{attr}." + ), + breaking_version="1.7", + stacklevel=1 ) if attr in DEPRECATED_IO_MANAGER_CENTRIC_CONTEXT_METHODS: alt = DEPRECATED_IO_MANAGER_CENTRIC_CONTEXT_METHODS[attr] + + # warnings.warn( deprecation_warning( - f"Calling method {attr} on AssetExecutionContext oriented around I/O managers. " - f"If you not using I/O managers we suggest you {alt}. If you are using " - "I/O managers the method still exists at op_execution_context.{attr}.", - "1.7", + subject=f"AssetExecutionContext.{attr}", + additional_warn_text=( + f"You have called method {attr} on AssetExecutionContext that is oriented" + f" around I/O managers. If you not using I/O managers we suggest you {alt}. If" + " you are using I/O managers the method still exists at" + f" op_execution_context.{attr}." + ), + breaking_version="1.7", + stacklevel=1 ) if attr in ALTERNATE_AVAILABLE_METHODS: deprecation_warning( - f"Calling method {attr} on AssetExecutionContext. Instead," - f" {ALTERNATE_AVAILABLE_METHODS[attr]}", - "1.7", + subject=f"AssetExecutionContext.{attr}", + additional_warn_text=f"Instead" + f" {ALTERNATE_AVAILABLE_METHODS[attr]}.", + breaking_version="1.7", + stacklevel=1 ) return getattr(self._op_execution_context, attr) diff --git a/python_modules/dagster/dagster/_core/execution/context/system.py b/python_modules/dagster/dagster/_core/execution/context/system.py index 86815c8636195..c6eb36b01e232 100644 --- a/python_modules/dagster/dagster/_core/execution/context/system.py +++ b/python_modules/dagster/dagster/_core/execution/context/system.py @@ -1087,7 +1087,8 @@ def asset_partitions_subset_for_asset_key( asset_layer.assets_def_for_node(self.node_handle), "must have assets def" ) asset_partitions_def = check.not_none( - asset_layer.partitions_def_for_asset(asset_key), "The asset key does not have a partitions definition" + asset_layer.partitions_def_for_asset(asset_key), + "The asset key does not have a partitions definition", ) partitions_def = assets_def.partitions_def diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py index df1b5ff29732b..6c70e130c85a0 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py @@ -1,7 +1,9 @@ +import warnings + +import pytest from typing import List, Union from dagster import ( - MonthlyPartitionsDefinition, AssetExecutionContext, AssetKey, AssetsDefinition, @@ -175,13 +177,11 @@ def a_multi_asset(context: AssetExecutionContext): assert called["yup"] - def test_handle_partition_mapping() -> None: ... # TODO # daily_partitions_def = DailyPartitionsDefinition(start_date="2020-01-01", end_date="2020-03-01") - # @asset(partitions_def=daily_partitions_def) # def daily_partitioned_asset(context: AssetExecutionContext): # raise Exception("not executed") @@ -212,4 +212,85 @@ def a_partitioned_asset(context: AssetExecutionContext): # a_partitioned_asset, start="2020-01-01", end="2020-01-01" # ).success assert materialize([a_partitioned_asset], partition_key="2020-01-01").success - assert called["yup"] \ No newline at end of file + assert called["yup"] + + + + +@pytest.fixture +def error_on_warning(): + # I couldn't get these to fire otherwise ¯\_(ツ)_/¯ + # turn off any outer warnings filters, e.g. ignores that are set in pyproject.toml + warnings.resetwarnings() + warnings.filterwarnings("error") + + +def test_io_manager_oriented_warning(error_on_warning) -> None: + called = {"yup": False} + + @asset(partitions_def=DailyPartitionsDefinition(start_date="2020-01-01", end_date="2020-01-03")) + def a_partitioned_asset(context: AssetExecutionContext): + with pytest.raises(DeprecationWarning) as exc_info: + assert context.asset_partition_key_for_output("result") == "2020-01-01" + + expected = ( + "AssetExecutionContext.asset_partition_key_for_output is deprecated and will be removed" + " in 1.7. You have called method asset_partition_key_for_output on" + " AssetExecutionContext that is oriented around I/O managers. If you not using I/O" + " managers we suggest you use partition_key_range instead. If you are using I/O" + " managers the method still exists at" + " op_execution_context.asset_partition_key_for_output." + ) + + assert expected in str(exc_info.value) + + called["yup"] = True + + assert materialize([a_partitioned_asset], partition_key="2020-01-01").success + assert called["yup"] + + +def test_generic_op_execution_context_warning(error_on_warning) -> None: + called = {"yup": False} + + @asset + def an_asset(context: AssetExecutionContext): + assert isinstance(context, AssetExecutionContext) + + with pytest.raises(DeprecationWarning) as exc_info: + assert context.file_manager is None + + expected = ( + "AssetExecutionContext.file_manager is deprecated and will be removed in 1.7. You have" + " called the deprecated method file_manager on AssetExecutionContext. Use the underlying" + " OpExecutionContext instead by calling op_execution_context.file_manager." + ) + + assert expected in str(exc_info.value) + + called["yup"] = True + + assert materialize([an_asset]).success + assert called["yup"] + +def test_alternative_available_warning(error_on_warning) -> None: + called = {"yup": False} + + @asset + def an_asset(context: AssetExecutionContext): + assert isinstance(context, AssetExecutionContext) + + with pytest.raises(DeprecationWarning) as exc_info: + assert context.has_tag("foobar") is False + + + expected = ( + "AssetExecutionContext.has_tag is deprecated and will be removed in 1.7. " + "Instead use dagster_run.has_tag instead." + ) + assert expected in str(exc_info.value) + + called["yup"] = True + + assert materialize([an_asset]).success + assert called["yup"] diff --git a/raw_script.py b/raw_script.py new file mode 100644 index 0000000000000..c737cfaa10938 --- /dev/null +++ b/raw_script.py @@ -0,0 +1,22 @@ + +from dagster import AssetExecutionContext, DailyPartitionsDefinition, asset, materialize + + +@asset(partitions_def=DailyPartitionsDefinition(start_date="2020-01-01", end_date="2020-01-03")) +def a_partitioned_asset(context: AssetExecutionContext): + assert context.asset_partition_key_for_output("result") == "2020-01-01" + + # expected = ( + # "AssetExecutionContext.asset_partition_key_for_output is deprecated and will be removed" + # " in 1.7. You have called method asset_partition_key_for_output on" + # " AssetExecutionContext that is oriented around I/O managers. If you not using I/O" + # " managers we suggest you use partition_key_range instead. If you are using I/O" + # " managers the method still exists at" + # " op_execution_context.asset_partition_key_for_output." + # ) + + # assert expected in str(exc_info.value) + + # called["yup"] = True + +materialize(assets=[a_partitioned_asset], partition_key="2020-01-01") \ No newline at end of file From 62bf34b23fcb022830e6915bb965100ee2041b0f Mon Sep 17 00:00:00 2001 From: Nicholas Schrock Date: Sun, 10 Sep 2023 14:13:21 -0400 Subject: [PATCH 17/19] lint --- .../dagster/_core/execution/context/compute.py | 13 ++++++------- .../execution_tests/test_asset_execution_context.py | 11 ++++------- 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 55bed4aaeb50c..95df380b4cfa6 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -772,12 +772,12 @@ def __getattr__(self, attr) -> Any: deprecation_warning( subject=f"AssetExecutionContext.{attr}", additional_warn_text=( - f"You have called the deprecated method {attr} on AssetExecutionContext. Use the" - " underlying OpExecutionContext instead by calling" + f"You have called the deprecated method {attr} on AssetExecutionContext. Use" + " the underlying OpExecutionContext instead by calling" f" op_execution_context.{attr}." ), breaking_version="1.7", - stacklevel=1 + stacklevel=1, ) if attr in DEPRECATED_IO_MANAGER_CENTRIC_CONTEXT_METHODS: @@ -793,16 +793,15 @@ def __getattr__(self, attr) -> Any: f" op_execution_context.{attr}." ), breaking_version="1.7", - stacklevel=1 + stacklevel=1, ) if attr in ALTERNATE_AVAILABLE_METHODS: deprecation_warning( subject=f"AssetExecutionContext.{attr}", - additional_warn_text=f"Instead" - f" {ALTERNATE_AVAILABLE_METHODS[attr]}.", + additional_warn_text=f"Instead {ALTERNATE_AVAILABLE_METHODS[attr]}.", breaking_version="1.7", - stacklevel=1 + stacklevel=1, ) return getattr(self._op_execution_context, attr) diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py index 6c70e130c85a0..ad54ec0770ad5 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py @@ -1,8 +1,7 @@ import warnings - -import pytest from typing import List, Union +import pytest from dagster import ( AssetExecutionContext, AssetKey, @@ -215,8 +214,6 @@ def a_partitioned_asset(context: AssetExecutionContext): assert called["yup"] - - @pytest.fixture def error_on_warning(): # I couldn't get these to fire otherwise ¯\_(ツ)_/¯ @@ -262,8 +259,8 @@ def an_asset(context: AssetExecutionContext): expected = ( "AssetExecutionContext.file_manager is deprecated and will be removed in 1.7. You have" - " called the deprecated method file_manager on AssetExecutionContext. Use the underlying" - " OpExecutionContext instead by calling op_execution_context.file_manager." + " called the deprecated method file_manager on AssetExecutionContext. Use the" + " underlying OpExecutionContext instead by calling op_execution_context.file_manager." ) assert expected in str(exc_info.value) @@ -273,6 +270,7 @@ def an_asset(context: AssetExecutionContext): assert materialize([an_asset]).success assert called["yup"] + def test_alternative_available_warning(error_on_warning) -> None: called = {"yup": False} @@ -283,7 +281,6 @@ def an_asset(context: AssetExecutionContext): with pytest.raises(DeprecationWarning) as exc_info: assert context.has_tag("foobar") is False - expected = ( "AssetExecutionContext.has_tag is deprecated and will be removed in 1.7. " "Instead use dagster_run.has_tag instead." From 6c2472682c1d5cb5ebbe6c8dc94e3f197ed15ec5 Mon Sep 17 00:00:00 2001 From: Nicholas Schrock Date: Sun, 10 Sep 2023 14:23:24 -0400 Subject: [PATCH 18/19] rm raw script --- .../_core/execution/context/compute.py | 6 +++-- raw_script.py | 22 ------------------- 2 files changed, 4 insertions(+), 24 deletions(-) delete mode 100644 raw_script.py diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 95df380b4cfa6..9d426abf634ff 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -49,6 +49,7 @@ from .system import StepExecutionContext +# This metaclass has to exist for OpExecutionContext to have a metaclass class AbstractComputeMetaclass(ABCMeta): pass @@ -103,8 +104,9 @@ def op_config(self) -> Any: class OpExecutionContextMetaClass(AbstractComputeMetaclass): def __instancecheck__(cls, instance) -> bool: # This makes isinstance(context, OpExecutionContext) return True when - # the instance is an AssetExecutionContext. This makes the new AssetExecutionContext - # backwards compatible with the old OpExecutionContext codepaths. + # the context is an AssetExecutionContext. This makes the new + # AssetExecutionContext backwards compatible with the old + # OpExecutionContext codepaths. if isinstance(instance, AssetExecutionContext): return True return super().__instancecheck__(instance) diff --git a/raw_script.py b/raw_script.py deleted file mode 100644 index c737cfaa10938..0000000000000 --- a/raw_script.py +++ /dev/null @@ -1,22 +0,0 @@ - -from dagster import AssetExecutionContext, DailyPartitionsDefinition, asset, materialize - - -@asset(partitions_def=DailyPartitionsDefinition(start_date="2020-01-01", end_date="2020-01-03")) -def a_partitioned_asset(context: AssetExecutionContext): - assert context.asset_partition_key_for_output("result") == "2020-01-01" - - # expected = ( - # "AssetExecutionContext.asset_partition_key_for_output is deprecated and will be removed" - # " in 1.7. You have called method asset_partition_key_for_output on" - # " AssetExecutionContext that is oriented around I/O managers. If you not using I/O" - # " managers we suggest you use partition_key_range instead. If you are using I/O" - # " managers the method still exists at" - # " op_execution_context.asset_partition_key_for_output." - # ) - - # assert expected in str(exc_info.value) - - # called["yup"] = True - -materialize(assets=[a_partitioned_asset], partition_key="2020-01-01") \ No newline at end of file From 718d21c2c423dbe0ecad739603c133863ea72bc5 Mon Sep 17 00:00:00 2001 From: Nicholas Schrock Date: Sun, 10 Sep 2023 14:57:22 -0400 Subject: [PATCH 19/19] fix test --- .../dagster/dagster/_core/execution/context/compute.py | 5 ++--- .../dagster/dagster/_core/execution/context/system.py | 7 +++---- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 9d426abf634ff..e63c21cdddeb3 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -709,7 +709,7 @@ def asset_check_spec(self) -> AssetCheckSpec: "file_manager", "has_assets_def", "get_mapping_key", - "get_step_execution_context", + # "get_step_execution_context", # used by internals "job_def", "job_name", "node_handle", @@ -718,7 +718,6 @@ def asset_check_spec(self) -> AssetCheckSpec: # "op_def", # used by internals "op_handle", "retry_number", - "resources", "step_launcher", # "has_events", # used by internals "consumer_events", @@ -726,7 +725,7 @@ def asset_check_spec(self) -> AssetCheckSpec: ) -PARTITION_KEY_RANGE_AS_ALT = "use partition_key_range instead" +PARTITION_KEY_RANGE_AS_ALT = "use partition_key_range or partition_key_range_for_asset instead" INPUT_OUTPUT_ALT = "not use input or output names and instead use asset keys directly" OUTPUT_METADATA_ALT = "return MaterializationResult from the asset instead" diff --git a/python_modules/dagster/dagster/_core/execution/context/system.py b/python_modules/dagster/dagster/_core/execution/context/system.py index c6eb36b01e232..affb8c8bad25a 100644 --- a/python_modules/dagster/dagster/_core/execution/context/system.py +++ b/python_modules/dagster/dagster/_core/execution/context/system.py @@ -1083,15 +1083,14 @@ def asset_partitions_subset_for_asset_key( self, asset_key: AssetKey, require_valid_partitions: bool = True ): asset_layer = self.job_def.asset_layer - assets_def = check.not_none( - asset_layer.assets_def_for_node(self.node_handle), "must have assets def" - ) + assets_def = asset_layer.assets_def_for_node(self.node_handle) asset_partitions_def = check.not_none( asset_layer.partitions_def_for_asset(asset_key), "The asset key does not have a partitions definition", ) - partitions_def = assets_def.partitions_def + # assets_def can be None in cases where op-only jobs are invoked with a partition key + partitions_def = assets_def.partitions_def if assets_def else None partitions_subset = ( partitions_def.empty_subset().with_partition_key_range( self.asset_partition_key_range, dynamic_partitions_store=self.instance