Speedup promote in pkgpush (#3884)
Co-authored-by: Alexei Mochalov <[email protected]>
sir-sigurd and nl0 authored Feb 23, 2024
1 parent 84867b0 commit 57095a7
Showing 12 changed files with 392 additions and 73 deletions.
1 change: 1 addition & 0 deletions lambdas/pkgpush/CHANGELOG.md
@@ -16,6 +16,7 @@ where verb is one of

## Changes

- [Changed] Speed-up copying of large files during promotion ([#3884](https://github.com/quiltdata/quilt/pull/3884))
- [Changed] Bump quilt3 to set max_pool_connections, this improves performance ([#3870](https://github.com/quiltdata/quilt/pull/3870))
- [Changed] Compute multipart checksums ([#3402](https://github.com/quiltdata/quilt/pull/3402))
- [Added] Bootstrap the change log ([#3402](https://github.com/quiltdata/quilt/pull/3402))
4 changes: 2 additions & 2 deletions lambdas/pkgpush/requirements.txt
@@ -61,9 +61,9 @@ python-dateutil==2.8.2
# via botocore
pyyaml==6.0.1
# via quilt3
quilt-shared[boto,pydantic,quilt] @ git+https://github.com/quiltdata/quilt@33b7e601baee6dff4ef32342b493f55d3dd16210#subdirectory=py-shared
quilt-shared[boto,pydantic,quilt] @ git+https://github.com/quiltdata/quilt@7a82dfcd869035c5e71a5cc7cd912af35d72515c#subdirectory=py-shared
# via t4_lambda_pkgpush (setup.py)
quilt3 @ git+https://github.com/quiltdata/quilt@299b1da851004386ab43423172c4405997fd9c53#subdirectory=api/python
quilt3 @ git+https://github.com/quiltdata/quilt@5c2b79128fe4d5d1e6093ff6a7d11d09d3315843#subdirectory=api/python
# via
# quilt-shared
# t4_lambda_pkgpush (setup.py)
4 changes: 2 additions & 2 deletions lambdas/pkgpush/setup.py
@@ -10,12 +10,12 @@
"pydantic ~= 1.10",
(
"quilt3 @ git+https://github.com/quiltdata/quilt@"
"299b1da851004386ab43423172c4405997fd9c53"
"5c2b79128fe4d5d1e6093ff6a7d11d09d3315843"
"#subdirectory=api/python"
),
(
"quilt_shared[pydantic,boto,quilt] @ git+https://github.com/quiltdata/quilt@"
"33b7e601baee6dff4ef32342b493f55d3dd16210"
"7a82dfcd869035c5e71a5cc7cd912af35d72515c"
"#subdirectory=py-shared"
),
],
86 changes: 76 additions & 10 deletions lambdas/pkgpush/src/t4_lambda_pkgpush/__init__.py
@@ -35,11 +35,14 @@
from quilt_shared.pkgpush import (
Checksum,
ChecksumResult,
CopyResult,
PackageConstructEntry,
PackagePromoteParams,
PackagePushParams,
PackagePushResult,
S3CopyLambdaParams,
S3HashLambdaParams,
S3ObjectDestination,
S3ObjectSource,
TopHash,
)
@@ -50,10 +53,12 @@
PROMOTE_PKG_MAX_FILES = int(os.environ["PROMOTE_PKG_MAX_FILES"])
MAX_BYTES_TO_HASH = int(os.environ["MAX_BYTES_TO_HASH"])
MAX_FILES_TO_HASH = int(os.environ["MAX_FILES_TO_HASH"])
# To dispatch separate, stack-created lambda function.
# To dispatch separate, stack-created lambda functions.
S3_HASH_LAMBDA = os.environ["S3_HASH_LAMBDA"]
S3_COPY_LAMBDA = os.environ["S3_COPY_LAMBDA"]
# CFN template guarantees S3_HASH_LAMBDA_CONCURRENCY concurrent invocations of the S3 hash lambda without throttling.
S3_HASH_LAMBDA_CONCURRENCY = int(os.environ["S3_HASH_LAMBDA_CONCURRENCY"])
S3_COPY_LAMBDA_CONCURRENCY = int(os.environ["S3_COPY_LAMBDA_CONCURRENCY"])
S3_HASH_LAMBDA_MAX_FILE_SIZE_BYTES = int(os.environ["S3_HASH_LAMBDA_MAX_FILE_SIZE_BYTES"])

SERVICE_BUCKET = os.environ["SERVICE_BUCKET"]
Expand Down Expand Up @@ -89,24 +94,33 @@ def from_quilt_exception(cls, qe: quilt3.util.QuiltException):
return cls(name, {"details": qe.message})


def invoke_hash_lambda(pk: PhysicalKey, credentials: AWSCredentials) -> Checksum:
def invoke_lambda(*, function_name: str, params: pydantic.BaseModel, err_prefix: str):
resp = lambda_.invoke(
FunctionName=S3_HASH_LAMBDA,
Payload=S3HashLambdaParams(
credentials=credentials,
location=S3ObjectSource.from_pk(pk),
).json(exclude_defaults=True),
FunctionName=function_name,
Payload=params.json(exclude_defaults=True),
)

parsed = json.load(resp["Payload"])

if "FunctionError" in resp:
raise PkgpushException("S3HashLambdaUnhandledError", parsed)
raise PkgpushException(f"{err_prefix}UnhandledError", parsed)

if "error" in parsed:
raise PkgpushException("S3HashLambdaError", parsed["error"])
raise PkgpushException(f"{err_prefix}Error", parsed["error"])

return ChecksumResult(**parsed["result"]).checksum
return parsed["result"]


def invoke_hash_lambda(pk: PhysicalKey, credentials: AWSCredentials) -> Checksum:
result = invoke_lambda(
function_name=S3_HASH_LAMBDA,
params=S3HashLambdaParams(
credentials=credentials,
location=S3ObjectSource.from_pk(pk),
),
err_prefix="S3HashLambda",
)
return ChecksumResult(**result).checksum


def calculate_pkg_entry_hash(
Expand Down Expand Up @@ -148,6 +162,57 @@ def calculate_pkg_hashes(pkg: quilt3.Package):
f.result()


def invoke_copy_lambda(credentials: AWSCredentials, src: PhysicalKey, dst: PhysicalKey) -> str:
result = invoke_lambda(
function_name=S3_COPY_LAMBDA,
params=S3CopyLambdaParams(
credentials=credentials,
location=S3ObjectSource.from_pk(src),
target=S3ObjectDestination.from_pk(dst),
),
err_prefix="S3CopyLambda",
)
return CopyResult(**result).version


def copy_pkg_entry_data(
credentials: AWSCredentials,
src: PhysicalKey,
dst: PhysicalKey,
idx: int,
) -> T.Tuple[int, PhysicalKey]:
version_id = invoke_copy_lambda(credentials, src, dst)
return idx, PhysicalKey(bucket=dst.bucket, path=dst.path, version_id=version_id)


def copy_file_list(
file_list: T.List[T.Tuple[PhysicalKey, PhysicalKey, int]],
message=None,
callback=None,
) -> T.List[PhysicalKey]:
# TODO: Copy single-part files directly, because using a lambda for that just adds overhead;
# this can be done in a separate thread pool, providing higher concurrency.
# TODO: Use checksums to deduplicate?
# Schedule longer tasks first so we don't end up waiting for a single long task.
file_list_enumerated = list(enumerate(file_list))
file_list_enumerated.sort(key=lambda x: x[1][2], reverse=True)

with concurrent.futures.ThreadPoolExecutor(max_workers=S3_COPY_LAMBDA_CONCURRENCY) as pool:
credentials = AWSCredentials.from_boto_session(user_boto_session)
fs = [
pool.submit(copy_pkg_entry_data, credentials, src, dst, idx)
for idx, (src, dst, _) in file_list_enumerated
]
results = [
f.result()
for f in concurrent.futures.as_completed(fs)
]
# Sort by idx to restore original order.
results.sort(key=lambda x: x[0])

return [x[1] for x in results]


# Isolated for test-ability.
get_user_boto_session = boto3.Session

@@ -304,6 +369,7 @@ def _push_pkg_to_successor(
# TODO: we use force=True to keep the existing behavior,
# but it should be re-considered.
force=True,
copy_file_list_fn=copy_file_list,
)
assert result._origin is not None
return PackagePushResult(top_hash=result._origin.top_hash)
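The key scheduling idea in `copy_file_list` above — submit the largest objects first so the thread pool never ends up waiting on one long-running copy, then restore the caller's original order — can be illustrated in isolation. The sketch below is illustrative only: `fake_copy` and `copy_all` are made-up names standing in for `invoke_copy_lambda` and the real function, which also threads through credentials and `PhysicalKey`s.

```python
import concurrent.futures
from typing import List, Tuple


def fake_copy(src: str, size: int) -> str:
    # Stand-in for invoke_copy_lambda: pretend to copy and return a new "version id".
    return f"{src}-copied"


def copy_all(file_list: List[Tuple[str, int]], max_workers: int = 4) -> List[str]:
    # Schedule longer tasks first so we don't end up waiting for a single long task.
    order = sorted(range(len(file_list)), key=lambda i: file_list[i][1], reverse=True)
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {i: pool.submit(fake_copy, *file_list[i]) for i in order}
        # Collect results in the caller's original order, regardless of completion order.
        return [futures[i].result() for i in range(len(file_list))]


if __name__ == "__main__":
    print(copy_all([("a", 4), ("b", 5), ("c", 1)]))  # -> ['a-copied', 'b-copied', 'c-copied']
```

This mirrors what `test_copy_file_list` below asserts: larger entries are dispatched first, but the returned list matches the input order.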
2 changes: 2 additions & 0 deletions lambdas/pkgpush/tests/conftest.py
@@ -19,5 +19,7 @@ def pytest_configure(config):
str(2 ** 64), # Value big enough to serve as 'unlimited'.
),
S3_HASH_LAMBDA='s3-hash-lambda-name',
S3_COPY_LAMBDA='s3-copy-lambda-name',
S3_HASH_LAMBDA_CONCURRENCY='40',
S3_COPY_LAMBDA_CONCURRENCY='40',
)
45 changes: 45 additions & 0 deletions lambdas/pkgpush/tests/test_copy_file_list.py
@@ -0,0 +1,45 @@
from unittest import mock

import t4_lambda_pkgpush
from quilt3.util import PhysicalKey


@mock.patch("t4_lambda_pkgpush.S3_COPY_LAMBDA_CONCURRENCY", 1)
def test_copy_file_list():
BUCKET = "bucket"
VERSION_ID = "version-id"
CREDENTIALS = t4_lambda_pkgpush.AWSCredentials(
key="a",
secret="b",
token="c",
)
ENTRIES = [
("a", 4),
("b", 5),
("c", 1),
]
ENTRIES = {
key: {
"src": PhysicalKey(BUCKET, key, "src-version"),
"dst": PhysicalKey(BUCKET, key, None),
"result": PhysicalKey(BUCKET, key, VERSION_ID),
"size": size,
}
for key, size in ENTRIES
}

with mock.patch("t4_lambda_pkgpush.invoke_copy_lambda", return_value=VERSION_ID) as invoke_copy_lambda_mock:
with mock.patch("t4_lambda_pkgpush.AWSCredentials.from_boto_session", return_value=CREDENTIALS):
# Check that results are in the same order as the supplied list.
assert t4_lambda_pkgpush.copy_file_list([(e["src"], e["dst"], e["size"]) for e in ENTRIES.values()]) == [
e["result"] for e in ENTRIES.values()
]
# Check that larger files are processed first.
assert invoke_copy_lambda_mock.call_args_list == [
mock.call(
CREDENTIALS,
e["src"],
e["dst"],
)
for e in map(ENTRIES.__getitem__, ["b", "a", "c"])
]
76 changes: 51 additions & 25 deletions lambdas/pkgpush/tests/test_index.py
@@ -233,31 +233,13 @@ def setup_s3_load_pkg_source(self):
},
)

def setup_s3(self, expected_pkg, *, copy_data):
def setup_s3(self, expected_pkg):
manifest = io.BytesIO()
expected_pkg.dump(manifest)
top_hash = expected_pkg.top_hash

self.setup_s3_load_pkg_source()

if copy_data:
for src, (lk, dst) in zip(self.entries.values(), expected_pkg.walk()):
self.s3_stubber.add_response(
method='copy_object',
service_response={
'VersionId': 'dst_' + src.physical_key.version_id,
},
expected_params={
'CopySource': {
'Bucket': src.physical_key.bucket,
'Key': src.physical_key.path,
'VersionId': src.physical_key.version_id,
},
'Bucket': self.dst_bucket,
'Key': f'{self.dst_pkg_name}/{lk}',
}
)

# Push new manifest.
self.s3_stubber.add_response(
'put_object',
@@ -337,10 +319,27 @@ def test(self):
with self.subTest(config_params=config_params, expected_copy_data=expected_copy_data):
expected_pkg = self.prepare_pkg(copy_data=expected_copy_data)
top_hash = expected_pkg.top_hash
self.setup_s3(expected_pkg=expected_pkg, copy_data=expected_copy_data)

with self.mock_successors({self.dst_registry: config_params}):
self.setup_s3(expected_pkg=expected_pkg)

with self.mock_successors({self.dst_registry: config_params}), \
mock.patch("t4_lambda_pkgpush.copy_file_list") as copy_file_list_mock:
copy_file_list_mock.return_value = [
e.physical_key
for lk, e in expected_pkg.walk()
]
response = self.make_request(params)
if expected_copy_data:
copy_file_list_mock.assert_called_once_with(
[
(
src.physical_key,
PhysicalKey(dst.physical_key.bucket, dst.physical_key.path, None),
src.size
)
for src, (lk, dst) in zip(self.entries.values(), expected_pkg.walk())
],
message=mock.ANY,
)
assert response == {
"result": {
'top_hash': top_hash,
@@ -404,7 +403,7 @@ def test_files_exceeded(self):
**self.dst_pkg_loc_params,
}
expected_pkg = self.prepare_pkg(copy_data=True)
self.setup_s3(expected_pkg=expected_pkg, copy_data=True)
self.setup_s3(expected_pkg=expected_pkg)

with self.mock_successors({self.dst_registry: {'copy_data': True}}), \
mock.patch(f't4_lambda_pkgpush.{self.max_files_const}', 1):
@@ -464,7 +463,7 @@ def test_copy_data(self):
**self.dst_pkg_loc_params,
}
expected_pkg = self.prepare_pkg(copy_data=True)
self.setup_s3(expected_pkg=expected_pkg, copy_data=True)
self.setup_s3(expected_pkg=expected_pkg)

with self.mock_successors({self.dst_registry: {'copy_data': True}}):
response = self.make_request(params)
@@ -486,7 +485,7 @@ def test_no_copy_data(self):
}
expected_pkg = self.prepare_pkg(copy_data=False)
top_hash = expected_pkg.top_hash
self.setup_s3(expected_pkg=expected_pkg, copy_data=False)
self.setup_s3(expected_pkg=expected_pkg)

with self.mock_successors({self.dst_registry: {'copy_data': False}}):
response = self.make_request(params)
@@ -980,3 +979,30 @@ def test_invoke_hash_lambda_error(self):
t4_lambda_pkgpush.invoke_hash_lambda(pk, CREDENTIALS)
assert excinfo.value.name == "S3HashLambdaUnhandledError"
lambda_client_stubber.assert_no_pending_responses()


def test_invoke_copy_lambda():
SRC_BUCKET = "src-bucket"
SRC_KEY = "src-key"
SRC_VERSION_ID = "src-version-id"
DST_BUCKET = "dst-bucket"
DST_KEY = "dst-key"
DST_VERSION_ID = "dst-version-id"

stubber = Stubber(t4_lambda_pkgpush.lambda_)
stubber.add_response(
"invoke",
{
"Payload": io.BytesIO(b'{"result": {"version": "%s"}}' % DST_VERSION_ID.encode())
}
)
stubber.activate()
try:
assert t4_lambda_pkgpush.invoke_copy_lambda(
CREDENTIALS,
PhysicalKey(SRC_BUCKET, SRC_KEY, SRC_VERSION_ID),
PhysicalKey(DST_BUCKET, DST_KEY, None),
) == DST_VERSION_ID
stubber.assert_no_pending_responses()
finally:
stubber.deactivate()
1 change: 1 addition & 0 deletions lambdas/s3hash/CHANGELOG.md
@@ -16,5 +16,6 @@ where verb is one of

## Changes

- [Added] Lambda handler for file copy ([#3884](https://github.com/quiltdata/quilt/pull/3884))
- [Changed] Compute multipart checksums ([#3402](https://github.com/quiltdata/quilt/pull/3402))
- [Added] Bootstrap the change log ([#3402](https://github.com/quiltdata/quilt/pull/3402))
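The copy handler added to the s3hash lambda is not shown in the hunks above, but the contract pkgpush depends on is visible: the handler receives credentials plus a source location and a target, and replies with `{"result": {"version": "<destination version id>"}}` (see `test_invoke_copy_lambda`). A minimal sketch of a handler satisfying that contract follows; it is an assumption-laden illustration using plain `boto3` and a single `copy_object` call — the real handler presumably also handles large/multipart copies, which is the point of this PR.

```python
import boto3


def lambda_handler(event, context):
    # Hypothetical sketch only; field names mirror S3CopyLambdaParams / CopyResult
    # as used by pkgpush and may not match the actual implementation.
    creds = event["credentials"]  # {"key": ..., "secret": ..., "token": ...}
    src = event["location"]       # source bucket/key/version
    dst = event["target"]         # destination bucket/key

    s3 = boto3.client(
        "s3",
        aws_access_key_id=creds["key"],
        aws_secret_access_key=creds["secret"],
        aws_session_token=creds["token"],
    )
    copy_source = {"Bucket": src["bucket"], "Key": src["key"]}
    if src.get("version"):
        copy_source["VersionId"] = src["version"]
    resp = s3.copy_object(CopySource=copy_source, Bucket=dst["bucket"], Key=dst["key"])
    return {"result": {"version": resp.get("VersionId")}}
```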
11 changes: 3 additions & 8 deletions lambdas/s3hash/requirements.txt
@@ -26,9 +26,7 @@ boto3==1.34.27
# quilt-shared
# quilt3
boto3-stubs[s3,sts]==1.34.31
# via
# boto3-stubs
# quilt-shared
# via quilt-shared
botocore==1.34.27
# via
# aiobotocore
@@ -85,10 +83,8 @@ python-dateutil==2.8.2
# via botocore
pyyaml==6.0.1
# via quilt3
quilt-shared[boto,pydantic,quilt] @ git+https://github.com/quiltdata/quilt@33b7e601baee6dff4ef32342b493f55d3dd16210#subdirectory=py-shared
# via
# quilt-shared
# t4_lambda_s3hash (setup.py)
quilt-shared[boto,pydantic,quilt] @ git+https://github.com/quiltdata/quilt@7a82dfcd869035c5e71a5cc7cd912af35d72515c#subdirectory=py-shared
# via t4_lambda_s3hash (setup.py)
quilt3==5.4.0
# via quilt-shared
referencing==0.33.0
Expand Down Expand Up @@ -120,7 +116,6 @@ types-aiobotocore[s3]==2.11.1
# via
# quilt-shared
# t4_lambda_s3hash (setup.py)
# types-aiobotocore
types-aiobotocore-s3==2.11.1
# via types-aiobotocore
types-awscrt==0.20.3
2 changes: 1 addition & 1 deletion lambdas/s3hash/setup.py
@@ -12,7 +12,7 @@
"types-aiobotocore[s3] ~= 2.11",
(
"quilt_shared[pydantic,boto,quilt] @ git+https://github.com/quiltdata/quilt@"
"33b7e601baee6dff4ef32342b493f55d3dd16210"
"7a82dfcd869035c5e71a5cc7cd912af35d72515c"
"#subdirectory=py-shared"
),
],