diff --git a/python_modules/dagster/dagster/_core/execution/context/system.py b/python_modules/dagster/dagster/_core/execution/context/system.py index 62dccaef6a032..b90876a3d4bab 100644 --- a/python_modules/dagster/dagster/_core/execution/context/system.py +++ b/python_modules/dagster/dagster/_core/execution/context/system.py @@ -1119,6 +1119,11 @@ def has_asset_partitions_for_upstream_asset(self, upstream_asset_key: AssetKey) asset_layer = self.job_def.asset_layer return asset_layer.partitions_def_for_asset(upstream_asset_key) is not None + def has_asset_partitions_for_input(self, input_name: str) -> bool: + asset_layer = self.job_def.asset_layer + asset_key = asset_layer.asset_key_for_input(self.node_handle, input_name) + return asset_key is not None and self.has_asset_partitions_for_upstream_asset(asset_key) + def asset_partition_key_range_for_upstream_asset( self, upstream_asset_key: AssetKey ) -> PartitionKeyRange: @@ -1142,6 +1147,15 @@ def asset_partition_key_range_for_upstream_asset( return partition_key_ranges[0] + def asset_partition_key_range_for_input(self, input_name: str) -> PartitionKeyRange: + asset_key = self.job_def.asset_layer.asset_key_for_input( + node_handle=self.node_handle, input_name=input_name + ) + if asset_key is not None: + return self.asset_partition_key_range_for_upstream_asset(asset_key) + + check.failed("The input has no asset partitions") + def asset_partitions_subset_for_upstream_asset( self, upstream_asset_key: AssetKey, *, require_valid_partitions: bool = True ) -> PartitionsSubset: @@ -1190,6 +1204,17 @@ def asset_partitions_subset_for_upstream_asset( check.failed(f"The asset {upstream_asset_key.to_user_string()} has no asset partitions") + def asset_partitions_subset_for_input( + self, input_name: str, *, require_valid_partitions: bool = True + ) -> PartitionsSubset: + asset_layer = self.job_def.asset_layer + asset_key = asset_layer.asset_key_for_input(self.node_handle, input_name) + if asset_key is not None: + return self.asset_partitions_subset_for_upstream_asset( + upstream_asset_key=asset_key, require_valid_partitions=require_valid_partitions + ) + check.failed("The input has no asset partitions") + def asset_partition_key_for_upstream_asset(self, upstream_asset_key: AssetKey) -> str: start, end = self.asset_partition_key_range_for_upstream_asset(upstream_asset_key) if start == end: @@ -1200,6 +1225,13 @@ def asset_partition_key_for_upstream_asset(self, upstream_asset_key: AssetKey) - f" but the step input has a partition range: '{start}' to '{end}'." ) + def asset_partition_key_for_input(self, input_name: str) -> str: + asset_layer = self.job_def.asset_layer + asset_key = asset_layer.asset_key_for_input(self.node_handle, input_name) + if asset_key is not None: + return self.asset_partition_key_for_upstream_asset(upstream_asset_key=asset_key) + check.failed("The input has no asset partitions") + def _partitions_def_for_output(self, output_name: str) -> Optional[PartitionsDefinition]: asset_info = self.job_def.asset_layer.asset_info_for_output( node_handle=self.node_handle, output_name=output_name @@ -1305,6 +1337,15 @@ def asset_partitions_time_window_for_upstream_asset( ).end, ) + def asset_partitions_time_window_for_input(self, input_name: str) -> TimeWindow: + asset_layer = self.job_def.asset_layer + asset_key = asset_layer.asset_key_for_input(self.node_handle, input_name) + + if asset_key is None: + raise ValueError("The input has no corresponding asset") + + return self.asset_partitions_time_window_for_upstream_asset(upstream_asset_key=asset_key) + def get_type_loader_context(self) -> "DagsterTypeLoaderContext": return DagsterTypeLoaderContext( plan_data=self.plan_data,