🐛 fix templates publish/open with states as directories #4586

Merged
merged 11 commits on Aug 8, 2023
@@ -64,7 +64,7 @@ async def synchronise_meta_data_table(request: web.Request) -> web.Response:
get_dsm_provider(request.app).get(SimcoreS3DataManager.get_location_id()),
)
sync_results: list[StorageFileID] = []
sync_coro = dsm.synchronise_meta_data_table(query_params.dry_run)
sync_coro = dsm.synchronise_meta_data_table(dry_run=query_params.dry_run)

if query_params.fire_and_forget:
settings: Settings = request.app[APP_CONFIG_KEY]
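The call site above now passes `dry_run` by keyword; it matches the keyword-only signature introduced later in `simcore_s3_dsm.py` (`async def synchronise_meta_data_table(self, *, dry_run: bool)`). A minimal sketch of the keyword-only pattern, with an illustrative stand-in class rather than the real `SimcoreS3DataManager`:

# Sketch: a bare "*" in the signature makes dry_run keyword-only,
# so the flag can no longer be passed positionally by mistake.
import asyncio


class DataManagerSketch:  # illustrative stand-in, not the real SimcoreS3DataManager
    async def synchronise_meta_data_table(self, *, dry_run: bool) -> list[str]:
        # a real implementation would compare S3 contents with the database here
        return []


async def main() -> None:
    dsm = DataManagerSketch()
    await dsm.synchronise_meta_data_table(dry_run=True)  # OK
    try:
        await dsm.synchronise_meta_data_table(True)  # type: ignore[misc]
    except TypeError as err:
        print(f"positional call rejected: {err}")


asyncio.run(main())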
33 changes: 21 additions & 12 deletions services/storage/src/simcore_service_storage/s3_client.py
@@ -5,7 +5,7 @@
from contextlib import AsyncExitStack
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, Final, TypeAlias, cast
from typing import AsyncGenerator, Callable, Final, TypeAlias, cast

import aioboto3
from aiobotocore.session import ClientCreatorContext
@@ -277,6 +277,21 @@ async def complete_multipart_upload(
async def delete_file(self, bucket: S3BucketName, file_id: SimcoreS3FileID) -> None:
await self.client.delete_object(Bucket=bucket, Key=file_id)

async def list_all_objects_gen(
self, bucket: S3BucketName, *, prefix: str, max_yield_result_size: int
) -> AsyncGenerator[list[ObjectTypeDef], None]:
while True:
s3_objects, next_continuation_token = await _list_objects_v2_paginated(
self.client,
bucket=bucket,
prefix=prefix,
max_total_items=max_yield_result_size,
)
yield s3_objects

if next_continuation_token is None:
break

@s3_exception_handler(_logger)
async def delete_files_in_path(self, bucket: S3BucketName, *, prefix: str) -> None:
"""Removes one or more files from a given S3 path.
@@ -287,23 +302,17 @@ async def delete_files_in_path(self, bucket: S3BucketName, *, prefix: str) -> No

# NOTE: deletion of objects is done in batches of max 1000 elements,
# the maximum accepted by the S3 API
while True:
s3_objects, next_continuation_token = await _list_objects_v2_paginated(
self.client,
bucket=bucket,
prefix=prefix,
max_total_items=_DELETE_OBJECTS_MAX_ACCEPTED_ELEMENTS,
)

async for s3_objects in self.list_all_objects_gen(
bucket,
prefix=prefix,
max_yield_result_size=_DELETE_OBJECTS_MAX_ACCEPTED_ELEMENTS,
):
if objects_to_delete := [f["Key"] for f in s3_objects if "Key" in f]:
await self.client.delete_objects(
Bucket=bucket,
Delete={"Objects": [{"Key": key} for key in objects_to_delete]},
)

if next_continuation_token is None:
break

@s3_exception_handler(_logger)
async def delete_files_in_project_node(
self,
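Both `s3_client.py` changes build on the same idea: page through `list_objects_v2` with a continuation token, yield bounded batches, and let the caller act on each batch (here `delete_objects`, which accepts at most 1000 keys per request). A rough standalone sketch of that pattern with plain synchronous boto3; the bucket, prefix, and helper names below are placeholders and credentials are assumed to be configured:

# Sketch: batched listing via continuation tokens, consumed by a batched delete.
from typing import Iterator

import boto3

_DELETE_BATCH_LIMIT = 1000  # DeleteObjects accepts at most 1000 keys per request


def list_objects_in_batches(
    client, bucket: str, prefix: str, batch_size: int
) -> Iterator[list[dict]]:
    continuation_token: str | None = None
    while True:
        kwargs = {"Bucket": bucket, "Prefix": prefix, "MaxKeys": batch_size}
        if continuation_token:
            kwargs["ContinuationToken"] = continuation_token
        response = client.list_objects_v2(**kwargs)
        yield response.get("Contents", [])  # one bounded batch per iteration
        if not response.get("IsTruncated"):
            break
        continuation_token = response["NextContinuationToken"]


def delete_prefix(bucket: str, prefix: str) -> None:
    client = boto3.client("s3")
    for batch in list_objects_in_batches(client, bucket, prefix, _DELETE_BATCH_LIMIT):
        if keys := [{"Key": obj["Key"]} for obj in batch]:
            client.delete_objects(Bucket=bucket, Delete={"Objects": keys})


delete_prefix("some-bucket", "some/prefix/")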
92 changes: 72 additions & 20 deletions services/storage/src/simcore_service_storage/simcore_s3_dsm.py
@@ -1,13 +1,13 @@
import datetime
import functools
import logging
import tempfile
import urllib.parse
from collections import deque
from collections.abc import Awaitable, Callable
from contextlib import suppress
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Awaitable, Callable
from typing import Any, Final, cast

from aiohttp import web
from aiopg.sa import Engine
@@ -21,7 +21,7 @@
StorageFileID,
)
from models_library.users import UserID
from pydantic import AnyUrl, ByteSize, parse_obj_as
from pydantic import AnyUrl, ByteSize, NonNegativeInt, parse_obj_as
from servicelib.aiohttp.client_session import get_client_session
from servicelib.aiohttp.long_running_tasks.server import TaskProgress
from servicelib.utils import ensure_ends_with, logged_gather
@@ -63,7 +63,7 @@
UploadLinks,
)
from .s3 import get_s3_client
from .s3_client import S3MetaData
from .s3_client import S3MetaData, StorageS3Client
from .s3_utils import S3TransferDataCB, update_task_progress
from .settings import Settings
from .simcore_s3_dsm_utils import expand_directory, get_simcore_directory
@@ -74,6 +74,9 @@
is_valid_managed_multipart_upload,
)

_MAX_PARALLEL_S3_CALLS: Final[NonNegativeInt] = 10
_MAX_ELEMENTS_TO_LIST: Final[int] = 1000

logger = logging.getLogger(__name__)


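`_MAX_PARALLEL_S3_CALLS` caps how many S3 requests `logged_gather` (a servicelib helper) runs concurrently. A minimal asyncio-only sketch of the same bounded-gather idea using a semaphore; `bounded_gather` and `fake_s3_call` are illustrative names, not part of the codebase:

# Sketch: run coroutines concurrently, but never more than `limit` at a time.
import asyncio
from collections.abc import Awaitable
from typing import TypeVar

T = TypeVar("T")


async def bounded_gather(*awaitables: Awaitable[T], limit: int) -> list[T]:
    semaphore = asyncio.Semaphore(limit)

    async def _run(aw: Awaitable[T]) -> T:
        async with semaphore:  # at most `limit` awaitables are active in here
            return await aw

    return await asyncio.gather(*(_run(aw) for aw in awaitables))


async def main() -> None:
    async def fake_s3_call(i: int) -> int:
        await asyncio.sleep(0.01)
        return i

    results = await bounded_gather(*(fake_s3_call(i) for i in range(25)), limit=10)
    print(sum(results))


asyncio.run(main())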
@@ -496,11 +499,12 @@ async def deep_copy_project_simcore_s3(
src_project_files: list[
FileMetaDataAtDB
] = await db_file_meta_data.list_fmds(conn, project_ids=[src_project_uuid])
src_project_total_data_size = parse_obj_as(
ByteSize,
functools.reduce(
lambda a, b: a + b, [f.file_size for f in src_project_files], 0
),
project_file_sizes: list[ByteSize] = await logged_gather(
*[self._get_size(fmd) for fmd in src_project_files],
max_concurrency=_MAX_PARALLEL_S3_CALLS,
)
src_project_total_data_size: ByteSize = parse_obj_as(
ByteSize, sum(project_file_sizes)
)
# Step 3.1: copy: files referenced from file_metadata
copy_tasks: deque[Awaitable] = deque()
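The per-file sizes above are fetched concurrently and then summed into a pydantic `ByteSize`. A tiny sketch of that summation, assuming pydantic v1 (`parse_obj_as`) as used in this service:

# Sketch: summing sizes and parsing the total back into a ByteSize.
from pydantic import ByteSize, parse_obj_as

file_sizes = [parse_obj_as(ByteSize, s) for s in ("1MiB", "512KiB", 2048)]
total = parse_obj_as(ByteSize, sum(file_sizes))  # ByteSize is an int subclass
print(total, total.human_readable())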
@@ -519,7 +523,7 @@ async def deep_copy_project_simcore_s3(

if new_node_id := node_mapping.get(src_fmd.node_id):
copy_tasks.append(
self._copy_file_s3_s3(
self._copy_path_s3_s3(
user_id,
src_fmd,
SimcoreS3FileID(
@@ -549,6 +553,21 @@
# ensure the full size is reported
s3_transfered_data_cb.finalize_transfer()

async def _get_size(self, fmd: FileMetaDataAtDB) -> ByteSize:
if not fmd.is_directory:
return fmd.file_size

# in case of directory list files and return size
total_size: int = 0
async for s3_objects in get_s3_client(self.app).list_all_objects_gen(
self.simcore_bucket_name,
prefix=f"{fmd.object_name}",
max_yield_result_size=_MAX_ELEMENTS_TO_LIST,
):
total_size += sum(x.get("Size", 0) for x in s3_objects)

return parse_obj_as(ByteSize, total_size)

async def search_files_starting_with(
self, user_id: UserID, prefix: str
) -> list[FileMetaData]:
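The new `_get_size` above returns the stored file size for plain files and, for directories, sums the `Size` of every object under the prefix. A rough standalone equivalent using a plain boto3 paginator; bucket and prefix are placeholders and credentials are assumed to be configured:

# Sketch: total size in bytes of all objects stored under an S3 prefix.
import boto3


def prefix_total_size(bucket: str, prefix: str) -> int:
    client = boto3.client("s3")
    paginator = client.get_paginator("list_objects_v2")
    total = 0
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        total += sum(obj.get("Size", 0) for obj in page.get("Contents", []))
    return total


print(prefix_total_size("some-bucket", "projects/1234/"))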
@@ -596,7 +615,9 @@ async def create_soft_link(
async with self.engine.acquire() as conn:
return convert_db_to_model(await db_file_meta_data.insert(conn, target))

async def synchronise_meta_data_table(self, dry_run: bool) -> list[StorageFileID]:
async def synchronise_meta_data_table(
self, *, dry_run: bool
) -> list[StorageFileID]:
file_ids_to_remove = []
async with self.engine.acquire() as conn:
logger.warning(
@@ -663,7 +684,9 @@ async def _clean_expired_uploads(self):
)
list_of_fmds_to_delete = [
expired_fmd
for expired_fmd, updated_fmd in zip(list_of_expired_uploads, updated_fmds)
for expired_fmd, updated_fmd in zip(
list_of_expired_uploads, updated_fmds, strict=True
)
if not isinstance(updated_fmd, FileMetaDataAtDB)
]
if list_of_fmds_to_delete:
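`strict=True` (available since Python 3.10) makes `zip` raise instead of silently truncating if the expired uploads and the update results ever differ in length. For example:

# Sketch: zip(..., strict=True) fails loudly on a length mismatch.
expired = ["upload-a", "upload-b", "upload-c"]
updated = [None, object()]  # one result missing

try:
    list(zip(expired, updated, strict=True))
except ValueError as err:
    print(f"length mismatch detected: {err}")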
@@ -870,7 +893,7 @@ async def _copy_file_datcore_s3(

return convert_db_to_model(updated_fmd)

async def _copy_file_s3_s3(
async def _copy_path_s3_s3(
self,
user_id: UserID,
src_fmd: FileMetaDataAtDB,
@@ -886,17 +909,46 @@ async def _copy_file_s3_s3(
user_id,
dst_file_id,
upload_id=S3_UNDEFINED_OR_EXTERNAL_MULTIPART_ID,
is_directory=False,
is_directory=src_fmd.is_directory,
)
# NOTE: ensure the database is updated so cleaner does not pickup newly created uploads
await transaction.commit()

await get_s3_client(self.app).copy_file(
self.simcore_bucket_name,
src_fmd.object_name,
new_fmd.object_name,
bytes_transfered_cb=bytes_transfered_cb,
)
s3_client: StorageS3Client = get_s3_client(self.app)

if src_fmd.is_directory:
async for s3_objects in s3_client.list_all_objects_gen(
self.simcore_bucket_name,
prefix=src_fmd.object_name,
max_yield_result_size=_MAX_ELEMENTS_TO_LIST,
):
s3_objects_src_to_new: dict[str, str] = {
x["Key"]: x["Key"].replace(
f"{src_fmd.object_name}", f"{new_fmd.object_name}"
)
for x in s3_objects
}

await logged_gather(
*[
s3_client.copy_file(
self.simcore_bucket_name,
cast(SimcoreS3FileID, src),
cast(SimcoreS3FileID, new),
bytes_transfered_cb=bytes_transfered_cb,
)
for src, new in s3_objects_src_to_new.items()
],
max_concurrency=_MAX_PARALLEL_S3_CALLS,
)
else:
await s3_client.copy_file(
self.simcore_bucket_name,
src_fmd.object_name,
new_fmd.object_name,
bytes_transfered_cb=bytes_transfered_cb,
)

updated_fmd = await self._update_database_from_storage(conn, new_fmd)
logger.info("copied %s to %s", f"{src_fmd=}", f"{updated_fmd=}")
return convert_db_to_model(updated_fmd)
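The directory branch above lists every object under the source prefix, rewrites each key by swapping the source prefix for the destination prefix, and copies the objects with bounded concurrency. A hedged standalone sketch with plain synchronous boto3; bucket and prefixes are placeholders, and unlike the real code it does not report transferred bytes through a callback:

# Sketch: server-side copy of every object under src_prefix to dst_prefix.
from concurrent.futures import ThreadPoolExecutor

import boto3

_MAX_PARALLEL_COPIES = 10  # mirrors the idea of capping parallel S3 calls


def copy_prefix(bucket: str, src_prefix: str, dst_prefix: str) -> None:
    client = boto3.client("s3")  # boto3 low-level clients are thread-safe
    paginator = client.get_paginator("list_objects_v2")

    def _copy(src_key: str) -> None:
        dst_key = src_key.replace(src_prefix, dst_prefix, 1)
        # copy_object handles objects up to 5 GiB; larger ones need a multipart copy
        client.copy_object(
            Bucket=bucket,
            CopySource={"Bucket": bucket, "Key": src_key},
            Key=dst_key,
        )

    with ThreadPoolExecutor(max_workers=_MAX_PARALLEL_COPIES) as pool:
        for page in paginator.paginate(Bucket=bucket, Prefix=src_prefix):
            keys = [obj["Key"] for obj in page.get("Contents", [])]
            list(pool.map(_copy, keys))  # force evaluation so errors surface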