Skip to content

Commit

Permalink
Merge pull request #4 from filipcacky/async_runner_improvements_fixes
Browse files Browse the repository at this point in the history
Async runner improvements fixes
  • Loading branch information
filipcacky authored Nov 26, 2024
2 parents e305c68 + b3ed0f4 commit b06af09
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 10 deletions.
5 changes: 3 additions & 2 deletions metaflow/runner/metaflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,9 +292,10 @@ async def __async_get_executing_run(self, attribute_file_fd, command_obj):

# Set the correct metadata from the runner_attribute file corresponding to this run.
metadata_for_flow = content.get("metadata")
metadata(metadata_for_flow)

run_object = Run(pathspec, _namespace_check=False)
run_object = Run(
pathspec, _namespace_check=False, _current_metadata=metadata_for_flow
)
return ExecutingRun(self, command_obj, run_object)

def run(self, **kwargs) -> ExecutingRun:
Expand Down
28 changes: 20 additions & 8 deletions metaflow/runner/subprocess_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,31 @@ async def async_kill_processes_and_descendants(
# TODO: there's a race condition that new descendants might
# spawn b/w the invocations of 'pkill' and 'kill'.
# Needs to be fixed in future.
sub_term = await asyncio.create_subprocess_exec("pkill", "-TERM", "-P", *pids)
await sub_term.wait()
try:
sub_term = await asyncio.create_subprocess_exec("pkill", "-TERM", "-P", *pids)
await sub_term.wait()
except Exception:
pass

main_term = await asyncio.create_subprocess_exec("kill", "-TERM", *pids)
await main_term.wait()
try:
main_term = await asyncio.create_subprocess_exec("kill", "-TERM", *pids)
await main_term.wait()
except Exception:
pass

await asyncio.sleep(termination_timeout)

sub_kill = await asyncio.create_subprocess_exec("pkill", "-KILL", "-P", *pids)
await sub_kill.wait()
try:
sub_kill = await asyncio.create_subprocess_exec("pkill", "-KILL", "-P", *pids)
await sub_kill.wait()
except Exception:
pass

main_kill = await asyncio.create_subprocess_exec("kill", "-KILL", *pids)
await main_kill.wait()
try:
main_kill = await asyncio.create_subprocess_exec("kill", "-KILL", *pids)
await main_kill.wait()
except Exception:
pass


class LogReadTimeoutError(Exception):
Expand Down

0 comments on commit b06af09

Please sign in to comment.