From e850e68e8efa2654f172755405807a70f2933b5a Mon Sep 17 00:00:00 2001 From: Filip Cacky Date: Sun, 20 Oct 2024 00:42:56 +0200 Subject: [PATCH] Use fifo for attribute file --- .../plugins/argo/argo_workflows_deployer.py | 10 +- .../step_functions/step_functions_deployer.py | 9 +- metaflow/runner/deployer.py | 12 +- metaflow/runner/metaflow_runner.py | 57 ++--- metaflow/runner/utils.py | 201 ++++++++++++------ 5 files changed, 173 insertions(+), 116 deletions(-) diff --git a/metaflow/plugins/argo/argo_workflows_deployer.py b/metaflow/plugins/argo/argo_workflows_deployer.py index 1a3056bc28d..b8c9500c702 100644 --- a/metaflow/plugins/argo/argo_workflows_deployer.py +++ b/metaflow/plugins/argo/argo_workflows_deployer.py @@ -1,5 +1,4 @@ import sys -import tempfile from typing import Optional, ClassVar from metaflow.plugins.argo.argo_workflows import ArgoWorkflows @@ -9,6 +8,7 @@ TriggeredRun, get_lower_level_group, handle_timeout, + temporary_fifo, ) @@ -207,16 +207,14 @@ def trigger(instance: DeployedFlow, **kwargs): Exception If there is an error during the trigger process. 
""" - with tempfile.TemporaryDirectory() as temp_dir: - tfp_runner_attribute = tempfile.NamedTemporaryFile(dir=temp_dir, delete=False) - + with temporary_fifo() as (attribute_file_path, attribute_file_fd): # every subclass needs to have `self.deployer_kwargs` command = get_lower_level_group( instance.deployer.api, instance.deployer.top_level_kwargs, instance.deployer.TYPE, instance.deployer.deployer_kwargs, - ).trigger(deployer_attribute_file=tfp_runner_attribute.name, **kwargs) + ).trigger(deployer_attribute_file=attribute_file_path, **kwargs) pid = instance.deployer.spm.run_command( [sys.executable, *command], @@ -227,7 +225,7 @@ def trigger(instance: DeployedFlow, **kwargs): command_obj = instance.deployer.spm.get(pid) content = handle_timeout( - tfp_runner_attribute, command_obj, instance.deployer.file_read_timeout + attribute_file_fd, command_obj, instance.deployer.file_read_timeout ) if command_obj.process.returncode == 0: diff --git a/metaflow/plugins/aws/step_functions/step_functions_deployer.py b/metaflow/plugins/aws/step_functions/step_functions_deployer.py index d9186e771cb..433d4a8f2b1 100644 --- a/metaflow/plugins/aws/step_functions/step_functions_deployer.py +++ b/metaflow/plugins/aws/step_functions/step_functions_deployer.py @@ -10,6 +10,7 @@ TriggeredRun, get_lower_level_group, handle_timeout, + temporary_fifo, ) @@ -174,16 +175,14 @@ def trigger(instance: DeployedFlow, **kwargs): Exception If there is an error during the trigger process. 
""" - with tempfile.TemporaryDirectory() as temp_dir: - tfp_runner_attribute = tempfile.NamedTemporaryFile(dir=temp_dir, delete=False) - + with temporary_fifo() as (attribute_file_path, attribute_file_fd): # every subclass needs to have `self.deployer_kwargs` command = get_lower_level_group( instance.deployer.api, instance.deployer.top_level_kwargs, instance.deployer.TYPE, instance.deployer.deployer_kwargs, - ).trigger(deployer_attribute_file=tfp_runner_attribute.name, **kwargs) + ).trigger(deployer_attribute_file=attribute_file_path, **kwargs) pid = instance.deployer.spm.run_command( [sys.executable, *command], @@ -194,7 +193,7 @@ def trigger(instance: DeployedFlow, **kwargs): command_obj = instance.deployer.spm.get(pid) content = handle_timeout( - tfp_runner_attribute, command_obj, instance.deployer.file_read_timeout + attribute_file_fd, command_obj, instance.deployer.file_read_timeout ) if command_obj.process.returncode == 0: diff --git a/metaflow/runner/deployer.py b/metaflow/runner/deployer.py index d2019856209..6aab46d5dd5 100644 --- a/metaflow/runner/deployer.py +++ b/metaflow/runner/deployer.py @@ -4,13 +4,12 @@ import time import importlib import functools -import tempfile from typing import Optional, Dict, ClassVar from metaflow.exception import MetaflowNotFound from metaflow.runner.subprocess_manager import SubprocessManager -from metaflow.runner.utils import handle_timeout +from metaflow.runner.utils import handle_timeout, temporary_fifo def get_lower_level_group( @@ -332,14 +331,11 @@ def create(self, **kwargs) -> DeployedFlow: Exception If there is an error during deployment. 
""" - with tempfile.TemporaryDirectory() as temp_dir: - tfp_runner_attribute = tempfile.NamedTemporaryFile( - dir=temp_dir, delete=False - ) + with temporary_fifo() as (attribute_file_path, attribute_file_fd): # every subclass needs to have `self.deployer_kwargs` command = get_lower_level_group( self.api, self.top_level_kwargs, self.TYPE, self.deployer_kwargs - ).create(deployer_attribute_file=tfp_runner_attribute.name, **kwargs) + ).create(deployer_attribute_file=attribute_file_path, **kwargs) pid = self.spm.run_command( [sys.executable, *command], @@ -350,7 +346,7 @@ def create(self, **kwargs) -> DeployedFlow: command_obj = self.spm.get(pid) content = handle_timeout( - tfp_runner_attribute, command_obj, self.file_read_timeout + attribute_file_fd, command_obj, self.file_read_timeout ) content = json.loads(content) self.name = content.get("name") diff --git a/metaflow/runner/metaflow_runner.py b/metaflow/runner/metaflow_runner.py index f1f0f68bb66..6323bf23a2f 100644 --- a/metaflow/runner/metaflow_runner.py +++ b/metaflow/runner/metaflow_runner.py @@ -2,13 +2,16 @@ import os import sys import json -import tempfile from typing import Dict, Iterator, Optional, Tuple from metaflow import Run, metadata -from .utils import handle_timeout, async_handle_timeout +from .utils import ( + temporary_fifo, + handle_timeout, + async_handle_timeout, +) from .subprocess_manager import CommandManager, SubprocessManager @@ -266,10 +269,8 @@ def __enter__(self) -> "Runner": async def __aenter__(self) -> "Runner": return self - def __get_executing_run(self, tfp_runner_attribute, command_obj): - content = handle_timeout( - tfp_runner_attribute, command_obj, self.file_read_timeout - ) + def __get_executing_run(self, attribute_file_fd, command_obj): + content = handle_timeout(attribute_file_fd, command_obj, self.file_read_timeout) content = json.loads(content) pathspec = "%s/%s" % (content.get("flow_name"), content.get("run_id")) @@ -280,9 +281,9 @@ def __get_executing_run(self, 
tfp_runner_attribute, command_obj): run_object = Run(pathspec, _namespace_check=False) return ExecutingRun(self, command_obj, run_object) - async def __async_get_executing_run(self, tfp_runner_attribute, command_obj): + async def __async_get_executing_run(self, attribute_file_fd, command_obj): content = await async_handle_timeout( - tfp_runner_attribute, command_obj, self.file_read_timeout + attribute_file_fd, command_obj, self.file_read_timeout ) content = json.loads(content) pathspec = "%s/%s" % (content.get("flow_name"), content.get("run_id")) @@ -310,12 +311,9 @@ def run(self, **kwargs) -> ExecutingRun: ExecutingRun ExecutingRun containing the results of the run. """ - with tempfile.TemporaryDirectory() as temp_dir: - tfp_runner_attribute = tempfile.NamedTemporaryFile( - dir=temp_dir, delete=False - ) + with temporary_fifo() as (attribute_file_path, attribute_file_fd): command = self.api(**self.top_level_kwargs).run( - runner_attribute_file=tfp_runner_attribute.name, **kwargs + runner_attribute_file=attribute_file_path, **kwargs ) pid = self.spm.run_command( @@ -326,7 +324,7 @@ def run(self, **kwargs) -> ExecutingRun: ) command_obj = self.spm.get(pid) - return self.__get_executing_run(tfp_runner_attribute, command_obj) + return self.__get_executing_run(attribute_file_fd, command_obj) def resume(self, **kwargs): """ @@ -344,12 +342,9 @@ def resume(self, **kwargs): ExecutingRun ExecutingRun containing the results of the resumed run. 
""" - with tempfile.TemporaryDirectory() as temp_dir: - tfp_runner_attribute = tempfile.NamedTemporaryFile( - dir=temp_dir, delete=False - ) + with temporary_fifo() as (attribute_file_path, attribute_file_fd): command = self.api(**self.top_level_kwargs).resume( - runner_attribute_file=tfp_runner_attribute.name, **kwargs + runner_attribute_file=attribute_file_path, **kwargs ) pid = self.spm.run_command( @@ -360,7 +355,7 @@ def resume(self, **kwargs): ) command_obj = self.spm.get(pid) - return self.__get_executing_run(tfp_runner_attribute, command_obj) + return self.__get_executing_run(attribute_file_fd, command_obj) async def async_run(self, **kwargs) -> ExecutingRun: """ @@ -380,12 +375,9 @@ async def async_run(self, **kwargs) -> ExecutingRun: ExecutingRun ExecutingRun representing the run that was started. """ - with tempfile.TemporaryDirectory() as temp_dir: - tfp_runner_attribute = tempfile.NamedTemporaryFile( - dir=temp_dir, delete=False - ) + with temporary_fifo() as (attribute_file_path, attribute_file_fd): command = self.api(**self.top_level_kwargs).run( - runner_attribute_file=tfp_runner_attribute.name, **kwargs + runner_attribute_file=attribute_file_path, **kwargs ) pid = await self.spm.async_run_command( @@ -395,9 +387,7 @@ async def async_run(self, **kwargs) -> ExecutingRun: ) command_obj = self.spm.get(pid) - return await self.__async_get_executing_run( - tfp_runner_attribute, command_obj - ) + return await self.__async_get_executing_run(attribute_file_fd, command_obj) async def async_resume(self, **kwargs): """ @@ -417,12 +407,9 @@ async def async_resume(self, **kwargs): ExecutingRun ExecutingRun representing the resumed run that was started. 
""" - with tempfile.TemporaryDirectory() as temp_dir: - tfp_runner_attribute = tempfile.NamedTemporaryFile( - dir=temp_dir, delete=False - ) + with temporary_fifo() as (attribute_file_path, attribute_file_fd): command = self.api(**self.top_level_kwargs).resume( - runner_attribute_file=tfp_runner_attribute.name, **kwargs + runner_attribute_file=attribute_file_path, **kwargs ) pid = await self.spm.async_run_command( @@ -432,9 +419,7 @@ async def async_resume(self, **kwargs): ) command_obj = self.spm.get(pid) - return await self.__async_get_executing_run( - tfp_runner_attribute, command_obj - ) + return await self.__async_get_executing_run(attribute_file_fd, command_obj) def __exit__(self, exc_type, exc_value, traceback): self.spm.cleanup() diff --git a/metaflow/runner/utils.py b/metaflow/runner/utils.py index a1467e51d3b..5d9aed138b2 100644 --- a/metaflow/runner/utils.py +++ b/metaflow/runner/utils.py @@ -2,9 +2,12 @@ import ast import time import asyncio - +import tempfile +import select +from contextlib import contextmanager from subprocess import CalledProcessError -from typing import Dict, TYPE_CHECKING + +from typing import TYPE_CHECKING if TYPE_CHECKING: from .subprocess_manager import CommandManager @@ -36,60 +39,137 @@ def format_flowfile(cell): return "\n".join(lines) -def check_process_exited(command_obj: "CommandManager"): +def check_process_exited(command_obj: "CommandManager") -> bool: if isinstance(command_obj.process, asyncio.subprocess.Process): return command_obj.process.returncode is not None else: return command_obj.process.poll() is not None -def read_from_file_when_ready( - file_path: str, command_obj: "CommandManager", timeout: float = 5 -): - start_time = time.time() - with open(file_path, "r", encoding="utf-8") as file_pointer: - content = file_pointer.read() - while not content: - if check_process_exited(command_obj): - # Check to make sure the file hasn't been read yet to avoid a race - # where the file is written between the end of this 
while loop and the - # poll call above. - content = file_pointer.read() - if content: - break - raise CalledProcessError( - command_obj.process.returncode, command_obj.command - ) - if time.time() - start_time > timeout: - raise TimeoutError( - "Timeout while waiting for file content from '%s'" % file_path - ) - time.sleep(0.1) - content = file_pointer.read() - return content - - -async def async_read_from_file_when_ready( - file_path: str, command_obj: "CommandManager", timeout: float = 5 -): - start_time = time.time() - with open(file_path, "r", encoding="utf-8") as file_pointer: - content = file_pointer.read() - while not content: - if check_process_exited(command_obj): - content = file_pointer.read() - if content: - break - raise CalledProcessError( - command_obj.process.returncode, command_obj.command - ) - if time.time() - start_time > timeout: - raise TimeoutError( - "Timeout while waiting for file content from '%s'" % file_path - ) - await asyncio.sleep(0.1) - content = file_pointer.read() - return content +@contextmanager +def temporary_fifo() -> (str, int): + """ + Create and open the read side of a temporary FIFO in a non-blocking mode. + + Returns + ------- + str + Path to the temporary FIFO. + int + File descriptor of the temporary FIFO. + """ + with tempfile.TemporaryDirectory() as temp_dir: + path = os.path.join(temp_dir, "fifo") + os.mkfifo(path) + # Blocks until the write side is opened unless in non-blocking mode + fd = os.open(path, os.O_RDONLY | os.O_NONBLOCK) + try: + yield path, fd + finally: + os.close(fd) + + +def read_from_fifo_when_ready( + fifo_fd: int, + command_obj: "CommandManager", + encoding: str = "utf-8", + timeout: int = 3600, +) -> str: + """ + Read the content from the FIFO file descriptor when it is ready. + + Parameters + ---------- + fifo_fd : int + File descriptor of the FIFO. + command_obj : CommandManager + Command manager object that handles the write side of the FIFO. 
+    encoding : str, optional
+        Encoding to use while reading the file, by default "utf-8".
+    timeout : int, optional
+        Timeout for reading the file in seconds, by default 3600.
+
+    Returns
+    -------
+    str
+        Content read from the FIFO.
+
+    Raises
+    ------
+    TimeoutError
+        If no event occurs on the FIFO within the timeout.
+    CalledProcessError
+        If the process managed by `command_obj` has exited without writing any
+        content to the FIFO.
+    """
+    content = bytearray()
+
+    poll = select.poll()
+    poll.register(fifo_fd, select.POLLIN)
+
+    while True:
+        poll_begin = time.time()
+        poll.poll(timeout * 1000)  # callers pass seconds; poll() expects milliseconds
+        timeout -= time.time() - poll_begin  # remaining budget, still in seconds
+
+        if timeout <= 0:
+            raise TimeoutError("Timeout while waiting for the file content")
+
+        try:
+            data = os.read(fifo_fd, 128)
+            while data:
+                content += data
+                data = os.read(fifo_fd, 128)
+
+            # Read from a non-blocking closed FIFO returns an empty byte array
+            break
+
+        except BlockingIOError:
+            # FIFO is open but no data is available yet
+            continue
+
+    if not content and check_process_exited(command_obj):
+        raise CalledProcessError(command_obj.process.returncode, command_obj.command)
+
+    return content.decode(encoding)
+
+
+async def async_read_from_fifo_when_ready(
+    fifo_fd: int,
+    command_obj: "CommandManager",
+    encoding: str = "utf-8",
+    timeout: int = 3600,
+) -> str:
+    """
+    Read the content from the FIFO file descriptor when it is ready.
+
+    Parameters
+    ----------
+    fifo_fd : int
+        File descriptor of the FIFO.
+    command_obj : CommandManager
+        Command manager object that handles the write side of the FIFO.
+    encoding : str, optional
+        Encoding to use while reading the file, by default "utf-8".
+    timeout : int, optional
+        Timeout for reading the file in seconds, by default 3600.
+
+    Returns
+    -------
+    str
+        Content read from the FIFO.
+
+    Raises
+    ------
+    TimeoutError
+        If no event occurs on the FIFO within the timeout.
+ CalledProcessError + If the process managed by `command_obj` has exited without writing any + content to the FIFO. + """ + return await asyncio.to_thread( + read_from_fifo_when_ready, fifo_fd, command_obj, encoding, timeout + ) def make_process_error_message(command_obj: "CommandManager"): @@ -105,7 +185,7 @@ def make_process_error_message(command_obj: "CommandManager"): def handle_timeout( - tfp_runner_attribute, command_obj: "CommandManager", file_read_timeout: int + attribute_file_fd: int, command_obj: "CommandManager", file_read_timeout: int ): """ Handle the timeout for a running subprocess command that reads a file @@ -113,8 +193,8 @@ def handle_timeout( Parameters ---------- - tfp_runner_attribute : NamedTemporaryFile - Temporary file that stores runner attribute data. + attribute_file_fd : int + File descriptor belonging to the FIFO containing the attribute data. command_obj : CommandManager Command manager object that encapsulates the running command details. file_read_timeout : int @@ -132,16 +212,15 @@ def handle_timeout( stdout and stderr logs. """ try: - content = read_from_file_when_ready( - tfp_runner_attribute.name, command_obj, timeout=file_read_timeout + return read_from_fifo_when_ready( + attribute_file_fd, command_obj=command_obj, timeout=file_read_timeout ) - return content except (CalledProcessError, TimeoutError) as e: raise RuntimeError(make_process_error_message(command_obj)) from e async def async_handle_timeout( - tfp_runner_attribute, command_obj: "CommandManager", file_read_timeout: int + attribute_file_fd, command_obj: "CommandManager", file_read_timeout: int ): """ Handle the timeout for a running subprocess command that reads a file @@ -149,8 +228,8 @@ async def async_handle_timeout( Parameters ---------- - tfp_runner_attribute : NamedTemporaryFile - Temporary file that stores runner attribute data. + attribute_file_fd : int + File descriptor belonging to the FIFO containing the attribute data. 
command_obj : CommandManager Command manager object that encapsulates the running command details. file_read_timeout : int @@ -168,8 +247,8 @@ async def async_handle_timeout( stdout and stderr logs. """ try: - return await async_read_from_file_when_ready( - tfp_runner_attribute.name, command_obj, timeout=file_read_timeout + return await async_read_from_fifo_when_ready( + attribute_file_fd, command_obj=command_obj, timeout=file_read_timeout ) except (CalledProcessError, TimeoutError) as e: raise RuntimeError(make_process_error_message(command_obj)) from e