diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 20faa736c8b45..e63c21cdddeb3 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1,4 +1,4 @@ -from abc import ABC, abstractmethod +from abc import ABC, ABCMeta, abstractmethod from typing import ( AbstractSet, Any, @@ -12,8 +12,6 @@ cast, ) -from typing_extensions import TypeAlias - import dagster._check as check from dagster._annotations import deprecated, experimental, public from dagster._core.definitions.asset_check_spec import AssetCheckSpec @@ -46,11 +44,17 @@ from dagster._core.log_manager import DagsterLogManager from dagster._core.storage.dagster_run import DagsterRun from dagster._utils.forked_pdb import ForkedPdb +from dagster._utils.warnings import deprecation_warning from .system import StepExecutionContext -class AbstractComputeExecutionContext(ABC): +# This metaclass has to exist for OpExecutionContext to have a metaclass +class AbstractComputeMetaclass(ABCMeta): + pass + + +class AbstractComputeExecutionContext(ABC, metaclass=AbstractComputeMetaclass): """Base class for op context implemented by OpExecutionContext and DagstermillExecutionContext.""" @abstractmethod @@ -97,7 +101,18 @@ def op_config(self) -> Any: """The parsed config specific to this op.""" -class OpExecutionContext(AbstractComputeExecutionContext): +class OpExecutionContextMetaClass(AbstractComputeMetaclass): + def __instancecheck__(cls, instance) -> bool: + # This makes isinstance(context, OpExecutionContext) return True when + # the context is an AssetExecutionContext. This makes the new + # AssetExecutionContext backwards compatible with the old + # OpExecutionContext codepaths. + if isinstance(instance, AssetExecutionContext): + return True + return super().__instancecheck__(instance) + + +class OpExecutionContext(AbstractComputeExecutionContext, metaclass=OpExecutionContextMetaClass): """The ``context`` object that can be made available as the first argument to the function used for computing an op or asset. @@ -688,8 +703,191 @@ def asset_check_spec(self) -> AssetCheckSpec: return asset_checks_def.spec -# actually forking the object type for assets is tricky for users in the cases of: -# * manually constructing ops to make AssetsDefinitions -# * having ops in a graph that form a graph backed asset -# so we have a single type that users can call by their preferred name where appropriate -AssetExecutionContext: TypeAlias = OpExecutionContext +OP_EXECUTION_CONTEXT_ONLY_METHODS = set( + [ + "describe_op", + "file_manager", + "has_assets_def", + "get_mapping_key", + # "get_step_execution_context", # used by internals + "job_def", + "job_name", + "node_handle", + "op", + "op_config", + # "op_def", # used by internals + "op_handle", + "retry_number", + "step_launcher", + # "has_events", # used by internals + "consumer_events", + ] +) + + +PARTITION_KEY_RANGE_AS_ALT = "use partition_key_range or partition_key_range_for_asset instead" +INPUT_OUTPUT_ALT = "not use input or output names and instead use asset keys directly" +OUTPUT_METADATA_ALT = "return MaterializationResult from the asset instead" + +DEPRECATED_IO_MANAGER_CENTRIC_CONTEXT_METHODS = { + "add_output_metadata": OUTPUT_METADATA_ALT, + "asset_key_for_input": INPUT_OUTPUT_ALT, + "asset_key_for_output": INPUT_OUTPUT_ALT, + "asset_partition_key_for_input": PARTITION_KEY_RANGE_AS_ALT, + "asset_partition_key_for_output": PARTITION_KEY_RANGE_AS_ALT, + "asset_partition_key_range_for_input": PARTITION_KEY_RANGE_AS_ALT, + "asset_partition_key_range_for_output": PARTITION_KEY_RANGE_AS_ALT, + "asset_partition_keys_for_input": PARTITION_KEY_RANGE_AS_ALT, + "asset_partition_keys_for_output": PARTITION_KEY_RANGE_AS_ALT, + "asset_partitions_time_window_for_input": PARTITION_KEY_RANGE_AS_ALT, + "asset_partitions_time_window_for_output": PARTITION_KEY_RANGE_AS_ALT, + "asset_partitions_def_for_input": PARTITION_KEY_RANGE_AS_ALT, + "asset_partitions_def_for_output": PARTITION_KEY_RANGE_AS_ALT, + "get_output_metadata": "use op_execution_context.op_def.get_output(...).metadata", + "merge_output_metadata": OUTPUT_METADATA_ALT, + "output_for_asset_key": INPUT_OUTPUT_ALT, + "selected_output_names": INPUT_OUTPUT_ALT, +} + +ALTERNATE_AVAILABLE_METHODS = { + "has_tag": "use dagster_run.has_tag instead", + "get_tag": "use dagster_run.get_tag instead", + "run_tags": "use dagster_run.tags instead", + "set_data_version": "use MaterializationResult instead", +} + + +class AssetExecutionContext: + def __init__(self, op_execution_context: OpExecutionContext) -> None: + self._op_execution_context = check.inst_param( + op_execution_context, "op_execution_context", OpExecutionContext + ) + + def __getattr__(self, attr) -> Any: + check.str_param(attr, "attr") + + if attr in self.__dict__: + return getattr(self, attr) + + if attr in OP_EXECUTION_CONTEXT_ONLY_METHODS: + deprecation_warning( + subject=f"AssetExecutionContext.{attr}", + additional_warn_text=( + f"You have called the deprecated method {attr} on AssetExecutionContext. Use" + " the underlying OpExecutionContext instead by calling" + f" op_execution_context.{attr}." + ), + breaking_version="1.7", + stacklevel=1, + ) + + if attr in DEPRECATED_IO_MANAGER_CENTRIC_CONTEXT_METHODS: + alt = DEPRECATED_IO_MANAGER_CENTRIC_CONTEXT_METHODS[attr] + + # warnings.warn( + deprecation_warning( + subject=f"AssetExecutionContext.{attr}", + additional_warn_text=( + f"You have called method {attr} on AssetExecutionContext that is oriented" + f" around I/O managers. If you not using I/O managers we suggest you {alt}. If" + " you are using I/O managers the method still exists at" + f" op_execution_context.{attr}." + ), + breaking_version="1.7", + stacklevel=1, + ) + + if attr in ALTERNATE_AVAILABLE_METHODS: + deprecation_warning( + subject=f"AssetExecutionContext.{attr}", + additional_warn_text=f"Instead {ALTERNATE_AVAILABLE_METHODS[attr]}.", + breaking_version="1.7", + stacklevel=1, + ) + + return getattr(self._op_execution_context, attr) + + # include all supported methods below + + @public + @property + def op_execution_context(self) -> OpExecutionContext: + return self._op_execution_context + + @public + @property + def run_id(self) -> str: + return self._op_execution_context.run_id + + @public + @property + def dagster_run(self) -> DagsterRun: + """PipelineRun: The current pipeline run.""" + return self._step_execution_context.dagster_run + + @public + @property + def asset_key(self) -> AssetKey: + return self._op_execution_context.asset_key + + @public + @property + def pdb(self) -> ForkedPdb: + return self._op_execution_context.pdb + + @public + @property + def log(self) -> DagsterLogManager: + """DagsterLogManager: The log manager available in the execution context.""" + return self._op_execution_context.log + + @public + # renaming to avoid ambiguity in single run and multi-partition case + @property + def is_partitioned_execution(self) -> bool: + return self._op_execution_context.has_partition_key + + @public + def log_event(self, event: UserEvent) -> None: + return self._op_execution_context.log_event(event) + + @public + @property + def assets_def(self) -> AssetsDefinition: + return self._op_execution_context.assets_def + + @public + # TODO confirm semantics in the presense of asset subsetting + # seems like there should be both "asset_keys" and "selected_asset_keys" + @property + def selected_asset_keys(self) -> AbstractSet[AssetKey]: + return self._op_execution_context.selected_asset_keys + + @public + @experimental + def get_asset_provenance(self, asset_key: AssetKey) -> Optional[DataProvenance]: + return self._op_execution_context.get_asset_provenance(asset_key) + + @property + def asset_check_spec(self) -> AssetCheckSpec: + return self._op_execution_context.asset_check_spec + + @public + @property + def partition_key_range(self) -> PartitionKeyRange: + return self._op_execution_context.asset_partition_key_range + + @public + def partition_key_range_for_asset_key(self, asset_key: AssetKey) -> PartitionKeyRange: + subset = self._op_execution_context.get_step_execution_context().asset_partitions_subset_for_asset_key( + asset_key + ) + partition_key_ranges = subset.get_partition_key_ranges( + dynamic_partitions_store=self._op_execution_context.instance + ) + if len(partition_key_ranges) != 1: + check.failed( + "Tried to access asset partition key range, but there are " + f"({len(partition_key_ranges)}) key ranges associated with this asset key.", + ) + return partition_key_ranges[0] diff --git a/python_modules/dagster/dagster/_core/execution/context/system.py b/python_modules/dagster/dagster/_core/execution/context/system.py index b7261458f0e18..affb8c8bad25a 100644 --- a/python_modules/dagster/dagster/_core/execution/context/system.py +++ b/python_modules/dagster/dagster/_core/execution/context/system.py @@ -1072,50 +1072,57 @@ def asset_partitions_subset_for_input( self, input_name: str, *, require_valid_partitions: bool = True ) -> PartitionsSubset: asset_layer = self.job_def.asset_layer - assets_def = asset_layer.assets_def_for_node(self.node_handle) - upstream_asset_key = asset_layer.asset_key_for_input(self.node_handle, input_name) + asset_key = check.not_none( + asset_layer.asset_key_for_input(self.node_handle, input_name), + "The input has no asset key", + ) - if upstream_asset_key is not None: - upstream_asset_partitions_def = asset_layer.partitions_def_for_asset(upstream_asset_key) + return self.asset_partitions_subset_for_asset_key(asset_key, require_valid_partitions) - if upstream_asset_partitions_def is not None: - partitions_def = assets_def.partitions_def if assets_def else None - partitions_subset = ( - partitions_def.empty_subset().with_partition_key_range( - self.asset_partition_key_range, dynamic_partitions_store=self.instance - ) - if partitions_def - else None - ) - partition_mapping = infer_partition_mapping( - asset_layer.partition_mapping_for_node_input( - self.node_handle, upstream_asset_key - ), - partitions_def, - upstream_asset_partitions_def, - ) - mapped_partitions_result = ( - partition_mapping.get_upstream_mapped_partitions_result_for_partitions( - partitions_subset, - upstream_asset_partitions_def, - dynamic_partitions_store=self.instance, - ) - ) + def asset_partitions_subset_for_asset_key( + self, asset_key: AssetKey, require_valid_partitions: bool = True + ): + asset_layer = self.job_def.asset_layer + assets_def = asset_layer.assets_def_for_node(self.node_handle) + asset_partitions_def = check.not_none( + asset_layer.partitions_def_for_asset(asset_key), + "The asset key does not have a partitions definition", + ) - if ( - require_valid_partitions - and mapped_partitions_result.required_but_nonexistent_partition_keys - ): - raise DagsterInvariantViolationError( - f"Partition key range {self.asset_partition_key_range} in" - f" {self.node_handle.name} depends on invalid partition keys" - f" {mapped_partitions_result.required_but_nonexistent_partition_keys} in" - f" upstream asset {upstream_asset_key}" - ) + # assets_def can be None in cases where op-only jobs are invoked with a partition key + partitions_def = assets_def.partitions_def if assets_def else None + partitions_subset = ( + partitions_def.empty_subset().with_partition_key_range( + self.asset_partition_key_range, dynamic_partitions_store=self.instance + ) + if partitions_def + else None + ) + partition_mapping = infer_partition_mapping( + asset_layer.partition_mapping_for_node_input(self.node_handle, asset_key), + partitions_def, + asset_partitions_def, + ) + mapped_partitions_result = ( + partition_mapping.get_upstream_mapped_partitions_result_for_partitions( + partitions_subset, + asset_partitions_def, + dynamic_partitions_store=self.instance, + ) + ) - return mapped_partitions_result.partitions_subset + if ( + require_valid_partitions + and mapped_partitions_result.required_but_nonexistent_partition_keys + ): + raise DagsterInvariantViolationError( + f"Partition key range {self.asset_partition_key_range} in" + f" {self.node_handle.name} depends on invalid partition keys" + f" {mapped_partitions_result.required_but_nonexistent_partition_keys} in" + f" upstream asset {asset_key}" + ) - check.failed("The input has no asset partitions") + return mapped_partitions_result.partitions_subset def asset_partition_key_for_input(self, input_name: str) -> str: start, end = self.asset_partition_key_range_for_input(input_name) diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute.py b/python_modules/dagster/dagster/_core/execution/plan/compute.py index 549c091f655a2..fb430775b6fed 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/compute.py +++ b/python_modules/dagster/dagster/_core/execution/plan/compute.py @@ -32,7 +32,7 @@ from dagster._core.definitions.result import MaterializeResult from dagster._core.errors import DagsterExecutionStepExecutionError, DagsterInvariantViolationError from dagster._core.events import DagsterEvent -from dagster._core.execution.context.compute import OpExecutionContext +from dagster._core.execution.context.compute import AssetExecutionContext, OpExecutionContext from dagster._core.execution.context.system import StepExecutionContext from dagster._core.system_config.objects import ResolvedRunConfig from dagster._utils import iterate_with_context @@ -146,7 +146,11 @@ def _yield_compute_results( ) -> Iterator[OpOutputUnion]: check.inst_param(step_context, "step_context", StepExecutionContext) - context = OpExecutionContext(step_context) + context = ( + AssetExecutionContext(OpExecutionContext(step_context)) + if step_context.is_sda_step + else OpExecutionContext(step_context) + ) user_event_generator = compute_fn(context, inputs) if isinstance(user_event_generator, Output): diff --git a/python_modules/dagster/dagster/_core/storage/dagster_run.py b/python_modules/dagster/dagster/_core/storage/dagster_run.py index 2b177e5df5790..be2920b42b8a9 100644 --- a/python_modules/dagster/dagster/_core/storage/dagster_run.py +++ b/python_modules/dagster/dagster/_core/storage/dagster_run.py @@ -428,6 +428,12 @@ def tags_for_sensor(sensor) -> Mapping[str, str]: def tags_for_backfill_id(backfill_id: str) -> Mapping[str, str]: return {BACKFILL_ID_TAG: backfill_id} + def get_tag(self, key: str) -> Optional[str]: + return self.tags.get(key) + + def has_tag(self, key: str) -> bool: + return key in self.tags + class RunsFilterSerializer(NamedTupleSerializer["RunsFilter"]): def before_unpack( diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py index 0698c1b807846..3e877cb8756c5 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py @@ -1,6 +1,5 @@ import hashlib import os -import warnings import pytest from dagster import ( @@ -56,13 +55,12 @@ disable_dagster_warnings, ) +# @pytest.fixture(autouse=True) +# def error_on_warning(): +# # turn off any outer warnings filters, e.g. ignores that are set in pyproject.toml +# warnings.resetwarnings() -@pytest.fixture(autouse=True) -def error_on_warning(): - # turn off any outer warnings filters, e.g. ignores that are set in pyproject.toml - warnings.resetwarnings() - - warnings.filterwarnings("error") +# warnings.filterwarnings("error") def _all_asset_keys(result): diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_decorators.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_decorators.py index 15b413e443bfb..d4b9dd5c98cf2 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_decorators.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_decorators.py @@ -1,4 +1,3 @@ -import warnings from typing import Any import pytest @@ -48,13 +47,12 @@ from dagster._core.test_utils import ignore_warning from dagster._core.types.dagster_type import resolve_dagster_type +# @pytest.fixture(autouse=True) +# def error_on_warning(): +# # turn off any outer warnings filters, e.g. ignores that are set in pyproject.toml +# warnings.resetwarnings() -@pytest.fixture(autouse=True) -def error_on_warning(): - # turn off any outer warnings filters, e.g. ignores that are set in pyproject.toml - warnings.resetwarnings() - - warnings.filterwarnings("error") +# warnings.filterwarnings("error") def test_asset_no_decorator_args(): diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_materialize.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_materialize.py index e2063762f5c7c..6d27045f98612 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_materialize.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_materialize.py @@ -1,6 +1,5 @@ import os import pickle -import warnings from tempfile import TemporaryDirectory import pytest @@ -30,13 +29,12 @@ ) from dagster._core.test_utils import ignore_warning, instance_for_test +# @pytest.fixture(autouse=True) +# def error_on_warning(): +# # turn off any outer warnings filters, e.g. ignores that are set in pyproject.toml +# warnings.resetwarnings() -@pytest.fixture(autouse=True) -def error_on_warning(): - # turn off any outer warnings filters, e.g. ignores that are set in pyproject.toml - warnings.resetwarnings() - - warnings.filterwarnings("error") +# warnings.filterwarnings("error") def test_basic_materialize(): diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py index e7f694598b76e..4d693347d8dc1 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py @@ -1,4 +1,3 @@ -import warnings from typing import Optional import dagster._check as check @@ -39,13 +38,12 @@ from dagster._core.test_utils import assert_namedtuple_lists_equal from dagster._seven.compat.pendulum import create_pendulum_time +# @pytest.fixture(autouse=True) +# def error_on_warning(): +# # turn off any outer warnings filters, e.g. ignores that are set in pyproject.toml +# warnings.resetwarnings() -@pytest.fixture(autouse=True) -def error_on_warning(): - # turn off any outer warnings filters, e.g. ignores that are set in pyproject.toml - warnings.resetwarnings() - - warnings.filterwarnings("error") +# warnings.filterwarnings("error") def get_upstream_partitions_for_partition_range( diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py new file mode 100644 index 0000000000000..ad54ec0770ad5 --- /dev/null +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py @@ -0,0 +1,293 @@ +import warnings +from typing import List, Union + +import pytest +from dagster import ( + AssetExecutionContext, + AssetKey, + AssetsDefinition, + DailyPartitionsDefinition, + OpExecutionContext, + asset, + job, + materialize, + op, +) +from dagster._core.definitions.asset_spec import AssetSpec +from dagster._core.definitions.decorators.asset_decorator import multi_asset +from dagster._core.definitions.partition import StaticPartitionsDefinition +from dagster._core.storage.io_manager import IOManager +from dagster._core.storage.tags import ( + ASSET_PARTITION_RANGE_END_TAG, + ASSET_PARTITION_RANGE_START_TAG, +) + + +def test_base_asset_execution_context() -> None: + called = {"yup": False} + + @asset + def an_asset(context: AssetExecutionContext): + assert isinstance(context, AssetExecutionContext) + called["yup"] = True + + assert materialize([an_asset]).success + assert called["yup"] + + +def test_isinstance_op_execution_context_asset_execution_context() -> None: + called = {"yup": False} + + @asset + def an_asset(context: AssetExecutionContext): + # we make this work for backwards compat + assert isinstance(context, OpExecutionContext) + assert type(context) is AssetExecutionContext + called["yup"] = True + + assert materialize([an_asset]).success + assert called["yup"] + + +def test_op_gets_actual_op_execution_context() -> None: + called = {"yup": False} + + @op + def an_op(context: OpExecutionContext): + # we make this work for backwards compat + assert isinstance(context, OpExecutionContext) + assert type(context) is OpExecutionContext + called["yup"] = True + + @job + def a_job(): + an_op() + + assert a_job.execute_in_process().success + assert called["yup"] + + +def test_run_id_in_asset_execution_context() -> None: + called = {"yup": False} + + @asset + def an_asset(context: AssetExecutionContext): + # we make this work for backwards compat + assert isinstance(context, OpExecutionContext) + assert type(context) is AssetExecutionContext + assert context.run_id + called["yup"] = True + + assert materialize([an_asset]).success + assert called["yup"] + + +def test_basic_static_partitioning() -> None: + called = {"yup": False} + + @asset(partitions_def=StaticPartitionsDefinition(["foo", "bar"])) + def a_partitioned_asset(context: AssetExecutionContext): + assert context.partition_key_range.start == "bar" + assert context.partition_key_range.end == "bar" + called["yup"] = True + + assert materialize([a_partitioned_asset], partition_key="bar").success + assert called["yup"] + + +# neither our python apis nor our default i/o manager support support partition ranges +# so I am forced to write this helper 😐 +def materialize_single_run_with_partition_key_range( + assets_def: Union[AssetsDefinition, List[AssetsDefinition]], start: str, end: str +): + # our default io manager does not handle partition ranges + class DevNullIOManager(IOManager): + def handle_output(self, context, obj) -> None: + ... + + def load_input(self, context) -> None: + ... + + return materialize( + [assets_def] if isinstance(assets_def, AssetsDefinition) else assets_def, + tags={ + ASSET_PARTITION_RANGE_START_TAG: start, + ASSET_PARTITION_RANGE_END_TAG: end, + }, + resources={"io_manager": DevNullIOManager()}, + ) + + +def test_basic_daily_partitioning() -> None: + called = {"yup": False} + + @asset(partitions_def=DailyPartitionsDefinition(start_date="2020-01-01", end_date="2020-01-03")) + def a_partitioned_asset(context: AssetExecutionContext): + assert context.partition_key_range.start == "2020-01-01" + assert context.partition_key_range.end == "2020-01-02" + called["yup"] = True + + assert materialize_single_run_with_partition_key_range( + a_partitioned_asset, start="2020-01-01", end="2020-01-02" + ).success + assert called["yup"] + + +def test_basic_daily_partitioning_two_assets() -> None: + called = {"upstream": False, "downstream": False} + partitions_def = DailyPartitionsDefinition(start_date="2020-01-01", end_date="2020-01-03") + + @asset(partitions_def=partitions_def) + def upstream(context: AssetExecutionContext): + assert context.partition_key_range.start == "2020-01-01" + assert context.partition_key_range.end == "2020-01-02" + called[context.asset_key.to_user_string()] = True + + @asset(deps=[upstream], partitions_def=partitions_def) + def downstream(context: AssetExecutionContext): + assert context.partition_key_range.start == "2020-01-01" + assert context.partition_key_range.end == "2020-01-02" + called[context.asset_key.to_user_string()] = True + + assert materialize_single_run_with_partition_key_range( + [upstream, downstream], start="2020-01-01", end="2020-01-02" + ).success + + assert called["upstream"] + assert called["downstream"] + + +def test_basic_daily_partitioning_multi_asset() -> None: + partitions_def = DailyPartitionsDefinition(start_date="2020-01-01", end_date="2020-01-03") + called = {"yup": False} + + @multi_asset( + specs=[AssetSpec("asset_one"), AssetSpec("asset_two")], partitions_def=partitions_def + ) + def a_multi_asset(context: AssetExecutionContext): + assert context.selected_asset_keys == {AssetKey("asset_one"), AssetKey("asset_two")} + assert context.partition_key_range.start == "2020-01-01" + assert context.partition_key_range.end == "2020-01-02" + called["yup"] = True + + assert materialize_single_run_with_partition_key_range( + a_multi_asset, start="2020-01-01", end="2020-01-02" + ).success + assert called["yup"] + + +def test_handle_partition_mapping() -> None: + ... + # TODO + # daily_partitions_def = DailyPartitionsDefinition(start_date="2020-01-01", end_date="2020-03-01") + + # @asset(partitions_def=daily_partitions_def) + # def daily_partitioned_asset(context: AssetExecutionContext): + # raise Exception("not executed") + # assert context.partition_key_range.start == "2020-01-01" + # assert context.partition_key_range.end == "2020-01-02" + + # monthly_partitions_def = MonthlyPartitionsDefinition(start_date="2020-01-01", end_date="2020-03-01") + + # @asset(partitions_def=monthly_partitions_def, deps=[daily_partitioned_asset]) + # def downstream_monthly_partitioned_asset(context: AssetExecutionContext): + # print(context.partition_key_range_for_asset_key(AssetKey("daily_partitioned_asset"))) + + # materialize_single_run_with_partition_key_range([downstream_monthly_partitioned_asset], start="2020-01-01", end="2020-02-01") + + +def test_time_window_methods() -> None: + called = {"yup": False} + + @asset(partitions_def=DailyPartitionsDefinition(start_date="2020-01-01", end_date="2020-01-03")) + def a_partitioned_asset(context: AssetExecutionContext): + ptw = context.op_execution_context.partition_time_window + assert ptw.start.day == 1 + assert ptw.end.day == 2 + called["yup"] = True + + # time windows do not work on ranges + # assert materialize_single_run_with_partition_key_range( + # a_partitioned_asset, start="2020-01-01", end="2020-01-01" + # ).success + assert materialize([a_partitioned_asset], partition_key="2020-01-01").success + assert called["yup"] + + +@pytest.fixture +def error_on_warning(): + # I couldn't get these to fire otherwise ¯\_(ツ)_/¯ + # turn off any outer warnings filters, e.g. ignores that are set in pyproject.toml + warnings.resetwarnings() + warnings.filterwarnings("error") + + +def test_io_manager_oriented_warning(error_on_warning) -> None: + called = {"yup": False} + + @asset(partitions_def=DailyPartitionsDefinition(start_date="2020-01-01", end_date="2020-01-03")) + def a_partitioned_asset(context: AssetExecutionContext): + with pytest.raises(DeprecationWarning) as exc_info: + assert context.asset_partition_key_for_output("result") == "2020-01-01" + + expected = ( + "AssetExecutionContext.asset_partition_key_for_output is deprecated and will be removed" + " in 1.7. You have called method asset_partition_key_for_output on" + " AssetExecutionContext that is oriented around I/O managers. If you not using I/O" + " managers we suggest you use partition_key_range instead. If you are using I/O" + " managers the method still exists at" + " op_execution_context.asset_partition_key_for_output." + ) + + assert expected in str(exc_info.value) + + called["yup"] = True + + assert materialize([a_partitioned_asset], partition_key="2020-01-01").success + assert called["yup"] + + +def test_generic_op_execution_context_warning(error_on_warning) -> None: + called = {"yup": False} + + @asset + def an_asset(context: AssetExecutionContext): + assert isinstance(context, AssetExecutionContext) + + with pytest.raises(DeprecationWarning) as exc_info: + assert context.file_manager is None + + expected = ( + "AssetExecutionContext.file_manager is deprecated and will be removed in 1.7. You have" + " called the deprecated method file_manager on AssetExecutionContext. Use the" + " underlying OpExecutionContext instead by calling op_execution_context.file_manager." + ) + + assert expected in str(exc_info.value) + + called["yup"] = True + + assert materialize([an_asset]).success + assert called["yup"] + + +def test_alternative_available_warning(error_on_warning) -> None: + called = {"yup": False} + + @asset + def an_asset(context: AssetExecutionContext): + assert isinstance(context, AssetExecutionContext) + + with pytest.raises(DeprecationWarning) as exc_info: + assert context.has_tag("foobar") is False + + expected = ( + "AssetExecutionContext.has_tag is deprecated and will be removed in 1.7. " + "Instead use dagster_run.has_tag instead." + ) + assert expected in str(exc_info.value) + + called["yup"] = True + + assert materialize([an_asset]).success + assert called["yup"]