From c50e6b463af972ddeb6ff237ca9f1c1e984904d1 Mon Sep 17 00:00:00 2001 From: Sean Mackesey Date: Thu, 21 Sep 2023 09:01:01 -0400 Subject: [PATCH] minor ext fixes --- python_modules/dagster-ext/MANIFEST.in | 2 +- python_modules/dagster-ext/dagster_ext/__init__.py | 6 +++--- python_modules/dagster/dagster/_core/ext/utils.py | 11 ++++------- .../libraries/dagster-aws/dagster_aws/ext.py | 2 +- .../libraries/dagster-k8s/dagster_k8s/ext.py | 4 ++-- 5 files changed, 11 insertions(+), 14 deletions(-) diff --git a/python_modules/dagster-ext/MANIFEST.in b/python_modules/dagster-ext/MANIFEST.in index ed3a9a1853f13..b527c43b7cd88 100644 --- a/python_modules/dagster-ext/MANIFEST.in +++ b/python_modules/dagster-ext/MANIFEST.in @@ -1,3 +1,3 @@ include README.md include LICENSE -include dagster_external/py.typed \ No newline at end of file +include dagster_ext/py.typed diff --git a/python_modules/dagster-ext/dagster_ext/__init__.py b/python_modules/dagster-ext/dagster_ext/__init__.py index a98bc911b118a..dec7db7c56de0 100644 --- a/python_modules/dagster-ext/dagster_ext/__init__.py +++ b/python_modules/dagster-ext/dagster_ext/__init__.py @@ -66,7 +66,7 @@ class ExtMessage(TypedDict): params: Optional[Mapping[str, Any]] -# ##### EXTERNAL EXECUTION CONTEXT +# ##### EXT CONTEXT class ExtContextData(TypedDict): @@ -624,11 +624,11 @@ def __init__( message_channel: ExtMessageWriterChannel, ) -> None: self._data = data - self.message_channel = message_channel + self._message_channel = message_channel def _write_message(self, method: str, params: Optional[Mapping[str, Any]] = None) -> None: message = ExtMessage(method=method, params=params) - self.message_channel.write_message(message) + self._message_channel.write_message(message) # ######################## # ##### PUBLIC API diff --git a/python_modules/dagster/dagster/_core/ext/utils.py b/python_modules/dagster/dagster/_core/ext/utils.py index 96ded10869647..9ea033ee4ea73 100644 --- a/python_modules/dagster/dagster/_core/ext/utils.py +++ b/python_modules/dagster/dagster/_core/ext/utils.py @@ -35,9 +35,6 @@ _CONTEXT_INJECTOR_FILENAME = "context" _MESSAGE_READER_FILENAME = "messages" -_CONTEXT_INJECTOR_FILENAME = "context" -_MESSAGE_READER_FILENAME = "messages" - class ExtFileContextInjector(ExtContextInjector): def __init__(self, path: str): @@ -112,7 +109,7 @@ def read_messages( ) -> Iterator[ExtParams]: with tempfile.TemporaryDirectory() as tempdir: with ExtFileMessageReader( - os.path.join(tempdir, _CONTEXT_INJECTOR_FILENAME) + os.path.join(tempdir, _MESSAGE_READER_FILENAME) ).read_messages(handler) as params: yield params @@ -199,15 +196,15 @@ def ext_protocol( that need to be provided to the external process. """ context_data = build_external_execution_context_data(context, extras) - msg_handler = ExtMessageHandler(context) + message_handler = ExtMessageHandler(context) with context_injector.inject_context( context_data, ) as ci_params, message_reader.read_messages( - msg_handler, + message_handler, ) as mr_params: yield ExtOrchestrationContext( context_data=context_data, - message_handler=msg_handler, + message_handler=message_handler, context_injector_params=ci_params, message_reader_params=mr_params, ) diff --git a/python_modules/libraries/dagster-aws/dagster_aws/ext.py b/python_modules/libraries/dagster-aws/dagster_aws/ext.py index 807c482f02971..5f05e3cc7a447 100644 --- a/python_modules/libraries/dagster-aws/dagster_aws/ext.py +++ b/python_modules/libraries/dagster-aws/dagster_aws/ext.py @@ -13,7 +13,7 @@ class ExtS3MessageReader(ExtBlobStoreMessageReader): - def __init__(self, *, interval: int = 10, bucket: str, client: boto3.client): + def __init__(self, *, interval: float = 10, bucket: str, client: boto3.client): super().__init__(interval=interval) self.bucket = check.str_param(bucket, "bucket") self.key_prefix = "".join(random.choices(string.ascii_letters, k=30)) diff --git a/python_modules/libraries/dagster-k8s/dagster_k8s/ext.py b/python_modules/libraries/dagster-k8s/dagster_k8s/ext.py index 6a37341f770eb..c0a42e123104d 100644 --- a/python_modules/libraries/dagster-k8s/dagster_k8s/ext.py +++ b/python_modules/libraries/dagster-k8s/dagster_k8s/ext.py @@ -158,7 +158,7 @@ def run( extras=extras, context_injector=self.context_injector, message_reader=self.message_reader, - ) as ext_process: + ) as ext_context: namespace = namespace or "default" pod_name = get_pod_name(context.run_id, context.op.name) pod_body = build_pod_body( @@ -166,7 +166,7 @@ def run( image=image, command=command, env_vars={ - **ext_process.get_external_process_env_vars(), + **ext_context.get_external_process_env_vars(), **(self.env or {}), **(env or {}), },