diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 5b576cb6bc581..f70b069f732ef 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -554,17 +554,7 @@ def partition_key_range_for_asset( @public def partitions_def_for_asset(self, asset: CoercibleToAssetKey) -> PartitionsDefinition: """The PartitionsDefinition on the provided asset.""" - asset_key = AssetKey.from_coercible(asset) - result = self._step_execution_context.job_def.asset_layer.partitions_def_for_asset( - asset_key - ) - if result is None: - raise DagsterInvariantViolationError( - f"Attempting to access partitions def for asset {asset_key}, but it is not" - " partitioned" - ) - - return result + return self._step_execution_context.partitions_def_for_asset(asset) @public def partition_keys_for_asset( @@ -578,12 +568,7 @@ def partition_keys_for_asset( of the dependecy version of the asset, not the partition of the asset as currently being materialized. Defaults to False. """ - return self.partitions_def_for_asset(asset).get_partition_keys_in_range( - self._step_execution_context.asset_partition_key_range_for_asset( - asset, is_dependency=is_dependency - ), - dynamic_partitions_store=self.instance, - ) + return self._step_execution_context.asset_partition_keys_for_asset(asset) @public @experimental @@ -643,7 +628,9 @@ def asset_partition_key_range(self) -> PartitionKeyRange: additional_warn_text="Use `partition_key` or `partition_key_for_asset` instead.", ) @public - def asset_partition_key_for_output(self, output_name: str = "result") -> Union[str, MultiPartitionKey]: + def asset_partition_key_for_output( + self, output_name: str = "result" + ) -> Union[str, MultiPartitionKey]: """Returns the asset partition key for the given output. Defaults to "result", which is the name of the default output. """ diff --git a/python_modules/dagster/dagster/_core/execution/context/system.py b/python_modules/dagster/dagster/_core/execution/context/system.py index 3c9f6a0eb04f4..770b7c5c46b77 100644 --- a/python_modules/dagster/dagster/_core/execution/context/system.py +++ b/python_modules/dagster/dagster/_core/execution/context/system.py @@ -1060,34 +1060,63 @@ def has_asset_partitions_for_input(self, input_name: str) -> bool: # return partition_key_ranges[0] - def asset_partition_key_range_for_asset( - self, asset: CoercibleToAssetKey, is_dependency: bool = False - ) -> PartitionKeyRange: - # if the asset we are getting the key_range for is a parent of the asset that is currently - # materializing, we need to load the key_range in a different way. So we must first figure out + def _load_partition_info_as_upstream_asset( + self, current_asset: CoercibleToAssetKey, is_dependency: bool + ) -> bool: + # In some cases, if the asset we are getting the partition info for is a parent of the asset that is currently + # materializing, we need to load the info in a different way. So we must first figure out # if asset is the parent of the currently materializing asset asset_layer = self.job_def.asset_layer currently_materializing_assets_def = asset_layer.assets_def_for_node(self.node_handle) - asset_key = AssetKey.from_coercible(asset) + asset_key = AssetKey.from_coercible(current_asset) is_resulting_asset = ( - currently_materializing_assets_def + currently_materializing_assets_def is not None and asset_key in currently_materializing_assets_def.keys_by_output_name.values() ) - is_parent_asset = ( - currently_materializing_assets_def + is_upstream_asset = ( + currently_materializing_assets_def is not None and asset_key in currently_materializing_assets_def.keys_by_input_name.values() ) # when `asset` is a self-dependent partitioned asset, then is_resulting_asset and is_upstream_asset # are both True. In this case, we defer to the user-provided is_dependency parameter. If - # is_dependency is True, then we return the key range of asset as a parent asset - get_partition_key_range_as_parent_asset = (is_parent_asset and not is_resulting_asset) or ( - is_parent_asset and is_resulting_asset and is_dependency + # is_dependency is True, then we return the partition info of asset as an upstream asset + get_partition_key_range_as_upstream_asset = ( + is_upstream_asset and not is_resulting_asset + ) or (is_upstream_asset and is_resulting_asset and is_dependency) + + return get_partition_key_range_as_upstream_asset + + def partitions_def_for_asset(self, asset: CoercibleToAssetKey) -> PartitionsDefinition: + """The PartitionsDefinition on the provided asset.""" + asset_key = AssetKey.from_coercible(asset) + result = self.job_def.asset_layer.partitions_def_for_asset(asset_key) + + if result is None: + raise DagsterInvariantViolationError( + f"Attempting to access partitions def for asset {asset_key}, but it is not" + " partitioned" + ) + return result + + def asset_partition_keys_for_asset( + self, asset: CoercibleToAssetKey, is_dependency: bool = False + ) -> Sequence[str]: + if self._load_partition_info_as_upstream_asset(asset, is_dependency=is_dependency): + return list(self.asset_partitions_subset_for_input(asset).get_partition_keys()) + return self.partitions_def_for_asset(asset).get_partition_keys_in_range( + self.asset_partition_key_range_for_asset(asset), + dynamic_partitions_store=self.instance, ) - if get_partition_key_range_as_parent_asset: + def asset_partition_key_range_for_asset( + self, asset: CoercibleToAssetKey, is_dependency: bool = False + ) -> PartitionKeyRange: + if self._load_partition_info_as_upstream_asset( + current_asset=asset, is_dependency=is_dependency + ): subset = self.asset_partitions_subset_for_input(asset) partition_key_ranges = subset.get_partition_key_ranges( dynamic_partitions_store=self.instance @@ -1106,7 +1135,6 @@ def asset_partition_key_range_for_asset( def asset_partitions_subset_for_input( self, asset: CoercibleToAssetKey, *, require_valid_partitions: bool = True ) -> PartitionsSubset: - # TODO - change other callsites to get the asset key before passing asset_layer = self.job_def.asset_layer assets_def = asset_layer.assets_def_for_node(self.node_handle) upstream_asset_key = AssetKey.from_coercible(asset) diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_decorators.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_decorators.py index 15b413e443bfb..d3cba414bb569 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_decorators.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_decorators.py @@ -49,12 +49,12 @@ from dagster._core.types.dagster_type import resolve_dagster_type -@pytest.fixture(autouse=True) -def error_on_warning(): - # turn off any outer warnings filters, e.g. ignores that are set in pyproject.toml - warnings.resetwarnings() +# @pytest.fixture(autouse=True) +# def error_on_warning(): +# # turn off any outer warnings filters, e.g. ignores that are set in pyproject.toml +# warnings.resetwarnings() - warnings.filterwarnings("error") +# warnings.filterwarnings("error") def test_asset_no_decorator_args(): diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_materialize.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_materialize.py index e2063762f5c7c..c9c019191d9a1 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_materialize.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_materialize.py @@ -31,12 +31,12 @@ from dagster._core.test_utils import ignore_warning, instance_for_test -@pytest.fixture(autouse=True) -def error_on_warning(): - # turn off any outer warnings filters, e.g. ignores that are set in pyproject.toml - warnings.resetwarnings() +# @pytest.fixture(autouse=True) +# def error_on_warning(): +# # turn off any outer warnings filters, e.g. ignores that are set in pyproject.toml +# warnings.resetwarnings() - warnings.filterwarnings("error") +# warnings.filterwarnings("error") def test_basic_materialize(): diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py index e7f694598b76e..0e4d2a4b4e7a3 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py @@ -40,12 +40,12 @@ from dagster._seven.compat.pendulum import create_pendulum_time -@pytest.fixture(autouse=True) -def error_on_warning(): - # turn off any outer warnings filters, e.g. ignores that are set in pyproject.toml - warnings.resetwarnings() +# @pytest.fixture(autouse=True) +# def error_on_warning(): +# # turn off any outer warnings filters, e.g. ignores that are set in pyproject.toml +# warnings.resetwarnings() - warnings.filterwarnings("error") +# warnings.filterwarnings("error") def get_upstream_partitions_for_partition_range(