Skip to content

Commit

Permalink
add back the input methods so we dont make a breaking change
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Jan 29, 2024
1 parent fb10146 commit 6e54a5d
Showing 1 changed file with 41 additions and 0 deletions.
41 changes: 41 additions & 0 deletions python_modules/dagster/dagster/_core/execution/context/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -1119,6 +1119,11 @@ def has_asset_partitions_for_upstream_asset(self, upstream_asset_key: AssetKey)
asset_layer = self.job_def.asset_layer
return asset_layer.partitions_def_for_asset(upstream_asset_key) is not None

def has_asset_partitions_for_input(self, input_name: str) -> bool:
asset_layer = self.job_def.asset_layer
asset_key = asset_layer.asset_key_for_input(self.node_handle, input_name)
return asset_key is not None and self.has_asset_partitions_for_upstream_asset(asset_key)

def asset_partition_key_range_for_upstream_asset(
self, upstream_asset_key: AssetKey
) -> PartitionKeyRange:
Expand All @@ -1142,6 +1147,15 @@ def asset_partition_key_range_for_upstream_asset(

return partition_key_ranges[0]

def asset_partition_key_range_for_input(self, input_name: str) -> PartitionKeyRange:
asset_key = self.job_def.asset_layer.asset_key_for_input(
node_handle=self.node_handle, input_name=input_name
)
if asset_key is not None:
return self.asset_partition_key_range_for_upstream_asset(asset_key)

check.failed("The input has no asset partitions")

def asset_partitions_subset_for_upstream_asset(
self, upstream_asset_key: AssetKey, *, require_valid_partitions: bool = True
) -> PartitionsSubset:
Expand Down Expand Up @@ -1190,6 +1204,17 @@ def asset_partitions_subset_for_upstream_asset(

check.failed(f"The asset {upstream_asset_key.to_user_string()} has no asset partitions")

def asset_partitions_subset_for_input(
self, input_name: str, *, require_valid_partitions: bool = True
) -> PartitionsSubset:
asset_layer = self.job_def.asset_layer
asset_key = asset_layer.asset_key_for_input(self.node_handle, input_name)
if asset_key is not None:
return self.asset_partitions_subset_for_upstream_asset(
upstream_asset_key=asset_key, require_valid_partitions=require_valid_partitions
)
check.failed("The input has no asset partitions")

def asset_partition_key_for_upstream_asset(self, upstream_asset_key: AssetKey) -> str:
start, end = self.asset_partition_key_range_for_upstream_asset(upstream_asset_key)
if start == end:
Expand All @@ -1200,6 +1225,13 @@ def asset_partition_key_for_upstream_asset(self, upstream_asset_key: AssetKey) -
f" but the step input has a partition range: '{start}' to '{end}'."
)

def asset_partition_key_for_input(self, input_name: str) -> str:
asset_layer = self.job_def.asset_layer
asset_key = asset_layer.asset_key_for_input(self.node_handle, input_name)
if asset_key is not None:
return self.asset_partition_key_for_upstream_asset(upstream_asset_key=asset_key)
check.failed("The input has no asset partitions")

def _partitions_def_for_output(self, output_name: str) -> Optional[PartitionsDefinition]:
asset_info = self.job_def.asset_layer.asset_info_for_output(
node_handle=self.node_handle, output_name=output_name
Expand Down Expand Up @@ -1305,6 +1337,15 @@ def asset_partitions_time_window_for_upstream_asset(
).end,
)

def asset_partitions_time_window_for_input(self, input_name: str) -> TimeWindow:
asset_layer = self.job_def.asset_layer
asset_key = asset_layer.asset_key_for_input(self.node_handle, input_name)

if asset_key is None:
raise ValueError("The input has no corresponding asset")

return self.asset_partitions_time_window_for_upstream_asset(upstream_asset_key=asset_key)

def get_type_loader_context(self) -> "DagsterTypeLoaderContext":
return DagsterTypeLoaderContext(
plan_data=self.plan_data,
Expand Down

0 comments on commit 6e54a5d

Please sign in to comment.