Skip to content

Commit

Permalink
[ext] add asset checks to ext
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Sep 13, 2023
1 parent 355e5bc commit fff8e40
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 2 deletions.
67 changes: 66 additions & 1 deletion python_modules/dagster-ext/dagster_ext/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
TYPE_CHECKING,
Any,
ClassVar,
Dict,
Generic,
Iterator,
Literal,
Expand All @@ -26,6 +27,7 @@
Type,
TypedDict,
TypeVar,
Union,
cast,
get_args,
)
Expand Down Expand Up @@ -98,6 +100,16 @@ class ExtDataProvenance(TypedDict):
is_user_provided: bool


ExtAssetCheckSeverity = Literal["WARN", "ERROR"]

ExtMetadataRawValue = Union[int, float, str, Mapping[str, Any], Sequence[Any], bool, None]


class ExtMetadataValue(TypedDict):
metadata_type: Optional["ExtMetadataType"]
value: ExtMetadataRawValue


ExtMetadataType = Literal[
"text",
"url",
Expand Down Expand Up @@ -248,6 +260,30 @@ def _assert_opt_param_value(
return value


def _normalize_param_metadata(
metadata: Mapping[str, Union[ExtMetadataRawValue, ExtMetadataValue]], method: str, param: str
) -> Mapping[str, Union[ExtMetadataRawValue, ExtMetadataValue]]:
_assert_param_type(metadata, dict, method, param)
new_metadata: Dict[str, ExtMetadataValue] = {}
for key, value in metadata.items():
if not isinstance(key, str):
raise DagsterExtError(
f"Invalid type for parameter `{param}` of `{method}`. Expected a dict with string"
f" keys, got a key `{key}` of type `{type(key)}`."
)
elif isinstance(value, dict):
if not {*value.keys()} == {*ExtMetadataValue.__annotations__.keys()}:
raise DagsterExtError(
f"Invalid type for parameter `{param}` of `{method}`. Expected a dict with"
" string keys and values that are either raw metadata values or dictionaries"
f" with schema `{{value: ..., metadata_type: ...}}`. Got a value `{value}`."
)
new_metadata[key] = cast(ExtMetadataValue, value)
else:
new_metadata[key] = {"value": value, "metadata_type": None}
return new_metadata


def _assert_param_json_serializable(value: _T, method: str, param: str) -> _T:
try:
json.dumps(value)
Expand Down Expand Up @@ -701,7 +737,7 @@ def extras(self) -> Mapping[str, Any]:
def report_asset_metadata(
self,
label: str,
value: Any,
value: ExtMetadataRawValue,
metadata_type: Optional[ExtMetadataType] = None,
asset_key: Optional[str] = None,
) -> None:
Expand Down Expand Up @@ -729,6 +765,35 @@ def report_asset_data_version(self, data_version: str, asset_key: Optional[str]
"report_asset_data_version", {"asset_key": asset_key, "data_version": data_version}
)

def report_asset_check_result(
self,
check_name: str,
success: bool,
severity: ExtAssetCheckSeverity = "ERROR",
metadata: Optional[Mapping[str, Union[ExtMetadataRawValue, ExtMetadataValue]]] = None,
asset_key: Optional[str] = None,
) -> None:
asset_key = _resolve_optionally_passed_asset_key(
self._data, asset_key, "report_asset_data_version"
)
check_name = _assert_param_type(check_name, str, "report_asset_check", "check_name")
success = _assert_param_type(success, bool, "report_asset_check", "success")
metadata = (
_normalize_param_metadata(metadata, "report_asset_check", "metadata")
if metadata
else None
)
self._write_message(
"report_asset_check",
{
"asset_key": asset_key,
"check_name": check_name,
"success": success,
"metadata": metadata,
"severity": severity,
},
)

def log(self, message: str, level: str = "info") -> None:
message = _assert_param_type(message, str, "log", "asset_key")
level = _assert_param_value(level, ["info", "warning", "error"], "log", "level")
Expand Down
11 changes: 11 additions & 0 deletions python_modules/dagster-ext/dagster_ext_tests/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,18 @@ def test_single_asset_context():
context.report_asset_metadata("bar", "boo")
context.report_asset_metadata("baz", 2, "int")
context.report_asset_data_version("bar")
context.report_asset_check(
"foo_check",
True,
metadata={
"meta_1": 1,
"meta_2": {"value": "foo", "type": "text"},
}
)

_assert_unknown_asset_key(context, "report_asset_metadata", "bar", "baz", asset_key="fake")
_assert_unknown_asset_key(context, "report_asset_data_version", "bar", asset_key="fake")
_assert_unknown_asset_key(context, "report_asset_check", "foo_check", True, asset_key="fake")


def test_multi_asset_context():
Expand Down Expand Up @@ -114,6 +123,8 @@ def test_multi_asset_context():
_assert_unknown_asset_key(context, "report_asset_metadata", "bar", "baz", asset_key="fake")
_assert_undefined_asset_key(context, "report_asset_data_version", "bar")
_assert_unknown_asset_key(context, "report_asset_data_version", "bar", asset_key="fake")
_assert_undefined_asset_key(context, "report_asset_check", "foo_check", True)
_assert_unknown_asset_key(context, "report_asset_check", "foo_check", True, asset_key="fake")


def test_no_partition_context():
Expand Down
32 changes: 32 additions & 0 deletions python_modules/dagster/dagster/_core/execution/context/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import dagster._check as check
from dagster._annotations import deprecated, experimental, public
from dagster._core.definitions.asset_check_result import AssetCheckResult
from dagster._core.definitions.asset_check_spec import AssetCheckSpec
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.data_version import (
Expand Down Expand Up @@ -464,6 +465,37 @@ def get_output_metadata(
output_name=output_name, mapping_key=mapping_key
)

@public
@experimental
def add_asset_check_result(self, asset_check_result: AssetCheckResult) -> None:
"""Add an asset check result for an asset being materialized in the current step.
Args:
asset_check_result (AssetCheckResult): The asset check result to add.
**Examples:**
.. code-block:: python
from dagster import op, AssetKey, AssetCheckSeverity
@asset
def foo_asset(context):
...
context.add_asset_check_result(
AssetCheckResult(
asset_key=AssetKey("my_asset"),
check_name="my_check",
success=True,
severity=AssetCheckSeverity.WARNING,
metadata={"foo": "bar"}
)
)
...
"""
check.inst_param(asset_check_result, "asset_check_result", AssetCheckResult)
self._step_execution_context.add_result_object(asset_check_result)

def get_step_execution_context(self) -> StepExecutionContext:
"""Allows advanced users (e.g. framework authors) to punch through to the underlying
step execution context.
Expand Down
10 changes: 10 additions & 0 deletions python_modules/dagster/dagster/_core/execution/context/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import dagster._check as check
from dagster._annotations import public
from dagster._core.definitions.asset_check_result import AssetCheckResult
from dagster._core.definitions.data_version import (
DATA_VERSION_TAG,
SKIP_PARTITION_DATA_VERSION_DEPENDENCY_THRESHOLD,
Expand Down Expand Up @@ -52,6 +53,7 @@
has_one_dimension_time_window_partitioning,
)
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.execution.plan.compute import OpOutputUnion
from dagster._core.execution.plan.handle import ResolvedFromDynamicStepHandle, StepHandle
from dagster._core.execution.plan.outputs import StepOutputHandle
from dagster._core.execution.plan.step import ExecutionStep
Expand Down Expand Up @@ -553,6 +555,7 @@ def __init__(
self._step_output_capture = {}

self._output_metadata: Dict[str, Any] = {}
self._result_objects: List[OpOutputUnion] = []
self._seen_outputs: Dict[str, Union[str, Set[str]]] = {}

self._input_asset_version_info: Dict[AssetKey, Optional["InputAssetVersionInfo"]] = {}
Expand Down Expand Up @@ -790,6 +793,13 @@ def get_output_metadata(
return metadata.get(mapping_key)
return metadata

def add_result_object(self, obj: OpOutputUnion) -> None:
self._result_objects.append(obj)

@property
def asset_check_results(self) -> Sequence[AssetCheckResult]:
return self._asset_check_results

def _get_source_run_id_from_logs(self, step_output_handle: StepOutputHandle) -> Optional[str]:
# walk through event logs to find the right run_id based on the run lineage

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import inspect
from itertools import chain
from typing import (
AbstractSet,
Any,
Expand Down Expand Up @@ -90,7 +91,7 @@ def _process_asset_results_to_events(
- An AssetCheckEvaluation, which combines the check result with information from the context
to create a full picture of the asset check's evaluation.
"""
for user_event in user_event_sequence:
for user_event in chain(user_event_sequence, step_context.asset_check_results):
if isinstance(user_event, MaterializeResult):
assets_def = step_context.job_def.asset_layer.assets_def_for_node(
step_context.node_handle
Expand Down
34 changes: 34 additions & 0 deletions python_modules/dagster/dagster/_core/ext/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@
ExtExtras,
ExtMessage,
ExtMetadataType,
ExtMetadataValue,
ExtParams,
ExtTimeWindow,
encode_env_var,
)

import dagster._check as check
from dagster._core.definitions.asset_check_result import AssetCheckResult
from dagster._core.definitions.asset_check_spec import AssetCheckSeverity
from dagster._core.definitions.data_version import DataProvenance, DataVersion
from dagster._core.definitions.events import AssetKey
from dagster._core.definitions.metadata import MetadataValue, normalize_metadata_value
Expand All @@ -34,6 +37,8 @@ def handle_message(self, message: ExtMessage) -> None:
self._handle_report_asset_metadata(**message["params"]) # type: ignore
elif message["method"] == "report_asset_data_version":
self._handle_report_asset_data_version(**message["params"]) # type: ignore
elif message["method"] == "report_asset_check":
self._handle_report_asset_check(**message["params"]) # type: ignore
elif message["method"] == "log":
self._handle_log(**message["params"]) # type: ignore

Expand Down Expand Up @@ -88,6 +93,35 @@ def _handle_report_asset_data_version(self, asset_key: str, data_version: str) -
key = AssetKey.from_user_string(asset_key)
self._context.set_data_version(key, DataVersion(data_version))

def _handle_report_asset_check(
self,
asset_key: str,
check_name: str,
success: bool,
severity: str,
metadata: Mapping[str, ExtMetadataValue],
) -> None:
check.str_param(asset_key, "asset_key")
check.str_param(check_name, "check_name")
check.bool_param(success, "success")
check.literal_param(severity, "severity", [x.value for x in AssetCheckSeverity])
metadata = check.opt_mapping_param(metadata, "metadata", key_type=str)
resolved_asset_key = AssetKey.from_user_string(asset_key)
resolved_metadata = {
k: self._resolve_metadata_value(v["value"], v["metadata_type"])
for k, v in metadata.items()
}
resolved_severity = AssetCheckSeverity(severity)
self._context.add_asset_check_result(
AssetCheckResult(
asset_key=resolved_asset_key,
check_name=check_name,
success=success,
severity=resolved_severity,
metadata=resolved_metadata,
)
)

def _handle_log(self, message: str, level: str = "info") -> None:
check.str_param(message, "message")
self._context.log.log(level, message)
Expand Down

0 comments on commit fff8e40

Please sign in to comment.