Skip to content

Commit

Permalink
commit
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Sep 20, 2023
1 parent 7ccfd18 commit d0ba537
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 2 deletions.
31 changes: 31 additions & 0 deletions python_modules/dagster-ext/dagster_ext/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ class ExtDataProvenance(TypedDict):
is_user_provided: bool


ExtAssetCheckSeverity = Literal["WARN", "ERROR"]

ExtMetadataRawValue = Union[int, float, str, Mapping[str, Any], Sequence[Any], bool, None]


Expand Down Expand Up @@ -826,6 +828,35 @@ def report_asset_materialization(
)
self.materialized_assets.add(asset_key)

def report_asset_check_result(
self,
check_name: str,
success: bool,
severity: ExtAssetCheckSeverity = "ERROR",
metadata: Optional[Mapping[str, Union[ExtMetadataRawValue, ExtMetadataValue]]] = None,
asset_key: Optional[str] = None,
) -> None:
asset_key = _resolve_optionally_passed_asset_key(
self._data, asset_key, "report_asset_check_result"
)
check_name = _assert_param_type(check_name, str, "report_asset_check_result", "check_name")
success = _assert_param_type(success, bool, "report_asset_check_result", "success")
metadata = (
_normalize_param_metadata(metadata, "report_asset_check_result", "metadata")
if metadata
else None
)
self._write_message(
"report_asset_check",
{
"asset_key": asset_key,
"check_name": check_name,
"success": success,
"metadata": metadata,
"severity": severity,
},
)

def log(self, message: str, level: str = "info") -> None:
message = _assert_param_type(message, str, "log", "asset_key")
level = _assert_param_value(level, ["info", "warning", "error"], "log", "level")
Expand Down
15 changes: 15 additions & 0 deletions python_modules/dagster-ext/dagster_ext_tests/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,20 @@ def test_single_asset_context():
context.report_asset_metadata("bar", "boo")
context.report_asset_metadata("baz", 2, "int")
context.report_asset_data_version("bar")
context.report_asset_check_result(
"foo_check",
True,
metadata={
"meta_1": 1,
"meta_2": {"value": "foo", "metadata_type": "text"},
},
)

_assert_unknown_asset_key(context, "report_asset_metadata", "bar", "baz", asset_key="fake")
_assert_unknown_asset_key(context, "report_asset_data_version", "bar", asset_key="fake")
_assert_unknown_asset_key(
context, "report_asset_check_result", "foo_check", True, asset_key="fake"
)


def test_multi_asset_context():
Expand Down Expand Up @@ -114,6 +125,10 @@ def test_multi_asset_context():
_assert_unknown_asset_key(context, "report_asset_metadata", "bar", "baz", asset_key="fake")
_assert_undefined_asset_key(context, "report_asset_data_version", "bar")
_assert_unknown_asset_key(context, "report_asset_data_version", "bar", asset_key="fake")
_assert_undefined_asset_key(context, "report_asset_check_result", "foo_check", True)
_assert_unknown_asset_key(
context, "report_asset_check_result", "foo_check", True, asset_key="fake"
)


def test_no_partition_context():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import boto3
import pytest
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.asset_check_spec import AssetCheckSpec
from dagster._core.definitions.data_version import (
DATA_VERSION_IS_USER_PROVIDED_TAG,
DATA_VERSION_TAG,
Expand Down Expand Up @@ -44,6 +45,7 @@
ext_protocol,
)
from dagster._core.instance_for_test import instance_for_test
from dagster._core.storage.asset_check_execution_record import AssetCheckExecutionRecordStatus
from dagster_aws.ext import ExtS3MessageReader
from moto.server import ThreadedMotoServer

Expand Down Expand Up @@ -94,6 +96,15 @@ def script_fn():
time.sleep(0.1) # sleep to make sure that we encompass multiple intervals for blob store IO
context.report_asset_metadata("bar", context.get_extra("bar"), metadata_type="md")
context.report_asset_data_version("alpha")
context.report_asset_check_result(
"foo_check",
success=True,
severity="WARN",
metadata={
"meta_1": 1,
"meta_2": {"value": "foo", "metadata_type": "text"},
},
)

