Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exploration of complete separation of metadata and compute: observable assets #16424

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions external_lib.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from typing import Optional

from dagster import AssetMaterialization, DagsterInstance
from dagster._core.definitions.events import AssetObservation
from dagster._core.events import (
AssetObservationData,
DagsterEvent,
DagsterEventType,
StepMaterializationData,
)


# These helpers are used by external computations to report materializations
# and observations. Right now they hit the DagsterInstance directly, but we
# would change them to hit the Dagster GraphQL API, a REST API, or some
# sort of ext-esque channel.
def report_asset_materialization(
    asset_materialization: AssetMaterialization,
    instance: Optional[DagsterInstance] = None,
    run_id: Optional[str] = None,
    job_name: Optional[str] = None,
):
    """Report an externally produced asset materialization to a Dagster instance.

    Args:
        asset_materialization: The materialization to wrap in an
            ``ASSET_MATERIALIZATION`` event.
        instance: Instance that receives the event. When falsy,
            ``DagsterInstance.get()`` is used instead.
        run_id: Run id passed along with the event. When falsy, the literal
            string ``"runless"`` is passed.
        job_name: Forwarded unchanged to ``DagsterEvent.from_external``.
    """
    target_instance = instance if instance else DagsterInstance.get()

    # Wrap the user-facing materialization in the event-specific payload type.
    payload = StepMaterializationData(asset_materialization)
    event = DagsterEvent.from_external(
        event_type=DagsterEventType.ASSET_MATERIALIZATION,
        event_specific_data=payload,
        job_name=job_name,
    )

    effective_run_id = run_id if run_id else "runless"
    target_instance.report_dagster_event(event, run_id=effective_run_id)


def report_asset_observation(
    asset_observation: AssetObservation,
    instance: Optional[DagsterInstance] = None,
    run_id: Optional[str] = None,
    job_name: Optional[str] = None,
):
    """Report an externally produced asset observation to a Dagster instance.

    Args:
        asset_observation: The observation to wrap in an
            ``ASSET_OBSERVATION`` event.
        instance: Instance that receives the event. When falsy,
            ``DagsterInstance.get()`` is used instead.
        run_id: Run id passed along with the event. When falsy, the literal
            string ``"runless"`` is passed.
        job_name: Forwarded unchanged to ``DagsterEvent.from_external``.
    """
    target_instance = instance if instance else DagsterInstance.get()

    # Wrap the user-facing observation in the event-specific payload type.
    payload = AssetObservationData(asset_observation)
    event = DagsterEvent.from_external(
        event_type=DagsterEventType.ASSET_OBSERVATION,
        event_specific_data=payload,
        job_name=job_name,
    )

    effective_run_id = run_id if run_id else "runless"
    target_instance.report_dagster_event(event, run_id=effective_run_id)
11 changes: 11 additions & 0 deletions materialize_one.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from dagster import AssetMaterialization, DagsterInstance

from external_lib import report_asset_materialization

if __name__ == "__main__":
    # Script entry point: report one materialization of "observable_asset_one",
    # passing the instance explicitly.
    report_asset_materialization(
        asset_materialization=AssetMaterialization(
            asset_key="observable_asset_one",
            metadata={
                "foo_metadata_label": "metadata_value",
            },
        ),
        instance=DagsterInstance.get(),
    )
10 changes: 10 additions & 0 deletions materialize_two.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from dagster import AssetMaterialization

from external_lib import report_asset_materialization

if __name__ == "__main__":
    # Script entry point: report one materialization of "observable_asset_two".
    # No instance is passed, so the helper falls back to its own default.
    report_asset_materialization(
        asset_materialization=AssetMaterialization(
            asset_key="observable_asset_two",
            metadata={
                "foo_metadata_label": "metadata_value",
            },
        ),
    )
75 changes: 75 additions & 0 deletions observable_assets_hello_world.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from dagster import Definitions, materialize, sensor
from dagster._core.definitions.asset_spec import ObservableAssetSpec
from dagster._core.definitions.data_version import DataVersion
from dagster._core.definitions.events import AssetMaterialization, AssetObservation
from dagster._core.definitions.sensor_definition import SensorEvaluationContext

from external_lib import (
report_asset_materialization,
report_asset_observation,
)


