Skip to content

Commit

Permalink
yield MaterializeResult
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Sep 21, 2023
1 parent 101831d commit f8f184b
Show file tree
Hide file tree
Showing 13 changed files with 220 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def number_y(
context: AssetExecutionContext,
ext_k8s_pod: ExtK8sPod,
):
return ext_k8s_pod.run(
yield from ext_k8s_pod.run(
context=context,
namespace=namespace,
image=docker_image,
Expand Down Expand Up @@ -138,7 +138,7 @@ def number_y(
],
)

return ext_k8s_pod.run(
yield from ext_k8s_pod.run(
context=context,
namespace=namespace,
extras={
Expand Down Expand Up @@ -197,7 +197,7 @@ def number_y_job(context: AssetExecutionContext):
k8s_job_name=job_name,
)
reader.consume_pod_logs(core_api, job_name, namespace)
return ext_context.get_materialize_results()
yield from ext_context.get_materialize_results()

result = materialize(
[number_y_job],
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,8 @@ required-version = "0.0.289"

[tool.ruff.flake8-builtins]

# We use `id` in many places and almost never want to use the python builtin.
builtins-ignorelist = ["id"]
# Id and type are frequently helpful as local variable or parameter names.
builtins-ignorelist = ["id", "type"]

[tool.ruff.flake8-tidy-imports.banned-api]

Expand Down
82 changes: 75 additions & 7 deletions python_modules/dagster-ext/dagster_ext/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,19 @@
TYPE_CHECKING,
Any,
ClassVar,
Dict,
Generic,
Iterator,
Literal,
Mapping,
Optional,
Sequence,
Set,
TextIO,
Type,
TypedDict,
TypeVar,
Union,
cast,
get_args,
)
Expand Down Expand Up @@ -98,6 +101,14 @@ class ExtDataProvenance(TypedDict):
is_user_provided: bool


ExtMetadataRawValue = Union[int, float, str, Mapping[str, Any], Sequence[Any], bool, None]


# Normalized form of a single metadata entry: the raw value together with an
# optional metadata type tag.
# `type` is written as a quoted forward reference because `ExtMetadataType` is
# defined after this class in the module.
class ExtMetadataValue(TypedDict):
type: Optional["ExtMetadataType"]
raw_value: ExtMetadataRawValue


ExtMetadataType = Literal[
"text",
"url",
Expand Down Expand Up @@ -148,7 +159,10 @@ def _assert_single_asset(data: ExtContextData, key: str) -> None:


def _resolve_optionally_passed_asset_key(
data: ExtContextData, asset_key: Optional[str], method: str
data: ExtContextData,
asset_key: Optional[str],
method: str,
already_materialized_assets: Set[str],
) -> str:
asset_keys = _assert_defined_asset_property(data["asset_keys"], method)
asset_key = _assert_opt_param_type(asset_key, str, method, "asset_key")
Expand All @@ -163,6 +177,11 @@ def _resolve_optionally_passed_asset_key(
" targets multiple assets."
)
asset_key = asset_keys[0]
if asset_key in already_materialized_assets:
raise DagsterExtError(
f"Calling `{method}` with asset key `{asset_key}` is undefined. Asset has already been"
" materialized, so no additional data can be reported for it."
)
return asset_key


Expand Down Expand Up @@ -259,6 +278,30 @@ def _assert_param_json_serializable(value: _T, method: str, param: str) -> _T:
return value


def _normalize_param_metadata(
    metadata: Mapping[str, Union[ExtMetadataRawValue, ExtMetadataValue]], method: str, param: str
) -> Mapping[str, Union[ExtMetadataRawValue, ExtMetadataValue]]:
    """Validate a user-supplied metadata dict and wrap every entry as an `ExtMetadataValue`.

    Args:
        metadata: Dict keyed by metadata label. Each value is either a raw value or a dict
            whose keys are exactly the fields of `ExtMetadataValue`.
        method: Name of the calling API method, interpolated into error messages.
        param: Name of the parameter being validated, interpolated into error messages.

    Returns:
        A new dict in which every value has the `{"raw_value": ..., "type": ...}` shape.
        Raw values are wrapped with `type` set to `None`.

    Raises:
        DagsterExtError: If a key is not a string, or a dict value does not have exactly
            the `ExtMetadataValue` keys.
    """
    _assert_param_type(metadata, dict, method, param)
    expected_fields = set(ExtMetadataValue.__annotations__.keys())
    normalized: Dict[str, ExtMetadataValue] = {}
    for label, entry in metadata.items():
        if not isinstance(label, str):
            raise DagsterExtError(
                f"Invalid type for parameter `{param}` of `{method}`. Expected a dict with string"
                f" keys, got a key `{label}` of type `{type(label)}`."
            )
        if not isinstance(entry, dict):
            # Bare raw value: wrap it with no explicit metadata type.
            normalized[label] = {"raw_value": entry, "type": None}
            continue
        # Any dict value is treated as an already-structured entry and must match the schema.
        if set(entry.keys()) != expected_fields:
            raise DagsterExtError(
                f"Invalid type for parameter `{param}` of `{method}`. Expected a dict with"
                " string keys and values that are either raw metadata values or dictionaries"
                f" with schema `{{raw_value: ..., type: ...}}`. Got a value `{entry}`."
            )
        normalized[label] = cast(ExtMetadataValue, entry)
    return normalized


def _param_from_env_var(key: str) -> Any:
raw_value = os.environ.get(_param_name_to_env_var(key))
return decode_env_var(raw_value) if raw_value is not None else None
Expand Down Expand Up @@ -625,6 +668,7 @@ def __init__(
) -> None:
self._data = data
self.message_channel = message_channel
self.materialized_assets: Set[str] = set()

def _write_message(self, method: str, params: Optional[Mapping[str, Any]] = None) -> None:
message = ExtMessage(method=method, params=params)
Expand Down Expand Up @@ -730,26 +774,27 @@ def extras(self) -> Mapping[str, Any]:
def report_asset_metadata(
self,
label: str,
value: Any,
value: ExtMetadataRawValue,
metadata_type: Optional[ExtMetadataType] = None,
asset_key: Optional[str] = None,
) -> None:
asset_key = _resolve_optionally_passed_asset_key(
self._data, asset_key, "report_asset_metadata"
self._data, asset_key, "report_asset_metadata", self.materialized_assets
)
label = _assert_param_type(label, str, "report_asset_metadata", "label")
value = _assert_param_json_serializable(value, "report_asset_metadata", "value")
metadata_type = _assert_opt_param_value(
metadata_type, get_args(ExtMetadataType), "report_asset_metadata", "type"
type = _assert_opt_param_value(
metadata_type, get_args(ExtMetadataType), "report_asset_metadata", "metadata_type"
)

self._write_message(
"report_asset_metadata",
{"asset_key": asset_key, "label": label, "value": value, "type": metadata_type},
{"asset_key": asset_key, "label": label, "value": {"raw_value": value, "type": type}},
)

def report_asset_data_version(self, data_version: str, asset_key: Optional[str] = None) -> None:
asset_key = _resolve_optionally_passed_asset_key(
self._data, asset_key, "report_asset_data_version"
self._data, asset_key, "report_asset_data_version", self.materialized_assets
)
data_version = _assert_param_type(
data_version, str, "report_asset_data_version", "data_version"
Expand All @@ -758,6 +803,29 @@ def report_asset_data_version(self, data_version: str, asset_key: Optional[str]
"report_asset_data_version", {"asset_key": asset_key, "data_version": data_version}
)

def report_asset_materialization(
    self,
    metadata: Optional[Mapping[str, Union[ExtMetadataRawValue, ExtMetadataValue]]] = None,
    data_version: Optional[str] = None,
    asset_key: Optional[str] = None,
) -> None:
    """Report that an asset has been materialized, with optional metadata and data version.

    Args:
        metadata: Optional dict of metadata entries. Each value is a raw value or a
            `{"raw_value": ..., "type": ...}` dict. A falsy value (e.g. `None` or `{}`) is
            sent as `None`.
        data_version: Optional data version string for the asset.
        asset_key: Asset to report on. Resolved by `_resolve_optionally_passed_asset_key`
            when omitted.

    Raises:
        DagsterExtError: If a parameter fails validation, or the resolved asset has already
            been reported as materialized on this context.
    """
    # Use this method's own name for every validation call so that error messages point
    # the user at the API they actually invoked.
    method = "report_asset_materialization"
    asset_key = _resolve_optionally_passed_asset_key(
        self._data, asset_key, method, self.materialized_assets
    )
    metadata = _normalize_param_metadata(metadata, method, "metadata") if metadata else None
    data_version = _assert_opt_param_type(data_version, str, method, "data_version")
    self._write_message(
        method,
        {"asset_key": asset_key, "data_version": data_version, "metadata": metadata},
    )
    # Record the asset only after the message is written; later reports for it are rejected.
    self.materialized_assets.add(asset_key)

def log(self, message: str, level: str = "info") -> None:
message = _assert_param_type(message, str, "log", "asset_key")
level = _assert_param_value(level, ["info", "warning", "error"], "log", "level")
Expand Down
7 changes: 7 additions & 0 deletions python_modules/dagster-ext/dagster_ext_tests/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,3 +162,10 @@ def test_extras_context():
assert context.get_extra("foo") == "bar"
with pytest.raises(DagsterExtError, match="Extra `bar` is undefined"):
context.get_extra("bar")


def test_report_after_materialization():
    """Reporting on an asset after its materialization was reported must raise."""
    context = _make_external_execution_context(asset_keys=["foo"])
    # Setup stays outside `pytest.raises` so that only the follow-up report can satisfy
    # the expected-exception check.
    context.report_asset_materialization(asset_key="foo")
    with pytest.raises(DagsterExtError, match="already been materialized"):
        context.report_asset_data_version("alpha", asset_key="foo")
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@

import boto3
import pytest
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.data_version import (
DATA_VERSION_IS_USER_PROVIDED_TAG,
DATA_VERSION_TAG,
)
from dagster._core.definitions.decorators.asset_decorator import asset
from dagster._core.definitions.decorators.asset_decorator import asset, multi_asset
from dagster._core.definitions.events import AssetKey
from dagster._core.definitions.materialize import materialize
from dagster._core.definitions.metadata import (
Expand Down Expand Up @@ -148,7 +149,7 @@ def test_ext_subprocess(
def foo(context: AssetExecutionContext, ext: ExtSubprocess):
extras = {"bar": "baz"}
cmd = [_PYTHON_EXECUTABLE, external_script]
return ext.run(
yield from ext.run(
cmd,
context=context,
extras=extras,
Expand All @@ -161,11 +162,7 @@ def foo(context: AssetExecutionContext, ext: ExtSubprocess):
resource = ExtSubprocess(context_injector=context_injector, message_reader=message_reader)

with instance_for_test() as instance:
materialize(
[foo],
instance=instance,
resources={"ext": resource},
)
materialize([foo], instance=instance, resources={"ext": resource})
mat = instance.get_latest_materialization_event(foo.key)
assert mat and mat.asset_materialization
assert isinstance(mat.asset_materialization.metadata["bar"], MarkdownMetadataValue)
Expand All @@ -178,6 +175,35 @@ def foo(context: AssetExecutionContext, ext: ExtSubprocess):
assert re.search(r"dagster - INFO - [^\n]+ - hello world\n", captured.err, re.MULTILINE)


def test_ext_multi_asset():
    """Run an external script that reports on two assets and check both materializations."""

    def script_fn():
        from dagster_ext import init_dagster_ext

        context = init_dagster_ext()
        # `foo` is reported as a full materialization; `bar` only gets a data version.
        context.report_asset_materialization(
            {"foo_meta": "ok"}, data_version="alpha", asset_key="foo"
        )
        context.report_asset_data_version("alpha", asset_key="bar")

    @multi_asset(specs=[AssetSpec("foo"), AssetSpec("bar")])
    def foo_bar(context: AssetExecutionContext, ext: ExtSubprocess):
        with temp_script(script_fn) as script_path:
            cmd = [_PYTHON_EXECUTABLE, script_path]
            yield from ext.run(cmd, context=context)

    with instance_for_test() as instance:
        materialize([foo_bar], instance=instance, resources={"ext": ExtSubprocess()})
        foo_mat = instance.get_latest_materialization_event(AssetKey(["foo"]))
        assert foo_mat and foo_mat.asset_materialization
        assert foo_mat.asset_materialization.metadata["foo_meta"].value == "ok"
        assert foo_mat.asset_materialization.tags
        assert foo_mat.asset_materialization.tags[DATA_VERSION_TAG] == "alpha"
        # Look up `bar` here; the previous lookup re-read `foo`, leaving `bar` unverified.
        bar_mat = instance.get_latest_materialization_event(AssetKey(["bar"]))
        assert bar_mat and bar_mat.asset_materialization
        assert bar_mat.asset_materialization.tags
        assert bar_mat.asset_materialization.tags[DATA_VERSION_TAG] == "alpha"


def test_ext_typed_metadata():
def script_fn():
from dagster_ext import init_dagster_ext
Expand All @@ -201,7 +227,7 @@ def script_fn():
def foo(context: AssetExecutionContext, ext: ExtSubprocess):
with temp_script(script_fn) as script_path:
cmd = [_PYTHON_EXECUTABLE, script_path]
return ext.run(cmd, context=context)
yield from ext.run(cmd, context=context)

with instance_for_test() as instance:
materialize(
Expand Down Expand Up @@ -248,7 +274,7 @@ def script_fn():
def foo(context: AssetExecutionContext, ext: ExtSubprocess):
with temp_script(script_fn) as script_path:
cmd = [_PYTHON_EXECUTABLE, script_path]
ext.run(cmd, context=context)
yield from ext.run(cmd, context=context)

with pytest.raises(DagsterExternalExecutionError):
materialize([foo], resources={"ext": ExtSubprocess()})
Expand Down Expand Up @@ -313,9 +339,7 @@ def subproc_run(context: AssetExecutionContext):
extras=extras,
) as ext_context:
subprocess.run(cmd, env=ext_context.get_external_process_env_vars(), check=False)
_ext_context = ext_context
mat_results = _ext_context.get_materialize_results()
return mat_results[0] if len(mat_results) == 1 else mat_results
yield from ext_context.get_materialize_results()

with instance_for_test() as instance:
materialize(
Expand All @@ -328,30 +352,3 @@ def subproc_run(context: AssetExecutionContext):
assert mat.asset_materialization.tags
assert mat.asset_materialization.tags[DATA_VERSION_TAG] == "alpha"
assert mat.asset_materialization.tags[DATA_VERSION_IS_USER_PROVIDED_TAG]


# Expects `materialize` to fail with `DagsterExternalExecutionError` when the asset calls
# `get_materialize_results` too early.
# NOTE(review): indentation is not visible in this view; the `return` is presumably inside
# the `with ext_protocol(...)` block, which is what makes the call "premature" -- confirm.
def test_ext_no_client_premature_get_results(external_script):
@asset
def subproc_run(context: AssetExecutionContext):
extras = {"bar": "baz"}
cmd = [_PYTHON_EXECUTABLE, external_script]

with ext_protocol(
context,
ExtTempFileContextInjector(),
ExtTempFileMessageReader(),
extras=extras,
) as ext_context:
# The external script is launched with the env vars supplied by the ext context.
subprocess.run(cmd, env=ext_context.get_external_process_env_vars(), check=False)
return ext_context.get_materialize_results()

# The error message must state that results are only available after the context manager exits.
with pytest.raises(
DagsterExternalExecutionError,
match=(
"`get_materialize_results` must be called after the `ext_protocol` context manager has"
" exited."
),
):
materialize(
[subproc_run],
)
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ class TableColumn(
def __new__(
cls,
name: str,
type: str = "string", # noqa: A002
type: str = "string",
description: Optional[str] = None,
constraints: Optional["TableColumnConstraints"] = None,
):
Expand Down
4 changes: 2 additions & 2 deletions python_modules/dagster/dagster/_core/ext/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from abc import ABC, abstractmethod
from contextlib import contextmanager
from typing import TYPE_CHECKING, Iterator, Optional, Tuple, Union
from typing import TYPE_CHECKING, Iterator, Optional

from dagster_ext import (
ExtContextData,
Expand All @@ -23,7 +23,7 @@ def run(
*,
context: OpExecutionContext,
extras: Optional[ExtExtras] = None,
) -> Union["MaterializeResult", Tuple["MaterializeResult", ...]]: ...
) -> Iterator["MaterializeResult"]: ...


class ExtContextInjector(ABC):
Expand Down
Loading

0 comments on commit f8f184b

Please sign in to comment.