diff --git a/examples/experimental/assets_yaml_dsl/assets_yaml_dsl/pure_assets_dsl/sql_script.py b/examples/experimental/assets_yaml_dsl/assets_yaml_dsl/pure_assets_dsl/sql_script.py index ea3a7908b306d..7c9d672859e8f 100644 --- a/examples/experimental/assets_yaml_dsl/assets_yaml_dsl/pure_assets_dsl/sql_script.py +++ b/examples/experimental/assets_yaml_dsl/assets_yaml_dsl/pure_assets_dsl/sql_script.py @@ -15,5 +15,5 @@ def query(self, query_str: str) -> None: client = SomeSqlClient() client.query(sql) - context.report_asset_metadata("sql", sql) + context.report_asset_materialization(metadata={"sql": sql}) context.log(f"Ran {sql}") diff --git a/python_modules/dagster-ext/dagster_ext/__init__.py b/python_modules/dagster-ext/dagster_ext/__init__.py index bbe6f196fbd3f..1763ee767ba4f 100644 --- a/python_modules/dagster-ext/dagster_ext/__init__.py +++ b/python_modules/dagster-ext/dagster_ext/__init__.py @@ -16,18 +16,20 @@ TYPE_CHECKING, Any, ClassVar, + Dict, Generic, Iterator, Literal, Mapping, Optional, Sequence, + Set, TextIO, Type, TypedDict, TypeVar, + Union, cast, - get_args, ) if TYPE_CHECKING: @@ -98,6 +100,14 @@ class ExtDataProvenance(TypedDict): is_user_provided: bool +ExtMetadataRawValue = Union[int, float, str, Mapping[str, Any], Sequence[Any], bool, None] + + +class ExtMetadataValue(TypedDict): + type: Optional["ExtMetadataType"] + raw_value: ExtMetadataRawValue + + ExtMetadataType = Literal[ "text", "url", @@ -148,7 +158,10 @@ def _assert_single_asset(data: ExtContextData, key: str) -> None: def _resolve_optionally_passed_asset_key( - data: ExtContextData, asset_key: Optional[str], method: str + data: ExtContextData, + asset_key: Optional[str], + method: str, + already_materialized_assets: Set[str], ) -> str: asset_keys = _assert_defined_asset_property(data["asset_keys"], method) asset_key = _assert_opt_param_type(asset_key, str, method, "asset_key") @@ -163,6 +176,11 @@ def _resolve_optionally_passed_asset_key( " targets multiple assets." ) asset_key = asset_keys[0] + if asset_key in already_materialized_assets: + raise DagsterExtError( + f"Calling `{method}` with asset key `{asset_key}` is undefined. Asset has already been" + " materialized, so no additional data can be reported for it." + ) return asset_key @@ -259,6 +277,33 @@ def _assert_param_json_serializable(value: _T, method: str, param: str) -> _T: return value +_METADATA_VALUE_KEYS = frozenset(ExtMetadataValue.__annotations__.keys()) + + +def _normalize_param_metadata( + metadata: Mapping[str, Union[ExtMetadataRawValue, ExtMetadataValue]], method: str, param: str +) -> Mapping[str, Union[ExtMetadataRawValue, ExtMetadataValue]]: + _assert_param_type(metadata, dict, method, param) + new_metadata: Dict[str, ExtMetadataValue] = {} + for key, value in metadata.items(): + if not isinstance(key, str): + raise DagsterExtError( + f"Invalid type for parameter `{param}` of `{method}`. Expected a dict with string" + f" keys, got a key `{key}` of type `{type(key)}`." + ) + elif isinstance(value, dict): + if not {*value.keys()} == _METADATA_VALUE_KEYS: + raise DagsterExtError( + f"Invalid type for parameter `{param}` of `{method}`. Expected a dict with" + " string keys and values that are either raw metadata values or dictionaries" + f" with schema `{{raw_value: ..., type: ...}}`. Got a value `{value}`." + ) + new_metadata[key] = cast(ExtMetadataValue, value) + else: + new_metadata[key] = {"raw_value": value, "type": None} + return new_metadata + + def _param_from_env_var(key: str) -> Any: raw_value = os.environ.get(_param_name_to_env_var(key)) return decode_env_var(raw_value) if raw_value is not None else None @@ -625,6 +670,7 @@ def __init__( ) -> None: self._data = data self._message_channel = message_channel + self._materialized_assets: set[str] = set() def _write_message(self, method: str, params: Optional[Mapping[str, Any]] = None) -> None: message = ExtMessage(method=method, params=params) @@ -727,36 +773,28 @@ def extras(self) -> Mapping[str, Any]: # ##### WRITE - def report_asset_metadata( + def report_asset_materialization( self, - label: str, - value: Any, - metadata_type: Optional[ExtMetadataType] = None, + metadata: Optional[Mapping[str, Union[ExtMetadataRawValue, ExtMetadataValue]]] = None, + data_version: Optional[str] = None, asset_key: Optional[str] = None, - ) -> None: + ): asset_key = _resolve_optionally_passed_asset_key( - self._data, asset_key, "report_asset_metadata" + self._data, asset_key, "report_asset_materialization", self._materialized_assets ) - label = _assert_param_type(label, str, "report_asset_metadata", "label") - value = _assert_param_json_serializable(value, "report_asset_metadata", "value") - metadata_type = _assert_opt_param_value( - metadata_type, get_args(ExtMetadataType), "report_asset_metadata", "type" - ) - self._write_message( - "report_asset_metadata", - {"asset_key": asset_key, "label": label, "value": value, "type": metadata_type}, - ) - - def report_asset_data_version(self, data_version: str, asset_key: Optional[str] = None) -> None: - asset_key = _resolve_optionally_passed_asset_key( - self._data, asset_key, "report_asset_data_version" + metadata = ( + _normalize_param_metadata(metadata, "report_asset_materialization", "metadata") + if metadata + else None ) - data_version = _assert_param_type( - data_version, str, "report_asset_data_version", "data_version" + data_version = _assert_opt_param_type( + data_version, str, "report_asset_materialization", "data_version" ) self._write_message( - "report_asset_data_version", {"asset_key": asset_key, "data_version": data_version} + "report_asset_materialization", + {"asset_key": asset_key, "data_version": data_version, "metadata": metadata}, ) + self._materialized_assets.add(asset_key) def log(self, message: str, level: str = "info") -> None: message = _assert_param_type(message, str, "log", "asset_key") diff --git a/python_modules/dagster-ext/dagster_ext_tests/test_context.py b/python_modules/dagster-ext/dagster_ext_tests/test_context.py index 9b2ab7aca7ce2..4294e53bbeda7 100644 --- a/python_modules/dagster-ext/dagster_ext_tests/test_context.py +++ b/python_modules/dagster-ext/dagster_ext_tests/test_context.py @@ -79,12 +79,15 @@ def test_single_asset_context(): assert context.code_version_by_asset_key == {"foo": "beta"} assert context.provenance == foo_provenance assert context.provenance_by_asset_key == {"foo": foo_provenance} - context.report_asset_metadata("bar", "boo") - context.report_asset_metadata("baz", 2, "int") - context.report_asset_data_version("bar") + context.report_asset_materialization( + metadata={ + "bar": "boo", + "baz": {"raw_value": 2, "type": "int"}, + }, + data_version="bar", + ) - _assert_unknown_asset_key(context, "report_asset_metadata", "bar", "baz", asset_key="fake") - _assert_unknown_asset_key(context, "report_asset_data_version", "bar", asset_key="fake") + _assert_unknown_asset_key(context, "report_asset_materialization", asset_key="fake") def test_multi_asset_context(): @@ -110,10 +113,8 @@ def test_multi_asset_context(): _assert_undefined(context, "provenance") assert context.provenance_by_asset_key == {"foo": foo_provenance, "bar": bar_provenance} - _assert_undefined_asset_key(context, "report_asset_metadata", "bar", "baz") - _assert_unknown_asset_key(context, "report_asset_metadata", "bar", "baz", asset_key="fake") - _assert_undefined_asset_key(context, "report_asset_data_version", "bar") - _assert_unknown_asset_key(context, "report_asset_data_version", "bar", asset_key="fake") + _assert_undefined_asset_key(context, "report_asset_materialization", "bar") + _assert_unknown_asset_key(context, "report_asset_materialization", "bar", asset_key="fake") def test_no_partition_context(): @@ -162,3 +163,10 @@ def test_extras_context(): assert context.get_extra("foo") == "bar" with pytest.raises(DagsterExtError, match="Extra `bar` is undefined"): context.get_extra("bar") + + +def test_report_twice_materialized(): + context = _make_external_execution_context(asset_keys=["foo"]) + with pytest.raises(DagsterExtError, match="already been materialized"): + context.report_asset_materialization(asset_key="foo") + context.report_asset_materialization(asset_key="foo") diff --git a/python_modules/dagster-ext/dagster_ext_tests/test_external_execution.py b/python_modules/dagster-ext/dagster_ext_tests/test_external_execution.py index 84396dce781b4..2d7c3910f6579 100644 --- a/python_modules/dagster-ext/dagster_ext_tests/test_external_execution.py +++ b/python_modules/dagster-ext/dagster_ext_tests/test_external_execution.py @@ -91,8 +91,10 @@ def script_fn(): context = ExtContext.get() context.log("hello world") time.sleep(0.1) # sleep to make sure that we encompass multiple intervals for blob store IO - context.report_asset_metadata("bar", context.get_extra("bar"), metadata_type="md") - context.report_asset_data_version("alpha") + context.report_asset_materialization( + metadata={"bar": {"raw_value": context.get_extra("bar"), "type": "md"}}, + data_version="alpha", + ) with temp_script(script_fn) as script_path: yield script_path @@ -183,19 +185,23 @@ def script_fn(): from dagster_ext import init_dagster_ext context = init_dagster_ext() - context.report_asset_metadata("untyped_meta", "bar") - context.report_asset_metadata("text_meta", "bar", metadata_type="text") - context.report_asset_metadata("url_meta", "http://bar.com", metadata_type="url") - context.report_asset_metadata("path_meta", "/bar", metadata_type="path") - context.report_asset_metadata("notebook_meta", "/bar.ipynb", metadata_type="notebook") - context.report_asset_metadata("json_meta", ["bar"], metadata_type="json") - context.report_asset_metadata("md_meta", "bar", metadata_type="md") - context.report_asset_metadata("float_meta", 1.0, metadata_type="float") - context.report_asset_metadata("int_meta", 1, metadata_type="int") - context.report_asset_metadata("bool_meta", True, metadata_type="bool") - context.report_asset_metadata("dagster_run_meta", "foo", metadata_type="dagster_run") - context.report_asset_metadata("asset_meta", "bar/baz", metadata_type="asset") - context.report_asset_metadata("null_meta", None, metadata_type="null") + context.report_asset_materialization( + metadata={ + "untyped_meta": "bar", + "text_meta": {"raw_value": "bar", "type": "text"}, + "url_meta": {"raw_value": "http://bar.com", "type": "url"}, + "path_meta": {"raw_value": "/bar", "type": "path"}, + "notebook_meta": {"raw_value": "/bar.ipynb", "type": "notebook"}, + "json_meta": {"raw_value": ["bar"], "type": "json"}, + "md_meta": {"raw_value": "bar", "type": "md"}, + "float_meta": {"raw_value": 1.0, "type": "float"}, + "int_meta": {"raw_value": 1, "type": "int"}, + "bool_meta": {"raw_value": True, "type": "bool"}, + "dagster_run_meta": {"raw_value": "foo", "type": "dagster_run"}, + "asset_meta": {"raw_value": "bar/baz", "type": "asset"}, + "null_meta": {"raw_value": None, "type": "null"}, + } + ) @asset def foo(context: AssetExecutionContext, ext: ExtSubprocess): @@ -286,8 +292,10 @@ def script_fn(): init_dagster_ext() context = ExtContext.get() context.log("hello world") - context.report_asset_metadata("bar", context.get_extra("bar")) - context.report_asset_data_version("alpha") + context.report_asset_materialization( + metadata={"bar": context.get_extra("bar")}, + data_version="alpha", + ) with temp_script(script_fn) as script_path: cmd = ["python", script_path] diff --git a/python_modules/dagster-test/dagster_test/toys/external_execution/numbers_example/number_sum.py b/python_modules/dagster-test/dagster_test/toys/external_execution/numbers_example/number_sum.py index 50f3a7b77608b..65af9f658c5b2 100644 --- a/python_modules/dagster-test/dagster_test/toys/external_execution/numbers_example/number_sum.py +++ b/python_modules/dagster-test/dagster_test/toys/external_execution/numbers_example/number_sum.py @@ -10,4 +10,4 @@ store_asset_value("number_sum", storage_root, value) context.log(f"{context.asset_key}: {number_x} + {number_y} = {value}") -context.report_asset_data_version(compute_data_version(value)) +context.report_asset_materialization(data_version=compute_data_version(value)) diff --git a/python_modules/dagster-test/dagster_test/toys/external_execution/numbers_example/number_x.py b/python_modules/dagster-test/dagster_test/toys/external_execution/numbers_example/number_x.py index 1cd8bda2f3ce8..8e08e7ab71e12 100644 --- a/python_modules/dagster-test/dagster_test/toys/external_execution/numbers_example/number_x.py +++ b/python_modules/dagster-test/dagster_test/toys/external_execution/numbers_example/number_x.py @@ -10,4 +10,4 @@ store_asset_value("number_x", storage_root, value) context.log(f"{context.asset_key}: {2} * {multiplier} = {value}") -context.report_asset_data_version(compute_data_version(value)) +context.report_asset_materialization(data_version=compute_data_version(value)) diff --git a/python_modules/dagster-test/dagster_test/toys/external_execution/numbers_example/number_y.py b/python_modules/dagster-test/dagster_test/toys/external_execution/numbers_example/number_y.py index 7e9bb5e3d70d0..6dbedab591f1e 100644 --- a/python_modules/dagster-test/dagster_test/toys/external_execution/numbers_example/number_y.py +++ b/python_modules/dagster-test/dagster_test/toys/external_execution/numbers_example/number_y.py @@ -11,5 +11,7 @@ store_asset_value("number_y", storage_root, value) context.log(f"{context.asset_key}: {value} read from $NUMBER_Y environment variable.") -context.report_asset_metadata("is_even", value % 2 == 0) -context.report_asset_data_version(compute_data_version(value)) +context.report_asset_materialization( + metadata={"is_even": value % 2 == 0}, + data_version=compute_data_version(value), +) diff --git a/python_modules/dagster/dagster/_core/ext/context.py b/python_modules/dagster/dagster/_core/ext/context.py index 6f7e5c47339ce..ae1a37848beb7 100644 --- a/python_modules/dagster/dagster/_core/ext/context.py +++ b/python_modules/dagster/dagster/_core/ext/context.py @@ -1,5 +1,5 @@ from dataclasses import dataclass -from typing import Any, Mapping, Optional, get_args +from typing import Any, Mapping, Optional from dagster_ext import ( DAGSTER_EXT_ENV_KEYS, @@ -28,26 +28,6 @@ class ExtMessageHandler: def __init__(self, context: OpExecutionContext) -> None: self._context = context - # Type ignores because we currently validate in individual handlers - def handle_message(self, message: ExtMessage) -> None: - if message["method"] == "report_asset_metadata": - self._handle_report_asset_metadata(**message["params"]) # type: ignore - elif message["method"] == "report_asset_data_version": - self._handle_report_asset_data_version(**message["params"]) # type: ignore - elif message["method"] == "log": - self._handle_log(**message["params"]) # type: ignore - - def _handle_report_asset_metadata( - self, asset_key: str, label: str, value: Any, type: ExtMetadataType # noqa: A002 - ) -> None: - check.str_param(asset_key, "asset_key") - check.str_param(label, "label") - check.opt_literal_param(type, "type", get_args(ExtMetadataType)) - key = AssetKey.from_user_string(asset_key) - output_name = self._context.output_for_asset_key(key) - metadata_value = self._resolve_metadata_value(value, type) - self._context.add_output_metadata({label: metadata_value}, output_name) - def _resolve_metadata_value( self, value: Any, metadata_type: Optional[ExtMetadataType] ) -> MetadataValue: @@ -82,11 +62,28 @@ def _resolve_metadata_value( else: check.failed(f"Unexpected metadata type {metadata_type}") - def _handle_report_asset_data_version(self, asset_key: str, data_version: str) -> None: + # Type ignores because we currently validate in individual handlers + def handle_message(self, message: ExtMessage) -> None: + if message["method"] == "report_asset_materialization": + self._handle_report_asset_materialization(**message["params"]) # type: ignore + elif message["method"] == "log": + self._handle_log(**message["params"]) # type: ignore + + def _handle_report_asset_materialization( + self, asset_key: str, metadata: Optional[Mapping[str, Any]], data_version: Optional[str] + ) -> None: check.str_param(asset_key, "asset_key") - check.str_param(data_version, "data_version") - key = AssetKey.from_user_string(asset_key) - self._context.set_data_version(key, DataVersion(data_version)) + check.opt_str_param(data_version, "data_version") + metadata = check.opt_mapping_param(metadata, "metadata", key_type=str) + resolved_asset_key = AssetKey.from_user_string(asset_key) + resolved_metadata = { + k: self._resolve_metadata_value(v["raw_value"], v["type"]) for k, v in metadata.items() + } + if data_version is not None: + self._context.set_data_version(resolved_asset_key, DataVersion(data_version)) + if resolved_metadata: + output_name = self._context.output_for_asset_key(resolved_asset_key) + self._context.add_output_metadata(resolved_metadata, output_name) def _handle_log(self, message: str, level: str = "info") -> None: check.str_param(message, "message") diff --git a/python_modules/libraries/dagster-databricks/dagster_databricks_tests/test_external_asset.py b/python_modules/libraries/dagster-databricks/dagster_databricks_tests/test_external_asset.py index 83e763e279be0..a68fdde55e689 100644 --- a/python_modules/libraries/dagster-databricks/dagster_databricks_tests/test_external_asset.py +++ b/python_modules/libraries/dagster-databricks/dagster_databricks_tests/test_external_asset.py @@ -26,8 +26,10 @@ def script_fn(): value = 2 * multiplier context.log(f"{context.asset_key}: {2} * {multiplier} = {value}") - context.report_asset_metadata("value", value) - context.report_asset_data_version("alpha") + context.report_asset_materialization( + metadata={"value": value}, + data_version="alpha", + ) @contextmanager