diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 0a83e3b0aa2ba..010507e57c102 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1377,13 +1377,13 @@ def _copy_docs_from_op_execution_context(obj): "asset_partition_key_for_output": "partition_key", "asset_partitions_time_window_for_output": "partition_time_window", "asset_partition_key_range_for_output": "partition_key_range", - "asset_partition_key_range_for_input": "upstream_partition_key_range", - "asset_partition_key_for_input": "upstream_partition_key", + "asset_partition_key_range_for_input": "partition_key_range_for_upstream_asset", + "asset_partition_key_for_input": "partition_key_for_upstream_asset", "asset_partitions_def_for_output": "assets_def.partitions_def", - "asset_partitions_def_for_input": "upstream_partitions_def", + "asset_partitions_def_for_input": "partitions_def_for_upstream_asset", "asset_partition_keys_for_output": "partition_keys", - "asset_partition_keys_for_input": "upstream_partition_keys", - "asset_partitions_time_window_for_input": "upstream_partitions_time_window", + "asset_partition_keys_for_input": "partition_keys_for_upstream_asset", + "asset_partitions_time_window_for_input": "partition_time_window_for_upstream_asset", } ALTERNATE_EXPRESSIONS = { @@ -1530,13 +1530,13 @@ def partition_time_window(self) -> TimeWindow: return self.op_execution_context.partition_time_window @public - def upstream_partition_key(self, key: CoercibleToAssetKey) -> str: + def partition_key_for_upstream_asset(self, key: CoercibleToAssetKey) -> str: return self._step_execution_context.asset_partition_key_for_upstream( AssetKey.from_coercible(key) ) @public - def upstream_partition_keys(self, key: CoercibleToAssetKey) -> Sequence[str]: + def partition_keys_for_upstream_asset(self, key: CoercibleToAssetKey) -> Sequence[str]: return list( self._step_execution_context.asset_partitions_subset_for_upstream( AssetKey.from_coercible(key) @@ -1544,13 +1544,13 @@ def upstream_partition_keys(self, key: CoercibleToAssetKey) -> Sequence[str]: ) @public - def upstream_partition_key_range(self, key: CoercibleToAssetKey) -> PartitionKeyRange: + def partition_key_range_for_upstream_asset(self, key: CoercibleToAssetKey) -> PartitionKeyRange: return self._step_execution_context.asset_partition_key_range_for_upstream( AssetKey.from_coercible(key) ) @public - def upstream_partitions_time_window( + def partition_time_window_for_upstream_asset( self, key: CoercibleToAssetKey ) -> TimeWindow: # TODO align on plurality of partition(s) return self._step_execution_context.asset_partitions_time_window_for_upstream( @@ -1558,7 +1558,7 @@ def upstream_partitions_time_window( ) @public - def upstream_partitions_def(self, key: CoercibleToAssetKey) -> PartitionsDefinition: + def partitions_def_for_upstream_asset(self, key: CoercibleToAssetKey) -> PartitionsDefinition: result = self._step_execution_context.job_def.asset_layer.partitions_def_for_asset( AssetKey.from_coercible(key) ) diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_asset_partition_mappings.py b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_asset_partition_mappings.py index 6da99ce78abdd..d258ac08aac81 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_asset_partition_mappings.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_asset_partition_mappings.py @@ -600,7 +600,9 @@ def upstream(): ], ) def downstream(context: AssetExecutionContext): - upstream_key = datetime.strptime(context.upstream_partition_key("upstream"), "%Y-%m-%d") + upstream_key = datetime.strptime( + context.partition_key_for_upstream_asset("upstream"), "%Y-%m-%d" + ) current_partition_key = datetime.strptime(context.partition_key, "%Y-%m-%d") @@ -651,8 +653,12 @@ def multi_asset_1(): @multi_asset(specs=[asset_3, asset_4], partitions_def=partitions_def) def multi_asset_2(context: AssetExecutionContext): - asset_1_key = datetime.strptime(context.upstream_partition_key("asset_1"), "%Y-%m-%d") - asset_2_key = datetime.strptime(context.upstream_partition_key("asset_2"), "%Y-%m-%d") + asset_1_key = datetime.strptime( + context.partition_key_for_upstream_asset("asset_1"), "%Y-%m-%d" + ) + asset_2_key = datetime.strptime( + context.partition_key_for_upstream_asset("asset_2"), "%Y-%m-%d" + ) current_partition_key = datetime.strptime(context.partition_key, "%Y-%m-%d") @@ -754,7 +760,7 @@ def test_self_dependent_partition_mapping_with_asset_deps(): ) def self_dependent(context: AssetExecutionContext): upstream_key = datetime.strptime( - context.upstream_partition_key("self_dependent"), "%Y-%m-%d" + context.partition_key_for_upstream_asset("self_dependent"), "%Y-%m-%d" ) current_partition_key = datetime.strptime(context.partition_key, "%Y-%m-%d") @@ -780,7 +786,9 @@ def self_dependent(context: AssetExecutionContext): @multi_asset(specs=[asset_1], partitions_def=partitions_def) def the_multi_asset(context: AssetExecutionContext): - asset_1_key = datetime.strptime(context.upstream_partition_key("asset_1"), "%Y-%m-%d") + asset_1_key = datetime.strptime( + context.partition_key_for_upstream_asset("asset_1"), "%Y-%m-%d" + ) current_partition_key = datetime.strptime(context.partition_key, "%Y-%m-%d") @@ -802,7 +810,7 @@ def upstream(): deps=[AssetDep(upstream, partition_mapping=SpecificPartitionsPartitionMapping(["apple"]))], ) def downstream(context: AssetExecutionContext): - assert context.upstream_partition_key("upstream") == "apple" + assert context.partition_key_for_upstream_asset("upstream") == "apple" assert context.partition_key == "orange" with instance_for_test() as instance: @@ -832,7 +840,7 @@ def asset_1_multi_asset(): @multi_asset(specs=[asset_2], partitions_def=partitions_def) def asset_2_multi_asset(context: AssetExecutionContext): - assert context.upstream_partition_key("asset_1") == "apple" + assert context.partition_key_for_upstream_asset("asset_1") == "apple" assert context.partition_key == "orange" with instance_for_test() as instance: