🐛 Ensure chunk file download (#206)
bisgaard-itis authored Nov 11, 2024
1 parent 80b81c5 commit bcc2567
Showing 10 changed files with 174 additions and 84 deletions.
1 change: 1 addition & 0 deletions clients/python/requirements/e2e-test.txt
@@ -5,6 +5,7 @@ ipykernel
ipython
jinja2
matplotlib
memory_profiler
packaging
pandas
papermill
1 change: 1 addition & 0 deletions clients/python/setup.py
@@ -30,6 +30,7 @@
"tenacity",
"tqdm>=4.48.0",
f"osparc_client=={VERSION}",
"aiofiles",
]

SETUP = dict(
69 changes: 51 additions & 18 deletions clients/python/src/osparc/_api_files_api.py
@@ -4,9 +4,6 @@
import json
import logging
import math
import random
import shutil
import string
from pathlib import Path
from typing import Any, Iterator, List, Optional, Tuple, Union

@@ -28,6 +25,10 @@
FileUploadData,
UploadedPart,
)
from urllib.parse import urljoin
import aiofiles
from tempfile import NamedTemporaryFile
import shutil
from ._utils import (
DEFAULT_TIMEOUT_SECONDS,
PaginationGenerator,
@@ -65,25 +66,57 @@ def __getattr__(self, name: str) -> Any:
return super().__getattribute__(name)

def download_file(
self, file_id: str, *, destination_folder: Optional[Path] = None, **kwargs
self,
file_id: str,
*,
destination_folder: Optional[Path] = None,
timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
**kwargs,
) -> str:
return asyncio.run(
self.download_file_async(
file_id=file_id,
destination_folder=destination_folder,
timeout_seconds=timeout_seconds,
**kwargs,
)
)

async def download_file_async(
self,
file_id: str,
*,
destination_folder: Optional[Path] = None,
timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
**kwargs,
) -> str:
if destination_folder is not None and not destination_folder.is_dir():
raise RuntimeError(
f"destination_folder: {destination_folder} must be a directory"
)
downloaded_file: Path = Path(super().download_file(file_id, **kwargs))
if destination_folder is not None:
dest_file: Path = destination_folder / downloaded_file.name
while dest_file.is_file():
new_name = (
downloaded_file.stem
+ "".join(random.choices(string.ascii_letters, k=8))
+ downloaded_file.suffix
async with aiofiles.tempfile.NamedTemporaryFile(
mode="wb",
delete=False,
) as downloaded_file:
async with AsyncHttpClient(
configuration=self.api_client.configuration, timeout=timeout_seconds
) as session:
url = urljoin(
self.api_client.configuration.host, f"/v0/files/{file_id}/content"
)
dest_file = destination_folder / new_name
shutil.move(downloaded_file, dest_file)
downloaded_file = dest_file
return str(downloaded_file.resolve())
async for response in await session.stream(
"GET", url=url, auth=self._auth, follow_redirects=True
):
response.raise_for_status()
async for chunk in response.aiter_bytes():
await downloaded_file.write(chunk)
dest_file = f"{downloaded_file.name}"
if destination_folder is not None:
dest_file = NamedTemporaryFile(dir=destination_folder, delete=False).name
shutil.move(
f"{downloaded_file.name}", dest_file
) # aiofiles doesn't seem to have an async variant of this
return dest_file

def upload_file(
self,
@@ -105,7 +138,7 @@ async def upload_file_async(
file = Path(file)
if not file.is_file():
raise RuntimeError(f"{file} is not a file")
checksum: str = compute_sha256(file)
checksum: str = await compute_sha256(file)
for file_result in self._search_files(
sha256_checksum=checksum, timeout_seconds=timeout_seconds
):
@@ -159,7 +192,7 @@ async def upload_file_async(
)
async with AsyncHttpClient(
configuration=self.api_client.configuration,
request_type="post",
method="post",
url=links.abort_upload,
body=abort_body.to_dict(),
base_url=self.api_client.configuration.host,
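For orientation, here is a minimal sketch of how the reworked download path might be driven from user code. The client setup, file UUID, and target directory are illustrative and assume API credentials are already configured (e.g. via the usual OSPARC environment variables); this is not part of the commit itself.

```python
import asyncio
from pathlib import Path

from osparc import ApiClient, FilesApi


async def fetch(file_id: str, target_dir: Path) -> Path:
    # Assumes host/key/secret are already configured (e.g. OSPARC_API_* env vars).
    with ApiClient() as api_client:
        files_api = FilesApi(api_client)
        # The new implementation streams the payload chunk by chunk into a
        # temporary file and then moves it into target_dir.
        local_path = await files_api.download_file_async(
            file_id=file_id, destination_folder=target_dir
        )
    return Path(local_path)


if __name__ == "__main__":
    # Illustrative UUID; replace with a real file id.
    print(asyncio.run(fetch("00000000-0000-0000-0000-000000000000", Path.cwd())))
```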
42 changes: 39 additions & 3 deletions clients/python/src/osparc/_http_client.py
@@ -1,7 +1,16 @@
from contextlib import suppress
from datetime import datetime
from email.utils import parsedate_to_datetime
from typing import Any, Awaitable, Callable, Dict, Optional, Set
from typing import (
Any,
Awaitable,
Callable,
Dict,
Optional,
Set,
Literal,
AsyncGenerator,
)

import httpx
import tenacity
@@ -17,14 +26,14 @@ def __init__(
self,
*,
configuration: Configuration,
request_type: Optional[str] = None,
method: Optional[str] = None,
url: Optional[str] = None,
body: Optional[Dict] = None,
**httpx_async_client_kwargs,
):
self.configuration = configuration
self._client = httpx.AsyncClient(**httpx_async_client_kwargs)
self._callback = getattr(self._client, request_type) if request_type else None
self._callback = getattr(self._client, method) if method else None
self._url = url
self._body = body
if self._callback is not None:
@@ -77,6 +86,28 @@ async def _():

return await _()

async def _stream(
self, method: Literal["GET"], url: str, *args, **kwargs
) -> AsyncGenerator[httpx.Response, None]:
n_attempts = self.configuration.retries.total
assert isinstance(n_attempts, int)

@tenacity.retry(
reraise=True,
wait=self._wait_callback,
stop=tenacity.stop_after_attempt(n_attempts),
retry=tenacity.retry_if_exception_type(httpx.HTTPStatusError),
)
async def _() -> AsyncGenerator[httpx.Response, None]:
async with self._client.stream(
method=method, url=url, *args, **kwargs
) as response:
if response.status_code in self.configuration.retries.status_forcelist:
response.raise_for_status()
yield response

return _()

async def put(self, *args, **kwargs) -> httpx.Response:
return await self._request(self._client.put, *args, **kwargs)

@@ -92,6 +123,11 @@ async def patch(self, *args, **kwargs) -> httpx.Response:
async def get(self, *args, **kwargs) -> httpx.Response:
return await self._request(self._client.get, *args, **kwargs)

async def stream(
self, method: Literal["GET"], url: str, *args, **kwargs
) -> AsyncGenerator[httpx.Response, None]:
return await self._stream(method=method, url=url, *args, **kwargs)

def _wait_callback(self, retry_state: tenacity.RetryCallState) -> int:
assert retry_state.outcome is not None
if retry_state.outcome and retry_state.outcome.exception():
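For comparison, the retry-around-a-stream idea can be sketched with plain `httpx` and `tenacity` by retrying a coroutine that performs one complete streamed attempt. The URL, destination path, and helper name below are made up for illustration and do not appear in the commit.

```python
import httpx
import tenacity


async def download_with_retry(url: str, dest: str, attempts: int = 3) -> None:
    @tenacity.retry(
        reraise=True,
        stop=tenacity.stop_after_attempt(attempts),
        retry=tenacity.retry_if_exception_type(httpx.HTTPStatusError),
    )
    async def _one_attempt() -> None:
        async with httpx.AsyncClient(follow_redirects=True) as client:
            async with client.stream("GET", url) as response:
                # Raising here keeps retryable status codes inside the retry scope.
                response.raise_for_status()
                with open(dest, "wb") as f:
                    async for chunk in response.aiter_bytes():
                        f.write(chunk)

    await _one_attempt()


# Example: await download_with_retry("https://example.com/file.bin", "file.bin")
```

Retrying the whole attempt as a coroutine keeps tenacity's async support on familiar ground, at the cost of re-downloading any bytes received before a failure.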
16 changes: 8 additions & 8 deletions clients/python/src/osparc/_utils.py
@@ -16,7 +16,7 @@
Solver,
Study,
)

import aiofiles
from ._exceptions import RequestError

_KB = 1024 # in bytes
@@ -87,15 +87,15 @@ async def file_chunk_generator(
bytes_read: int = 0
file_size: int = file.stat().st_size
while bytes_read < file_size:
with open(file, "rb") as f:
f.seek(bytes_read)
async with aiofiles.open(file, "rb") as f:
await f.seek(bytes_read)
nbytes = (
chunk_size
if (bytes_read + chunk_size <= file_size)
else (file_size - bytes_read)
)
assert nbytes > 0
chunk = await asyncio.get_event_loop().run_in_executor(None, f.read, nbytes)
chunk = await f.read(nbytes)
yield chunk, nbytes
bytes_read += nbytes

@@ -109,16 +109,16 @@ async def _fcn_to_coro(callback: Callable[..., S], *args) -> S:
return result


def compute_sha256(file: Path) -> str:
async def compute_sha256(file: Path) -> str:
assert file.is_file()
sha256 = hashlib.sha256()
with open(file, "rb") as f:
async with aiofiles.open(file, "rb") as f:
while True:
data = f.read(100 * _KB)
data = await f.read(100 * _KB)
if not data:
break
sha256.update(data)
return sha256.hexdigest()
return sha256.hexdigest()


def dev_features_enabled() -> bool:
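Because `compute_sha256` and `file_chunk_generator` are now coroutine-based, their call sites have to await or async-iterate them. A small sketch follows, assuming a `(file, chunk_size)` signature for the generator and noting that `osparc._utils` is an internal module; the file path is illustrative.

```python
import asyncio
from pathlib import Path

from osparc._utils import compute_sha256, file_chunk_generator


async def main() -> None:
    path = Path("large_input.bin")  # illustrative path
    digest = await compute_sha256(path)  # now a coroutine, must be awaited
    print(f"sha256={digest}")

    # Read the file in 1 MiB chunks without blocking the event loop.
    async for chunk, nbytes in file_chunk_generator(path, chunk_size=1024 * 1024):
        assert len(chunk) == nbytes


asyncio.run(main())
```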
14 changes: 0 additions & 14 deletions clients/python/test/e2e/_utils.py
@@ -24,20 +24,6 @@ def repo_version() -> Version:
return Version(version_file.read_text())


def skip_if_no_dev_features(test):
if (
Version(osparc.__version__) < repo_version()
or not osparc_dev_features_enabled()
):
return pytest.mark.skip(
(
f"{osparc.__version__=}<{str(repo_version)} "
f"or {osparc_dev_features_enabled()=}"
)
)(test)
return test


def skip_if_osparc_version(
*,
at_least: Optional[Version] = None,
38 changes: 19 additions & 19 deletions clients/python/test/e2e/conftest.py
@@ -17,18 +17,14 @@
from numpy import random
from packaging.version import Version
from pydantic import ByteSize
from typing import Callable

try:
from osparc._settings import ConfigurationEnvVars
except ImportError:
pass


_KB: ByteSize = ByteSize(1024) # in bytes
_MB: ByteSize = ByteSize(_KB * 1024) # in bytes
_GB: ByteSize = ByteSize(_MB * 1024) # in bytes


# Dictionary to store start times of tests
_test_start_times = {}

@@ -133,20 +129,24 @@ def async_client() -> Iterable[AsyncClient]:


@pytest.fixture
def tmp_file(tmp_path: Path, caplog: pytest.LogCaptureFixture) -> Path:
caplog.set_level(logging.INFO)
byte_size: ByteSize = 1 * _GB
tmp_file = tmp_path / "large_test_file.txt"
ss: random.SeedSequence = random.SeedSequence()
logging.info("Entropy used to generate random file: %s", f"{ss.entropy}")
rng: random.Generator = random.default_rng(ss)
tmp_file.write_bytes(rng.bytes(1000))
with open(tmp_file, "wb") as f:
f.truncate(byte_size)
assert (
tmp_file.stat().st_size == byte_size
), f"Could not create file of size: {byte_size}"
return tmp_file
def create_tmp_file(
tmp_path: Path, caplog: pytest.LogCaptureFixture
) -> Callable[[ByteSize], Path]:
def _generate_file(file_size: ByteSize):
caplog.set_level(logging.INFO)
tmp_file = tmp_path / "large_test_file.txt"
ss: random.SeedSequence = random.SeedSequence()
logging.info("Entropy used to generate random file: %s", f"{ss.entropy}")
rng: random.Generator = random.default_rng(ss)
tmp_file.write_bytes(rng.bytes(1000))
with open(tmp_file, "wb") as f:
f.truncate(file_size)
assert (
tmp_file.stat().st_size == file_size
), f"Could not create file of size: {file_size}"
return tmp_file

return _generate_file


@pytest.fixture
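With the fixed 1 GB `tmp_file` fixture replaced by a factory, each test can now pick its own payload size. A hypothetical test using the new fixture might look like this (the test name and size are illustrative):

```python
from pathlib import Path
from typing import Callable

from pydantic import ByteSize


def test_roundtrip_small_payload(
    create_tmp_file: Callable[[ByteSize], Path],
) -> None:
    # Hypothetical test: a 100 MB file instead of the former fixed 1 GB.
    size = ByteSize(100 * 1024 * 1024)
    tmp_file = create_tmp_file(size)
    assert tmp_file.stat().st_size == size
```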