Skip to content

Commit

Permalink
feat(v2): parallel batch processing (#1620)
Browse files Browse the repository at this point in the history
* implement parallel processing

* test parallel processing

* document parallel processing

* code review

* sqs // batch processing example

* rollback to AtomicBoolean

* complete sqs batch example with logs and traces annotations

* update doc

* update doc

* update doc
  • Loading branch information
jeromevdl authored Jul 3, 2024
1 parent 749e973 commit 1fa11c1
Show file tree
Hide file tree
Showing 21 changed files with 1,877 additions and 180 deletions.
94 changes: 65 additions & 29 deletions docs/utilities/batch.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ stateDiagram-v2

* Reports batch item failures to reduce number of retries for a record upon errors
* Simple interface to process each batch record
* Parallel processing of batches
* Integrates with Java Events library and the deserialization module
* Build your own batch processor by extending primitives

Expand Down Expand Up @@ -110,16 +111,9 @@ You can use your preferred deployment framework to set the correct configuration
while the `powertools-batch` module handles generating the response, which simply needs to be returned as the result of
your Lambda handler.

A complete [Serverless Application Model](https://aws.amazon.com/serverless/sam/) example can be found
[here](https://github.com/aws-powertools/powertools-lambda-java/tree/main/examples/powertools-examples-batch) covering
all of the batch sources.

For more information on configuring `ReportBatchItemFailures`,
see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting),
[Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-batchfailurereporting),and
[DynamoDB Streams](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-batchfailurereporting).

A complete [Serverless Application Model](https://aws.amazon.com/serverless/sam/) example can be found [here](https://github.com/aws-powertools/powertools-lambda-java/tree/main/examples/powertools-examples-batch) covering all the batch sources.

For more information on configuring `ReportBatchItemFailures`, see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting), [Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-batchfailurereporting), and [DynamoDB Streams](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-batchfailurereporting).


!!! note "You do not need any additional IAM permissions to use this utility, except for what each event source requires."
Expand Down Expand Up @@ -150,12 +144,10 @@ see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.
public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) {
return handler.processBatch(sqsEvent, context);
}



private void processMessage(Product p, Context c) {
// Process the product
}

}
```

Expand Down Expand Up @@ -276,7 +268,6 @@ see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.
private void processMessage(Product p, Context c) {
// process the product
}

}
```

Expand Down Expand Up @@ -475,6 +466,51 @@ see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.
}
```

## Parallel processing
You can choose to process batch items in parallel using the `BatchMessageHandler#processBatchInParallel()`
instead of `BatchMessageHandler#processBatch()`. Partial batch failure works the same way but items are processed
in parallel rather than sequentially.

This feature is available for SQS, Kinesis and DynamoDB Streams but cannot be
used with SQS FIFO. In that case, an `UnsupportedOperationException` is thrown.

!!! warning
Note that parallel processing is not always better than sequential processing,
and you should benchmark your code to determine the best approach for your use case.

