[jobs] revamp scheduling for managed jobs #4485

Merged
merged 28 commits into from
Jan 16, 2025
Changes from all commits
Commits
28 commits
78eef52
revamp scheduling for managed jobs
cg505 Dec 19, 2024
4c54642
simplify locking mechanism
cg505 Dec 20, 2024
aeaaf7b
additional fixes
cg505 Dec 20, 2024
3b4cf44
fix pid writing
cg505 Dec 20, 2024
ea84bb4
Merge branch 'master' of github.com:skypilot-org/skypilot into manage…
cg505 Dec 20, 2024
a255236
address PR comments
cg505 Jan 6, 2025
0c54fc6
fix jobs logs --controller
cg505 Jan 8, 2025
734fdc2
catch inconsistent schedule state in update_managed_job_status
cg505 Jan 9, 2025
8685abb
fix bug
cg505 Jan 9, 2025
88eb21e
Merge branch 'master' of github.com:skypilot-org/skypilot into manage…
cg505 Jan 11, 2025
6a75cdf
fix --sync-down for new controller logs location
cg505 Jan 11, 2025
ce20367
fix sync-down code
cg505 Jan 14, 2025
6b24481
update default controller resources
cg505 Jan 14, 2025
227be43
make sure lock dir is created
cg505 Jan 14, 2025
4dc3503
rename schedule_step function
cg505 Jan 14, 2025
7aac560
check for pid reuse
cg505 Jan 14, 2025
d7e4a7d
add comments/logs to update_managed_job_status
cg505 Jan 14, 2025
1fb3db5
Update sky/jobs/utils.py
cg505 Jan 14, 2025
6ab6b87
update log_lib comment
cg505 Jan 15, 2025
903896a
fix falsy ==0 check
cg505 Jan 15, 2025
afd24c6
turn the scheduled launch into a context
cg505 Jan 15, 2025
5f89500
clarify scheduler.py docstring
cg505 Jan 15, 2025
e223e0c
fix lint/tests
cg505 Jan 15, 2025
79737be
avoid redundant expanduser
cg505 Jan 15, 2025
a7de7d8
clarify controller vs controller process
cg505 Jan 15, 2025
de8d832
clarify comment
cg505 Jan 15, 2025
dab5bf1
address PR comment
cg505 Jan 15, 2025
6ced431
add comments
cg505 Jan 16, 2025
61 changes: 34 additions & 27 deletions sky/backends/cloud_vm_ray_backend.py
Expand Up @@ -45,6 +45,7 @@
from sky.clouds.utils import gcp_utils
from sky.data import data_utils
from sky.data import storage as storage_lib
from sky.jobs import constants as managed_jobs_constants
from sky.provision import common as provision_common
from sky.provision import instance_setup
from sky.provision import metadata_utils
Expand Down Expand Up @@ -3910,40 +3911,45 @@ def sync_down_managed_job_logs(
Returns:
A dictionary mapping job_id to log path.
"""
# if job_name is not None, job_id should be None
# job_name and job_id should not both be specified
assert job_name is None or job_id is None, (job_name, job_id)
if job_id is None and job_name is not None:

if job_id is None:
# generate code to get the job_id
# if job_name is None, get all job_ids
# TODO: Only get the latest job_id, since that's the only one we use
code = managed_jobs.ManagedJobCodeGen.get_all_job_ids_by_name(
job_name=job_name)
returncode, run_timestamps, stderr = self.run_on_head(
handle,
code,
stream_logs=False,
require_outputs=True,
separate_stderr=True)
returncode, job_ids, stderr = self.run_on_head(handle,
code,
stream_logs=False,
require_outputs=True,
separate_stderr=True)
subprocess_utils.handle_returncode(returncode, code,
'Failed to sync down logs.',
stderr)
job_ids = common_utils.decode_payload(run_timestamps)
job_ids = common_utils.decode_payload(job_ids)
if not job_ids:
logger.info(f'{colorama.Fore.YELLOW}'
'No matching job found'
f'{colorama.Style.RESET_ALL}')
return {}
elif len(job_ids) > 1:
logger.info(
f'{colorama.Fore.YELLOW}'
f'Multiple jobs IDs found under the name {job_name}. '
'Downloading the latest job logs.'
f'{colorama.Style.RESET_ALL}')
job_ids = [job_ids[0]] # descending order
else:
job_ids = [job_id]
name_str = ''
if job_name is not None:
name_str = ('Multiple job IDs found under the name '
f'{job_name}. ')
logger.info(f'{colorama.Fore.YELLOW}'
f'{name_str}'
'Downloading the latest job logs.'
f'{colorama.Style.RESET_ALL}')
# list should already be in descending order
job_id = job_ids[0]

# get the run_timestamp
# the function takes in [job_id]
code = job_lib.JobLibCodeGen.get_run_timestamp_with_globbing(job_ids)
code = job_lib.JobLibCodeGen.get_run_timestamp_with_globbing(
[str(job_id)])
returncode, run_timestamps, stderr = self.run_on_head(
handle,
code,
Expand All @@ -3964,13 +3970,14 @@ def sync_down_managed_job_logs(
job_id = list(run_timestamps.keys())[0]
local_log_dir = ''
if controller: # download controller logs
remote_log_dir = os.path.join(constants.SKY_LOGS_DIRECTORY,
run_timestamp)
remote_log = os.path.join(
managed_jobs_constants.JOBS_CONTROLLER_LOGS_DIR,
f'{job_id}.log')
local_log_dir = os.path.expanduser(
os.path.join(local_dir, run_timestamp))

logger.info(f'{colorama.Fore.CYAN}'
f'Job {job_ids} local logs: {local_log_dir}'
f'Job {job_id} local logs: {local_log_dir}'
f'{colorama.Style.RESET_ALL}')

runners = handle.get_command_runners()
Expand All @@ -3981,12 +3988,12 @@ def _rsync_down(args) -> None:
Args:
args: A tuple of (runner, local_log_dir, remote_log_dir)
"""
(runner, local_log_dir, remote_log_dir) = args
(runner, local_log_dir, remote_log) = args
try:
os.makedirs(local_log_dir, exist_ok=True)
runner.rsync(
source=f'{remote_log_dir}/',
target=local_log_dir,
source=remote_log,
target=f'{local_log_dir}/controller.log',
up=False,
stream_logs=False,
)
Expand All @@ -3999,9 +4006,9 @@ def _rsync_down(args) -> None:
else:
raise

parallel_args = [[runner, *item]
for item in zip([local_log_dir], [remote_log_dir])
for runner in runners]
parallel_args = [
(runner, local_log_dir, remote_log) for runner in runners
]
subprocess_utils.run_in_parallel(_rsync_down, parallel_args)
else: # download job logs
local_log_dir = os.path.expanduser(
Expand Down
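To summarize the new controller-log path above: when --controller is passed, the backend now syncs down a single per-job log file rather than a whole run-timestamp directory. Below is a condensed sketch of that logic; the helper name and the `runner` parameter are illustrative, while the rsync arguments mirror the diff.

```python
import os
from typing import Any

# Condensed sketch of the controller-log download introduced above. The
# helper name and the `runner` object are illustrative; the rsync arguments
# follow the diff.
JOBS_CONTROLLER_LOGS_DIR = '~/sky_logs/jobs_controller'


def rsync_down_controller_log(runner: Any, job_id: int,
                              local_log_dir: str) -> None:
    # Each managed job now writes a single controller log, <job_id>.log,
    # so only that file is pulled instead of a run_timestamp directory.
    remote_log = os.path.join(JOBS_CONTROLLER_LOGS_DIR, f'{job_id}.log')
    os.makedirs(local_log_dir, exist_ok=True)
    runner.rsync(
        source=remote_log,
        target=f'{local_log_dir}/controller.log',
        up=False,
        stream_logs=False,
    )
```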
31 changes: 1 addition & 30 deletions sky/cli.py
Expand Up @@ -3588,18 +3588,6 @@ def jobs():
is_flag=True,
help=('If True, as soon as a job is submitted, return from this call '
'and do not stream execution logs.'))
@click.option(
'--retry-until-up/--no-retry-until-up',
'-r/-no-r',
default=None,
is_flag=True,
required=False,
help=(
'(Default: True; this flag is deprecated and will be removed in a '
'future release.) Whether to retry provisioning infinitely until the '
'cluster is up, if unavailability errors are encountered. This ' # pylint: disable=bad-docstring-quotes
'applies to launching all managed jobs (both the initial and '
'any recovery attempts), not the jobs controller.'))
@click.option('--yes',
'-y',
is_flag=True,
Expand Down Expand Up @@ -3636,7 +3624,6 @@ def jobs_launch(
disk_tier: Optional[str],
ports: Tuple[str],
detach_run: bool,
retry_until_up: Optional[bool],
yes: bool,
fast: bool,
):
Expand Down Expand Up @@ -3680,19 +3667,6 @@ def jobs_launch(
ports=ports,
job_recovery=job_recovery,
)
# Deprecation. We set the default behavior to be retry until up, and the
# flag `--retry-until-up` is deprecated. We can remove the flag in 0.8.0.
if retry_until_up is not None:
flag_str = '--retry-until-up'
if not retry_until_up:
flag_str = '--no-retry-until-up'
click.secho(
f'Flag {flag_str} is deprecated and will be removed in a '
'future release (managed jobs will always be retried). '
'Please file an issue if this does not work for you.',
fg='yellow')
else:
retry_until_up = True

# Deprecation. The default behavior is fast, and the flag will be removed.
# The flag was not present in 0.7.x (only nightly), so we will remove before
Expand Down Expand Up @@ -3742,10 +3716,7 @@ def jobs_launch(

common_utils.check_cluster_name_is_valid(name)

managed_jobs.launch(dag,
name,
detach_run=detach_run,
retry_until_up=retry_until_up)
managed_jobs.launch(dag, name, detach_run=detach_run)


@jobs.command('queue', cls=_DocumentedCodeCommand)
Expand Down
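For the Python API, the practical effect is that managed_jobs.launch() no longer takes retry_until_up; retries are always on. A hypothetical minimal call after this change follows (the task spec and names are made up for illustration).

```python
import sky
from sky import jobs as managed_jobs

# Hypothetical usage after this change; the task spec and job name are
# illustrative. There is no retry_until_up argument anymore: managed jobs
# always retry provisioning until the cluster is up.
task = sky.Task(name='hello', run='echo hello')
task.set_resources(sky.Resources(cpus='2+'))

managed_jobs.launch(task, name='hello', detach_run=True)
```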
15 changes: 8 additions & 7 deletions sky/jobs/constants.py
Expand Up @@ -2,18 +2,19 @@

JOBS_CONTROLLER_TEMPLATE = 'jobs-controller.yaml.j2'
JOBS_CONTROLLER_YAML_PREFIX = '~/.sky/jobs_controller'
JOBS_CONTROLLER_LOGS_DIR = '~/sky_logs/jobs_controller'
Collaborator comment: nit: in the place we use this, we need to make sure that we create the folder if not exists. : )

JOBS_TASK_YAML_PREFIX = '~/.sky/managed_jobs'

# Resources as a dict for the jobs controller.
# Use default CPU instance type for jobs controller with >= 24GB, i.e.
# m6i.2xlarge (8vCPUs, 32 GB) for AWS, Standard_D8s_v4 (8vCPUs, 32 GB)
# for Azure, and n1-standard-8 (8 vCPUs, 32 GB) for GCP, etc.
# Based on profiling, memory should be at least 3x (in GB) as num vCPUs to avoid
# OOM (each vCPU can have 4 jobs controller processes as we set the CPU
# requirement to 0.25, and 3 GB is barely enough for 4 job processes).
# Use smaller CPU instance type for jobs controller, but with more memory, i.e.
# r6i.xlarge (4vCPUs, 32 GB) for AWS, Standard_E4s_v5 (4vCPUs, 32 GB) for Azure,
# and n2-highmem-4 (4 vCPUs, 32 GB) for GCP, etc.
# Concurrency limits are set based on profiling. 4x num vCPUs is the launch
# parallelism limit, and memory / 350MB is the limit on concurrently running
# jobs. See _get_launch_parallelism and _get_job_parallelism in scheduler.py.
# We use 50 GB disk size to reduce the cost.
CONTROLLER_RESOURCES = {'cpus': '8+', 'memory': '3x', 'disk_size': 50}
CONTROLLER_RESOURCES = {'cpus': '4+', 'memory': '8x', 'disk_size': 50}

# Max length of the cluster name for GCP is 35, the user hash to be attached is
# 4+1 chars, and we assume the maximum length of the job id is 4+1, so the max
Expand Down
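The limits referenced in that comment translate roughly into the following sketch. It is not the actual scheduler.py code: the 4-launches-per-vCPU and 350 MB-per-job figures come from the comment, while the os/psutil calls are assumptions about how they might be measured.

```python
import os

import psutil

# Rough sketch of the limits described in the constants.py comment above,
# not the actual scheduler.py implementation. 4 launches per vCPU and
# 350 MB per job come from that comment; the os/psutil calls are assumptions.
JOB_MEMORY_MB = 350
LAUNCHES_PER_CPU = 4


def get_launch_parallelism() -> int:
    cpus = os.cpu_count()
    return cpus * LAUNCHES_PER_CPU if cpus is not None else 1


def get_job_parallelism() -> int:
    total_mem_mb = psutil.virtual_memory().total // (1024 ** 2)
    return max(total_mem_mb // JOB_MEMORY_MB, 1)
```

With the new default controller size (4+ vCPUs and 8x memory, i.e. roughly 4 vCPUs and 32 GB), that works out to about 16 concurrent launches and on the order of 90 concurrently running jobs.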
24 changes: 11 additions & 13 deletions sky/jobs/controller.py
Expand Up @@ -16,6 +16,7 @@
from sky.backends import backend_utils
from sky.backends import cloud_vm_ray_backend
from sky.jobs import recovery_strategy
from sky.jobs import scheduler
from sky.jobs import state as managed_job_state
from sky.jobs import utils as managed_job_utils
from sky.skylet import constants
Expand Down Expand Up @@ -46,12 +47,10 @@ def _get_dag_and_name(dag_yaml: str) -> Tuple['sky.Dag', str]:
class JobsController:
"""Each jobs controller manages the life cycle of one managed job."""

def __init__(self, job_id: int, dag_yaml: str,
retry_until_up: bool) -> None:
def __init__(self, job_id: int, dag_yaml: str) -> None:
self._job_id = job_id
self._dag, self._dag_name = _get_dag_and_name(dag_yaml)
logger.info(self._dag)
self._retry_until_up = retry_until_up
# TODO(zhwu): this assumes the specific backend.
self._backend = cloud_vm_ray_backend.CloudVmRayBackend()

Expand Down Expand Up @@ -174,7 +173,7 @@ def _run_one_task(self, task_id: int, task: 'sky.Task') -> bool:
cluster_name = managed_job_utils.generate_managed_job_cluster_name(
task.name, self._job_id)
self._strategy_executor = recovery_strategy.StrategyExecutor.make(
cluster_name, self._backend, task, self._retry_until_up)
cluster_name, self._backend, task, self._job_id)
managed_job_state.set_submitted(
self._job_id,
task_id,
Expand Down Expand Up @@ -202,6 +201,7 @@ def _run_one_task(self, task_id: int, task: 'sky.Task') -> bool:
task_id=task_id,
start_time=remote_job_submitted_at,
callback_func=callback_func)

while True:
time.sleep(managed_job_utils.JOB_STATUS_CHECK_GAP_SECONDS)

Expand Down Expand Up @@ -428,11 +428,11 @@ def _update_failed_task_state(
task=self._dag.tasks[task_id]))


def _run_controller(job_id: int, dag_yaml: str, retry_until_up: bool):
def _run_controller(job_id: int, dag_yaml: str):
"""Runs the controller in a remote process for interruption."""
# The controller needs to be instantiated in the remote process, since
# the controller is not serializable.
jobs_controller = JobsController(job_id, dag_yaml, retry_until_up)
jobs_controller = JobsController(job_id, dag_yaml)
jobs_controller.run()


Expand Down Expand Up @@ -489,7 +489,7 @@ def _cleanup(job_id: int, dag_yaml: str):
backend.teardown_ephemeral_storage(task)


def start(job_id, dag_yaml, retry_until_up):
def start(job_id, dag_yaml):
"""Start the controller."""
controller_process = None
cancelling = False
Expand All @@ -502,8 +502,7 @@ def start(job_id, dag_yaml, retry_until_up):
# So we can only enable daemon after we no longer need to
# start daemon processes like Ray.
controller_process = multiprocessing.Process(target=_run_controller,
args=(job_id, dag_yaml,
retry_until_up))
args=(job_id, dag_yaml))
controller_process.start()
while controller_process.is_alive():
_handle_signal(job_id)
Expand Down Expand Up @@ -563,21 +562,20 @@ def start(job_id, dag_yaml, retry_until_up):
failure_reason=('Unexpected error occurred. For details, '
f'run: sky jobs logs --controller {job_id}'))

scheduler.job_done(job_id)


if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--job-id',
required=True,
type=int,
help='Job id for the controller job.')
parser.add_argument('--retry-until-up',
action='store_true',
help='Retry until the cluster is up.')
parser.add_argument('dag_yaml',
type=str,
help='The path to the user job yaml file.')
args = parser.parse_args()
# We start the process with 'spawn', because 'fork' could result in weird
# behaviors; 'spawn' is also cross-platform.
multiprocessing.set_start_method('spawn', force=True)
start(args.job_id, args.dag_yaml, args.retry_until_up)
start(args.job_id, args.dag_yaml)
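Putting the controller.py changes together, here is a self-contained sketch of the slimmed-down entrypoint: the --retry-until-up flag is gone, and the scheduler is notified once the controller process exits. The _run_controller body below is a placeholder, and the scheduler call is noted in a comment rather than imported.

```python
import argparse
import multiprocessing


def _run_controller(job_id: int, dag_yaml: str) -> None:
    # Placeholder for JobsController(job_id, dag_yaml).run(); the real
    # controller logic lives in sky/jobs/controller.py.
    print(f'controller running job {job_id} from {dag_yaml}')


def start(job_id: int, dag_yaml: str) -> None:
    # The controller runs in a child process so it can be interrupted.
    proc = multiprocessing.Process(target=_run_controller,
                                   args=(job_id, dag_yaml))
    proc.start()
    proc.join()
    # New in this PR: tell the scheduler the job is done so it can admit
    # the next waiting job (scheduler.job_done(job_id) in the real code).


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--job-id', required=True, type=int,
                        help='Job id for the controller job.')
    parser.add_argument('dag_yaml', type=str,
                        help='The path to the user job yaml file.')
    args = parser.parse_args()
    # 'spawn' avoids fork-related issues and works across platforms.
    multiprocessing.set_start_method('spawn', force=True)
    start(args.job_id, args.dag_yaml)
```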
2 changes: 0 additions & 2 deletions sky/jobs/core.py
Expand Up @@ -41,7 +41,6 @@ def launch(
name: Optional[str] = None,
stream_logs: bool = True,
detach_run: bool = False,
retry_until_up: bool = False,
# TODO(cooperc): remove fast arg before 0.8.0
fast: bool = True, # pylint: disable=unused-argument for compatibility
) -> None:
Expand Down Expand Up @@ -115,7 +114,6 @@ def launch(
'jobs_controller': controller_name,
# Note: actual cluster name will be <task.name>-<managed job ID>
'dag_name': dag.name,
'retry_until_up': retry_until_up,
'remote_user_config_path': remote_user_config_path,
'modified_catalogs':
service_catalog_common.get_modified_catalog_file_mounts(),
Expand Down