with temp_script(script_fn) as script_path:
yield script_path
Expand Down Expand Up @@ -145,7 +156,7 @@ def test_ext_subprocess(
else:
assert False, "Unreachable"

@asset
@asset(check_specs=[AssetCheckSpec(name="foo_check", asset=AssetKey(["foo"]))])
def foo(context: AssetExecutionContext, ext: ExtSubprocess):
extras = {"bar": "baz"}
cmd = [_PYTHON_EXECUTABLE, external_script]
Expand Down Expand Up @@ -174,6 +185,14 @@ def foo(context: AssetExecutionContext, ext: ExtSubprocess):
captured = capsys.readouterr()
assert re.search(r"dagster - INFO - [^\n]+ - hello world\n", captured.err, re.MULTILINE)

asset_check_executions = instance.event_log_storage.get_asset_check_executions(
asset_key=foo.key,
check_name="foo_check",
limit=1,
)
assert len(asset_check_executions) == 1
assert asset_check_executions[0].status == AssetCheckExecutionRecordStatus.SUCCEEDED


def test_ext_multi_asset():
def script_fn():
Expand Down Expand Up @@ -327,7 +346,7 @@ def script_fn():


def test_ext_no_client(external_script):
@asset
@asset(check_specs=[AssetCheckSpec(name="foo_check", asset=AssetKey(["subproc_run"]))])
def subproc_run(context: AssetExecutionContext):
extras = {"bar": "baz"}
cmd = [_PYTHON_EXECUTABLE, external_script]
Expand All @@ -352,3 +371,11 @@ def subproc_run(context: AssetExecutionContext):
assert mat.asset_materialization.tags
assert mat.asset_materialization.tags[DATA_VERSION_TAG] == "alpha"
assert mat.asset_materialization.tags[DATA_VERSION_IS_USER_PROVIDED_TAG]

asset_check_executions = instance.event_log_storage.get_asset_check_executions(
asset_key=subproc_run.key,
check_name="foo_check",
limit=1,
)
assert len(asset_check_executions) == 1
assert asset_check_executions[0].status == AssetCheckExecutionRecordStatus.SUCCEEDED
34 changes: 34 additions & 0 deletions python_modules/dagster/dagster/_core/ext/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
)

import dagster._check as check
from dagster._core.definitions.asset_check_result import AssetCheckResult
from dagster._core.definitions.asset_check_spec import AssetCheckSeverity
from dagster._core.definitions.data_version import DataProvenance, DataVersion
from dagster._core.definitions.events import AssetKey
from dagster._core.definitions.metadata import MetadataValue, normalize_metadata_value
Expand All @@ -37,6 +39,7 @@ def __init__(self, context: OpExecutionContext) -> None:
self._result_queue: List[MaterializeResult] = []
self._metadata: Dict[AssetKey, Dict[str, MetadataValue]] = {}
self._data_versions: Dict[AssetKey, DataVersion] = {}
self._check_results: Dict[AssetKey, List[AssetCheckResult]] = {}

# The lock is used to guard modification of the asset materialization tracking data
# structures, which are modified by both handler methods (and thus the message reader
Expand Down Expand Up @@ -75,6 +78,8 @@ def handle_message(self, message: ExtMessage) -> None:
self._handle_report_asset_data_version(**message["params"]) # type: ignore
elif message["method"] == "report_asset_materialization":
self._handle_report_asset_materialization(**message["params"]) # type: ignore
elif message["method"] == "report_asset_check":
self._handle_report_asset_check(**message["params"]) # type: ignore
elif message["method"] == "log":
self._handle_log(**message["params"]) # type: ignore

Expand Down Expand Up @@ -152,6 +157,35 @@ def _handle_report_asset_data_version(self, asset_key: str, data_version: str) -
resolved_asset_key = AssetKey.from_user_string(asset_key)
self._data_versions[resolved_asset_key] = DataVersion(data_version)

def _handle_report_asset_check(
self,
asset_key: str,
check_name: str,
success: bool,
severity: str,
metadata: Mapping[str, ExtMetadataValue],
) -> None:
check.str_param(asset_key, "asset_key")
check.str_param(check_name, "check_name")
check.bool_param(success, "success")
check.literal_param(severity, "severity", [x.value for x in AssetCheckSeverity])
metadata = check.opt_mapping_param(metadata, "metadata", key_type=str)
resolved_asset_key = AssetKey.from_user_string(asset_key)
resolved_metadata = {
k: self._resolve_metadata_value(v["value"], v["metadata_type"])
for k, v in metadata.items()
}
resolved_severity = AssetCheckSeverity(severity)
self._check_results.setdefault(resolved_asset_key, []).append(
AssetCheckResult(
asset_key=resolved_asset_key,
check_name=check_name,
success=success,
severity=resolved_severity,
metadata=resolved_metadata,
)
)

def _handle_log(self, message: str, level: str = "info") -> None:
check.str_param(message, "message")
self._context.log.log(level, message)
Expand Down

0 comments on commit d0ba537

Please sign in to comment.