Skip to content

Commit

Permalink
Use sync fns in except CancelledError blocks
Browse files Browse the repository at this point in the history
Signed-off-by: Fabrice Normandin <[email protected]>
  • Loading branch information
lebrice committed Apr 29, 2024
1 parent 7e1f9db commit fda3fd5
Showing 1 changed file with 31 additions and 20 deletions.
51 changes: 31 additions & 20 deletions milatools/utils/compute_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,17 @@ async def get_queued_milatools_job_ids(
return set([int(job_id_str) for job_id_str in jobs.splitlines()])


def get_queued_milatools_job_ids_sync(
    login_node: RemoteV2, job_name: str = "mila-code"
) -> set[int]:
    """Return the ids of the current user's queued jobs named `job_name`.

    Synchronous version of `get_queued_milatools_job_ids`, for when we can't really
    use async (for example inside an `except asyncio.CancelledError` block).

    Args:
        login_node: Remote on which the `squeue` command is executed.
        job_name: Job name given to `squeue --name`.

    Returns:
        The job ids printed by `squeue` (one per line) as a set of ints. The set is
        empty when `squeue` prints nothing.

    Raises:
        ValueError: If a non-blank output line is not a valid integer.
    """
    squeue_output = login_node.get_output(
        f"squeue --noheader --me --format=%A --name={job_name}"
    )
    # Skip blank lines so that a stray empty line in the output does not make
    # `int()` raise.
    return {int(line) for line in squeue_output.splitlines() if line.strip()}


async def get_milatools_job_ids(
login_node: RemoteV2,
job_name="mila-code",
Expand Down Expand Up @@ -246,9 +257,11 @@ async def cancel_new_jobs_on_interrupt(login_node: RemoteV2, job_name: str):
)
try:
yield
except (KeyboardInterrupt, asyncio.CancelledError):
jobs_after = await get_queued_milatools_job_ids(login_node, job_name=job_name)
logger.warning("Interrupted before we were able to parse a job id!")
except (KeyboardInterrupt, asyncio.CancelledError) as err:
logger.warning(
f"Interrupted by {type(err).__name__} before we were able to parse a job id!"
)
jobs_after = get_queued_milatools_job_ids_sync(login_node, job_name=job_name)
# We were unable to get the job id, so we'll try to cancel only the newly
# spawned jobs from this user that match the set name.
new_jobs = list(set(jobs_after) - set(jobs_before))
Expand All @@ -268,12 +281,10 @@ async def cancel_new_jobs_on_interrupt(login_node: RemoteV2, job_name: str):
"Cancelling all of them to be safe...",
style="yellow",
)
await asyncio.shield(
login_node.run_async(
"scancel " + " ".join(str(job_id) for job_id in new_jobs),
display=True,
hide=False,
)
login_node.run(
"scancel " + " ".join(str(job_id) for job_id in new_jobs),
display=True,
hide=False,
)
else:
warnings.warn(
Expand Down Expand Up @@ -309,14 +320,14 @@ async def salloc(
# trying to go full-async so that the parsing of the job-id from stderr can
# eventually be done at the same time as something else (while waiting for the
# job to start) using things like `asyncio.gather` and `asyncio.wait_for`.
logger.debug(f"(local) $ {shlex.join(command)}")
console.log(
f"({login_node.hostname}) $ {salloc_command}", style="green", markup=False
)
async with cancel_new_jobs_on_interrupt(login_node, job_name):
# BUG: If stdin is not set (or is set to PIPE?), the terminal is actually 'live'
# and whatever is typed in it affects the compute node! For instance, if you do
# `mila code .` and then type `salloc`, it spawns a second job!!
logger.debug(f"(local) $ {shlex.join(command)}")
console.log(
f"({login_node.hostname}) $ {salloc_command}", style="green", markup=False
)
salloc_subprocess = await asyncio.subprocess.create_subprocess_exec(
*command,
shell=False,
Expand All @@ -342,14 +353,14 @@ async def salloc(
try:
console.log(f"Waiting for job {job_id} to start.", style="green")
await wait_while_job_is_pending(login_node, job_id)
except (KeyboardInterrupt, asyncio.CancelledError):
except (KeyboardInterrupt, asyncio.CancelledError) as err:
logger.warning(
f"Interrupted by {type(err).__name__} while waiting for the job to start."
)
login_node.run(f"scancel {job_id}", display=True, hide=False)
if salloc_subprocess is not None:
logger.debug("Killing the salloc subprocess following a KeyboardInterrupt.")
await salloc_subprocess.communicate("exit\n".encode()) # noqa: UP012
# salloc_subprocess.send_signal(signal.SIGINT)
# salloc_subprocess.send_signal(signal.SIGKILL)
# salloc_subprocess.terminate()
await login_node.run_async(f"scancel {job_id}", display=True, hide=False)
logger.debug("Killing the salloc subprocess.")
salloc_subprocess.terminate()
raise

# todo: Are there any states between `PENDING` and `RUNNING`?
Expand Down

0 comments on commit fda3fd5

Please sign in to comment.