diff --git a/.buildkite/dagster-buildkite/dagster_buildkite/package_spec.py b/.buildkite/dagster-buildkite/dagster_buildkite/package_spec.py index fd675a039b87a..20d918c45b2bc 100644 --- a/.buildkite/dagster-buildkite/dagster_buildkite/package_spec.py +++ b/.buildkite/dagster-buildkite/dagster_buildkite/package_spec.py @@ -1,6 +1,6 @@ import logging import os -from dataclasses import dataclass +from dataclasses import dataclass, field from pathlib import Path from typing import Callable, List, Mapping, Optional, Union @@ -24,8 +24,10 @@ _CORE_PACKAGES = [ "python_modules/dagster", - "python_modules/dagit", + "python_modules/dagster-ext", "python_modules/dagster-graphql", + "python_modules/dagster-webserver", + "python_modules/dagit", "js_modules/dagster-ui", ] @@ -108,6 +110,8 @@ class PackageSpec: timeout_in_minutes (int, optional): Fail after this many minutes. queue (BuildkiteQueue, optional): Schedule steps to this queue. run_pytest (bool, optional): Whether to run pytest. Enabled by default. + other_tox_envs (List[str], optional): Other tox testenvs to run as part of the package step + group. Defaults to []. """ directory: str @@ -126,6 +130,7 @@ class PackageSpec: queue: Optional[BuildkiteQueue] = None run_pytest: bool = True always_run_if: Optional[Callable[[], bool]] = None + other_tox_envs: List[str] = field(default_factory=list) def __post_init__(self): if not self.name: @@ -208,6 +213,9 @@ def build_steps(self) -> List[BuildkiteTopLevelStep]: ) ) + for testenv in self.other_tox_envs: + steps.append(build_tox_step(self.directory, testenv, base_label=testenv)) + emoji = _PACKAGE_TYPE_TO_EMOJI_MAP[self.package_type] # type: ignore[index] if len(steps) >= 2: return [ diff --git a/.buildkite/dagster-buildkite/dagster_buildkite/steps/packages.py b/.buildkite/dagster-buildkite/dagster_buildkite/steps/packages.py index 19ce1fc130eb5..bba1fd29c39ee 100644 --- a/.buildkite/dagster-buildkite/dagster_buildkite/steps/packages.py +++ b/.buildkite/dagster-buildkite/dagster_buildkite/steps/packages.py @@ -354,6 +354,7 @@ def k8s_extra_cmds(version: str, _) -> List[str]: LIBRARY_PACKAGES_WITH_CUSTOM_CONFIG: List[PackageSpec] = [ PackageSpec("python_modules/automation"), + PackageSpec("python_modules/dagster-ext", other_tox_envs=["jsonschema"]), PackageSpec("python_modules/dagster-webserver", pytest_extra_cmds=ui_extra_cmds), PackageSpec( "python_modules/dagster", diff --git a/Makefile b/Makefile index 8f5d2c548aef8..68a41f7502878 100644 --- a/Makefile +++ b/Makefile @@ -93,3 +93,6 @@ check_manifest: check-manifest python_modules/dagster-webserver check-manifest python_modules/dagster-graphql ls python_modules/libraries | xargs -n 1 -Ipkg check-manifest python_modules/libraries/pkg + +ext_json_schema: + tox -c python_modules/dagster-ext -e jsonschema diff --git a/python_modules/dagster-ext/MANIFEST.in b/python_modules/dagster-ext/MANIFEST.in index b527c43b7cd88..639c8b99bb044 100644 --- a/python_modules/dagster-ext/MANIFEST.in +++ b/python_modules/dagster-ext/MANIFEST.in @@ -1,3 +1,4 @@ include README.md include LICENSE include dagster_ext/py.typed +include json_schema/*.json diff --git a/python_modules/dagster-ext/dagster_ext/__init__.py b/python_modules/dagster-ext/dagster_ext/__init__.py index e5bda2935c7e0..4167031aca9fe 100644 --- a/python_modules/dagster-ext/dagster_ext/__init__.py +++ b/python_modules/dagster-ext/dagster_ext/__init__.py @@ -134,6 +134,24 @@ class ExtMetadataValue(TypedDict): ] +# ##### JSON SCHEMA + +_JsonSchemaName = Literal["context", "message"] +_schema_root = os.path.join(os.path.dirname(__file__), "../json_schema") +_json_schemas: Dict[Literal["context", "message"], Mapping[str, Any]] = {} + + +def get_ext_json_schema_path(name: _JsonSchemaName) -> str: + return os.path.join(_schema_root, f"{name}.json") + + +def get_ext_json_schema(name: _JsonSchemaName) -> Mapping[str, Any]: + if name not in _json_schemas: + with open(get_ext_json_schema_path(name)) as f: + _json_schemas[name] = json.load(f) + return _json_schemas[name] + + # ######################## # ##### UTIL # ######################## diff --git a/python_modules/dagster-ext/dagster_ext_tests/test_context.py b/python_modules/dagster-ext/dagster_ext_tests/test_context.py index 4294e53bbeda7..9e03ca06489dc 100644 --- a/python_modules/dagster-ext/dagster_ext_tests/test_context.py +++ b/python_modules/dagster-ext/dagster_ext_tests/test_context.py @@ -1,13 +1,17 @@ from unittest.mock import MagicMock +import jsonschema import pytest from dagster_ext import ( + EXT_PROTOCOL_VERSION_FIELD, + PROTOCOL_VERSION, DagsterExtError, ExtContext, ExtContextData, ExtDataProvenance, ExtPartitionKeyRange, ExtTimeWindow, + get_ext_json_schema, ) TEST_EXT_CONTEXT_DEFAULTS = ExtContextData( @@ -25,9 +29,10 @@ def _make_external_execution_context(**kwargs): - kwargs = {**TEST_EXT_CONTEXT_DEFAULTS, **kwargs} + data = ExtContextData({**TEST_EXT_CONTEXT_DEFAULTS, **kwargs}) + jsonschema.validate(data, get_ext_json_schema("context")) return ExtContext( - data=ExtContextData(**kwargs), + data=data, message_channel=MagicMock(), ) @@ -170,3 +175,19 @@ def test_report_twice_materialized(): with pytest.raises(DagsterExtError, match="already been materialized"): context.report_asset_materialization(asset_key="foo") context.report_asset_materialization(asset_key="foo") + + +def test_message_json_schema_validation(): + message = { + EXT_PROTOCOL_VERSION_FIELD: PROTOCOL_VERSION, + "method": "foo", + "params": {"bar": "baz"}, + } + jsonschema.validate(message, get_ext_json_schema("message")) + + +def test_json_schema_rejects_invalid(): + with pytest.raises(jsonschema.ValidationError): + jsonschema.validate({"foo": "bar"}, get_ext_json_schema("context")) + with pytest.raises(jsonschema.ValidationError): + jsonschema.validate({"foo": "bar"}, get_ext_json_schema("message")) diff --git a/python_modules/dagster-ext/json_schema/context.json b/python_modules/dagster-ext/json_schema/context.json new file mode 100644 index 0000000000000..e086abae57944 --- /dev/null +++ b/python_modules/dagster-ext/json_schema/context.json @@ -0,0 +1,192 @@ +{ + "$defs": { + "ExtDataProvenance": { + "properties": { + "code_version": { + "title": "Code Version", + "type": "string" + }, + "input_data_versions": { + "additionalProperties": { + "type": "string" + }, + "title": "Input Data Versions", + "type": "object" + }, + "is_user_provided": { + "title": "Is User Provided", + "type": "boolean" + } + }, + "required": [ + "code_version", + "input_data_versions", + "is_user_provided" + ], + "title": "ExtDataProvenance", + "type": "object" + }, + "ExtPartitionKeyRange": { + "properties": { + "start": { + "title": "Start", + "type": "string" + }, + "end": { + "title": "End", + "type": "string" + } + }, + "required": [ + "start", + "end" + ], + "title": "ExtPartitionKeyRange", + "type": "object" + }, + "ExtTimeWindow": { + "properties": { + "start": { + "title": "Start", + "type": "string" + }, + "end": { + "title": "End", + "type": "string" + } + }, + "required": [ + "start", + "end" + ], + "title": "ExtTimeWindow", + "type": "object" + } + }, + "properties": { + "asset_keys": { + "anyOf": [ + { + "items": { + "type": "string" + }, + "type": "array" + }, + { + "type": "null" + } + ], + "title": "Asset Keys" + }, + "code_version_by_asset_key": { + "anyOf": [ + { + "additionalProperties": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ] + }, + "type": "object" + }, + { + "type": "null" + } + ], + "title": "Code Version By Asset Key" + }, + "provenance_by_asset_key": { + "anyOf": [ + { + "additionalProperties": { + "anyOf": [ + { + "$ref": "#/$defs/ExtDataProvenance" + }, + { + "type": "null" + } + ] + }, + "type": "object" + }, + { + "type": "null" + } + ], + "title": "Provenance By Asset Key" + }, + "partition_key": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "title": "Partition Key" + }, + "partition_key_range": { + "anyOf": [ + { + "$ref": "#/$defs/ExtPartitionKeyRange" + }, + { + "type": "null" + } + ] + }, + "partition_time_window": { + "anyOf": [ + { + "$ref": "#/$defs/ExtTimeWindow" + }, + { + "type": "null" + } + ] + }, + "run_id": { + "title": "Run Id", + "type": "string" + }, + "job_name": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "title": "Job Name" + }, + "retry_number": { + "title": "Retry Number", + "type": "integer" + }, + "extras": { + "title": "Extras", + "type": "object" + } + }, + "required": [ + "asset_keys", + "code_version_by_asset_key", + "provenance_by_asset_key", + "partition_key", + "partition_key_range", + "partition_time_window", + "run_id", + "job_name", + "retry_number", + "extras" + ], + "title": "ExtContextData", + "type": "object" +} \ No newline at end of file diff --git a/python_modules/dagster-ext/json_schema/message.json b/python_modules/dagster-ext/json_schema/message.json new file mode 100644 index 0000000000000..83a3d51b89f23 --- /dev/null +++ b/python_modules/dagster-ext/json_schema/message.json @@ -0,0 +1,28 @@ +{ + "properties": { + "method": { + "title": "Method", + "type": "string" + }, + "params": { + "anyOf": [ + { + "type": "object" + }, + { + "type": "null" + } + ], + "title": "Params" + }, + "__dagster_ext_version": { + "type": "string" + } + }, + "required": [ + "method", + "params" + ], + "title": "ExtMessage", + "type": "object" +} \ No newline at end of file diff --git a/python_modules/dagster-ext/setup.py b/python_modules/dagster-ext/setup.py index 7315d21f4832d..1722cba261a4b 100644 --- a/python_modules/dagster-ext/setup.py +++ b/python_modules/dagster-ext/setup.py @@ -32,11 +32,13 @@ def get_version() -> str: "Operating System :: OS Independent", ], packages=find_packages(exclude=["dagster_ext_tests*"]), + package_data={"dagster_ext": ["json_schema/*.json"]}, extras_require={ "test": [ f"dagster{pin}", "boto3", "botocore", + "jsonschema", "moto[s3,server]", ], }, diff --git a/python_modules/dagster-ext/tox.ini b/python_modules/dagster-ext/tox.ini index e8245b0178924..4b2b0678c621a 100644 --- a/python_modules/dagster-ext/tox.ini +++ b/python_modules/dagster-ext/tox.ini @@ -17,3 +17,13 @@ allowlist_externals = commands = !windows: /bin/bash -c '! pip list --exclude-editable | grep -e dagster' pytest -c ../../pyproject.toml -vv ./dagster_ext_tests + +[testenv:jsonschema] +deps = + -e . + jsonschema + pydantic>2 +allowlist_externals = + git +commands = + python ../../scripts/generate_ext_json_schema.py diff --git a/scripts/generate_ext_json_schema.py b/scripts/generate_ext_json_schema.py new file mode 100644 index 0000000000000..2e8ed91f868cd --- /dev/null +++ b/scripts/generate_ext_json_schema.py @@ -0,0 +1,112 @@ +import json +import os +from typing import ( + Any, + Dict, + List, + Tuple, + Type, + Union, + get_args, + get_origin, + get_type_hints, +) + +import jsonschema +from dagster_ext import ( + ExtContextData, + ExtMessage, + get_ext_json_schema_path, +) +from pydantic import BaseModel, create_model +from typing_extensions import TypedDict, TypeGuard + +OUTPUT_FILEPATH = os.path.join( + os.path.dirname(__file__), "../python_modules/dagster-ext/ext_protocol_schema.json" +) + +MODEL_CACHE: Dict[str, Any] = {} + + +def main(): + context_schema = create_pydantic_model_from_typeddict(ExtContextData).model_json_schema() + message_schema = create_pydantic_model_from_typeddict(ExtMessage).model_json_schema() + inject_dagster_ext_version_field(message_schema) + jsonschema.Draft7Validator.check_schema(context_schema) + jsonschema.Draft7Validator.check_schema(message_schema) + + with open(get_ext_json_schema_path("context"), "w") as f: + f.write(json.dumps(context_schema, indent=2)) + with open(get_ext_json_schema_path("message"), "w") as f: + f.write(json.dumps(message_schema, indent=2)) + + +def create_pydantic_model_from_typeddict(typed_dict_cls: Type[TypedDict]) -> Type[BaseModel]: + """Create a Pydantic model from a TypedDict class. + + We use this instead of the Pydantic-provided `create_model_from_typeddict` because we need to + handle nested `TypedDict`. This funciton will convert any child `TypedDict` to a Pydantic model, + which is necessary for Pydantic JSON schema generation to work correctly. + """ + if typed_dict_cls.__name__ not in MODEL_CACHE: + annotations = get_type_hints(typed_dict_cls) + fields: Dict[str, Tuple[Type, ...]] = {} + for name, field_type in annotations.items(): + pydantic_type = normalize_field_type(field_type) + fields[name] = (pydantic_type, ...) + MODEL_CACHE[typed_dict_cls.__name__] = create_model(typed_dict_cls.__name__, **fields) # type: ignore + return MODEL_CACHE[typed_dict_cls.__name__] + + +def inject_dagster_ext_version_field(schema: Dict[str, Any]) -> None: + """Add `__dagster_ext_version` field to the schema. + This field is excluded from the Pydantic-constructed schema because it is underscore-prefixed, + which means it is not treated as a field by Pydantic. + """ + schema["properties"]["__dagster_ext_version"] = {"type": "string"} + + +def normalize_field_type(field_type: Type) -> Type: + origin = get_origin(field_type) + if origin is not None: # composite type + new_args = tuple(normalize_field_type(arg) for arg in get_args(field_type)) + return origin[new_args] + elif is_typed_dict_class(field_type): + return create_pydantic_model_from_typeddict(field_type) + else: + return field_type + + +def is_typed_dict_class(cls: Type) -> TypeGuard[Type[TypedDict]]: + x = isinstance(cls, type) and issubclass(cls, dict) and get_type_hints(cls) is not None + return x + + +def parse_type(typ) -> Tuple[bool, Type]: + origin = get_origin(typ) + args = get_args(typ) + is_optional = origin is Union and len(args) == 2 and any(arg is type(None) for arg in args) + unwrapped_type = args[0] if is_optional else typ + return is_optional, unwrapped_type + + +def merge_schemas(*schemas: Dict[str, Any]) -> Dict[str, Any]: + """Merge multiple JSON schemas into a single schema with a top-level `oneOf` property. + + This function is necessary because Pydantic does not support merging schemas. + """ + one_of: List[Any] = [] + defs = {} + for schema in schemas: + defs.update(schema.get("$defs", {})) + defs[schema["title"]] = {k: v for k, v in schema.items() if k != "definitions"} + one_of.append({"$ref": f"#/$defs/{schema['title']}"}) + return { + "$schema": "http://json-schema.org/draft-07/schema#", + "oneOf": one_of, + "$defs": defs, + } + + +if __name__ == "__main__": + main()