From da34a6a8e33d62597cfdc6920cd46d9f4e91dc0e Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Wed, 6 Sep 2023 16:14:06 -0400 Subject: [PATCH] some renaming and clean up --- .../_core/execution/context/compute.py | 6 +- .../dagster/_core/execution/context/system.py | 172 ++---------------- 2 files changed, 21 insertions(+), 157 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 8ff23a2dd0d83..3fe8a0c66c3be 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -615,7 +615,7 @@ def self_dependent_asset(context: AssetExecutionContext, self_dependent_asset): # "2023-08-21" """ - return self._step_execution_context.asset_partition_key_for_asset( + return self._step_execution_context.partition_key_for_asset( asset, is_dependency=is_dependency ) @@ -736,7 +736,7 @@ def self_dependent_asset(context: AssetExecutionContext, self_dependent_asset): # PartitionKeyRange(start="2023-08-21", end="2023-08-25") """ - return self._step_execution_context.asset_partition_key_range_for_asset( + return self._step_execution_context.partition_key_range_for_asset( asset, is_dependency=is_dependency ) @@ -830,7 +830,7 @@ def self_dependent_asset(context: AssetExecutionContext, self_dependent_asset): # ["2023-08-21", "2023-08-22", "2023-08-23", "2023-08-24", "2023-08-25"] """ - return self._step_execution_context.asset_partition_keys_for_asset(asset) + return self._step_execution_context.partition_keys_for_asset(asset) @public @experimental diff --git a/python_modules/dagster/dagster/_core/execution/context/system.py b/python_modules/dagster/dagster/_core/execution/context/system.py index 1274f90a8fb21..37d50eeb958dc 100644 --- a/python_modules/dagster/dagster/_core/execution/context/system.py +++ b/python_modules/dagster/dagster/_core/execution/context/system.py @@ -664,8 +664,7 @@ def for_input_manager( node_handle=self.node_handle, input_name=name ) asset_partitions_subset = ( - # TODO - need to solve the self partition issue with method refactor!! - self.asset_partitions_subset_for_input(asset_key) + self.partitions_subset_for_upstream_asset(asset_key) if self.has_asset_partitions_for_input(name) else None ) @@ -937,7 +936,9 @@ def _fetch_input_asset_version_info(self, key: AssetKey) -> None: # Exclude AllPartitionMapping for now to avoid huge queries # TODO - need to solve the self dependent partition issue with the method refactors if input_name and self.has_asset_partitions_for_input(input_name): - subset = self.asset_partitions_subset_for_input(key, require_valid_partitions=False) + subset = self.partitions_subset_for_upstream_asset( + key, require_valid_partitions=False + ) input_keys = list(subset.get_partition_keys()) # This check represents a temporary constraint that prevents huge query results for upstream @@ -960,24 +961,6 @@ def _fetch_input_asset_version_info(self, key: AssetKey) -> None: storage_id, data_version, event.run_id, event.timestamp ) - # def partition_mapping_for_input(self, input_name: str) -> Optional[PartitionMapping]: - # asset_layer = self.job_def.asset_layer - # upstream_asset_key = asset_layer.asset_key_for_input(self.node_handle, input_name) - # if upstream_asset_key: - # upstream_asset_partitions_def = asset_layer.partitions_def_for_asset(upstream_asset_key) - # assets_def = asset_layer.assets_def_for_node(self.node_handle) - # partitions_def = assets_def.partitions_def if assets_def else None - # explicit_partition_mapping = self.job_def.asset_layer.partition_mapping_for_node_input( - # self.node_handle, upstream_asset_key - # ) - # return infer_partition_mapping( - # explicit_partition_mapping, - # partitions_def, - # upstream_asset_partitions_def, - # ) - # else: - # return None - def _get_input_asset_event(self, key: AssetKey) -> Optional["EventLogRecord"]: event = self.instance.get_latest_data_version_record(key) if event: @@ -1037,6 +1020,8 @@ def get_output_asset_keys(self) -> AbstractSet[AssetKey]: output_keys.add(asset_info.key) return output_keys + #### Partitions methods + def has_asset_partitions_for_input(self, input_name: str) -> bool: asset_layer = self.job_def.asset_layer upstream_asset_key = asset_layer.asset_key_for_input(self.node_handle, input_name) @@ -1046,20 +1031,6 @@ def has_asset_partitions_for_input(self, input_name: str) -> bool: and asset_layer.partitions_def_for_asset(upstream_asset_key) is not None ) - # def asset_partition_key_range_for_input(self, input_name: str) -> PartitionKeyRange: - # subset = self.asset_partitions_subset_for_input(input_name) - # partition_key_ranges = subset.get_partition_key_ranges( - # dynamic_partitions_store=self.instance - # ) - - # if len(partition_key_ranges) != 1: - # check.failed( - # "Tried to access asset partition key range, but there are " - # f"({len(partition_key_ranges)}) key ranges associated with this input.", - # ) - - # return partition_key_ranges[0] - def _load_partition_info_as_upstream_asset( self, current_asset: CoercibleToAssetKey, is_dependency: bool ) -> bool: @@ -1101,23 +1072,23 @@ def partitions_def_for_asset(self, asset: CoercibleToAssetKey) -> PartitionsDefi ) return result - def asset_partition_keys_for_asset( + def partition_keys_for_asset( self, asset: CoercibleToAssetKey, is_dependency: bool = False ) -> Sequence[str]: if self._load_partition_info_as_upstream_asset(asset, is_dependency=is_dependency): - return list(self.asset_partitions_subset_for_input(asset).get_partition_keys()) + return list(self.partitions_subset_for_upstream_asset(asset).get_partition_keys()) return self.partitions_def_for_asset(asset).get_partition_keys_in_range( - self.asset_partition_key_range_for_asset(asset), + self.partition_key_range_for_asset(asset), dynamic_partitions_store=self.instance, ) - def asset_partition_key_range_for_asset( + def partition_key_range_for_asset( self, asset: CoercibleToAssetKey, is_dependency: bool = False ) -> PartitionKeyRange: if self._load_partition_info_as_upstream_asset( current_asset=asset, is_dependency=is_dependency ): - subset = self.asset_partitions_subset_for_input(asset) + subset = self.partitions_subset_for_upstream_asset(asset) partition_key_ranges = subset.get_partition_key_ranges( dynamic_partitions_store=self.instance ) @@ -1132,7 +1103,7 @@ def asset_partition_key_range_for_asset( else: return self.asset_partition_key_range - def asset_partitions_subset_for_input( + def partitions_subset_for_upstream_asset( self, asset: CoercibleToAssetKey, *, require_valid_partitions: bool = True ) -> PartitionsSubset: asset_layer = self.job_def.asset_layer @@ -1181,99 +1152,10 @@ def asset_partitions_subset_for_input( check.failed("The input has no asset partitions") - # def asset_partitions_subset_for_asset( - # self, - # asset: CoercibleToAssetKey, - # *, - # require_valid_partitions: bool = True, - # is_dependency: bool = False, - # ) -> PartitionsSubset: - # asset_layer = self.job_def.asset_layer - # materializing_assets_def = asset_layer.assets_def_for_node(self.node_handle) - # asset_key = AssetKey.from_coercible(asset) - - # if asset_layer.has_assets_def_for_asset(asset_key) or ( - # asset_layer.source_assets_by_key.get(asset_key, None) is not None - # ): - # asset_partitions_def = asset_layer.partitions_def_for_asset(asset_key) - - # if asset_partitions_def is None: - # check.failed(f"The asset {asset_key} has no partitions") - - # is_resulting_asset = ( - # materializing_assets_def - # and asset_key in materializing_assets_def.keys_by_output_name.values() - # ) - # is_upstream_asset = ( - # materializing_assets_def - # and asset_key in materializing_assets_def.keys_by_input_name.values() - # ) - - # get_partition_subset_as_upstream_asset = ( - # is_upstream_asset and not is_resulting_asset - # ) or (is_upstream_asset and is_resulting_asset and is_dependency) - - # materializing_assets_partitions_def = ( - # materializing_assets_def.partitions_def if materializing_assets_def else None - # ) - # partitions_subset = ( - # materializing_assets_partitions_def.empty_subset().with_partition_key_range( - # self.asset_partition_key_range, dynamic_partitions_store=self.instance - # ) - # if materializing_assets_partitions_def - # else None - # ) - - # if get_partition_subset_as_upstream_asset: - - # partition_mapping = infer_partition_mapping( - # asset_layer.partition_mapping_for_node_input(self.node_handle, asset_key), - # materializing_assets_partitions_def, - # asset_partitions_def, - # ) - # mapped_partitions_result = ( - # partition_mapping.get_upstream_mapped_partitions_result_for_partitions( - # partitions_subset, - # asset_partitions_def, - # dynamic_partitions_store=self.instance, - # ) - # ) - - # if ( - # require_valid_partitions - # and mapped_partitions_result.required_but_nonexistent_partition_keys - # ): - # raise DagsterInvariantViolationError( - # f"Partition key range {self.asset_partition_key_range} in" - # f" {self.node_handle.name} depends on invalid partition keys" - # f" {mapped_partitions_result.required_but_nonexistent_partition_keys} in" - # f" asset {asset_key}" - # ) - - # return mapped_partitions_result.partitions_subset - # else: - # return partitions_subset - # check.failed(f"The asset {asset} has no partitions") - - # def asset_partition_key_for_input(self, input_name: str) -> Union[str, MultiPartitionKey]: - # asset_layer = self.job_def.asset_layer - # upstream_asset_key = asset_layer.asset_key_for_input(self.node_handle, input_name) - # if upstream_asset_key is not None: - # start, end = self.asset_partition_key_range_for_asset(upstream_asset_key) - # else: - # check.failed(f"No asset corresponds to input '{input_name}' or step '{self.step.key}'") - # if start == end: - # return start - # else: - # check.failed( - # f"Tried to access partition key for input '{input_name}' of step '{self.step.key}'," - # f" but the step input has a partition range: '{start}' to '{end}'." - # ) - - def asset_partition_key_for_asset( + def partition_key_for_asset( self, asset: CoercibleToAssetKey, is_dependency: bool = False ) -> Union[str, MultiPartitionKey]: - start, end = self.asset_partition_key_range_for_asset(asset, is_dependency=is_dependency) + start, end = self.partition_key_range_for_asset(asset, is_dependency=is_dependency) if start == end: return start else: @@ -1315,7 +1197,7 @@ def asset_partition_key_for_output(self, output_name: str) -> str: ) def asset_partitions_time_window_for_output(self, output_name: str) -> TimeWindow: - """The time window for the partitions of the asset correponding to the given output. + """The time window for the partitions of the asset corresponding to the given output. Raises an error if either of the following are true: - The output asset has no partitioning. @@ -1326,27 +1208,11 @@ def asset_partitions_time_window_for_output(self, output_name: str) -> TimeWindo node_handle=self.node_handle, output_name=output_name ) if asset_info: - return self.asset_partitions_time_window_for_asset(asset_info.key) + return self.partitions_time_window_for_asset(asset_info.key) raise ValueError("The provided output does not correspond to an asset.") - # def asset_partitions_time_window_for_input(self, input_name: str) -> TimeWindow: - # """The time window for the partitions of the asset corresponding to the given input. - - # Raises an error if either of the following are true: - # - The input asset has no partitioning. - # - The input asset is not partitioned with a TimeWindowPartitionsDefinition or a - # MultiPartitionsDefinition with one time-partitioned dimension. - # """ - # asset_layer = self.job_def.asset_layer - # upstream_asset_key = asset_layer.asset_key_for_input(self.node_handle, input_name) - - # if upstream_asset_key is not None: - # return self.asset_partitions_time_window_for_asset(upstream_asset_key) - - # raise ValueError("The provided input does not correspond to an asset.") - - def asset_partitions_time_window_for_asset( + def partitions_time_window_for_asset( self, asset: CoercibleToAssetKey, is_dependency: bool = False ) -> TimeWindow: """The time window for the partitions of the asset. @@ -1383,9 +1249,7 @@ def asset_partitions_time_window_for_asset( Union[TimeWindowPartitionsDefinition, MultiPartitionsDefinition], asset_partitions_def, ) - partition_key_range = self.asset_partition_key_range_for_asset( - asset, is_dependency=is_dependency - ) + partition_key_range = self.partition_key_range_for_asset(asset, is_dependency=is_dependency) return TimeWindow( asset_partitions_def.time_window_for_partition_key(partition_key_range.start).start,