feat(v2): parallel batch processing #1620

Merged · 15 commits · Jul 3, 2024
89 changes: 60 additions & 29 deletions docs/utilities/batch.md
@@ -30,6 +30,7 @@ stateDiagram-v2

* Reports batch item failures to reduce number of retries for a record upon errors
* Simple interface to process each batch record
* Parallel processing of batches
* Integrates with the Java Events library and the deserialization module
* Build your own batch processor by extending primitives

@@ -110,16 +111,9 @@ You can use your preferred deployment framework to set the correct configuration
while the `powertools-batch` module handles generating the response, which simply needs to be returned as the result of
your Lambda handler.

A complete [Serverless Application Model](https://aws.amazon.com/serverless/sam/) example can be found [here](https://github.com/aws-powertools/powertools-lambda-java/tree/main/examples/powertools-examples-batch) covering all the batch sources.

For more information on configuring `ReportBatchItemFailures`, see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting), [Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-batchfailurereporting), and [DynamoDB Streams](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-batchfailurereporting).


!!! note "You do not need any additional IAM permissions to use this utility, except for what each event source requires."
@@ -150,12 +144,10 @@ see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.
public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) {
return handler.processBatch(sqsEvent, context);
}



private void processMessage(Product p, Context c) {
// Process the product
}

}
```

@@ -276,7 +268,6 @@ see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.
private void processMessage(Product p, Context c) {
// process the product
}

}
```

@@ -475,6 +466,46 @@ see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.
}
```

## Parallel processing

You can process batch items in parallel by using `BatchMessageHandler#processBatchInParallel()`
instead of `BatchMessageHandler#processBatch()`. Partial batch failure works the same way, but items are processed
in parallel rather than sequentially.

This feature is available for SQS, Kinesis, and DynamoDB Streams, but it cannot be
used with SQS FIFO queues. In that case, items are processed sequentially, even when the `processBatchInParallel` method is used.

!!! warning
Note that parallel processing is not always better than sequential processing,
and you should benchmark your code to determine the best approach for your use case.

!!! info
To get more threads available (more vCPUs), you need to increase the amount of memory allocated to your Lambda function.
While the exact vCPU allocation isn't published, customers commonly observe an allocation of roughly one vCPU per 1024 MB of memory.
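
As a quick sanity check (not part of the library), you can log how much parallelism the runtime actually exposes at your chosen memory size. `processBatchInParallel` uses `parallelStream()` under the hood, which draws its threads from the common `ForkJoinPool`; the helper below is only an illustrative sketch:

```java
import java.util.concurrent.ForkJoinPool;

public class ParallelismProbe {

    public static void logAvailableParallelism() {
        // Number of vCPUs visible to the JVM at the configured memory size
        System.out.println("Available processors: " + Runtime.getRuntime().availableProcessors());
        // Number of worker threads the common ForkJoinPool (used by parallelStream) will use
        System.out.println("Common pool parallelism: " + ForkJoinPool.commonPool().getParallelism());
    }
}
```

Calling this once from your handler's constructor is enough to see both values in the function's logs.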

=== "Example with SQS"

```java hl_lines="13"
public class SqsBatchHandler implements RequestHandler<SQSEvent, SQSBatchResponse> {

private final BatchMessageHandler<SQSEvent, SQSBatchResponse> handler;

public SqsBatchHandler() {
handler = new BatchMessageHandlerBuilder()
.withSqsBatchHandler()
.buildWithMessageHandler(this::processMessage, Product.class);
}

@Override
public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) {
return handler.processBatchInParallel(sqsEvent, context);
}

private void processMessage(Product p, Context c) {
// Process the product
}
}
```
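
The other supported event sources follow the same pattern. Below is a sketch for Kinesis, assuming the builder's `withKinesisBatchHandler()` and the `KinesisEvent`/`StreamsEventResponse` types used by the sequential Kinesis handler; adjust the types and names to your own handler:

```java hl_lines="13"
public class KinesisBatchHandler implements RequestHandler<KinesisEvent, StreamsEventResponse> {

    private final BatchMessageHandler<KinesisEvent, StreamsEventResponse> handler;

    public KinesisBatchHandler() {
        handler = new BatchMessageHandlerBuilder()
                .withKinesisBatchHandler()
                .buildWithMessageHandler(this::processMessage, Product.class);
    }

    @Override
    public StreamsEventResponse handleRequest(KinesisEvent kinesisEvent, Context context) {
        return handler.processBatchInParallel(kinesisEvent, context);
    }

    private void processMessage(Product p, Context c) {
        // Process the product
    }
}
```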


## Handling Messages

@@ -490,7 +521,7 @@ In general, the deserialized message handler should be used unless you need acce

=== "Raw Message Handler"

```java hl_lines="4 7"
public void setup() {
BatchMessageHandler<SQSEvent, SQSBatchResponse> handler = new BatchMessageHandlerBuilder()
.withSqsBatchHandler()
@@ -505,7 +536,7 @@

=== "Deserialized Message Handler"

```java hl_lines="4 7"
public void setup() {
BatchMessageHandler<SQSEvent, SQSBatchResponse> handler = new BatchMessageHandlerBuilder()
.withSqsBatchHandler()
@@ -529,20 +560,20 @@ provide a custom failure handler.
Handlers can be provided when building the batch processor and are available for all event sources.
For instance, for DynamoDB:

```java hl_lines="3 8"
BatchMessageHandler<DynamodbEvent, StreamsEventResponse> handler = new BatchMessageHandlerBuilder()
.withDynamoDbBatchHandler()
.withSuccessHandler((m) -> {
// Success handler receives the raw message
LOGGER.info("Message with sequenceNumber {} was successfully processed",
m.getDynamodb().getSequenceNumber());
})
.withFailureHandler((m, e) -> {
// Failure handler receives the raw message and the exception thrown.
LOGGER.info("Message with sequenceNumber {} failed to be processed: {}"
, e.getDynamodb().getSequenceNumber(), e);
})
.buildWithMessageHander(this::processMessage);
```
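
The same handlers can be attached to the other event sources; here is a sketch for SQS, assuming the raw message type `SQSEvent.SQSMessage` and its `getMessageId()` accessor:

```java hl_lines="3 8"
BatchMessageHandler<SQSEvent, SQSBatchResponse> handler = new BatchMessageHandlerBuilder()
        .withSqsBatchHandler()
        .withSuccessHandler((m) -> {
            // Success handler receives the raw SQS message
            LOGGER.info("Message with id {} was successfully processed", m.getMessageId());
        })
        .withFailureHandler((m, e) -> {
            // Failure handler receives the raw SQS message and the exception thrown
            LOGGER.warn("Message with id {} failed to be processed", m.getMessageId(), e);
        })
        .buildWithMessageHandler(this::processMessage);
```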

!!! info
16 changes: 16 additions & 0 deletions powertools-batch/pom.xml
@@ -21,6 +21,16 @@
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<!-- enforce multiple threads for parallel processing tests -->
<argLine>
-Djava.util.concurrent.ForkJoinPool.common.parallelism=4
</argLine>
</configuration>
</plugin>
</plugins>
</build>

@@ -47,6 +57,12 @@
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.7</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
@@ -33,6 +33,21 @@ public interface BatchMessageHandler<E, R> {
* @param context The lambda context
* @return A partial batch response
*/
R processBatch(E event, Context context);

/**
* Processes the given batch in parallel returning a partial batch
* response indicating the success and failure of individual
* messages within the batch. <br/>
* Note that parallel processing is not always better than sequential processing,
* and you should benchmark your code to determine the best approach for your use case. <br/>
* Also note that to get more threads available (more vCPUs),
* you need to increase the amount of memory allocated to your Lambda function. <br/>
*
* @param event The Lambda event containing the batch to process
* @param context The lambda context
* @return A partial batch response
*/
R processBatchInParallel(E event, Context context);
}
@@ -19,10 +19,13 @@
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.lambda.powertools.batch.internal.MultiThreadMDC;

/**
* A batch message processor for DynamoDB Streams batches.
@@ -46,35 +49,60 @@ public DynamoDbBatchMessageHandler(Consumer<DynamodbEvent.DynamodbStreamRecord>

@Override
public StreamsEventResponse processBatch(DynamodbEvent event, Context context) {
StreamsEventResponse response = StreamsEventResponse.builder().withBatchItemFailures(new ArrayList<>()).build();

for (DynamodbEvent.DynamodbStreamRecord streamRecord : event.getRecords()) {
processBatchItem(streamRecord, context).ifPresent(batchItemFailure -> response.getBatchItemFailures().add(batchItemFailure));
}

return response;
}


@Override
public StreamsEventResponse processBatchInParallel(DynamodbEvent event, Context context) {
MultiThreadMDC multiThreadMDC = new MultiThreadMDC();

List<StreamsEventResponse.BatchItemFailure> batchItemFailures = event.getRecords()
.parallelStream() // Parallel processing
.map(eventRecord -> {
multiThreadMDC.copyMDCToThread(Thread.currentThread().getName());
return processBatchItem(eventRecord, context);
})
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());

return StreamsEventResponse.builder().withBatchItemFailures(batchItemFailures).build();
}

private Optional<StreamsEventResponse.BatchItemFailure> processBatchItem(DynamodbEvent.DynamodbStreamRecord streamRecord, Context context) {
try {
LOGGER.debug("Processing item {}", streamRecord.getEventID());

rawMessageHandler.accept(streamRecord, context);

// Report success if we have a handler
if (this.successHandler != null) {
this.successHandler.accept(streamRecord);
}
return Optional.empty();
} catch (Throwable t) {
String sequenceNumber = streamRecord.getDynamodb().getSequenceNumber();
LOGGER.error("Error while processing record with id {}: {}, adding it to batch item failures",
sequenceNumber, t.getMessage());
LOGGER.error("Error was", t);

// Report failure if we have a handler
if (this.failureHandler != null) {
// A failing failure handler is no reason to fail the batch
try {
this.failureHandler.accept(streamRecord, t);
} catch (Throwable t2) {
LOGGER.warn("failureHandler threw handling failure", t2);
}
}
return Optional.of(StreamsEventResponse.BatchItemFailure.builder().withItemIdentifier(sequenceNumber).build());
}

}
}
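
A side note on logging: the `MultiThreadMDC` helper used in `processBatchInParallel` is internal to the module. The underlying idea — carrying the handler thread's SLF4J MDC (correlation IDs and the like) over to the worker threads used by `parallelStream()` — could be sketched roughly like this; the names and shape here are illustrative, not the library's actual implementation:

```java
import java.util.Map;
import org.slf4j.MDC;

// Illustrative sketch only: capture the MDC of the thread that creates this object,
// then copy it onto whichever worker thread later processes a batch item.
public class MdcPropagator {

    private final Map<String, String> parentContext = MDC.getCopyOfContextMap();

    public void copyToCurrentThread() {
        if (parentContext != null) {
            // Merge rather than replace, keeping anything the worker thread already set
            parentContext.forEach(MDC::put);
        }
    }
}
```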