!!! info
To get more threads available (more vCPUs), you need to increase the amount of memory allocated to your Lambda function.
While it is possible to increase the number of threads using Java options or custom thread pools,
in most cases the defaults work well, and changing them is more likely to decrease performance
(see [here](https://www.baeldung.com/java-when-to-use-parallel-stream#fork-join-framework)
and [here](https://dzone.com/articles/be-aware-of-forkjoinpoolcommonpool)).
In situations where this may be useful - such as performing IO-bound work in parallel - make sure to measure before and after!


=== "Example with SQS"

```java hl_lines="13"
public class SqsBatchHandler implements RequestHandler<SQSEvent, SQSBatchResponse> {

private final BatchMessageHandler<SQSEvent, SQSBatchResponse> handler;

public SqsBatchHandler() {
handler = new BatchMessageHandlerBuilder()
.withSqsBatchHandler()
.buildWithMessageHandler(this::processMessage, Product.class);
}

@Override
public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) {
return handler.processBatchInParallel(sqsEvent, context);
}

private void processMessage(Product p, Context c) {
// Process the product
}
}
```


## Handling Messages

Expand All @@ -490,7 +526,7 @@ In general, the deserialized message handler should be used unless you need acce

=== "Raw Message Handler"

```java
```java hl_lines="4 7"
public void setup() {
BatchMessageHandler<SQSEvent, SQSBatchResponse> handler = new BatchMessageHandlerBuilder()
.withSqsBatchHandler()
Expand All @@ -505,7 +541,7 @@ In general, the deserialized message handler should be used unless you need acce

=== "Deserialized Message Handler"

```java
```java hl_lines="4 7"
public void setup() {
BatchMessageHandler<SQSEvent, SQSBatchResponse> handler = new BatchMessageHandlerBuilder()
.withSqsBatchHandler()
Expand All @@ -529,20 +565,20 @@ provide a custom failure handler.
Handlers can be provided when building the batch processor and are available for all event sources.
For instance for DynamoDB:

```java
BatchMessageHandler<DynamodbEvent, StreamsEventResponse> handler = new BatchMessageHandlerBuilder()
.withDynamoDbBatchHandler()
.withSuccessHandler((m) -> {
// Success handler receives the raw message
LOGGER.info("Message with sequenceNumber {} was successfully processed",
m.getDynamodb().getSequenceNumber());
})
.withFailureHandler((m, e) -> {
// Failure handler receives the raw message and the exception thrown.
LOGGER.info("Message with sequenceNumber {} failed to be processed: {}"
, e.getDynamodb().getSequenceNumber(), e);
})
.buildWithMessageHander(this::processMessage);
```java hl_lines="3 8"
BatchMessageHandler<DynamodbEvent, StreamsEventResponse> handler = new BatchMessageHandlerBuilder()
.withDynamoDbBatchHandler()
.withSuccessHandler((m) -> {
// Success handler receives the raw message
LOGGER.info("Message with sequenceNumber {} was successfully processed",
m.getDynamodb().getSequenceNumber());
})
.withFailureHandler((m, e) -> {
// Failure handler receives the raw message and the exception thrown.
LOGGER.info("Message with sequenceNumber {} failed to be processed: {}"
, e.getDynamodb().getSequenceNumber(), e);
})
.buildWithMessageHander(this::processMessage);
```

!!! info
Expand Down
63 changes: 58 additions & 5 deletions examples/powertools-examples-batch/deploy/sqs/template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@ Globals:
Function:
Timeout: 20
Runtime: java11
MemorySize: 512
Tracing: Active
MemorySize: 5400
Environment:
Variables:
POWERTOOLS_LOG_LEVEL: INFO
POWERTOOLS_LOGGER_SAMPLE_RATE: 1.0
POWERTOOLS_LOGGER_LOG_EVENT: true

Resources:
Expand Down Expand Up @@ -45,6 +43,9 @@ Resources:
AliasName: alias/powertools-batch-sqs-demo
TargetKeyId: !Ref CustomerKey

Bucket:
Type: AWS::S3::Bucket

DemoDlqSqsQueue:
Type: AWS::SQS::Queue
Properties:
Expand Down Expand Up @@ -96,11 +97,57 @@ Resources:
DemoSQSConsumerFunction:
Type: AWS::Serverless::Function
Properties:
Tracing: Active
CodeUri: ../..
Handler: org.demo.batch.sqs.SqsBatchHandler::handleRequest
Environment:
Variables:
POWERTOOLS_SERVICE_NAME: sqs-demo
BUCKET: !Ref Bucket
Policies:
- Statement:
- Sid: SQSDeleteGetAttribute
Effect: Allow
Action:
- sqs:DeleteMessageBatch
- sqs:GetQueueAttributes
Resource: !GetAtt DemoSqsQueue.Arn
- Sid: SQSSendMessageBatch
Effect: Allow
Action:
- sqs:SendMessageBatch
- sqs:SendMessage
Resource: !GetAtt DemoDlqSqsQueue.Arn
- Sid: SQSKMSKey
Effect: Allow
Action:
- kms:GenerateDataKey
- kms:Decrypt
Resource: !GetAtt CustomerKey.Arn
- Sid: WriteToS3
Effect: Allow
Action:
- s3:PutObject
Resource: !Sub ${Bucket.Arn}/*

# Events:
# MySQSEvent:
# Type: SQS
# Properties:
# Queue: !GetAtt DemoSqsQueue.Arn
# BatchSize: 100
# MaximumBatchingWindowInSeconds: 60

DemoSQSParallelConsumerFunction:
Type: AWS::Serverless::Function
Properties:
Tracing: Active
CodeUri: ../..
Handler: org.demo.batch.sqs.SqsParallelBatchHandler::handleRequest
Environment:
Variables:
POWERTOOLS_SERVICE_NAME: sqs-demo
BUCKET: !Ref Bucket
Policies:
- Statement:
- Sid: SQSDeleteGetAttribute
Expand All @@ -121,13 +168,19 @@ Resources:
- kms:GenerateDataKey
- kms:Decrypt
Resource: !GetAtt CustomerKey.Arn
- Sid: WriteToS3
Effect: Allow
Action:
- s3:PutObject
Resource: !Sub ${Bucket.Arn}/*

Events:
MySQSEvent:
Type: SQS
Properties:
Queue: !GetAtt DemoSqsQueue.Arn
BatchSize: 2
MaximumBatchingWindowInSeconds: 300
BatchSize: 100
MaximumBatchingWindowInSeconds: 60

Outputs:
DemoSqsQueue:
Expand Down
11 changes: 11 additions & 0 deletions examples/powertools-examples-batch/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,17 @@
<groupId>software.amazon.awssdk</groupId>
<artifactId>sdk-core</artifactId>
<version>${sdk.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<version>${sdk.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2024 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.demo.batch.sqs;

import com.amazonaws.services.lambda.runtime.Context;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import org.demo.batch.model.Product;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.lambda.powertools.logging.Logging;
import software.amazon.lambda.powertools.tracing.Tracing;
import software.amazon.lambda.powertools.tracing.TracingUtils;

public class AbstractSqsBatchHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSqsBatchHandler.class);
private final ObjectMapper mapper = new ObjectMapper();
private final String bucket = System.getenv("BUCKET");
private final S3Client s3 = S3Client.builder().httpClient(UrlConnectionHttpClient.create()).build();
private final Random r = new Random();

/**
* Simulate some processing (I/O + S3 put request)
* @param p deserialized product
* @param context Lambda context
*/
@Logging
@Tracing
protected void processMessage(Product p, Context context) {
TracingUtils.putAnnotation("productId", p.getId());
TracingUtils.putAnnotation("Thread", Thread.currentThread().getName());
MDC.put("product", String.valueOf(p.getId()));
LOGGER.info("Processing product {}", p);

char c = (char)(r.nextInt(26) + 'a');
char[] chars = new char[1024 * 1000];
Arrays.fill(chars, c);
p.setName(new String(chars));
try {
File file = new File("/tmp/"+p.getId()+".json");
mapper.writeValue(file, p);
s3.putObject(
PutObjectRequest.builder().bucket(bucket).key(p.getId()+".json").build(), RequestBody.fromFile(file));
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
MDC.remove("product");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import org.demo.batch.model.Product;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.demo.batch.model.Product;
import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder;
import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;
import software.amazon.lambda.powertools.logging.Logging;
import software.amazon.lambda.powertools.tracing.Tracing;

public class SqsBatchHandler implements RequestHandler<SQSEvent, SQSBatchResponse> {
public class SqsBatchHandler extends AbstractSqsBatchHandler implements RequestHandler<SQSEvent, SQSBatchResponse> {
private static final Logger LOGGER = LoggerFactory.getLogger(SqsBatchHandler.class);
private final BatchMessageHandler<SQSEvent, SQSBatchResponse> handler;

Expand All @@ -20,14 +22,11 @@ public SqsBatchHandler() {
.buildWithMessageHandler(this::processMessage, Product.class);
}

@Logging
@Tracing
@Override
public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) {
LOGGER.info("Processing batch of {} messages", sqsEvent.getRecords().size());
return handler.processBatch(sqsEvent, context);
}


private void processMessage(Product p, Context c) {
LOGGER.info("Processing product " + p);
}

}
Loading

0 comments on commit 1fa11c1

Please sign in to comment.