document parallel processing
jeromevdl committed Apr 5, 2024
1 parent f4515d2 commit cf625e4
Showing 1 changed file with 60 additions and 29 deletions.
89 changes: 60 additions & 29 deletions docs/utilities/batch.md
@@ -30,6 +30,7 @@ stateDiagram-v2

* Reports batch item failures to reduce number of retries for a record upon errors
* Simple interface to process each batch record
* Parallel processing of batches
* Integrates with Java Events library and the deserialization module
* Build your own batch processor by extending primitives

@@ -110,16 +111,9 @@ You can use your preferred deployment framework to set the correct configuration
while the `powertools-batch` module handles generating the response, which simply needs to be returned as the result of
your Lambda handler.

A complete [Serverless Application Model](https://aws.amazon.com/serverless/sam/) example can be found
[here](https://github.com/aws-powertools/powertools-lambda-java/tree/main/examples/powertools-examples-batch) covering
all of the batch sources.

For more information on configuring `ReportBatchItemFailures`,
see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting),
[Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-batchfailurereporting),and
[DynamoDB Streams](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-batchfailurereporting).

A complete [Serverless Application Model](https://aws.amazon.com/serverless/sam/) example can be found [here](https://github.com/aws-powertools/powertools-lambda-java/tree/main/examples/powertools-examples-batch) covering all the batch sources.

For more information on configuring `ReportBatchItemFailures`, see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting), [Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-batchfailurereporting), and [DynamoDB Streams](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-batchfailurereporting).


!!! note "You do not need any additional IAM permissions to use this utility, except for what each event source requires."
@@ -150,12 +144,10 @@ see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.
public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) {
return handler.processBatch(sqsEvent, context);
}



private void processMessage(Product p, Context c) {
// Process the product
}

}
```

@@ -276,7 +268,6 @@ see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.
private void processMessage(Product p, Context c) {
// process the product
}

}
```

@@ -475,6 +466,46 @@ see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.
}
```

## Parallel processing
You can process batch items in parallel by calling `BatchMessageHandler#processBatchInParallel()`
instead of `BatchMessageHandler#processBatch()`. Partial batch failure reporting works the same way; the only
difference is that items are processed in parallel rather than sequentially.

This feature is available for SQS, Kinesis, and DynamoDB Streams, but not for SQS FIFO queues.
With SQS FIFO, items are processed sequentially even when you call `processBatchInParallel`.

!!! warning
    Parallel processing is not always better than sequential processing.
    Benchmark your code to determine the best approach for your use case.

!!! info
    To get more threads (more vCPUs), increase the amount of memory allocated to your Lambda function.
    The exact vCPU allocation isn't published, but customers commonly observe about one vCPU per 1024 MB of memory.
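
To check what a given memory setting actually provides, you can log what the JVM reports at runtime. The sketch below uses only standard JDK calls and nothing from `powertools-batch`. The class name `AvailableThreads` is illustrative, and the sketch assumes that parallel work in your function is bounded by the processors the JVM can see.

```java
import java.util.concurrent.ForkJoinPool;

public class AvailableThreads {

    // Number of processors visible to the JVM.
    static int availableProcessors() {
        return Runtime.getRuntime().availableProcessors();
    }

    // Target parallelism of the common ForkJoinPool, which backs Java parallel streams.
    static int commonPoolParallelism() {
        return ForkJoinPool.commonPool().getParallelism();
    }

    public static void main(String[] args) {
        System.out.println("Available processors: " + availableProcessors());
        System.out.println("Common pool parallelism: " + commonPoolParallelism());
    }
}
```

Running this at a few memory settings, alongside a benchmark of your own handler, is one way to decide whether parallel processing is worth enabling.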

=== "Example with SQS"

    ```java hl_lines="13"
    public class SqsBatchHandler implements RequestHandler<SQSEvent, SQSBatchResponse> {

        private final BatchMessageHandler<SQSEvent, SQSBatchResponse> handler;

        public SqsBatchHandler() {
            handler = new BatchMessageHandlerBuilder()
                    .withSqsBatchHandler()
                    .buildWithMessageHandler(this::processMessage, Product.class);
        }

        @Override
        public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) {
            return handler.processBatchInParallel(sqsEvent, context);
        }

        private void processMessage(Product p, Context c) {
            // Process the product
        }
    }
    ```
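
The following is a minimal, self-contained sketch of the idea behind parallel processing with partial batch failures. It is not the `powertools-batch` implementation: `Item`, `handle`, `processInParallel`, and the class name are hypothetical, and a Java parallel stream stands in for whatever the library does internally. Each item is handled independently, and only the identifiers of the items whose handler threw are collected for reporting.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;

public class ParallelPartialFailureSketch {

    // Hypothetical stand-in for a batch record: an identifier plus a payload.
    static final class Item {
        final String id;
        final String body;

        Item(String id, String body) {
            this.id = id;
            this.body = body;
        }
    }

    // Hypothetical handler: rejects items with an empty body.
    static void handle(Item item) {
        if (item.body.isEmpty()) {
            throw new IllegalArgumentException("Empty body for item " + item.id);
        }
    }

    // Runs the handler on every item in parallel and returns the ids of the
    // items whose handler threw, in the original batch order.
    static List<String> processInParallel(List<Item> items, Consumer<Item> handler) {
        return items.parallelStream()
                .filter(item -> {
                    try {
                        handler.accept(item);
                        return false; // success: nothing to report
                    } catch (RuntimeException e) {
                        return true;  // failure: keep the item so its id is reported
                    }
                })
                .map(item -> item.id)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Item> batch = Arrays.asList(
                new Item("a", "payload"),
                new Item("b", ""),
                new Item("c", "payload"),
                new Item("d", ""));

        List<String> failed = processInParallel(batch, ParallelPartialFailureSketch::handle);
        System.out.println("Failed item ids: " + failed); // prints "Failed item ids: [b, d]"
    }
}
```

Because several items may be handled at the same time, a handler used for parallel processing should avoid unsynchronized shared mutable state.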


## Handling Messages

@@ -490,7 +521,7 @@ In general, the deserialized message handler should be used unless you need acce

=== "Raw Message Handler"

```java
```java hl_lines="4 7"
public void setup() {
BatchMessageHandler<SQSEvent, SQSBatchResponse> handler = new BatchMessageHandlerBuilder()
.withSqsBatchHandler()
@@ -505,7 +536,7 @@ In general, the deserialized message handler should be used unless you need acce

=== "Deserialized Message Handler"

```java
```java hl_lines="4 7"
public void setup() {
BatchMessageHandler<SQSEvent, SQSBatchResponse> handler = new BatchMessageHandlerBuilder()
.withSqsBatchHandler()
@@ -529,20 +560,20 @@ provide a custom failure handler.
Handlers can be provided when building the batch processor and are available for all event sources.
For instance for DynamoDB:

```java
BatchMessageHandler<DynamodbEvent, StreamsEventResponse> handler = new BatchMessageHandlerBuilder()
.withDynamoDbBatchHandler()
.withSuccessHandler((m) -> {
// Success handler receives the raw message
LOGGER.info("Message with sequenceNumber {} was successfully processed",
m.getDynamodb().getSequenceNumber());
})
.withFailureHandler((m, e) -> {
// Failure handler receives the raw message and the exception thrown.
LOGGER.info("Message with sequenceNumber {} failed to be processed: {}"
, m.getDynamodb().getSequenceNumber(), e);
})
.buildWithRawMessageHandler(this::processMessage);
```java hl_lines="3 8"
BatchMessageHandler<DynamodbEvent, StreamsEventResponse> handler = new BatchMessageHandlerBuilder()
        .withDynamoDbBatchHandler()
        .withSuccessHandler((m) -> {
            // Success handler receives the raw message
            LOGGER.info("Message with sequenceNumber {} was successfully processed",
                    m.getDynamodb().getSequenceNumber());
        })
        .withFailureHandler((m, e) -> {
            // Failure handler receives the raw message and the exception thrown.
            LOGGER.info("Message with sequenceNumber {} failed to be processed: {}"
                    , m.getDynamodb().getSequenceNumber(), e);
        })
        .buildWithRawMessageHandler(this::processMessage);
```

!!! info