diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py
index 7ba394e..dbea118 100644
--- a/disk_objectstore/container.py
+++ b/disk_objectstore/container.py
@@ -1,6 +1,9 @@
 """
 The main implementation of the ``Container`` class of the object store.
 """
+
+from __future__ import annotations
+
 # pylint: disable=too-many-lines
 import dataclasses
 import io
@@ -12,21 +15,7 @@
 from contextlib import contextmanager
 from enum import Enum
 from pathlib import Path
-from typing import (
-    Any,
-    Callable,
-    Dict,
-    Iterator,
-    List,
-    Literal,
-    Optional,
-    Sequence,
-    Set,
-    Tuple,
-    Type,
-    Union,
-    overload,
-)
+from typing import TYPE_CHECKING, overload

 from sqlalchemy.orm.session import Session
 from sqlalchemy.sql import func
@@ -62,6 +51,24 @@
     yield_first_element,
 )

+if TYPE_CHECKING:
+    from typing import (
+        Any,
+        Callable,
+        Dict,
+        Iterator,
+        List,
+        Literal,
+        Optional,
+        Sequence,
+        Set,
+        Tuple,
+        Type,
+        Union,
+    )
+
+    from mypy_extensions import Arg
+
 ObjQueryResults = namedtuple(
     "ObjQueryResults", ["hashkey", "offset", "length", "compressed", "size"]
 )
@@ -106,19 +113,19 @@ class Container:  # pylint: disable=too-many-public-methods
     # (after VACUUMing, as mentioned above).
     _MAX_CHUNK_ITERATE_LENGTH = 9500

-    def __init__(self, folder: Union[str, Path]) -> None:
+    def __init__(self, folder: str | Path) -> None:
         """Create the class that represents the container.

         :param folder: the path to a folder that will host this object-store container.
         """
         self._folder = Path(folder).resolve()
         # Will be populated by the _get_session function
-        self._session: Optional[Session] = None
+        self._session: Session | None = None

         # These act as caches and will be populated by the corresponding properties
         # IMPORANT! IF YOU ADD MORE, REMEMBER TO CLEAR THEM IN `init_container()`!
-        self._current_pack_id: Optional[int] = None
-        self._config: Optional[dict] = None
+        self._current_pack_id: int | None = None
+        self._config: dict | None = None

     def get_folder(self) -> Path:
         """Return the path to the folder that will host the object-store container."""
@@ -130,7 +137,7 @@ def close(self) -> None:
         self._session.close()
         self._session = None

-    def __enter__(self) -> "Container":
+    def __enter__(self) -> Container:
         """Return a context manager that will close the session when exiting the context."""
         return self

@@ -182,12 +189,12 @@ def _get_session(
     @overload
     def _get_session(
         self, create: bool = False, raise_if_missing: Literal[False] = False
-    ) -> Optional[Session]:
+    ) -> Session | None:
         ...

     def _get_session(
         self, create: bool = False, raise_if_missing: bool = False
-    ) -> Optional[Session]:
+    ) -> Session | None:
         """Return a new session to connect to the pack-index SQLite DB.

         :param create: if True, creates the sqlite file and schema.
@@ -225,7 +232,7 @@ def _get_loose_path_from_hashkey(self, hashkey: str) -> Path:
         return self._get_loose_folder() / hashkey

     def _get_pack_path_from_pack_id(
-        self, pack_id: Union[str, int], allow_repack_pack: bool = False
+        self, pack_id: str | int, allow_repack_pack: bool = False
     ) -> Path:
         """Return the path of the pack file on disk for the given pack ID.

@@ -386,7 +393,7 @@ def init_container(

         self._get_session(create=True)

-    def _get_repository_config(self) -> Dict[str, Union[int, str]]:
+    def _get_repository_config(self) -> dict[str, int | str]:
         """Return the repository config."""
         if self._config is None:
             if not self.is_initialised:
@@ -444,7 +451,7 @@ def _get_compressobj_instance(self):
         """Return the correct `compressobj` class for the compression algorithm defined for this container."""
         return get_compressobj_instance(self.compression_algorithm)

-    def _get_stream_decompresser(self) -> Type[ZlibStreamDecompresser]:
+    def _get_stream_decompresser(self) -> type[ZlibStreamDecompresser]:
         """Return a new instance of the correct StreamDecompresser class for the compression algorithm
         defined for this container.
         """
@@ -479,7 +486,7 @@ def get_object_stream(self, hashkey: str) -> Iterator[StreamReadBytesType]:
     @contextmanager
     def get_object_stream_and_meta(
         self, hashkey: str
-    ) -> Iterator[Tuple[StreamReadBytesType, ObjectMeta],]:
+    ) -> Iterator[tuple[StreamReadBytesType, ObjectMeta],]:
         """Return a context manager yielding a stream to get the content of an object, and a metadata dictionary.

         To be used as a context manager::
@@ -521,7 +528,7 @@ def _get_objects_stream_meta_generator(
         hashkeys: Sequence[str],
         skip_if_missing: bool,
         with_streams: Literal[False],
-    ) -> Iterator[Tuple[str, ObjectMeta]]:
+    ) -> Iterator[tuple[str, ObjectMeta]]:
         ...

     @overload
@@ -530,7 +537,7 @@ def _get_objects_stream_meta_generator(
         hashkeys: Sequence[str],
         skip_if_missing: bool,
         with_streams: Literal[True],
-    ) -> Iterator[Tuple[str, Optional[StreamSeekBytesType], ObjectMeta]]:
+    ) -> Iterator[tuple[str, StreamSeekBytesType | None, ObjectMeta]]:
         ...

     def _get_objects_stream_meta_generator(  # pylint: disable=too-many-branches,too-many-statements,too-many-locals
@@ -539,10 +546,7 @@ def _get_objects_stream_meta_generator(  # pylint: disable=too-many-branches,too
         skip_if_missing: bool,
         with_streams: bool,
     ) -> Iterator[
-        Union[
-            Tuple[str, ObjectMeta],
-            Tuple[str, Optional[StreamSeekBytesType], ObjectMeta],
-        ]
+        (tuple[str, ObjectMeta] | tuple[str, StreamSeekBytesType | None, ObjectMeta])
     ]:
         """Return a generator yielding triplets of (hashkey, open stream, size).

@@ -569,12 +573,12 @@ def _get_objects_stream_meta_generator(  # pylint: disable=too-many-branches,too
         # open at a given time.
         # The try/finally block makes sure we close it at the end, if any was open.
         last_open_file = None
-        lazy_loose_stream: Optional[LazyLooseStream] = None
+        lazy_loose_stream: LazyLooseStream | None = None

         # Operate on a set - only return once per required hashkey, even if required more than once
         hashkeys_set = set(hashkeys)

-        hashkeys_in_packs: Set[str] = set()
+        hashkeys_in_packs: set[str] = set()

         packs = defaultdict(list)
         # Currently ordering in the DB (it's ordered across all packs, but this should not be
@@ -851,7 +855,7 @@ def _get_objects_stream_meta_generator(  # pylint: disable=too-many-branches,too
     @contextmanager
     def get_objects_stream_and_meta(
         self, hashkeys: Sequence[str], skip_if_missing: bool = True
-    ) -> Iterator[Iterator[Tuple[str, Optional[StreamSeekBytesType], ObjectMeta]]]:
+    ) -> Iterator[Iterator[tuple[str, StreamSeekBytesType | None, ObjectMeta]]]:
         """A context manager returning a generator yielding triplets of (hashkey, open stream, metadata).

         :note: the hash keys yielded are often in a *different* order than the original
@@ -893,7 +897,7 @@ def get_lazy_loose_stream(self, hashkey: str) -> LazyLooseStream:

     def get_objects_meta(
         self, hashkeys: Sequence[str], skip_if_missing: bool = True
-    ) -> Iterator[Tuple[str, ObjectMeta]]:
+    ) -> Iterator[tuple[str, ObjectMeta]]:
         """A generator yielding pairs of (hashkey, metadata).

         :note: the hash keys yielded are often in a *different* order than the original
@@ -958,7 +962,7 @@ def get_object_meta(self, hashkey: str) -> ObjectMeta:
         ), "No object found, this should never happen since I pass skip_if_missing=False!"
         raise NotExistent(f"No object with hash key {hashkey}")

-    def has_objects(self, hashkeys: Union[List[str], Tuple[str, ...]]) -> List[bool]:
+    def has_objects(self, hashkeys: list[str] | tuple[str, ...]) -> list[bool]:
         """Return whether the container contains objects with the given hash keys.

         :param hashkeys: a list of hash keys to check.
@@ -986,8 +990,8 @@ def has_object(self, hashkey: str) -> bool:
         return self.has_objects([hashkey])[0]

     def get_objects_content(
-        self, hashkeys: List[str], skip_if_missing: bool = True
-    ) -> Dict[str, Optional[bytes]]:
+        self, hashkeys: list[str], skip_if_missing: bool = True
+    ) -> dict[str, bytes | None]:
         """Get the content of a number of objects with given hash keys.

         :note: use this method only if you know objects fit in memory.
@@ -998,7 +1002,7 @@ def get_objects_content(
         :return: a dictionary of byte streams where the keys are the hash keys and the values are the object
             contents.
         """
-        retrieved: Dict[str, Optional[bytes]] = {}
+        retrieved: dict[str, bytes | None] = {}
         with self.get_objects_stream_and_meta(
             hashkeys=hashkeys, skip_if_missing=skip_if_missing
         ) as triplets:
@@ -1259,8 +1263,8 @@ def _write_data_to_packfile(
         pack_handle: StreamWriteBytesType,
         read_handle: StreamReadBytesType,
         compress: bool,
-        hash_type: Optional[str] = None,
-    ) -> Union[Tuple[int, None], Tuple[int, str]]:
+        hash_type: str | None = None,
+    ) -> tuple[int, None] | tuple[int, str]:
         """Append data, read from read_handle until it ends, to the correct packfile.

         Return the number of bytes READ (note that this will be different
@@ -1316,9 +1320,11 @@ def _write_data_to_packfile(

     def pack_all_loose(  # pylint: disable=too-many-locals,too-many-branches,too-many-statements
         self,
-        compress: Union[bool, CompressMode] = CompressMode.NO,
+        compress: bool | CompressMode = CompressMode.NO,
         validate_objects: bool = True,
         do_fsync: bool = True,
+        callback: None
+        | (Callable[[Arg(str, "action"), Arg(Any, "value")], None]) = None,
     ) -> None:
         """Pack all loose objects.

@@ -1338,6 +1344,15 @@ def pack_all_loose(  # pylint: disable=too-many-locals,too-many-branches,too-man
             Needed to guarantee that data will be there even in the case of a power loss.
             Set to False if you don't need such a guarantee (anyway the loose version
             will be kept, so often this guarantee is not strictly needed).
+        :param callback: a callback function that can be used to report progress.
+            The callback function should accept two arguments: a string with the action being performed
+            and the value of the action. The action can be "init" (initialization),
+            "update" (update of the progress), or "close" (finalization).
+            In case of "init", the value is a dictionary with the key "total" as the total size of the operation
+            and key "description" as the label of the operation.
+            In case of "update", the value is the amount of the operation that has been completed.
+            In case of "close", the value is None.
+            The return value of the callback function is ignored.
         """
         hash_type = self.hash_type if validate_objects else None

@@ -1393,6 +1408,19 @@ def pack_all_loose(  # pylint: disable=too-many-locals,too-many-branches,too-man
         # on Mac and on Windows (see issues #37 and #43). Therefore, I do NOT delete them,
         # and deletion is deferred to a manual clean-up operation.

+        if callback:
+            callback(
+                action="init",
+                value={
+                    "total": self.get_total_size()["total_size_loose"],
+                    "description": "Packing loose objects",
+                },
+            )
+        # I wish this would show as MB, GB. In tqdm it's easy:
+        # just pass unit='B' and unit_scale=1024.
+        # But to do this here, some changes in the callback function are needed.
+        # See this feature request for more details: https://github.com/aiidateam/aiida-core/issues/6564
+
         # Outer loop: this is used to continue when a new pack file needs to be created
         while loose_objects:
             # Store the last pack integer ID, needed to know later if I need to open a new pack
@@ -1423,7 +1451,7 @@ def pack_all_loose(  # pylint: disable=too-many-locals,too-many-branches,too-man
                     # Get next hash key to process
                    loose_hashkey = loose_objects.pop()

-                    obj_dict: Dict[str, Any] = {}
+                    obj_dict: dict[str, Any] = {}
                     obj_dict["hashkey"] = loose_hashkey
                     obj_dict["pack_id"] = pack_int_id
                     obj_dict["offset"] = pack_handle.tell()
@@ -1469,6 +1497,11 @@ def pack_all_loose(  # pylint: disable=too-many-locals,too-many-branches,too-man

                     # Appending for later bulk commit - see comments in add_streamed_objects_to_pack
                     obj_dicts.append(obj_dict)
+                    if callback:
+                        callback(
+                            action="update",
+                            value=obj_dict["size"],
+                        )
             # It's now time to write to the DB, in a single bulk operation (per pack)
             if obj_dicts:
                 # Here I shouldn't need to do `OR IGNORE` as in `add_streamed_objects_to_pack`
@@ -1504,6 +1537,8 @@ def pack_all_loose(  # pylint: disable=too-many-locals,too-many-branches,too-man
         # HOWEVER, while this would work fine on Linux, there are concurrency issues both
         # on Mac and on Windows (see issues #37 and #43). Therefore, I do NOT delete them,
         # and deletion is deferred to a manual clean-up operation.
+        if callback:
+            callback(action="close", value=None)

     def add_streamed_object_to_pack(  # pylint: disable=too-many-arguments
         self,
@@ -1512,7 +1547,7 @@ def add_streamed_object_to_pack(  # pylint: disable=too-many-arguments
         open_streams: bool = False,
         no_holes: bool = False,
         no_holes_read_twice: bool = True,
-        callback: Optional[Callable] = None,
+        callback: Callable | None = None,
         callback_size_hint: int = 0,
         do_fsync: bool = True,
         do_commit: bool = True,
@@ -1526,7 +1561,7 @@ def add_streamed_object_to_pack(  # pylint: disable=too-many-arguments
            length in the callbacks
         :return: a single object hash key
         """
-        streams: List[StreamSeekBytesType] = [
+        streams: list[StreamSeekBytesType] = [
             CallbackStreamWrapper(
                 stream, callback=callback, total_length=callback_size_hint
             )
@@ -1551,15 +1586,15 @@ def add_streamed_object_to_pack(  # pylint: disable=too-many-arguments

     def add_streamed_objects_to_pack(  # pylint: disable=too-many-locals, too-many-branches, too-many-statements, too-many-arguments
         self,
-        stream_list: Union[List[StreamSeekBytesType], List[LazyOpener]],
+        stream_list: list[StreamSeekBytesType] | list[LazyOpener],
         compress: bool = False,
         open_streams: bool = False,
         no_holes: bool = False,
         no_holes_read_twice: bool = True,
-        callback: Optional[Callable] = None,
+        callback: Callable | None = None,
         do_fsync: bool = True,
         do_commit: bool = True,
-    ) -> List[str]:
+    ) -> list[str]:
         """Add objects directly to a pack, reading from a list of streams.

         This is a maintenance operation, available mostly for efficiency reasons
@@ -1599,7 +1634,7 @@ def add_streamed_objects_to_pack(  # pylint: disable=too-many-locals, too-many-b
             compress, bool
         ), "Only True of False are valid `compress` modes when adding direclty to a pack"
         yield_per_size = 1000
-        hashkeys: List[str] = []
+        hashkeys: list[str] = []

         # Make a copy of the list and revert its order, so we can pop from the list
         # without affecting the original list, and it's from the end so it's fast
@@ -1713,7 +1748,7 @@ def add_streamed_objects_to_pack(  # pylint: disable=too-many-locals, too-many-b
                 # is already there
                 position_before = pack_handle.tell()

-                obj_dict: Dict[str, Any] = {}
+                obj_dict: dict[str, Any] = {}
                 obj_dict["pack_id"] = pack_int_id
                 obj_dict["compressed"] = compress
                 obj_dict["offset"] = pack_handle.tell()
@@ -1848,14 +1883,14 @@ def add_streamed_objects_to_pack(  # pylint: disable=too-many-locals, too-many-b

     def add_objects_to_pack(  # pylint: disable=too-many-arguments
         self,
-        content_list: Union[List[bytes], Tuple[bytes, ...]],
+        content_list: list[bytes] | tuple[bytes, ...],
         compress: bool = False,
         no_holes: bool = False,
         no_holes_read_twice: bool = True,
-        callback: Optional[Callable] = None,
+        callback: Callable | None = None,
         do_fsync: bool = True,
         do_commit: bool = True,
-    ) -> List[str]:
+    ) -> list[str]:
         """Add objects directly to a pack, reading from a list of content byte arrays.

         This is a maintenance operation, available mostly for efficiency reasons
@@ -1881,7 +1916,7 @@ def add_objects_to_pack(  # pylint: disable=too-many-arguments

         :return: a list of object hash keys
         """
-        stream_list: List[StreamSeekBytesType] = [
+        stream_list: list[StreamSeekBytesType] = [
             io.BytesIO(content) for content in content_list
         ]
         return self.add_streamed_objects_to_pack(
@@ -2075,12 +2110,12 @@ def clean_storage(  # pylint: disable=too-many-branches,too-many-locals
     def import_objects(  # pylint: disable=too-many-locals,too-many-statements,too-many-branches,too-many-arguments
         self,
         hashkeys: Sequence[str],
-        source_container: "Container",
+        source_container: Container,
         compress: bool = False,
         target_memory_bytes: int = 104857600,
-        callback: Optional[Callable] = None,
+        callback: Callable | None = None,
         do_fsync: bool = True,
-    ) -> Dict[str, str]:
+    ) -> dict[str, str]:
         """Imports the objects with the specified hashkeys into the container.

         :param hashkeys: an iterable of hash keys.
@@ -2103,7 +2138,7 @@ def import_objects(  # pylint: disable=too-many-locals,too-many-statements,too-m

         # We load data in this cache as long as the memory usage is < target_memory_bytes
         # We then flush in 'bulk' to the `other_container`, thus speeding up the process
-        content_cache: Dict[str, bytes] = {}
+        content_cache: dict[str, bytes] = {}
         cache_size = 0

         if source_container.hash_type == self.hash_type:
@@ -2302,8 +2337,8 @@ def import_objects(  # pylint: disable=too-many-locals,too-many-statements,too-m
         # Let us also compute the hash

     def _validate_hashkeys_pack(  # pylint: disable=too-many-locals
-        self, pack_id: int, callback: Optional[Callable] = None
-    ) -> Dict[str, Union[List[Union[str, Any]], List[Any]]]:
+        self, pack_id: int, callback: Callable | None = None
+    ) -> dict[str, list[str | Any] | list[Any]]:
         """Validate all hashkeys and returns a dictionary of problematic entries.

         The keys are the problem type, the values are a list of hashkeys of problematic objects.
@@ -2442,7 +2477,7 @@ def callback(self, action, value):
             "overlapping_packed": overlapping,
         }

-    def validate(self, callback: Optional[Callable] = None) -> ValidationIssues:
+    def validate(self, callback: Callable | None = None) -> ValidationIssues:
         """Perform a number of validations on the container content, to make sure it is not corrupt.

         The callback can be used to show a progress bar (see e.g. its use in the `validate` command of
@@ -2453,7 +2488,7 @@ def validate(self, callback: Optional[Callable] = None) -> ValidationIssues:
         dataclass to check if there are errors, or check all fields if you prefer and see if all fields
         are empty lists (which means no error).
         """
-        all_errors: Dict[str, List[str]] = {
+        all_errors: dict[str, list[str]] = {
             field.name: [] for field in dataclasses.fields(ValidationIssues)
         }

@@ -2491,7 +2526,7 @@ def validate(self, callback: Optional[Callable] = None) -> ValidationIssues:

         return ValidationIssues(**dict(all_errors))

-    def delete_objects(self, hashkeys: List[str]) -> List[Union[str, Any]]:
+    def delete_objects(self, hashkeys: list[str]) -> list[str | Any]:
         """Delete the selected objects.

         .. note:: In the current version, this has to be considered a maintenance operation, and as such it should
@@ -2578,7 +2613,12 @@ def delete_objects(self, hashkeys: List[str]) -> List[Union[str, Any]]:
        # was deleted) should be considered as if the object has *not* been deleted
         return list(deleted_loose.union(deleted_packed))

-    def repack(self, compress_mode: CompressMode = CompressMode.KEEP) -> None:
+    def repack(
+        self,
+        compress_mode: CompressMode = CompressMode.KEEP,
+        callback: None
+        | (Callable[[Arg(str, "action"), Arg(Any, "value")], None]) = None,
+    ) -> None:
         """Perform a repack of all packed objects.

         At the end, it also VACUUMs the DB to reclaim unused space and make
@@ -2587,13 +2627,26 @@ def repack(self, compress_mode: CompressMode = CompressMode.KEEP) -> None:
         This is a maintenance operation.

         :param compress_mode: see docstring of ``repack_pack``.
+        :param callback: a callback function that can be used to report progress.
+            The callback function should accept two arguments: a string with the action being performed
+            and the value of the action. The action can be "init" (initialization),
+            "update" (update of the progress), or "close" (finalization).
+            In case of "init", the value is a dictionary with the key "total" as the total size of the operation
+            and key "description" as the label of the operation.
+            In case of "update", the value is the amount of the operation that has been completed.
+            In case of "close", the value is None.
+            The return value of the callback function is ignored.
         """
         for pack_id in self._list_packs():
-            self.repack_pack(pack_id, compress_mode=compress_mode)
+            self.repack_pack(pack_id, compress_mode=compress_mode, callback=callback)
         self._vacuum()

-    def repack_pack(  # pylint: disable=too-many-branches,too-many-statements
-        self, pack_id: str, compress_mode: CompressMode = CompressMode.KEEP
+    def repack_pack(  # pylint: disable=too-many-branches,too-many-statements,too-many-locals
+        self,
+        pack_id: str,
+        compress_mode: CompressMode = CompressMode.KEEP,
+        callback: None
+        | (Callable[[Arg(str, "action"), Arg(Any, "value")], None]) = None,
     ) -> None:
         """Perform a repack of a given pack object.

@@ -2604,6 +2657,15 @@ def repack_pack(  # pylint: disable=too-many-branches,too-many-statements
         preserves the same compression (this means that repacking is *much* faster
         as it can simply transfer the bytes without decompressing everything first,
         and recompressing it back again).
+        :param callback: a callback function that can be used to report progress.
+            The callback function should accept two arguments: a string with the action being performed
+            and the value of the action. The action can be "init" (initialization),
+            "update" (update of the progress), or "close" (finalization).
+            In case of "init", the value is a dictionary with the key "total" as the total size of the operation
+            and key "description" as the label of the operation.
+            In case of "update", the value is the amount of the operation that has been completed.
+            In case of "close", the value is None.
+            The return value of the callback function is ignored.
         """
         assert (
             pack_id != self._REPACK_PACK_ID
@@ -2628,6 +2690,14 @@ def repack_pack(  # pylint: disable=too-many-branches,too-many-statements
         obj_dicts = []
         # At least one object. Let's repack. We have checked before that the
         # REPACK_PACK_ID did not exist.
+        if callback:
+            callback(
+                action="init",
+                value={
+                    "total": self.get_total_size()["total_size_packed"],
+                    "description": f"Repack {pack_id}",
+                },
+            )
         with self.lock_pack(
             str(self._REPACK_PACK_ID), allow_repack_pack=True
         ) as write_pack_handle:
@@ -2653,9 +2723,9 @@ def repack_pack(  # pylint: disable=too-many-branches,too-many-statements
                 source_compressed,
             ) in session.execute(stmt):
                 # This is the read handle of the bytes in the pack - it might be
-                read_handle: Union[
-                    PackedObjectReader, ZlibStreamDecompresser
-                ] = PackedObjectReader(read_pack, offset, length)
+                read_handle: (
+                    PackedObjectReader | ZlibStreamDecompresser
+                ) = PackedObjectReader(read_pack, offset, length)

                 # Determine if I should compress or not the destination - this function will
                 # try to do it in a cheap way (e.g. if the source is already compressed, will just
@@ -2725,13 +2795,17 @@ def repack_pack(  # pylint: disable=too-many-branches,too-many-statements
                 # Appending for later bulk commit
                 # I will assume that all objects of a single pack fit in memory
                 obj_dicts.append(obj_dict)
+                if callback:
+                    callback(action="update", value=obj_dict["size"])
+            # Safe flush to disk seems to be a time-consuming operation, but there is no easy way to include it in the progress bar
             safe_flush_to_disk(
                 write_pack_handle,
                 self._get_pack_path_from_pack_id(
                     self._REPACK_PACK_ID, allow_repack_pack=True
                 ),
             )
-
+        if callback:
+            callback(action="close", value=None)
         # We are done with data transfer.
         # At this stage we just have a new pack -1 (_REPACK_PACK_ID) but it is never referenced.
         # Let us store the information in the DB.
diff --git a/tests/test_container.py b/tests/test_container.py
index dae8841..f0c89e5 100644
--- a/tests/test_container.py
+++ b/tests/test_container.py
@@ -3446,6 +3446,36 @@ def test_repack_auto_many_sizes(temp_container: Container, start_compressed):
     assert issues.is_valid()


+def test_repack_progress_bar(temp_container: Container):
+    """Check that the progress bar is correctly updated when repacking the container."""
+    objects = []
+    for length in range(0, 100):
+        objects.append(b"a" * length)
+
+    temp_container.add_objects_to_pack(objects, compress=False)
+
+    passed_updates = 0
+    passed_total = 0
+
+    def progress(action, value):
+        """A dummy progress function."""
+        if action == "init":
+            assert (
+                value["total"] == temp_container.get_total_size()["total_size_packed"]
+            )
+            nonlocal passed_total
+            passed_total = value["total"]
+            assert value["description"] == "Repack 0"
+        elif action == "update":
+            assert isinstance(value, (int, float))
+            nonlocal passed_updates
+            passed_updates += value
+        elif action == "close":
+            assert passed_updates == passed_total
+
+    temp_container.repack(callback=progress)
+
+
 @pytest.mark.parametrize("compress_mode", [True, False] + list(CompressMode))
 def test_pack_all_loose_compress_modes(temp_container: Container, compress_mode):
     """Check that pack_all_loose() uses the correct compression mode.
@@ -3518,6 +3548,33 @@ def test_pack_all_loose_many(temp_container):
     assert retrieved == expected


+def test_pack_all_loose_progress_bar(temp_container):
+    """Check that the progress bar is correctly updated when packing all loose objects."""
+    # Add 10 objects
+    for idx in range(10):
+        content = f"{idx}".encode()
+        temp_container.add_object(content)
+
+    passed_updates = 0
+    passed_total = 0
+
+    def progress(action, value):
+        """A dummy progress function."""
+        if action == "init":
+            assert value["total"] == temp_container.get_total_size()["total_size_loose"]
+            nonlocal passed_total
+            passed_total = value["total"]
+            assert value["description"] == "Packing loose objects"
+        elif action == "update":
+            assert isinstance(value, (int, float))
+            nonlocal passed_updates
+            passed_updates += value
+        elif action == "close":
+            assert passed_updates == passed_total
+
+    temp_container.pack_all_loose(callback=progress)
+
+
 def test_container_id(temp_container):
     """Check the creation of unique container IDs."""
     old_container_id = temp_container.container_id
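For reference, a minimal callback satisfying the "init"/"update"/"close" protocol documented in the new docstrings might look like the sketch below. It is not part of this patch; the use of tqdm is purely illustrative, and any callable accepting the keyword arguments ``action`` and ``value`` works.

    from tqdm import tqdm

    class TqdmCallback:
        """Adapt the container's progress-callback protocol to a tqdm progress bar."""

        def __init__(self):
            self._bar = None

        def __call__(self, action, value):
            if action == "init":
                # value is a dict: {"total": <total size in bytes>, "description": <label>}
                self._bar = tqdm(total=value["total"], desc=value["description"])
            elif action == "update":
                # value is the amount of work completed since the previous update
                self._bar.update(value)
            elif action == "close":
                # value is None; finalize and release the bar
                self._bar.close()
                self._bar = None

    # Hypothetical usage with an existing Container instance:
    # container.pack_all_loose(callback=TqdmCallback())
    # container.repack(callback=TqdmCallback())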