From 15d1ceef1e11c2c38122145c683b945df69c6e28 Mon Sep 17 00:00:00 2001 From: Andreas Albert Date: Fri, 8 Sep 2023 11:05:58 +0200 Subject: [PATCH] Extend test to cover double upload bug --- quetz/tests/api/test_main_packages.py | 153 +++++++++++++++++--------- 1 file changed, 103 insertions(+), 50 deletions(-) diff --git a/quetz/tests/api/test_main_packages.py b/quetz/tests/api/test_main_packages.py index cc8f308f..1723df2a 100644 --- a/quetz/tests/api/test_main_packages.py +++ b/quetz/tests/api/test_main_packages.py @@ -1,9 +1,10 @@ import hashlib import json import os +import shutil import time from pathlib import Path -from typing import BinaryIO, Callable, Union +from typing import BinaryIO, Callable, Tuple, Union import pytest @@ -333,12 +334,12 @@ def _upload_file_1( auth_client, public_channel, public_package, - package_filename: str, + filepath: Path, force: bool = False, ): """Upload a file using /channels/{channel_name}/packages/{package_name}/files/""" - with open(package_filename, "rb") as fid: - files = {"files": (package_filename, fid)} + with open(filepath, "rb") as fid: + files = {"files": (filepath.name, fid)} response = auth_client.post( f"/api/channels/{public_channel.name}/packages/" f"{public_package.name}/files/", @@ -352,15 +353,15 @@ def _upload_file_2( auth_client, public_channel, public_package, - package_filename: str, + filepath: Path, force: bool = False, ): """Upload a file using /channels/{channel_name}/upload/{file_name}""" - with open(package_filename, "rb") as fid: + with open(filepath, "rb") as fid: body_bytes = fid.read() response = auth_client.post( - f"/api/channels/{public_channel.name}/upload/{package_filename}", + f"/api/channels/{public_channel.name}/upload/{filepath.name}", content=body_bytes, params={"force": force, "sha256": hashlib.sha256(body_bytes).hexdigest()}, ) @@ -384,7 +385,7 @@ def test_upload_package_version_wrong_filename( os.rename("test-package-0.1-0.tar.bz2", package_filename) response = upload_function( - auth_client, public_channel, public_package, package_filename + auth_client, public_channel, public_package, Path(package_filename) ) package_dir = Path(pkgstore.channels_dir) / public_channel.name / 'linux-64' @@ -397,12 +398,14 @@ def test_upload_package_version_wrong_filename( assert not os.path.exists(package_dir) -def sha256(path: Union[Path, str]) -> str: +def sha_and_md5(path: Union[Path, str]) -> Tuple[str, str]: sha = hashlib.sha256() + md5 = hashlib.md5() with open(path, "rb") as f: for chunk in iter(lambda: f.read(2**16), b""): sha.update(chunk) - return sha.hexdigest() + md5.update(chunk) + return sha.hexdigest(), md5.hexdigest() @pytest.mark.parametrize("upload_function", [_upload_file_1, _upload_file_2]) @@ -416,78 +419,128 @@ def test_upload_duplicate_package_version( config, remove_package_versions, upload_function: Callable, + tmp_path, ): pkgstore = config.get_package_store() - package_filename = "test-package-0.1-0.tar.bz2" - package_filename_copy = "test-package-0.1-0_copy.tar.bz2" + def get_repodata(): + """Helper function to read repo data""" + package_dir = Path(pkgstore.channels_dir) / public_channel.name / 'linux-64' + return json.loads((package_dir / 'repodata.json').read_text()) + + # Test setup: path1 is a package we will upload + path1 = Path(__file__).parent.parent / "data" / "test-package-0.1-0.tar.bz2" + + # To test duplicate uploads, we have a second file `_copy`, which is the same + # package and version but has a different content, so different hashes + # We must move this file to a temporary directory so that we can give it the same + # name as the first file. + path2 = tmp_path / path1.name + shutil.copyfile( + Path(__file__).parent.parent / "data" / "test-package-0.1-0_copy.tar.bz2", path2 + ) - file_sha = sha256(package_filename) - file_copy_sha = sha256(package_filename_copy) + # Sanity checks + sha1, md51 = sha_and_md5(path1) + sha2, md52 = sha_and_md5(path2) + assert (sha1 != sha2) and ( + md51 != md52 + ), "Sanity check failure: Test files have same hash" assert ( - file_sha != file_copy_sha - ), "Sanity check: Test files must have different hashes for this test." - file_size = os.path.getsize(package_filename) - file_copy_size = os.path.getsize(package_filename_copy) + path1.name == path2.name + ), "Sanity check failure: Test files have different name" + + size1 = os.path.getsize(path1) + size2 = os.path.getsize(path2) assert ( - file_size != file_copy_size - ), "Sanity check: Test files must have different sizes for this test." + size1 != size2 + ), "Sanity check failure: Test files must have different sizes for this test." - upload_function(auth_client, public_channel, public_package, package_filename) + # First upload + # File should not exist + assert not pkgstore.file_exists(public_channel.name, f"linux-64/{path1.name}") + response = upload_function(auth_client, public_channel, public_package, path1) - def get_repodata(): - """Helper function to read repo data""" - package_dir = Path(pkgstore.channels_dir) / public_channel.name / 'linux-64' - return json.loads((package_dir / 'repodata.json').read_text()) + # Expect success + assert response.status_code == 201 - repodata_init = get_repodata() - assert repodata_init["packages"][package_filename]["sha256"] == file_sha + # Check meta data is OK + repodata_after_first = get_repodata() + assert repodata_after_first["packages"][path1.name]["sha256"] == sha1 + assert repodata_after_first["packages"][path1.name]["md5"] == md51 + assert repodata_after_first["packages"][path1.name]["size"] == size1 - # Try submitting the same package without 'force' flag - response = upload_function( - auth_client, public_channel, public_package, package_filename + # Check that the file in the store is OK + file_in_store = ( + Path(pkgstore.channels_dir) / public_channel.name / 'linux-64' / path1.name ) + assert size1 == os.path.getsize(file_in_store) + assert (sha1, md51) == sha_and_md5(file_in_store) + + # Second upload: File with same name but different content, without force + response = upload_function(auth_client, public_channel, public_package, path2) - # Expect 409 here + # Expect 409 since the file already exists assert response.status_code == 409 detail = response.json()['detail'] assert "Duplicate" in detail - # Change the archive to test force update - os.remove(package_filename) - os.rename(package_filename_copy, package_filename) + # Check meta data is OK: It should not have changed with respect to before + repodata_after_second = get_repodata() + assert repodata_after_second == repodata_after_first + # Check that the file in the store is OK + file_in_store = ( + Path(pkgstore.channels_dir) / public_channel.name / 'linux-64' / path1.name + ) + + # File in store should not be the second file + assert not ( + (sha2, md52) == sha_and_md5(file_in_store) + ), "Duplicate upload without force updated stored file." + assert not ( + size2 == os.path.getsize(file_in_store) + ), "Duplicate upload without force updated stored file." + + # File in store should be the first file + assert size1 == os.path.getsize( + file_in_store + ), "Duplicate upload without force updated stored file." + assert (sha1, md51) == sha_and_md5( + file_in_store + ), "Duplicate upload without force updated stored file." + + # Third upload: Same as second but now with force # Ensure the 'time_modified' value changes in repodata.json time.sleep(1) # Submit the same package with 'force' flag response = upload_function( - auth_client, public_channel, public_package, package_filename, force=True + auth_client, public_channel, public_package, path2, force=True ) assert response.status_code == 201 # Check that repodata content has been updated - repodata = get_repodata() + repodata_after_force = get_repodata() # Info should match - assert repodata_init["info"] == repodata["info"] + assert repodata_after_first["info"] == repodata_after_force["info"] # Package keys should match - assert repodata_init["packages"].keys() == repodata["packages"].keys() - repodata_init_pkg = repodata_init["packages"][package_filename] - repodata_pkg = repodata["packages"][package_filename] - - # Hashes should match pre-upload expectation - assert repodata_init_pkg["sha256"] == file_sha - assert repodata_pkg["sha256"] == file_copy_sha + assert ( + repodata_after_first["packages"].keys() + == repodata_after_force["packages"].keys() + ) - # Sizes should match pre-upload expectation - assert repodata_init_pkg["size"] == file_size - assert repodata_pkg["size"] == file_copy_size + # Hashes should now have changed to the second file + assert repodata_after_force["packages"][path1.name]["sha256"] == sha2 + assert repodata_after_force["packages"][path1.name]["md5"] == md52 + assert repodata_after_force["packages"][path1.name]["size"] == size2 - # Upload-related metadata should not match - for key in "time_modified", "md5", "sha256": - assert repodata_init_pkg[key] != repodata_pkg[key] + assert ( + repodata_after_force["packages"][path1.name]["time_modified"] + > repodata_after_first["packages"][path1.name]["time_modified"] + ) @pytest.mark.parametrize("package_name", ["test-package"])