Skip to content

Commit

Permalink
update naming
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Jan 29, 2024
1 parent a0cd68d commit 902a82b
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 17 deletions.
20 changes: 10 additions & 10 deletions python_modules/dagster/dagster/_core/execution/context/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -1378,13 +1378,13 @@ def _copy_docs_from_op_execution_context(obj):
"asset_partition_key_for_output": "partition_key",
"asset_partitions_time_window_for_output": "partition_time_window",
"asset_partition_key_range_for_output": "partition_key_range",
"asset_partition_key_range_for_input": "upstream_partition_key_range",
"asset_partition_key_for_input": "upstream_partition_key",
"asset_partition_key_range_for_input": "partition_key_range_for_upstream_asset",
"asset_partition_key_for_input": "partition_key_for_upstream_asset",
"asset_partitions_def_for_output": "assets_def.partitions_def",
"asset_partitions_def_for_input": "upstream_partitions_def",
"asset_partitions_def_for_input": "partitions_def_for_upstream_asset",
"asset_partition_keys_for_output": "partition_keys",
"asset_partition_keys_for_input": "upstream_partition_keys",
"asset_partitions_time_window_for_input": "upstream_partitions_time_window",
"asset_partition_keys_for_input": "partition_keys_for_upstream_asset",
"asset_partitions_time_window_for_input": "partition_time_window_for_upstream_asset",
}

ALTERNATE_EXPRESSIONS = {
Expand Down Expand Up @@ -1531,35 +1531,35 @@ def partition_time_window(self) -> TimeWindow:
return self.op_execution_context.partition_time_window

@public
def upstream_partition_key(self, key: CoercibleToAssetKey) -> str:
def partition_key_for_upstream_asset(self, key: CoercibleToAssetKey) -> str:
return self._step_execution_context.asset_partition_key_for_upstream(
AssetKey.from_coercible(key)
)

@public
def upstream_partition_keys(self, key: CoercibleToAssetKey) -> Sequence[str]:
def partition_keys_for_upstream_asset(self, key: CoercibleToAssetKey) -> Sequence[str]:
return list(
self._step_execution_context.asset_partitions_subset_for_upstream(
AssetKey.from_coercible(key)
).get_partition_keys()
)

@public
def upstream_partition_key_range(self, key: CoercibleToAssetKey) -> PartitionKeyRange:
def partition_key_range_for_upstream_asset(self, key: CoercibleToAssetKey) -> PartitionKeyRange:
return self._step_execution_context.asset_partition_key_range_for_upstream(
AssetKey.from_coercible(key)
)

@public
def upstream_partitions_time_window(
def partition_time_window_for_upstream_asset(
self, key: CoercibleToAssetKey
) -> TimeWindow: # TODO align on plurality of partition(s)
return self._step_execution_context.asset_partitions_time_window_for_upstream(
AssetKey.from_coercible(key)
)

@public
def upstream_partitions_def(self, key: CoercibleToAssetKey) -> PartitionsDefinition:
def partitions_def_for_upstream_asset(self, key: CoercibleToAssetKey) -> PartitionsDefinition:
result = self._step_execution_context.job_def.asset_layer.partitions_def_for_asset(
AssetKey.from_coercible(key)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,9 @@ def upstream():
],
)
def downstream(context: AssetExecutionContext):
upstream_key = datetime.strptime(context.upstream_partition_key("upstream"), "%Y-%m-%d")
upstream_key = datetime.strptime(
context.partition_key_for_upstream_asset("upstream"), "%Y-%m-%d"
)

current_partition_key = datetime.strptime(context.partition_key, "%Y-%m-%d")

Expand Down Expand Up @@ -651,8 +653,12 @@ def multi_asset_1():

@multi_asset(specs=[asset_3, asset_4], partitions_def=partitions_def)
def multi_asset_2(context: AssetExecutionContext):
asset_1_key = datetime.strptime(context.upstream_partition_key("asset_1"), "%Y-%m-%d")
asset_2_key = datetime.strptime(context.upstream_partition_key("asset_2"), "%Y-%m-%d")
asset_1_key = datetime.strptime(
context.partition_key_for_upstream_asset("asset_1"), "%Y-%m-%d"
)
asset_2_key = datetime.strptime(
context.partition_key_for_upstream_asset("asset_2"), "%Y-%m-%d"
)

current_partition_key = datetime.strptime(context.partition_key, "%Y-%m-%d")

Expand Down Expand Up @@ -754,7 +760,7 @@ def test_self_dependent_partition_mapping_with_asset_deps():
)
def self_dependent(context: AssetExecutionContext):
upstream_key = datetime.strptime(
context.upstream_partition_key("self_dependent"), "%Y-%m-%d"
context.partition_key_for_upstream_asset("self_dependent"), "%Y-%m-%d"
)

current_partition_key = datetime.strptime(context.partition_key, "%Y-%m-%d")
Expand All @@ -780,7 +786,9 @@ def self_dependent(context: AssetExecutionContext):

@multi_asset(specs=[asset_1], partitions_def=partitions_def)
def the_multi_asset(context: AssetExecutionContext):
asset_1_key = datetime.strptime(context.upstream_partition_key("asset_1"), "%Y-%m-%d")
asset_1_key = datetime.strptime(
context.partition_key_for_upstream_asset("asset_1"), "%Y-%m-%d"
)

current_partition_key = datetime.strptime(context.partition_key, "%Y-%m-%d")

Expand All @@ -802,7 +810,7 @@ def upstream():
deps=[AssetDep(upstream, partition_mapping=SpecificPartitionsPartitionMapping(["apple"]))],
)
def downstream(context: AssetExecutionContext):
assert context.upstream_partition_key("upstream") == "apple"
assert context.partition_key_for_upstream_asset("upstream") == "apple"
assert context.partition_key == "orange"

with instance_for_test() as instance:
Expand Down Expand Up @@ -832,7 +840,7 @@ def asset_1_multi_asset():

@multi_asset(specs=[asset_2], partitions_def=partitions_def)
def asset_2_multi_asset(context: AssetExecutionContext):
assert context.upstream_partition_key("asset_1") == "apple"
assert context.partition_key_for_upstream_asset("asset_1") == "apple"
assert context.partition_key == "orange"

with instance_for_test() as instance:
Expand Down

0 comments on commit 902a82b

Please sign in to comment.