Skip to content

Commit

Permalink
make the methods work
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Jan 4, 2024
1 parent cbabb1b commit 9704201
Showing 1 changed file with 29 additions and 9 deletions.
38 changes: 29 additions & 9 deletions python_modules/dagster/dagster/_core/execution/context/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -1381,7 +1381,7 @@ def _copy_docs_from_op_execution_context(obj):
"asset_partitions_def_for_input": "upstream_partitions_def",
"asset_partition_keys_for_output": "partition_keys",
"asset_partition_keys_for_input": "upstream_partition_keys",
"asset_partitions_time_window_for_input": "upstream_partitions_time_window"
"asset_partitions_time_window_for_input": "upstream_partitions_time_window",
}

ALTERNATE_EXPRESSIONS = {
Expand Down Expand Up @@ -1413,6 +1413,7 @@ def __init__(self, op_execution_context: OpExecutionContext) -> None:
self._op_execution_context = check.inst_param(
op_execution_context, "op_execution_context", OpExecutionContext
)
self._step_execution_context = self._op_execution_context._step_execution_context # noqa: SLF001

@staticmethod
def get() -> "AssetExecutionContext":
Expand Down Expand Up @@ -1494,7 +1495,7 @@ def latest_materialization_event(

@public
@property
def has_partition_key(self) -> bool: # TODO - maybe this one can be replaced by something else?
def has_partition_key(self) -> bool: # TODO - maybe this one can be replaced by something else?
return self.op_execution_context.has_partition_key

@public
Expand All @@ -1519,24 +1520,43 @@ def partition_time_window(self) -> TimeWindow:

@public
def upstream_partition_key(self, key: CoercibleToAssetKey) -> str:
return self.op_execution_context.asset_partition_key_for_input(input_name)
return self._step_execution_context.asset_partition_key_for_upstream(
AssetKey.from_coercible(key)
)

@public
def upstream_partition_keys(self, key: CoercibleToAssetKey) -> Sequence[str]:
return self.op_execution_context.asset_partition_keys_for_input(input_name=input_name)
return list(
self._step_execution_context.asset_partitions_subset_for_upstream(
AssetKey.from_coercible(key)
).get_partition_keys()
)

@public
def upstream_partition_key_range(self, key: CoercibleToAssetKey) -> PartitionKeyRange:
return self.op_execution_context.asset_partition_key_range_for_input(input_name)
return self._step_execution_context.asset_partition_key_range_for_upstream(
AssetKey.from_coercible(key)
)

@public
def upstream_partitions_time_window(self, key: CoercibleToAssetKey) -> TimeWindow: # TODO align on plurality of partition(s)
return self.op_execution_context.asset_partitions_time_window_for_input(input_name)
def upstream_partitions_time_window(
self, key: CoercibleToAssetKey
) -> TimeWindow: # TODO align on plurality of partition(s)
return self._step_execution_context.asset_partitions_time_window_for_upstream(
AssetKey.from_coercible(key)
)

@public
@_copy_docs_from_op_execution_context
def upstream_partitions_def(self, key: CoercibleToAssetKey) -> PartitionsDefinition:
return self.op_execution_context.asset_partitions_def_for_input(input_name=input_name)
result = self._step_execution_context.job_def.asset_layer.partitions_def_for_asset(
AssetKey.from_coercible(key)
)
if result is None:
raise DagsterInvariantViolationError(
f"Attempting to access partitions def for asset {key}, but it is not" " partitioned"
)

return result

######## Deprecated methods

Expand Down

0 comments on commit 9704201

Please sign in to comment.