From 0de310c859ce44841f9af0f31c8ca1c80527122f Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Fri, 19 Jan 2024 14:08:41 -0500 Subject: [PATCH] Revert "add back the input methods so we dont make a breaking change" This reverts commit e249a48f5bef3af9c03bfe89db0a5f75b4849f81. --- .../dagster/_core/execution/context/system.py | 41 ------------------- 1 file changed, 41 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/system.py b/python_modules/dagster/dagster/_core/execution/context/system.py index b90876a3d4bab..62dccaef6a032 100644 --- a/python_modules/dagster/dagster/_core/execution/context/system.py +++ b/python_modules/dagster/dagster/_core/execution/context/system.py @@ -1119,11 +1119,6 @@ def has_asset_partitions_for_upstream_asset(self, upstream_asset_key: AssetKey) asset_layer = self.job_def.asset_layer return asset_layer.partitions_def_for_asset(upstream_asset_key) is not None - def has_asset_partitions_for_input(self, input_name: str) -> bool: - asset_layer = self.job_def.asset_layer - asset_key = asset_layer.asset_key_for_input(self.node_handle, input_name) - return asset_key is not None and self.has_asset_partitions_for_upstream_asset(asset_key) - def asset_partition_key_range_for_upstream_asset( self, upstream_asset_key: AssetKey ) -> PartitionKeyRange: @@ -1147,15 +1142,6 @@ def asset_partition_key_range_for_upstream_asset( return partition_key_ranges[0] - def asset_partition_key_range_for_input(self, input_name: str) -> PartitionKeyRange: - asset_key = self.job_def.asset_layer.asset_key_for_input( - node_handle=self.node_handle, input_name=input_name - ) - if asset_key is not None: - return self.asset_partition_key_range_for_upstream_asset(asset_key) - - check.failed("The input has no asset partitions") - def asset_partitions_subset_for_upstream_asset( self, upstream_asset_key: AssetKey, *, require_valid_partitions: bool = True ) -> PartitionsSubset: @@ -1204,17 +1190,6 @@ def asset_partitions_subset_for_upstream_asset( check.failed(f"The asset {upstream_asset_key.to_user_string()} has no asset partitions") - def asset_partitions_subset_for_input( - self, input_name: str, *, require_valid_partitions: bool = True - ) -> PartitionsSubset: - asset_layer = self.job_def.asset_layer - asset_key = asset_layer.asset_key_for_input(self.node_handle, input_name) - if asset_key is not None: - return self.asset_partitions_subset_for_upstream_asset( - upstream_asset_key=asset_key, require_valid_partitions=require_valid_partitions - ) - check.failed("The input has no asset partitions") - def asset_partition_key_for_upstream_asset(self, upstream_asset_key: AssetKey) -> str: start, end = self.asset_partition_key_range_for_upstream_asset(upstream_asset_key) if start == end: @@ -1225,13 +1200,6 @@ def asset_partition_key_for_upstream_asset(self, upstream_asset_key: AssetKey) - f" but the step input has a partition range: '{start}' to '{end}'." ) - def asset_partition_key_for_input(self, input_name: str) -> str: - asset_layer = self.job_def.asset_layer - asset_key = asset_layer.asset_key_for_input(self.node_handle, input_name) - if asset_key is not None: - return self.asset_partition_key_for_upstream_asset(upstream_asset_key=asset_key) - check.failed("The input has no asset partitions") - def _partitions_def_for_output(self, output_name: str) -> Optional[PartitionsDefinition]: asset_info = self.job_def.asset_layer.asset_info_for_output( node_handle=self.node_handle, output_name=output_name @@ -1337,15 +1305,6 @@ def asset_partitions_time_window_for_upstream_asset( ).end, ) - def asset_partitions_time_window_for_input(self, input_name: str) -> TimeWindow: - asset_layer = self.job_def.asset_layer - asset_key = asset_layer.asset_key_for_input(self.node_handle, input_name) - - if asset_key is None: - raise ValueError("The input has no corresponding asset") - - return self.asset_partitions_time_window_for_upstream_asset(upstream_asset_key=asset_key) - def get_type_loader_context(self) -> "DagsterTypeLoaderContext": return DagsterTypeLoaderContext( plan_data=self.plan_data,