From 40db3b43099a2b87aa1a6b18f0b370f8bdb590c0 Mon Sep 17 00:00:00 2001
From: Sergey Fedoseev
Date: Tue, 19 Nov 2024 12:11:02 +0100
Subject: [PATCH] Stop using S3 Select in indexer (#4212)

Co-authored-by: Alexei Mochalov
---
 lambdas/indexer/CHANGELOG.md          | 21 ++++++
 lambdas/indexer/index.py              | 97 ++++++++++++++++-----------
 lambdas/indexer/pytest.ini            |  4 +-
 lambdas/indexer/test-requirements.txt |  1 +
 lambdas/indexer/test/test_index.py    | 50 +-------------
 5 files changed, 85 insertions(+), 88 deletions(-)
 create mode 100644 lambdas/indexer/CHANGELOG.md

diff --git a/lambdas/indexer/CHANGELOG.md b/lambdas/indexer/CHANGELOG.md
new file mode 100644
index 00000000000..c7ea99597d5
--- /dev/null
+++ b/lambdas/indexer/CHANGELOG.md
@@ -0,0 +1,21 @@
+
+# Changelog
+
+Changes are listed in reverse chronological order (newer entries at the top).
+The entry format is
+
+```markdown
+- [Verb] Change description ([#](https://github.com/quiltdata/quilt/pull/))
+```
+
+where verb is one of
+
+- Removed
+- Added
+- Fixed
+- Changed
+
+## Changes
+
+- [Changed] Stop using S3 select ([#4212](https://github.com/quiltdata/quilt/pull/4212))
+- [Added] Bootstrap the change log ([#4212](https://github.com/quiltdata/quilt/pull/4212))
diff --git a/lambdas/indexer/index.py b/lambdas/indexer/index.py
index 80b6861a11f..bb6a9422229 100644
--- a/lambdas/indexer/index.py
+++ b/lambdas/indexer/index.py
@@ -47,6 +47,7 @@
 import datetime
+import functools
 import json
 import os
 import pathlib
@@ -92,7 +93,6 @@
     POINTER_PREFIX_V1,
     get_available_memory,
     get_quilt_logger,
-    query_manifest_content,
     separated_env_to_iter,
 )
 
@@ -168,12 +168,7 @@
 # currently only affects .parquet, TODO: extend to other extensions
 assert 'SKIP_ROWS_EXTS' in os.environ
 SKIP_ROWS_EXTS = separated_env_to_iter('SKIP_ROWS_EXTS')
-SELECT_PACKAGE_META = "SELECT * from S3Object o WHERE o.version IS NOT MISSING LIMIT 1"
-# No WHERE clause needed for aggregations since S3 Select skips missing fields for aggs
-SELECT_PACKAGE_STATS = (
-    "SELECT COALESCE(SUM(obj['size']), 0) as total_bytes,"
-    " COUNT(obj['size']) as total_files from S3Object obj"
-)
+DUCKDB_SELECT_LAMBDA_ARN = os.environ["DUCKDB_SELECT_LAMBDA_ARN"]
 TEST_EVENT = "s3:TestEvent"
 # we need to filter out GetObject and HeadObject calls generated by the present
 # lambda in order to display accurate analytics in the Quilt catalog
@@ -182,6 +177,7 @@
 
 logger = get_quilt_logger()
 
+s3_client = boto3.client("s3", config=botocore.config.Config(user_agent_extra=USER_AGENT_EXTRA))
 
 
 def now_like_boto3():
@@ -247,13 +243,10 @@ def select_manifest_meta(s3_client, bucket: str, key: str):
     wrapper for retry and returning a string
     """
     try:
-        raw = query_manifest_content(
-            s3_client,
-            bucket=bucket,
-            key=key,
-            sql_stmt=SELECT_PACKAGE_META
-        )
-        return json.load(raw)
+        body = s3_client.get_object(Bucket=bucket, Key=key)["Body"]
+        with body:  # this *might* be needed to close the stream ASAP
+            for line in body.iter_lines():
+                return json.loads(line)
     except (botocore.exceptions.ClientError, json.JSONDecodeError) as cle:
         print(f"Unable to S3 select manifest: {cle}")
 
@@ -439,7 +432,7 @@ def get_pkg_data():
         first = select_manifest_meta(s3_client, bucket, manifest_key)
         if not first:
             return
-        stats = select_package_stats(s3_client, bucket, manifest_key)
+        stats = select_package_stats(bucket, manifest_key)
         if not stats:
             return
 
@@ -472,33 +465,54 @@ def get_pkg_data():
     return True
 
 
-def select_package_stats(s3_client, bucket, manifest_key) -> str:
+@functools.lru_cache(maxsize=None)
+def get_bucket_region(bucket: str) -> str:
+    resp = s3_client.head_bucket(Bucket=bucket)
+    return resp["ResponseMetadata"]["HTTPHeaders"]["x-amz-bucket-region"]
+
+
+@functools.lru_cache(maxsize=None)
+def get_presigner_client(bucket: str):
+    return boto3.client(
+        "s3",
+        region_name=get_bucket_region(bucket),
+        config=botocore.config.Config(signature_version="s3v4"),
+    )
+
+
+def select_package_stats(bucket, manifest_key) -> Optional[dict]:
     """use s3 select to generate file stats for package"""
     logger_ = get_quilt_logger()
-    try:
-        raw_stats = query_manifest_content(
-            s3_client,
-            bucket=bucket,
-            key=manifest_key,
-            sql_stmt=SELECT_PACKAGE_STATS
-        ).read()
-
-        if raw_stats:
-            stats = json.loads(raw_stats)
-            assert isinstance(stats['total_bytes'], int)
-            assert isinstance(stats['total_files'], int)
-
-            return stats
-
-    except (
-        AssertionError,
-        botocore.exceptions.ClientError,
-        json.JSONDecodeError,
-        KeyError,
-    ) as err:
-        logger_.exception("Unable to compute package stats via S3 select")
+    presigner_client = get_presigner_client(bucket)
+    url = presigner_client.generate_presigned_url(
+        ClientMethod="get_object",
+        Params={
+            "Bucket": bucket,
+            "Key": manifest_key,
+        },
+    )
+    lambda_ = make_lambda_client()
+    q = f"""
+    SELECT
+        COALESCE(SUM(size), 0) AS total_bytes,
+        COUNT(size) AS total_files FROM read_ndjson('{url}', columns={{size: 'UBIGINT'}}) obj
+    """
+    resp = lambda_.invoke(
+        FunctionName=DUCKDB_SELECT_LAMBDA_ARN,
+        Payload=json.dumps({"query": q, "user_agent": f"DuckDB Select {USER_AGENT_EXTRA}"}),
+    )
 
-    return None
+    payload = resp["Payload"].read()
+    if "FunctionError" in resp:
+        logger_.error("DuckDB select unhandled error: %s", payload)
+        return None
+    parsed = json.loads(payload)
+    if "error" in parsed:
+        logger_.error("DuckDB select error: %s", parsed["error"])
+        return None
+
+    rows = parsed["rows"]
+    return rows[0] if rows else None
 
 
 def extract_pptx(fileobj, max_size: int) -> str:
@@ -732,6 +746,11 @@ def make_s3_client():
     return boto3.client("s3", config=configuration)
 
 
+@functools.lru_cache(maxsize=None)
+def make_lambda_client():
+    return boto3.client("lambda")
+
+
 def map_event_name(event: dict):
     """transform eventbridge names into S3-like ones"""
     input_ = event["eventName"]
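Note on the new data path: with S3 Select gone, `select_manifest_meta` streams only the first NDJSON line of the manifest via a plain `GetObject` (that line carries the package-level metadata, including `version`), while `select_package_stats` presigns the manifest URL and delegates the aggregation to a separate DuckDB "select" Lambda whose ARN arrives via `DUCKDB_SELECT_LAMBDA_ARN`. The sketch below reproduces both reads locally with the `duckdb` Python package instead of that Lambda; the file name and manifest rows are made up for illustration.

```python
"""Local sketch of the two manifest reads the indexer now performs.

Assumes the `duckdb` package is installed; the manifest contents are invented.
"""
import json

import duckdb

# A Quilt manifest is newline-delimited JSON: the first row is package-level
# metadata (it has "version" but no "size"); every other row is one entry.
manifest_rows = [
    {"version": "v0", "message": "example package"},
    {"logical_key": "data.csv", "size": 123},
    {"logical_key": "images/cat.jpg", "size": 456},
]
with open("manifest.jsonl", "w") as f:
    f.write("\n".join(json.dumps(row) for row in manifest_rows))

# select_manifest_meta() equivalent: only the first line is needed, so a plain
# GetObject + iter_lines() replaces the old SELECT ... LIMIT 1 statement.
with open("manifest.jsonl") as f:
    meta = json.loads(next(f))
print(meta)  # {'version': 'v0', 'message': 'example package'}

# select_package_stats() equivalent: the metadata row has a NULL size, so
# COUNT(size) counts only real entries and SUM(size) ignores it.
stats = duckdb.sql(
    "SELECT COALESCE(SUM(size), 0) AS total_bytes, COUNT(size) AS total_files "
    "FROM read_ndjson('manifest.jsonl', columns={size: 'UBIGINT'}) obj"
).fetchall()
print(stats)  # [(579, 2)]
```

The presigned URL lets the DuckDB Lambda fetch the manifest over plain HTTPS without its own S3 permissions on the indexed bucket, which is presumably why the diff adds `get_bucket_region` and pins `signature_version="s3v4"` for the presigning client.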
diff --git a/lambdas/indexer/pytest.ini b/lambdas/indexer/pytest.ini
index dd07825516f..f9355a4fbaf 100644
--- a/lambdas/indexer/pytest.ini
+++ b/lambdas/indexer/pytest.ini
@@ -1,4 +1,6 @@
 [pytest]
+env =
+    DUCKDB_SELECT_LAMBDA_ARN = "arn:aws:lambda:us-west-2:123456789012:function:select-lambda"
 log_cli = True
 # This is set above critical to prevent logger events from confusing output in CI
-log_level = 51
+log_level = 51
diff --git a/lambdas/indexer/test-requirements.txt b/lambdas/indexer/test-requirements.txt
index e75e43e319b..b8fc13134ea 100644
--- a/lambdas/indexer/test-requirements.txt
+++ b/lambdas/indexer/test-requirements.txt
@@ -5,4 +5,5 @@ pluggy==0.9
 py==1.10.0
 pytest==4.4.0
 pytest-cov==2.6.1
+pytest-env==0.6.2
 responses==0.10.14
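The pytest.ini change above exports a dummy `DUCKDB_SELECT_LAMBDA_ARN` through `pytest-env` because `index.py` now reads that variable at import time. The test changes that follow drop the `ParamValidationError` xfail (it existed only to document that S3 Select responses cannot be stubbed with botocore) and call `select_package_stats` without the client argument. Below is a sketch of how the Lambda-backed stats call could be faked in a test; the import path, manifest key, and byte counts are assumptions for illustration, not one of the repository's actual tests, and the `{"rows": [...]}` payload shape is taken from this diff.

```python
import io
import json
from unittest import mock

import index  # the indexer lambda module from this diff; adjust the import path as needed


def test_select_package_stats_parses_rows():
    # Fake the boto3 Lambda client: invoke() returns a payload whose "rows"
    # field is what select_package_stats() surfaces to the caller.
    fake_lambda = mock.MagicMock()
    fake_lambda.invoke.return_value = {
        "Payload": io.BytesIO(
            json.dumps({"rows": [{"total_bytes": 579, "total_files": 2}]}).encode()
        ),
    }
    with mock.patch.object(index, "make_lambda_client", return_value=fake_lambda), \
         mock.patch.object(index, "get_presigner_client") as presigner:
        # Presigning is also stubbed out so no AWS calls are made.
        presigner.return_value.generate_presigned_url.return_value = "https://example.com/manifest"
        stats = index.select_package_stats("test-bucket", ".quilt/packages/deadbeef")

    assert stats == {"total_bytes": 579, "total_files": 2}
```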
diff --git a/lambdas/indexer/test/test_index.py b/lambdas/indexer/test/test_index.py
index c53e3bfa8de..05cc0c85a1f 100644
--- a/lambdas/indexer/test/test_index.py
+++ b/lambdas/indexer/test/test_index.py
@@ -23,7 +23,6 @@
 import responses
 from botocore import UNSIGNED
 from botocore.client import Config
-from botocore.exceptions import ParamValidationError
 from botocore.stub import Stubber
 from dateutil.tz import tzutc
 from document_queue import EVENT_PREFIX, RetryError
@@ -979,7 +978,7 @@ def test_index_if_package_select_stats_fail(self, append_mock, select_meta_mock,
         )
 
         select_meta_mock.assert_called_once_with(self.s3_client, bucket, manifest_key)
-        select_stats_mock.assert_called_once_with(self.s3_client, bucket, manifest_key)
+        select_stats_mock.assert_called_once_with(bucket, manifest_key)
         append_mock.assert_called_once_with({
             "_index": bucket + PACKAGE_INDEX_SUFFIX,
             "_id": key,
@@ -1023,7 +1022,7 @@ def test_index_if_package(self, append_mock, select_meta_mock, select_stats_mock
         )
 
         select_meta_mock.assert_called_once_with(self.s3_client, bucket, manifest_key)
-        select_stats_mock.assert_called_once_with(self.s3_client, bucket, manifest_key)
+        select_stats_mock.assert_called_once_with(bucket, manifest_key)
         append_mock.assert_called_once_with({
             "_index": bucket + PACKAGE_INDEX_SUFFIX,
             "_id": key,
@@ -1182,51 +1181,6 @@ def test_extension_overrides(self):
         assert self._get_contents('foo.txt', '.txt') == ""
         assert self._get_contents('foo.ipynb', '.ipynb') == ""
 
-    @pytest.mark.xfail(
-        raises=ParamValidationError,
-        reason="boto bug https://github.com/boto/botocore/issues/1621",
-        strict=True,
-    )
-    def test_stub_select_object_content(self):
-        """Demonstrate that mocking S3 select with boto3 is broken"""
-        sha_hash = "50f4d0fc2c22a70893a7f356a4929046ce529b53c1ef87e28378d92b884691a5"
-        manifest_key = f"{MANIFEST_PREFIX_V1}{sha_hash}"
-        # this SHOULD work, but due to botocore bugs it does not
-        self.s3_stubber.add_response(
-            method="select_object_content",
-            service_response={
-                "ResponseMetadata": ANY,
-                # it is sadly not possible to mock S3 select responses because
-                # boto incorrectly believes "Payload"'s value should be a dict
-                # but it's really an iterable in realworld code
-                # see https://github.com/boto/botocore/issues/1621
-                "Payload": [
-                    {
-                        "Stats": {}
-                    },
-                    {
-                        "Records": {
-                            "Payload": json.dumps(MANIFEST_DATA).encode(),
-                        },
-                    },
-                    {
-                        "End": {}
-                    },
-                ]
-            },
-            expected_params={
-                "Bucket": "test-bucket",
-                "Key": manifest_key,
-                "Expression": index.SELECT_PACKAGE_META,
-                "ExpressionType": "SQL",
-                "InputSerialization": {
-                    'JSON': {'Type': 'LINES'},
-                    'CompressionType': 'NONE'
-                },
-                "OutputSerialization": {'JSON': {'RecordDelimiter': '\n'}}
-            }
-        )
-
     def test_synthetic_copy_event(self):
         """check synthetic ObjectCreated:Copy event vs organic obtained on 26-May-2020 (bucket versioning on