From cf625e4daea8d61a00408bfb379d86616970ea30 Mon Sep 17 00:00:00 2001 From: Jerome Van Der Linden Date: Fri, 5 Apr 2024 14:55:45 +0200 Subject: [PATCH] document parallel processing --- docs/utilities/batch.md | 89 +++++++++++++++++++++++++++-------------- 1 file changed, 60 insertions(+), 29 deletions(-) diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index 7b571bc6e..d6e5e3654 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -30,6 +30,7 @@ stateDiagram-v2 * Reports batch item failures to reduce number of retries for a record upon errors * Simple interface to process each batch record +* Parallel processing of batches * Integrates with Java Events library and the deserialization module * Build your own batch processor by extending primitives @@ -110,16 +111,9 @@ You can use your preferred deployment framework to set the correct configuration while the `powertools-batch` module handles generating the response, which simply needs to be returned as the result of your Lambda handler. -A complete [Serverless Application Model](https://aws.amazon.com/serverless/sam/) example can be found -[here](https://github.com/aws-powertools/powertools-lambda-java/tree/main/examples/powertools-examples-batch) covering -all of the batch sources. - -For more information on configuring `ReportBatchItemFailures`, -see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting), -[Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-batchfailurereporting),and -[DynamoDB Streams](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-batchfailurereporting). - +A complete [Serverless Application Model](https://aws.amazon.com/serverless/sam/) example can be found [here](https://github.com/aws-powertools/powertools-lambda-java/tree/main/examples/powertools-examples-batch) covering all the batch sources. 
+For more information on configuring `ReportBatchItemFailures`, see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting), [Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-batchfailurereporting), and [DynamoDB Streams](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-batchfailurereporting). !!! note "You do not need any additional IAM permissions to use this utility, except for what each event source requires." @@ -150,12 +144,10 @@ see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs. public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) { return handler.processBatch(sqsEvent, context); } - - + private void processMessage(Product p, Context c) { // Process the product } - } ``` @@ -276,7 +268,6 @@ see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs. private void processMessage(Product p, Context c) { // process the product } - } ``` @@ -475,6 +466,46 @@ see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs. } ``` +## Parallel processing +You can choose to process batch items in parallel using the `BatchMessageHandler#processBatchInParallel()` +instead of `BatchMessageHandler#processBatch()`. Partial batch failure works the same way but items are processed +in parallel rather than sequentially. + +This feature is available for SQS, Kinesis and DynamoDB Streams but cannot be +used with SQS FIFO. In that case, items will be processed sequentially, even with the `processBatchInParallel` method. + +!!! warning + Note that parallel processing is not always better than sequential processing, + and you should benchmark your code to determine the best approach for your use case. + +!!! info + To get more threads available (more vCPUs), you need to increase the amount of memory allocated to your Lambda function. 
+ While the exact vCPU allocation isn't published, customers commonly observe roughly one vCPU per 1024 MB of memory. + +=== "Example with SQS" + + ```java hl_lines="13" + public class SqsBatchHandler implements RequestHandler<SQSEvent, SQSBatchResponse> { + + private final BatchMessageHandler<SQSEvent, SQSBatchResponse> handler; + + public SqsBatchHandler() { + handler = new BatchMessageHandlerBuilder() + .withSqsBatchHandler() + .buildWithMessageHandler(this::processMessage, Product.class); + } + + @Override + public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) { + return handler.processBatchInParallel(sqsEvent, context); + } + + private void processMessage(Product p, Context c) { + // Process the product + } + } + ``` + ## Handling Messages @@ -490,7 +521,7 @@ In general, the deserialized message handler should be used unless you need acce === "Raw Message Handler" - ```java + ```java hl_lines="4 7" public void setup() { BatchMessageHandler handler = new BatchMessageHandlerBuilder() .withSqsBatchHandler() @@ -505,7 +536,7 @@ In general, the deserialized message handler should be used unless you need acce === "Deserialized Message Handler" - ```java + ```java hl_lines="4 7" public void setup() { BatchMessageHandler handler = new BatchMessageHandlerBuilder() .withSqsBatchHandler() @@ -529,20 +560,20 @@ provide a custom failure handler. Handlers can be provided when building the batch processor and are available for all event sources. For instance for DynamoDB: -```java - BatchMessageHandler handler = new BatchMessageHandlerBuilder() - .withDynamoDbBatchHandler() - .withSuccessHandler((m) -> { - // Success handler receives the raw message - LOGGER.info("Message with sequenceNumber {} was successfully processed", - m.getDynamodb().getSequenceNumber()); - }) - .withFailureHandler((m, e) -> { - // Failure handler receives the raw message and the exception thrown. 
- LOGGER.info("Message with sequenceNumber {} failed to be processed: {}" - , e.getDynamodb().getSequenceNumber(), e); - }) - .buildWithMessageHander(this::processMessage); +```java hl_lines="3 8" +BatchMessageHandler handler = new BatchMessageHandlerBuilder() + .withDynamoDbBatchHandler() + .withSuccessHandler((m) -> { + // Success handler receives the raw message + LOGGER.info("Message with sequenceNumber {} was successfully processed", + m.getDynamodb().getSequenceNumber()); + }) + .withFailureHandler((m, e) -> { + // Failure handler receives the raw message and the exception thrown. + LOGGER.info("Message with sequenceNumber {} failed to be processed: {}" + , m.getDynamodb().getSequenceNumber(), e); + }) + .buildWithMessageHandler(this::processMessage); ``` !!! info