diff --git a/.buildkite/dagster-buildkite/dagster_buildkite/package_spec.py b/.buildkite/dagster-buildkite/dagster_buildkite/package_spec.py index 20d918c45b2bc..29a2bc05988ca 100644 --- a/.buildkite/dagster-buildkite/dagster_buildkite/package_spec.py +++ b/.buildkite/dagster-buildkite/dagster_buildkite/package_spec.py @@ -24,8 +24,8 @@ _CORE_PACKAGES = [ "python_modules/dagster", - "python_modules/dagster-ext", "python_modules/dagster-graphql", + "python_modules/dagster-pipes", "python_modules/dagster-webserver", "python_modules/dagit", "js_modules/dagster-ui", diff --git a/.buildkite/dagster-buildkite/dagster_buildkite/steps/packages.py b/.buildkite/dagster-buildkite/dagster_buildkite/steps/packages.py index bba1fd29c39ee..55c4dcc967e81 100644 --- a/.buildkite/dagster-buildkite/dagster_buildkite/steps/packages.py +++ b/.buildkite/dagster-buildkite/dagster_buildkite/steps/packages.py @@ -354,7 +354,7 @@ def k8s_extra_cmds(version: str, _) -> List[str]: LIBRARY_PACKAGES_WITH_CUSTOM_CONFIG: List[PackageSpec] = [ PackageSpec("python_modules/automation"), - PackageSpec("python_modules/dagster-ext", other_tox_envs=["jsonschema"]), + PackageSpec("python_modules/dagster-pipes", other_tox_envs=["jsonschema"]), PackageSpec("python_modules/dagster-webserver", pytest_extra_cmds=ui_extra_cmds), PackageSpec( "python_modules/dagster", diff --git a/Makefile b/Makefile index 68a41f7502878..4486419a5fae0 100644 --- a/Makefile +++ b/Makefile @@ -94,5 +94,5 @@ check_manifest: check-manifest python_modules/dagster-graphql ls python_modules/libraries | xargs -n 1 -Ipkg check-manifest python_modules/libraries/pkg -ext_json_schema: - tox -c python_modules/dagster-ext -e jsonschema +pipes_json_schema: + tox -c python_modules/dagster-pipes -e jsonschema diff --git a/python_modules/dagster-ext/dagster_ext/__init__.py b/python_modules/dagster-ext/dagster_ext/__init__.py deleted file mode 100644 index c2f0ac7eff313..0000000000000 --- a/python_modules/dagster-ext/dagster_ext/__init__.py +++ /dev/null @@ -1,863 +0,0 @@ -import atexit -import base64 -import datetime -import json -import os -import sys -import time -import warnings -import zlib -from abc import ABC, abstractmethod -from contextlib import ExitStack, contextmanager -from io import StringIO -from threading import Event, Lock, Thread -from typing import ( - IO, - TYPE_CHECKING, - Any, - ClassVar, - Dict, - Generic, - Iterator, - Literal, - Mapping, - Optional, - Sequence, - TextIO, - Type, - TypedDict, - TypeVar, - Union, - cast, -) - -if TYPE_CHECKING: - from unittest.mock import MagicMock - -# ######################## -# ##### PROTOCOL -# ######################## - -# This represents the version of the protocol, rather than the version of the package. It must be -# manually updated whenever there are changes to the protocol. -EXT_PROTOCOL_VERSION = "0.1" - -ExtExtras = Mapping[str, Any] -ExtParams = Mapping[str, Any] - - -_ENV_KEY_PREFIX = "DAGSTER_EXT_" - - -def _param_name_to_env_key(key: str) -> str: - return f"{_ENV_KEY_PREFIX}{key.upper()}" - - -# ##### PARAMETERS - -IS_DAGSTER_EXT_PROCESS_ENV_VAR = "IS_DAGSTER_EXT_PROCESS" - -DAGSTER_EXT_ENV_KEYS = { - k: _param_name_to_env_key(k) for k in (IS_DAGSTER_EXT_PROCESS_ENV_VAR, "context", "messages") -} - - -# ##### MESSAGE - -# Can't use a constant for TypedDict key so this value is repeated in `ExtMessage` defn. 
-EXT_PROTOCOL_VERSION_FIELD = "__dagster_ext_version" - - -class ExtMessage(TypedDict): - __dagster_ext_version: str - method: str - params: Optional[Mapping[str, Any]] - - -# ##### EXT CONTEXT - - -class ExtContextData(TypedDict): - asset_keys: Optional[Sequence[str]] - code_version_by_asset_key: Optional[Mapping[str, Optional[str]]] - provenance_by_asset_key: Optional[Mapping[str, Optional["ExtDataProvenance"]]] - partition_key: Optional[str] - partition_key_range: Optional["ExtPartitionKeyRange"] - partition_time_window: Optional["ExtTimeWindow"] - run_id: str - job_name: Optional[str] - retry_number: int - extras: Mapping[str, Any] - - -class ExtPartitionKeyRange(TypedDict): - start: str - end: str - - -class ExtTimeWindow(TypedDict): - start: str # timestamp - end: str # timestamp - - -class ExtDataProvenance(TypedDict): - code_version: str - input_data_versions: Mapping[str, str] - is_user_provided: bool - - -ExtAssetCheckSeverity = Literal["WARN", "ERROR"] - -ExtMetadataRawValue = Union[int, float, str, Mapping[str, Any], Sequence[Any], bool, None] - - -class ExtMetadataValue(TypedDict): - type: "ExtMetadataType" - raw_value: ExtMetadataRawValue - - -# Infer the type from the raw value on the orchestration end -EXT_METADATA_TYPE_INFER = "__infer__" - -ExtMetadataType = Literal[ - "__infer__", - "text", - "url", - "path", - "notebook", - "json", - "md", - "float", - "int", - "bool", - "dagster_run", - "asset", - "null", -] - - -# ##### JSON SCHEMA - -_JsonSchemaName = Literal["context", "message"] -_schema_root = os.path.join(os.path.dirname(__file__), "../json_schema") -_json_schemas: Dict[Literal["context", "message"], Mapping[str, Any]] = {} - - -def get_ext_json_schema_path(name: _JsonSchemaName) -> str: - return os.path.join(_schema_root, f"{name}.json") - - -def get_ext_json_schema(name: _JsonSchemaName) -> Mapping[str, Any]: - if name not in _json_schemas: - with open(get_ext_json_schema_path(name)) as f: - _json_schemas[name] = json.load(f) - return _json_schemas[name] - - -# ######################## -# ##### UTIL -# ######################## - -_T = TypeVar("_T") - - -class DagsterExtError(Exception): - pass - - -class DagsterExtWarning(Warning): - pass - - -def _assert_not_none(value: Optional[_T], desc: Optional[str] = None) -> _T: - if value is None: - raise DagsterExtError(f"Missing required property: {desc}") - return value - - -def _assert_defined_asset_property(value: Optional[_T], key: str) -> _T: - return _assert_not_none(value, f"`{key}` is undefined. Current step does not target an asset.") - - -# This should only be called under the precondition that the current step targets assets. -def _assert_single_asset(data: ExtContextData, key: str) -> None: - asset_keys = data["asset_keys"] - assert asset_keys is not None - if len(asset_keys) != 1: - raise DagsterExtError(f"`{key}` is undefined. Current step targets multiple assets.") - - -def _resolve_optionally_passed_asset_key( - data: ExtContextData, - asset_key: Optional[str], - method: str, -) -> str: - asset_keys = _assert_defined_asset_property(data["asset_keys"], method) - asset_key = _assert_opt_param_type(asset_key, str, method, "asset_key") - if asset_key and asset_key not in asset_keys: - raise DagsterExtError( - f"Invalid asset key. Expected one of `{asset_keys}`, got `{asset_key}`." - ) - if not asset_key: - if len(asset_keys) != 1: - raise DagsterExtError( - f"Calling `{method}` without passing an asset key is undefined. Current step" - " targets multiple assets." 
- ) - asset_key = asset_keys[0] - return asset_key - - -def _assert_defined_partition_property(value: Optional[_T], key: str) -> _T: - return _assert_not_none( - value, f"`{key}` is undefined. Current step does not target any partitions." - ) - - -# This should only be called under the precondition that the current steps targets assets. -def _assert_single_partition(data: ExtContextData, key: str) -> None: - partition_key_range = data["partition_key_range"] - assert partition_key_range is not None - if partition_key_range["start"] != partition_key_range["end"]: - raise DagsterExtError(f"`{key}` is undefined. Current step targets multiple partitions.") - - -def _assert_defined_extra(extras: ExtExtras, key: str) -> Any: - if key not in extras: - raise DagsterExtError(f"Extra `{key}` is undefined. Extras must be provided by user.") - return extras[key] - - -def _assert_param_type(value: _T, expected_type: Any, method: str, param: str) -> _T: - if not isinstance(value, expected_type): - raise DagsterExtError( - f"Invalid type for parameter `{param}` of `{method}`. Expected `{expected_type}`, got" - f" `{type(value)}`." - ) - return value - - -def _assert_opt_param_type(value: _T, expected_type: Any, method: str, param: str) -> _T: - if not (isinstance(value, expected_type) or value is None): - raise DagsterExtError( - f"Invalid type for parameter `{param}` of `{method}`. Expected" - f" `Optional[{expected_type}]`, got `{type(value)}`." - ) - return value - - -def _assert_env_param_type( - env_params: ExtParams, key: str, expected_type: Type[_T], cls: Type -) -> _T: - value = env_params.get(key) - if not isinstance(value, expected_type): - raise DagsterExtError( - f"Invalid type for parameter `{key}` passed from orchestration side to" - f" `{cls.__name__}`. Expected `{expected_type}`, got `{type(value)}`." - ) - return value - - -def _assert_opt_env_param_type( - env_params: ExtParams, key: str, expected_type: Type[_T], cls: Type -) -> Optional[_T]: - value = env_params.get(key) - if value is not None and not isinstance(value, expected_type): - raise DagsterExtError( - f"Invalid type for parameter `{key}` passed from orchestration side to" - f" `{cls.__name__}`. Expected `Optional[{expected_type}]`, got `{type(value)}`." - ) - return value - - -def _assert_param_value(value: _T, expected_values: Sequence[_T], method: str, param: str) -> _T: - if value not in expected_values: - raise DagsterExtError( - f"Invalid value for parameter `{param}` of `{method}`. Expected one of" - f" `{expected_values}`, got `{value}`." - ) - return value - - -def _assert_opt_param_value( - value: _T, expected_values: Sequence[_T], method: str, param: str -) -> _T: - if value is not None and value not in expected_values: - raise DagsterExtError( - f"Invalid value for parameter `{param}` of `{method}`. Expected one of" - f" `{expected_values}`, got `{value}`." - ) - return value - - -def _assert_param_json_serializable(value: _T, method: str, param: str) -> _T: - try: - json.dumps(value) - except (TypeError, OverflowError): - raise DagsterExtError( - f"Invalid type for parameter `{param}` of `{method}`. Expected a JSON-serializable" - f" type, got `{type(value)}`." 
- ) - return value - - -_METADATA_VALUE_KEYS = frozenset(ExtMetadataValue.__annotations__.keys()) - - -def _normalize_param_metadata( - metadata: Mapping[str, Union[ExtMetadataRawValue, ExtMetadataValue]], method: str, param: str -) -> Mapping[str, Union[ExtMetadataRawValue, ExtMetadataValue]]: - _assert_param_type(metadata, dict, method, param) - new_metadata: Dict[str, ExtMetadataValue] = {} - for key, value in metadata.items(): - if not isinstance(key, str): - raise DagsterExtError( - f"Invalid type for parameter `{param}` of `{method}`. Expected a dict with string" - f" keys, got a key `{key}` of type `{type(key)}`." - ) - elif isinstance(value, dict): - if not {*value.keys()} == _METADATA_VALUE_KEYS: - raise DagsterExtError( - f"Invalid type for parameter `{param}` of `{method}`. Expected a dict with" - " string keys and values that are either raw metadata values or dictionaries" - f" with schema `{{raw_value: ..., type: ...}}`. Got a value `{value}`." - ) - new_metadata[key] = cast(ExtMetadataValue, value) - else: - new_metadata[key] = {"raw_value": value, "type": EXT_METADATA_TYPE_INFER} - return new_metadata - - -def _param_from_env_var(key: str) -> Any: - raw_value = os.environ.get(_param_name_to_env_var(key)) - return decode_env_var(raw_value) if raw_value is not None else None - - -def encode_env_var(value: Any) -> str: - serialized = json.dumps(value) - compressed = zlib.compress(serialized.encode("utf-8")) - encoded = base64.b64encode(compressed) - return encoded.decode("utf-8") # as string - - -def decode_env_var(value: Any) -> str: - decoded = base64.b64decode(value) - decompressed = zlib.decompress(decoded) - return json.loads(decompressed.decode("utf-8")) - - -def _param_name_to_env_var(param_name: str) -> str: - return f"{_ENV_KEY_PREFIX}{param_name.upper()}" - - -def _env_var_to_param_name(env_var: str) -> str: - return env_var[len(_ENV_KEY_PREFIX) :].lower() - - -def is_dagster_ext_process() -> bool: - return _param_from_env_var(IS_DAGSTER_EXT_PROCESS_ENV_VAR) - - -def _emit_orchestration_inactive_warning() -> None: - warnings.warn( - "This process was not launched by a Dagster orchestration process. All calls to the" - " `dagster-ext` context or attempts to initialize `dagster-ext` abstractions" - " are no-ops.", - category=DagsterExtWarning, - ) - - -def _get_mock() -> "MagicMock": - from unittest.mock import MagicMock - - return MagicMock() - - -# ######################## -# ##### IO - BASE -# ######################## - - -class ExtContextLoader(ABC): - @abstractmethod - @contextmanager - def load_context(self, params: ExtParams) -> Iterator[ExtContextData]: ... - - -T_MessageChannel = TypeVar("T_MessageChannel", bound="ExtMessageWriterChannel") - - -class ExtMessageWriter(ABC, Generic[T_MessageChannel]): - @abstractmethod - @contextmanager - def open(self, params: ExtParams) -> Iterator[T_MessageChannel]: ... - - -class ExtMessageWriterChannel(ABC, Generic[T_MessageChannel]): - @abstractmethod - def write_message(self, message: ExtMessage) -> None: ... - - -class ExtParamLoader(ABC): - @abstractmethod - def load_context_params(self) -> ExtParams: ... - - @abstractmethod - def load_messages_params(self) -> ExtParams: ... 
- - -T_BlobStoreMessageWriterChannel = TypeVar( - "T_BlobStoreMessageWriterChannel", bound="ExtBlobStoreMessageWriterChannel" -) - - -class ExtBlobStoreMessageWriter(ExtMessageWriter[T_BlobStoreMessageWriterChannel]): - def __init__(self, *, interval: float = 10): - self.interval = interval - - @contextmanager - def open(self, params: ExtParams) -> Iterator[T_BlobStoreMessageWriterChannel]: - channel = self.make_channel(params) - with channel.buffered_upload_loop(): - yield channel - - @abstractmethod - def make_channel(self, params: ExtParams) -> T_BlobStoreMessageWriterChannel: ... - - -class ExtBlobStoreMessageWriterChannel(ExtMessageWriterChannel): - def __init__(self, *, interval: float = 10): - self._interval = interval - self._lock = Lock() - self._buffer = [] - self._counter = 1 - - def write_message(self, message: ExtMessage) -> None: - with self._lock: - self._buffer.append(message) - - def flush_messages(self) -> Sequence[ExtMessage]: - with self._lock: - messages = list(self._buffer) - self._buffer.clear() - return messages - - @abstractmethod - def upload_messages_chunk(self, payload: StringIO, index: int) -> None: ... - - @contextmanager - def buffered_upload_loop(self) -> Iterator[None]: - thread = None - is_task_complete = Event() - try: - thread = Thread(target=self._upload_loop, args=(is_task_complete,), daemon=True) - thread.start() - yield - finally: - is_task_complete.set() - if thread: - thread.join(timeout=60) - - def _upload_loop(self, is_task_complete: Event) -> None: - start_or_last_upload = datetime.datetime.now() - while True: - num_pending = len(self._buffer) - now = datetime.datetime.now() - if num_pending == 0 and is_task_complete.is_set(): - break - elif is_task_complete.is_set() or (now - start_or_last_upload).seconds > self._interval: - payload = "\n".join([json.dumps(message) for message in self.flush_messages()]) - self.upload_messages_chunk(StringIO(payload), self._counter) - start_or_last_upload = now - self._counter += 1 - time.sleep(1) - - -class ExtBufferedFilesystemMessageWriterChannel(ExtBlobStoreMessageWriterChannel): - def __init__(self, path: str, *, interval: float = 10): - super().__init__(interval=interval) - self._path = path - - def upload_messages_chunk(self, payload: IO, index: int) -> None: - message_path = os.path.join(self._path, f"{index}.json") - with open(message_path, "w") as f: - f.write(payload.read()) - - -# ######################## -# ##### IO - DEFAULT -# ######################## - - -class ExtDefaultContextLoader(ExtContextLoader): - FILE_PATH_KEY = "path" - DIRECT_KEY = "data" - - @contextmanager - def load_context(self, params: ExtParams) -> Iterator[ExtContextData]: - if self.FILE_PATH_KEY in params: - path = _assert_env_param_type(params, self.FILE_PATH_KEY, str, self.__class__) - with open(path, "r") as f: - data = json.load(f) - yield data - elif self.DIRECT_KEY in params: - data = _assert_env_param_type(params, self.DIRECT_KEY, dict, self.__class__) - yield cast(ExtContextData, data) - else: - raise DagsterExtError( - f'Invalid params for {self.__class__.__name__}, expected key "{self.FILE_PATH_KEY}"' - f' or "{self.DIRECT_KEY}", received {params}', - ) - - -class ExtDefaultMessageWriter(ExtMessageWriter): - FILE_PATH_KEY = "path" - STDIO_KEY = "stdio" - STDERR = "stderr" - STDOUT = "stdout" - - @contextmanager - def open(self, params: ExtParams) -> Iterator[ExtMessageWriterChannel]: - if self.FILE_PATH_KEY in params: - path = _assert_env_param_type(params, self.FILE_PATH_KEY, str, self.__class__) - yield 
ExtFileMessageWriterChannel(path) - elif self.STDIO_KEY in params: - stream = _assert_env_param_type(params, self.STDIO_KEY, str, self.__class__) - if stream == self.STDERR: - yield ExtStreamMessageWriterChannel(sys.stderr) - elif stream == self.STDOUT: - yield ExtStreamMessageWriterChannel(sys.stdout) - else: - raise DagsterExtError( - f'Invalid value for key "std", expected "{self.STDERR}" or "{self.STDOUT}" but' - f" received {stream}" - ) - else: - raise DagsterExtError( - f'Invalid params for {self.__class__.__name__}, expected key "path" or "std",' - f" received {params}" - ) - - -class ExtFileMessageWriterChannel(ExtMessageWriterChannel): - def __init__(self, path: str): - self._path = path - - def write_message(self, message: ExtMessage) -> None: - with open(self._path, "a") as f: - f.write(json.dumps(message) + "\n") - - -class ExtStreamMessageWriterChannel(ExtMessageWriterChannel): - def __init__(self, stream: TextIO): - self._stream = stream - - def write_message(self, message: ExtMessage) -> None: - self._stream.writelines((json.dumps(message), "\n")) - - -class ExtEnvVarParamLoader(ExtParamLoader): - def load_context_params(self) -> ExtParams: - return _param_from_env_var("context") - - def load_messages_params(self) -> ExtParams: - return _param_from_env_var("messages") - - -# ######################## -# ##### IO - S3 -# ######################## - - -class ExtS3MessageWriter(ExtBlobStoreMessageWriter): - # client is a boto3.client("s3") object - def __init__(self, client: Any, *, interval: float = 10): - super().__init__(interval=interval) - # Not checking client type for now because it's a boto3.client object and we don't want to - # depend on boto3. - self._client = client - - def make_channel( - self, - params: ExtParams, - ) -> "ExtS3MessageChannel": - bucket = _assert_env_param_type(params, "bucket", str, self.__class__) - key_prefix = _assert_opt_env_param_type(params, "key_prefix", str, self.__class__) - return ExtS3MessageChannel( - client=self._client, - bucket=bucket, - key_prefix=key_prefix, - interval=self.interval, - ) - - -class ExtS3MessageChannel(ExtBlobStoreMessageWriterChannel): - # client is a boto3.client("s3") object - def __init__( - self, client: Any, bucket: str, key_prefix: Optional[str], *, interval: float = 10 - ): - super().__init__(interval=interval) - self._client = client - self._bucket = bucket - self._key_prefix = key_prefix - - def upload_messages_chunk(self, payload: IO, index: int) -> None: - key = f"{self._key_prefix}/{index}.json" if self._key_prefix else f"{index}.json" - self._client.put_object( - Body=payload.read(), - Bucket=self._bucket, - Key=key, - ) - - -# ######################## -# ##### IO - DBFS -# ######################## - - -class ExtDbfsContextLoader(ExtContextLoader): - @contextmanager - def load_context(self, params: ExtParams) -> Iterator[ExtContextData]: - unmounted_path = _assert_env_param_type(params, "path", str, self.__class__) - path = os.path.join("/dbfs", unmounted_path.lstrip("/")) - with open(path, "r") as f: - yield json.load(f) - - -class ExtDbfsMessageWriter(ExtBlobStoreMessageWriter): - def make_channel( - self, - params: ExtParams, - ) -> "ExtBufferedFilesystemMessageWriterChannel": - unmounted_path = _assert_env_param_type(params, "path", str, self.__class__) - return ExtBufferedFilesystemMessageWriterChannel( - path=os.path.join("/dbfs", unmounted_path.lstrip("/")), - interval=self.interval, - ) - - -# ######################## -# ##### CONTEXT -# ######################## - - -def init_dagster_ext( - *, 
- context_loader: Optional[ExtContextLoader] = None, - message_writer: Optional[ExtMessageWriter] = None, - param_loader: Optional[ExtParamLoader] = None, -) -> "ExtContext": - if ExtContext.is_initialized(): - return ExtContext.get() - - if is_dagster_ext_process(): - param_loader = param_loader or ExtEnvVarParamLoader() - context_params = param_loader.load_context_params() - messages_params = param_loader.load_messages_params() - context_loader = context_loader or ExtDefaultContextLoader() - message_writer = message_writer or ExtDefaultMessageWriter() - stack = ExitStack() - context_data = stack.enter_context(context_loader.load_context(context_params)) - message_channel = stack.enter_context(message_writer.open(messages_params)) - atexit.register(stack.__exit__, None, None, None) - context = ExtContext(context_data, message_channel) - else: - _emit_orchestration_inactive_warning() - context = _get_mock() - ExtContext.set(context) - return context - - -class ExtContext: - _instance: ClassVar[Optional["ExtContext"]] = None - - @classmethod - def is_initialized(cls) -> bool: - return cls._instance is not None - - @classmethod - def set(cls, context: "ExtContext") -> None: - cls._instance = context - - @classmethod - def get(cls) -> "ExtContext": - if cls._instance is None: - raise Exception( - "ExtContext has not been initialized. You must call `init_dagster_ext()`." - ) - return cls._instance - - def __init__( - self, - data: ExtContextData, - message_channel: ExtMessageWriterChannel, - ) -> None: - self._data = data - self._message_channel = message_channel - self._materialized_assets: set[str] = set() - - def _write_message(self, method: str, params: Optional[Mapping[str, Any]] = None) -> None: - message = ExtMessage( - {EXT_PROTOCOL_VERSION_FIELD: EXT_PROTOCOL_VERSION, "method": method, "params": params} - ) - self._message_channel.write_message(message) - - # ######################## - # ##### PUBLIC API - # ######################## - - @property - def is_asset_step(self) -> bool: - return self._data["asset_keys"] is not None - - @property - def asset_key(self) -> str: - asset_keys = _assert_defined_asset_property(self._data["asset_keys"], "asset_key") - _assert_single_asset(self._data, "asset_key") - return asset_keys[0] - - @property - def asset_keys(self) -> Sequence[str]: - asset_keys = _assert_defined_asset_property(self._data["asset_keys"], "asset_keys") - return asset_keys - - @property - def provenance(self) -> Optional[ExtDataProvenance]: - provenance_by_asset_key = _assert_defined_asset_property( - self._data["provenance_by_asset_key"], "provenance" - ) - _assert_single_asset(self._data, "provenance") - return next(iter(provenance_by_asset_key.values())) - - @property - def provenance_by_asset_key(self) -> Mapping[str, Optional[ExtDataProvenance]]: - provenance_by_asset_key = _assert_defined_asset_property( - self._data["provenance_by_asset_key"], "provenance_by_asset_key" - ) - return provenance_by_asset_key - - @property - def code_version(self) -> Optional[str]: - code_version_by_asset_key = _assert_defined_asset_property( - self._data["code_version_by_asset_key"], "code_version" - ) - _assert_single_asset(self._data, "code_version") - return next(iter(code_version_by_asset_key.values())) - - @property - def code_version_by_asset_key(self) -> Mapping[str, Optional[str]]: - code_version_by_asset_key = _assert_defined_asset_property( - self._data["code_version_by_asset_key"], "code_version_by_asset_key" - ) - return code_version_by_asset_key - - @property - def 
is_partition_step(self) -> bool: - return self._data["partition_key_range"] is not None - - @property - def partition_key(self) -> str: - partition_key = _assert_defined_partition_property( - self._data["partition_key"], "partition_key" - ) - return partition_key - - @property - def partition_key_range(self) -> Optional["ExtPartitionKeyRange"]: - partition_key_range = _assert_defined_partition_property( - self._data["partition_key_range"], "partition_key_range" - ) - return partition_key_range - - @property - def partition_time_window(self) -> Optional["ExtTimeWindow"]: - # None is a valid value for partition_time_window, but we check that a partition key range - # is defined. - _assert_defined_partition_property( - self._data["partition_key_range"], "partition_time_window" - ) - return self._data["partition_time_window"] - - @property - def run_id(self) -> str: - return self._data["run_id"] - - @property - def job_name(self) -> Optional[str]: - return self._data["job_name"] - - @property - def retry_number(self) -> int: - return self._data["retry_number"] - - def get_extra(self, key: str) -> Any: - return _assert_defined_extra(self._data["extras"], key) - - @property - def extras(self) -> Mapping[str, Any]: - return self._data["extras"] - - # ##### WRITE - - def report_asset_materialization( - self, - metadata: Optional[Mapping[str, Union[ExtMetadataRawValue, ExtMetadataValue]]] = None, - data_version: Optional[str] = None, - asset_key: Optional[str] = None, - ): - asset_key = _resolve_optionally_passed_asset_key( - self._data, asset_key, "report_asset_materialization" - ) - if asset_key in self._materialized_assets: - raise DagsterExtError( - f"Calling `report_asset_materialization` with asset key `{asset_key}` is undefined." - " Asset has already been materialized, so no additional data can be reported" - " for it." 
- ) - metadata = ( - _normalize_param_metadata(metadata, "report_asset_materialization", "metadata") - if metadata - else None - ) - data_version = _assert_opt_param_type( - data_version, str, "report_asset_materialization", "data_version" - ) - self._write_message( - "report_asset_materialization", - {"asset_key": asset_key, "data_version": data_version, "metadata": metadata}, - ) - self._materialized_assets.add(asset_key) - - def report_asset_check( - self, - check_name: str, - success: bool, - severity: ExtAssetCheckSeverity = "ERROR", - metadata: Optional[Mapping[str, Union[ExtMetadataRawValue, ExtMetadataValue]]] = None, - asset_key: Optional[str] = None, - ) -> None: - asset_key = _resolve_optionally_passed_asset_key( - self._data, asset_key, "report_asset_check" - ) - check_name = _assert_param_type(check_name, str, "report_asset_check", "check_name") - success = _assert_param_type(success, bool, "report_asset_check", "success") - metadata = ( - _normalize_param_metadata(metadata, "report_asset_check", "metadata") - if metadata - else None - ) - self._write_message( - "report_asset_check", - { - "asset_key": asset_key, - "check_name": check_name, - "success": success, - "metadata": metadata, - "severity": severity, - }, - ) - - def log(self, message: str, level: str = "info") -> None: - message = _assert_param_type(message, str, "log", "asset_key") - level = _assert_param_value(level, ["info", "warning", "error"], "log", "level") - self._write_message("log", {"message": message, "level": level}) diff --git a/python_modules/dagster-pipes/MANIFEST.in b/python_modules/dagster-pipes/MANIFEST.in index 49defae2bfd50..b9f6352f7fa4a 100644 --- a/python_modules/dagster-pipes/MANIFEST.in +++ b/python_modules/dagster-pipes/MANIFEST.in @@ -1,3 +1,4 @@ include README.md include LICENSE include dagster_pipes/py.typed +include json_schema/*.json diff --git a/python_modules/dagster-pipes/dagster_pipes/__init__.py b/python_modules/dagster-pipes/dagster_pipes/__init__.py index f810a3c26514c..1d23b5df769e7 100644 --- a/python_modules/dagster-pipes/dagster_pipes/__init__.py +++ b/python_modules/dagster-pipes/dagster_pipes/__init__.py @@ -167,16 +167,17 @@ class PipesMetadataValue(TypedDict): _json_schemas: Dict[Literal["context", "message"], Mapping[str, Any]] = {} -def get_ext_json_schema_path(name: _JsonSchemaName) -> str: +def get_pipes_json_schema_path(name: _JsonSchemaName) -> str: return os.path.join(_schema_root, f"{name}.json") -def get_ext_json_schema(name: _JsonSchemaName) -> Mapping[str, Any]: +def get_pipes_json_schema(name: _JsonSchemaName) -> Mapping[str, Any]: if name not in _json_schemas: - with open(get_ext_json_schema_path(name)) as f: + with open(get_pipes_json_schema_path(name)) as f: _json_schemas[name] = json.load(f) return _json_schemas[name] + # ######################## # ##### UTIL # ######################## diff --git a/python_modules/dagster-pipes/dagster_pipes_tests/test_context.py b/python_modules/dagster-pipes/dagster_pipes_tests/test_context.py index 6982cbe0de24a..9acd80e9f366a 100644 --- a/python_modules/dagster-pipes/dagster_pipes_tests/test_context.py +++ b/python_modules/dagster-pipes/dagster_pipes_tests/test_context.py @@ -4,7 +4,6 @@ import jsonschema import pytest -<<<<<<< HEAD:python_modules/dagster-pipes/dagster_pipes_tests/test_context.py from dagster_pipes import ( PIPES_PROTOCOL_VERSION, PIPES_PROTOCOL_VERSION_FIELD, @@ -17,18 +16,7 @@ PipesParams, PipesPartitionKeyRange, PipesTimeWindow, -======= -from dagster_ext import ( - EXT_PROTOCOL_VERSION_FIELD, - 
PROTOCOL_VERSION, - DagsterExtError, - ExtContext, - ExtContextData, - ExtDataProvenance, - ExtPartitionKeyRange, - ExtTimeWindow, - get_ext_json_schema, ->>>>>>> c16b2ba0d1 ([externals] JsonSchema):python_modules/dagster-ext/dagster_ext_tests/test_context.py + get_pipes_json_schema, ) TEST_PIPES_CONTEXT_DEFAULTS = PipesContextData( @@ -55,19 +43,12 @@ def load_context(self, params: PipesParams) -> Iterator[PipesContextData]: def _make_external_execution_context(**kwargs): -<<<<<<< HEAD:python_modules/dagster-pipes/dagster_pipes_tests/test_context.py - kwargs = {**TEST_PIPES_CONTEXT_DEFAULTS, **kwargs} + data = PipesContextData(**{**TEST_PIPES_CONTEXT_DEFAULTS, **kwargs}) + jsonschema.validate(data, get_pipes_json_schema("context")) return PipesContext( params_loader=MagicMock(), - context_loader=_DirectContextLoader(PipesContextData(**kwargs)), + context_loader=_DirectContextLoader(data), message_writer=MagicMock(), -======= - data = ExtContextData({**TEST_EXT_CONTEXT_DEFAULTS, **kwargs}) - jsonschema.validate(data, get_ext_json_schema("context")) - return ExtContext( - data=data, - message_channel=MagicMock(), ->>>>>>> c16b2ba0d1 ([externals] JsonSchema):python_modules/dagster-ext/dagster_ext_tests/test_context.py ) @@ -223,7 +204,6 @@ def test_report_twice_materialized(): context.report_asset_materialization(asset_key="foo") -<<<<<<< HEAD:python_modules/dagster-pipes/dagster_pipes_tests/test_context.py def _make_pipes_message(method, params): return PipesMessage( { @@ -272,19 +252,19 @@ def test_multiple_close(): # `close` is idempotent, multiple calls should not raise an error context.close() context.close() -======= + + def test_message_json_schema_validation(): message = { - EXT_PROTOCOL_VERSION_FIELD: PROTOCOL_VERSION, + PIPES_PROTOCOL_VERSION_FIELD: PIPES_PROTOCOL_VERSION, "method": "foo", "params": {"bar": "baz"}, } - jsonschema.validate(message, get_ext_json_schema("message")) + jsonschema.validate(message, get_pipes_json_schema("message")) def test_json_schema_rejects_invalid(): with pytest.raises(jsonschema.ValidationError): - jsonschema.validate({"foo": "bar"}, get_ext_json_schema("context")) + jsonschema.validate({"foo": "bar"}, get_pipes_json_schema("context")) with pytest.raises(jsonschema.ValidationError): - jsonschema.validate({"foo": "bar"}, get_ext_json_schema("message")) ->>>>>>> c16b2ba0d1 ([externals] JsonSchema):python_modules/dagster-ext/dagster_ext_tests/test_context.py + jsonschema.validate({"foo": "bar"}, get_pipes_json_schema("message")) diff --git a/python_modules/dagster-ext/json_schema/context.json b/python_modules/dagster-pipes/json_schema/context.json similarity index 89% rename from python_modules/dagster-ext/json_schema/context.json rename to python_modules/dagster-pipes/json_schema/context.json index e086abae57944..4ccbff6e167d4 100644 --- a/python_modules/dagster-ext/json_schema/context.json +++ b/python_modules/dagster-pipes/json_schema/context.json @@ -1,6 +1,6 @@ { "$defs": { - "ExtDataProvenance": { + "PipesDataProvenance": { "properties": { "code_version": { "title": "Code Version", @@ -23,10 +23,10 @@ "input_data_versions", "is_user_provided" ], - "title": "ExtDataProvenance", + "title": "PipesDataProvenance", "type": "object" }, - "ExtPartitionKeyRange": { + "PipesPartitionKeyRange": { "properties": { "start": { "title": "Start", @@ -41,10 +41,10 @@ "start", "end" ], - "title": "ExtPartitionKeyRange", + "title": "PipesPartitionKeyRange", "type": "object" }, - "ExtTimeWindow": { + "PipesTimeWindow": { "properties": { "start": { "title": "Start", 
@@ -59,7 +59,7 @@ "start", "end" ], - "title": "ExtTimeWindow", + "title": "PipesTimeWindow", "type": "object" } }, @@ -105,7 +105,7 @@ "additionalProperties": { "anyOf": [ { - "$ref": "#/$defs/ExtDataProvenance" + "$ref": "#/$defs/PipesDataProvenance" }, { "type": "null" @@ -134,7 +134,7 @@ "partition_key_range": { "anyOf": [ { - "$ref": "#/$defs/ExtPartitionKeyRange" + "$ref": "#/$defs/PipesPartitionKeyRange" }, { "type": "null" @@ -144,7 +144,7 @@ "partition_time_window": { "anyOf": [ { - "$ref": "#/$defs/ExtTimeWindow" + "$ref": "#/$defs/PipesTimeWindow" }, { "type": "null" @@ -187,6 +187,6 @@ "retry_number", "extras" ], - "title": "ExtContextData", + "title": "PipesContextData", "type": "object" } \ No newline at end of file diff --git a/python_modules/dagster-ext/json_schema/message.json b/python_modules/dagster-pipes/json_schema/message.json similarity index 85% rename from python_modules/dagster-ext/json_schema/message.json rename to python_modules/dagster-pipes/json_schema/message.json index 83a3d51b89f23..2b3a14edecc32 100644 --- a/python_modules/dagster-ext/json_schema/message.json +++ b/python_modules/dagster-pipes/json_schema/message.json @@ -15,7 +15,7 @@ ], "title": "Params" }, - "__dagster_ext_version": { + "__dagster_pipes_version": { "type": "string" } }, @@ -23,6 +23,6 @@ "method", "params" ], - "title": "ExtMessage", + "title": "PipesMessage", "type": "object" } \ No newline at end of file diff --git a/python_modules/dagster-pipes/setup.py b/python_modules/dagster-pipes/setup.py index 5a1a1461e4877..7497153a57cb9 100644 --- a/python_modules/dagster-pipes/setup.py +++ b/python_modules/dagster-pipes/setup.py @@ -33,5 +33,10 @@ def get_version() -> str: ], packages=find_packages(exclude=["dagster_pipes_tests*"]), package_data={"dagster_pipes": ["json_schema/*.json"]}, + extras_require={ + "test": [ + "jsonschema", + ], + }, zip_safe=False, ) diff --git a/scripts/generate_ext_json_schema.py b/scripts/generate_ext_json_schema.py index 2e8ed91f868cd..4bd858b37b84a 100644 --- a/scripts/generate_ext_json_schema.py +++ b/scripts/generate_ext_json_schema.py @@ -13,31 +13,32 @@ ) import jsonschema -from dagster_ext import ( - ExtContextData, - ExtMessage, - get_ext_json_schema_path, +from dagster_pipes import ( + PIPES_PROTOCOL_VERSION_FIELD, + PipesContextData, + PipesMessage, + get_pipes_json_schema_path, ) from pydantic import BaseModel, create_model from typing_extensions import TypedDict, TypeGuard OUTPUT_FILEPATH = os.path.join( - os.path.dirname(__file__), "../python_modules/dagster-ext/ext_protocol_schema.json" + os.path.dirname(__file__), "../python_modules/dagster-pipes/pipes_protocol_schema.json" ) MODEL_CACHE: Dict[str, Any] = {} def main(): - context_schema = create_pydantic_model_from_typeddict(ExtContextData).model_json_schema() - message_schema = create_pydantic_model_from_typeddict(ExtMessage).model_json_schema() - inject_dagster_ext_version_field(message_schema) + context_schema = create_pydantic_model_from_typeddict(PipesContextData).model_json_schema() + message_schema = create_pydantic_model_from_typeddict(PipesMessage).model_json_schema() + inject_dagster_pipes_version_field(message_schema) jsonschema.Draft7Validator.check_schema(context_schema) jsonschema.Draft7Validator.check_schema(message_schema) - with open(get_ext_json_schema_path("context"), "w") as f: + with open(get_pipes_json_schema_path("context"), "w") as f: f.write(json.dumps(context_schema, indent=2)) - with open(get_ext_json_schema_path("message"), "w") as f: + with 
open(get_pipes_json_schema_path("message"), "w") as f:
         f.write(json.dumps(message_schema, indent=2))
 
 
@@ -58,12 +59,12 @@ def create_pydantic_model_from_typeddict(typed_dict_cls: Type[TypedDict]) -> Typ
     return MODEL_CACHE[typed_dict_cls.__name__]
 
 
-def inject_dagster_ext_version_field(schema: Dict[str, Any]) -> None:
-    """Add `__dagster_ext_version` field to the schema.
+def inject_dagster_pipes_version_field(schema: Dict[str, Any]) -> None:
+    """Add `__dagster_pipes_version` field to the schema.
 
     This field is excluded from the Pydantic-constructed schema because it is underscore-prefixed,
     which means it is not treated as a field by Pydantic.
     """
-    schema["properties"]["__dagster_ext_version"] = {"type": "string"}
+    schema["properties"][PIPES_PROTOCOL_VERSION_FIELD] = {"type": "string"}
 
 
 def normalize_field_type(field_type: Type) -> Type:
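
For reference, a minimal sketch of how the renamed schema helpers can be exercised end to end, mirroring `test_message_json_schema_validation` above. It assumes `dagster-pipes` is installed with the new `test` extra (so `jsonschema` is importable) and that `PIPES_PROTOCOL_VERSION` and `PIPES_PROTOCOL_VERSION_FIELD` are exported as in the test imports; the `log` method and its params shape are taken from the deleted `dagster_ext` implementation and are illustrative only:

import jsonschema
from dagster_pipes import (
    PIPES_PROTOCOL_VERSION,
    PIPES_PROTOCOL_VERSION_FIELD,
    get_pipes_json_schema,
)

# A protocol message keyed by the version field that
# inject_dagster_pipes_version_field injects into the generated schema.
message = {
    PIPES_PROTOCOL_VERSION_FIELD: PIPES_PROTOCOL_VERSION,
    "method": "log",  # method/params shape mirrors the removed dagster_ext `log` call
    "params": {"message": "hello from the external process", "level": "info"},
}

# Raises jsonschema.ValidationError if the message does not conform.
jsonschema.validate(message, get_pipes_json_schema("message"))

Because MANIFEST.in and `package_data` now both include `json_schema/*.json`, this validation runs against the schemas shipped inside the installed package rather than requiring a repo checkout.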