Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async runner improvements #2056

Merged
merged 17 commits into from
Dec 2, 2024

Conversation

filipcacky
Copy link
Contributor

@filipcacky filipcacky commented Sep 24, 2024

Follow-up to #2053

Added an async implementation of Runner's signal handling, as discussed here #2053 (comment).

Added an async implementation of Runner's __get_executing_run, which is used in async_run and async_resume, the current implementation relies on time.sleep, which blocks other coroutines from running (modified code from my previous PR #2033).

async def async_read_from_file_when_ready(
file_path: str, command_obj: "CommandManager", timeout: float = 5
):
await asyncio.wait_for(command_obj.process.wait(), timeout)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, we wait for the process to finish with a timeout..

I am not sure we want that. The file content can be written earlier than the finish time of the process.

And we probably want the file content back as soon as possible..

The synchronous version doesn't wait for the process to finish before checking the file content. It keeps reading the file in a loop, and as soon as there is content, it returns it—even if the process is still running.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't realize the poll call isn't blocking. I rewrote it so that its the same as the sync version except a call to asyncio.sleep.

The impl doesn't really sit right with me though, I can imagine a scenario in which a call to read doesn't return the entire file content, because another write is issued by the subprocess immediately after. I think we could reimplement this with pipes and poll to see if the write side (subprocess) has closed the pipe and thus indicated it finished the writing. I could take a look at this after.

Copy link
Collaborator

@madhur-ob madhur-ob Oct 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, we know for a fact that the file being written will only be written once..

