🎨 Dask sidecar: use reproducible zipfile library (#6571)
sanderegg authored Oct 22, 2024
1 parent 6e8867b commit 4c5aa41
Showing 4 changed files with 85 additions and 6 deletions.
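
For context on the change: the standard library's `zipfile` stores each member's modification timestamp (and platform-dependent attributes) inside the archive, so zipping the same content at two different times produces archives with different checksums. The `repro-zipfile` package provides `ReproducibleZipFile`, a drop-in replacement for `zipfile.ZipFile` that normalizes this metadata. A minimal sketch of the problem and the fix (file names here are illustrative):

```python
import hashlib
import time
import zipfile

import repro_zipfile


def archive_digest(zip_cls, archive_name: str) -> str:
    # Write the same payload into a fresh archive and hash the resulting bytes.
    with zip_cls(archive_name, mode="w", compression=zipfile.ZIP_STORED) as zfp:
        zfp.writestr("payload.txt", "identical content every time")
    with open(archive_name, "rb") as file_obj:
        return hashlib.sha256(file_obj.read()).hexdigest()


# Plain zipfile embeds the current time (2 s resolution), so the digests differ.
first = archive_digest(zipfile.ZipFile, "plain.zip")
time.sleep(2)
second = archive_digest(zipfile.ZipFile, "plain.zip")
print(first == second)  # typically False

# ReproducibleZipFile pins the metadata, so the digests match.
first = archive_digest(repro_zipfile.ReproducibleZipFile, "repro.zip")
time.sleep(2)
second = archive_digest(repro_zipfile.ReproducibleZipFile, "repro.zip")
print(first == second)  # True
```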
1 change: 1 addition & 0 deletions services/dask-sidecar/requirements/_base.in
@@ -26,3 +26,4 @@ fsspec[http, s3] # sub types needed as we acces http and s3 here
lz4 # for compression
pydantic[email,dotenv]
prometheus_client
+repro-zipfile
4 changes: 3 additions & 1 deletion services/dask-sidecar/requirements/_base.txt
@@ -325,7 +325,9 @@ referencing==0.29.3
# jsonschema
# jsonschema-specifications
repro-zipfile==0.3.1
-# via -r requirements/../../../packages/service-library/requirements/_base.in
+# via
+# -r requirements/../../../packages/service-library/requirements/_base.in
+# -r requirements/_base.in
requests==2.32.3
# via opentelemetry-exporter-otlp-proto-http
rich==13.7.1
@@ -5,13 +5,14 @@
import time
import zipfile
from collections.abc import Awaitable, Callable
-from io import BytesIO
+from io import IOBase
from pathlib import Path
from typing import Any, Final, TypedDict, cast

import aiofiles
import aiofiles.tempfile
import fsspec # type: ignore[import-untyped]
+import repro_zipfile  # type: ignore[import-untyped]
from pydantic import ByteSize, FileUrl, parse_obj_as
from pydantic.networks import AnyUrl
from servicelib.logging_utils import LogLevelInt, LogMessageStr
@@ -33,7 +34,7 @@ def _file_progress_cb(
log_publishing_cb: LogPublishingCB,
text_prefix: str,
main_loop: asyncio.AbstractEventLoop,
-**kwargs,
+**kwargs,  # noqa: ARG001
):
asyncio.run_coroutine_threadsafe(
log_publishing_cb(
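
The progress callback above is invoked by fsspec from a worker thread, where there is no running event loop to `await` the async log publisher, hence `asyncio.run_coroutine_threadsafe` hands the coroutine back to the main loop. A stripped-down sketch of that pattern (the callback and message below are placeholders, not the sidecar's actual names):

```python
import asyncio


async def publish_log(message: str) -> None:
    # Stand-in for the async log-publishing callback.
    print(message)


def progress_cb(bytes_done: int, main_loop: asyncio.AbstractEventLoop) -> None:
    # Called from a worker thread: schedule the coroutine on the main loop
    # instead of awaiting it, since this thread runs no event loop.
    asyncio.run_coroutine_threadsafe(
        publish_log(f"transferred {bytes_done} bytes"), main_loop
    )


async def main() -> None:
    loop = asyncio.get_running_loop()
    # Simulate a blocking transfer that reports progress from a worker thread.
    await loop.run_in_executor(None, progress_cb, 1024, loop)
    await asyncio.sleep(0.1)  # give the scheduled coroutine a chance to run


asyncio.run(main())
```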
@@ -78,7 +79,7 @@ def _s3fs_settings_from_s3_settings(s3_settings: S3Settings) -> S3FsSettingsDict
return s3fs_settings


-def _file_chunk_streamer(src: BytesIO, dst: BytesIO):
+def _file_chunk_streamer(src: IOBase, dst: IOBase):
data = src.read(CHUNK_SIZE)
segment_len = dst.write(data)
return (data, segment_len)
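
The signature change from `BytesIO` to `IOBase` widens the accepted types: the handles returned by `fsspec.open` are file-like but not necessarily `BytesIO`, and `IOBase` is the abstract base they share with regular file objects. A small sketch showing the same chunk streamer feeding both an in-memory buffer and an on-disk file (the tiny `CHUNK_SIZE` and file names are only for illustration):

```python
from io import BytesIO, IOBase
from pathlib import Path

CHUNK_SIZE = 4  # tiny on purpose, so the loops below iterate a few times


def _file_chunk_streamer(src: IOBase, dst: IOBase):
    # Move one chunk from src to dst and report what was moved.
    data = src.read(CHUNK_SIZE)
    segment_len = dst.write(data)
    return (data, segment_len)


# Works with in-memory buffers ...
src_buf, dst_buf = BytesIO(b"hello world"), BytesIO()
while _file_chunk_streamer(src_buf, dst_buf)[0]:
    pass
assert dst_buf.getvalue() == b"hello world"

# ... and with real file handles, which are not BytesIO instances.
source = Path("source.bin")
source.write_bytes(b"some file content")
with source.open("rb") as src_fp, Path("copy.bin").open("wb") as dst_fp:
    while _file_chunk_streamer(src_fp, dst_fp)[0]:
        pass
assert Path("copy.bin").read_bytes() == source.read_bytes()
```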
@@ -98,6 +99,8 @@ async def _copy_file(
with fsspec.open(src_url, mode="rb", **src_storage_kwargs) as src_fp, fsspec.open(
dst_url, "wb", **dst_storage_kwargs
) as dst_fp:
+assert isinstance(src_fp, IOBase)  # nosec
+assert isinstance(dst_fp, IOBase)  # nosec
file_size = getattr(src_fp, "size", None)
data_read = True
total_data_written = 0
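
The two new `assert isinstance(..., IOBase)` lines narrow the loosely typed handles returned by `fsspec.open` for the type checker (the `# nosec` markers tell bandit these are not security checks). The copy loop that follows is truncated in this view, but it boils down to repeatedly offloading the blocking chunk transfer to the default thread pool; a self-contained sketch under that assumption (the `CHUNK_SIZE` value is illustrative and progress reporting is omitted):

```python
import asyncio
from io import IOBase

CHUNK_SIZE = 1024 * 1024  # illustrative; the real value is a module constant


def _file_chunk_streamer(src: IOBase, dst: IOBase):
    data = src.read(CHUNK_SIZE)
    segment_len = dst.write(data)
    return (data, segment_len)


async def copy_in_chunks(src_fp: IOBase, dst_fp: IOBase) -> int:
    # Each blocking read/write pair runs in the default executor so the
    # event loop stays responsive while large files are streamed.
    loop = asyncio.get_event_loop()
    total_data_written = 0
    data_read = True
    while data_read:
        data_read, data_written = await loop.run_in_executor(
            None, _file_chunk_streamer, src_fp, dst_fp
        )
        total_data_written += data_written or 0
    return total_data_written
```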
@@ -159,7 +162,7 @@ async def pull_file_from_remote(
if src_mime_type == _ZIP_MIME_TYPE and target_mime_type != _ZIP_MIME_TYPE:
await log_publishing_cb(f"Uncompressing '{dst_path.name}'...", logging.INFO)
logger.debug("%s is a zip file and will be now uncompressed", dst_path)
-with zipfile.ZipFile(dst_path, "r") as zip_obj:
+with repro_zipfile.ReproducibleZipFile(dst_path, "r") as zip_obj:
await asyncio.get_event_loop().run_in_executor(
None, zip_obj.extractall, dst_path.parents[0]
)
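
When the pulled file is a zip archive and the caller did not ask for a zip, it is unpacked next to the download target; the blocking `extractall` runs in the default executor, and `ReproducibleZipFile` acts here simply as a drop-in reader. A runnable sketch of that step (archive and member names are made up):

```python
import asyncio
import zipfile
from pathlib import Path

import repro_zipfile


async def extract_next_to(archive: Path) -> None:
    # extractall() blocks, so it is pushed to the default thread pool,
    # mirroring the hunk above.
    with repro_zipfile.ReproducibleZipFile(archive, "r") as zip_obj:
        await asyncio.get_event_loop().run_in_executor(
            None, zip_obj.extractall, archive.parents[0]
        )


async def main() -> None:
    archive = Path("bundle.zip")
    with repro_zipfile.ReproducibleZipFile(
        archive, mode="w", compression=zipfile.ZIP_STORED
    ) as zfp:
        zfp.writestr("hello.txt", "hello")
    await extract_next_to(archive)
    assert (archive.parents[0] / "hello.txt").read_text() == "hello"


asyncio.run(main())
```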
@@ -248,7 +251,8 @@ async def push_file_to_remote(
f"Compressing '{src_path.name}' to '{archive_file_path.name}'...",
logging.INFO,
)
-with zipfile.ZipFile(
+
+with repro_zipfile.ReproducibleZipFile(
archive_file_path, mode="w", compression=zipfile.ZIP_STORED
) as zfp:
await asyncio.get_event_loop().run_in_executor(
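
On the push side the archive is written with `ZIP_STORED`, i.e. without compression; reproducibility comes from `ReproducibleZipFile` normalizing the metadata, not from the compression choice. With plain `zipfile.ZipFile.write()` the archive embeds the source file's modification time, so even a simple `touch` changes the resulting bytes; a sketch of that difference (paths are illustrative):

```python
import hashlib
import os
import zipfile
from pathlib import Path

import repro_zipfile


def zip_and_digest(zip_cls, src: Path, archive: Path) -> str:
    # Archive a single file without compression and hash the archive bytes.
    with zip_cls(archive, mode="w", compression=zipfile.ZIP_STORED) as zfp:
        zfp.write(src, arcname=src.name)
    return hashlib.sha256(archive.read_bytes()).hexdigest()


src = Path("data.txt")
src.write_text("unchanged content")

before = zip_and_digest(zipfile.ZipFile, src, Path("plain.zip"))
os.utime(src, (src.stat().st_atime, src.stat().st_mtime + 10))  # "touch" the file
after = zip_and_digest(zipfile.ZipFile, src, Path("plain.zip"))
print(before == after)  # typically False: the source mtime is embedded

before = zip_and_digest(repro_zipfile.ReproducibleZipFile, src, Path("repro.zip"))
os.utime(src, (src.stat().st_atime, src.stat().st_mtime + 10))
after = zip_and_digest(repro_zipfile.ReproducibleZipFile, src, Path("repro.zip"))
print(before == after)  # True: timestamps are normalized
```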
72 changes: 72 additions & 0 deletions services/dask-sidecar/tests/unit/test_file_utils.py
@@ -3,6 +3,7 @@
# pylint: disable=unused-variable

import asyncio
+import hashlib
import mimetypes
import zipfile
from collections.abc import AsyncIterable
@@ -375,3 +376,74 @@ async def test_pull_compressed_zip_file_from_remote(
assert file.exists()
assert file.name in file_names_within_zip_file
mocked_log_publishing_cb.assert_called()


def _compute_hash(file_path: Path) -> str:
with file_path.open("rb") as file_to_hash:
file_hash = hashlib.sha256()
chunk = file_to_hash.read(8192)
while chunk:
file_hash.update(chunk)
chunk = file_to_hash.read(8192)

return file_hash.hexdigest()


async def test_push_file_to_remote_creates_reproducible_zip_archive(
remote_parameters: StorageParameters,
tmp_path: Path,
faker: Faker,
mocked_log_publishing_cb: mock.AsyncMock,
):
destination_url1 = parse_obj_as(AnyUrl, f"{remote_parameters.remote_file_url}1.zip")
destination_url2 = parse_obj_as(AnyUrl, f"{remote_parameters.remote_file_url}2.zip")
src_path = tmp_path / faker.file_name()
TEXT_IN_FILE = faker.text()
src_path.write_text(TEXT_IN_FILE)
assert src_path.exists()

# pushing 2 times should produce the same archive with the same hash
await push_file_to_remote(
src_path,
destination_url1,
mocked_log_publishing_cb,
remote_parameters.s3_settings,
)
await asyncio.sleep(
5
) # NOTE: we wait a bit to ensure the created zipfile has a different creation time (that is normally used for computing the hash)
await push_file_to_remote(
src_path,
destination_url2,
mocked_log_publishing_cb,
remote_parameters.s3_settings,
)

# now we pull both file and compare their hash

# USE-CASE 1: if destination is a zip then no decompression is done
download_folder = tmp_path / "download"
download_folder.mkdir(parents=True, exist_ok=True)
assert download_folder.exists()
dst_path1 = download_folder / f"{faker.file_name()}1.zip"
dst_path2 = download_folder / f"{faker.file_name()}2.zip"

await pull_file_from_remote(
src_url=destination_url1,
target_mime_type=None,
dst_path=dst_path1,
log_publishing_cb=mocked_log_publishing_cb,
s3_settings=remote_parameters.s3_settings,
)
assert dst_path1.exists()

await pull_file_from_remote(
src_url=destination_url2,
target_mime_type=None,
dst_path=dst_path2,
log_publishing_cb=mocked_log_publishing_cb,
s3_settings=remote_parameters.s3_settings,
)
assert dst_path2.exists()

assert _compute_hash(dst_path1) == _compute_hash(dst_path2)
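
A side note on the `_compute_hash` helper introduced at the top of this new test: on Python 3.11+ the manual read loop can be replaced by `hashlib.file_digest`, e.g.:

```python
import hashlib
from pathlib import Path


def _compute_hash(file_path: Path) -> str:
    # Same result as the chunked loop above, using the 3.11+ helper.
    with file_path.open("rb") as file_to_hash:
        return hashlib.file_digest(file_to_hash, "sha256").hexdigest()
```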
