[Storage] Show logs for storage mount #4387

Merged
57 changes: 25 additions & 32 deletions sky/data/data_utils.py

@@ -20,6 +20,7 @@
from sky.adaptors import cloudflare
from sky.adaptors import gcp
from sky.adaptors import ibm
from sky.skylet import log_lib
from sky.utils import common_utils
from sky.utils import ux_utils

@@ -430,6 +431,7 @@ def _group_files_by_dir(
def parallel_upload(source_path_list: List[str],
filesync_command_generator: Callable[[str, List[str]], str],
dirsync_command_generator: Callable[[str, str], str],
log_path: str,
bucket_name: str,
access_denied_message: str,
create_dirs: bool = False,
@@ -445,6 +447,7 @@ def parallel_upload(source_path_list: List[str],
for a list of files belonging to the same dir.
dirsync_command_generator: Callable that generates rsync command
for a directory.
log_path: Path to the log file.
access_denied_message: Message to intercept from the underlying
upload utility when permissions are insufficient. Used in
exception handling.
@@ -477,7 +480,7 @@
p.starmap(
run_upload_cli,
zip(commands, [access_denied_message] * len(commands),
-           [bucket_name] * len(commands)))
+           [bucket_name] * len(commands), [log_path] * len(commands)))
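
The zip call fans the shared arguments out across every generated command, producing one (command, access_denied_message, bucket_name, log_path) tuple per starmap invocation. A minimal sketch of the same fan-out pattern, using a toy worker and hypothetical commands in place of run_upload_cli:

from multiprocessing import Pool

def upload(command: str, denied_msg: str, bucket: str, log_path: str) -> None:
    # Stand-in for run_upload_cli: each call receives one command plus
    # the same three shared arguments.
    print(f'{command!r} -> {bucket} (logging to {log_path})')

if __name__ == '__main__':
    commands = ['sync a/', 'sync b/', 'sync c/']  # hypothetical commands
    with Pool(2) as p:
        # zip pairs each command with repeated copies of the shared
        # arguments, so starmap unpacks one tuple per worker call.
        p.starmap(
            upload,
            zip(commands, ['AccessDenied'] * len(commands),
                ['my-bucket'] * len(commands),
                ['/tmp/storage_sync.log'] * len(commands)))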


def get_gsutil_command() -> Tuple[str, str]:
@@ -518,37 +521,27 @@ def get_gsutil_command() -> Tuple[str, str]:
return gsutil_alias, alias_gen


def run_upload_cli(command: str, access_denied_message: str, bucket_name: str):
# TODO(zhwu): Use log_lib.run_with_log() and redirect the output
# to a log file.
with subprocess.Popen(command,
stderr=subprocess.PIPE,
stdout=subprocess.DEVNULL,
shell=True) as process:
stderr = []
assert process.stderr is not None # for mypy
while True:
line = process.stderr.readline()
if not line:
break
str_line = line.decode('utf-8')
stderr.append(str_line)
if access_denied_message in str_line:
process.kill()
with ux_utils.print_exception_no_traceback():
raise PermissionError(
'Failed to upload files to '
'the remote bucket. The bucket does not have '
'write permissions. It is possible that '
'the bucket is public.')
returncode = process.wait()
if returncode != 0:
stderr_str = '\n'.join(stderr)
with ux_utils.print_exception_no_traceback():
logger.error(stderr_str)
raise exceptions.StorageUploadError(
f'Upload to bucket failed for store {bucket_name}. '
'Please check the logs.')
def run_upload_cli(command: str, access_denied_message: str, bucket_name: str,
log_path: str):
returncode, stdout, stderr = log_lib.run_with_log(command,
log_path,
shell=True,
require_outputs=True)
if access_denied_message in stderr:
with ux_utils.print_exception_no_traceback():
raise PermissionError('Failed to upload files to '
'the remote bucket. The bucket does not have '
'write permissions. It is possible that '
'the bucket is public.')
if returncode != 0:
with ux_utils.print_exception_no_traceback():
logger.error(stderr)
raise exceptions.StorageUploadError(
f'Upload to bucket failed for store {bucket_name}. '
'Please check the logs.')
if not stdout:
logger.debug('No file uploaded. This could be due to an error or '
'because all files already exist on the cloud.')
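
A hedged usage sketch of the new signature; the command, bucket, and paths below are hypothetical, and the real callers pass each store's own _ACCESS_DENIED_MESSAGE:

from sky.data import data_utils

# Runs the sync command through log_lib.run_with_log, tee-ing output to
# log_path instead of discarding stdout as the old Popen version did.
data_utils.run_upload_cli(
    command='aws s3 sync ./data s3://my-bucket/',  # hypothetical command
    access_denied_message='AccessDenied',          # hypothetical message
    bucket_name='my-bucket',
    log_path='/tmp/storage_sync.log')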


def get_cos_regions() -> List[str]:
71 changes: 57 additions & 14 deletions sky/data/storage.py
@@ -72,6 +72,8 @@
'Bucket {bucket_name!r} does not exist. '
'It may have been deleted externally.')

_STORAGE_LOG_FILE_NAME = 'storage_sync.log'


def get_cached_enabled_storage_clouds_or_refresh(
raise_if_no_cloud_access: bool = False) -> List[str]:
@@ -1331,17 +1333,24 @@ def get_dir_sync_command(src_dir_path, dest_dir_name):
else:
source_message = source_path_list[0]

log_path = sky_logging.generate_tmp_logging_file_path(
_STORAGE_LOG_FILE_NAME)
sync_path = f'{source_message} -> s3://{self.name}/'
with rich_utils.safe_status(
-           ux_utils.spinner_message(f'Syncing {source_message} -> '
-                                    f's3://{self.name}/')):
+           ux_utils.spinner_message(f'Syncing {sync_path}',
+                                    log_path=log_path)):
data_utils.parallel_upload(
source_path_list,
get_file_sync_command,
get_dir_sync_command,
log_path,
self.name,
self._ACCESS_DENIED_MESSAGE,
create_dirs=create_dirs,
max_concurrent_uploads=_MAX_CONCURRENT_UPLOADS)
logger.info(
ux_utils.finishing_message(f'Storage synced: {sync_path}',
log_path))
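
The same three-step shape—generate a tmp log path, sync under a spinner that advertises the log, then log a finishing message pointing at the same file—is repeated for each store below (GCS, Azure, R2, IBM COS). Schematically, with <store-url> standing in for the store-specific prefix:

# Sketch of the per-store pattern introduced by this PR (not new code).
log_path = sky_logging.generate_tmp_logging_file_path(_STORAGE_LOG_FILE_NAME)
sync_path = f'{source_message} -> <store-url>{self.name}/'
with rich_utils.safe_status(
        ux_utils.spinner_message(f'Syncing {sync_path}', log_path=log_path)):
    ...  # store-specific parallel_upload / run_upload_cli call
logger.info(
    ux_utils.finishing_message(f'Storage synced: {sync_path}', log_path))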

def _transfer_to_s3(self) -> None:
assert isinstance(self.source, str), self.source
@@ -1741,13 +1750,19 @@ def batch_gsutil_cp(self,
gsutil_alias, alias_gen = data_utils.get_gsutil_command()
sync_command = (f'{alias_gen}; echo "{copy_list}" | {gsutil_alias} '
f'cp -e -n -r -I gs://{self.name}')

log_path = sky_logging.generate_tmp_logging_file_path(
_STORAGE_LOG_FILE_NAME)
sync_path = f'{source_message} -> gs://{self.name}/'
with rich_utils.safe_status(
-           ux_utils.spinner_message(f'Syncing {source_message} -> '
-                                    f'gs://{self.name}/')):
+           ux_utils.spinner_message(f'Syncing {sync_path}',
+                                    log_path=log_path)):
data_utils.run_upload_cli(sync_command,
self._ACCESS_DENIED_MESSAGE,
-                                     bucket_name=self.name)
+                                     bucket_name=self.name,
+                                     log_path=log_path)
logger.info(
ux_utils.finishing_message(f'Storage synced: {sync_path}',
log_path))

def batch_gsutil_rsync(self,
source_path_list: List[Path],
@@ -1797,17 +1812,24 @@ def get_dir_sync_command(src_dir_path, dest_dir_name):
else:
source_message = source_path_list[0]

log_path = sky_logging.generate_tmp_logging_file_path(
_STORAGE_LOG_FILE_NAME)
sync_path = f'{source_message} -> gs://{self.name}/'
with rich_utils.safe_status(
-           ux_utils.spinner_message(f'Syncing {source_message} -> '
-                                    f'gs://{self.name}/')):
+           ux_utils.spinner_message(f'Syncing {sync_path}',
+                                    log_path=log_path)):
data_utils.parallel_upload(
source_path_list,
get_file_sync_command,
get_dir_sync_command,
log_path,
self.name,
self._ACCESS_DENIED_MESSAGE,
create_dirs=create_dirs,
max_concurrent_uploads=_MAX_CONCURRENT_UPLOADS)
logger.info(
ux_utils.finishing_message(f'Storage synced: {sync_path}',
log_path))

def _transfer_to_gcs(self) -> None:
if isinstance(self.source, str) and self.source.startswith('s3://'):
@@ -2535,17 +2557,24 @@ def get_dir_sync_command(src_dir_path, dest_dir_name) -> str:
container_endpoint = data_utils.AZURE_CONTAINER_URL.format(
storage_account_name=self.storage_account_name,
container_name=self.name)
log_path = sky_logging.generate_tmp_logging_file_path(
_STORAGE_LOG_FILE_NAME)
sync_path = f'{source_message} -> {container_endpoint}/'
with rich_utils.safe_status(
-           ux_utils.spinner_message(
-               f'Syncing {source_message} -> {container_endpoint}/')):
+           ux_utils.spinner_message(f'Syncing {sync_path}',
+                                    log_path=log_path)):
data_utils.parallel_upload(
source_path_list,
get_file_sync_command,
get_dir_sync_command,
log_path,
self.name,
self._ACCESS_DENIED_MESSAGE,
create_dirs=create_dirs,
max_concurrent_uploads=_MAX_CONCURRENT_UPLOADS)
logger.info(
ux_utils.finishing_message(f'Storage synced: {sync_path}',
log_path))

def _get_bucket(self) -> Tuple[str, bool]:
"""Obtains the AZ Container.
@@ -2938,17 +2967,24 @@ def get_dir_sync_command(src_dir_path, dest_dir_name):
else:
source_message = source_path_list[0]

log_path = sky_logging.generate_tmp_logging_file_path(
_STORAGE_LOG_FILE_NAME)
sync_path = f'{source_message} -> r2://{self.name}/'
with rich_utils.safe_status(
-           ux_utils.spinner_message(
-               f'Syncing {source_message} -> r2://{self.name}/')):
+           ux_utils.spinner_message(f'Syncing {sync_path}',
+                                    log_path=log_path)):
data_utils.parallel_upload(
source_path_list,
get_file_sync_command,
get_dir_sync_command,
log_path,
self.name,
self._ACCESS_DENIED_MESSAGE,
create_dirs=create_dirs,
max_concurrent_uploads=_MAX_CONCURRENT_UPLOADS)
logger.info(
ux_utils.finishing_message(f'Storage synced: {sync_path}',
log_path))

def _transfer_to_r2(self) -> None:
assert isinstance(self.source, str), self.source
@@ -3379,17 +3415,24 @@ def get_file_sync_command(base_dir_path, file_names) -> str:
else:
source_message = source_path_list[0]

log_path = sky_logging.generate_tmp_logging_file_path(
_STORAGE_LOG_FILE_NAME)
sync_path = f'{source_message} -> cos://{self.region}/{self.name}/'
with rich_utils.safe_status(
-           ux_utils.spinner_message(f'Syncing {source_message} -> '
-                                    f'cos://{self.region}/{self.name}/')):
+           ux_utils.spinner_message(f'Syncing {sync_path}',
+                                    log_path=log_path)):
data_utils.parallel_upload(
source_path_list,
get_file_sync_command,
get_dir_sync_command,
log_path,
self.name,
self._ACCESS_DENIED_MESSAGE,
create_dirs=create_dirs,
max_concurrent_uploads=_MAX_CONCURRENT_UPLOADS)
logger.info(
ux_utils.finishing_message(f'Storage synced: {sync_path}',
log_path))

def _get_bucket(self) -> Tuple[StorageHandle, bool]:
"""returns IBM COS bucket object if exists, otherwise creates it.
14 changes: 13 additions & 1 deletion sky/sky_logging.py
@@ -1,12 +1,15 @@
"""Logging utilities."""
import builtins
import contextlib
from datetime import datetime
import logging
import os
import sys
import threading

import colorama

from sky.skylet import constants
from sky.utils import env_options
from sky.utils import rich_utils

@@ -95,7 +98,7 @@ def reload_logger():
_setup_logger()


- def init_logger(name: str):
+ def init_logger(name: str) -> logging.Logger:
return logging.getLogger(name)


@@ -143,3 +146,12 @@ def is_silent():
# threads.
_logging_config.is_silent = False
return _logging_config.is_silent


def generate_tmp_logging_file_path(file_name: str) -> str:
"""Generate an absolute path of a tmp file for logging."""
run_timestamp = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
Collaborator: We should use backend_utils.get_run_timestamp() to have paths consistent with provisioning logs.

Collaborator (Author): I made some modifications to avoid a circular import.

log_dir = os.path.join(constants.SKY_LOGS_DIRECTORY, run_timestamp)
log_path = os.path.expanduser(os.path.join(log_dir, file_name))

return log_path
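
Per the review thread above, the helper re-derives a run timestamp locally instead of calling backend_utils.get_run_timestamp(), presumably because backend_utils itself imports sky_logging and importing it back would create a cycle. A hedged usage sketch; the exact value of constants.SKY_LOGS_DIRECTORY is an assumption here:

from sky import sky_logging

# Hypothetical result, assuming constants.SKY_LOGS_DIRECTORY == '~/sky_logs':
log_path = sky_logging.generate_tmp_logging_file_path('storage_sync.log')
print(log_path)
# e.g. /home/user/sky_logs/2024-11-20-12-34-56-789012/storage_sync.log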