From d0ba53710f7c2e934b76c009846eda704b47e872 Mon Sep 17 00:00:00 2001 From: Sean Mackesey Date: Fri, 15 Sep 2023 08:54:31 -0400 Subject: [PATCH] commit --- .../dagster-ext/dagster_ext/__init__.py | 31 +++++++++++++++++ .../dagster_ext_tests/test_context.py | 15 ++++++++ .../test_external_execution.py | 31 +++++++++++++++-- .../dagster/dagster/_core/ext/context.py | 34 +++++++++++++++++++ 4 files changed, 109 insertions(+), 2 deletions(-) diff --git a/python_modules/dagster-ext/dagster_ext/__init__.py b/python_modules/dagster-ext/dagster_ext/__init__.py index 88cd8f99aedff..f85fbdcd65409 100644 --- a/python_modules/dagster-ext/dagster_ext/__init__.py +++ b/python_modules/dagster-ext/dagster_ext/__init__.py @@ -101,6 +101,8 @@ class ExtDataProvenance(TypedDict): is_user_provided: bool +ExtAssetCheckSeverity = Literal["WARN", "ERROR"] + ExtMetadataRawValue = Union[int, float, str, Mapping[str, Any], Sequence[Any], bool, None] @@ -826,6 +828,35 @@ def report_asset_materialization( ) self.materialized_assets.add(asset_key) + def report_asset_check_result( + self, + check_name: str, + success: bool, + severity: ExtAssetCheckSeverity = "ERROR", + metadata: Optional[Mapping[str, Union[ExtMetadataRawValue, ExtMetadataValue]]] = None, + asset_key: Optional[str] = None, + ) -> None: + asset_key = _resolve_optionally_passed_asset_key( + self._data, asset_key, "report_asset_check_result" + ) + check_name = _assert_param_type(check_name, str, "report_asset_check_result", "check_name") + success = _assert_param_type(success, bool, "report_asset_check_result", "success") + metadata = ( + _normalize_param_metadata(metadata, "report_asset_check_result", "metadata") + if metadata + else None + ) + self._write_message( + "report_asset_check", + { + "asset_key": asset_key, + "check_name": check_name, + "success": success, + "metadata": metadata, + "severity": severity, + }, + ) + def log(self, message: str, level: str = "info") -> None: message = _assert_param_type(message, str, "log", "asset_key") level = _assert_param_value(level, ["info", "warning", "error"], "log", "level") diff --git a/python_modules/dagster-ext/dagster_ext_tests/test_context.py b/python_modules/dagster-ext/dagster_ext_tests/test_context.py index b17c95f117735..bedba04090b65 100644 --- a/python_modules/dagster-ext/dagster_ext_tests/test_context.py +++ b/python_modules/dagster-ext/dagster_ext_tests/test_context.py @@ -82,9 +82,20 @@ def test_single_asset_context(): context.report_asset_metadata("bar", "boo") context.report_asset_metadata("baz", 2, "int") context.report_asset_data_version("bar") + context.report_asset_check_result( + "foo_check", + True, + metadata={ + "meta_1": 1, + "meta_2": {"value": "foo", "metadata_type": "text"}, + }, + ) _assert_unknown_asset_key(context, "report_asset_metadata", "bar", "baz", asset_key="fake") _assert_unknown_asset_key(context, "report_asset_data_version", "bar", asset_key="fake") + _assert_unknown_asset_key( + context, "report_asset_check_result", "foo_check", True, asset_key="fake" + ) def test_multi_asset_context(): @@ -114,6 +125,10 @@ def test_multi_asset_context(): _assert_unknown_asset_key(context, "report_asset_metadata", "bar", "baz", asset_key="fake") _assert_undefined_asset_key(context, "report_asset_data_version", "bar") _assert_unknown_asset_key(context, "report_asset_data_version", "bar", asset_key="fake") + _assert_undefined_asset_key(context, "report_asset_check_result", "foo_check", True) + _assert_unknown_asset_key( + context, "report_asset_check_result", "foo_check", True, asset_key="fake" + ) def test_no_partition_context(): diff --git a/python_modules/dagster-ext/dagster_ext_tests/test_external_execution.py b/python_modules/dagster-ext/dagster_ext_tests/test_external_execution.py index 90f08200a0235..336338285475c 100644 --- a/python_modules/dagster-ext/dagster_ext_tests/test_external_execution.py +++ b/python_modules/dagster-ext/dagster_ext_tests/test_external_execution.py @@ -10,6 +10,7 @@ import boto3 import pytest from dagster._core.definitions.asset_spec import AssetSpec +from dagster._core.definitions.asset_check_spec import AssetCheckSpec from dagster._core.definitions.data_version import ( DATA_VERSION_IS_USER_PROVIDED_TAG, DATA_VERSION_TAG, @@ -44,6 +45,7 @@ ext_protocol, ) from dagster._core.instance_for_test import instance_for_test +from dagster._core.storage.asset_check_execution_record import AssetCheckExecutionRecordStatus from dagster_aws.ext import ExtS3MessageReader from moto.server import ThreadedMotoServer @@ -94,6 +96,15 @@ def script_fn(): time.sleep(0.1) # sleep to make sure that we encompass multiple intervals for blob store IO context.report_asset_metadata("bar", context.get_extra("bar"), metadata_type="md") context.report_asset_data_version("alpha") + context.report_asset_check_result( + "foo_check", + success=True, + severity="WARN", + metadata={ + "meta_1": 1, + "meta_2": {"value": "foo", "metadata_type": "text"}, + }, + ) with temp_script(script_fn) as script_path: yield script_path @@ -145,7 +156,7 @@ def test_ext_subprocess( else: assert False, "Unreachable" - @asset + @asset(check_specs=[AssetCheckSpec(name="foo_check", asset=AssetKey(["foo"]))]) def foo(context: AssetExecutionContext, ext: ExtSubprocess): extras = {"bar": "baz"} cmd = [_PYTHON_EXECUTABLE, external_script] @@ -174,6 +185,14 @@ def foo(context: AssetExecutionContext, ext: ExtSubprocess): captured = capsys.readouterr() assert re.search(r"dagster - INFO - [^\n]+ - hello world\n", captured.err, re.MULTILINE) + asset_check_executions = instance.event_log_storage.get_asset_check_executions( + asset_key=foo.key, + check_name="foo_check", + limit=1, + ) + assert len(asset_check_executions) == 1 + assert asset_check_executions[0].status == AssetCheckExecutionRecordStatus.SUCCEEDED + def test_ext_multi_asset(): def script_fn(): @@ -327,7 +346,7 @@ def script_fn(): def test_ext_no_client(external_script): - @asset + @asset(check_specs=[AssetCheckSpec(name="foo_check", asset=AssetKey(["subproc_run"]))]) def subproc_run(context: AssetExecutionContext): extras = {"bar": "baz"} cmd = [_PYTHON_EXECUTABLE, external_script] @@ -352,3 +371,11 @@ def subproc_run(context: AssetExecutionContext): assert mat.asset_materialization.tags assert mat.asset_materialization.tags[DATA_VERSION_TAG] == "alpha" assert mat.asset_materialization.tags[DATA_VERSION_IS_USER_PROVIDED_TAG] + + asset_check_executions = instance.event_log_storage.get_asset_check_executions( + asset_key=subproc_run.key, + check_name="foo_check", + limit=1, + ) + assert len(asset_check_executions) == 1 + assert asset_check_executions[0].status == AssetCheckExecutionRecordStatus.SUCCEEDED diff --git a/python_modules/dagster/dagster/_core/ext/context.py b/python_modules/dagster/dagster/_core/ext/context.py index 17cfb8be9cb2d..2573104ac77eb 100644 --- a/python_modules/dagster/dagster/_core/ext/context.py +++ b/python_modules/dagster/dagster/_core/ext/context.py @@ -18,6 +18,8 @@ ) import dagster._check as check +from dagster._core.definitions.asset_check_result import AssetCheckResult +from dagster._core.definitions.asset_check_spec import AssetCheckSeverity from dagster._core.definitions.data_version import DataProvenance, DataVersion from dagster._core.definitions.events import AssetKey from dagster._core.definitions.metadata import MetadataValue, normalize_metadata_value @@ -37,6 +39,7 @@ def __init__(self, context: OpExecutionContext) -> None: self._result_queue: List[MaterializeResult] = [] self._metadata: Dict[AssetKey, Dict[str, MetadataValue]] = {} self._data_versions: Dict[AssetKey, DataVersion] = {} + self._check_results: Dict[AssetKey, List[AssetCheckResult]] = {} # The lock is used to guard modification of the asset materialization tracking data # structures, which are modified by both handler methods (and thus the message reader @@ -75,6 +78,8 @@ def handle_message(self, message: ExtMessage) -> None: self._handle_report_asset_data_version(**message["params"]) # type: ignore elif message["method"] == "report_asset_materialization": self._handle_report_asset_materialization(**message["params"]) # type: ignore + elif message["method"] == "report_asset_check": + self._handle_report_asset_check(**message["params"]) # type: ignore elif message["method"] == "log": self._handle_log(**message["params"]) # type: ignore @@ -152,6 +157,35 @@ def _handle_report_asset_data_version(self, asset_key: str, data_version: str) - resolved_asset_key = AssetKey.from_user_string(asset_key) self._data_versions[resolved_asset_key] = DataVersion(data_version) + def _handle_report_asset_check( + self, + asset_key: str, + check_name: str, + success: bool, + severity: str, + metadata: Mapping[str, ExtMetadataValue], + ) -> None: + check.str_param(asset_key, "asset_key") + check.str_param(check_name, "check_name") + check.bool_param(success, "success") + check.literal_param(severity, "severity", [x.value for x in AssetCheckSeverity]) + metadata = check.opt_mapping_param(metadata, "metadata", key_type=str) + resolved_asset_key = AssetKey.from_user_string(asset_key) + resolved_metadata = { + k: self._resolve_metadata_value(v["value"], v["metadata_type"]) + for k, v in metadata.items() + } + resolved_severity = AssetCheckSeverity(severity) + self._check_results.setdefault(resolved_asset_key, []).append( + AssetCheckResult( + asset_key=resolved_asset_key, + check_name=check_name, + success=success, + severity=resolved_severity, + metadata=resolved_metadata, + ) + ) + def _handle_log(self, message: str, level: str = "info") -> None: check.str_param(message, "message") self._context.log.log(level, message)