Skip to content

Commit

Permalink
extend SQS event handler
Browse files Browse the repository at this point in the history
- report_batch_item_failures for partial batch processing
- maximum_concurrency to provide limits to SQS event source scaling
- remove outdated documentation about FIFO queues being unsupported
  • Loading branch information
eviltwin committed Apr 8, 2024
1 parent 2efe244 commit 297d937
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 1 deletion.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ Optionally you can add [SNS message filters](http://docs.aws.amazon.com/sns/late
]
```

[SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html) is also pulling messages from a stream. At this time, [only "Standard" queues can trigger lambda events, not "FIFO" queues](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html). Read the AWS Documentation carefully since Lambda calls the SQS DeleteMessage API on your behalf once your function completes successfully.
[SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html) is also pulling messages from a stream. Read the AWS Documentation carefully since Lambda calls the SQS DeleteMessage API on your behalf once your function completes successfully.

```javascript
"events": [
Expand All @@ -629,6 +629,8 @@ Optionally you can add [SNS message filters](http://docs.aws.amazon.com/sns/late
"event_source": {
"arn": "arn:aws:sqs:us-east-1:12341234:your-queue-name-arn",
"batch_size": 10, // Max: 10. Use 1 to trigger immediate processing
"report_batch_item_failures": false, // whether the handler returns a `batchItemFailures` field
"maximum_concurrency": 0, // maximum number of parallel lambda instances for this event source.
"enabled": true // Default is false
}
}
Expand Down
19 changes: 19 additions & 0 deletions zappa/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,23 @@ def __init__(self, context, config):
def batch_window(self):
return self._config.get("batch_window", 1 if self.batch_size > 10 else 0)

@property
def function_response_types(self):
return (
["ReportBatchItemFailures"]
if self._config.get("report_batch_item_failures", False)
else []
)

@property
def scaling_config(self):
maximum_concurrency = self._config.get("maximum_concurrency", None)

if maximum_concurrency is None:
return {}

return {"MaximumConcurrency": maximum_concurrency}

def _get_uuid(self, function):
uuid = None
response = self._lambda.call(
Expand All @@ -297,6 +314,8 @@ def add(self, function):
EventSourceArn=self.arn,
BatchSize=self.batch_size,
MaximumBatchingWindowInSeconds=self.batch_window,
FunctionResponseTypes=self.function_response_types,
ScalingConfig=self.scaling_config,
Enabled=self.enabled,
)
LOG.debug(response)
Expand Down

0 comments on commit 297d937

Please sign in to comment.