Skip to content

Commit

Permalink
make it a class
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Jan 29, 2024
1 parent 89c84ff commit a44e758
Showing 1 changed file with 39 additions and 7 deletions.
46 changes: 39 additions & 7 deletions python_modules/dagster/dagster/_core/execution/context/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
Iterator,
List,
Mapping,
NamedTuple,
Optional,
Sequence,
Set,
Expand Down Expand Up @@ -1416,12 +1415,45 @@ def _get_deprecation_kwargs(attr: str):
return deprecation_kwargs


class PartitionInfo(NamedTuple): # TODO - this is a bad name, figure out something else
key: Optional[str]
keys: Sequence[str]
key_range: PartitionKeyRange
time_window: Optional[TimeWindow]
definition: PartitionsDefinition
class PartitionInfo: # TODO - this is a bad name, figure out something else
def __init__(
self,
key: Optional[str],
keys: Sequence[str],
key_range: PartitionKeyRange,
time_window: Optional[TimeWindow],
definition: PartitionsDefinition,
):
self._key = key
self._keys = keys
self._key_range = key_range
self._time_window = time_window
self._definition = definition

@property
def key(self) -> str:
if self._key is None:
raise DagsterInvariantViolationError(
"Cannot access partition key for a partitioned run with a range of partitions."
" Call key_range instead."
)
return self._key

@property
def keys(self) -> Sequence[str]:
return self._keys

@property
def key_range(self) -> PartitionKeyRange:
return self._key_range

@property
def time_window(self) -> TimeWindow:
if self._time_window is None:
raise ValueError(
"Tried to get partition time window for an asset that is not time-partitioned."
)
return self._time_window


class AssetExecutionContext(OpExecutionContext):
Expand Down

0 comments on commit a44e758

Please sign in to comment.