Skip to content

Commit

Permalink
try observable asset instead of spec
Browse files Browse the repository at this point in the history
  • Loading branch information
schrockn committed Sep 12, 2023
1 parent 9ca12c3 commit f31ca5e
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 22 deletions.
12 changes: 4 additions & 8 deletions observable_assets_hello_world.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dagster import Definitions, sensor
from dagster._core.definitions.asset_spec import ObservableAssetSpec
from dagster._core.definitions.asset_spec import ObservableAsset
from dagster._core.definitions.events import AssetMaterialization, AssetObservation
from dagster._core.definitions.sensor_definition import SensorEvaluationContext

Expand Down Expand Up @@ -30,13 +30,9 @@ def sensor_that_observes(context: SensorEvaluationContext):


# This code location defines metadata exclusively. It expects execution to happen elsewhere.
observable_asset_one_spec = ObservableAssetSpec(asset_key="observable_asset_one")
observable_asset_two_spec = ObservableAssetSpec(
asset_key="observable_asset_two", deps=[observable_asset_one_spec]
)
defs = Definitions(
assets=[observable_asset_one_spec, observable_asset_two_spec],
)
asset_one = ObservableAsset(asset_key="observable_asset_one")
asset_two = ObservableAsset(asset_key="observable_asset_two", deps=[asset_one])
defs = Definitions(assets=[asset_one, asset_two])


# sensors=[sensor_that_emits_materializations, sensor_that_observes],
6 changes: 3 additions & 3 deletions python_modules/dagster/dagster/_core/definitions/asset_dep.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import dagster._check as check
from dagster._annotations import PublicAttr, experimental
from dagster._core.definitions.asset_spec import ObservableAssetSpec
from dagster._core.definitions.asset_spec import ObservableAsset
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.partition_mapping import PartitionMapping
from dagster._core.definitions.source_asset import SourceAsset
Expand Down Expand Up @@ -49,10 +49,10 @@ class AssetDep(

def __new__(
cls,
asset: Union[CoercibleToAssetKey, AssetsDefinition, SourceAsset, ObservableAssetSpec],
asset: Union[CoercibleToAssetKey, AssetsDefinition, SourceAsset, ObservableAsset],
partition_mapping: Optional[PartitionMapping] = None,
):
if isinstance(asset, ObservableAssetSpec):
if isinstance(asset, ObservableAsset):
asset_key = asset.asset_key
else:
asset_key = AssetKey.from_coercible_or_definition(asset)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@


@experimental
class ObservableAssetSpec:
class ObservableAsset:
"""Specifies the core attributes of an observable asset.
Attributes:
Expand Down Expand Up @@ -48,7 +48,7 @@ def __init__(
Iterable[
Union[
CoercibleToAssetKey,
"ObservableAssetSpec",
"ObservableAsset",
AssetsDefinition,
"SourceAsset",
"AssetDep",
Expand Down Expand Up @@ -87,7 +87,7 @@ def __init__(


@experimental
class AssetSpec(ObservableAssetSpec):
class AssetSpec(ObservableAsset):
"""Specifies the core attributes of an asset. This object is attached to the decorated
function that defines how it materialized.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
)
from dagster._core.definitions.asset_checks import AssetChecksDefinition
from dagster._core.definitions.asset_graph import InternalAssetGraph
from dagster._core.definitions.asset_spec import AssetSpec, ObservableAssetSpec
from dagster._core.definitions.asset_spec import ObservableAsset
from dagster._core.definitions.decorators import asset
from dagster._core.definitions.events import AssetKey, CoercibleToAssetKey
from dagster._core.definitions.executor_definition import ExecutorDefinition
Expand Down Expand Up @@ -245,7 +245,7 @@ def _attach_resources_to_jobs_and_instigator_jobs(
# button in the UI (it would act more like a source asset). However
# this allows Dagster to act as a data observability tool and lineage
# tool for assets defined elsewhere
def create_observable_asset(asset_spec: ObservableAssetSpec) -> AssetsDefinition:
def create_observable_asset(asset_spec: ObservableAsset) -> AssetsDefinition:
@asset(key=asset_spec.asset_key, deps=[dep.asset_key for dep in asset_spec.deps])
def _observable_asset(_) -> None:
raise Exception("Illegal to materialize an observable asset")
Expand All @@ -256,10 +256,11 @@ def _observable_asset(_) -> None:
# raise Exception("illegal to materialize this asset")
return _observable_asset


def _create_repository_using_definitions_args(
name: str,
assets: Optional[
Iterable[Union[AssetsDefinition, ObservableAssetSpec, CacheableAssetsDefinition]]
Iterable[Union[AssetsDefinition, ObservableAsset, CacheableAssetsDefinition]]
] = None,
schedules: Optional[
Iterable[Union[ScheduleDefinition, UnresolvedPartitionedAssetScheduleDefinition]]
Expand All @@ -272,7 +273,7 @@ def _create_repository_using_definitions_args(
asset_checks: Optional[Iterable[AssetChecksDefinition]] = None,
):
check.opt_iterable_param(
assets, "assets", (AssetsDefinition, ObservableAssetSpec, CacheableAssetsDefinition)
assets, "assets", (AssetsDefinition, ObservableAsset, CacheableAssetsDefinition)
)
check.opt_iterable_param(
schedules, "schedules", (ScheduleDefinition, UnresolvedPartitionedAssetScheduleDefinition)
Expand All @@ -289,7 +290,7 @@ def _create_repository_using_definitions_args(

new_assets = []
for asset_ in assets or []:
if isinstance(asset_, ObservableAssetSpec) and not isinstance(asset, SourceAsset):
if isinstance(asset_, ObservableAsset) and not isinstance(asset, SourceAsset):
new_assets.append(create_observable_asset(asset_))
else:
new_assets.append(asset_)
Expand Down Expand Up @@ -443,7 +444,7 @@ class Definitions:
def __init__(
self,
assets: Optional[
Iterable[Union[AssetsDefinition, ObservableAssetSpec, CacheableAssetsDefinition]]
Iterable[Union[AssetsDefinition, ObservableAsset, CacheableAssetsDefinition]]
] = None,
schedules: Optional[
Iterable[Union[ScheduleDefinition, UnresolvedPartitionedAssetScheduleDefinition]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import dagster._check as check
from dagster._annotations import PublicAttr, experimental_param, public
from dagster._core.decorator_utils import get_function_params
from dagster._core.definitions.asset_spec import ObservableAssetSpec
from dagster._core.definitions.asset_spec import ObservableAsset
from dagster._core.definitions.data_version import (
DATA_VERSION_TAG,
DataVersion,
Expand Down Expand Up @@ -55,7 +55,7 @@

@experimental_param(param="resource_defs")
@experimental_param(param="io_manager_def")
class SourceAsset(ObservableAssetSpec, ResourceAddable):
class SourceAsset(ObservableAsset, ResourceAddable):
"""A SourceAsset represents an asset that will be loaded by (but not updated by) Dagster.
Attributes:
Expand Down

0 comments on commit f31ca5e

Please sign in to comment.