This makes things a bit easier..

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And we want to get the content ASAP since that allows us to fetch the Run object..
(while the process is still running..)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@filipcacky has a point here though -- it is true we write once and in most cases it should work fine but it is up to the OS to determine how the writes proceed and there is no guarantee that the content will be written all at once. Another mechanism may be warranted. File locks are always a bit annoying though (and not always reliable). We could also check for the presence of a particular string at the end or something (the OS should ensure writes are at least done sequentially). It may be worth fixing now because these types of bugs are particularly nasty when they do occur (it's rare but not impossible).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably not worth doing in this PR, but when I have a bit more time I can take a look at it).

Reimplementing it for a peace of mind and or better ergonomics on the write side (not being forced to make only a single write) and faster read on the read side (reading until a pipe close, instead of waking up and checking for a non-null content) is I think worth it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we can resolve this comment for now? But if possible..
@filipcacky can you take a stab at it within this PR itself?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, reimplemented it via a FIFO, made the same changes to Deployer.

metaflow/runner/utils.py Outdated Show resolved Hide resolved
metaflow/runner/utils.py Outdated Show resolved Hide resolved
metaflow/runner/utils.py Outdated Show resolved Hide resolved
@filipcacky filipcacky requested a review from madhur-ob October 16, 2024 20:31
raise TimeoutError(
"Timeout while waiting for file content from '%s'" % file_path
)
await asyncio.sleep(0.1)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so the only difference with the sync version is this sleep statement?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Although now i've just offloaded the sync version to asyncio.to_thread to avoid blocking the main thread in filesystem IO.

@@ -9,6 +9,8 @@
import threading
from typing import Callable, Dict, Iterator, List, Optional, Tuple

from .utils import check_process_exited


def kill_process_and_descendants(pid, termination_timeout):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we can remove this? If it is not being used, since you replaced it by a plural version..

Actually, it is used in the kill() function..
Maybe you can modify kill to use your plural version and then get rid of this one perhaps..

If you do it, we can copy over the TODO comments to the plural version..
also, you can use this kill in _handle_sigint and _async_handle_sigint

Copy link
Contributor Author

@filipcacky filipcacky Oct 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reimplemented kill in terms of the plural version to keep the API incase someone depends on it.

also, you can use this kill in _handle_sigint and _async_handle_sigint

As in calling kill in a loop? I'd rather avoid that since we can just spawn one subprocess to send a signal to all managed processes instead of one for each.

@madhur-ob
Copy link
Collaborator

I guess we are close...

@filipcacky Can you probably post an example / code snippet through which we can observe the behaviour before and after merging of this PR? aka what is the exact subtle nuanced behaviour you ran into that this PR solved for you..

I will make sure to verify from my side as well...

@filipcacky filipcacky force-pushed the async_runner_improvements branch from ae75600 to a6aed6e Compare October 20, 2024 00:21
Comment on lines +116 to +117
poll_begin = time.time()
poll.poll(timeout)
timeout -= 1000 * (time.time() - poll_begin)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

poll on the fd first in order to wait for the subprocess to open and write. Calling a read before the pipe is opened results in an empty response on a non-blocking fifo.

Comment on lines +175 to +176
return await asyncio.to_thread(
read_from_fifo_when_ready, fifo_fd, command_obj, encoding, timeout
)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved FS IO to a separate thread in order to avoid blocking the main thread with the poll.

Comment on lines +124 to +129
data = os.read(fifo_fd, 128)
while data:
content += data
data = os.read(fifo_fd, 128)

# Read from a non-blocking closed FIFO returns an empty byte array
break
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is either going to break when the FIFO write-side has been closed, indicating all bytes have been read. Or it will raise a BlockingIOError (EAGAIN) indicating the write-side still has the FIFO opened but hasn't written to it yet.

The os.read and using a raw file descriptor is deliberate, the file.read from the wrapper returned by os.fdopen or simple open behaves differently.

@filipcacky
Copy link
Contributor Author

I'll try to come up with a script to demonstrate the differences before and after these changes. The goal is to offload non-asyncio operations and lengthy filesystem I/O from the main thread in the async interface, so it can handle other tasks in the meantime, which may be a bit hard to demonstrate.

@filipcacky
Copy link
Contributor Author

Scripts demonstrating the difference between old and new behaviour.

async_test_flow_runner.py

import metaflow
import asyncio

async def run_signal_test():
    async with metaflow.Runner(flow_file="./async_test_flow.py") as runner:
        run1 = await runner.async_run()
        run2 = await runner.async_run()
        await run2.wait()
        await run1.wait()
        
if __name__ == "__main__":
    asyncio.run(run_signal_test())

async_test_flow.py

from metaflow import FlowSpec, step
import time


class AsyncTestFlow(FlowSpec):
    @step
    def start(self):
        time.sleep(30)
        self.next(self.end)

    @step
    def end(self):
        pass


if __name__ == "__main__":
    AsyncTestFlow()

async_test.py

import metaflow
import asyncio
import signal
import time


async def run_signal_test():
    print("Running signal test")
    subprocess = await asyncio.create_subprocess_exec(
        "python", "async_test_flow_runner.py"
    )

    print("\tWaiting 5 seconds for flows to start")
    await asyncio.sleep(5)

    print("\tSending SIGINT to subprocess")
    begin = time.time()
    subprocess.send_signal(signal.SIGINT)
    await subprocess.wait()
    print(f"\tSubprocess exited in {time.time() - begin} seconds\n")


async def foo():
    """
    This function is used to simulate a long running task
    """

    print("\tRunning foo")
    for i in range(20):
        print(f"\tFoo {i}")
        await asyncio.sleep(0.1)


async def run_metaflow():
    async with metaflow.Runner(flow_file="./async_test_flow.py") as runner:
        print("\tRunning flow")
        run1 = await runner.async_run()
        print("\tWaiting for flow to finish")
        await run1.wait()


async def run_async_test():
    print("Running async test")
    t1 = asyncio.create_task(foo())
    t2 = asyncio.create_task(run_metaflow())
    await asyncio.gather(t1, t2)


if __name__ == "__main__":
    asyncio.run(run_signal_test())
    asyncio.run(run_async_test())

Old behaviour tested at cbfaea9 runner fixes and enhancements (#2053)

Running signal test
	Waiting 5 seconds for flows to start
	Sending SIGINT to subprocess
	Subprocess exited in 4.178466796875 seconds

Running async test
	Running foo
	Foo 0
	Running flow
	Waiting for flow to finish
	Foo 1
	...
	Foo 19

New behaviour

Running signal test
	Waiting 5 seconds for flows to start
	Sending SIGINT to subprocess
	Subprocess exited in 0.06499624252319336 seconds

Running async test
	Running foo
	Foo 0
	Running flow
	Foo 1
        ...
	Foo 10
	Waiting for flow to finish
	Foo 11
        ...
	Foo 19

With the new behaviour, other coroutines can run during await runner.async_run() - Foo 1 ... Foo 10.

The signal handler sends an INT to both run1 and run2, causing await run.wait() to complete for both, and the program terminates immediately. If the subprocesses don't terminate right away, they will be forcefully killed after the asyncio.sleep timeout expires.

@filipcacky filipcacky requested a review from madhur-ob October 21, 2024 15:07
@madhur-ob
Copy link
Collaborator

madhur-ob commented Oct 22, 2024

@filipcacky we have some more cleanup stuff here that was recently merged: #2104
maybe you can rebase on top of master to:

  • include those changes
  • resolve conflicts that exist..

@filipcacky filipcacky force-pushed the async_runner_improvements branch 3 times, most recently from e48db92 to e850e68 Compare October 22, 2024 14:07
@filipcacky
Copy link
Contributor Author

@madhur-ob Rebased, should be fine now.

@filipcacky filipcacky force-pushed the async_runner_improvements branch from e850e68 to 3fb6bd8 Compare October 28, 2024 14:33
@filipcacky filipcacky force-pushed the async_runner_improvements branch from 3fb6bd8 to ec34a71 Compare November 14, 2024 15:02

# Set the correct metadata from the runner_attribute file corresponding to this run.
metadata_for_flow = content.get("metadata")
metadata(metadata_for_flow)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we are no longer setting the metadata explicitly.. i.e metadata(metadata_for_flow) needs to be removed..

refer to the implementation of __get_executing_run and you will see the following..

# Set the correct metadata from the runner_attribute file corresponding to this run.
metadata_for_flow = content.get("metadata")

run_object = Run(
    pathspec, _namespace_check=False, _current_metadata=metadata_for_flow
)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, fixed! Must've missed it while rebasing.


main_kill = await asyncio.create_subprocess_exec("kill", "-KILL", *pids)
await main_kill.wait()

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can encapsulate the SIGTERM part and the SIGKILL part inside try/except each?
just like we did inside the sync version i.e. kill_processes_and_descendants

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was checking asyncio docs as well as the implementation while writing this part, didn't see any info about or any exceptions being raised, but i may have completely missed them, not familiar with the impl. Probably a good idea to wrap the calls regardless.

if command.process and not check_process_exited(command)
]
if pids:
kill_processes_and_descendants(pids, termination_timeout=2)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

small nitpick: earlier, termination_timeout was 2 seconds per PID
maybe we should pass 2*len(pids) here? since this will handle all of them collectively..

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similarly for the async version, if we choose to do this..

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't be necessary anymore. All the pids are enumerated before the call and are sent out simultaneously by kill/pkill, so they all get the same amount of time as before.

madhur-ob
madhur-ob previously approved these changes Nov 26, 2024
Copy link
Collaborator

@madhur-ob madhur-ob left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, one thing needs to be fixed i.e. #2056 (comment)

Other comments are minor / nitpicks but will be good to consider.

@savingoyal savingoyal merged commit 286f9ac into Netflix:master Dec 2, 2024
29 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants