Skip to content

Commit

Permalink
pghoard, rohmu: add test cases covering rohmu's handling of zero-size files [BF-2154]
Browse files Browse the repository at this point in the history
  • Loading branch information
0xlianhu committed Sep 27, 2023
1 parent 21bc214 commit 3395066
Showing 1 changed file with 183 additions and 0 deletions.
183 changes: 183 additions & 0 deletions test/test_rohmu.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
# pylint: disable=attribute-defined-outside-init
import hashlib
from tempfile import NamedTemporaryFile
import os
import logging
from rohmu import rohmufile, get_transfer
from rohmu.rohmufile import create_sink_pipeline

from .base import (
CONSTANT_TEST_RSA_PRIVATE_KEY,
CONSTANT_TEST_RSA_PUBLIC_KEY
)

log = logging.getLogger(__name__)


def test_lzma_zero_size_file(tmp_path):
    """Round-trip an empty file through rohmu using lzma compression."""
    _test_rohmu_with_local_storage(compress_algorithm="lzma", file_size=0, tmp_path=tmp_path)


def test_snappy_zero_size_file(tmp_path):
    """Round-trip an empty file through rohmu using snappy compression."""
    _test_rohmu_with_local_storage(compress_algorithm="snappy", file_size=0, tmp_path=tmp_path)


def test_zstd_zero_size_file(tmp_path):
    """Round-trip an empty file through rohmu using zstd compression."""
    _test_rohmu_with_local_storage(compress_algorithm="zstd", file_size=0, tmp_path=tmp_path)


def test_lzma_1byte_file(tmp_path):
    """Round-trip a one-byte file through rohmu using lzma compression."""
    _test_rohmu_with_local_storage(compress_algorithm="lzma", file_size=1, tmp_path=tmp_path)


def test_snappy_1byte_file(tmp_path):
    """Round-trip a one-byte file through rohmu using snappy compression."""
    _test_rohmu_with_local_storage(compress_algorithm="snappy", file_size=1, tmp_path=tmp_path)


def test_zstd_1byte_file(tmp_path):
    """Round-trip a one-byte file through rohmu using zstd compression."""
    _test_rohmu_with_local_storage(compress_algorithm="zstd", file_size=1, tmp_path=tmp_path)


def _test_rohmu_with_local_storage(compress_algorithm: str,
                                   file_size: int,
                                   tmp_path) -> None:
    """Round-trip ``file_size`` random bytes through rohmu with local storage.

    The file is compressed+encrypted with ``compress_algorithm``, uploaded to a
    local-directory transfer, then downloaded and decrypted/decompressed twice
    (via the file API and via the SinkIO API), verifying content and hash each
    time.

    :param compress_algorithm: rohmu compression algorithm name ("lzma",
        "snappy" or "zstd")
    :param file_size: number of random bytes to generate as test input
    :param tmp_path: pytest-provided temporary directory (pathlib.Path)
    """
    hash_algorithm = "sha1"
    compression_level = 0

    # 0 - Prepare the input file with random content
    work_dir = tmp_path
    orig_file = work_dir / "hello.bin"
    content = os.urandom(file_size)
    with open(orig_file, "wb") as file_out:
        file_out.write(content)

    # Sanity check: the file on disk matches what we generated.
    with open(orig_file, "rb") as file_in:
        assert file_in.read() == content

    # 1 - Compress (and encrypt) the file
    compressed_filepath = work_dir / "compressed" / "hello_compressed"
    compressed_filepath.parent.mkdir(exist_ok=True)
    hasher = hashlib.new(hash_algorithm)
    with (open(orig_file, "rb") as input_obj,
          NamedTemporaryFile(dir=os.path.dirname(compressed_filepath),
                             prefix=os.path.basename(compressed_filepath),
                             suffix=".tmp-compress") as output_obj):
        original_file_size, compressed_file_size = rohmufile.write_file(
            data_callback=hasher.update,
            input_obj=input_obj,
            output_obj=output_obj,
            compression_algorithm=compress_algorithm,
            compression_level=compression_level,
            rsa_public_key=CONSTANT_TEST_RSA_PUBLIC_KEY,
            log_func=log.debug,
        )
        # Hard-link before the NamedTemporaryFile is deleted on context exit.
        os.link(output_obj.name, compressed_filepath)

    log.info(f"original_file_size: {original_file_size}, compressed_file_size: {compressed_file_size}")
    assert original_file_size == len(content)
    file_hash = hasher.hexdigest()
    log.info(f"original_file_hash: {file_hash}")

    # 2 - Upload the compressed file
    upload_dir = work_dir / "uploaded"
    upload_dir.mkdir()
    storage_config = {
        "directory": str(upload_dir),
        "storage_type": "local",
    }
    metadata = {
        # Value is irrelevant for decryption: _key_lookup ignores the key id.
        "encryption-key-id": "No matter",
        "compression-algorithm": compress_algorithm,
        "compression-level": compression_level,
    }
    storage = get_transfer(storage_config)

    metadata_copy = metadata.copy()
    metadata_copy["Content-Length"] = str(compressed_file_size)
    file_key = "compressed/hello_compressed"

    def upload_progress_callback(n_bytes: int) -> None:
        log.debug(f"File: '{file_key}', uploaded {n_bytes} bytes")

    with open(compressed_filepath, "rb") as f:
        storage.store_file_object(
            file_key, f, metadata=metadata_copy, upload_progress_fn=upload_progress_callback
        )

    # 3 - Decrypt and decompress
    # 3.1 Use the file-downloading rohmu API
    decompressed_filepath = work_dir / "hello_decompressed_1"

    decompressed_size = _download_and_decompress_with_file(storage, str(decompressed_filepath), file_key, metadata)
    assert len(content) == decompressed_size
    # Compare content and hash against the original
    with open(decompressed_filepath, "rb") as file_in:
        content_decrypted = file_in.read()
    hasher = hashlib.new(hash_algorithm)
    hasher.update(content_decrypted)
    assert hasher.hexdigest() == file_hash
    assert content_decrypted == content

    # 3.2 Use the rohmu SinkIO API
    decompressed_filepath = work_dir / "hello_decompressed_2"
    decompressed_size = _download_and_decompress_with_sink(storage, str(decompressed_filepath), file_key, metadata)
    assert len(content) == decompressed_size

    # Compare content and hash against the original
    with open(decompressed_filepath, "rb") as file_in:
        content_decrypted = file_in.read()
    hasher = hashlib.new(hash_algorithm)
    hasher.update(content_decrypted)
    assert hasher.hexdigest() == file_hash
    assert content_decrypted == content

    if file_size == 0:
        # SHA-1 of zero bytes is a well-known constant; pins the empty-file case.
        empty_file_sha1 = "da39a3ee5e6b4b0d3255bfef95601890afd80709"
        assert empty_file_sha1 == hasher.hexdigest()


