diff --git a/.buildkite/dagster-buildkite/dagster_buildkite/package_spec.py b/.buildkite/dagster-buildkite/dagster_buildkite/package_spec.py
index 4bfd021278c01..96b8f25e79dd9 100644
--- a/.buildkite/dagster-buildkite/dagster_buildkite/package_spec.py
+++ b/.buildkite/dagster-buildkite/dagster_buildkite/package_spec.py
@@ -1,6 +1,6 @@
 import logging
 import os
-from dataclasses import dataclass
+from dataclasses import dataclass, field
 from pathlib import Path
 from typing import Callable, List, Mapping, Optional, Union
 
@@ -24,8 +24,10 @@
 
 _CORE_PACKAGES = [
     "python_modules/dagster",
-    "python_modules/dagit",
+    "python_modules/dagster-externals",
     "python_modules/dagster-graphql",
+    "python_modules/dagster-webserver",
+    "python_modules/dagit",
     "js_modules/dagster-ui",
 ]
 
@@ -108,6 +110,8 @@ class PackageSpec:
         timeout_in_minutes (int, optional): Fail after this many minutes.
         queue (BuildkiteQueue, optional): Schedule steps to this queue.
         run_pytest (bool, optional): Whether to run pytest. Enabled by default.
+        other_tox_envs (List[str], optional): Other tox testenvs to run as part of the package step
+            group. Defaults to [].
     """
 
     directory: str
@@ -126,6 +130,7 @@ class PackageSpec:
     queue: Optional[BuildkiteQueue] = None
     run_pytest: bool = True
     always_run_if: Optional[Callable[[], bool]] = None
+    other_tox_envs: List[str] = field(default_factory=list)
 
     def __post_init__(self):
         if not self.name:
@@ -208,6 +213,9 @@ def build_steps(self) -> List[BuildkiteTopLevelStep]:
                 )
             )
 
+        for testenv in self.other_tox_envs:
+            steps.append(build_tox_step(self.directory, testenv, base_label=testenv))
+
         emoji = _PACKAGE_TYPE_TO_EMOJI_MAP[self.package_type]  # type: ignore[index]
         if len(steps) >= 2:
             return [
diff --git a/.buildkite/dagster-buildkite/dagster_buildkite/steps/packages.py b/.buildkite/dagster-buildkite/dagster_buildkite/steps/packages.py
index 72add556a22ed..93cf34e221bad 100644
--- a/.buildkite/dagster-buildkite/dagster_buildkite/steps/packages.py
+++ b/.buildkite/dagster-buildkite/dagster_buildkite/steps/packages.py
@@ -352,6 +352,7 @@ def k8s_extra_cmds(version: str, _) -> List[str]:
 
 LIBRARY_PACKAGES_WITH_CUSTOM_CONFIG: List[PackageSpec] = [
     PackageSpec("python_modules/automation"),
+    PackageSpec("python_modules/dagster-externals", other_tox_envs=["jsonschema"]),
     PackageSpec("python_modules/dagster-webserver", pytest_extra_cmds=ui_extra_cmds),
     PackageSpec(
         "python_modules/dagster",
diff --git a/Makefile b/Makefile
index 78bb952d69a6c..c062f8adc4b49 100644
--- a/Makefile
+++ b/Makefile
@@ -92,3 +92,6 @@ check_manifest:
 	check-manifest python_modules/dagster-webserver
 	check-manifest python_modules/dagster-graphql
 	ls python_modules/libraries | xargs -n 1 -Ipkg check-manifest python_modules/libraries/pkg
+
+externals_json_schema:
+	python scripts/generate_externals_json_schema.py
diff --git a/python_modules/dagster-externals/MANIFEST.in b/python_modules/dagster-externals/MANIFEST.in
index ed3a9a1853f13..ad6ce14c5bd7f 100644
--- a/python_modules/dagster-externals/MANIFEST.in
+++ b/python_modules/dagster-externals/MANIFEST.in
@@ -1,3 +1,4 @@
 include README.md
 include LICENSE
-include dagster_external/py.typed
\ No newline at end of file
+include dagster_externals/py.typed
+include externals_protocol_schema.json
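
Reviewer note: a minimal sketch of how the new `other_tox_envs` field on PackageSpec is meant to be used. The package path and testenv name below are hypothetical; the real opt-in in this diff is dagster-externals with other_tox_envs=["jsonschema"].

# Hypothetical example only: the package path and testenv name are illustrative.
from dagster_buildkite.package_spec import PackageSpec

spec = PackageSpec(
    "python_modules/libraries/dagster-somelib",  # hypothetical package
    other_tox_envs=["extra-check"],  # hypothetical testenv defined in that package's tox.ini
)
# During build_steps(), each entry in other_tox_envs is appended as its own tox step
# via build_tox_step(directory, testenv, base_label=testenv), joining the pytest steps
# in the package's step group.
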
diff --git a/python_modules/dagster-externals/dagster_externals_tests/test_context.py b/python_modules/dagster-externals/dagster_externals_tests/test_context.py
index 872efe68072c7..2796f1866e291 100644
--- a/python_modules/dagster-externals/dagster_externals_tests/test_context.py
+++ b/python_modules/dagster-externals/dagster_externals_tests/test_context.py
@@ -1,5 +1,8 @@
+import json
+import os
 from unittest.mock import MagicMock
 
+import jsonschema
 import pytest
 from dagster_externals._context import ExternalExecutionContext
 from dagster_externals._protocol import (
@@ -23,11 +26,18 @@
     extras={},
 )
 
+JSON_SCHEMA_PATH = os.path.join(os.path.dirname(__file__), "../externals_protocol_schema.json")
+
+with open(JSON_SCHEMA_PATH) as f:
+    JSON_SCHEMA = json.load(f)
+
 
 def _make_external_execution_context(**kwargs):
-    kwargs = {**TEST_EXTERNAL_EXECUTION_CONTEXT_DEFAULTS, **kwargs}
+    data = ExternalExecutionContextData(**{**TEST_EXTERNAL_EXECUTION_CONTEXT_DEFAULTS, **kwargs})
+    # This will error if the context doesn't match the schema
+    jsonschema.validate(data, JSON_SCHEMA)
     return ExternalExecutionContext(
-        data=ExternalExecutionContextData(**kwargs),
+        data=data,
         output_stream=MagicMock(),
     )
 
@@ -139,3 +149,13 @@ def test_extras_context():
     assert context.get_extra("foo") == "bar"
     with pytest.raises(DagsterExternalError, match="Extra `bar` is undefined"):
         context.get_extra("bar")
+
+
+def test_notification_json_schema_validation():
+    notification = {"method": "foo", "params": {"bar": "baz"}}
+    jsonschema.validate(notification, JSON_SCHEMA)
+
+
+def test_json_schema_rejects_invalid():
+    with pytest.raises(jsonschema.ValidationError):
+        jsonschema.validate({"foo": "bar"}, JSON_SCHEMA)
diff --git a/python_modules/dagster-externals/externals_protocol_schema.json b/python_modules/dagster-externals/externals_protocol_schema.json
new file mode 100644
index 0000000000000..8ab2e077a6557
--- /dev/null
+++ b/python_modules/dagster-externals/externals_protocol_schema.json
@@ -0,0 +1,292 @@
+{
+  "$schema": "http://json-schema.org/draft-07/schema#",
+  "oneOf": [
+    {
+      "$ref": "#/$defs/ExternalExecutionContextData"
+    },
+    {
+      "$ref": "#/$defs/Notification"
+    }
+  ],
+  "$defs": {
+    "ExternalDataProvenance": {
+      "properties": {
+        "code_version": {
+          "title": "Code Version",
+          "type": "string"
+        },
+        "input_data_versions": {
+          "additionalProperties": {
+            "type": "string"
+          },
+          "title": "Input Data Versions",
+          "type": "object"
+        },
+        "is_user_provided": {
+          "title": "Is User Provided",
+          "type": "boolean"
+        }
+      },
+      "required": [
+        "code_version",
+        "input_data_versions",
+        "is_user_provided"
+      ],
+      "title": "ExternalDataProvenance",
+      "type": "object"
+    },
+    "ExternalPartitionKeyRange": {
+      "properties": {
+        "start": {
+          "title": "Start",
+          "type": "string"
+        },
+        "end": {
+          "title": "End",
+          "type": "string"
+        }
+      },
+      "required": [
+        "start",
+        "end"
+      ],
+      "title": "ExternalPartitionKeyRange",
+      "type": "object"
+    },
+    "ExternalTimeWindow": {
+      "properties": {
+        "start": {
+          "title": "Start",
+          "type": "string"
+        },
+        "end": {
+          "title": "End",
+          "type": "string"
+        }
+      },
+      "required": [
+        "start",
+        "end"
+      ],
+      "title": "ExternalTimeWindow",
+      "type": "object"
+    },
+    "ExternalExecutionContextData": {
+      "$defs": {
+        "ExternalDataProvenance": {
+          "properties": {
+            "code_version": {
+              "title": "Code Version",
+              "type": "string"
+            },
+            "input_data_versions": {
+              "additionalProperties": {
+                "type": "string"
+              },
+              "title": "Input Data Versions",
+              "type": "object"
+            },
+            "is_user_provided": {
+              "title": "Is User Provided",
+              "type": "boolean"
+            }
+          },
+          "required": [
+            "code_version",
+            "input_data_versions",
+            "is_user_provided"
+          ],
+          "title": "ExternalDataProvenance",
+          "type": "object"
+        },
+        "ExternalPartitionKeyRange": {
+          "properties": {
+            "start": {
+              "title": "Start",
+              "type": "string"
+            },
+            "end": {
+              "title": "End",
+              "type": "string"
+            }
+          },
+          "required": [
+            "start",
+            "end"
+          ],
+          "title": "ExternalPartitionKeyRange",
+          "type": "object"
+        },
+        "ExternalTimeWindow": {
+          "properties": {
+            "start": {
+              "title": "Start",
+              "type": "string"
+            },
+            "end": {
+              "title": "End",
+              "type": "string"
+            }
+          },
+          "required": [
+            "start",
+            "end"
+          ],
+          "title": "ExternalTimeWindow",
+          "type": "object"
+        }
+      },
+      "properties": {
+        "asset_keys": {
+          "anyOf": [
+            {
+              "items": {
+                "type": "string"
+              },
+              "type": "array"
+            },
+            {
+              "type": "null"
+            }
+          ],
+          "title": "Asset Keys"
+        },
+        "code_version_by_asset_key": {
+          "anyOf": [
+            {
+              "additionalProperties": {
+                "anyOf": [
+                  {
+                    "type": "string"
+                  },
+                  {
+                    "type": "null"
+                  }
+                ]
+              },
+              "type": "object"
+            },
+            {
+              "type": "null"
+            }
+          ],
+          "title": "Code Version By Asset Key"
+        },
+        "provenance_by_asset_key": {
+          "anyOf": [
+            {
+              "additionalProperties": {
+                "anyOf": [
+                  {
+                    "$ref": "#/$defs/ExternalDataProvenance"
+                  },
+                  {
+                    "type": "null"
+                  }
+                ]
+              },
+              "type": "object"
+            },
+            {
+              "type": "null"
+            }
+          ],
+          "title": "Provenance By Asset Key"
+        },
+        "partition_key": {
+          "anyOf": [
+            {
+              "type": "string"
+            },
+            {
+              "type": "null"
+            }
+          ],
+          "title": "Partition Key"
+        },
+        "partition_key_range": {
+          "anyOf": [
+            {
+              "$ref": "#/$defs/ExternalPartitionKeyRange"
+            },
+            {
+              "type": "null"
+            }
+          ]
+        },
+        "partition_time_window": {
+          "anyOf": [
+            {
+              "$ref": "#/$defs/ExternalTimeWindow"
+            },
+            {
+              "type": "null"
+            }
+          ]
+        },
+        "run_id": {
+          "title": "Run Id",
+          "type": "string"
+        },
+        "job_name": {
+          "anyOf": [
+            {
+              "type": "string"
+            },
+            {
+              "type": "null"
+            }
+          ],
+          "title": "Job Name"
+        },
+        "retry_number": {
+          "title": "Retry Number",
+          "type": "integer"
+        },
+        "extras": {
+          "title": "Extras",
+          "type": "object"
+        }
+      },
+      "required": [
+        "asset_keys",
+        "code_version_by_asset_key",
+        "provenance_by_asset_key",
+        "partition_key",
+        "partition_key_range",
+        "partition_time_window",
+        "run_id",
+        "job_name",
+        "retry_number",
+        "extras"
+      ],
+      "title": "ExternalExecutionContextData",
+      "type": "object"
+    },
+    "Notification": {
+      "properties": {
+        "method": {
+          "title": "Method",
+          "type": "string"
+        },
+        "params": {
+          "anyOf": [
+            {
+              "type": "object"
+            },
+            {
+              "type": "null"
+            }
+          ],
+          "title": "Params"
+        }
+      },
+      "required": [
+        "method",
+        "params"
+      ],
+      "title": "Notification",
+      "type": "object"
+    }
+  }
+}
\ No newline at end of file
diff --git a/python_modules/dagster-externals/setup.py b/python_modules/dagster-externals/setup.py
index 793c97f7272ba..f64ed4a997d81 100644
--- a/python_modules/dagster-externals/setup.py
+++ b/python_modules/dagster-externals/setup.py
@@ -38,6 +38,7 @@ def get_version() -> str:
     extras_require={
         "test": [
             f"dagster{pin}",
+            "jsonschema",
         ]
     },
     zip_safe=False,
diff --git a/python_modules/dagster-externals/tox.ini b/python_modules/dagster-externals/tox.ini
index a48293e39e8d5..f33757c1851e9 100644
--- a/python_modules/dagster-externals/tox.ini
+++ b/python_modules/dagster-externals/tox.ini
@@ -16,3 +16,14 @@ allowlist_externals =
 commands =
   !windows: /bin/bash -c '! pip list --exclude-editable | grep -e dagster'
   pytest -c ../../pyproject.toml -vv ./dagster_externals_tests
+
+[testenv:jsonschema]
+# When dagster is updated to support pydantic 2, we can just inherit dagster[test]
+deps =
+  -e .
+  jsonschema
+  pydantic>2
+allowlist_externals =
+  git
+commands =
+  python ../../scripts/generate_externals_json_schema.py
diff --git a/scripts/generate_externals_json_schema.py b/scripts/generate_externals_json_schema.py
new file mode 100644
index 0000000000000..2762407c71c5e
--- /dev/null
+++ b/scripts/generate_externals_json_schema.py
@@ -0,0 +1,69 @@
+import json
+import os
+from typing import Any, Dict, List, Tuple, Type, get_type_hints
+
+import jsonschema
+from dagster_externals import ExternalExecutionContextData, Notification
+from pydantic import BaseModel, create_model
+from typing_extensions import TypedDict, TypeGuard
+
+OUTPUT_FILEPATH = os.path.join(
+    os.path.dirname(__file__), "../python_modules/dagster-externals/externals_protocol_schema.json"
+)
+
+
+def main():
+    context_schema = create_pydantic_model_from_typeddict(
+        ExternalExecutionContextData
+    ).model_json_schema()
+    notification_schema = create_pydantic_model_from_typeddict(Notification).model_json_schema()
+    merged_schema = merge_schemas(context_schema, notification_schema)
+    jsonschema.Draft7Validator.check_schema(merged_schema)
+
+    with open(OUTPUT_FILEPATH, "w") as f:
+        f.write(json.dumps(merged_schema, indent=2))
+
+
+def create_pydantic_model_from_typeddict(typed_dict_cls: Type[TypedDict]) -> Type[BaseModel]:
+    """Create a Pydantic model from a TypedDict class.
+
+    We use this instead of the Pydantic-provided `create_model_from_typeddict` because we need to
+    handle nested `TypedDict`. This function will convert any child `TypedDict` to a Pydantic model,
+    which is necessary for Pydantic JSON schema generation to work correctly.
+    """
+    annotations = get_type_hints(typed_dict_cls)
+    fields: Dict[str, Tuple[Type, ...]] = {}
+    for name, field_type in annotations.items():
+        pydantic_type = (
+            create_pydantic_model_from_typeddict(field_type)
+            if is_typed_dict_class(field_type)
+            else field_type
+        )
+        fields[name] = (pydantic_type, ...)
+    return create_model(typed_dict_cls.__name__, **fields)  # type: ignore
+
+
+def is_typed_dict_class(cls: Type) -> TypeGuard[Type[TypedDict]]:
+    return isinstance(cls, type) and issubclass(cls, dict) and get_type_hints(cls) is not None
+
+
+def merge_schemas(*schemas: Dict[str, Any]) -> Dict[str, Any]:
+    """Merge multiple JSON schemas into a single schema with a top-level `oneOf` property.
+
+    This function is necessary because Pydantic does not support merging schemas.
+    """
+    one_of: List[Any] = []
+    defs = {}
+    for schema in schemas:
+        defs.update(schema.get("$defs", {}))
+        defs[schema["title"]] = {k: v for k, v in schema.items() if k != "definitions"}
+        one_of.append({"$ref": f"#/$defs/{schema['title']}"})
+    return {
+        "$schema": "http://json-schema.org/draft-07/schema#",
+        "oneOf": one_of,
+        "$defs": defs,
+    }
+
+
+if __name__ == "__main__":
+    main()
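
Reviewer note: a toy, self-contained sketch of the TypedDict-to-pydantic conversion the script relies on (requires pydantic 2, matching the constraint called out in the new tox testenv). The Inner/Outer names are made up; the point is that nested TypedDicts must be converted to models for model_json_schema() to emit them as named entries under "$defs".

from typing import get_type_hints

from pydantic import create_model  # pydantic 2
from typing_extensions import TypedDict


class Inner(TypedDict):  # made-up nested TypedDict
    start: str
    end: str


class Outer(TypedDict):  # made-up top-level TypedDict
    window: Inner
    run_id: str


def to_model(td):
    # Recursively convert nested TypedDicts to pydantic models, mirroring
    # create_pydantic_model_from_typeddict above.
    fields = {}
    for name, hint in get_type_hints(td).items():
        if isinstance(hint, type) and issubclass(hint, dict) and get_type_hints(hint):
            hint = to_model(hint)
        fields[name] = (hint, ...)
    return create_model(td.__name__, **fields)


# "Inner" appears under "$defs" and is referenced from the "window" property.
print(to_model(Outer).model_json_schema())
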