diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 51a632f36fb44..c57c7ee554021 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -12,7 +12,6 @@ Iterator, List, Mapping, - NamedTuple, Optional, Sequence, Set, @@ -1416,12 +1415,45 @@ def _get_deprecation_kwargs(attr: str): return deprecation_kwargs -class PartitionInfo(NamedTuple): # TODO - this is a bad name, figure out something else - key: Optional[str] - keys: Sequence[str] - key_range: PartitionKeyRange - time_window: Optional[TimeWindow] - definition: PartitionsDefinition +class PartitionInfo: # TODO - this is a bad name, figure out something else + def __init__( + self, + key: Optional[str], + keys: Sequence[str], + key_range: PartitionKeyRange, + time_window: Optional[TimeWindow], + definition: PartitionsDefinition, + ): + self._key = key + self._keys = keys + self._key_range = key_range + self._time_window = time_window + self._definition = definition + + @property + def key(self) -> str: + if self._key is None: + raise DagsterInvariantViolationError( + "Cannot access partition key for a partitioned run with a range of partitions." + " Call key_range instead." + ) + return self._key + + @property + def keys(self) -> Sequence[str]: + return self._keys + + @property + def key_range(self) -> PartitionKeyRange: + return self._key_range + + @property + def time_window(self) -> TimeWindow: + if self._time_window is None: + raise ValueError( + "Tried to get partition time window for an asset that is not time-partitioned." + ) + return self._time_window class AssetExecutionContext(OpExecutionContext):