diff --git a/sky/cli.py b/sky/cli.py index 1faf0003ff9..66360d91847 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -3525,11 +3525,11 @@ def storage_delete(names: List[str], all: bool, yes: bool): # pylint: disable=r if sum([len(names) > 0, all]) != 1: raise click.UsageError('Either --all or a name must be specified.') if all: - storages = sky.storage_ls() - if not storages: + # Use '*' to get all storages. + names = global_user_state.get_glob_storage_name(storage_name='*') + if not names: click.echo('No storage(s) to delete.') return - names = [s['name'] for s in storages] else: names = _get_glob_storages(names) if names: @@ -3543,7 +3543,13 @@ def storage_delete(names: List[str], all: bool, yes: bool): # pylint: disable=r abort=True, show_default=True) - subprocess_utils.run_in_parallel(sky.storage_delete, names) + def delete_storage(name: str) -> None: + try: + sky.storage_delete(name) + except Exception as e: # pylint: disable=broad-except + click.secho(f'Error deleting storage {name}: {e}', fg='red') + + subprocess_utils.run_in_parallel(delete_storage, names) @cli.group(cls=_NaturalOrderGroup) diff --git a/sky/core.py b/sky/core.py index 9f1288d7fb6..732d7202bc5 100644 --- a/sky/core.py +++ b/sky/core.py @@ -915,8 +915,11 @@ def storage_delete(name: str) -> None: handle = global_user_state.get_handle_from_storage_name(name) if handle is None: raise ValueError(f'Storage name {name!r} not found.') - else: - storage_object = data.Storage(name=handle.storage_name, - source=handle.source, - sync_on_reconstruction=False) - storage_object.delete() + + assert handle.storage_name == name, ( + f'In global_user_state, storage name {name!r} does not match ' + f'handle.storage_name {handle.storage_name!r}') + storage_object = data.Storage(name=handle.storage_name, + source=handle.source, + sync_on_reconstruction=False) + storage_object.delete() diff --git a/sky/data/storage.py b/sky/data/storage.py index 897f2f96b94..63544e1c353 100644 --- a/sky/data/storage.py +++ b/sky/data/storage.py @@ -950,18 +950,16 @@ def delete(self, store_type: Optional[StoreType] = None) -> None: if not self.stores: logger.info('No backing stores found. Deleting storage.') global_user_state.remove_storage(self.name) - if store_type: + if store_type is not None: store = self.stores[store_type] - is_sky_managed = store.is_sky_managed # We delete a store from the cloud if it's sky managed. Else just # remove handle and return - if is_sky_managed: + if store.is_sky_managed: self.handle.remove_store(store) store.delete() # Check remaining stores - if none is sky managed, remove # the storage from global_user_state. - delete = all( - s.is_sky_managed is False for s in self.stores.values()) + delete = all(not s.is_sky_managed for s in self.stores.values()) if delete: global_user_state.remove_storage(self.name) else: @@ -1491,6 +1489,9 @@ def _delete_s3_bucket(self, bucket_name: str) -> bool: Returns: bool; True if bucket was deleted, False if it was deleted externally. + + Raises: + StorageBucketDeleteError: If deleting the bucket fails. """ # Deleting objects is very slow programatically # (i.e. bucket.objects.all().delete() is slow). @@ -1934,6 +1935,11 @@ def _delete_gcs_bucket(self, bucket_name: str) -> bool: Returns: bool; True if bucket was deleted, False if it was deleted externally. + + Raises: + StorageBucketDeleteError: If deleting the bucket fails. + PermissionError: If the bucket is external and the user is not + allowed to delete it. """ with rich_utils.safe_status( @@ -3096,6 +3102,9 @@ def _delete_r2_bucket(self, bucket_name: str) -> bool: Returns: bool; True if bucket was deleted, False if it was deleted externally. + + Raises: + StorageBucketDeleteError: If deleting the bucket fails. """ # Deleting objects is very slow programatically # (i.e. bucket.objects.all().delete() is slow). @@ -3532,7 +3541,7 @@ def _create_cos_bucket(self, return self.bucket - def _delete_cos_bucket(self): + def _delete_cos_bucket(self) -> None: bucket = self.s3_resource.Bucket(self.name) try: bucket_versioning = self.s3_resource.BucketVersioning(self.name) diff --git a/sky/global_user_state.py b/sky/global_user_state.py index 2a5cbc7eb3f..2c1ecc33575 100644 --- a/sky/global_user_state.py +++ b/sky/global_user_state.py @@ -827,7 +827,7 @@ def get_storage_names_start_with(starts_with: str) -> List[str]: def get_storage() -> List[Dict[str, Any]]: - rows = _DB.cursor.execute('select * from storage') + rows = _DB.cursor.execute('SELECT * FROM storage') records = [] for name, launched_at, handle, last_use, status in rows: # TODO: use namedtuple instead of dict diff --git a/sky/utils/subprocess_utils.py b/sky/utils/subprocess_utils.py index 992c6bbe3ff..e38ed3ec87a 100644 --- a/sky/utils/subprocess_utils.py +++ b/sky/utils/subprocess_utils.py @@ -101,8 +101,6 @@ def run_in_parallel(func: Callable, num_threads: Optional[int] = None) -> List[Any]: """Run a function in parallel on a list of arguments. - The function 'func' should raise a CommandError if the command fails. - Args: func: The function to run in parallel args: Iterable of arguments to pass to func @@ -111,14 +109,16 @@ def run_in_parallel(func: Callable, Returns: A list of the return values of the function func, in the same order as the - arguments. + arguments. + + Raises: + Exception: The first exception encountered. """ - # Reference: https://stackoverflow.com/questions/25790279/python-multiprocessing-early-termination # pylint: disable=line-too-long - processes = num_threads if num_threads is not None else get_parallel_threads( - ) + processes = (num_threads + if num_threads is not None else get_parallel_threads()) with pool.ThreadPool(processes=processes) as p: - # Run the function in parallel on the arguments, keeping the order. - return list(p.imap(func, args)) + ordered_iterators = p.imap(func, args) + return list(ordered_iterators) def handle_returncode(returncode: int,