From b8e748accb8c0b70b34b74379347f8edff3117ab Mon Sep 17 00:00:00 2001 From: Nick Schrock Date: Mon, 18 Sep 2023 16:47:47 -0400 Subject: [PATCH] Add ExternalAssetNode.is_executable (#16603) ## Summary & Motivation As was reasonably requested in https://github.com/dagster-io/dagster/pull/16576, adding `ExternalAssetNode.is_executable` and independently testing it, rather than having the logic directly in the graphql resolver. ## How I Tested These Changes BK --------- Co-authored-by: Nicholas Schrock --- .../dagster_graphql/schema/asset_graph.py | 16 +--------- .../host_representation/external_data.py | 17 +++++++++++ .../test_external_data.py | 30 ++++++++++++++++++- 3 files changed, 47 insertions(+), 16 deletions(-) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py index 37a0788eb2134..b85889416b23c 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py @@ -12,7 +12,6 @@ StaleStatus, ) from dagster._core.definitions.external_asset_graph import ExternalAssetGraph -from dagster._core.definitions.metadata import TextMetadataValue from dagster._core.definitions.partition import CachingDynamicPartitionsLoader, PartitionsDefinition from dagster._core.errors import DagsterInvariantViolationError from dagster._core.event_api import EventRecordsFilter @@ -824,20 +823,7 @@ def resolve_isObservable(self, _graphene_info: ResolveInfo) -> bool: return self._external_asset_node.is_observable def resolve_isExecutable(self, _graphene_info: ResolveInfo) -> bool: - from dagster._core.definitions.asset_spec import ( - SYSTEM_METADATA_KEY_ASSET_VARIETAL, - AssetVarietal, - ) - - metadata_value = self._external_asset_node.metadata.get(SYSTEM_METADATA_KEY_ASSET_VARIETAL) - if not metadata_value: - varietal_text = None - else: - check.inst(metadata_value, TextMetadataValue) # for guaranteed runtime error - assert isinstance(metadata_value, TextMetadataValue) # for type checker - varietal_text = metadata_value.value - - return AssetVarietal.is_executable(varietal_text) + return self._external_asset_node.is_executable def resolve_latestMaterializationByPartition( self, diff --git a/python_modules/dagster/dagster/_core/host_representation/external_data.py b/python_modules/dagster/dagster/_core/host_representation/external_data.py index e45ab1cef4453..9c89794be8e28 100644 --- a/python_modules/dagster/dagster/_core/host_representation/external_data.py +++ b/python_modules/dagster/dagster/_core/host_representation/external_data.py @@ -47,6 +47,10 @@ ) from dagster._core.definitions.asset_check_spec import AssetCheckSpec from dagster._core.definitions.asset_sensor_definition import AssetSensorDefinition +from dagster._core.definitions.asset_spec import ( + SYSTEM_METADATA_KEY_ASSET_VARIETAL, + AssetVarietal, +) from dagster._core.definitions.assets_job import is_base_asset_job_name from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy from dagster._core.definitions.backfill_policy import BackfillPolicy @@ -65,6 +69,7 @@ MetadataMapping, MetadataUserInput, MetadataValue, + TextMetadataValue, normalize_metadata, ) from dagster._core.definitions.multi_dimensional_partitions import MultiPartitionsDefinition @@ -1255,6 +1260,18 @@ def __new__( ), ) + @property + def is_executable(self) -> bool: + metadata_value = self.metadata.get(SYSTEM_METADATA_KEY_ASSET_VARIETAL) + if not metadata_value: + varietal_text = None + else: + check.inst(metadata_value, TextMetadataValue) # for guaranteed runtime error + assert isinstance(metadata_value, TextMetadataValue) # for type checker + varietal_text = metadata_value.value + + return AssetVarietal.is_executable(varietal_text) + ResourceJobUsageMap = Dict[str, List[ResourceJobUsageEntry]] diff --git a/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py b/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py index f8afd7bce8709..c02fbec69b128 100644 --- a/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py +++ b/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py @@ -20,9 +20,11 @@ from dagster._check import ParameterCheckError from dagster._core.definitions import AssetIn, SourceAsset, asset, build_assets_job, multi_asset from dagster._core.definitions.asset_graph import AssetGraph +from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.backfill_policy import BackfillPolicy -from dagster._core.definitions.metadata import MetadataValue, normalize_metadata +from dagster._core.definitions.metadata import MetadataValue, TextMetadataValue, normalize_metadata from dagster._core.definitions.multi_dimensional_partitions import MultiPartitionsDefinition +from dagster._core.definitions.observable_asset import create_unexecutable_observable_assets_def from dagster._core.definitions.partition import ScheduleType from dagster._core.definitions.time_window_partitions import TimeWindowPartitionsDefinition from dagster._core.definitions.utils import DEFAULT_GROUP_NAME @@ -1183,3 +1185,29 @@ def test_external_time_window_valid_partition_key(): datetime.strptime("2023-03-11-15:00", "%Y-%m-%d-%H:%M"), tz="UTC" ).timestamp() ) + + +def test_unexecutable_external_asset_node() -> None: + asset_one = create_unexecutable_observable_assets_def([AssetSpec("asset_one")]) + + assets_job = build_assets_job("assets_job", [asset_one]) + external_asset_nodes = external_asset_graph_from_defs([assets_job], source_assets_by_key={}) + + assert len(external_asset_nodes) == 1 + assert next(iter(external_asset_nodes)).is_executable is False + + +def test_historical_external_asset_node() -> None: + assert not ExternalAssetNode( + asset_key=AssetKey("asset_one"), + dependencies=[], + depended_by=[], + # purposefully not using constants here so we know when we are breaking ourselves + metadata={"dagster/asset_varietal": TextMetadataValue("UNEXECUTABLE")}, + ).is_executable + + assert ExternalAssetNode( + asset_key=AssetKey("asset_one"), + dependencies=[], + depended_by=[], + ).is_executable