From 18af8068a73fb48ce106097a9f8d657437345036 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Fri, 1 Sep 2023 14:36:58 -0400 Subject: [PATCH] wip --- .../dagster/_core/execution/context/system.py | 84 ++++++++++--------- 1 file changed, 45 insertions(+), 39 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/system.py b/python_modules/dagster/dagster/_core/execution/context/system.py index d68a9f82eaa1a..f5fa06304993e 100644 --- a/python_modules/dagster/dagster/_core/execution/context/system.py +++ b/python_modules/dagster/dagster/_core/execution/context/system.py @@ -1095,7 +1095,7 @@ def asset_partitions_subset_for_asset( is_dependency: bool = False, ) -> PartitionsSubset: asset_layer = self.job_def.asset_layer - assets_def = asset_layer.assets_def_for_node(self.node_handle) + materializing_assets_def = asset_layer.assets_def_for_node(self.node_handle) asset_key = AssetKey.from_coercible(asset) if asset_layer.has_assets_def_for_asset(asset_key) or ( @@ -1103,55 +1103,61 @@ def asset_partitions_subset_for_asset( ): asset_partitions_def = asset_layer.partitions_def_for_asset(asset_key) - if asset_partitions_def is not None: - partitions_def = assets_def.partitions_def if assets_def else None + if asset_partitions_def is None: + check.failed(f"The asset {asset_key} has no partitions") + + is_resulting_asset = ( + materializing_assets_def + and asset_key in materializing_assets_def.keys_by_output_name.values() + ) + is_upstream_asset = ( + materializing_assets_def + and asset_key in materializing_assets_def.keys_by_input_name.values() + ) + + get_partition_subset_as_upstream_asset = ( + is_upstream_asset and not is_resulting_asset + ) or (is_upstream_asset and is_resulting_asset and is_dependency) + + if get_partition_subset_as_upstream_asset: + materializing_assets_partitions_def = ( + materializing_assets_def.partitions_def if materializing_assets_def else None + ) partitions_subset = ( - partitions_def.empty_subset().with_partition_key_range( + materializing_assets_partitions_def.empty_subset().with_partition_key_range( self.asset_partition_key_range, dynamic_partitions_store=self.instance ) - if partitions_def + if materializing_assets_partitions_def else None ) - is_resulting_asset = ( - assets_def and asset_key in assets_def.keys_by_output_name.values() - ) - is_upstream_asset = ( - assets_def and asset_key in assets_def.keys_by_input_name.values() + partition_mapping = infer_partition_mapping( + asset_layer.partition_mapping_for_node_input(self.node_handle, asset_key), + materializing_assets_partitions_def, + asset_partitions_def, ) - - get_partition_subset_as_upstream_asset = ( - is_upstream_asset and not is_resulting_asset - ) or (is_upstream_asset and is_resulting_asset and is_dependency) - - if get_partition_subset_as_upstream_asset: - partition_mapping = infer_partition_mapping( - asset_layer.partition_mapping_for_node_input(self.node_handle, asset_key), - partitions_def, + mapped_partitions_result = ( + partition_mapping.get_upstream_mapped_partitions_result_for_partitions( + partitions_subset, asset_partitions_def, + dynamic_partitions_store=self.instance, ) - mapped_partitions_result = ( - partition_mapping.get_upstream_mapped_partitions_result_for_partitions( - partitions_subset, - asset_partitions_def, - dynamic_partitions_store=self.instance, - ) + ) + + if ( + require_valid_partitions + and mapped_partitions_result.required_but_nonexistent_partition_keys + ): + raise DagsterInvariantViolationError( + f"Partition key range {self.asset_partition_key_range} in" + f" {self.node_handle.name} depends on invalid partition keys" + f" {mapped_partitions_result.required_but_nonexistent_partition_keys} in" + f" asset {asset_key}" ) - if ( - require_valid_partitions - and mapped_partitions_result.required_but_nonexistent_partition_keys - ): - raise DagsterInvariantViolationError( - f"Partition key range {self.asset_partition_key_range} in" - f" {self.node_handle.name} depends on invalid partition keys" - f" {mapped_partitions_result.required_but_nonexistent_partition_keys} in" - f" asset {asset_key}" - ) - - return mapped_partitions_result.partitions_subset - else: - return partitions_subset + return mapped_partitions_result.partitions_subset + else: + return partitions_subset check.failed(f"The asset {asset} has no partitions") # def asset_partition_key_for_input(self, input_name: str) -> str: