-
Notifications
You must be signed in to change notification settings - Fork 539
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[jobs] revamp scheduling for managed jobs #4485
base: master
Are you sure you want to change the base?
Conversation
sky/jobs/scheduler.py
Outdated
os.makedirs(logs_dir, exist_ok=True) | ||
log_path = os.path.join(logs_dir, f'{managed_job_id}.log') | ||
|
||
pid = subprocess_utils.launch_new_process_tree( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if scheduler is killed before this line (e.g. when running as part of a controller job), we will get stuck since the job will be submitted but the controller will never start. Todo figure out how to recover from this case
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can have a skylet event to monitor managed job table, like we do for normal unmanaged jobs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are already using the exiting managed job skylet event for that, but the problem is that if it dies right here, there's no way to know if the scheduler is just about to start the process or if it already died. We need a way to check if the scheduler died or maybe a timestamp for the WAITING -> LAUNCHING transition.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @cg505 for making this significant change! This is awesome! I glanced the code, and it mostly looks good. The main concern is the complexity and granularity we have for limiting the number of launches. Please see the comments below.
sky/jobs/constants.py
Outdated
@@ -2,10 +2,12 @@ | |||
|
|||
JOBS_CONTROLLER_TEMPLATE = 'jobs-controller.yaml.j2' | |||
JOBS_CONTROLLER_YAML_PREFIX = '~/.sky/jobs_controller' | |||
JOBS_CONTROLLER_LOGS_DIR = '~/sky_controller_logs' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we store the logs in either ~/sky_logs/jobs_controller
or ~/.sky
?
sky/jobs/scheduler.py
Outdated
print(launching_jobs, alive_jobs) | ||
print(_get_launch_parallelism(), _get_job_parallelism()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to redirect the logging?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These were debug lines I left in accidentally. I am now using logger.debug. I guess that's probably fine?
sky/jobs/scheduler.py
Outdated
os.makedirs(logs_dir, exist_ok=True) | ||
log_path = os.path.join(logs_dir, f'{managed_job_id}.log') | ||
|
||
pid = subprocess_utils.launch_new_process_tree( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can have a skylet event to monitor managed job table, like we do for normal unmanaged jobs.
sky/jobs/scheduler.py
Outdated
_ACTIVE_JOB_LAUNCH_WAIT_INTERVAL = 0.5 | ||
|
||
|
||
def schedule_step() -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about calling it schedule_new_job()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to maybe_start_waiting_jobs(). Open to other names - it might be a bit of a misnomer if it schedules an existing job that wants to launch something.
sky/jobs/scheduler.py
Outdated
|
||
|
||
def schedule_step() -> None: | ||
"""Determine if any jobs can be launched, and if so, launch them. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"""Determine if any jobs can be launched, and if so, launch them. | |
"""Determine if any jobs can be scheduled, and if so, schedule them. |
sky/jobs/scheduler.py
Outdated
|
||
|
||
@contextlib.contextmanager | ||
def schedule_active_job_launch(is_running: bool): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I found this function may overcomplicated the problem here. For launches, we do not need to preserve a FIFO order.
Instead, we can use the schedule_step
above to preserve the order for the order of the jobs to be scheduled, while using a semaphore to limit the total number of launches.
Reason: I think we should limit the actual launches in a finer granularity:
skypilot/sky/jobs/recovery_strategy.py
Lines 311 to 321 in 83b2325
sky.launch( | |
self.dag, | |
cluster_name=self.cluster_name, | |
# We expect to tear down the cluster as soon as the job is | |
# finished. However, in case the controller dies, set | |
# autodown to try and avoid a resource leak. | |
idle_minutes_to_autostop=_AUTODOWN_MINUTES, | |
down=True, | |
detach_setup=True, | |
detach_run=True, | |
_is_launched_by_jobs_controller=True) |
Otherwise, when there is bad resource capacity, and the job went into an hour-long waiting loop, it will block other jobs from being able to launch their resources.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With this the schedule_step
only needs to check the threshold for the total number of jobs that can run in parallel, but don't need to check the total launches.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated how this works significantly. New version will transition to ALIVE (does not count towards launching limit) while in backoff.
/quicktest-core |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @cg505! This PR looks pretty good to me! We should do some thorough test with managed jobs, especially testing for:
- scheduling speed for jobs
- special cases that may get the scheduling stuck
- many jobs
- cancellation of jobs
- in parallel jobs scheduling
@@ -191,6 +190,8 @@ def _run_one_task(self, task_id: int, task: 'sky.Task') -> bool: | |||
f'Submitted managed job {self._job_id} (task: {task_id}, name: ' | |||
f'{task.name!r}); {constants.TASK_ID_ENV_VAR}: {task_id_env_var}') | |||
|
|||
scheduler.wait_until_launch_okay(self._job_id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new API looks much better than before. Maybe we can turn this into a context so as to combine the wait
and finish
db_utils.add_column_to_table(cursor, conn, 'job_info', 'schedule_state', | ||
'TEXT') | ||
|
||
db_utils.add_column_to_table(cursor, conn, 'job_info', 'pid', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should it be called controller_pid
?
There is no well-defined mapping from the managed job status to schedule | ||
state or vice versa. (In fact, schedule state is defined on the job and | ||
status on the task.) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assume the mapping is more like, INACTIVE and WAITING only happens during PENDING state, and ALIVE_WAITING only happens during RECORVING
state, etc. We can probably mention that.
A future TODO, reflect the schedule state in the displayed states or the description in sky jobs queue
, so that a user has a more detailed idea of what actual state the job is in.
except filelock.Timeout: | ||
# If we can't get the lock, just exit. The process holding the lock | ||
# should launch any pending jobs. | ||
pass |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we skip any scheduling call when there is an job holding the lock to start. Would it be possible that the following happens:
- This function is called in
job_done
for job 1. - This function schedule the next job, and the next job passed the wait_until_launch_okay, but takes forever for launching due to resource availability issue.
- The next pending job will not be scheduled until the next job transition, which may take a long time?
Also, there is a race between this function release the scheduling lock vs the underlying jobs.controller
calling the scheduling again. Should the underlying jobs.controller
to wait for this scheduler function to set some specific state before proceeding to trigger the scheduler?
] | ||
if show_all: | ||
columns += ['STARTED', 'CLUSTER', 'REGION', 'FAILURE'] | ||
columns += ['STARTED', 'CLUSTER', 'REGION', 'FAILURE', 'SCHED. STATE'] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I would prefer to not have the sched. state
column, instead, we may want to do something similar as kubectl describe pod
where it shows detailed description of what the pod is working on in the same state. For example, we can maybe rename the FAILURE
column to be DESCRIPTION
.
# Backwards compatibility: this job was submitted when ray was still | ||
# used for managing the parallelism of job controllers. This code | ||
# path can be removed before 0.11.0. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: add TODO
in the comment to make it easier to search for.
# Warning: it's totally possible for the controller job to transition to | ||
# a terminal state during the course of this function. We will see that | ||
# as an abnormal failure. However, set_failed() will not update the | ||
# state in this case. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
# Warning: it's totally possible for the controller job to transition to | |
# a terminal state during the course of this function. We will see that | |
# as an abnormal failure. However, set_failed() will not update the | |
# state in this case. | |
# Warning: it's totally possible for the controller job to transition to | |
# a terminal state during the course of this function. The set_failed() called below | |
# will not update the state for jobs already in terminal state, so it should be fine. |
Detaches the job controller from ray worker and the ray driver program, and uses our own scheduling and parallelism control mechanism.
See the commands in sky/jobs/scheduler.py for more info.
Tested (run the relevant ones):
bash format.sh
pytest tests/test_smoke.py
pytest tests/test_smoke.py::test_fill_in_the_name
conda deactivate; bash -i tests/backward_compatibility_tests.sh