From 0aa8f6a1ab721711d33460d2040c88c4fb3d1f79 Mon Sep 17 00:00:00 2001 From: Phil Snyder Date: Tue, 27 Aug 2024 12:16:30 -0700 Subject: [PATCH 1/5] raw lambda initial commit --- .../develop/namespaced/lambda-raw-role.yaml | 13 +++ config/develop/namespaced/lambda-raw.yaml | 16 ++++ .../namespaced/sqs-dispatch-to-raw.yaml | 2 +- src/lambda_function/raw/README.md | 34 ++++++++ src/lambda_function/raw/app.py | 8 ++ src/lambda_function/raw/template.yaml | 70 ++++++++++++++++ templates/lambda-raw-role.yaml | 81 +++++++++++++++++++ 7 files changed, 223 insertions(+), 1 deletion(-) create mode 100644 config/develop/namespaced/lambda-raw-role.yaml create mode 100644 config/develop/namespaced/lambda-raw.yaml create mode 100644 src/lambda_function/raw/README.md create mode 100644 src/lambda_function/raw/app.py create mode 100644 src/lambda_function/raw/template.yaml create mode 100644 templates/lambda-raw-role.yaml diff --git a/config/develop/namespaced/lambda-raw-role.yaml b/config/develop/namespaced/lambda-raw-role.yaml new file mode 100644 index 00000000..b5acd4be --- /dev/null +++ b/config/develop/namespaced/lambda-raw-role.yaml @@ -0,0 +1,13 @@ +template: + path: lambda-raw-role.yaml +stack_name: "{{ stack_group_config.namespace }}-lambda-raw-role" +dependencies: + - develop/namespaced/sqs-dispatch-to-raw.yaml + - develop/s3-cloudformation-bucket.yaml + - develop/s3-raw-bucket.yaml +parameters: + SQSQueueArn: !stack_output_external "{{ stack_group_config.namespace }}-sqs-dispatch-to-raw::PrimaryQueueArn" + S3SourceBucketName: {{ stack_group_config.input_bucket_name }} + S3TargetBucketName: {{ stack_group_config.raw_bucket_name }} +stack_tags: + {{ stack_group_config.default_stack_tags }} diff --git a/config/develop/namespaced/lambda-raw.yaml b/config/develop/namespaced/lambda-raw.yaml new file mode 100644 index 00000000..e709a35b --- /dev/null +++ b/config/develop/namespaced/lambda-raw.yaml @@ -0,0 +1,16 @@ +template: + type: sam + path: src/lambda_function/raw/template.yaml + artifact_bucket_name: {{ stack_group_config.template_bucket_name }} + artifact_prefix: "{{ stack_group_config.namespace }}/src/lambda" +dependencies: + - develop/namespaced/lambda-raw-role.yaml + - develop/namespaced/sqs-dispatch-to-raw.yaml + - develop/s3-cloudformation-bucket.yaml + - develop/s3-raw-bucket.yaml +stack_name: "{{ stack_group_config.namespace }}-lambda-raw" +parameters: + RoleArn: !stack_output_external "{{ stack_group_config.namespace }}-lambda-raw-role::RoleArn" + SQSQueueArn: !stack_output_external "{{ stack_group_config.namespace }}-sqs-dispatch-to-raw::PrimaryQueueArn" + S3RawBucket: {{ stack_group_config.raw_bucket_name }} +stack_tags: {{ stack_group_config.default_stack_tags }} diff --git a/config/develop/namespaced/sqs-dispatch-to-raw.yaml b/config/develop/namespaced/sqs-dispatch-to-raw.yaml index b5c4638d..1ab3b45e 100644 --- a/config/develop/namespaced/sqs-dispatch-to-raw.yaml +++ b/config/develop/namespaced/sqs-dispatch-to-raw.yaml @@ -3,7 +3,7 @@ template: parameters: MessageRetentionPeriod: "1209600" ReceiveMessageWaitTimeSeconds: "20" - VisibilityTimeout: "120" + VisibilityTimeout: "900" SNSTopicSubscription: !stack_output_external "{{ stack_group_config.namespace }}-sns-dispatch::SnsTopicArn" dependencies: - develop/namespaced/sns-dispatch.yaml diff --git a/src/lambda_function/raw/README.md b/src/lambda_function/raw/README.md new file mode 100644 index 00000000..12da1859 --- /dev/null +++ b/src/lambda_function/raw/README.md @@ -0,0 +1,34 @@ +# Raw Lambda + +The raw Lambda polls the dispatch-to-raw SQS queue and uploads an object to the raw S3 bucket. +Its purpose is to decompress a single file from an export, recompress that file, and store it to S3. + +## Development + +The Serverless Application Model Command Line Interface (SAM CLI) is an +extension of the AWS CLI that adds functionality for building and testing +Lambda applications. + +To use the SAM CLI, you need the following tools. + +* SAM CLI - [Install the SAM CLI](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) +* Docker - [Install Docker community edition](https://hub.docker.com/search/?type=edition&offering=community) + +You may need the following for local testing. +* [Python 3 installed](https://www.python.org/downloads/) + +You will also need to configure your AWS credentials, if you have not already done so. + +## Creating a local build + +Use the SAM CLI to build and test your lambda locally. +Build your application with the `sam build` command. + +```bash +cd src/lambda_function/raw/ +sam build +``` + +## Tests + +Tests are available in `tests/test_raw_lambda.py`. diff --git a/src/lambda_function/raw/app.py b/src/lambda_function/raw/app.py new file mode 100644 index 00000000..308461bf --- /dev/null +++ b/src/lambda_function/raw/app.py @@ -0,0 +1,8 @@ +import json + +import boto3 + + +def lambda_handler(event, context): + s3_client = boto3.client("s3") + return {"statusCode": 200, "body": json.dumps("Hello from Lambda!")} diff --git a/src/lambda_function/raw/template.yaml b/src/lambda_function/raw/template.yaml new file mode 100644 index 00000000..50efff22 --- /dev/null +++ b/src/lambda_function/raw/template.yaml @@ -0,0 +1,70 @@ +AWSTemplateFormatVersion: '2010-09-09' + +Transform: AWS::Serverless-2016-10-31 + +Description: > + SAM Template for the raw Lambda. The raw Lambda polls the dispatch-to-raw SQS + queue and uploads an object to the raw S3 bucket. Its purpose is to decompress + a single file from an export, recompress that file, and store it to S3. + +Parameters: + + RoleArn: + Type: String + Description: ARN of the raw Lambda role. + + SQSQueueArn: + Type: String + Description: ARN of the dispatch-to-raw SQS queue. + + S3RawBucket: + Type: String + Description: Name of the Raw S3 bucket. + + LambdaPythonVersion: + Type: String + Description: Python version to use for this lambda function + Default: "3.11" + + LambdaBatchSize: + Type: Number + Default: 1 + Description: >- + The maximum number of messages in an SQS event that Lambda will process in a batch + + LambdaMaximumBatchingWindowInSeconds: + Type: Number + Default: 300 + Description: >- + The maximum amount of time in seconds Lambda will batch messages before polling + the SQS queue and processing them + +Resources: + RawFunction: + Type: AWS::Serverless::Function + Properties: + PackageType: Zip + CodeUri: ./ + Handler: app.lambda_handler + Runtime: !Sub "python${LambdaPythonVersion}" + Role: !Ref RoleArn + MemorySize: 1024 + EphemeralStorage: + Size: 2048 + Timeout: 900 + Environment: + Variables: + RAW_S3_BUCKET: !Ref S3RawBucket + Events: + SQSEvent: + Type: SQS + Properties: + BatchSize: !Ref LambdaBatchSize + Queue: !Ref SQSQueueArn + +Outputs: + RawFunctionArn: + Description: Arn of the raw Lambda. + Value: !GetAtt RawFunction.Arn + Export: + Name: !Sub "${AWS::Region}-${AWS::StackName}-RawFunctionArn" diff --git a/templates/lambda-raw-role.yaml b/templates/lambda-raw-role.yaml new file mode 100644 index 00000000..f33e7255 --- /dev/null +++ b/templates/lambda-raw-role.yaml @@ -0,0 +1,81 @@ +AWSTemplateFormatVersion: '2010-09-09' + +Transform: AWS::Serverless-2016-10-31 + +Description: > + An IAM Role for the raw Lambda + +Parameters: + SQSQueueArn: + Type: String + Description: ARN of the SQS queue for lambda to poll messages from. + + S3SourceBucketName: + Type: String + Description: Name of the S3 bucket where exports are deposited. + + S3TargetBucketName: + Type: String + Description: Name of the S3 bucket where compressed JSON are written to. + +Resources: + RawRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Principal: + Service: + - lambda.amazonaws.com + Action: + - sts:AssumeRole + ManagedPolicyArns: + - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole + Policies: + - PolicyName: PollSQSQueue + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - sqs:DeleteMessage + - sqs:GetQueueAttributes + - sqs:ReceiveMessage + Resource: + - !Ref SQSQueueArn + - PolicyName: ReadS3 + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - s3:Get* + - s3:List* + Resource: + - !Sub arn:aws:s3:::${S3SourceBucketName} + - !Sub arn:aws:s3:::${S3SourceBucketName}/* + - PolicyName: WriteS3 + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - "s3:PutObject" + - "s3:PutObjectAcl" + - "s3:GetBucketLocation" + Resource: + - !Sub arn:aws:s3:::${S3TargetBucketName} + - !Sub arn:aws:s3:::${S3TargetBucketName}/* + +Outputs: + RoleName: + Value: !Ref RawRole + Export: + Name: !Sub '${AWS::Region}-${AWS::StackName}-RoleName' + + RoleArn: + Value: !GetAtt RawRole.Arn + Export: + Name: !Sub '${AWS::Region}-${AWS::StackName}-RoleArn' From a3bf5cb779a9043a760200d244536a133902c356 Mon Sep 17 00:00:00 2001 From: Phil Snyder Date: Tue, 27 Aug 2024 17:03:13 -0700 Subject: [PATCH 2/5] yield compressed data up to part threshold --- src/lambda_function/raw/app.py | 91 ++++++++++++++++++++++++++- src/lambda_function/raw/template.yaml | 2 +- 2 files changed, 90 insertions(+), 3 deletions(-) diff --git a/src/lambda_function/raw/app.py b/src/lambda_function/raw/app.py index 308461bf..b780c2e3 100644 --- a/src/lambda_function/raw/app.py +++ b/src/lambda_function/raw/app.py @@ -1,8 +1,95 @@ +import gzip +import io import json +import logging +import zipfile import boto3 +logger = logging.getLogger() +logger.setLevel(logging.INFO) -def lambda_handler(event, context): + +def lambda_handler(event: dict, context: dict): + """ + Entrypoint for this Lambda. + + Args: + event (dict) An SQS message from the dispatch-to-raw SQS queue. + context (dict): Information about the runtime environment and + the current invocation + """ s3_client = boto3.client("s3") - return {"statusCode": 200, "body": json.dumps("Hello from Lambda!")} + main(event=event, s3_client=s3_client) + + +def upload_part(gzip_stream: gzip.GzipFile): + pass + + +def yield_compressed_data(object_stream: io.BytesIO, path: str, part_threshold=None): + if part_threshold is None: + part_threshold = 50 * 1024 * 1024 # 50 MB + with zipfile.ZipFile(object_stream, "r") as zip_stream: + with zip_stream.open(path, "r") as json_stream: + compressed_data = io.BytesIO() + with gzip.GzipFile(fileobj=compressed_data, mode="wb") as gzip_file: + # Read/write the JSON file in 1 MB chunks + for chunk in iter(lambda: json_stream.read(1024 * 1024), b""): + gzip_file.write(chunk) + if compressed_data.tell() >= part_threshold: + yield compressed_data + compressed_data.seek(0) + compressed_data.truncate(0) + yield compressed_data + + +def main(event: dict, s3_client: boto3.client): + """ + This function should be invoked by `lambda_handler`. + + Args: + event (dict): The dispatch-to-raw SQS event. + The payload from the dispatch Lambda is in .["Records"][0]["body"]["Message"] + and contains the keys: + + * Bucket - The name of the S3 bucket + * Key - The S3 key + * Path - The path within the archive which identifies this file + * FileSize - The uncompressed size in bytes of this file + context (dict): Information about the runtime environment and + the current invocation + s3_client (botocore.client.S3): An S3 client + + Returns: + (None): Logs and uploads an object to the raw S3 bucket. + """ + for sqs_record in event["Records"]: + sns_notification = json.loads(sqs_record["body"]) + sns_message = json.loads(sns_notification["Message"]) + logger.info(f"Received SNS message: {sns_message}") + # Step 1: Stream the zip file from S3 + with io.BytesIO() as object_stream: + s3_client.download_fileobj( + Bucket=sns_message["Bucket"], + Key=sns_message["Key"], + Fileobj=object_stream, + ) + object_stream.seek(0) + for compressed_data in yield_compressed_data( + object_stream=object_stream, + path=sns_message["Path"], + part_threshold=102, + ): + logger.info(compressed_data.tell()) + object_stream.close() + + # # Define the S3 key for the gzipped file (you can customize this) + # gzip_key = f"{s3_details['Path']}.gz" + + # # Step 4: Upload the gzipped JSON file to S3 + # s3_client.upload_fileobj( + # compressed_data, s3_details["Bucket"], gzip_key + # ) + + # print(f"Uploaded gzipped file to s3://{s3_details['Bucket']}/{gzip_key}") diff --git a/src/lambda_function/raw/template.yaml b/src/lambda_function/raw/template.yaml index 50efff22..fe69cf83 100644 --- a/src/lambda_function/raw/template.yaml +++ b/src/lambda_function/raw/template.yaml @@ -24,7 +24,7 @@ Parameters: LambdaPythonVersion: Type: String Description: Python version to use for this lambda function - Default: "3.11" + Default: "3.9" LambdaBatchSize: Type: Number From efb79fe9a0cdace49d92a33bd488dd889c3a200b Mon Sep 17 00:00:00 2001 From: Phil Snyder Date: Thu, 29 Aug 2024 17:30:30 -0700 Subject: [PATCH 3/5] complete implementation and add tests --- config/develop/namespaced/lambda-raw.yaml | 1 + src/lambda_function/raw/app.py | 296 ++++++++++++++++++++-- src/lambda_function/raw/template.yaml | 5 + tests/test_lambda_raw.py | 162 ++++++++++++ 4 files changed, 437 insertions(+), 27 deletions(-) create mode 100644 tests/test_lambda_raw.py diff --git a/config/develop/namespaced/lambda-raw.yaml b/config/develop/namespaced/lambda-raw.yaml index e709a35b..fc4ddbf9 100644 --- a/config/develop/namespaced/lambda-raw.yaml +++ b/config/develop/namespaced/lambda-raw.yaml @@ -13,4 +13,5 @@ parameters: RoleArn: !stack_output_external "{{ stack_group_config.namespace }}-lambda-raw-role::RoleArn" SQSQueueArn: !stack_output_external "{{ stack_group_config.namespace }}-sqs-dispatch-to-raw::PrimaryQueueArn" S3RawBucket: {{ stack_group_config.raw_bucket_name }} + S3RawKeyPrefix: "{{ stack_group_config.namespace }}/json/" stack_tags: {{ stack_group_config.default_stack_tags }} diff --git a/src/lambda_function/raw/app.py b/src/lambda_function/raw/app.py index b780c2e3..92d64fe1 100644 --- a/src/lambda_function/raw/app.py +++ b/src/lambda_function/raw/app.py @@ -1,7 +1,26 @@ +""" +Raw Lambda + +A module for an AWS Lambda function that compresses JSON data contained in an +export (zip archive) from S3 and uploads it to the raw S3 bucket. This module +makes heavy use of Python file objects and multipart uploads and can +decompress/compress/upload with a relatively low, fixed memory overhead +with respect to the size of the uncompressed JSON. + +Example Usage: +The module is intended to be deployed as an AWS Lambda function that listens to +events from the `dispatch-to-raw` SQS queue. + +Environment Variables: +- `RAW_S3_BUCKET`: The raw S3 bucket name where compressed data will be stored. +- `RAW_S3_KEY_PREFIX`: The S3 key prefix within the raw bucket where data is written. +""" + import gzip import io import json import logging +import os import zipfile import boto3 @@ -20,31 +39,233 @@ def lambda_handler(event: dict, context: dict): the current invocation """ s3_client = boto3.client("s3") - main(event=event, s3_client=s3_client) + raw_bucket = os.environ.get("RAW_S3_BUCKET") + raw_key_prefix = os.environ.get("RAW_S3_KEY_PREFIX") + main( + event=event, + s3_client=s3_client, + raw_bucket=raw_bucket, + raw_key_prefix=raw_key_prefix, + ) + + +def construct_raw_key(path: str, key: str, raw_key_prefix: str): + """ + Constructs an S3 key for data to be uploaded to the raw S3 bucket. + + Args: + path (str): The relative file path of the JSON data within its zip archive + key (str): The S3 key of the export/zip archive, formatted as `{namespace}/{cohort}/{export_basename}`. + raw_key_prefix (str): The raw S3 bucket key prefix where all raw data is written. + Returns: + str: An S3 key in the format: + `{raw_key_prefix}/dataset={data_type}/cohort={cohort}/{basename}.ndjson.gz` + + Notes: + - The function expects the input `key` to be formatted as `{namespace}/{cohort}/{export_basename}`. + - The data type is derived from the first underscore-delimited component of the file basename. + """ + key_components = key.split("/") + # input bucket keys are formatted like `{namespace}/{cohort}/{export_basename}` + cohort = key_components[1] + file_basename = os.path.basename(path) + # The first underscore-delimited component of the JSON basename is the datatype + data_type = file_basename.split("_")[0] + raw_basename = f"{ os.path.splitext(file_basename)[0] }.ndjson.gz" + raw_key = os.path.join( + raw_key_prefix, + f"dataset={data_type}", + f"cohort={cohort}", + raw_basename, + ) + return raw_key -def upload_part(gzip_stream: gzip.GzipFile): - pass + +def upload_part( + s3_client: boto3.client, + body: bytes, + bucket: str, + key: str, + upload_id: str = None, + part_number: int = None, +): + """ + Uploads a part of data to an S3 object as part of a multipart upload. + + If an `upload_id` is not provided, the function initiates a new multipart + upload. Each part is identified by its ETag and part number, which are + required for the completion the multipart upload. + + Args: + s3_client (boto3.client): The Boto3 S3 client used to interact with AWS S3. + body (bytes): The data to be uploaded as a part, in bytes. + bucket (str): The name of the raw S3 bucket where the object is to be stored. + key (str): The S3 key of the object being uploaded. + upload_id (str, optional): The ID of an ongoing multipart upload. If not provided, + a new multipart upload is initiated. + part_number (int, optional): The part number for this chunk of the upload. + If not provided, defaults to 1 when initiating a new upload. + + Returns: + dict: A dictionary containing the following keys: + - part (dict): This dictionary must be included with the other parts + in a list when completing the multipart upload and is provided + in this format for convenience. A dictionary with keys: + * ETag (str): The ETag of this part. + * part_number (int) The associated part number. + - upload_id (str): The ID of the multipart upload. + - part_number (int): The part number associated with this upload part. + + Logs: + - Logs the start of a new multipart upload if `upload_id` is not provided. + - Logs the upload size and S3 destination for each part. + - Logs the response from the S3 upload operation, including the ETag. + + Example: + # Upload a part to an existing multipart upload + s3 = boto3.client('s3') + part = upload_part( + s3_client=s3, + body=b'some data chunk', + bucket='my-bucket', + key='my-object', + upload_id='existing-upload-id', + part_number=2 + ) + print(part) + # Output: + # { + # 'part': + # { + # 'ETag': 'etag-value', + # 'PartNumber': 2 + # }, + # 'upload_id': 'existing-upload-id', + # 'part_number': 2 + # } + + Notes: + - Parts must be larger than AWS minimum requirements (typically 5 MB), + excepting the last part. If an object only has one part, than it can + be any size. + """ + if upload_id is None: + multipart_upload = s3_client.create_multipart_upload( + Bucket=bucket, + Key=key, + ) + upload_id = multipart_upload["UploadId"] + part_number = 1 + logger.info(f"Began multipart upload {upload_id}") + logger.info(f"Uploading { len(body) } bytes to s3://{bucket}/{key}") + upload_response = s3_client.upload_part( + Body=body, + Bucket=bucket, + Key=key, + UploadId=upload_id, + PartNumber=part_number, + ) + logger.info(f"Upload part response: {upload_response}") + part_identifier = { + "ETag": upload_response["ETag"], + "PartNumber": part_number, + } + part_wrapper = { + "part": part_identifier, + "upload_id": upload_id, + "part_number": part_number, + } + return part_wrapper def yield_compressed_data(object_stream: io.BytesIO, path: str, part_threshold=None): + """ + A generator function which yields chunks of compressed JSON data. + + This function reads chunks from a JSON file in `object_stream`, + compresses the data using gzip, and yields the compressed data in chunks + that are at least as large as the `part_threshold`, except for the final + chunk, potentially. The compressed data is yielded as dictionaries containing + the compressed bytes and a chunk number to preserve ordinality. + + Args: + object_stream (io.BytesIO): A BytesIO object over a zip archive which contains + the JSON file to be compressed. + path (str): The path within the zip archive to the JSON file. + part_threshold (int, optional): The size threshold in bytes for each yielded + compressed chunk. Defaults to 8 MB (8 * 1024 * 1024 bytes). + + Yields: + dict: A dictionary containing the keys: + - data (bytes): The content of the compressed BytesIO object as bytes. + - chunk_number (int): The associated chunk number of the data. + + Notes: + - We use compression level 6 (out of 9). This is the default compression level + of the linux `gzip` tool. This is faster to write, but produces slightly (~10-20%) + larger files. + """ if part_threshold is None: - part_threshold = 50 * 1024 * 1024 # 50 MB + # 8 MB, supports up to 80 GB compressed multipart upload + part_threshold = 8 * 1024 * 1024 with zipfile.ZipFile(object_stream, "r") as zip_stream: with zip_stream.open(path, "r") as json_stream: compressed_data = io.BytesIO() - with gzip.GzipFile(fileobj=compressed_data, mode="wb") as gzip_file: - # Read/write the JSON file in 1 MB chunks - for chunk in iter(lambda: json_stream.read(1024 * 1024), b""): + # analogous to the part number of a multipart upload + chunk_number = 1 + with gzip.GzipFile( + filename=os.path.basename(path), + fileobj=compressed_data, + compresslevel=6, + mode="wb", + ) as gzip_file: + # We can expect at least 10x compression, so reading/writing the + # JSON in 10*part_threshold chunks ensures we do not flush the + # gzip buffer too often, which can slow the write process significantly. + compression_factor = 10 + for chunk in iter( + lambda: json_stream.read(compression_factor * part_threshold), b"" + ): gzip_file.write(chunk) + # .flush() ensures that .tell() gives us an accurate byte count, + gzip_file.flush() if compressed_data.tell() >= part_threshold: - yield compressed_data - compressed_data.seek(0) - compressed_data.truncate(0) - yield compressed_data + yield compressed_data_wrapper( + compressed_data=compressed_data, chunk_number=chunk_number + ) + compressed_data.seek(0) + compressed_data.truncate(0) + chunk_number = chunk_number + 1 + yield compressed_data_wrapper( + compressed_data=compressed_data, chunk_number=chunk_number + ) + + +def compressed_data_wrapper(compressed_data: io.BytesIO, chunk_number: int): + """ + A wrapper for the data produced as part of `yield_compressed_data` + The chunk number is useful for maintaining ordinality during a + multipart upload. -def main(event: dict, s3_client: boto3.client): + Args: + compressed_data (io.BytesIO): A BytesIO object containing the compressed data. + chunk_number (int): The chunk number associated with this segment of data. + + Returns: + dict: A dictionary containing the keys: + - data (bytes): The content of the compressed BytesIO object as bytes. + - chunk_number (int): The associated chunk number of the data. + """ + compressed_data_wrapper = { + "data": compressed_data.getvalue(), + "chunk_number": chunk_number, + } + return compressed_data_wrapper + + +def main(event: dict, s3_client: boto3.client, raw_bucket: str, raw_key_prefix: str): """ This function should be invoked by `lambda_handler`. @@ -60,15 +281,22 @@ def main(event: dict, s3_client: boto3.client): context (dict): Information about the runtime environment and the current invocation s3_client (botocore.client.S3): An S3 client + raw_bucket (str): The name of the raw S3 bucket + raw_key_prefix (str): The S3 prefix where we write our data to. Returns: - (None): Logs and uploads an object to the raw S3 bucket. + (None): Produces logs and uploads a gzipped JSON object to the raw S3 bucket. """ for sqs_record in event["Records"]: sns_notification = json.loads(sqs_record["body"]) sns_message = json.loads(sns_notification["Message"]) logger.info(f"Received SNS message: {sns_message}") - # Step 1: Stream the zip file from S3 + raw_key = construct_raw_key( + path=sns_message["Path"], + key=sns_message["Key"], + raw_key_prefix=raw_key_prefix, + ) + # Stream the zip file from S3 with io.BytesIO() as object_stream: s3_client.download_fileobj( Bucket=sns_message["Bucket"], @@ -76,20 +304,34 @@ def main(event: dict, s3_client: boto3.client): Fileobj=object_stream, ) object_stream.seek(0) + multipart_upload = { + "parts": [], + "upload_id": None, + } + # Upload each chunk of compressed data to S3 as part + # of a multipart upload for compressed_data in yield_compressed_data( object_stream=object_stream, path=sns_message["Path"], - part_threshold=102, ): - logger.info(compressed_data.tell()) - object_stream.close() - - # # Define the S3 key for the gzipped file (you can customize this) - # gzip_key = f"{s3_details['Path']}.gz" - - # # Step 4: Upload the gzipped JSON file to S3 - # s3_client.upload_fileobj( - # compressed_data, s3_details["Bucket"], gzip_key - # ) - - # print(f"Uploaded gzipped file to s3://{s3_details['Bucket']}/{gzip_key}") + part = upload_part( + s3_client=s3_client, + body=compressed_data["data"], + bucket=raw_bucket, + key=raw_key, + upload_id=multipart_upload["upload_id"], + part_number=compressed_data["chunk_number"], + ) + multipart_upload["parts"].append(part["part"]) + multipart_upload["upload_id"] = part["upload_id"] + # Complete our multipart upload + completed_upload_response = s3_client.complete_multipart_upload( + Bucket=raw_bucket, + Key=raw_key, + UploadId=multipart_upload["upload_id"], + MultipartUpload={"Parts": multipart_upload["parts"]}, + ) + logger.info( + f"Complete multipart upload response: {completed_upload_response}" + ) + return completed_upload_response diff --git a/src/lambda_function/raw/template.yaml b/src/lambda_function/raw/template.yaml index fe69cf83..971998ca 100644 --- a/src/lambda_function/raw/template.yaml +++ b/src/lambda_function/raw/template.yaml @@ -21,6 +21,10 @@ Parameters: Type: String Description: Name of the Raw S3 bucket. + S3RawKeyPrefix: + Type: String + Description: S3 key prefix where files are written. + LambdaPythonVersion: Type: String Description: Python version to use for this lambda function @@ -55,6 +59,7 @@ Resources: Environment: Variables: RAW_S3_BUCKET: !Ref S3RawBucket + RAW_S3_KEY_PREFIX: !Ref S3RawKeyPrefix Events: SQSEvent: Type: SQS diff --git a/tests/test_lambda_raw.py b/tests/test_lambda_raw.py new file mode 100644 index 00000000..453853b3 --- /dev/null +++ b/tests/test_lambda_raw.py @@ -0,0 +1,162 @@ +import gzip +import hashlib +import io +import zipfile + +import boto3 +import pytest +from moto import mock_s3 + +import src.lambda_function.raw.app as app + + +def test_construct_raw_key(): + path = "path/to/data/FitbitIntradayCombined_20220401-20230112.json" + key = "main/adults_v1/FitbitIntradayCombined_20220401-20230112.json" + raw_key_prefix = "json" + expected_raw_key = ( + "json/dataset=FitbitIntradayCombined/cohort=adults_v1/" + "FitbitIntradayCombined_20220401-20230112.ndjson.gz" + ) + result = app.construct_raw_key(path=path, key=key, raw_key_prefix=raw_key_prefix) + assert result == expected_raw_key + + +@pytest.fixture +def s3_setup(): + # Fixture to set up a mock S3 client and a test bucket. + with mock_s3(): + # Set up the mock S3 client and bucket + s3_client = boto3.client("s3") + bucket = "test-bucket" + key = "test-object" + s3_client.create_bucket(Bucket=bucket) + yield { + "s3_client": s3_client, + "bucket": bucket, + "key": key, + } + + +def test_upload_part_starts_new_multipart_upload(s3_setup): + # Retrieve the mock S3 client, bucket, and key from the fixture + s3_client = s3_setup["s3_client"] + bucket = s3_setup["bucket"] + key = s3_setup["key"] + + # Act: Call upload_part without an upload_id, expecting it to start a new multipart upload + body = b"some data chunk" + result = app.upload_part(s3_client, body, bucket, key) + + # Assert: Verify that a multipart upload was initiated and the part was uploaded + assert "upload_id" in result + assert result["part"]["PartNumber"] == 1 + assert "ETag" in result["part"] + + # Additional checks: Ensure the bucket and object exist in the mocked S3 + uploads = s3_client.list_multipart_uploads(Bucket=bucket) + assert len(uploads.get("Uploads", [])) == 1 + + +def test_upload_part_to_existing_multipart_upload(s3_setup): + # Retrieve the mock S3 client, bucket, and key from the fixture + s3_client = s3_setup["s3_client"] + bucket = s3_setup["bucket"] + key = s3_setup["key"] + + # Setup: Initiate a multipart upload + multipart_upload = s3_client.create_multipart_upload(Bucket=bucket, Key=key) + upload_id = multipart_upload["UploadId"] + + # Act: Upload a part to the existing multipart upload + body = b"another data chunk" + part_number = 2 + result = app.upload_part( + s3_client, body, bucket, key, upload_id=upload_id, part_number=part_number + ) + + # Assert: Verify the correct part was uploaded with the right part number and ETag + assert result["upload_id"] == upload_id + assert result["part"]["PartNumber"] == part_number + assert "ETag" in result["part"] + + # Additional checks: Ensure the part is listed in the multipart upload parts + parts = s3_client.list_parts(Bucket=bucket, Key=key, UploadId=upload_id) + assert len(parts["Parts"]) == 1 + assert parts["Parts"][0]["PartNumber"] == part_number + + +def md5_hash(data): + """Utility function to compute the MD5 hash of data.""" + hash_md5 = hashlib.md5() + hash_md5.update(data) + return hash_md5.hexdigest() + + +def concatenate_gzip_parts(parts): + """Concatenate gzip parts into a complete gzip stream.""" + complete_data = io.BytesIO() + for part in parts: + complete_data.write(part["data"]) + complete_data.seek(0) + return complete_data + + +def test_yield_compressed_data(shared_datadir): + # Test parameters + test_file_path = shared_datadir / "2023-01-13T21--08--51Z_TESTDATA" + # This file is 21 KB uncompressed + json_file_path = "FitbitIntradayCombined_20230112-20230114.json" + part_threshold = 1024 + + # Load the original JSON file to compare + with zipfile.ZipFile(test_file_path, "r") as zipf: + with zipf.open(json_file_path) as original_json_file: + original_json_data = original_json_file.read() + + # Calculate MD5 of the original uncompressed JSON file + original_md5 = md5_hash(original_json_data) + + # Run the yield_compressed_data function + parts = list( + app.yield_compressed_data( + io.BytesIO(test_file_path.read_bytes()), json_file_path, part_threshold + ) + ) + + # Verify that each part surpasses the threshold except possibly the last one + part_sizes = [len(part["data"]) for part in parts] + for part_size in part_sizes[:-1]: + assert ( + part_size >= part_threshold + ), "Part size is smaller than expected threshold." + + # Concatenate all the parts into a complete gzip file + complete_data = concatenate_gzip_parts(parts) + + # Decompress the complete gzip archive + with gzip.GzipFile(fileobj=complete_data, mode="rb") as gzip_file: + decompressed_data = gzip_file.read() + + # Verify that the decompressed data matches the original MD5 hash + decompressed_md5 = md5_hash(decompressed_data) + assert ( + decompressed_md5 == original_md5 + ), "Decompressed data does not match the original file." + + +def test_compressed_data_wrapper(): + # Setup: Create a BytesIO object with some sample data and define a chunk number + sample_data = b"compressed data" + chunk_number = 1 + compressed_data = io.BytesIO(sample_data) + + # Act: Call the function with the sample data and chunk number + result = app.compressed_data_wrapper(compressed_data, chunk_number) + + # Assert: Verify the output matches the expected format and content + expected_result = { + "data": sample_data, + "chunk_number": chunk_number, + } + assert result == expected_result From 6ac1233aab00f3b04a280710b2d355f3c321383f Mon Sep 17 00:00:00 2001 From: Phil Snyder Date: Thu, 29 Aug 2024 17:31:14 -0700 Subject: [PATCH 4/5] minor update to dispatch lambda module docstring --- src/lambda_function/dispatch/app.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/lambda_function/dispatch/app.py b/src/lambda_function/dispatch/app.py index 00564073..9edbbb1c 100644 --- a/src/lambda_function/dispatch/app.py +++ b/src/lambda_function/dispatch/app.py @@ -2,9 +2,10 @@ Dispatch Lambda This Lambda polls the input-to-dispatch SQS queue and publishes to the dispatch SNS topic. -Its purpose is to inspect each export and dispatch each file as a separate job in which -the file will be decompressed and uploaded to S3. +Its purpose is to inspect each export and dispatch each file with a non-zero size as a +separate job. """ + import json import logging import os From 29e77acd5740ec93ad36d6bc1eb8a6ac7e376c27 Mon Sep 17 00:00:00 2001 From: Phil Snyder Date: Thu, 29 Aug 2024 17:47:50 -0700 Subject: [PATCH 5/5] Add analogous prod stacks --- config/prod/namespaced/lambda-raw-role.yaml | 13 +++++++++++++ config/prod/namespaced/lambda-raw.yaml | 17 +++++++++++++++++ config/prod/namespaced/sqs-dispatch-to-raw.yaml | 2 +- src/lambda_function/raw/README.md | 4 +++- 4 files changed, 34 insertions(+), 2 deletions(-) create mode 100644 config/prod/namespaced/lambda-raw-role.yaml create mode 100644 config/prod/namespaced/lambda-raw.yaml diff --git a/config/prod/namespaced/lambda-raw-role.yaml b/config/prod/namespaced/lambda-raw-role.yaml new file mode 100644 index 00000000..92a419ed --- /dev/null +++ b/config/prod/namespaced/lambda-raw-role.yaml @@ -0,0 +1,13 @@ +template: + path: lambda-raw-role.yaml +stack_name: "{{ stack_group_config.namespace }}-lambda-raw-role" +dependencies: + - prod/namespaced/sqs-dispatch-to-raw.yaml + - prod/s3-cloudformation-bucket.yaml + - prod/s3-raw-bucket.yaml +parameters: + SQSQueueArn: !stack_output_external "{{ stack_group_config.namespace }}-sqs-dispatch-to-raw::PrimaryQueueArn" + S3SourceBucketName: {{ stack_group_config.input_bucket_name }} + S3TargetBucketName: {{ stack_group_config.raw_bucket_name }} +stack_tags: + {{ stack_group_config.default_stack_tags }} diff --git a/config/prod/namespaced/lambda-raw.yaml b/config/prod/namespaced/lambda-raw.yaml new file mode 100644 index 00000000..0dbfbb91 --- /dev/null +++ b/config/prod/namespaced/lambda-raw.yaml @@ -0,0 +1,17 @@ +template: + type: sam + path: src/lambda_function/raw/template.yaml + artifact_bucket_name: {{ stack_group_config.template_bucket_name }} + artifact_prefix: "{{ stack_group_config.namespace }}/src/lambda" +dependencies: + - prod/namespaced/lambda-raw-role.yaml + - prod/namespaced/sqs-dispatch-to-raw.yaml + - prod/s3-cloudformation-bucket.yaml + - prod/s3-raw-bucket.yaml +stack_name: "{{ stack_group_config.namespace }}-lambda-raw" +parameters: + RoleArn: !stack_output_external "{{ stack_group_config.namespace }}-lambda-raw-role::RoleArn" + SQSQueueArn: !stack_output_external "{{ stack_group_config.namespace }}-sqs-dispatch-to-raw::PrimaryQueueArn" + S3RawBucket: {{ stack_group_config.raw_bucket_name }} + S3RawKeyPrefix: "{{ stack_group_config.namespace }}/json/" +stack_tags: {{ stack_group_config.default_stack_tags }} diff --git a/config/prod/namespaced/sqs-dispatch-to-raw.yaml b/config/prod/namespaced/sqs-dispatch-to-raw.yaml index 4870eb3b..b4bd8607 100644 --- a/config/prod/namespaced/sqs-dispatch-to-raw.yaml +++ b/config/prod/namespaced/sqs-dispatch-to-raw.yaml @@ -3,7 +3,7 @@ template: parameters: MessageRetentionPeriod: "1209600" ReceiveMessageWaitTimeSeconds: "20" - VisibilityTimeout: "120" + VisibilityTimeout: "900" SNSTopicSubscription: !stack_output_external "{{ stack_group_config.namespace }}-sns-dispatch::SnsTopicArn" dependencies: - prod/namespaced/sns-dispatch.yaml diff --git a/src/lambda_function/raw/README.md b/src/lambda_function/raw/README.md index 12da1859..5834b9cd 100644 --- a/src/lambda_function/raw/README.md +++ b/src/lambda_function/raw/README.md @@ -1,7 +1,9 @@ # Raw Lambda The raw Lambda polls the dispatch-to-raw SQS queue and uploads an object to the raw S3 bucket. -Its purpose is to decompress a single file from an export, recompress that file, and store it to S3. +Its purpose is to compress a single JSON file from an export (zip archive) and store it to S3. +It makes heavy use of Python file objects and multipart uploads and can download/compress/upload +with a relatively low, fixed memory overhead with respect to the size of the uncompressed JSON. ## Development