Skip to content

Commit

Permalink
s/AssetVarietal/AssetExecutionType (#16604)
Browse files Browse the repository at this point in the history
## Summary & Motivation

Renaming AssetVarietal to AssetExecutionType as AssetVarietal is obtuse
and arguably pretentious

## How I Tested These Changes

BK

---------

Co-authored-by: Nicholas Schrock <[email protected]>
  • Loading branch information
schrockn and schrockn authored Sep 19, 2023
1 parent 373510e commit fe57d60
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 20 deletions.
18 changes: 10 additions & 8 deletions python_modules/dagster/dagster/_core/definitions/asset_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,29 @@
if TYPE_CHECKING:
from dagster._core.definitions.asset_dep import AssetDep, CoercibleToAssetDep

# SYSTEM_METADATA_KEY_ASSET_VARIETAL lives on the metadata of an asset
# SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE lives on the metadata of an asset
# (which currently ends up on the Output associated with the asset key)
# whih encodes the "varietal" of asset. "Unexecutable" assets are assets
# whih encodes the execution type the of asset. "Unexecutable" assets are assets
# that cannot be materialized in Dagster, but can have events in the event
# log keyed off of them, making Dagster usable as a observability and lineage tool
# for externally materialized assets.
SYSTEM_METADATA_KEY_ASSET_VARIETAL = "dagster/asset_varietal"
SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE = "dagster/asset_execution_type"


class AssetVarietal(Enum):
class AssetExecutionType(Enum):
UNEXECUTABLE = "UNEXECUTABLE"
MATERIALIZEABLE = "MATERIALIZEABLE"
MATERIALIZATION = "MATERIALIZATION"

@staticmethod
def is_executable(varietal_str: Optional[str]) -> bool:
return AssetVarietal.str_to_enum(varietal_str) in {AssetVarietal.MATERIALIZEABLE}
return AssetExecutionType.str_to_enum(varietal_str) in {AssetExecutionType.MATERIALIZATION}

@staticmethod
def str_to_enum(varietal_str: Optional[str]) -> "AssetVarietal":
def str_to_enum(varietal_str: Optional[str]) -> "AssetExecutionType":
return (
AssetVarietal.MATERIALIZEABLE if varietal_str is None else AssetVarietal(varietal_str)
AssetExecutionType.MATERIALIZATION
if varietal_str is None
else AssetExecutionType(varietal_str)
)


Expand Down
8 changes: 4 additions & 4 deletions python_modules/dagster/dagster/_core/definitions/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -869,12 +869,12 @@ def is_asset_executable(self, asset_key: AssetKey) -> bool:
bool: True if the asset key is materializable by this AssetsDefinition.
"""
from dagster._core.definitions.asset_spec import (
SYSTEM_METADATA_KEY_ASSET_VARIETAL,
AssetVarietal,
SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE,
AssetExecutionType,
)

return AssetVarietal.is_executable(
self._metadata_by_key.get(asset_key, {}).get(SYSTEM_METADATA_KEY_ASSET_VARIETAL)
return AssetExecutionType.is_executable(
self._metadata_by_key.get(asset_key, {}).get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE)
)

def get_partition_mapping_for_input(self, input_name: str) -> Optional[PartitionMapping]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

from dagster import _check as check
from dagster._core.definitions.asset_spec import (
SYSTEM_METADATA_KEY_ASSET_VARIETAL,
SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE,
AssetExecutionType,
AssetSpec,
AssetVarietal,
)
from dagster._core.definitions.decorators.asset_decorator import multi_asset
from dagster._core.errors import DagsterInvariantViolationError
Expand Down Expand Up @@ -33,7 +33,11 @@ def create_unexecutable_observable_assets_def(specs: Sequence[AssetSpec]):
group_name=spec.group_name,
metadata={
**(spec.metadata or {}),
**{SYSTEM_METADATA_KEY_ASSET_VARIETAL: AssetVarietal.UNEXECUTABLE.value},
**{
SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: (
AssetExecutionType.UNEXECUTABLE.value
)
},
},
deps=spec.deps,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@
from dagster._core.definitions.asset_check_spec import AssetCheckSpec
from dagster._core.definitions.asset_sensor_definition import AssetSensorDefinition
from dagster._core.definitions.asset_spec import (
SYSTEM_METADATA_KEY_ASSET_VARIETAL,
AssetVarietal,
SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE,
AssetExecutionType,
)
from dagster._core.definitions.assets_job import is_base_asset_job_name
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
Expand Down Expand Up @@ -1262,15 +1262,15 @@ def __new__(

@property
def is_executable(self) -> bool:
metadata_value = self.metadata.get(SYSTEM_METADATA_KEY_ASSET_VARIETAL)
metadata_value = self.metadata.get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE)
if not metadata_value:
varietal_text = None
else:
check.inst(metadata_value, TextMetadataValue) # for guaranteed runtime error
assert isinstance(metadata_value, TextMetadataValue) # for type checker
varietal_text = metadata_value.value

return AssetVarietal.is_executable(varietal_text)
return AssetExecutionType.is_executable(varietal_text)


ResourceJobUsageMap = Dict[str, List[ResourceJobUsageEntry]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1203,7 +1203,7 @@ def test_historical_external_asset_node() -> None:
dependencies=[],
depended_by=[],
# purposefully not using constants here so we know when we are breaking ourselves
metadata={"dagster/asset_varietal": TextMetadataValue("UNEXECUTABLE")},
metadata={"dagster/asset_execution_type": TextMetadataValue("UNEXECUTABLE")},
).is_executable

assert ExternalAssetNode(
Expand Down

0 comments on commit fe57d60

Please sign in to comment.