Skip to content

Commit

Permalink
Fix corruption during CONFLICT upload (#3499)
Browse files Browse the repository at this point in the history
* Fix corruption during CONFLICT upload

PBENCH-1219

b0.72 ARCHIVE server version

Large uploads can time out, causing the client (e.g., the 0.69 passthrough
server's dispatch) to retry. Eventually, this will result in an `OK` (200)
response, which is good. However if we retry before the original operation
finishes (it may be still running, despite the client timeout), we catch the
already existing "temporary intake directory" as a `CONFLICT` error.

Unfortunately, the cleanup logic doesn't recognize this distinction, and still
deleted the intake directory on exit. Timed correctly, this could break the
original upload: at best, it results in a noisy termination with complaints
that the existed temporary directory no longer exists.

Fix this problem by attempting to delete only when this API instance has
successfully created the temporary directory. Modify the `CONFLICT` unit test
case to reproduce the situation more accurately and additionally validate that
the directory still exists after completion.
  • Loading branch information
dbutenhof authored Jul 21, 2023
1 parent f8fb65d commit c307cb5
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 16 deletions.
18 changes: 10 additions & 8 deletions lib/pbench/server/api/resources/upload_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Respon

attributes = {"access": access, "metadata": metadata}
filename = args.uri["filename"]
tmp_dir: Optional[Path] = None
intake_dir: Optional[Path] = None

try:
try:
Expand Down Expand Up @@ -248,13 +248,15 @@ def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Respon
try:
tmp_dir = self.temporary / md5sum
tmp_dir.mkdir()
except FileExistsError:
except FileExistsError as e:
raise CleanupTime(
HTTPStatus.CONFLICT,
"Temporary upload directory already exists",
)
tar_full_path = tmp_dir / filename
md5_full_path = tmp_dir / f"{filename}.md5"
"Dataset is currently being uploaded",
) from e
else:
intake_dir = tmp_dir
tar_full_path = intake_dir / filename
md5_full_path = intake_dir / f"{filename}.md5"

bytes_received = 0
usage = shutil.disk_usage(tar_full_path.parent)
Expand Down Expand Up @@ -523,9 +525,9 @@ def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Respon
else:
raise APIAbort(status, message) from e
finally:
if tmp_dir:
if intake_dir:
try:
shutil.rmtree(tmp_dir)
shutil.rmtree(intake_dir)
except Exception as e:
current_app.logger.warning("Error removing {}: {}", tmp_dir, str(e))

Expand Down
32 changes: 24 additions & 8 deletions lib/pbench/test/unit/server/test_upload.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from http import HTTPStatus
from logging import Logger
from pathlib import Path
from typing import Any
from typing import Any, Optional

from freezegun import freeze_time
import pytest
Expand Down Expand Up @@ -289,19 +289,33 @@ def test_empty_upload(
def test_temp_exists(
self, monkeypatch, client, tmp_path, server_config, pbench_drb_token
):
"""Test behavior of a conflicting upload
When the MD5-based temporary intake directory exists already, upload
will fail with CONFLICT. We want to verify that behavior, and that we
don't delete the existing directory during cleanup, which could
interfere with a running upload. This can happen, for example, when a
large upload times out and the client retries before the original is
finished.
"""
md5 = "d41d8cd98f00b204e9800998ecf8427e"
temp_path: Optional[Path] = None

def td_exists(self, *args, **kwargs):
"""Mock out Path.mkdir()
The trick here is that calling the UPLOAD API results in two calls
to Path.mkdir: one in the __init__ to be sure that ARCHIVE/UPLOAD
exists, and the second for the temporary subdirectory. We want the
first to succeed normally so we'll pass the call to the real mkdir
if the path doesn't end with our MD5 value.
exists, and the second for the temporary subdirectory. We want to
create both directories, but for the second (MD5-based intake temp)
we want to raise FileExistsError as if it had already existed, to
trigger the duplicate upload logic.
"""
retval = self.real_mkdir(*args, **kwargs)
if self.name != md5:
return self.real_mkdir(*args, **kwargs)
return retval
nonlocal temp_path
temp_path = self
raise FileExistsError(str(self))

filename = "tmp.tar.xz"
Expand All @@ -317,9 +331,11 @@ def td_exists(self, *args, **kwargs):
headers=self.gen_headers(pbench_drb_token, md5),
)
assert response.status_code == HTTPStatus.CONFLICT
assert (
response.json.get("message") == "Temporary upload directory already exists"
)

# Assert that we captured an intake temporary directory path and that
# the "duplicate" path wasn't deleted during API cleanup.
assert temp_path and temp_path.is_dir()
assert response.json.get("message") == "Dataset is currently being uploaded"
assert not self.cachemanager_created

@pytest.mark.parametrize(
Expand Down

0 comments on commit c307cb5

Please sign in to comment.