Commit

Use chunked SHA256 hashes calculated server-side by S3 (#2782)
Co-authored-by: Dr. Ernie Prabhakar <[email protected]>
Co-authored-by: Sergey Fedoseev <[email protected]>
Co-authored-by: Alexei Mochalov <[email protected]>
4 people authored Feb 27, 2024
1 parent 51a53c6 commit ef1161a
Showing 8 changed files with 588 additions and 198 deletions.
api/python/quilt3/data_transfer.py (333 changes: 255 additions & 78 deletions)

Large diffs are not rendered by default.
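
The data_transfer.py diff, where the new calculate_checksum / calculate_checksum_bytes helpers and their legacy_* counterparts live, is the one not rendered here. As rough orientation only - not the quilt3 implementation - the "chunked" scheme named in the commit title hashes each part of an object and then hashes the concatenation of the part digests, which is what lets S3 compute the same value server-side during multipart uploads. A minimal sketch, assuming a fixed 8 MiB part size and hex output (the real part sizing and value encoding are defined in data_transfer.py):

import hashlib

# Illustrative sketch only - NOT the quilt3 implementation. Assumes a fixed
# 8 MiB part size; the real code derives part boundaries so the result can
# match the SHA-256 checksums S3 computes server-side per part.
PART_SIZE = 8 * 1024 * 1024

def chunked_sha256(data: bytes) -> str:
    """Hash each part, then hash the concatenation of the part digests."""
    parts = [data[i:i + PART_SIZE] for i in range(0, len(data), PART_SIZE)] or [b""]
    part_digests = [hashlib.sha256(p).digest() for p in parts]
    return hashlib.sha256(b"".join(part_digests)).hexdigest()

def plain_sha256(data: bytes) -> str:
    """The legacy scheme: one SHA-256 over the whole object."""
    return hashlib.sha256(data).hexdigest()

The legacy helpers keep the old whole-object digest so that existing manifests remain verifiable.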

api/python/quilt3/packages.py (111 changes: 76 additions & 35 deletions)
@@ -25,11 +25,14 @@
 from . import util, workflows
 from .backends import get_package_registry
 from .data_transfer import (
-    calculate_sha256,
+    calculate_checksum,
+    calculate_checksum_bytes,
     copy_file,
     copy_file_list,
     get_bytes,
     get_size_and_version,
+    legacy_calculate_checksum,
+    legacy_calculate_checksum_bytes,
     list_object_versions,
     list_url,
     put_bytes,
@@ -66,8 +69,12 @@
 if MANIFEST_MAX_RECORD_SIZE is None:
     MANIFEST_MAX_RECORD_SIZE = DEFAULT_MANIFEST_MAX_RECORD_SIZE

+SHA256_HASH_NAME = 'SHA256'
+SHA256_CHUNKED_HASH_NAME = 'sha2-256-chunked'
+
 SUPPORTED_HASH_TYPES = (
-    "SHA256",
+    SHA256_HASH_NAME,
+    SHA256_CHUNKED_HASH_NAME,
 )

@@ -77,7 +84,7 @@ def __call__(
         file_list: T.List[T.Tuple[PhysicalKey, PhysicalKey, int]],
         message: T.Optional[str] = None,
         callback: T.Optional[T.Callable] = None,
-    ) -> T.List[PhysicalKey]:
+    ) -> T.List[T.Tuple[PhysicalKey, T.Optional[str]]]:
         ...

@@ -220,9 +227,17 @@ def _verify_hash(self, read_bytes):
         """
         if self.hash is None:
             raise QuiltException("Hash missing - need to build the package")
-        _check_hash_type_support(self.hash.get('type'))
-        digest = hashlib.sha256(read_bytes).hexdigest()
-        if digest != self.hash.get('value'):
+        hash_type = self.hash.get('type')
+        _check_hash_type_support(hash_type)
+
+        if hash_type == SHA256_CHUNKED_HASH_NAME:
+            expected_value = calculate_checksum_bytes(read_bytes)
+        elif hash_type == SHA256_HASH_NAME:
+            expected_value = legacy_calculate_checksum_bytes(read_bytes)
+        else:
+            assert False
+
+        if expected_value != self.hash.get('value'):
             raise QuiltException("Hash validation failed")

     def set(self, path=None, meta=None):
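
For orientation, a package entry's hash field can now carry either scheme; the values below are elided placeholders, and the exact encoding of the chunked value is determined by the helpers in data_transfer.py:

# Legacy entries: one SHA-256 over the whole object.
legacy_hash = {'type': 'SHA256', 'value': '<whole-object digest>'}

# Entries written by this change: digest of the per-part digests.
chunked_hash = {'type': 'sha2-256-chunked', 'value': '<chunked digest>'}
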
@@ -969,13 +984,13 @@ def _fix_sha256(self):
             physical_keys.append(entry.physical_key)
             sizes.append(entry.size)

-        results = calculate_sha256(physical_keys, sizes)
+        results = calculate_checksum(physical_keys, sizes)
         exc = None
-        for entry, obj_hash in zip(self._incomplete_entries, results):
-            if isinstance(obj_hash, Exception):
-                exc = obj_hash
+        for entry, result in zip(self._incomplete_entries, results):
+            if isinstance(result, Exception):
+                exc = result
             else:
-                entry.hash = dict(type='SHA256', value=obj_hash)
+                entry.hash = dict(type=SHA256_CHUNKED_HASH_NAME, value=result)
         if exc:
             incomplete_manifest_path = self._dump_manifest_to_scratch()
             msg = "Unable to reach S3 for some hash values. Incomplete manifest saved to {path}."
@@ -1347,8 +1362,7 @@ def push(
         Args:
             name: name for package in registry
             dest: where to copy the objects in the package. Must be either an S3 URI prefix (e.g., s3://$bucket/$key)
-                in the registry bucket, or a callable that takes logical_key, package_entry, and top_hash
-                and returns an S3 URI.
+                in the registry bucket, or a callable that takes logical_key and package_entry, and returns an S3 URI.
             registry: registry where to create the new package
             message: the commit message for the new package
             selector_fn: An optional function that determines which package entries should be copied to S3.
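
Per the updated docstring above, a callable dest now receives only logical_key and package_entry; top_hash is no longer passed. A hypothetical example (bucket and prefix invented for illustration):

def dest_fn(logical_key, package_entry):
    # Route every object under a fixed prefix in the registry bucket.
    return f"s3://my-registry-bucket/staging/{logical_key}"

# pkg.push("team/dataset", registry="s3://my-registry-bucket", dest=dest_fn)
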
@@ -1466,21 +1480,9 @@ def check_hash_conficts(latest_hash):
         if not force:
             check_hash_conficts(latest_hash)

-        self._fix_sha256()
-
         pkg = self.__class__()
         pkg._meta = self._meta
         pkg._set_commit_message(message)
-        top_hash = self._calculate_top_hash(pkg._meta, self.walk())
-        pkg._origin = PackageRevInfo(str(registry.base), name, top_hash)
-
-        if dedupe and top_hash == latest_hash:
-            if print_info:
-                print(
-                    f"Skipping since package with hash {latest_hash} already exists "
-                    "at the destination and dedupe parameter is true."
-                )
-            return self

         # Since all that is modified is physical keys, pkg will have the same top hash
         file_list = []
@@ -1494,7 +1496,7 @@ def check_hash_conficts(latest_hash):
             # Copy the datafiles in the package.
             physical_key = entry.physical_key

-            new_physical_key = dest_fn(logical_key, entry, top_hash)
+            new_physical_key = dest_fn(logical_key, entry)
             if (
                 physical_key.bucket == new_physical_key.bucket and
                 physical_key.path == new_physical_key.path
@@ -1507,12 +1509,29 @@ def check_hash_conficts(latest_hash):

         results = copy_file_list_fn(file_list, message="Copying objects")

-        for (logical_key, entry), versioned_key in zip(entries, results):
+        for (logical_key, entry), (versioned_key, checksum) in zip(entries, results):
             # Create a new package entry pointing to the new remote key.
             assert versioned_key is not None
             new_entry = entry.with_physical_key(versioned_key)
+            if checksum is not None:
+                new_entry.hash = dict(type=SHA256_CHUNKED_HASH_NAME, value=checksum)
             pkg._set(logical_key, new_entry)

+        # Needed if the files already exist in S3, but were uploaded without ChecksumAlgorithm='SHA256'.
+        pkg._fix_sha256()
+
+        top_hash = pkg._calculate_top_hash(pkg._meta, pkg.walk())
+
+        if dedupe and top_hash == latest_hash:
+            if print_info:
+                print(
+                    f"Skipping since package with hash {latest_hash} already exists "
+                    "at the destination and dedupe parameter is true."
+                )
+            return self
+
+        pkg._origin = PackageRevInfo(str(registry.base), name, top_hash)
+
         def physical_key_is_temp_file(pk):
             if not pk.is_local():
                 return False
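
The new pkg._fix_sha256() call and its comment refer to S3 flexible checksums: objects uploaded with ChecksumAlgorithm='SHA256' already have a server-computed digest that can be read back, while older objects still have to be hashed the slow way. A hedged sketch of the read-back with plain boto3 - this is not the actual data_transfer.py code, and the bucket and key are placeholders:

import boto3

s3 = boto3.client("s3")

# Ask S3 to return any stored checksum alongside the object metadata.
head = s3.head_object(
    Bucket="my-bucket",
    Key="data/file.bin",
    ChecksumMode="ENABLED",
)
stored = head.get("ChecksumSHA256")  # None if uploaded without a checksum
if stored is None:
    # Fall back to downloading and hashing locally - roughly what
    # _fix_sha256() has to do for objects that predate checksum-enabled uploads.
    ...
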
@@ -1683,26 +1702,48 @@ def verify(self, src, extra_files_ok=False):

         src = PhysicalKey.from_url(fix_url(src))
         src_dict = dict(list_url(src))
+
+        expected_hash_list = []
         url_list = []
         size_list = []
+
+        legacy_expected_hash_list = []
+        legacy_url_list = []
+        legacy_size_list = []
+
         for logical_key, entry in self.walk():
             src_size = src_dict.pop(logical_key, None)
-            if src_size is None:
-                return False
-            if entry.size != src_size:
+            if src_size is None or entry.size != src_size:
                 return False
             entry_url = src.join(logical_key)
-            url_list.append(entry_url)
-            size_list.append(src_size)
+            hash_type = entry.hash['type']
+            hash_value = entry.hash['value']
+            if hash_type == SHA256_CHUNKED_HASH_NAME:
+                expected_hash_list.append(hash_value)
+                url_list.append(entry_url)
+                size_list.append(src_size)
+            elif hash_type == SHA256_HASH_NAME:
+                legacy_expected_hash_list.append(hash_value)
+                legacy_url_list.append(entry_url)
+                legacy_size_list.append(src_size)
+            else:
+                assert False, hash_type

         if src_dict and not extra_files_ok:
             return False

-        hash_list = calculate_sha256(url_list, size_list)
-        for (logical_key, entry), url_hash in zip(self.walk(), hash_list):
+        hash_list = calculate_checksum(url_list, size_list)
+        for expected_hash, url_hash in zip(expected_hash_list, hash_list):
             if isinstance(url_hash, Exception):
                 raise url_hash
+            if expected_hash != url_hash:
+                return False
+
+        legacy_hash_list = legacy_calculate_checksum(legacy_url_list, legacy_size_list)
+        for expected_hash, url_hash in zip(legacy_expected_hash_list, legacy_hash_list):
+            if isinstance(url_hash, Exception):
+                raise url_hash
-            if entry.hash['value'] != url_hash:
+            if expected_hash != url_hash:
                 return False

         return True
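
The public verify() behaviour is unchanged; it now just routes each entry to whichever checksum routine matches its recorded hash type. An illustrative call (package name and local path are hypothetical):

import quilt3

pkg = quilt3.Package.browse("team/dataset", registry="s3://my-registry-bucket")
ok = pkg.verify("./local-copy", extra_files_ok=True)
print("package contents match" if ok else "mismatch detected")
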
api/python/setup.py (2 changes: 1 addition & 1 deletion)
@@ -59,7 +59,7 @@ def run(self):
     install_requires=[
         'platformdirs>=2',
         'aws-requests-auth>=0.4.2',
-        'boto3>=1.10.0',
+        'boto3>=1.21.7',
         'jsonlines==1.2.0',
         'PyYAML>=5.1',
         'requests>=2.12.4',
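
The boto3 floor rises because the flexible-checksum parameters this change depends on (ChecksumAlgorithm on uploads, ChecksumMode and the ChecksumSHA256 response field on reads) only exist in newer releases; 1.21.7 is evidently the earliest version quilt3 can rely on here. A minimal, hypothetical upload that asks S3 to compute the SHA-256 server-side (bucket and key invented):

import boto3

s3 = boto3.client("s3")
resp = s3.put_object(
    Bucket="my-bucket",
    Key="data/file.bin",
    Body=b"example payload",
    ChecksumAlgorithm="SHA256",  # S3 computes and stores the digest server-side
)
print(resp["ChecksumSHA256"])  # base64-encoded SHA-256 reported back by S3
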
(Diffs for the remaining 5 changed files are not shown in this view.)
