Commit 4a7d3aa

feat(s3): raise an exception when the file cannot be accessed (#409)

* feat(s3): raise an exception when the file cannot be accessed

* feat: add more context to the raised exception

* Context + coverage

Signed-off-by: Luka Peschke <[email protected]>

* test: fix typo due to the test update

---------

Signed-off-by: Luka Peschke <[email protected]>
Co-authored-by: Luka Peschke <[email protected]>
Sanix-Darker and lukapeschke authored Apr 19, 2023
1 parent a92fea1 commit 4a7d3aa
Showing 3 changed files with 35 additions and 6 deletions.
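
In the visible part of the old hunk, a failed open was only logged as a warning; with this change, the last failed attempt raises an exception that chains the original error and reports the number of tries. A minimal sketch of the caller-visible behavior (the bucket and file names are made up for illustration, and the helper is private, so calling it directly is purely demonstrative):

    import s3fs

    from peakina.io.s3.s3_utils import _s3_open_file_with_retries

    # Anonymous filesystem and a path that does not exist, purely illustrative.
    fs = s3fs.S3FileSystem(anon=True)
    try:
        _s3_open_file_with_retries(fs=fs, path="mybucket/missing.csv", retries=3)
    except Exception as exc:
        print(exc)  # e.g. "Could not open mybucket/missing.csv (3 tries): ..."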
10 changes: 7 additions & 3 deletions peakina/io/s3/s3_utils.py
@@ -53,13 +53,17 @@ def parse_s3_url(url: str, file: bool = True) -> tuple[str | None, str | None, str]:
 
 
 def _s3_open_file_with_retries(fs: s3fs.S3FileSystem, path: str, retries: int) -> Any:
-    for _ in range(retries):
+    nb_tries = 0
+    while nb_tries < retries:
         try:
-            logger.info(f"opening {path}")
+            logger.info(f"Opening {path}")
             file = fs.open(path)
             return file
         except Exception as ex:
-            logger.warning(f"could not open {path}: {ex}")
+            nb_tries += 1
+            if nb_tries >= retries:
+                raise Exception(f"Could not open {path} ({nb_tries} tries): {ex}") from ex
+            logger.warning(f"Could not open {path}: {ex}")
             # if the file has just been uploaded, then it might not be visible immediatly
             # but the fail to open has been cached by s3fs
             # so, we invalidate the cache
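
The hunk is cut off right after the comment, but the fetcher test below stubs out s3fs's invalidate_cache, so the loop presumably ends each failed attempt by dropping the cached miss before retrying. A self-contained sketch of the whole helper under that assumption:

    import logging
    from typing import Any

    import s3fs

    logger = logging.getLogger(__name__)


    def _s3_open_file_with_retries(fs: s3fs.S3FileSystem, path: str, retries: int) -> Any:
        nb_tries = 0
        while nb_tries < retries:
            try:
                logger.info(f"Opening {path}")
                return fs.open(path)
            except Exception as ex:
                nb_tries += 1
                if nb_tries >= retries:
                    raise Exception(f"Could not open {path} ({nb_tries} tries): {ex}") from ex
                logger.warning(f"Could not open {path}: {ex}")
                # a freshly uploaded file may not be visible yet, and the failed
                # open is cached by s3fs, so drop the cache before the next try
                fs.invalidate_cache(path)  # assumed: this is what the truncated hunk does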
2 changes: 1 addition & 1 deletion tests/io/s3/test_s3_fetcher.py
@@ -75,7 +75,7 @@ def invalidate_cache(self, path=None):
     with s3_fetcher.open(filepath) as f:
         # ensure logger doesn't log credentials
         logger_mock.warning.assert_called_once_with(
-            "could not open mybucket/for_retry_0_0.csv: argh!"
+            "Could not open mybucket/for_retry_0_0.csv: argh!"
         )
         assert f.read() == b"a,b\n0,0\n0,1"
     s3_client.delete_object(Bucket="mybucket", Key="tests/fixtures/for_retry_0_0.csv")
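
Note that the expected message contains mybucket/for_retry_0_0.csv rather than the s3://key:secret@... URL the fetcher was built with: the helper logs the parsed, credential-free path. A small sketch of that, assuming (from the annotated signature shown above, not confirmed by the diff) that parse_s3_url returns (access_key, access_secret, "<bucket>/<key>"):

    from peakina.io.s3.s3_utils import parse_s3_url

    # Assumed tuple layout, inferred from tuple[str | None, str | None, str]
    # and from the paths that appear in the log assertions.
    key, secret, path = parse_s3_url("s3://my_key:my_secret@mybucket/file.csv")
    assert path == "mybucket/file.csv"  # the credential-free form that gets logged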
29 changes: 27 additions & 2 deletions tests/io/s3/test_s3_utils.py
@@ -4,7 +4,11 @@
 from pytest import raises
 from pytest_mock import MockerFixture
 
-from peakina.io.s3.s3_utils import parse_s3_url as pu, s3_open
+from peakina.io.s3.s3_utils import (
+    _s3_open_file_with_retries,
+    parse_s3_url as pu,
+    s3_open,
+)
 
 
 def test_parse_s3_url_no_credentials():
@@ -53,7 +57,7 @@ def test_s3_open(mocker):
     fs_mock.open.return_value = io.BytesIO(b"a,b\n0,1\n")
     tmpfile = s3_open("s3://my_key:my_secret@mybucket/file.csv")
     # ensure logger doesn't log credentials
-    logger_mock.info.assert_called_once_with("opening mybucket/file.csv")
+    logger_mock.info.assert_called_once_with("Opening mybucket/file.csv")
     assert tmpfile.name.endswith(".s3tmp")
     assert tmpfile.read() == b"a,b\n0,1\n"
 
@@ -91,3 +95,24 @@ def test_s3_open_with_token(mocker: MockerFixture) -> None:
     s3fs_file_system.assert_called_with(
         secret="my_secret", key="my_key", token=None, client_kwargs=None
     )
+
+
+def test__s3_open_file_with_retries(mocker: MagicMock) -> None:
+    s3fs_mock = MagicMock()
+    side_effect = [
+        Exception("Can't open one"),
+        Exception("Can't open two"),
+        Exception("Can't open three"),
+        42,
+    ]
+    s3fs_mock.open.side_effect = side_effect
+    path = "s3://my_key:my_secret@mybucket/file.csv"
+
+    with raises(Exception, match=r"\(2 tries\): Can't open two"):
+        _s3_open_file_with_retries(fs=s3fs_mock, path=path, retries=2)
+
+    s3fs_mock.open.side_effect = side_effect
+    with raises(Exception, match=r"\(3 tries\): Can't open three"):
+        _s3_open_file_with_retries(fs=s3fs_mock, path=path, retries=3)
+
+    assert _s3_open_file_with_retries(fs=s3fs_mock, path=path, retries=4) == 42
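
A detail that makes this test work: unittest.mock consumes a side_effect list one entry per call, raising the Exception entries and returning the rest, and assigning the list again resets the iterator. That is why side_effect is re-assigned between the two raises blocks, and why the final call with retries=4 succeeds on its first attempt with the leftover 42. A standalone illustration:

    from unittest.mock import MagicMock

    m = MagicMock(side_effect=[Exception("boom"), 42])
    try:
        m()  # first call raises the first entry
    except Exception:
        pass
    assert m() == 42  # next call returns the next entry instead of raising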
