diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index 7b571bc6e..7693ac98f 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -30,6 +30,7 @@ stateDiagram-v2 * Reports batch item failures to reduce number of retries for a record upon errors * Simple interface to process each batch record +* Parallel processing of batches * Integrates with Java Events library and the deserialization module * Build your own batch processor by extending primitives @@ -110,16 +111,9 @@ You can use your preferred deployment framework to set the correct configuration while the `powertools-batch` module handles generating the response, which simply needs to be returned as the result of your Lambda handler. -A complete [Serverless Application Model](https://aws.amazon.com/serverless/sam/) example can be found -[here](https://github.com/aws-powertools/powertools-lambda-java/tree/main/examples/powertools-examples-batch) covering -all of the batch sources. - -For more information on configuring `ReportBatchItemFailures`, -see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting), -[Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-batchfailurereporting),and -[DynamoDB Streams](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-batchfailurereporting). - +A complete [Serverless Application Model](https://aws.amazon.com/serverless/sam/) example can be found [here](https://github.com/aws-powertools/powertools-lambda-java/tree/main/examples/powertools-examples-batch) covering all the batch sources. +For more information on configuring `ReportBatchItemFailures`, see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting), [Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-batchfailurereporting), and [DynamoDB Streams](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-batchfailurereporting). !!! note "You do not need any additional IAM permissions to use this utility, except for what each event source requires." @@ -150,12 +144,10 @@ see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs. public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) { return handler.processBatch(sqsEvent, context); } - - + private void processMessage(Product p, Context c) { // Process the product } - } ``` @@ -276,7 +268,6 @@ see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs. private void processMessage(Product p, Context c) { // process the product } - } ``` @@ -475,6 +466,51 @@ see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs. } ``` +## Parallel processing +You can choose to process batch items in parallel using the `BatchMessageHandler#processBatchInParallel()` +instead of `BatchMessageHandler#processBatch()`. Partial batch failure works the same way but items are processed +in parallel rather than sequentially. + +This feature is available for SQS, Kinesis and DynamoDB Streams but cannot be +used with SQS FIFO. In that case, an `UnsupportedOperationException` is thrown. + +!!! warning + Note that parallel processing is not always better than sequential processing, + and you should benchmark your code to determine the best approach for your use case. + +!!! info + To get more threads available (more vCPUs), you need to increase the amount of memory allocated to your Lambda function. + While it is possible to increase the number of threads using Java options or custom thread pools, + in most cases the defaults work well, and changing them is more likely to decrease performance + (see [here](https://www.baeldung.com/java-when-to-use-parallel-stream#fork-join-framework) + and [here](https://dzone.com/articles/be-aware-of-forkjoinpoolcommonpool)). + In situations where this may be useful - such as performing IO-bound work in parallel - make sure to measure before and after! + + +=== "Example with SQS" + + ```java hl_lines="13" + public class SqsBatchHandler implements RequestHandler { + + private final BatchMessageHandler handler; + + public SqsBatchHandler() { + handler = new BatchMessageHandlerBuilder() + .withSqsBatchHandler() + .buildWithMessageHandler(this::processMessage, Product.class); + } + + @Override + public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) { + return handler.processBatchInParallel(sqsEvent, context); + } + + private void processMessage(Product p, Context c) { + // Process the product + } + } + ``` + ## Handling Messages @@ -490,7 +526,7 @@ In general, the deserialized message handler should be used unless you need acce === "Raw Message Handler" - ```java + ```java hl_lines="4 7" public void setup() { BatchMessageHandler handler = new BatchMessageHandlerBuilder() .withSqsBatchHandler() @@ -505,7 +541,7 @@ In general, the deserialized message handler should be used unless you need acce === "Deserialized Message Handler" - ```java + ```java hl_lines="4 7" public void setup() { BatchMessageHandler handler = new BatchMessageHandlerBuilder() .withSqsBatchHandler() @@ -529,20 +565,20 @@ provide a custom failure handler. Handlers can be provided when building the batch processor and are available for all event sources. For instance for DynamoDB: -```java - BatchMessageHandler handler = new BatchMessageHandlerBuilder() - .withDynamoDbBatchHandler() - .withSuccessHandler((m) -> { - // Success handler receives the raw message - LOGGER.info("Message with sequenceNumber {} was successfully processed", - m.getDynamodb().getSequenceNumber()); - }) - .withFailureHandler((m, e) -> { - // Failure handler receives the raw message and the exception thrown. - LOGGER.info("Message with sequenceNumber {} failed to be processed: {}" - , e.getDynamodb().getSequenceNumber(), e); - }) - .buildWithMessageHander(this::processMessage); +```java hl_lines="3 8" +BatchMessageHandler handler = new BatchMessageHandlerBuilder() + .withDynamoDbBatchHandler() + .withSuccessHandler((m) -> { + // Success handler receives the raw message + LOGGER.info("Message with sequenceNumber {} was successfully processed", + m.getDynamodb().getSequenceNumber()); + }) + .withFailureHandler((m, e) -> { + // Failure handler receives the raw message and the exception thrown. + LOGGER.info("Message with sequenceNumber {} failed to be processed: {}" + , e.getDynamodb().getSequenceNumber(), e); + }) + .buildWithMessageHander(this::processMessage); ``` !!! info diff --git a/examples/powertools-examples-batch/deploy/sqs/template.yml b/examples/powertools-examples-batch/deploy/sqs/template.yml index 764ba4863..2f1d6c363 100644 --- a/examples/powertools-examples-batch/deploy/sqs/template.yml +++ b/examples/powertools-examples-batch/deploy/sqs/template.yml @@ -7,12 +7,10 @@ Globals: Function: Timeout: 20 Runtime: java11 - MemorySize: 512 - Tracing: Active + MemorySize: 5400 Environment: Variables: POWERTOOLS_LOG_LEVEL: INFO - POWERTOOLS_LOGGER_SAMPLE_RATE: 1.0 POWERTOOLS_LOGGER_LOG_EVENT: true Resources: @@ -45,6 +43,9 @@ Resources: AliasName: alias/powertools-batch-sqs-demo TargetKeyId: !Ref CustomerKey + Bucket: + Type: AWS::S3::Bucket + DemoDlqSqsQueue: Type: AWS::SQS::Queue Properties: @@ -96,11 +97,57 @@ Resources: DemoSQSConsumerFunction: Type: AWS::Serverless::Function Properties: + Tracing: Active CodeUri: ../.. Handler: org.demo.batch.sqs.SqsBatchHandler::handleRequest Environment: Variables: POWERTOOLS_SERVICE_NAME: sqs-demo + BUCKET: !Ref Bucket + Policies: + - Statement: + - Sid: SQSDeleteGetAttribute + Effect: Allow + Action: + - sqs:DeleteMessageBatch + - sqs:GetQueueAttributes + Resource: !GetAtt DemoSqsQueue.Arn + - Sid: SQSSendMessageBatch + Effect: Allow + Action: + - sqs:SendMessageBatch + - sqs:SendMessage + Resource: !GetAtt DemoDlqSqsQueue.Arn + - Sid: SQSKMSKey + Effect: Allow + Action: + - kms:GenerateDataKey + - kms:Decrypt + Resource: !GetAtt CustomerKey.Arn + - Sid: WriteToS3 + Effect: Allow + Action: + - s3:PutObject + Resource: !Sub ${Bucket.Arn}/* + +# Events: +# MySQSEvent: +# Type: SQS +# Properties: +# Queue: !GetAtt DemoSqsQueue.Arn +# BatchSize: 100 +# MaximumBatchingWindowInSeconds: 60 + + DemoSQSParallelConsumerFunction: + Type: AWS::Serverless::Function + Properties: + Tracing: Active + CodeUri: ../.. + Handler: org.demo.batch.sqs.SqsParallelBatchHandler::handleRequest + Environment: + Variables: + POWERTOOLS_SERVICE_NAME: sqs-demo + BUCKET: !Ref Bucket Policies: - Statement: - Sid: SQSDeleteGetAttribute @@ -121,13 +168,19 @@ Resources: - kms:GenerateDataKey - kms:Decrypt Resource: !GetAtt CustomerKey.Arn + - Sid: WriteToS3 + Effect: Allow + Action: + - s3:PutObject + Resource: !Sub ${Bucket.Arn}/* + Events: MySQSEvent: Type: SQS Properties: Queue: !GetAtt DemoSqsQueue.Arn - BatchSize: 2 - MaximumBatchingWindowInSeconds: 300 + BatchSize: 100 + MaximumBatchingWindowInSeconds: 60 Outputs: DemoSqsQueue: diff --git a/examples/powertools-examples-batch/pom.xml b/examples/powertools-examples-batch/pom.xml index b9a523d04..73e4cbe55 100644 --- a/examples/powertools-examples-batch/pom.xml +++ b/examples/powertools-examples-batch/pom.xml @@ -42,6 +42,17 @@ software.amazon.awssdk sdk-core ${sdk.version} + + + org.slf4j + slf4j-api + + + + + software.amazon.awssdk + s3 + ${sdk.version} software.amazon.awssdk diff --git a/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/AbstractSqsBatchHandler.java b/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/AbstractSqsBatchHandler.java new file mode 100644 index 000000000..25dba47bb --- /dev/null +++ b/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/AbstractSqsBatchHandler.java @@ -0,0 +1,70 @@ +/* + * Copyright 2024 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.demo.batch.sqs; + +import com.amazonaws.services.lambda.runtime.Context; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Random; +import org.demo.batch.model.Product; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.lambda.powertools.logging.Logging; +import software.amazon.lambda.powertools.tracing.Tracing; +import software.amazon.lambda.powertools.tracing.TracingUtils; + +public class AbstractSqsBatchHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSqsBatchHandler.class); + private final ObjectMapper mapper = new ObjectMapper(); + private final String bucket = System.getenv("BUCKET"); + private final S3Client s3 = S3Client.builder().httpClient(UrlConnectionHttpClient.create()).build(); + private final Random r = new Random(); + + /** + * Simulate some processing (I/O + S3 put request) + * @param p deserialized product + * @param context Lambda context + */ + @Logging + @Tracing + protected void processMessage(Product p, Context context) { + TracingUtils.putAnnotation("productId", p.getId()); + TracingUtils.putAnnotation("Thread", Thread.currentThread().getName()); + MDC.put("product", String.valueOf(p.getId())); + LOGGER.info("Processing product {}", p); + + char c = (char)(r.nextInt(26) + 'a'); + char[] chars = new char[1024 * 1000]; + Arrays.fill(chars, c); + p.setName(new String(chars)); + try { + File file = new File("/tmp/"+p.getId()+".json"); + mapper.writeValue(file, p); + s3.putObject( + PutObjectRequest.builder().bucket(bucket).key(p.getId()+".json").build(), RequestBody.fromFile(file)); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + MDC.remove("product"); + } + } +} diff --git a/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/SqsBatchHandler.java b/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/SqsBatchHandler.java index 27689485c..bc0f57cb8 100644 --- a/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/SqsBatchHandler.java +++ b/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/SqsBatchHandler.java @@ -4,13 +4,15 @@ import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import org.demo.batch.model.Product; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.demo.batch.model.Product; import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder; import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler; +import software.amazon.lambda.powertools.logging.Logging; +import software.amazon.lambda.powertools.tracing.Tracing; -public class SqsBatchHandler implements RequestHandler { +public class SqsBatchHandler extends AbstractSqsBatchHandler implements RequestHandler { private static final Logger LOGGER = LoggerFactory.getLogger(SqsBatchHandler.class); private final BatchMessageHandler handler; @@ -20,14 +22,11 @@ public SqsBatchHandler() { .buildWithMessageHandler(this::processMessage, Product.class); } + @Logging + @Tracing @Override public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) { + LOGGER.info("Processing batch of {} messages", sqsEvent.getRecords().size()); return handler.processBatch(sqsEvent, context); } - - - private void processMessage(Product p, Context c) { - LOGGER.info("Processing product " + p); - } - } diff --git a/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/SqsBatchSender.java b/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/SqsBatchSender.java index 4050ab98b..58b24d735 100644 --- a/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/SqsBatchSender.java +++ b/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/SqsBatchSender.java @@ -10,14 +10,13 @@ import java.security.SecureRandom; import java.util.List; import java.util.stream.IntStream; +import org.demo.batch.model.Product; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.demo.batch.model.Product; import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; -import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse; /** @@ -45,16 +44,12 @@ public SqsBatchSender() { public String handleRequest(ScheduledEvent scheduledEvent, Context context) { String queueUrl = System.getenv("QUEUE_URL"); - LOGGER.info("handleRequest"); - - // Push 5 messages on each invoke. - List batchRequestEntries = IntStream.range(0, 5) + List batchRequestEntries = IntStream.range(0, 50) .mapToObj(value -> { - long id = random.nextLong(); - float price = random.nextFloat(); + long id = Math.abs(random.nextLong()); + float price = Math.abs(random.nextFloat() * 3465); Product product = new Product(id, "product-" + id, price); try { - return SendMessageBatchRequestEntry.builder() .id(scheduledEvent.getId() + value) .messageBody(objectMapper.writeValueAsString(product)) @@ -65,12 +60,12 @@ public String handleRequest(ScheduledEvent scheduledEvent, Context context) { } }).collect(toList()); - SendMessageBatchResponse sendMessageBatchResponse = sqsClient.sendMessageBatch(SendMessageBatchRequest.builder() - .queueUrl(queueUrl) - .entries(batchRequestEntries) - .build()); - - LOGGER.info("Sent Message {}", sendMessageBatchResponse); + for (int i = 0; i < 50; i += 10) { + sqsClient.sendMessageBatch(SendMessageBatchRequest.builder() + .queueUrl(queueUrl) + .entries(batchRequestEntries.subList(i, i + 10)) + .build()); + } return "Success"; } diff --git a/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/SqsParallelBatchHandler.java b/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/SqsParallelBatchHandler.java new file mode 100644 index 000000000..0151c0a32 --- /dev/null +++ b/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/SqsParallelBatchHandler.java @@ -0,0 +1,48 @@ +/* + * Copyright 2024 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.demo.batch.sqs; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import org.demo.batch.model.Product; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; +import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder; +import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler; +import software.amazon.lambda.powertools.logging.Logging; +import software.amazon.lambda.powertools.tracing.Tracing; + +public class SqsParallelBatchHandler extends AbstractSqsBatchHandler implements RequestHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(SqsParallelBatchHandler.class); + private final BatchMessageHandler handler; + + public SqsParallelBatchHandler() { + handler = new BatchMessageHandlerBuilder() + .withSqsBatchHandler() + .buildWithMessageHandler(this::processMessage, Product.class); + } + + @Logging + @Tracing + @Override + public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) { + LOGGER.info("Processing batch of {} messages", sqsEvent.getRecords().size()); + MDC.put("requestId", context.getAwsRequestId()); // should be propagated to other threads + return handler.processBatchInParallel(sqsEvent, context); + } +} diff --git a/examples/powertools-examples-batch/src/main/resources/LogLayout.json b/examples/powertools-examples-batch/src/main/resources/LogLayout.json new file mode 100644 index 000000000..60f102e09 --- /dev/null +++ b/examples/powertools-examples-batch/src/main/resources/LogLayout.json @@ -0,0 +1,75 @@ +{ + "level": { + "$resolver": "level", + "field": "name" + }, + "message": { + "$resolver": "message" + }, + "error": { + "message": { + "$resolver": "exception", + "field": "message" + }, + "name": { + "$resolver": "exception", + "field": "className" + }, + "stack": { + "$resolver": "exception", + "field": "stackTrace", + "stackTrace": { + "stringified": true + } + } + }, + "cold_start": { + "$resolver": "powertools", + "field": "cold_start" + }, + "thread": { + "$resolver": "thread", + "field": "name" + }, + "function_arn": { + "$resolver": "powertools", + "field": "function_arn" + }, + "function_memory_size": { + "$resolver": "powertools", + "field": "function_memory_size" + }, + "function_name": { + "$resolver": "powertools", + "field": "function_name" + }, + "function_request_id": { + "$resolver": "powertools", + "field": "function_request_id" + }, + "function_version": { + "$resolver": "powertools", + "field": "function_version" + }, + "sampling_rate": { + "$resolver": "powertools", + "field": "sampling_rate" + }, + "service": { + "$resolver": "powertools", + "field": "service" + }, + "timestamp": { + "$resolver": "timestamp", + "pattern": { + "format": "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" + } + }, + "xray_trace_id": { + "$resolver": "powertools", + "field": "xray_trace_id" + }, + "": { + "$resolver": "powertools" + } +} \ No newline at end of file diff --git a/examples/powertools-examples-batch/src/main/resources/log4j2.xml b/examples/powertools-examples-batch/src/main/resources/log4j2.xml index ea3ecf474..c48ca3ef4 100644 --- a/examples/powertools-examples-batch/src/main/resources/log4j2.xml +++ b/examples/powertools-examples-batch/src/main/resources/log4j2.xml @@ -2,7 +2,8 @@ - + + diff --git a/powertools-batch/pom.xml b/powertools-batch/pom.xml index 1886f56e6..66a5e3087 100644 --- a/powertools-batch/pom.xml +++ b/powertools-batch/pom.xml @@ -21,6 +21,16 @@ true + + org.apache.maven.plugins + maven-surefire-plugin + + + + -Djava.util.concurrent.ForkJoinPool.common.parallelism=4 + + + @@ -47,6 +57,12 @@ junit-jupiter-api test + + org.slf4j + slf4j-simple + 2.0.7 + test + org.assertj assertj-core diff --git a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/BatchMessageHandler.java b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/BatchMessageHandler.java index 730211feb..18d74bb25 100644 --- a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/BatchMessageHandler.java +++ b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/BatchMessageHandler.java @@ -33,6 +33,21 @@ public interface BatchMessageHandler { * @param context The lambda context * @return A partial batch response */ - public abstract R processBatch(E event, Context context); + R processBatch(E event, Context context); + /** + * Processes the given batch in parallel returning a partial batch + * response indicating the success and failure of individual + * messages within the batch.
+ * Note that parallel processing is not always better than sequential processing, + * and you should benchmark your code to determine the best approach for your use case.
+ * Also note that to get more threads available (more vCPUs), + * you need to increase the amount of memory allocated to your Lambda function.
+ + * + * @param event The Lambda event containing the batch to process + * @param context The lambda context + * @return A partial batch response + */ + R processBatchInParallel(E event, Context context); } diff --git a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/DynamoDbBatchMessageHandler.java b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/DynamoDbBatchMessageHandler.java index 83a8bf7dd..4b03d0947 100644 --- a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/DynamoDbBatchMessageHandler.java +++ b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/DynamoDbBatchMessageHandler.java @@ -17,12 +17,14 @@ import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; -import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.lambda.powertools.batch.internal.MultiThreadMDC; /** * A batch message processor for DynamoDB Streams batches. @@ -46,35 +48,60 @@ public DynamoDbBatchMessageHandler(Consumer @Override public StreamsEventResponse processBatch(DynamodbEvent event, Context context) { - List batchFailures = new ArrayList<>(); + List batchItemFailures = event.getRecords() + .stream() + .map(eventRecord -> processBatchItem(eventRecord, context)) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toList()); - for (DynamodbEvent.DynamodbStreamRecord record : event.getRecords()) { - try { + return StreamsEventResponse.builder().withBatchItemFailures(batchItemFailures).build(); + } - rawMessageHandler.accept(record, context); - // Report success if we have a handler - if (this.successHandler != null) { - this.successHandler.accept(record); - } - } catch (Throwable t) { - String sequenceNumber = record.getDynamodb().getSequenceNumber(); - LOGGER.error("Error while processing record with id {}: {}, adding it to batch item failures", - sequenceNumber, t.getMessage()); - LOGGER.error("Error was", t); - batchFailures.add(new StreamsEventResponse.BatchItemFailure(sequenceNumber)); + @Override + public StreamsEventResponse processBatchInParallel(DynamodbEvent event, Context context) { + MultiThreadMDC multiThreadMDC = new MultiThreadMDC(); + + List batchItemFailures = event.getRecords() + .parallelStream() // Parallel processing + .map(eventRecord -> { + multiThreadMDC.copyMDCToThread(Thread.currentThread().getName()); + return processBatchItem(eventRecord, context); + }) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toList()); + + return StreamsEventResponse.builder().withBatchItemFailures(batchItemFailures).build(); + } - // Report failure if we have a handler - if (this.failureHandler != null) { - // A failing failure handler is no reason to fail the batch - try { - this.failureHandler.accept(record, t); - } catch (Throwable t2) { - LOGGER.warn("failureHandler threw handling failure", t2); - } + private Optional processBatchItem(DynamodbEvent.DynamodbStreamRecord streamRecord, Context context) { + try { + LOGGER.debug("Processing item {}", streamRecord.getEventID()); + + rawMessageHandler.accept(streamRecord, context); + + // Report success if we have a handler + if (this.successHandler != null) { + this.successHandler.accept(streamRecord); + } + return Optional.empty(); + } catch (Throwable t) { + String sequenceNumber = streamRecord.getDynamodb().getSequenceNumber(); + LOGGER.error("Error while processing record with id {}: {}, adding it to batch item failures", + sequenceNumber, t.getMessage()); + LOGGER.error("Error was", t); + + // Report failure if we have a handler + if (this.failureHandler != null) { + // A failing failure handler is no reason to fail the batch + try { + this.failureHandler.accept(streamRecord, t); + } catch (Throwable t2) { + LOGGER.warn("failureHandler threw handling failure", t2); } } + return Optional.of(StreamsEventResponse.BatchItemFailure.builder().withItemIdentifier(sequenceNumber).build()); } - - return new StreamsEventResponse(batchFailures); } } diff --git a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/KinesisStreamsBatchMessageHandler.java b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/KinesisStreamsBatchMessageHandler.java index ad1dd302d..7b4179de7 100644 --- a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/KinesisStreamsBatchMessageHandler.java +++ b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/KinesisStreamsBatchMessageHandler.java @@ -18,12 +18,14 @@ import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.events.KinesisEvent; import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; -import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.lambda.powertools.batch.internal.MultiThreadMDC; import software.amazon.lambda.powertools.utilities.EventDeserializer; /** @@ -57,42 +59,67 @@ public KinesisStreamsBatchMessageHandler(BiConsumer batchFailures = new ArrayList<>(); - - for (KinesisEvent.KinesisEventRecord record : event.getRecords()) { - try { - if (this.rawMessageHandler != null) { - rawMessageHandler.accept(record, context); - } else { - M messageDeserialized = EventDeserializer.extractDataFrom(record).as(messageClass); - messageHandler.accept(messageDeserialized, context); - } + List batchItemFailures = event.getRecords() + .stream() + .map(eventRecord -> processBatchItem(eventRecord, context)) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toList()); - // Report success if we have a handler - if (this.successHandler != null) { - this.successHandler.accept(record); - } - } catch (Throwable t) { - String sequenceNumber = record.getEventID(); - LOGGER.error("Error while processing record with eventID {}: {}, adding it to batch item failures", - sequenceNumber, t.getMessage()); - LOGGER.error("Error was", t); - - batchFailures.add(new StreamsEventResponse.BatchItemFailure(record.getKinesis().getSequenceNumber())); - - // Report failure if we have a handler - if (this.failureHandler != null) { - // A failing failure handler is no reason to fail the batch - try { - this.failureHandler.accept(record, t); - } catch (Throwable t2) { - LOGGER.warn("failureHandler threw handling failure", t2); - } + return StreamsEventResponse.builder().withBatchItemFailures(batchItemFailures).build(); + } + + @Override + public StreamsEventResponse processBatchInParallel(KinesisEvent event, Context context) { + MultiThreadMDC multiThreadMDC = new MultiThreadMDC(); + + List batchItemFailures = event.getRecords() + .parallelStream() // Parallel processing + .map(eventRecord -> { + multiThreadMDC.copyMDCToThread(Thread.currentThread().getName()); + return processBatchItem(eventRecord, context); + }) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toList()); + + return StreamsEventResponse.builder().withBatchItemFailures(batchItemFailures).build(); + } + + private Optional processBatchItem(KinesisEvent.KinesisEventRecord eventRecord, Context context) { + try { + LOGGER.debug("Processing item {}", eventRecord.getEventID()); + + if (this.rawMessageHandler != null) { + rawMessageHandler.accept(eventRecord, context); + } else { + M messageDeserialized = EventDeserializer.extractDataFrom(eventRecord).as(messageClass); + messageHandler.accept(messageDeserialized, context); + } + + // Report success if we have a handler + if (this.successHandler != null) { + this.successHandler.accept(eventRecord); + } + return Optional.empty(); + } catch (Throwable t) { + String sequenceNumber = eventRecord.getEventID(); + LOGGER.error("Error while processing record with eventID {}: {}, adding it to batch item failures", + sequenceNumber, t.getMessage()); + LOGGER.error("Error was", t); + + // Report failure if we have a handler + if (this.failureHandler != null) { + // A failing failure handler is no reason to fail the batch + try { + this.failureHandler.accept(eventRecord, t); + } catch (Throwable t2) { + LOGGER.warn("failureHandler threw handling failure", t2); } } - } - return new StreamsEventResponse(batchFailures); + return Optional.of(StreamsEventResponse.BatchItemFailure.builder().withItemIdentifier(eventRecord.getKinesis().getSequenceNumber()).build()); + } } } diff --git a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/SqsBatchMessageHandler.java b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/SqsBatchMessageHandler.java index b634f9b62..2dfb0a28e 100644 --- a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/SqsBatchMessageHandler.java +++ b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/SqsBatchMessageHandler.java @@ -18,10 +18,15 @@ import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; import com.amazonaws.services.lambda.runtime.events.SQSEvent; import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.lambda.powertools.batch.internal.MultiThreadMDC; import software.amazon.lambda.powertools.utilities.EventDeserializer; /** @@ -61,57 +66,27 @@ public SQSBatchResponse processBatch(SQSEvent event, Context context) { // If we are working on a FIFO queue, when any message fails we should stop processing and return the // rest of the batch as failed too. We use this variable to track when that has happened. // https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting - boolean failWholeBatch = false; + final AtomicBoolean failWholeBatch = new AtomicBoolean(false); int messageCursor = 0; - for (; messageCursor < event.getRecords().size() && !failWholeBatch; messageCursor++) { + for (; messageCursor < event.getRecords().size() && !failWholeBatch.get(); messageCursor++) { SQSEvent.SQSMessage message = event.getRecords().get(messageCursor); String messageGroupId = message.getAttributes() != null ? message.getAttributes().get(MESSAGE_GROUP_ID_KEY) : null; - try { - if (this.rawMessageHandler != null) { - rawMessageHandler.accept(message, context); - } else { - M messageDeserialized = EventDeserializer.extractDataFrom(message).as(messageClass); - messageHandler.accept(messageDeserialized, context); - } - - // Report success if we have a handler - if (this.successHandler != null) { - this.successHandler.accept(message); - } - - } catch (Throwable t) { - LOGGER.error("Error while processing message with messageId {}: {}, adding it to batch item failures", - message.getMessageId(), t.getMessage()); - LOGGER.error("Error was", t); - - response.getBatchItemFailures() - .add(SQSBatchResponse.BatchItemFailure.builder().withItemIdentifier(message.getMessageId()) - .build()); + processBatchItem(message, context).ifPresent(batchItemFailure -> { + response.getBatchItemFailures().add(batchItemFailure); if (messageGroupId != null) { - failWholeBatch = true; + failWholeBatch.set(true); LOGGER.info( "A message in a batch with messageGroupId {} and messageId {} failed; failing the rest of the batch too" , messageGroupId, message.getMessageId()); } - - // Report failure if we have a handler - if (this.failureHandler != null) { - // A failing failure handler is no reason to fail the batch - try { - this.failureHandler.accept(message, t); - } catch (Throwable t2) { - LOGGER.warn("failureHandler threw handling failure", t2); - } - } - - } + }); } - if (failWholeBatch) { + if (failWholeBatch.get()) { // Add the remaining messages to the batch item failures event.getRecords() .subList(messageCursor, event.getRecords().size()) @@ -121,4 +96,60 @@ public SQSBatchResponse processBatch(SQSEvent event, Context context) { } return response; } + + @Override + public SQSBatchResponse processBatchInParallel(SQSEvent event, Context context) { + if (!event.getRecords().isEmpty() && event.getRecords().get(0).getAttributes().get(MESSAGE_GROUP_ID_KEY) != null) { + throw new UnsupportedOperationException("FIFO queues are not supported in parallel mode, use the processBatch method instead"); + } + + MultiThreadMDC multiThreadMDC = new MultiThreadMDC(); + List batchItemFailures = event.getRecords() + .parallelStream() // Parallel processing + .map(sqsMessage -> { + + multiThreadMDC.copyMDCToThread(Thread.currentThread().getName()); + return processBatchItem(sqsMessage, context); + }) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toList()); + + return SQSBatchResponse.builder().withBatchItemFailures(batchItemFailures).build(); + } + + private Optional processBatchItem(SQSEvent.SQSMessage message, Context context) { + try { + LOGGER.debug("Processing message {}", message.getMessageId()); + + if (this.rawMessageHandler != null) { + rawMessageHandler.accept(message, context); + } else { + M messageDeserialized = EventDeserializer.extractDataFrom(message).as(messageClass); + messageHandler.accept(messageDeserialized, context); + } + + // Report success if we have a handler + if (this.successHandler != null) { + this.successHandler.accept(message); + } + return Optional.empty(); + } catch (Throwable t) { + LOGGER.error("Error while processing message with messageId {}: {}, adding it to batch item failures", + message.getMessageId(), t.getMessage()); + LOGGER.error("Error was", t); + + // Report failure if we have a handler + if (this.failureHandler != null) { + // A failing failure handler is no reason to fail the batch + try { + this.failureHandler.accept(message, t); + } catch (Throwable t2) { + LOGGER.warn("failureHandler threw handling failure", t2); + } + } + return Optional.of(SQSBatchResponse.BatchItemFailure.builder().withItemIdentifier(message.getMessageId()) + .build()); + } + } } diff --git a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/internal/MultiThreadMDC.java b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/internal/MultiThreadMDC.java new file mode 100644 index 000000000..df1c2e7a0 --- /dev/null +++ b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/internal/MultiThreadMDC.java @@ -0,0 +1,47 @@ +/* + * Copyright 2023 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package software.amazon.lambda.powertools.batch.internal; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; + +/** + * MDC (SLF4J) is not passed to other threads (ThreadLocal). + * This class permits to manually copy the MDC to a given thread. + */ +public class MultiThreadMDC { + + private static final Logger LOGGER = LoggerFactory.getLogger(MultiThreadMDC.class); + + private final List mdcAwareThreads = new ArrayList<>(); + private final Map contextMap; + + public MultiThreadMDC() { + mdcAwareThreads.add("main"); + contextMap = MDC.getCopyOfContextMap(); + } + + public void copyMDCToThread(String thread) { + if (!mdcAwareThreads.contains(thread)) { + LOGGER.debug("Copy MDC to thread {}", thread); + MDC.setContextMap(contextMap); + mdcAwareThreads.add(thread); + } + } +} diff --git a/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/DdbBatchProcessorTest.java b/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/DdbBatchProcessorTest.java index 9e2c211e2..6bb247323 100644 --- a/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/DdbBatchProcessorTest.java +++ b/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/DdbBatchProcessorTest.java @@ -20,16 +20,27 @@ import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; import com.amazonaws.services.lambda.runtime.tests.annotations.Event; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.params.ParameterizedTest; import org.mockito.Mock; import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler; -public class DdbBatchProcessorTest { +class DdbBatchProcessorTest { @Mock private Context context; + private final List threadList = Collections.synchronizedList(new ArrayList<>()); + + @AfterEach + public void clear() { + threadList.clear(); + } + private void processMessageSucceeds(DynamodbEvent.DynamodbStreamRecord record, Context context) { // Great success } @@ -40,9 +51,36 @@ private void processMessageFailsForFixedMessage(DynamodbEvent.DynamodbStreamReco } } + private void processMessageInParallelSucceeds(DynamodbEvent.DynamodbStreamRecord record, Context context) { + String thread = Thread.currentThread().getName(); + if (!threadList.contains(thread)) { + threadList.add(thread); + } + try { + Thread.sleep(500); // simulate some processing + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + private void processMessageInParallelFailsForFixedMessage(DynamodbEvent.DynamodbStreamRecord record, Context context) { + String thread = Thread.currentThread().getName(); + if (!threadList.contains(thread)) { + threadList.add(thread); + } + try { + Thread.sleep(500); // simulate some processing + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + if (record.getDynamodb().getSequenceNumber().equals("4421584500000000017450439091")) { + throw new RuntimeException("fake exception"); + } + } + @ParameterizedTest @Event(value = "dynamo_event.json", type = DynamodbEvent.class) - public void batchProcessingSucceedsAndReturns(DynamodbEvent event) { + void batchProcessingSucceedsAndReturns(DynamodbEvent event) { // Arrange BatchMessageHandler handler = new BatchMessageHandlerBuilder() .withDynamoDbBatchHandler() @@ -52,12 +90,28 @@ public void batchProcessingSucceedsAndReturns(DynamodbEvent event) { StreamsEventResponse dynamodbBatchResponse = handler.processBatch(event, context); // Assert - assertThat(dynamodbBatchResponse.getBatchItemFailures()).hasSize(0); + assertThat(dynamodbBatchResponse.getBatchItemFailures()).isEmpty(); + } + + @ParameterizedTest + @Event(value = "dynamo_event_big.json", type = DynamodbEvent.class) + void parallelBatchProcessingSucceedsAndReturns(DynamodbEvent event) { + // Arrange + BatchMessageHandler handler = new BatchMessageHandlerBuilder() + .withDynamoDbBatchHandler() + .buildWithRawMessageHandler(this::processMessageInParallelSucceeds); + + // Act + StreamsEventResponse dynamodbBatchResponse = handler.processBatchInParallel(event, context); + + // Assert + assertThat(dynamodbBatchResponse.getBatchItemFailures()).isEmpty(); + assertThat(threadList).hasSizeGreaterThan(1); } @ParameterizedTest @Event(value = "dynamo_event.json", type = DynamodbEvent.class) - public void shouldAddMessageToBatchFailure_whenException_withMessage(DynamodbEvent event) { + void shouldAddMessageToBatchFailure_whenException_withMessage(DynamodbEvent event) { // Arrange BatchMessageHandler handler = new BatchMessageHandlerBuilder() .withDynamoDbBatchHandler() @@ -72,9 +126,27 @@ public void shouldAddMessageToBatchFailure_whenException_withMessage(DynamodbEve assertThat(batchItemFailure.getItemIdentifier()).isEqualTo("4421584500000000017450439091"); } + @ParameterizedTest + @Event(value = "dynamo_event_big.json", type = DynamodbEvent.class) + void parallelBatchProcessing_shouldAddMessageToBatchFailure_whenException_withMessage(DynamodbEvent event) { + // Arrange + BatchMessageHandler handler = new BatchMessageHandlerBuilder() + .withDynamoDbBatchHandler() + .buildWithRawMessageHandler(this::processMessageInParallelFailsForFixedMessage); + + // Act + StreamsEventResponse dynamodbBatchResponse = handler.processBatchInParallel(event, context); + + // Assert + assertThat(dynamodbBatchResponse.getBatchItemFailures()).hasSize(1); + StreamsEventResponse.BatchItemFailure batchItemFailure = dynamodbBatchResponse.getBatchItemFailures().get(0); + assertThat(batchItemFailure.getItemIdentifier()).isEqualTo("4421584500000000017450439091"); + assertThat(threadList).hasSizeGreaterThan(1); + } + @ParameterizedTest @Event(value = "dynamo_event.json", type = DynamodbEvent.class) - public void failingFailureHandlerShouldntFailBatch(DynamodbEvent event) { + void failingFailureHandlerShouldntFailBatch(DynamodbEvent event) { // Arrange AtomicBoolean wasCalledAndFailed = new AtomicBoolean(false); BatchMessageHandler handler = new BatchMessageHandlerBuilder() @@ -92,7 +164,7 @@ public void failingFailureHandlerShouldntFailBatch(DynamodbEvent event) { // Assert assertThat(dynamodbBatchResponse).isNotNull(); - assertThat(dynamodbBatchResponse.getBatchItemFailures().size()).isEqualTo(1); + assertThat(dynamodbBatchResponse.getBatchItemFailures()).hasSize(1); assertThat(wasCalledAndFailed.get()).isTrue(); StreamsEventResponse.BatchItemFailure batchItemFailure = dynamodbBatchResponse.getBatchItemFailures().get(0); assertThat(batchItemFailure.getItemIdentifier()).isEqualTo("4421584500000000017450439091"); @@ -100,7 +172,7 @@ public void failingFailureHandlerShouldntFailBatch(DynamodbEvent event) { @ParameterizedTest @Event(value = "dynamo_event.json", type = DynamodbEvent.class) - public void failingSuccessHandlerShouldntFailBatchButShouldFailMessage(DynamodbEvent event) { + void failingSuccessHandlerShouldntFailBatchButShouldFailMessage(DynamodbEvent event) { // Arrange AtomicBoolean wasCalledAndFailed = new AtomicBoolean(false); BatchMessageHandler handler = new BatchMessageHandlerBuilder() @@ -118,7 +190,7 @@ public void failingSuccessHandlerShouldntFailBatchButShouldFailMessage(DynamodbE // Assert assertThat(dynamodbBatchResponse).isNotNull(); - assertThat(dynamodbBatchResponse.getBatchItemFailures().size()).isEqualTo(1); + assertThat(dynamodbBatchResponse.getBatchItemFailures()).hasSize(1); assertThat(wasCalledAndFailed.get()).isTrue(); StreamsEventResponse.BatchItemFailure batchItemFailure = dynamodbBatchResponse.getBatchItemFailures().get(0); assertThat(batchItemFailure.getItemIdentifier()).isEqualTo("4421584500000000017450439091"); diff --git a/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/KinesisBatchProcessorTest.java b/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/KinesisBatchProcessorTest.java index d78638e1d..059a4d2d0 100644 --- a/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/KinesisBatchProcessorTest.java +++ b/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/KinesisBatchProcessorTest.java @@ -20,17 +20,28 @@ import com.amazonaws.services.lambda.runtime.events.KinesisEvent; import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; import com.amazonaws.services.lambda.runtime.tests.annotations.Event; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.params.ParameterizedTest; import org.mockito.Mock; import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler; import software.amazon.lambda.powertools.batch.model.Product; -public class KinesisBatchProcessorTest { +class KinesisBatchProcessorTest { @Mock private Context context; + private final List threadList = Collections.synchronizedList(new ArrayList<>()); + + @AfterEach + public void clear() { + threadList.clear(); + } + private void processMessageSucceeds(KinesisEvent.KinesisEventRecord record, Context context) { // Great success } @@ -42,6 +53,34 @@ private void processMessageFailsForFixedMessage(KinesisEvent.KinesisEventRecord } } + private void processMessageInParallelSucceeds(KinesisEvent.KinesisEventRecord record, Context context) { + String thread = Thread.currentThread().getName(); + if (!threadList.contains(thread)) { + threadList.add(thread); + } + try { + Thread.sleep(500); // simulate some processing + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + private void processMessageInParallelFailsForFixedMessage(KinesisEvent.KinesisEventRecord record, Context context) { + String thread = Thread.currentThread().getName(); + if (!threadList.contains(thread)) { + threadList.add(thread); + } + try { + Thread.sleep(500); // simulate some processing + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + if (record.getKinesis().getSequenceNumber() + .equals("49545115243490985018280067714973144582180062593244200961")) { + throw new RuntimeException("fake exception"); + } + } + // A handler that throws an exception for _one_ of the deserialized products in the same messages public void processMessageFailsForFixedProduct(Product product, Context context) { if (product.getId() == 1234) { @@ -51,7 +90,7 @@ public void processMessageFailsForFixedProduct(Product product, Context context) @ParameterizedTest @Event(value = "kinesis_event.json", type = KinesisEvent.class) - public void batchProcessingSucceedsAndReturns(KinesisEvent event) { + void batchProcessingSucceedsAndReturns(KinesisEvent event) { // Arrange BatchMessageHandler handler = new BatchMessageHandlerBuilder() .withKinesisBatchHandler() @@ -61,12 +100,28 @@ public void batchProcessingSucceedsAndReturns(KinesisEvent event) { StreamsEventResponse kinesisBatchResponse = handler.processBatch(event, context); // Assert - assertThat(kinesisBatchResponse.getBatchItemFailures()).hasSize(0); + assertThat(kinesisBatchResponse.getBatchItemFailures()).isEmpty(); + } + + @ParameterizedTest + @Event(value = "kinesis_event_big.json", type = KinesisEvent.class) + void batchProcessingInParallelSucceedsAndReturns(KinesisEvent event) { + // Arrange + BatchMessageHandler handler = new BatchMessageHandlerBuilder() + .withKinesisBatchHandler() + .buildWithRawMessageHandler(this::processMessageInParallelSucceeds); + + // Act + StreamsEventResponse kinesisBatchResponse = handler.processBatchInParallel(event, context); + + // Assert + assertThat(kinesisBatchResponse.getBatchItemFailures()).isEmpty(); + assertThat(threadList).hasSizeGreaterThan(1); } @ParameterizedTest @Event(value = "kinesis_event.json", type = KinesisEvent.class) - public void shouldAddMessageToBatchFailure_whenException_withMessage(KinesisEvent event) { + void shouldAddMessageToBatchFailure_whenException_withMessage(KinesisEvent event) { // Arrange BatchMessageHandler handler = new BatchMessageHandlerBuilder() .withKinesisBatchHandler() @@ -82,9 +137,28 @@ public void shouldAddMessageToBatchFailure_whenException_withMessage(KinesisEven "49545115243490985018280067714973144582180062593244200961"); } + @ParameterizedTest + @Event(value = "kinesis_event_big.json", type = KinesisEvent.class) + void batchProcessingInParallel_shouldAddMessageToBatchFailure_whenException_withMessage(KinesisEvent event) { + // Arrange + BatchMessageHandler handler = new BatchMessageHandlerBuilder() + .withKinesisBatchHandler() + .buildWithRawMessageHandler(this::processMessageInParallelFailsForFixedMessage); + + // Act + StreamsEventResponse kinesisBatchResponse = handler.processBatchInParallel(event, context); + + // Assert + assertThat(kinesisBatchResponse.getBatchItemFailures()).hasSize(1); + StreamsEventResponse.BatchItemFailure batchItemFailure = kinesisBatchResponse.getBatchItemFailures().get(0); + assertThat(batchItemFailure.getItemIdentifier()).isEqualTo( + "49545115243490985018280067714973144582180062593244200961"); + assertThat(threadList).hasSizeGreaterThan(1); + } + @ParameterizedTest @Event(value = "kinesis_event.json", type = KinesisEvent.class) - public void shouldAddMessageToBatchFailure_whenException_withProduct(KinesisEvent event) { + void shouldAddMessageToBatchFailure_whenException_withProduct(KinesisEvent event) { // Arrange BatchMessageHandler handler = new BatchMessageHandlerBuilder() .withKinesisBatchHandler() @@ -102,7 +176,7 @@ public void shouldAddMessageToBatchFailure_whenException_withProduct(KinesisEven @ParameterizedTest @Event(value = "kinesis_event.json", type = KinesisEvent.class) - public void failingFailureHandlerShouldntFailBatch(KinesisEvent event) { + void failingFailureHandlerShouldntFailBatch(KinesisEvent event) { // Arrange AtomicBoolean wasCalled = new AtomicBoolean(false); BatchMessageHandler handler = new BatchMessageHandlerBuilder() @@ -118,7 +192,7 @@ public void failingFailureHandlerShouldntFailBatch(KinesisEvent event) { // Assert assertThat(kinesisBatchResponse).isNotNull(); - assertThat(kinesisBatchResponse.getBatchItemFailures().size()).isEqualTo(1); + assertThat(kinesisBatchResponse.getBatchItemFailures()).hasSize(1); assertThat(wasCalled.get()).isTrue(); StreamsEventResponse.BatchItemFailure batchItemFailure = kinesisBatchResponse.getBatchItemFailures().get(0); assertThat(batchItemFailure.getItemIdentifier()).isEqualTo( @@ -127,7 +201,7 @@ public void failingFailureHandlerShouldntFailBatch(KinesisEvent event) { @ParameterizedTest @Event(value = "kinesis_event.json", type = KinesisEvent.class) - public void failingSuccessHandlerShouldntFailBatchButShouldFailMessage(KinesisEvent event) { + void failingSuccessHandlerShouldntFailBatchButShouldFailMessage(KinesisEvent event) { // Arrange AtomicBoolean wasCalledAndFailed = new AtomicBoolean(false); BatchMessageHandler handler = new BatchMessageHandlerBuilder() @@ -146,7 +220,7 @@ public void failingSuccessHandlerShouldntFailBatchButShouldFailMessage(KinesisEv // Assert assertThat(kinesisBatchResponse).isNotNull(); - assertThat(kinesisBatchResponse.getBatchItemFailures().size()).isEqualTo(1); + assertThat(kinesisBatchResponse.getBatchItemFailures()).hasSize(1); assertThat(wasCalledAndFailed.get()).isTrue(); StreamsEventResponse.BatchItemFailure batchItemFailure = kinesisBatchResponse.getBatchItemFailures().get(0); assertThat(batchItemFailure.getItemIdentifier()).isEqualTo( diff --git a/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/SQSBatchProcessorTest.java b/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/SQSBatchProcessorTest.java index 2f9429fa3..7dd51374e 100644 --- a/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/SQSBatchProcessorTest.java +++ b/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/SQSBatchProcessorTest.java @@ -20,20 +20,43 @@ import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.amazonaws.services.lambda.runtime.tests.annotations.Event; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.params.ParameterizedTest; import org.mockito.Mock; import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler; import software.amazon.lambda.powertools.batch.model.Product; -public class SQSBatchProcessorTest { +class SQSBatchProcessorTest { @Mock private Context context; + private final List threadList = Collections.synchronizedList(new ArrayList<>()); + + @AfterEach + public void clear() { + threadList.clear(); + } + // A handler that works private void processMessageSucceeds(SQSEvent.SQSMessage sqsMessage) { } + private void processMessageInParallelSucceeds(SQSEvent.SQSMessage sqsMessage) { + String thread = Thread.currentThread().getName(); + if (!threadList.contains(thread)) { + threadList.add(thread); + } + try { + Thread.sleep(500); // simulate some processing + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + // A handler that throws an exception for _one_ of the sample messages private void processMessageFailsForFixedMessage(SQSEvent.SQSMessage message, Context context) { if (message.getMessageId().equals("e9144555-9a4f-4ec3-99a0-34ce359b4b54")) { @@ -41,8 +64,23 @@ private void processMessageFailsForFixedMessage(SQSEvent.SQSMessage message, Con } } + private void processMessageInParallelFailsForFixedMessage(SQSEvent.SQSMessage message, Context context) { + String thread = Thread.currentThread().getName(); + if (!threadList.contains(thread)) { + threadList.add(thread); + } + try { + Thread.sleep(500); // simulate some processing + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + if (message.getMessageId().equals("e9144555-9a4f-4ec3-99a0-34ce359b4b54")) { + throw new RuntimeException("fake exception"); + } + } + // A handler that throws an exception for _one_ of the deserialized products in the same messages - public void processMessageFailsForFixedProduct(Product product, Context context) { + private void processMessageFailsForFixedProduct(Product product, Context context) { if (product.getId() == 12345) { throw new RuntimeException("fake exception"); } @@ -50,7 +88,7 @@ public void processMessageFailsForFixedProduct(Product product, Context context) @ParameterizedTest @Event(value = "sqs_event.json", type = SQSEvent.class) - public void batchProcessingSucceedsAndReturns(SQSEvent event) { + void batchProcessingSucceedsAndReturns(SQSEvent event) { // Arrange BatchMessageHandler handler = new BatchMessageHandlerBuilder() .withSqsBatchHandler() @@ -60,13 +98,28 @@ public void batchProcessingSucceedsAndReturns(SQSEvent event) { SQSBatchResponse sqsBatchResponse = handler.processBatch(event, context); // Assert - assertThat(sqsBatchResponse.getBatchItemFailures()).hasSize(0); + assertThat(sqsBatchResponse.getBatchItemFailures()).isEmpty(); } + @ParameterizedTest + @Event(value = "sqs_event_big.json", type = SQSEvent.class) + void parallelBatchProcessingSucceedsAndReturns(SQSEvent event) { + // Arrange + BatchMessageHandler handler = new BatchMessageHandlerBuilder() + .withSqsBatchHandler() + .buildWithRawMessageHandler(this::processMessageInParallelSucceeds); + + // Act + SQSBatchResponse sqsBatchResponse = handler.processBatchInParallel(event, context); + + // Assert + assertThat(sqsBatchResponse.getBatchItemFailures()).isEmpty(); + assertThat(threadList).hasSizeGreaterThan(1); + } @ParameterizedTest @Event(value = "sqs_event.json", type = SQSEvent.class) - public void shouldAddMessageToBatchFailure_whenException_withMessage(SQSEvent event) { + void shouldAddMessageToBatchFailure_whenException_withMessage(SQSEvent event) { // Arrange BatchMessageHandler handler = new BatchMessageHandlerBuilder() .withSqsBatchHandler() @@ -81,9 +134,27 @@ public void shouldAddMessageToBatchFailure_whenException_withMessage(SQSEvent ev assertThat(batchItemFailure.getItemIdentifier()).isEqualTo("e9144555-9a4f-4ec3-99a0-34ce359b4b54"); } + @ParameterizedTest + @Event(value = "sqs_event_big.json", type = SQSEvent.class) + void parallelBatchProcessing_shouldAddMessageToBatchFailure_whenException_withMessage(SQSEvent event) { + // Arrange + BatchMessageHandler handler = new BatchMessageHandlerBuilder() + .withSqsBatchHandler() + .buildWithRawMessageHandler(this::processMessageInParallelFailsForFixedMessage); + + // Act + SQSBatchResponse sqsBatchResponse = handler.processBatchInParallel(event, context); + + // Assert + assertThat(sqsBatchResponse.getBatchItemFailures()).hasSize(1); + SQSBatchResponse.BatchItemFailure batchItemFailure = sqsBatchResponse.getBatchItemFailures().get(0); + assertThat(batchItemFailure.getItemIdentifier()).isEqualTo("e9144555-9a4f-4ec3-99a0-34ce359b4b54"); + assertThat(threadList).hasSizeGreaterThan(1); + } + @ParameterizedTest @Event(value = "sqs_fifo_event.json", type = SQSEvent.class) - public void shouldAddMessageToBatchFailure_whenException_withSQSFIFO(SQSEvent event) { + void shouldAddMessageToBatchFailure_whenException_withSQSFIFO(SQSEvent event) { // Arrange BatchMessageHandler handler = new BatchMessageHandlerBuilder() .withSqsBatchHandler() @@ -103,7 +174,7 @@ public void shouldAddMessageToBatchFailure_whenException_withSQSFIFO(SQSEvent ev @ParameterizedTest @Event(value = "sqs_event.json", type = SQSEvent.class) - public void shouldAddMessageToBatchFailure_whenException_withProduct(SQSEvent event) { + void shouldAddMessageToBatchFailure_whenException_withProduct(SQSEvent event) { // Arrange BatchMessageHandler handler = new BatchMessageHandlerBuilder() @@ -121,7 +192,7 @@ public void shouldAddMessageToBatchFailure_whenException_withProduct(SQSEvent ev @ParameterizedTest @Event(value = "sqs_event.json", type = SQSEvent.class) - public void failingFailureHandlerShouldntFailBatch(SQSEvent event) { + void failingFailureHandlerShouldntFailBatch(SQSEvent event) { // Arrange AtomicBoolean wasCalled = new AtomicBoolean(false); BatchMessageHandler handler = new BatchMessageHandlerBuilder() @@ -144,7 +215,7 @@ public void failingFailureHandlerShouldntFailBatch(SQSEvent event) { @ParameterizedTest @Event(value = "sqs_event.json", type = SQSEvent.class) - public void failingSuccessHandlerShouldntFailBatchButShouldFailMessage(SQSEvent event) { + void failingSuccessHandlerShouldntFailBatchButShouldFailMessage(SQSEvent event) { // Arrange AtomicBoolean wasCalledAndFailed = new AtomicBoolean(false); BatchMessageHandler handler = new BatchMessageHandlerBuilder() diff --git a/powertools-batch/src/test/resources/dynamo_event_big.json b/powertools-batch/src/test/resources/dynamo_event_big.json new file mode 100644 index 000000000..fa0a75c24 --- /dev/null +++ b/powertools-batch/src/test/resources/dynamo_event_big.json @@ -0,0 +1,376 @@ +{ + "Records": [ + { + "eventID": "c4ca4238a0b923820dcc509a6f75849b", + "eventName": "INSERT", + "eventVersion": "1.1", + "eventSource": "aws:dynamodb", + "awsRegion": "eu-central-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "New item!" + }, + "Id": { + "N": "101" + } + }, + "ApproximateCreationDateTime": 1428537600, + "SequenceNumber": "4421584500000000017450439001", + "SizeBytes": 26, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "arn:aws:dynamodb:eu-central-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899", + "userIdentity": { + "principalId": "dynamodb.amazonaws.com", + "type": "Service" + } + }, + { + "eventID": "c81e728d9d4c2f636f067f89cc14862c", + "eventName": "MODIFY", + "eventVersion": "1.1", + "eventSource": "aws:dynamodb", + "awsRegion": "eu-central-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "This item has changed" + }, + "Id": { + "N": "101" + } + }, + "OldImage": { + "Message": { + "S": "New item!" + }, + "Id": { + "N": "101" + } + }, + "ApproximateCreationDateTime": 1428537600, + "SequenceNumber": "4421584500000000017450439092", + "SizeBytes": 59, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "arn:aws:dynamodb:eu-central-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899" + }, + { + "eventID": "eccbc87e4b5ce2fe28308fd9f2a7baf3", + "eventName": "REMOVE", + "eventVersion": "1.1", + "eventSource": "aws:dynamodb", + "awsRegion": "eu-central-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "OldImage": { + "Message": { + "S": "This item has changed" + }, + "Id": { + "N": "101" + } + }, + "ApproximateCreationDateTime": 1428537600, + "SequenceNumber": "4421584500000000017450439093", + "SizeBytes": 38, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "arn:aws:dynamodb:eu-central-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899" + }, + { + "eventID": "c4ca4238a0b923820dcc509a6f75849b", + "eventName": "INSERT", + "eventVersion": "1.1", + "eventSource": "aws:dynamodb", + "awsRegion": "eu-central-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "New item!" + }, + "Id": { + "N": "101" + } + }, + "ApproximateCreationDateTime": 1428537600, + "SequenceNumber": "4421584500000000017450439031", + "SizeBytes": 26, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "arn:aws:dynamodb:eu-central-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899", + "userIdentity": { + "principalId": "dynamodb.amazonaws.com", + "type": "Service" + } + }, + { + "eventID": "c81e728d9d4c2f636f067f89cc14862c", + "eventName": "MODIFY", + "eventVersion": "1.1", + "eventSource": "aws:dynamodb", + "awsRegion": "eu-central-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "This item has changed" + }, + "Id": { + "N": "101" + } + }, + "OldImage": { + "Message": { + "S": "New item!" + }, + "Id": { + "N": "101" + } + }, + "ApproximateCreationDateTime": 1428537600, + "SequenceNumber": "4421584500000000017450439092", + "SizeBytes": 59, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "arn:aws:dynamodb:eu-central-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899" + }, + { + "eventID": "eccbc87e4b5ce2fe28308fd9f2a7baf3", + "eventName": "REMOVE", + "eventVersion": "1.1", + "eventSource": "aws:dynamodb", + "awsRegion": "eu-central-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "OldImage": { + "Message": { + "S": "This item has changed" + }, + "Id": { + "N": "101" + } + }, + "ApproximateCreationDateTime": 1428537600, + "SequenceNumber": "4421584500000000017450439093", + "SizeBytes": 38, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "arn:aws:dynamodb:eu-central-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899" + }, + { + "eventID": "c4ca4238a0b923820dcc509a6f75849b", + "eventName": "INSERT", + "eventVersion": "1.1", + "eventSource": "aws:dynamodb", + "awsRegion": "eu-central-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "New item!" + }, + "Id": { + "N": "101" + } + }, + "ApproximateCreationDateTime": 1428537600, + "SequenceNumber": "4421584500000000017450439001", + "SizeBytes": 26, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "arn:aws:dynamodb:eu-central-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899", + "userIdentity": { + "principalId": "dynamodb.amazonaws.com", + "type": "Service" + } + }, + { + "eventID": "c81e728d9d4c2f636f067f89cc14862c", + "eventName": "MODIFY", + "eventVersion": "1.1", + "eventSource": "aws:dynamodb", + "awsRegion": "eu-central-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "This item has changed" + }, + "Id": { + "N": "101" + } + }, + "OldImage": { + "Message": { + "S": "New item!" + }, + "Id": { + "N": "101" + } + }, + "ApproximateCreationDateTime": 1428537600, + "SequenceNumber": "4421584500000000017450439092", + "SizeBytes": 59, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "arn:aws:dynamodb:eu-central-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899" + }, + { + "eventID": "eccbc87e4b5ce2fe28308fd9f2a7baf3", + "eventName": "REMOVE", + "eventVersion": "1.1", + "eventSource": "aws:dynamodb", + "awsRegion": "eu-central-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "OldImage": { + "Message": { + "S": "This item has changed" + }, + "Id": { + "N": "101" + } + }, + "ApproximateCreationDateTime": 1428537600, + "SequenceNumber": "4421584500000000017450439093", + "SizeBytes": 38, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "arn:aws:dynamodb:eu-central-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899" + }, + { + "eventID": "c4ca4238a0b923820dcc509a6f75849b", + "eventName": "INSERT", + "eventVersion": "1.1", + "eventSource": "aws:dynamodb", + "awsRegion": "eu-central-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "New item!" + }, + "Id": { + "N": "101" + } + }, + "ApproximateCreationDateTime": 1428537600, + "SequenceNumber": "4421584500000000017450439031", + "SizeBytes": 26, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "arn:aws:dynamodb:eu-central-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899", + "userIdentity": { + "principalId": "dynamodb.amazonaws.com", + "type": "Service" + } + }, + { + "eventID": "c81e728d9d4c2f636f067f89cc14862c", + "eventName": "MODIFY", + "eventVersion": "1.1", + "eventSource": "aws:dynamodb", + "awsRegion": "eu-central-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "This item has changed" + }, + "Id": { + "N": "101" + } + }, + "OldImage": { + "Message": { + "S": "New item!" + }, + "Id": { + "N": "101" + } + }, + "ApproximateCreationDateTime": 1428537600, + "SequenceNumber": "4421584500000000017450439091", + "SizeBytes": 59, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "arn:aws:dynamodb:eu-central-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899" + }, + { + "eventID": "eccbc87e4b5ce2fe28308fd9f2a7baf3", + "eventName": "REMOVE", + "eventVersion": "1.1", + "eventSource": "aws:dynamodb", + "awsRegion": "eu-central-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "OldImage": { + "Message": { + "S": "This item has changed" + }, + "Id": { + "N": "101" + } + }, + "ApproximateCreationDateTime": 1428537600, + "SequenceNumber": "4421584500000000017450439093", + "SizeBytes": 38, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "arn:aws:dynamodb:eu-central-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899" + } + ] +} \ No newline at end of file diff --git a/powertools-batch/src/test/resources/kinesis_event_big.json b/powertools-batch/src/test/resources/kinesis_event_big.json new file mode 100644 index 000000000..57f702d27 --- /dev/null +++ b/powertools-batch/src/test/resources/kinesis_event_big.json @@ -0,0 +1,224 @@ +{ + "Records": [ + { + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": "eyJpZCI6MTIzNCwgIm5hbWUiOiJwcm9kdWN0IiwgInByaWNlIjo0Mn0=", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200962", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200962", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + }, + { + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": "eyJpZCI6MTIzNDUsICJuYW1lIjoicHJvZHVjdDUiLCAicHJpY2UiOjQ1fQ==", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200963", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200963", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + }, + { + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": "eyJpZCI6MTIzNCwgIm5hbWUiOiJwcm9kdWN0IiwgInByaWNlIjo0Mn0=", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200964", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200964", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + }, + { + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": "eyJpZCI6MTIzNDUsICJuYW1lIjoicHJvZHVjdDUiLCAicHJpY2UiOjQ1fQ==", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200965", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200965", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + },{ + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": "eyJpZCI6MTIzNCwgIm5hbWUiOiJwcm9kdWN0IiwgInByaWNlIjo0Mn0=", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200966", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200966", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + }, + { + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": "eyJpZCI6MTIzNDUsICJuYW1lIjoicHJvZHVjdDUiLCAicHJpY2UiOjQ1fQ==", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200961", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200961", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + }, + { + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": "eyJpZCI6MTIzNCwgIm5hbWUiOiJwcm9kdWN0IiwgInByaWNlIjo0Mn0=", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200967", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200967", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + }, + { + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": "eyJpZCI6MTIzNCwgIm5hbWUiOiJwcm9kdWN0IiwgInByaWNlIjo0Mn0=", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200968", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200968", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + }, + { + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": "eyJpZCI6MTIzNDUsICJuYW1lIjoicHJvZHVjdDUiLCAicHJpY2UiOjQ1fQ==", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200969", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200969", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + }, + { + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": "eyJpZCI6MTIzNCwgIm5hbWUiOiJwcm9kdWN0IiwgInByaWNlIjo0Mn0=", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200971", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200971", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + }, + { + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": "eyJpZCI6MTIzNCwgIm5hbWUiOiJwcm9kdWN0IiwgInByaWNlIjo0Mn0=", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200981", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200981", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + }, + { + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": "eyJpZCI6MTIzNDUsICJuYW1lIjoicHJvZHVjdDUiLCAicHJpY2UiOjQ1fQ==", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200992", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200992", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + }, + { + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": "eyJpZCI6MTIzNCwgIm5hbWUiOiJwcm9kdWN0IiwgInByaWNlIjo0Mn0=", + "sequenceNumber": "49545115243490985018280067714973144582180062593244210961", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244210961", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + } + ] +} \ No newline at end of file diff --git a/powertools-batch/src/test/resources/sqs_event_big.json b/powertools-batch/src/test/resources/sqs_event_big.json new file mode 100644 index 000000000..f5c83f442 --- /dev/null +++ b/powertools-batch/src/test/resources/sqs_event_big.json @@ -0,0 +1,429 @@ +{ + "Records": [ + { + "messageId": "d9144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "13e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1234,\n \"name\": \"product\",\n \"price\": 42\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "13e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "f9144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "14e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1235,\n \"name\": \"product\",\n \"price\": 43\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "14e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "g9144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "15e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1236,\n \"name\": \"product\",\n \"price\": 44\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "15e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "c9144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "16e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1237,\n \"name\": \"product\",\n \"price\": 45\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "16e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "b4144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "13e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1238,\n \"name\": \"product\",\n \"price\": 486\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "13e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "a2144552-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "14e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1239,\n \"name\": \"product\",\n \"price\": 430\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "14e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "32144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "15e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1240,\n \"name\": \"product\",\n \"price\": 445\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "15e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "a9144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "16e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1241,\n \"name\": \"product\",\n \"price\": 45\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "16e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "d9144555-9a4f-4ec3-99a0-34ce359b354", + "receiptHandle": "13e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1242,\n \"name\": \"product\",\n \"price\": 42\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "13e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "e9144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "14e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1243,\n \"name\": \"product\",\n \"price\": 43\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "14e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "19144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "15e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1244,\n \"name\": \"product\",\n \"price\": 44\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "15e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "c9144555-9a4f-4ec3-99a0-34ce3a9b4b54", + "receiptHandle": "16e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1245,\n \"name\": \"product\",\n \"price\": 45\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "16e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "b4144555-9a4f-4ec3-99a5-34ce359b4b54", + "receiptHandle": "13e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1247,\n \"name\": \"product\",\n \"price\": 486\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "13e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "a2144555-9a4f-4ec3-97a0-34ce359b4b54", + "receiptHandle": "14e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1248,\n \"name\": \"product\",\n \"price\": 430\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "14e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "3k144555-9a4f-4ec2-99a0-34ce359b4b54", + "receiptHandle": "15e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1249,\n \"name\": \"product\",\n \"price\": 445\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "15e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "h9144555-9aaf-4ec3-99a0-34ce359b4b54", + "receiptHandle": "16e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1250,\n \"name\": \"product\",\n \"price\": 45\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "16e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "d9144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "13e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1234,\n \"name\": \"product\",\n \"price\": 42\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "13e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "f9144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "14e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1235,\n \"name\": \"product\",\n \"price\": 43\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "14e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "g9144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "15e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1236,\n \"name\": \"product\",\n \"price\": 44\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "15e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "c9144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "16e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1237,\n \"name\": \"product\",\n \"price\": 45\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "16e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "b4144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "13e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1238,\n \"name\": \"product\",\n \"price\": 486\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "13e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "a2144552-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "14e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1239,\n \"name\": \"product\",\n \"price\": 430\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "14e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "32144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "15e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1240,\n \"name\": \"product\",\n \"price\": 445\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "15e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "a9144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "16e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1241,\n \"name\": \"product\",\n \"price\": 45\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "16e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "d9144555-9a4f-4ec3-99a0-34ce359b354", + "receiptHandle": "13e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1242,\n \"name\": \"product\",\n \"price\": 42\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "13e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + } + ] +} \ No newline at end of file