@sensor()
def sensor_that_emits_materializations(context: SensorEvaluationContext):
    # On each evaluation, report a materialization of "observable_asset_one"
    # against the instance carried by the sensor context.
    sensor_instance = context.instance
    materialization = AssetMaterialization(
        asset_key="observable_asset_one",
        metadata={"source": "from_sensor"},
    )
    report_asset_materialization(
        asset_materialization=materialization,
        instance=sensor_instance,
    )


@sensor()
def sensor_that_observes(context: SensorEvaluationContext):
    # On each evaluation, report an observation of "observable_asset_one"
    # against the instance carried by the sensor context.
    sensor_instance = context.instance
    observation = AssetObservation(
        asset_key="observable_asset_one",
        metadata={"source": "from_sensor"},
    )
    report_asset_observation(
        asset_observation=observation,
        instance=sensor_instance,
    )


def observable_asset(fn, **kwargs):
    """Placeholder decorator that hands ``fn`` back untouched.

    Any keyword arguments are accepted and ignored.
    """
    return fn


def observe_function():
    """Generator yielding a single ``DataVersion`` built from ``"a_version"``."""
    version = DataVersion("a_version")
    yield version


# todo move out of decorator and push down
# @asset(materializeable=False)
# def actively_observed_asset():
# yield from observe_function()


# Sensor whose evaluation simply delegates to observe_function(), re-yielding
# whatever that generator produces.
# NOTE(review): observe_function() yields DataVersion objects -- confirm the
# sensor machinery accepts that as a sensor result type.
@sensor()
def an_observing_sensor():
    yield from observe_function()

# @observable_asset
# # This function is invocable in the context of a sensor _or_ a run
# def actively_observed_asset():
# yield DataVersion("a_version")

# This code location defines metadata exclusively. It expects execution to happen elsewhere.
# Two specs are declared; asset_two lists asset_one in its deps.
asset_one = ObservableAssetSpec(key="observable_asset_one")
asset_two = ObservableAssetSpec(key="observable_asset_two", deps=[asset_one])
# defs = Definitions(assets=[asset_one, asset_two, actively_observed_asset])




# Only the two specs are registered here; no sensors are passed to Definitions
# (the sensor-related arguments below are commented out).
defs = Definitions(
assets=[asset_one, asset_two],
# assets=[actively_observed_asset],
# sensors=[actively_observed_asset.to_sensor(...)], # could do sensory things here
)


# sensors=[sensor_that_emits_materializations, sensor_that_observes],

# actively_observed_asset()

# NOTE(review): this call sits at module top level, so it executes whenever the
# module is imported or loaded, not only when run as a script -- confirm that
# is intended.
materialize(defs.get_asset_graph().assets)
24 changes: 24 additions & 0 deletions ops_over_unmanaged_assets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from dagster import DagsterInstance, job, op
from dagster._core.definitions.events import AssetMaterialization
from dagster._core.execution.context.compute import OpExecutionContext

# This is an example of how a user who has a lot of existing ops, and does
# not want to migrate them, could still plug into the asset graph.


@op
def op_produces_asset_one(context: OpExecutionContext):
    # Log a materialization of "observable_asset_one" from inside a plain op.
    materialization = AssetMaterialization(
        asset_key="observable_asset_one",
        metadata={"foo_metadata_label": "metadata_value_1"},
    )
    context.log_event(materialization)


# Job whose body consists of the single op that logs a materialization of
# "observable_asset_one".
@job
def my_job():
    op_produces_asset_one()


if __name__ == "__main__":
    # Script entry point: run the job in-process against the instance
    # returned by DagsterInstance.get().
    my_job.execute_in_process(
        instance=DagsterInstance.get(),
    )
6 changes: 3 additions & 3 deletions python_modules/dagster/dagster/_core/definitions/asset_dep.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import dagster._check as check
from dagster._annotations import PublicAttr, experimental
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.asset_spec import ObservableAssetSpec
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.partition_mapping import PartitionMapping
from dagster._core.definitions.source_asset import SourceAsset
Expand Down Expand Up @@ -49,10 +49,10 @@ class AssetDep(

def __new__(
cls,
asset: Union[CoercibleToAssetKey, AssetSpec, AssetsDefinition, SourceAsset],
asset: Union[CoercibleToAssetKey, AssetsDefinition, SourceAsset, ObservableAssetSpec],
partition_mapping: Optional[PartitionMapping] = None,
):
if isinstance(asset, AssetSpec):
if isinstance(asset, ObservableAssetSpec):
asset_key = asset.key
else:
asset_key = AssetKey.from_coercible_or_definition(asset)
Expand Down
Loading