From 817300224bac22e5b563bdce0b68b10f1dfd0bcc Mon Sep 17 00:00:00 2001 From: zepingguo Date: Wed, 20 Nov 2024 16:44:48 +0800 Subject: [PATCH 01/12] commit for logging change --- sky/sky_logging.py | 53 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/sky/sky_logging.py b/sky/sky_logging.py index 75dc836a49e..5571218325b 100644 --- a/sky/sky_logging.py +++ b/sky/sky_logging.py @@ -2,11 +2,16 @@ import builtins import contextlib import logging +import os +import pathlib import sys import threading +from typing import Callable, Dict, List, Union import colorama +from sky.backends import backend_utils +from sky.skylet import constants from sky.utils import env_options from sky.utils import rich_utils @@ -96,6 +101,11 @@ def reload_logger(): def init_logger(name: str): + if name in _LOGGER_NAME_INITIALIZER_MAP and not\ + _LOGGER_NAME_INITIALIZER_MAP[name][1]: + # Initialize the logger if it is not initialized + # and configured in _LOGGER_NAME_INITIALIZER_MAP. + _LOGGER_NAME_INITIALIZER_MAP[name][0](name) # type: ignore return logging.getLogger(name) @@ -143,3 +153,46 @@ def is_silent(): # threads. 
_logging_config.is_silent = False return _logging_config.is_silent + + +def _initialize_tmp_file_logger(logger_name: str) -> None: + initialized = _LOGGER_NAME_INITIALIZER_MAP[logger_name][1] + if initialized: + return + + # Initialize the logger + run_timestamp = backend_utils.get_run_timestamp() + + # set up the logger to write to a tmp file + log_dir = os.path.join(constants.SKY_LOGS_DIRECTORY, run_timestamp) + log_path = os.path.expanduser(os.path.join(log_dir, logger_name)) + os.makedirs(os.path.dirname(log_path), exist_ok=True) + log_abs_path = pathlib.Path(log_path).expanduser().absolute() + fh = logging.FileHandler(log_abs_path) + + logger = logging.getLogger(logger_name) + + for handler in logger.handlers: + print(handler) + + fh.setFormatter(FORMATTER) + fh.setLevel(logging.DEBUG) + logger.addHandler(fh) + + # Clear stream handler + for handler in logger.handlers: + if isinstance(handler, logging.StreamHandler): + logger.removeHandler(handler) + + _LOGGER_NAME_INITIALIZER_MAP[logger_name][1] = True + + +# A map from logger name to a tuple of (initializer, is_initialized). +# The initializer is a function that initializes the logger. +# The is_initialized is a boolean indicating if the logger is initialized. 
+_LOGGER_NAME_INITIALIZER_MAP: Dict[str, + List[Union[Callable[[str], None], bool]]] = { + 'sky.data.storage': [ + _initialize_tmp_file_logger, False + ] + } From b111333b27ef6416b3a6c2c5d178b82e46054da2 Mon Sep 17 00:00:00 2001 From: ZePing Guo Date: Wed, 20 Nov 2024 19:21:16 +0800 Subject: [PATCH 02/12] logger for storage --- sky/data/data_utils.py | 16 +++++++++- sky/data/storage.py | 11 +++++++ sky/sky_logging.py | 66 ++++++++++++++++++++++-------------------- 3 files changed, 61 insertions(+), 32 deletions(-) diff --git a/sky/data/data_utils.py b/sky/data/data_utils.py index 0c8fd64ddea..0b910882485 100644 --- a/sky/data/data_utils.py +++ b/sky/data/data_utils.py @@ -521,17 +521,31 @@ def get_gsutil_command() -> Tuple[str, str]: def run_upload_cli(command: str, access_denied_message: str, bucket_name: str): # TODO(zhwu): Use log_lib.run_with_log() and redirect the output # to a log file. + storage_logger = sky_logging.init_logger(sky_logging.STORAGE_LOGGER_NAME) + with subprocess.Popen(command, stderr=subprocess.PIPE, - stdout=subprocess.DEVNULL, + stdout=subprocess.PIPE, shell=True) as process: stderr = [] + stdout_line_write_cnt = 0 assert process.stderr is not None # for mypy + if process.stdout is not None: + for line in process.stdout: + str_line = line.decode('utf-8') + storage_logger.info(str_line) + stdout_line_write_cnt += 1 + + if stdout_line_write_cnt == 0: + storage_logger.info('No file upload, could be error' + 'happened or all files already exist on cloud') + while True: line = process.stderr.readline() if not line: break str_line = line.decode('utf-8') + storage_logger.error(str_line) stderr.append(str_line) if access_denied_message in str_line: process.kill() diff --git a/sky/data/storage.py b/sky/data/storage.py index 897f2f96b94..68b13f252ec 100644 --- a/sky/data/storage.py +++ b/sky/data/storage.py @@ -1001,7 +1001,18 @@ def warn_for_git_dir(source: str): else: for source in self.source: warn_for_git_dir(source) + # init logger on demand + 
sky_logging.init_logger(sky_logging.STORAGE_LOGGER_NAME) + log_path = sky_logging.get_logger_log_file_path( + sky_logging.STORAGE_LOGGER_NAME) + store_name = ( + f'{store.__class__.__name__.replace("Store", "").lower()}:' + f'{store.name}') store.upload() + logger.info( + ux_utils.finishing_message( + f'Storage synced: {self.source!r} to {store_name}', + log_path)) except exceptions.StorageUploadError: logger.error(f'Could not upload {self.source!r} to store ' f'name {store.name!r}.') diff --git a/sky/sky_logging.py b/sky/sky_logging.py index 5571218325b..c5fe20b05d6 100644 --- a/sky/sky_logging.py +++ b/sky/sky_logging.py @@ -1,16 +1,16 @@ """Logging utilities.""" import builtins import contextlib +from datetime import datetime import logging import os import pathlib import sys import threading -from typing import Callable, Dict, List, Union +from typing import Callable, Dict, Optional, Tuple import colorama -from sky.backends import backend_utils from sky.skylet import constants from sky.utils import env_options from sky.utils import rich_utils @@ -20,6 +20,10 @@ not env_options.Options.MINIMIZE_LOGGING.get()) _FORMAT = '%(levelname).1s %(asctime)s %(filename)s:%(lineno)d] %(message)s' _DATE_FORMAT = '%m-%d %H:%M:%S' +_logging_init_lock = threading.Lock() +# Can not be 'sky.data.storage' because it inherits +# from the root logger which configured the stream handler +STORAGE_LOGGER_NAME = 'data.storage' class NewLineFormatter(logging.Formatter): @@ -100,15 +104,25 @@ def reload_logger(): _setup_logger() -def init_logger(name: str): - if name in _LOGGER_NAME_INITIALIZER_MAP and not\ - _LOGGER_NAME_INITIALIZER_MAP[name][1]: - # Initialize the logger if it is not initialized - # and configured in _LOGGER_NAME_INITIALIZER_MAP. 
- _LOGGER_NAME_INITIALIZER_MAP[name][0](name) # type: ignore +def init_logger(name: str) -> logging.Logger: + with _logging_init_lock: + if name in _LOGGER_NAME_INITIALIZER_MAP and not\ + _LOGGER_NAME_INITIALIZER_MAP[name][1]: + # Initialize the logger if it is not initialized + # and configured in _LOGGER_NAME_INITIALIZER_MAP. + log_file_path = _LOGGER_NAME_INITIALIZER_MAP[name][0](name) + _LOGGER_NAME_INITIALIZER_MAP[name] = ( + _LOGGER_NAME_INITIALIZER_MAP[name][0], True, log_file_path) return logging.getLogger(name) +def get_logger_log_file_path(name: str) -> Optional[str]: + if name in _LOGGER_NAME_INITIALIZER_MAP and _LOGGER_NAME_INITIALIZER_MAP[ + name][1] and _LOGGER_NAME_INITIALIZER_MAP[name][2]: + return _LOGGER_NAME_INITIALIZER_MAP[name][2] + return None + + @contextlib.contextmanager def set_logging_level(logger: str, level: int): logger = logging.getLogger(logger) @@ -155,44 +169,34 @@ def is_silent(): return _logging_config.is_silent -def _initialize_tmp_file_logger(logger_name: str) -> None: - initialized = _LOGGER_NAME_INITIALIZER_MAP[logger_name][1] - if initialized: - return - - # Initialize the logger - run_timestamp = backend_utils.get_run_timestamp() +def _initialize_tmp_file_logger(logger_name: str) -> str: + """Initialize the logger to write to a tmp file.""" + run_timestamp = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f') # set up the logger to write to a tmp file log_dir = os.path.join(constants.SKY_LOGS_DIRECTORY, run_timestamp) - log_path = os.path.expanduser(os.path.join(log_dir, logger_name)) + log_path = os.path.expanduser( + os.path.join(log_dir, + logger_name.replace('.', '_') + '.log')) os.makedirs(os.path.dirname(log_path), exist_ok=True) log_abs_path = pathlib.Path(log_path).expanduser().absolute() fh = logging.FileHandler(log_abs_path) logger = logging.getLogger(logger_name) - for handler in logger.handlers: - print(handler) - fh.setFormatter(FORMATTER) - fh.setLevel(logging.DEBUG) + fh.setLevel(_root_logger.level) 
logger.addHandler(fh) - # Clear stream handler - for handler in logger.handlers: - if isinstance(handler, logging.StreamHandler): - logger.removeHandler(handler) - - _LOGGER_NAME_INITIALIZER_MAP[logger_name][1] = True + return log_path -# A map from logger name to a tuple of (initializer, is_initialized). +# A map from logger name to a tuple of (initializer, is_initialized, log_path). # The initializer is a function that initializes the logger. # The is_initialized is a boolean indicating if the logger is initialized. _LOGGER_NAME_INITIALIZER_MAP: Dict[str, - List[Union[Callable[[str], None], bool]]] = { - 'sky.data.storage': [ - _initialize_tmp_file_logger, False - ] + Tuple[Callable[[str], str], bool, str]] = { + STORAGE_LOGGER_NAME: + (_initialize_tmp_file_logger, False, + '') } From d8eababb454acb6f2dcb577a0fdd0abcc1a476ae Mon Sep 17 00:00:00 2001 From: ZePing Guo Date: Wed, 20 Nov 2024 19:21:39 +0800 Subject: [PATCH 03/12] grammar --- sky/data/data_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sky/data/data_utils.py b/sky/data/data_utils.py index 0b910882485..0bfc43959b9 100644 --- a/sky/data/data_utils.py +++ b/sky/data/data_utils.py @@ -537,7 +537,7 @@ def run_upload_cli(command: str, access_denied_message: str, bucket_name: str): stdout_line_write_cnt += 1 if stdout_line_write_cnt == 0: - storage_logger.info('No file upload, could be error' + storage_logger.info('No file uploaded, could be error' 'happened or all files already exist on cloud') while True: From 90a7662f9626ec17b0e1c475ba1470e92e4863d3 Mon Sep 17 00:00:00 2001 From: ZePing Guo Date: Wed, 20 Nov 2024 19:35:57 +0800 Subject: [PATCH 04/12] fix format --- sky/data/storage.py | 15 +++++++++++---- sky/sky_logging.py | 2 ++ 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/sky/data/storage.py b/sky/data/storage.py index 68b13f252ec..ddbfb6f9ce9 100644 --- a/sky/data/storage.py +++ b/sky/data/storage.py @@ -1002,17 +1002,24 @@ def warn_for_git_dir(source: 
str): for source in self.source: warn_for_git_dir(source) # init logger on demand - sky_logging.init_logger(sky_logging.STORAGE_LOGGER_NAME) + storage_logger = sky_logging.init_logger( + sky_logging.STORAGE_LOGGER_NAME) log_path = sky_logging.get_logger_log_file_path( sky_logging.STORAGE_LOGGER_NAME) store_name = ( f'{store.__class__.__name__.replace("Store", "").lower()}:' f'{store.name}') + sync_message = f'{self.source!r} to {store_name}' + # file log of start sync + storage_logger.info(f'Starting Storage sync: {sync_message}') store.upload() + # console log logger.info( - ux_utils.finishing_message( - f'Storage synced: {self.source!r} to {store_name}', - log_path)) + ux_utils.finishing_message(f'Storage synced: {sync_message}', + log_path)) + # file log of end sync + storage_logger.info( + f'Storage synced: {sync_message} {os.linesep * 2}') except exceptions.StorageUploadError: logger.error(f'Could not upload {self.source!r} to store ' f'name {store.name!r}.') diff --git a/sky/sky_logging.py b/sky/sky_logging.py index c5fe20b05d6..103e2e256f9 100644 --- a/sky/sky_logging.py +++ b/sky/sky_logging.py @@ -187,6 +187,8 @@ def _initialize_tmp_file_logger(logger_name: str) -> str: fh.setFormatter(FORMATTER) fh.setLevel(_root_logger.level) logger.addHandler(fh) + # Disable propagate to avoid streaming logs to the console + logger.propagate = False return log_path From f6ea96d8714f17b489a334511dd527c890231185 Mon Sep 17 00:00:00 2001 From: ZePing Guo Date: Wed, 20 Nov 2024 21:50:37 +0800 Subject: [PATCH 05/12] better comment --- sky/sky_logging.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sky/sky_logging.py b/sky/sky_logging.py index 103e2e256f9..4eb9dfa18e1 100644 --- a/sky/sky_logging.py +++ b/sky/sky_logging.py @@ -23,6 +23,7 @@ _logging_init_lock = threading.Lock() # Can not be 'sky.data.storage' because it inherits # from the root logger which configured the stream handler +# we only need file logging don't need to print to console STORAGE_LOGGER_NAME = 
'data.storage' From 24e84d7c32c4a6442c96384ac692fc06a5bbb660 Mon Sep 17 00:00:00 2001 From: ZePing Guo Date: Thu, 21 Nov 2024 11:52:43 +0800 Subject: [PATCH 06/12] resolve copilot review --- sky/data/data_utils.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/sky/data/data_utils.py b/sky/data/data_utils.py index 0bfc43959b9..bd8d53e80dd 100644 --- a/sky/data/data_utils.py +++ b/sky/data/data_utils.py @@ -528,17 +528,18 @@ def run_upload_cli(command: str, access_denied_message: str, bucket_name: str): stdout=subprocess.PIPE, shell=True) as process: stderr = [] - stdout_line_write_cnt = 0 + stdout_line_count = 0 assert process.stderr is not None # for mypy if process.stdout is not None: for line in process.stdout: str_line = line.decode('utf-8') storage_logger.info(str_line) - stdout_line_write_cnt += 1 + stdout_line_count += 1 - if stdout_line_write_cnt == 0: - storage_logger.info('No file uploaded, could be error' - 'happened or all files already exist on cloud') + if stdout_line_count == 0: + storage_logger.info( + 'No file uploaded. 
This could be due to an error or ' + 'because all files already exist on the cloud.') while True: line = process.stderr.readline() From 067586d37bafbe0a9edf89ca23b79345ee0372ad Mon Sep 17 00:00:00 2001 From: ZePing Guo Date: Tue, 26 Nov 2024 17:00:37 +0800 Subject: [PATCH 07/12] resolve PR comment --- sky/data/data_utils.py | 72 ++++++++++++---------------------- sky/data/storage.py | 89 +++++++++++++++++++++++++++--------------- sky/sky_logging.py | 50 ++---------------------- 3 files changed, 85 insertions(+), 126 deletions(-) diff --git a/sky/data/data_utils.py b/sky/data/data_utils.py index bd8d53e80dd..b32659db48d 100644 --- a/sky/data/data_utils.py +++ b/sky/data/data_utils.py @@ -20,6 +20,7 @@ from sky.adaptors import cloudflare from sky.adaptors import gcp from sky.adaptors import ibm +from sky.skylet import log_lib from sky.utils import common_utils from sky.utils import ux_utils @@ -430,6 +431,7 @@ def _group_files_by_dir( def parallel_upload(source_path_list: List[str], filesync_command_generator: Callable[[str, List[str]], str], dirsync_command_generator: Callable[[str, str], str], + log_path: str, bucket_name: str, access_denied_message: str, create_dirs: bool = False, @@ -445,6 +447,7 @@ def parallel_upload(source_path_list: List[str], for a list of files belonging to the same dir. dirsync_command_generator: Callable that generates rsync command for a directory. + log_path: Path to the log file. access_denied_message: Message to intercept from the underlying upload utility when permissions are insufficient. Used in exception handling. 
@@ -477,7 +480,7 @@ def parallel_upload(source_path_list: List[str], p.starmap( run_upload_cli, zip(commands, [access_denied_message] * len(commands), - [bucket_name] * len(commands))) + [bucket_name] * len(commands), [log_path] * len(commands))) def get_gsutil_command() -> Tuple[str, str]: @@ -518,52 +521,27 @@ def get_gsutil_command() -> Tuple[str, str]: return gsutil_alias, alias_gen -def run_upload_cli(command: str, access_denied_message: str, bucket_name: str): - # TODO(zhwu): Use log_lib.run_with_log() and redirect the output - # to a log file. - storage_logger = sky_logging.init_logger(sky_logging.STORAGE_LOGGER_NAME) - - with subprocess.Popen(command, - stderr=subprocess.PIPE, - stdout=subprocess.PIPE, - shell=True) as process: - stderr = [] - stdout_line_count = 0 - assert process.stderr is not None # for mypy - if process.stdout is not None: - for line in process.stdout: - str_line = line.decode('utf-8') - storage_logger.info(str_line) - stdout_line_count += 1 - - if stdout_line_count == 0: - storage_logger.info( - 'No file uploaded. This could be due to an error or ' - 'because all files already exist on the cloud.') - - while True: - line = process.stderr.readline() - if not line: - break - str_line = line.decode('utf-8') - storage_logger.error(str_line) - stderr.append(str_line) - if access_denied_message in str_line: - process.kill() - with ux_utils.print_exception_no_traceback(): - raise PermissionError( - 'Failed to upload files to ' - 'the remote bucket. The bucket does not have ' - 'write permissions. It is possible that ' - 'the bucket is public.') - returncode = process.wait() - if returncode != 0: - stderr_str = '\n'.join(stderr) - with ux_utils.print_exception_no_traceback(): - logger.error(stderr_str) - raise exceptions.StorageUploadError( - f'Upload to bucket failed for store {bucket_name}. 
' - 'Please check the logs.') +def run_upload_cli(command: str, access_denied_message: str, bucket_name: str, + log_path: str): + returncode, stdout, stderr = log_lib.run_with_log(command, + log_path, + shell=True, + require_outputs=True) + if access_denied_message in stderr: + with ux_utils.print_exception_no_traceback(): + raise PermissionError('Failed to upload files to ' + 'the remote bucket. The bucket does not have ' + 'write permissions. It is possible that ' + 'the bucket is public.') + if returncode != 0: + with ux_utils.print_exception_no_traceback(): + logger.error(stderr) + raise exceptions.StorageUploadError( + f'Upload to bucket failed for store {bucket_name}. ' + 'Please check the logs.') + if not stdout: + logger.debug('No file uploaded. This could be due to an error or ' + 'because all files already exist on the cloud.') def get_cos_regions() -> List[str]: diff --git a/sky/data/storage.py b/sky/data/storage.py index ddbfb6f9ce9..66cd7835961 100644 --- a/sky/data/storage.py +++ b/sky/data/storage.py @@ -72,6 +72,8 @@ 'Bucket {bucket_name!r} does not exist. 
' 'It may have been deleted externally.') +_STORAGE_LOG_FILE_NAME = 'storage_sync.log' + def get_cached_enabled_storage_clouds_or_refresh( raise_if_no_cloud_access: bool = False) -> List[str]: @@ -1001,25 +1003,7 @@ def warn_for_git_dir(source: str): else: for source in self.source: warn_for_git_dir(source) - # init logger on demand - storage_logger = sky_logging.init_logger( - sky_logging.STORAGE_LOGGER_NAME) - log_path = sky_logging.get_logger_log_file_path( - sky_logging.STORAGE_LOGGER_NAME) - store_name = ( - f'{store.__class__.__name__.replace("Store", "").lower()}:' - f'{store.name}') - sync_message = f'{self.source!r} to {store_name}' - # file log of start sync - storage_logger.info(f'Starting Storage sync: {sync_message}') store.upload() - # console log - logger.info( - ux_utils.finishing_message(f'Storage synced: {sync_message}', - log_path)) - # file log of end sync - storage_logger.info( - f'Storage synced: {sync_message} {os.linesep * 2}') except exceptions.StorageUploadError: logger.error(f'Could not upload {self.source!r} to store ' f'name {store.name!r}.') @@ -1349,17 +1333,24 @@ def get_dir_sync_command(src_dir_path, dest_dir_name): else: source_message = source_path_list[0] + log_path = sky_logging.generate_tmp_logging_file_path( + _STORAGE_LOG_FILE_NAME) + sync_path = f'{source_message} -> s3://{self.name}/' with rich_utils.safe_status( - ux_utils.spinner_message(f'Syncing {source_message} -> ' - f's3://{self.name}/')): + ux_utils.spinner_message(f'Syncing {sync_path}', + log_path=log_path)): data_utils.parallel_upload( source_path_list, get_file_sync_command, get_dir_sync_command, + log_path, self.name, self._ACCESS_DENIED_MESSAGE, create_dirs=create_dirs, max_concurrent_uploads=_MAX_CONCURRENT_UPLOADS) + logger.info( + ux_utils.finishing_message(f'Storage synced: {sync_path}', + log_path)) def _transfer_to_s3(self) -> None: assert isinstance(self.source, str), self.source @@ -1759,13 +1750,19 @@ def batch_gsutil_cp(self, gsutil_alias, alias_gen 
= data_utils.get_gsutil_command() sync_command = (f'{alias_gen}; echo "{copy_list}" | {gsutil_alias} ' f'cp -e -n -r -I gs://{self.name}') - + log_path = sky_logging.generate_tmp_logging_file_path( + _STORAGE_LOG_FILE_NAME) + sync_path = f'{source_message} -> gs://{self.name}/' with rich_utils.safe_status( - ux_utils.spinner_message(f'Syncing {source_message} -> ' - f'gs://{self.name}/')): + ux_utils.spinner_message(f'Syncing {sync_path}', + log_path=log_path)): data_utils.run_upload_cli(sync_command, self._ACCESS_DENIED_MESSAGE, - bucket_name=self.name) + bucket_name=self.name, + log_path=log_path) + logger.info( + ux_utils.finishing_message(f'Storage synced: {sync_path}', + log_path)) def batch_gsutil_rsync(self, source_path_list: List[Path], @@ -1815,17 +1812,24 @@ def get_dir_sync_command(src_dir_path, dest_dir_name): else: source_message = source_path_list[0] + log_path = sky_logging.generate_tmp_logging_file_path( + _STORAGE_LOG_FILE_NAME) + sync_path = f'{source_message} -> gs://{self.name}/' with rich_utils.safe_status( - ux_utils.spinner_message(f'Syncing {source_message} -> ' - f'gs://{self.name}/')): + ux_utils.spinner_message(f'Syncing {sync_path}', + log_path=log_path)): data_utils.parallel_upload( source_path_list, get_file_sync_command, get_dir_sync_command, + log_path, self.name, self._ACCESS_DENIED_MESSAGE, create_dirs=create_dirs, max_concurrent_uploads=_MAX_CONCURRENT_UPLOADS) + logger.info( + ux_utils.finishing_message(f'Storage synced: {sync_path}', + log_path)) def _transfer_to_gcs(self) -> None: if isinstance(self.source, str) and self.source.startswith('s3://'): @@ -2553,17 +2557,24 @@ def get_dir_sync_command(src_dir_path, dest_dir_name) -> str: container_endpoint = data_utils.AZURE_CONTAINER_URL.format( storage_account_name=self.storage_account_name, container_name=self.name) + log_path = sky_logging.generate_tmp_logging_file_path( + _STORAGE_LOG_FILE_NAME) + sync_path = f'{source_message} -> {container_endpoint}/' with 
rich_utils.safe_status( - ux_utils.spinner_message( - f'Syncing {source_message} -> {container_endpoint}/')): + ux_utils.spinner_message(f'Syncing {sync_path}', + log_path=log_path)): data_utils.parallel_upload( source_path_list, get_file_sync_command, get_dir_sync_command, + log_path, self.name, self._ACCESS_DENIED_MESSAGE, create_dirs=create_dirs, max_concurrent_uploads=_MAX_CONCURRENT_UPLOADS) + logger.info( + ux_utils.finishing_message(f'Storage synced: {sync_path}', + log_path)) def _get_bucket(self) -> Tuple[str, bool]: """Obtains the AZ Container. @@ -2956,17 +2967,24 @@ def get_dir_sync_command(src_dir_path, dest_dir_name): else: source_message = source_path_list[0] + log_path = sky_logging.generate_tmp_logging_file_path( + _STORAGE_LOG_FILE_NAME) + sync_path = f'{source_message} -> r2://{self.name}/' with rich_utils.safe_status( - ux_utils.spinner_message( - f'Syncing {source_message} -> r2://{self.name}/')): + ux_utils.spinner_message(f'Syncing {sync_path}', + log_path=log_path)): data_utils.parallel_upload( source_path_list, get_file_sync_command, get_dir_sync_command, + log_path, self.name, self._ACCESS_DENIED_MESSAGE, create_dirs=create_dirs, max_concurrent_uploads=_MAX_CONCURRENT_UPLOADS) + logger.info( + ux_utils.finishing_message(f'Storage synced: {sync_path}', + log_path)) def _transfer_to_r2(self) -> None: assert isinstance(self.source, str), self.source @@ -3397,17 +3415,24 @@ def get_file_sync_command(base_dir_path, file_names) -> str: else: source_message = source_path_list[0] + log_path = sky_logging.generate_tmp_logging_file_path( + _STORAGE_LOG_FILE_NAME) + sync_path = f'{source_message} -> cos://{self.region}/{self.name}/' with rich_utils.safe_status( - ux_utils.spinner_message(f'Syncing {source_message} -> ' - f'cos://{self.region}/{self.name}/')): + ux_utils.spinner_message(f'Syncing {sync_path}', + log_path=log_path)): data_utils.parallel_upload( source_path_list, get_file_sync_command, get_dir_sync_command, + log_path, self.name, 
self._ACCESS_DENIED_MESSAGE, create_dirs=create_dirs, max_concurrent_uploads=_MAX_CONCURRENT_UPLOADS) + logger.info( + ux_utils.finishing_message(f'Storage synced: {sync_path}', + log_path)) def _get_bucket(self) -> Tuple[StorageHandle, bool]: """returns IBM COS bucket object if exists, otherwise creates it. diff --git a/sky/sky_logging.py b/sky/sky_logging.py index 4eb9dfa18e1..ed7de4a10be 100644 --- a/sky/sky_logging.py +++ b/sky/sky_logging.py @@ -4,10 +4,8 @@ from datetime import datetime import logging import os -import pathlib import sys import threading -from typing import Callable, Dict, Optional, Tuple import colorama @@ -20,7 +18,6 @@ not env_options.Options.MINIMIZE_LOGGING.get()) _FORMAT = '%(levelname).1s %(asctime)s %(filename)s:%(lineno)d] %(message)s' _DATE_FORMAT = '%m-%d %H:%M:%S' -_logging_init_lock = threading.Lock() # Can not be 'sky.data.storage' because it inherits # from the root logger which configured the stream handler # we only need file logging don't need to print to console @@ -106,24 +103,9 @@ def reload_logger(): def init_logger(name: str) -> logging.Logger: - with _logging_init_lock: - if name in _LOGGER_NAME_INITIALIZER_MAP and not\ - _LOGGER_NAME_INITIALIZER_MAP[name][1]: - # Initialize the logger if it is not initialized - # and configured in _LOGGER_NAME_INITIALIZER_MAP. 
- log_file_path = _LOGGER_NAME_INITIALIZER_MAP[name][0](name) - _LOGGER_NAME_INITIALIZER_MAP[name] = ( - _LOGGER_NAME_INITIALIZER_MAP[name][0], True, log_file_path) return logging.getLogger(name) -def get_logger_log_file_path(name: str) -> Optional[str]: - if name in _LOGGER_NAME_INITIALIZER_MAP and _LOGGER_NAME_INITIALIZER_MAP[ - name][1] and _LOGGER_NAME_INITIALIZER_MAP[name][2]: - return _LOGGER_NAME_INITIALIZER_MAP[name][2] - return None - - @contextlib.contextmanager def set_logging_level(logger: str, level: int): logger = logging.getLogger(logger) @@ -170,36 +152,10 @@ def is_silent(): return _logging_config.is_silent -def _initialize_tmp_file_logger(logger_name: str) -> str: - """Initialize the logger to write to a tmp file.""" +def generate_tmp_logging_file_path(file_name: str) -> str: + """Generate an absolute path of a tmp file for logging.""" run_timestamp = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f') - - # set up the logger to write to a tmp file log_dir = os.path.join(constants.SKY_LOGS_DIRECTORY, run_timestamp) - log_path = os.path.expanduser( - os.path.join(log_dir, - logger_name.replace('.', '_') + '.log')) - os.makedirs(os.path.dirname(log_path), exist_ok=True) - log_abs_path = pathlib.Path(log_path).expanduser().absolute() - fh = logging.FileHandler(log_abs_path) - - logger = logging.getLogger(logger_name) - - fh.setFormatter(FORMATTER) - fh.setLevel(_root_logger.level) - logger.addHandler(fh) - # Disable propagate to avoid streaming logs to the console - logger.propagate = False + log_path = os.path.expanduser(os.path.join(log_dir, file_name)) return log_path - - -# A map from logger name to a tuple of (initializer, is_initialized, log_path). -# The initializer is a function that initializes the logger. -# The is_initialized is a boolean indicating if the logger is initialized. 
-_LOGGER_NAME_INITIALIZER_MAP: Dict[str, - Tuple[Callable[[str], str], bool, str]] = { - STORAGE_LOGGER_NAME: - (_initialize_tmp_file_logger, False, - '') - } From df59890cdf6d6f211c935c7f42c6342d6e61a847 Mon Sep 17 00:00:00 2001 From: ZePing Guo Date: Tue, 26 Nov 2024 17:04:59 +0800 Subject: [PATCH 08/12] remove unused var --- sky/sky_logging.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/sky/sky_logging.py b/sky/sky_logging.py index ed7de4a10be..6d76b329949 100644 --- a/sky/sky_logging.py +++ b/sky/sky_logging.py @@ -18,10 +18,6 @@ not env_options.Options.MINIMIZE_LOGGING.get()) _FORMAT = '%(levelname).1s %(asctime)s %(filename)s:%(lineno)d] %(message)s' _DATE_FORMAT = '%m-%d %H:%M:%S' -# Can not be 'sky.data.storage' because it inherits -# from the root logger which configured the stream handler -# we only need file logging don't need to print to console -STORAGE_LOGGER_NAME = 'data.storage' From 663da49dc0d0bf450ad09da4ec4cae5913a93b53 Mon Sep 17 00:00:00 2001 From: zpoint Date: Mon, 2 Dec 2024 23:04:15 +0800 Subject: [PATCH 09/12] Update sky/data/data_utils.py Co-authored-by: Romil Bhardwaj --- sky/data/data_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sky/data/data_utils.py b/sky/data/data_utils.py index b32659db48d..d66c79afeb0 100644 --- a/sky/data/data_utils.py +++ b/sky/data/data_utils.py @@ -538,7 +538,7 @@ def run_upload_cli(command: str, access_denied_message: str, bucket_name: str, logger.error(stderr) raise exceptions.StorageUploadError( f'Upload to bucket failed for store {bucket_name}. ' - 'Please check the logs.') + f'Please check the logs: {log_path}') if not stdout: logger.debug('No file uploaded. 
This could be due to an error or ' 'because all files already exist on the cloud.') From 5dc9a823f73d88dd6688e67b73039158675fba83 Mon Sep 17 00:00:00 2001 From: ZePing Guo Date: Mon, 2 Dec 2024 23:10:17 +0800 Subject: [PATCH 10/12] resolve PR comment --- sky/backends/backend_utils.py | 3 +-- sky/sky_logging.py | 6 +++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py index 23c2222762a..4ddda8f1f5c 100644 --- a/sky/backends/backend_utils.py +++ b/sky/backends/backend_utils.py @@ -966,8 +966,7 @@ def _add_auth_to_cluster_config(cloud: clouds.Cloud, cluster_config_file: str): common_utils.dump_yaml(cluster_config_file, config) -def get_run_timestamp() -> str: - return 'sky-' + datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f') +get_run_timestamp = sky_logging.get_run_timestamp def get_timestamp_from_run_timestamp(run_timestamp: str) -> float: diff --git a/sky/sky_logging.py b/sky/sky_logging.py index 6d76b329949..c18835bdbac 100644 --- a/sky/sky_logging.py +++ b/sky/sky_logging.py @@ -148,9 +148,13 @@ def is_silent(): return _logging_config.is_silent +def get_run_timestamp() -> str: + return 'sky-' + datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f') + + def generate_tmp_logging_file_path(file_name: str) -> str: """Generate an absolute path of a tmp file for logging.""" - run_timestamp = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f') + run_timestamp = get_run_timestamp() log_dir = os.path.join(constants.SKY_LOGS_DIRECTORY, run_timestamp) log_path = os.path.expanduser(os.path.join(log_dir, file_name)) From 8bb913a3505e2c8ad63d01bbe98ca6b7911cc369 Mon Sep 17 00:00:00 2001 From: ZePing Guo Date: Mon, 2 Dec 2024 23:18:26 +0800 Subject: [PATCH 11/12] update comment for get_run_timestamp --- sky/backends/backend_utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py index c8abd84ccb4..3af71498e22 100644 --- a/sky/backends/backend_utils.py 
+++ b/sky/backends/backend_utils.py @@ -975,6 +975,7 @@ def _add_auth_to_cluster_config(cloud: clouds.Cloud, cluster_config_file: str): common_utils.dump_yaml(cluster_config_file, config) +# Reuse the sky_logging for consistency and avoid circular import. get_run_timestamp = sky_logging.get_run_timestamp From 57d1a65cf026c7f9e097bdc21ecd3f7c31469848 Mon Sep 17 00:00:00 2001 From: ZePing Guo Date: Mon, 23 Dec 2024 14:50:53 +0800 Subject: [PATCH 12/12] rename backend_util.get_run_timestamp to sky_logging.get_run_timestamp --- sky/backends/backend_utils.py | 4 ---- sky/backends/cloud_vm_ray_backend.py | 2 +- sky/benchmark/benchmark_utils.py | 2 +- sky/cli.py | 8 ++++---- 4 files changed, 6 insertions(+), 10 deletions(-) diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py index ed3e9d34b21..0333cf49602 100644 --- a/sky/backends/backend_utils.py +++ b/sky/backends/backend_utils.py @@ -1019,10 +1019,6 @@ def _add_auth_to_cluster_config(cloud: clouds.Cloud, cluster_config_file: str): common_utils.dump_yaml(cluster_config_file, config) -# Reuse the sky_logging for consistency and avoid circular import. -get_run_timestamp = sky_logging.get_run_timestamp - - def get_timestamp_from_run_timestamp(run_timestamp: str) -> float: return datetime.strptime( run_timestamp.partition('-')[2], '%Y-%m-%d-%H-%M-%S-%f').timestamp() diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 26118ad2de2..9d94f469df3 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -2599,7 +2599,7 @@ class CloudVmRayBackend(backends.Backend['CloudVmRayResourceHandle']): ResourceHandle = CloudVmRayResourceHandle # pylint: disable=invalid-name def __init__(self): - self.run_timestamp = backend_utils.get_run_timestamp() + self.run_timestamp = sky_logging.get_run_timestamp() # NOTE: do not expanduser() here, as this '~/...' path is used for # remote as well to be expanded on the remote side. 
self.log_dir = os.path.join(constants.SKY_LOGS_DIRECTORY, diff --git a/sky/benchmark/benchmark_utils.py b/sky/benchmark/benchmark_utils.py index c9c17f00944..766b1fa9138 100644 --- a/sky/benchmark/benchmark_utils.py +++ b/sky/benchmark/benchmark_utils.py @@ -535,7 +535,7 @@ def launch_benchmark_clusters(benchmark: str, clusters: List[str], for yaml_fd, cluster in zip(yaml_fds, clusters)] # Save stdout/stderr from cluster launches. - run_timestamp = backend_utils.get_run_timestamp() + run_timestamp = sky_logging.get_run_timestamp() log_dir = os.path.join(constants.SKY_LOGS_DIRECTORY, run_timestamp) log_dir = os.path.expanduser(log_dir) logger.info( diff --git a/sky/cli.py b/sky/cli.py index dca45e81164..5d4f07d535f 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -830,7 +830,7 @@ class _NaturalOrderGroup(click.Group): Reference: https://github.com/pallets/click/issues/513 """ - def list_commands(self, ctx): + def list_commands(self, ctx): # pylint: disable=unused-argument return self.commands.keys() @usage_lib.entrypoint('sky.cli', fallback=True) @@ -5286,7 +5286,7 @@ def _deploy_local_cluster(gpus: bool): run_command = shlex.split(run_command) # Setup logging paths - run_timestamp = backend_utils.get_run_timestamp() + run_timestamp = sky_logging.get_run_timestamp() log_path = os.path.join(constants.SKY_LOGS_DIRECTORY, run_timestamp, 'local_up.log') tail_cmd = 'tail -n100 -f ' + log_path @@ -5400,7 +5400,7 @@ def _deploy_remote_cluster(ip_file: str, ssh_user: str, ssh_key_path: str, deploy_command = shlex.split(deploy_command) # Setup logging paths - run_timestamp = backend_utils.get_run_timestamp() + run_timestamp = sky_logging.get_run_timestamp() log_path = os.path.join(constants.SKY_LOGS_DIRECTORY, run_timestamp, 'local_up.log') tail_cmd = 'tail -n100 -f ' + log_path @@ -5515,7 +5515,7 @@ def local_down(): run_command = shlex.split(down_script_path) # Setup logging paths - run_timestamp = backend_utils.get_run_timestamp() + run_timestamp = 
sky_logging.get_run_timestamp() log_path = os.path.join(constants.SKY_LOGS_DIRECTORY, run_timestamp, 'local_down.log') tail_cmd = 'tail -n100 -f ' + log_path