diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index b4f60d644e8e0..1aa41b4b630f1 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1415,7 +1415,7 @@ def __new__( ) -class AssetExecutionContext: +class AssetExecutionContext(OpExecutionContext): def __init__(self, op_execution_context: OpExecutionContext) -> None: self._op_execution_context = check.inst_param( op_execution_context, "op_execution_context", OpExecutionContext @@ -1599,6 +1599,30 @@ def my_asset(context: AssetExecutionContext): """ return self.op_execution_context.partition_key + @public + @property + def partition_keys(self) -> Sequence[str]: + """Returns a list of the partition keys for the current run. + + If you want to write your asset to support running a backfill of several partitions in a single run, + you can use ``partition_keys`` to get all of the partitions being materialized + by the backfill. + + Examples: + .. code-block:: python + + partitions_def = DailyPartitionsDefinition("2023-08-20") + + @asset(partitions_def=partitions_def) + def an_asset(context: AssetExecutionContext): + context.log.info(context.partition_keys) + + + # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log: + # ["2023-08-21", "2023-08-22", "2023-08-23", "2023-08-24", "2023-08-25"] + """ + return self.op_execution_context.partition_keys + @deprecated(breaking_version="2.0", additional_warn_text="Use `partition_key_range` instead.") @public @property @@ -2460,6 +2484,10 @@ def set_data_version(self, asset_key: AssetKey, data_version: DataVersion) -> No def asset_check_spec(self) -> AssetCheckSpec: return self.op_execution_context.asset_check_spec + @property + def is_subset(self): + return self.op_execution_context.is_subset + # In this mode no conversion is done on returned values and missing but expected outputs are not # allowed. @property diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py index 7cc385922b460..d9f73b2734454 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py @@ -76,6 +76,9 @@ def test_deprecation_warnings(): "consume_events", "log_event", "get_asset_provenance", + "is_subset", + "partition_keys", + "get", ] other_ignores = [ @@ -156,8 +159,9 @@ def test_context(context: AssetExecutionContext): assert_deprecation_messages_as_expected(deprecation_info, expected_deprecation_args) else: raise Exception( - f"Attribute {method} not accounted for in AssetExecutionContext deprecation" - " test" + f"Method {method} on OpExecutionContext not accounted for in AssetExecutionContext deprecation" + f" test. Ensure that the method {method} exists on AssetExecutionContext, or is explicitly ignored in" + " the test." ) materialize([test_context]) diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py index 6bd4da58fb3d2..bb24c3119f133 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py @@ -58,7 +58,7 @@ def __init__(self): @op def test_op_context_instance_check(context: OpExecutionContext): step_context = context._step_execution_context # noqa: SLF001 - asset_context = AssetExecutionContext(step_execution_context=step_context) + asset_context = AssetExecutionContext(op_execution_context=context) op_context = OpExecutionContext(step_execution_context=step_context) with pytest.raises(DeprecationWarning): isinstance(asset_context, OpExecutionContext)