def _key_lookup(key_id: str):
    """Return the constant test RSA private key, ignoring *key_id*."""
    return CONSTANT_TEST_RSA_PRIVATE_KEY


def _download_and_decompress_with_sink(storage, output_path: str, file_key: str, metadata: dict):
    """Download *file_key* from *storage* and decrypt/decompress it to
    *output_path* using rohmu's SinkIO pipeline.

    Returns the size in bytes of the decompressed output file.
    """
    raw, _ = storage.get_contents_to_string(file_key)
    # Some backends may hand back str; normalize to bytes for the pipeline.
    payload = raw.encode("latin1") if isinstance(raw, str) else raw

    with open(output_path, "wb") as sink_file:
        pipeline = create_sink_pipeline(
            output=sink_file, file_size=len(payload), metadata=metadata, key_lookup=_key_lookup, throttle_time=0
        )
        pipeline.write(payload)
        result_size = os.path.getsize(output_path)
    return result_size


def _download_and_decompress_with_file(storage, output_path: str, file_key: str, metadata: dict):
    """Download *file_key* from *storage* to a temporary file, then decrypt and
    decompress it to *output_path* with ``rohmufile.read_file``.

    Returns the size in bytes of the decompressed output.
    """
    # Download the compressed file to a sibling temp path first.
    file_download_path = output_path + ".tmp"

    def download_progress_callback(bytes_written: int, input_size: int) -> None:
        log.debug(f"File: '{file_key}', downloaded {bytes_written} of {input_size} bytes")

    with open(file_download_path, "wb") as f:
        storage.get_contents_to_fileobj(file_key, f, progress_callback=download_progress_callback)

    # Decrypt and decompress; the finally block guarantees the temporary
    # download is removed even if read_file raises (the original leaked it).
    try:
        with (open(file_download_path, "rb") as input_obj,
              open(output_path, "wb") as output_obj):
            _compressed_size, decompressed_size = rohmufile.read_file(
                input_obj=input_obj,
                output_obj=output_obj,
                metadata=metadata,
                key_lookup=_key_lookup,
                log_func=log.debug,
            )
            # No explicit flush needed: the context manager closes (and
            # flushes) output_obj on exit.
    finally:
        # Delete the temporary downloaded file
        os.unlink(file_download_path)
    return decompressed_size

0 comments on commit 3395066

Please sign in to comment.