Skip to content

Commit

Permalink
make the methods work
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Jan 29, 2024
1 parent 6279815 commit 90c247c
Showing 1 changed file with 28 additions and 9 deletions.
37 changes: 28 additions & 9 deletions python_modules/dagster/dagster/_core/execution/context/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -1383,7 +1383,7 @@ def _copy_docs_from_op_execution_context(obj):
"asset_partitions_def_for_input": "upstream_partitions_def",
"asset_partition_keys_for_output": "partition_keys",
"asset_partition_keys_for_input": "upstream_partition_keys",
"asset_partitions_time_window_for_input": "upstream_partitions_time_window"
"asset_partitions_time_window_for_input": "upstream_partitions_time_window",
}

ALTERNATE_EXPRESSIONS = {
Expand Down Expand Up @@ -1506,7 +1506,7 @@ def latest_materialization_for_upstream_asset(

@public
@property
def has_partition_key(self) -> bool: # TODO - maybe this one can be replaced by something else?
def has_partition_key(self) -> bool: # TODO - maybe this one can be replaced by something else?
return self.op_execution_context.has_partition_key

@public
Expand All @@ -1531,24 +1531,43 @@ def partition_time_window(self) -> TimeWindow:

@public
def upstream_partition_key(self, key: CoercibleToAssetKey) -> str:
return self.op_execution_context.asset_partition_key_for_input(input_name)
return self._step_execution_context.asset_partition_key_for_upstream(
AssetKey.from_coercible(key)
)

@public
def upstream_partition_keys(self, key: CoercibleToAssetKey) -> Sequence[str]:
return self.op_execution_context.asset_partition_keys_for_input(input_name=input_name)
return list(
self._step_execution_context.asset_partitions_subset_for_upstream(
AssetKey.from_coercible(key)
).get_partition_keys()
)

@public
def upstream_partition_key_range(self, key: CoercibleToAssetKey) -> PartitionKeyRange:
return self.op_execution_context.asset_partition_key_range_for_input(input_name)
return self._step_execution_context.asset_partition_key_range_for_upstream(
AssetKey.from_coercible(key)
)

@public
def upstream_partitions_time_window(self, key: CoercibleToAssetKey) -> TimeWindow: # TODO align on plurality of partition(s)
return self.op_execution_context.asset_partitions_time_window_for_input(input_name)
def upstream_partitions_time_window(
self, key: CoercibleToAssetKey
) -> TimeWindow: # TODO align on plurality of partition(s)
return self._step_execution_context.asset_partitions_time_window_for_upstream(
AssetKey.from_coercible(key)
)

@public
@_copy_docs_from_op_execution_context
def upstream_partitions_def(self, key: CoercibleToAssetKey) -> PartitionsDefinition:
return self.op_execution_context.asset_partitions_def_for_input(input_name=input_name)
result = self._step_execution_context.job_def.asset_layer.partitions_def_for_asset(
AssetKey.from_coercible(key)
)
if result is None:
raise DagsterInvariantViolationError(
f"Attempting to access partitions def for asset {key}, but it is not" " partitioned"
)

return result

######## Deprecated methods

Expand Down

0 comments on commit 90c247c

Please sign in to comment.