diff --git a/python_modules/dagster/dagster/_core/execution/context/system.py b/python_modules/dagster/dagster/_core/execution/context/system.py index f5fa06304993e..3e36bb4ef7c7b 100644 --- a/python_modules/dagster/dagster/_core/execution/context/system.py +++ b/python_modules/dagster/dagster/_core/execution/context/system.py @@ -662,7 +662,7 @@ def for_input_manager( ) asset_partitions_subset = ( # TODO - need to solve the self partition issue with method refactor!! - self.asset_partitions_subset_for_input(name) + self.asset_partitions_subset_for_input(asset_key) if self.has_asset_partitions_for_input(name) else None ) @@ -934,9 +934,7 @@ def _fetch_input_asset_version_info(self, key: AssetKey) -> None: # Exclude AllPartitionMapping for now to avoid huge queries # TODO - need to solve the self dependent partition issue with the method refactors if input_name and self.has_asset_partitions_for_input(input_name): - subset = self.asset_partitions_subset_for_input( - input_name, require_valid_partitions=False - ) + subset = self.asset_partitions_subset_for_input(key, require_valid_partitions=False) input_keys = list(subset.get_partition_keys()) # This check represents a temporary constraint that prevents huge query results for upstream @@ -1062,84 +1060,77 @@ def has_asset_partitions_for_input(self, input_name: str) -> bool: def asset_partition_key_range_for_asset( self, asset: CoercibleToAssetKey, is_dependency: bool = False ) -> PartitionKeyRange: - subset = self.asset_partitions_subset_for_asset(asset, is_dependency=is_dependency) - partition_key_ranges = subset.get_partition_key_ranges( - dynamic_partitions_store=self.instance + # if the asset we are getting the key_range for is a parent of the asset that is currently + # materializing, we need to load the key_range in a different way. So we must first figure out + # if asset is the parent of the currently materializing asset + + asset_layer = self.job_def.asset_layer + currently_materializing_assets_def = asset_layer.assets_def_for_node(self.node_handle) + asset_key = AssetKey.from_coercible(asset) + + is_resulting_asset = ( + currently_materializing_assets_def + and asset_key in currently_materializing_assets_def.keys_by_output_name.values() + ) + is_parent_asset = ( + currently_materializing_assets_def + and asset_key in currently_materializing_assets_def.keys_by_input_name.values() ) - if len(partition_key_ranges) != 1: - check.failed( - "Tried to access asset partition key range, but there are " - f"({len(partition_key_ranges)}) key ranges associated with this input.", + # when `asset` is a self-dependent partitioned asset, then is_resulting_asset and is_upstream_asset + # are both True. In this case, we defer to the user-provided is_dependency parameter. If + # is_dependency is True, then we return the key range of asset as a parent asset + get_partition_key_range_as_parent_asset = (is_parent_asset and not is_resulting_asset) or ( + is_parent_asset and is_resulting_asset and is_dependency + ) + + if get_partition_key_range_as_parent_asset: + subset = self.asset_partitions_subset_for_input(asset) + partition_key_ranges = subset.get_partition_key_ranges( + dynamic_partitions_store=self.instance ) - return partition_key_ranges[0] + if len(partition_key_ranges) != 1: + check.failed( + "Tried to access asset partition key range, but there are " + f"({len(partition_key_ranges)}) key ranges associated with this input.", + ) + + return partition_key_ranges[0] + else: + return self.asset_partition_key_range def asset_partitions_subset_for_input( - self, input_name: str, *, require_valid_partitions: bool = True + self, asset: CoercibleToAssetKey, *, require_valid_partitions: bool = True ) -> PartitionsSubset: + # TODO - change other callsites to get the asset key before passing asset_layer = self.job_def.asset_layer - upstream_asset_key = asset_layer.asset_key_for_input(self.node_handle, input_name) + assets_def = asset_layer.assets_def_for_node(self.node_handle) + upstream_asset_key = AssetKey.from_coercible(asset) if upstream_asset_key is not None: - return self.asset_partitions_subset_for_asset(upstream_asset_key, is_dependency=True) - - # this check message isn't quite accurate - it's really that the input doesn't correspond to an asset - check.failed("The input has no asset partitions") - - def asset_partitions_subset_for_asset( - self, - asset: CoercibleToAssetKey, - *, - require_valid_partitions: bool = True, - is_dependency: bool = False, - ) -> PartitionsSubset: - asset_layer = self.job_def.asset_layer - materializing_assets_def = asset_layer.assets_def_for_node(self.node_handle) - asset_key = AssetKey.from_coercible(asset) - - if asset_layer.has_assets_def_for_asset(asset_key) or ( - asset_layer.source_assets_by_key.get(asset_key, None) is not None - ): - asset_partitions_def = asset_layer.partitions_def_for_asset(asset_key) + upstream_asset_partitions_def = asset_layer.partitions_def_for_asset(upstream_asset_key) - if asset_partitions_def is None: - check.failed(f"The asset {asset_key} has no partitions") - - is_resulting_asset = ( - materializing_assets_def - and asset_key in materializing_assets_def.keys_by_output_name.values() - ) - is_upstream_asset = ( - materializing_assets_def - and asset_key in materializing_assets_def.keys_by_input_name.values() - ) - - get_partition_subset_as_upstream_asset = ( - is_upstream_asset and not is_resulting_asset - ) or (is_upstream_asset and is_resulting_asset and is_dependency) - - if get_partition_subset_as_upstream_asset: - materializing_assets_partitions_def = ( - materializing_assets_def.partitions_def if materializing_assets_def else None - ) + if upstream_asset_partitions_def is not None: + partitions_def = assets_def.partitions_def if assets_def else None partitions_subset = ( - materializing_assets_partitions_def.empty_subset().with_partition_key_range( + partitions_def.empty_subset().with_partition_key_range( self.asset_partition_key_range, dynamic_partitions_store=self.instance ) - if materializing_assets_partitions_def + if partitions_def else None ) - partition_mapping = infer_partition_mapping( - asset_layer.partition_mapping_for_node_input(self.node_handle, asset_key), - materializing_assets_partitions_def, - asset_partitions_def, + asset_layer.partition_mapping_for_node_input( + self.node_handle, upstream_asset_key + ), + partitions_def, + upstream_asset_partitions_def, ) mapped_partitions_result = ( partition_mapping.get_upstream_mapped_partitions_result_for_partitions( partitions_subset, - asset_partitions_def, + upstream_asset_partitions_def, dynamic_partitions_store=self.instance, ) ) @@ -1152,13 +1143,86 @@ def asset_partitions_subset_for_asset( f"Partition key range {self.asset_partition_key_range} in" f" {self.node_handle.name} depends on invalid partition keys" f" {mapped_partitions_result.required_but_nonexistent_partition_keys} in" - f" asset {asset_key}" + f" upstream asset {upstream_asset_key}" ) return mapped_partitions_result.partitions_subset - else: - return partitions_subset - check.failed(f"The asset {asset} has no partitions") + + check.failed("The input has no asset partitions") + + # def asset_partitions_subset_for_asset( + # self, + # asset: CoercibleToAssetKey, + # *, + # require_valid_partitions: bool = True, + # is_dependency: bool = False, + # ) -> PartitionsSubset: + # asset_layer = self.job_def.asset_layer + # materializing_assets_def = asset_layer.assets_def_for_node(self.node_handle) + # asset_key = AssetKey.from_coercible(asset) + + # if asset_layer.has_assets_def_for_asset(asset_key) or ( + # asset_layer.source_assets_by_key.get(asset_key, None) is not None + # ): + # asset_partitions_def = asset_layer.partitions_def_for_asset(asset_key) + + # if asset_partitions_def is None: + # check.failed(f"The asset {asset_key} has no partitions") + + # is_resulting_asset = ( + # materializing_assets_def + # and asset_key in materializing_assets_def.keys_by_output_name.values() + # ) + # is_upstream_asset = ( + # materializing_assets_def + # and asset_key in materializing_assets_def.keys_by_input_name.values() + # ) + + # get_partition_subset_as_upstream_asset = ( + # is_upstream_asset and not is_resulting_asset + # ) or (is_upstream_asset and is_resulting_asset and is_dependency) + + # materializing_assets_partitions_def = ( + # materializing_assets_def.partitions_def if materializing_assets_def else None + # ) + # partitions_subset = ( + # materializing_assets_partitions_def.empty_subset().with_partition_key_range( + # self.asset_partition_key_range, dynamic_partitions_store=self.instance + # ) + # if materializing_assets_partitions_def + # else None + # ) + + # if get_partition_subset_as_upstream_asset: + + # partition_mapping = infer_partition_mapping( + # asset_layer.partition_mapping_for_node_input(self.node_handle, asset_key), + # materializing_assets_partitions_def, + # asset_partitions_def, + # ) + # mapped_partitions_result = ( + # partition_mapping.get_upstream_mapped_partitions_result_for_partitions( + # partitions_subset, + # asset_partitions_def, + # dynamic_partitions_store=self.instance, + # ) + # ) + + # if ( + # require_valid_partitions + # and mapped_partitions_result.required_but_nonexistent_partition_keys + # ): + # raise DagsterInvariantViolationError( + # f"Partition key range {self.asset_partition_key_range} in" + # f" {self.node_handle.name} depends on invalid partition keys" + # f" {mapped_partitions_result.required_but_nonexistent_partition_keys} in" + # f" asset {asset_key}" + # ) + + # return mapped_partitions_result.partitions_subset + # else: + # return partitions_subset + # check.failed(f"The asset {asset} has no partitions") # def asset_partition_key_for_input(self, input_name: str) -> str: # asset_layer = self.job_def.asset_layer