Skip to content

Commit

Permalink
Add ExternalAssetNode.is_executable (#16603)
Browse files Browse the repository at this point in the history
## Summary & Motivation

As was reasonably requested in
#16576, adding
`ExternalAssetNode.is_executable` and independently testing it, rather
than having the logic directly in the graphql resolver.

## How I Tested These Changes

BK

---------

Co-authored-by: Nicholas Schrock <[email protected]>
  • Loading branch information
schrockn and schrockn authored Sep 18, 2023
1 parent df1105c commit b8e748a
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
StaleStatus,
)
from dagster._core.definitions.external_asset_graph import ExternalAssetGraph
from dagster._core.definitions.metadata import TextMetadataValue
from dagster._core.definitions.partition import CachingDynamicPartitionsLoader, PartitionsDefinition
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.event_api import EventRecordsFilter
Expand Down Expand Up @@ -824,20 +823,7 @@ def resolve_isObservable(self, _graphene_info: ResolveInfo) -> bool:
return self._external_asset_node.is_observable

def resolve_isExecutable(self, _graphene_info: ResolveInfo) -> bool:
from dagster._core.definitions.asset_spec import (
SYSTEM_METADATA_KEY_ASSET_VARIETAL,
AssetVarietal,
)

metadata_value = self._external_asset_node.metadata.get(SYSTEM_METADATA_KEY_ASSET_VARIETAL)
if not metadata_value:
varietal_text = None
else:
check.inst(metadata_value, TextMetadataValue) # for guaranteed runtime error
assert isinstance(metadata_value, TextMetadataValue) # for type checker
varietal_text = metadata_value.value

return AssetVarietal.is_executable(varietal_text)
return self._external_asset_node.is_executable

def resolve_latestMaterializationByPartition(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@
)
from dagster._core.definitions.asset_check_spec import AssetCheckSpec
from dagster._core.definitions.asset_sensor_definition import AssetSensorDefinition
from dagster._core.definitions.asset_spec import (
SYSTEM_METADATA_KEY_ASSET_VARIETAL,
AssetVarietal,
)
from dagster._core.definitions.assets_job import is_base_asset_job_name
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.definitions.backfill_policy import BackfillPolicy
Expand All @@ -65,6 +69,7 @@
MetadataMapping,
MetadataUserInput,
MetadataValue,
TextMetadataValue,
normalize_metadata,
)
from dagster._core.definitions.multi_dimensional_partitions import MultiPartitionsDefinition
Expand Down Expand Up @@ -1255,6 +1260,18 @@ def __new__(
),
)

@property
def is_executable(self) -> bool:
metadata_value = self.metadata.get(SYSTEM_METADATA_KEY_ASSET_VARIETAL)
if not metadata_value:
varietal_text = None
else:
check.inst(metadata_value, TextMetadataValue) # for guaranteed runtime error
assert isinstance(metadata_value, TextMetadataValue) # for type checker
varietal_text = metadata_value.value

return AssetVarietal.is_executable(varietal_text)


ResourceJobUsageMap = Dict[str, List[ResourceJobUsageEntry]]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
from dagster._check import ParameterCheckError
from dagster._core.definitions import AssetIn, SourceAsset, asset, build_assets_job, multi_asset
from dagster._core.definitions.asset_graph import AssetGraph
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.backfill_policy import BackfillPolicy
from dagster._core.definitions.metadata import MetadataValue, normalize_metadata
from dagster._core.definitions.metadata import MetadataValue, TextMetadataValue, normalize_metadata
from dagster._core.definitions.multi_dimensional_partitions import MultiPartitionsDefinition
from dagster._core.definitions.observable_asset import create_unexecutable_observable_assets_def
from dagster._core.definitions.partition import ScheduleType
from dagster._core.definitions.time_window_partitions import TimeWindowPartitionsDefinition
from dagster._core.definitions.utils import DEFAULT_GROUP_NAME
Expand Down Expand Up @@ -1183,3 +1185,29 @@ def test_external_time_window_valid_partition_key():
datetime.strptime("2023-03-11-15:00", "%Y-%m-%d-%H:%M"), tz="UTC"
).timestamp()
)


def test_unexecutable_external_asset_node() -> None:
asset_one = create_unexecutable_observable_assets_def([AssetSpec("asset_one")])

assets_job = build_assets_job("assets_job", [asset_one])
external_asset_nodes = external_asset_graph_from_defs([assets_job], source_assets_by_key={})

assert len(external_asset_nodes) == 1
assert next(iter(external_asset_nodes)).is_executable is False


def test_historical_external_asset_node() -> None:
assert not ExternalAssetNode(
asset_key=AssetKey("asset_one"),
dependencies=[],
depended_by=[],
# purposefully not using constants here so we know when we are breaking ourselves
metadata={"dagster/asset_varietal": TextMetadataValue("UNEXECUTABLE")},
).is_executable

assert ExternalAssetNode(
asset_key=AssetKey("asset_one"),
dependencies=[],
depended_by=[],
).is_executable

0 comments on commit b8e748a

Please sign in to comment.