Skip to content

Commit

Permalink
tail file
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Aug 21, 2023
1 parent d4638b9 commit fa108d2
Showing 1 changed file with 39 additions and 20 deletions.
59 changes: 39 additions & 20 deletions python_modules/dagster/dagster/_core/external_execution/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import socket
import socketserver
import tempfile
import time
from contextlib import ExitStack, contextmanager
from copy import copy
from subprocess import Popen
Expand Down Expand Up @@ -167,19 +168,27 @@ def _output_context_manager(self) -> ContextManager[OutputIOYield]:
@contextmanager
def _stdio_output(self) -> Iterator[OutputIOYield]:
read_fd, write_fd = os.pipe()
thread = self._default_output_thread(read_fd)
yield write_fd, {}
os.close(write_fd)
thread.join()
is_task_complete = Event()
thread = self._default_output_thread(read_fd, is_task_complete)
try:
yield write_fd, {}
finally:
os.close(write_fd)
is_task_complete.set()
thread.join()

@contextmanager
def _file_output(self) -> Iterator[OutputIOYield]:
path = self._prepare_io_path(self._output_path, "output")
env = {DAGSTER_EXTERNAL_ENV_KEYS["output"]: path}
open(path, "w").close() # create file
is_task_complete = Event()
thread = self._default_output_thread(path, is_task_complete)
try:
yield None, env
self._read_output(path)
finally:
is_task_complete.set()
thread.join()
os.remove(path)

@contextmanager
Expand All @@ -188,19 +197,21 @@ def _fifo_output(self) -> Iterator[OutputIOYield]:
env = {DAGSTER_EXTERNAL_ENV_KEYS["output"]: path}
if not os.path.exists(path):
os.mkfifo(path)
# We open a write file descriptor for a FIFO in the orchestration
# process in order to keep the FIFO open for reading. Otherwise, the
# FIFO will receive EOF after the first file descriptor writing it is
# closed in an external process. Note that `O_RDWR` is required for
# this call to succeed when no readers are yet available, and
# `O_NONBLOCK` is required to prevent the `open` call from blocking.
dummy_handle = os.open(path, os.O_RDWR | os.O_NONBLOCK)
is_task_complete = Event()
thread = self._default_output_thread(path, is_task_complete)
try:
# We open a write file descriptor for a FIFO in the orchestration
# process in order to keep the FIFO open for reading. Otherwise, the
# FIFO will receive EOF after the first file descriptor writing it is
# closed in an external process. Note that `O_RDWR` is required for
# this call to succeed when no readers are yet available, and
# `O_NONBLOCK` is required to prevent the `open` call from blocking.
dummy_handle = os.open(path, os.O_RDWR | os.O_NONBLOCK)
thread = self._default_output_thread(path)
yield None, env
finally:
is_task_complete.set()
os.close(dummy_handle)
thread.join()
finally:
os.remove(path)

# Socket server is started/shutdown in a dedicated context manager to share logic between input
Expand Down Expand Up @@ -240,8 +251,10 @@ def _start_socket_server_thread(self, sockaddr: SocketAddress) -> Thread:
is_server_started.wait()
return thread

def _default_output_thread(self, path_or_fd: Union[str, int]) -> Thread:
thread = Thread(target=self._read_output, args=(path_or_fd,), daemon=True)
def _default_output_thread(
self, path_or_fd: Union[str, int], is_task_complete: Event
) -> Thread:
thread = Thread(target=self._read_output, args=(path_or_fd, is_task_complete), daemon=True)
thread.start()
return thread

Expand All @@ -252,11 +265,17 @@ def _write_input(self, path_or_fd: Union[str, int]) -> None:
json.dump(external_context, input_stream)

# Not used in socket mode
def _read_output(self, path_or_fd: Union[str, int]) -> Any:
def _read_output(self, path_or_fd: Union[str, int], is_task_complete: Event) -> Any:
with open(path_or_fd, "r") as output_stream:
for line in output_stream:
notification = json.loads(line)
self.handle_notification(notification)
while True:
line = output_stream.readline()
if line:
notification = json.loads(line)
self.handle_notification(notification)
elif is_task_complete.is_set():
break
else:
time.sleep(0.01)

# Only used in socket mode
def _start_socket_server(self, sockaddr: SocketAddress, is_server_started: Event) -> None:
Expand Down

0 comments on commit fa108d2

Please sign in to comment.