🐛 archiving_utils creates deterministic zip archives (#6472)
Co-authored-by: Andrei Neagu <[email protected]>
GitHK and Andrei Neagu authored Oct 3, 2024
1 parent 369777e commit 840fdf5
Showing 24 changed files with 196 additions and 31 deletions.
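
This change replaces the stdlib zipfile.ZipFile writer with repro_zipfile.ReproducibleZipFile and sorts the files fed into each archive, so the same directory content always produces byte-identical zip files regardless of timestamps or filesystem iteration order. A minimal sketch of the idea (not part of this diff; assumes repro-zipfile is installed, helper name is illustrative):

    import hashlib
    from pathlib import Path

    from repro_zipfile import ReproducibleZipFile  # drop-in replacement for zipfile.ZipFile

    def archive_and_hash(src_dir: Path, destination: Path) -> str:
        """Zip src_dir deterministically and return the archive's SHA-256."""
        with ReproducibleZipFile(destination, "w") as zf:
            # a stable, sorted file order is the other half of reproducibility
            for path in sorted(src_dir.rglob("*")):
                if path.is_file():
                    zf.write(path, path.relative_to(src_dir))
        return hashlib.sha256(destination.read_bytes()).hexdigest()

Archiving the same tree twice, even after touching every file in between, should then yield identical hashes, which is what the new regression test below asserts.
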
2 changes: 2 additions & 0 deletions packages/aws-library/requirements/_base.txt
@@ -224,6 +224,8 @@ referencing==0.29.3
# -c requirements/../../../packages/service-library/requirements/./constraints.txt
# jsonschema
# jsonschema-specifications
repro-zipfile==0.3.1
# via -r requirements/../../../packages/service-library/requirements/_base.in
requests==2.32.3
# via opentelemetry-exporter-otlp-proto-http
rich==13.8.1
1 change: 1 addition & 0 deletions packages/service-library/requirements/_base.in
@@ -24,6 +24,7 @@ pydantic
pyinstrument
pyyaml
redis
repro-zipfile
tenacity
toolz
tqdm
2 changes: 2 additions & 0 deletions packages/service-library/requirements/_base.txt
@@ -172,6 +172,8 @@ referencing==0.29.3
# -c requirements/./constraints.txt
# jsonschema
# jsonschema-specifications
repro-zipfile==0.3.1
# via -r requirements/_base.in
requests==2.32.3
# via opentelemetry-exporter-otlp-proto-http
rich==13.8.1
2 changes: 2 additions & 0 deletions packages/service-library/requirements/_test.in
@@ -17,7 +17,9 @@ coverage
docker
faker
flaky
numpy
openapi-spec-validator
pillow
pytest
pytest-aiohttp
pytest-asyncio
4 changes: 4 additions & 0 deletions packages/service-library/requirements/_test.txt
@@ -121,6 +121,8 @@ mypy==1.11.2
# via sqlalchemy
mypy-extensions==1.0.0
# via mypy
numpy==2.1.1
# via -r requirements/_test.in
openapi-schema-validator==0.6.2
# via
# -c requirements/_aiohttp.txt
@@ -137,6 +139,8 @@ pathable==0.4.3
# via
# -c requirements/_aiohttp.txt
# jsonschema-path
pillow==10.4.0
# via -r requirements/_test.in
pluggy==1.5.0
# via pytest
pprintpp==0.4.0
67 changes: 37 additions & 30 deletions packages/service-library/src/servicelib/archiving_utils.py
@@ -4,13 +4,16 @@
import logging
import types
import zipfile
from contextlib import AsyncExitStack, contextmanager
from collections.abc import Awaitable, Callable, Iterator
from contextlib import AsyncExitStack, contextmanager, suppress
from functools import partial
from pathlib import Path
from typing import Any, Awaitable, Callable, Final, Iterator
from typing import Any, Final

import tqdm
from models_library.basic_types import IDStr
from pydantic import NonNegativeFloat
from repro_zipfile import ReproducibleZipFile # type: ignore[import-untyped]
from tqdm.contrib.logging import logging_redirect_tqdm, tqdm_logging_redirect

from .file_utils import remove_directory
@@ -21,8 +24,9 @@
_MIN: Final[int] = 60 # secs
_MAX_UNARCHIVING_WORKER_COUNT: Final[int] = 2
_CHUNK_SIZE: Final[int] = 1024 * 8
_UNIT_MULTIPLIER: Final[NonNegativeFloat] = 1024.0

log = logging.getLogger(__name__)
_logger = logging.getLogger(__name__)


class ArchiveError(Exception):
@@ -35,10 +39,10 @@ def _human_readable_size(size, decimal_places=3):
human_readable_file_size = float(size)
unit = "B"
for t_unit in ["B", "KiB", "MiB", "GiB", "TiB"]:
if human_readable_file_size < 1024.0:
if human_readable_file_size < _UNIT_MULTIPLIER:
unit = t_unit
break
human_readable_file_size /= 1024.0
human_readable_file_size /= _UNIT_MULTIPLIER

return f"{human_readable_file_size:.{decimal_places}f}{unit}"

@@ -56,19 +60,21 @@ def _iter_files_to_compress(
dir_path: Path, exclude_patterns: set[str] | None
) -> Iterator[Path]:
exclude_patterns = exclude_patterns if exclude_patterns else set()
for path in dir_path.rglob("*"):
    # NOTE: make sure to sort paths otherwise between different runs
# the zip will have a different structure and hash
for path in sorted(dir_path.rglob("*")):
if path.is_file() and not any(
fnmatch.fnmatch(f"{path}", x) for x in exclude_patterns
):
yield path
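
The generator above also honors glob-style exclusions, matching each full path against fnmatch patterns before yielding it. A small, hedged example of the same filtering on an invented directory and pattern set:

    import fnmatch
    from pathlib import Path

    exclude_patterns = {"*.tmp", "*/__pycache__/*"}  # illustrative patterns
    for path in sorted(Path("data_dir").rglob("*")):
        if path.is_file() and not any(
            fnmatch.fnmatch(f"{path}", pattern) for pattern in exclude_patterns
        ):
            print("would archive:", path)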


def _strip_directory_from_path(input_path: Path, to_strip: Path) -> Path:
_to_strip = f"{str(to_strip)}/"
_to_strip = f"{to_strip}/"
return Path(str(input_path).replace(_to_strip, ""))


class _FastZipFileReader(zipfile.ZipFile):
class _FastZipFileReader(ReproducibleZipFile):
"""
Used to gain a speed boost of several orders of magnitude.
@@ -86,7 +92,7 @@ class _FastZipFileReader(zipfile.ZipFile):
files contained in the archive.
"""

def _RealGetContents(self):
def _RealGetContents(self): # noqa: N802
"""method disabled"""


@@ -107,7 +113,7 @@ def _zipfile_single_file_extract_worker(
zip_file_path: Path,
file_in_archive: zipfile.ZipInfo,
destination_folder: Path,
is_dir: bool,
is_dir: bool, # noqa: FBT001
) -> Path:
"""Extracts file_in_archive from the archive zip_file_path -> destination_folder/file_in_archive
@@ -129,7 +135,7 @@ def _zipfile_single_file_extract_worker(
desc=desc,
**(
_TQDM_FILE_OPTIONS
| dict(miniters=_compute_tqdm_miniters(file_in_archive.file_size))
| {"miniters": _compute_tqdm_miniters(file_in_archive.file_size)}
),
) as pbar:
while chunk := zip_fp.read(_CHUNK_SIZE):
Expand All @@ -139,7 +145,7 @@ def _zipfile_single_file_extract_worker(


def _ensure_destination_subdirectories_exist(
zip_file_handler: zipfile.ZipFile, destination_folder: Path
zip_file_handler: ReproducibleZipFile, destination_folder: Path
) -> None:
# assemble full destination paths
full_destination_paths = {
@@ -177,7 +183,7 @@ async def unarchive_dir(
)
async with AsyncExitStack() as zip_stack:
zip_file_handler = zip_stack.enter_context(
zipfile.ZipFile( # pylint: disable=consider-using-with
ReproducibleZipFile( # pylint: disable=consider-using-with
archive_to_extract,
mode="r",
)
@@ -232,7 +238,7 @@ async def unarchive_dir(
extracted_path = await future
extracted_file_size = extracted_path.stat().st_size
if tqdm_progress.update(extracted_file_size) and log_cb:
with log_catch(log, reraise=False):
with log_catch(_logger, reraise=False):
await log_cb(f"{tqdm_progress}")
await sub_prog.update(extracted_file_size)
extracted_paths.append(extracted_path)
@@ -266,34 +272,37 @@ async def unarchive_dir(

@contextmanager
def _progress_enabled_zip_write_handler(
zip_file_handler: zipfile.ZipFile, progress_bar: tqdm.tqdm
) -> Iterator[zipfile.ZipFile]:
zip_file_handler: ReproducibleZipFile, progress_bar: tqdm.tqdm
) -> Iterator[ReproducibleZipFile]:
"""This function overrides the default zip write fct to allow to get progress using tqdm library"""

def _write_with_progress(
original_write_fct, self, data, pbar # pylint: disable=unused-argument
original_write_fct,
self, # pylint: disable=unused-argument # noqa: ARG001
data,
pbar,
):
pbar.update(len(data))
return original_write_fct(data)

# Replace original write() with a wrapper to track progress
assert zip_file_handler.fp # nosec
old_write_method = zip_file_handler.fp.write
zip_file_handler.fp.write = types.MethodType( # type: ignore[assignment]
zip_file_handler.fp.write = types.MethodType(
partial(_write_with_progress, old_write_method, pbar=progress_bar),
zip_file_handler.fp,
)
try:
yield zip_file_handler
finally:
zip_file_handler.fp.write = old_write_method # type: ignore[method-assign]
zip_file_handler.fp.write = old_write_method
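
The context manager above reports progress by temporarily swapping the underlying file object's write() for a wrapper that advances a tqdm bar by the number of bytes written, restoring the original method afterwards. A standalone, hedged sketch of the same trick (file name and payload are made up):

    import types
    from functools import partial

    import tqdm
    from repro_zipfile import ReproducibleZipFile

    def _write_and_count(original_write, self, data, pbar):  # noqa: ARG001
        pbar.update(len(data))
        return original_write(data)

    payload = b"x" * 64 * 1024
    with tqdm.tqdm(total=len(payload), unit="B", unit_scale=True) as pbar:
        with ReproducibleZipFile("demo.zip", "w") as zf:
            assert zf.fp is not None
            original_write = zf.fp.write
            zf.fp.write = types.MethodType(
                partial(_write_and_count, original_write, pbar=pbar), zf.fp
            )
            try:
                zf.writestr("payload.bin", payload)  # progress reported as bytes are written
            finally:
                zf.fp.write = original_write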


def _add_to_archive(
dir_to_compress: Path,
destination: Path,
compress: bool,
store_relative_path: bool,
compress: bool, # noqa: FBT001
store_relative_path: bool, # noqa: FBT001
update_progress,
loop,
exclude_patterns: set[str] | None = None,
@@ -308,11 +317,10 @@ def _add_to_archive(
desc=f"{desc}\n",
total=folder_size_bytes,
**(
_TQDM_FILE_OPTIONS
| dict(miniters=_compute_tqdm_miniters(folder_size_bytes))
_TQDM_FILE_OPTIONS | {"miniters": _compute_tqdm_miniters(folder_size_bytes)}
),
) as progress_bar, _progress_enabled_zip_write_handler(
zipfile.ZipFile(destination, "w", compression=compression), progress_bar
ReproducibleZipFile(destination, "w", compression=compression), progress_bar
) as zip_file_handler:
for file_to_add in _iter_files_to_compress(dir_to_compress, exclude_patterns):
progress_bar.set_description(f"{desc}/{file_to_add.name}\n")
@@ -393,10 +401,11 @@ async def archive_dir(
if destination.is_file():
destination.unlink(missing_ok=True)

raise ArchiveError(
msg = (
f"Failed archiving {dir_to_compress} -> {destination} due to {type(err)}."
f"Details: {err}"
) from err
)
raise ArchiveError(msg) from err

except BaseException:
if destination.is_file():
@@ -453,11 +462,9 @@ def prune(self, exclude: set[Path]) -> None:
if path.is_file():
path.unlink()
elif path.is_dir():
try:
# prevents deleting non-empty folders
with suppress(OSError):
path.rmdir()
except OSError:
# prevents deleting non-empty folders
pass

# second pass to delete empty folders
# after deleting files, some folders might have been left empty
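
The prune logic above runs in two passes: first it unlinks files that should not be kept, then it attempts rmdir() on directories, relying on contextlib.suppress(OSError) so that non-empty folders are simply left alone. A hedged sketch of the pattern (helper name and keep-set semantics are illustrative; the full prune() implementation is not shown in this hunk):

    from contextlib import suppress
    from pathlib import Path

    def prune_except(base_dir: Path, keep: set[Path]) -> None:
        # first pass: drop files that are not in the keep-set
        for path in base_dir.rglob("*"):
            if path.is_file() and path not in keep:
                path.unlink()
        # second pass: deepest directories first, so emptied parents can follow
        for path in sorted(base_dir.rglob("*"), reverse=True):
            if path.is_dir():
                with suppress(OSError):
                    path.rmdir()  # OSError (directory not empty) is suppressed
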
92 changes: 91 additions & 1 deletion packages/service-library/tests/test_archiving_utils.py
@@ -11,17 +11,20 @@
import secrets
import string
import tempfile
from collections.abc import AsyncIterable, Callable, Iterable, Iterator
from concurrent.futures import ProcessPoolExecutor
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, Iterable, Iterator

import numpy
import pytest
from faker import Faker
from PIL import Image
from pydantic import ByteSize, parse_obj_as
from pytest_benchmark.plugin import BenchmarkFixture
from servicelib import archiving_utils
from servicelib.archiving_utils import ArchiveError, archive_dir, unarchive_dir
from servicelib.file_utils import remove_directory


def _print_tree(path: Path, level=0):
@@ -596,3 +599,90 @@ def run_async_test(*args, **kwargs):
)

benchmark(run_async_test)


def _touch_all_files_in_path(path_to_archive: Path) -> None:
for path in path_to_archive.rglob("*"):
print("touching", path)
path.touch()


@pytest.fixture
async def mixed_file_types(tmp_path: Path, faker: Faker) -> AsyncIterable[Path]:
base_dir = tmp_path / "mixed_types_dir"
base_dir.mkdir()

# mixed small text files and binary files
(base_dir / "empty").mkdir()
(base_dir / "d1").mkdir()
(base_dir / "d1" / "f1.txt").write_text(faker.text())
(base_dir / "d1" / "b2.bin").write_bytes(faker.json_bytes())
(base_dir / "d1" / "sd1").mkdir()
(base_dir / "d1" / "sd1" / "f1.txt").write_text(faker.text())
(base_dir / "d1" / "sd1" / "b2.bin").write_bytes(faker.json_bytes())
(base_dir / "images").mkdir()

    # images caused issues with zipping: the content below produced different
    # hashes for the resulting zip files
for i in range(2):
image_dir = base_dir / f"images{i}"
image_dir.mkdir()
for n in range(50):
a = numpy.random.rand(900, 900, 3) * 255 # noqa: NPY002
im_out = Image.fromarray(a.astype("uint8")).convert("RGB")
image_path = image_dir / f"out{n}.jpg"
im_out.save(image_path)

print("mixed_types_dir ---")
_print_tree(base_dir)

yield base_dir

await remove_directory(base_dir)
assert not base_dir.exists()


@pytest.mark.parametrize(
"store_relative_path, compress",
[
# test that all possible combinations still work
pytest.param(False, False, id="no_relative_path_no_compress"),
pytest.param(False, True, id="no_relative_path_with_compression"),
pytest.param(True, False, id="nodeports_options"),
pytest.param(True, True, id="with_relative_path_with_compression"),
],
)
async def test_regression_archive_hash_does_not_change(
mixed_file_types: Path,
tmp_path: Path,
store_relative_path: bool,
compress: bool,
):
destination_path = tmp_path / "archives_to_compare"
destination_path.mkdir(parents=True, exist_ok=True)

first_archive = destination_path / "first"
second_archive = destination_path / "second"
assert not first_archive.exists()
assert not second_archive.exists()
assert first_archive != second_archive

await archive_dir(
mixed_file_types,
first_archive,
compress=compress,
store_relative_path=store_relative_path,
)

_touch_all_files_in_path(mixed_file_types)

await archive_dir(
mixed_file_types,
second_archive,
compress=compress,
store_relative_path=store_relative_path,
)

_, first_hash = _compute_hash(first_archive)
_, second_hash = _compute_hash(second_archive)
assert first_hash == second_hash
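
What this regression test guards against: the stdlib writer records each entry's modification time and permission bits inside the archive, so touching the inputs between runs changes the resulting bytes even though the content is identical; ReproducibleZipFile pins that metadata to fixed values. A hedged way to inspect the per-entry metadata of any archive (the file name is illustrative):

    import zipfile

    with zipfile.ZipFile("first.zip") as zf:
        for info in zf.infolist():
            # date_time and the unix mode bits are baked into every entry
            print(info.filename, info.date_time, oct(info.external_attr >> 16))
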
2 changes: 2 additions & 0 deletions packages/simcore-sdk/requirements/_base.txt
@@ -247,6 +247,8 @@ referencing==0.29.3
# -c requirements/../../../packages/service-library/requirements/./constraints.txt
# jsonschema
# jsonschema-specifications
repro-zipfile==0.3.1
# via -r requirements/../../../packages/service-library/requirements/_base.in
requests==2.32.3
# via opentelemetry-exporter-otlp-proto-http
rich==13.8.1
4 changes: 4 additions & 0 deletions services/api-server/requirements/_base.txt
@@ -467,6 +467,10 @@ redis==5.0.4
# -c requirements/../../../requirements/constraints.txt
# -r requirements/../../../packages/service-library/requirements/_base.in
# -r requirements/../../../packages/simcore-sdk/requirements/../../../packages/service-library/requirements/_base.in
repro-zipfile==0.3.1
# via
# -r requirements/../../../packages/service-library/requirements/_base.in
# -r requirements/../../../packages/simcore-sdk/requirements/../../../packages/service-library/requirements/_base.in
requests==2.32.3
# via opentelemetry-exporter-otlp-proto-http
rich==13.7.1