python: 3.9 | AWS: DynamoDB | AWS: Kinesis | test: unit | test: integration

Python: Amazon Kinesis, AWS Lambda, Amazon DynamoDB Example

Introduction

This project contains an example of testing a small data processing system that processes records from an Amazon Kinesis Data Stream and stores the processed records in an Amazon DynamoDB table.

The project uses the AWS Serverless Application Model (SAM) CLI for configuration, testing and deployment.




Key Files in the Project



About this Pattern

System Under Test (SUT)

The SUT is a streaming data processing system. A Lambda function has an Event Source Mapping to a Kinesis Data Stream. The Lambda Event Source Mapping (ESM) polls the Kinesis Data Stream and then synchronously invokes the Lambda function with a batch of messages. The Lambda function processes batches of messages and writes results to a DynamoDB Table.

[Figure: System Under Test (SUT)]

Goal

The goal of this example is to show how to test Lambda functions that are part of a streaming data processing application. In streaming workloads, the number of messages that are sent to Lambda in a batch can change with the rate of messages being published to the stream, so we show testing with different sized batches.

Description

In this pattern you will deploy a streaming workload in which a Lambda function is triggered by messages in a Kinesis Data Stream. The project demonstrates several techniques for executing tests, including running the Lambda function locally with a simulated payload as well as running integration tests in the cloud.

[Figure: System Under Test Description (SUT)]

About this Example

This example contains three core resources: an Amazon Kinesis Data Stream, an AWS Lambda function, and an Amazon DynamoDB table.

The Amazon Kinesis Data Stream can stream any data, but the AWS Lambda function in this example expects the Kinesis Stream event data to contain a JSON object with two properties, batch and id:

{
    "batch": "string",
    "id": "string"
}
  • batch: a unique identifier for the batch of test records that must all be processed before the test is considered complete. Every record in a test batch should have the same batch property value.
  • id: a unique identifier for each individual record. Each record should have a unique id property value.

The AWS Lambda function processes records by writing them in batches to the DynamoDB table. Each DynamoDB table item is a JSON object with the format:

{
    "PK": "string",
    "SK": "string"
}

The AWS Lambda function converts the incoming event data into the processed record JSON, setting PK (the DynamoDB partition key) to the value of the event record's batch property and SK (the DynamoDB sort key) to the value of the event record's id property.
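The mapping described above can be sketched in a few lines. This is a minimal illustration, not the project's handler: the function name `to_table_items` is hypothetical, and it assumes only the standard Kinesis event shape, in which each record's payload arrives base64-encoded under `kinesis.data`.

```python
import base64
import json


def to_table_items(event):
    """Convert a Kinesis event into a list of {"PK", "SK"} items (hypothetical helper)."""
    items = []
    for record in event.get("Records", []):
        # Kinesis delivers each record's payload base64-encoded.
        payload = json.loads(base64.b64decode(record["kinesis"]["data"]))
        # batch becomes the partition key, id becomes the sort key.
        items.append({"PK": payload["batch"], "SK": payload["id"]})
    return items


# A one-record sample event built by hand for illustration.
sample_event = {
    "Records": [
        {
            "kinesis": {
                "data": base64.b64encode(
                    json.dumps({"batch": "b1", "id": "r1"}).encode("utf-8")
                ).decode("utf-8")
            }
        }
    ]
}
```

Calling `to_table_items(sample_event)` yields a single item whose PK is `b1` and whose SK is `r1`.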

Unit Test

Unit Test description

This example contains a sample event with pre-generated records, but you can also use the SAM CLI to generate a test event that matches the shape of the event received by a Lambda function subscribed to a Kinesis Data Stream. Another option is to log incoming events in the Lambda function when logging is set to debug. This lets you capture real-world messages to use for testing; just ensure that they don't contain any sensitive data.

When you configure the Lambda ESM for Kinesis you specify a batch size. This value acts as a maximum: your Lambda function may be invoked with smaller batches, but never larger ones. Create test events that contain a single message, your expected average batch size, and your maximum batch size. If the size of individual records varies widely, also create some test payloads that contain batches of larger records.
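One way to produce events of several batch sizes is to generate them in code. The sketch below is an assumption-laden illustration: `build_kinesis_event`, the `padding` property (used only to inflate record size, and assumed to be ignored by the function under test), and the sizes 1, 25, and 100 are all hypothetical, and only the fields a handler typically reads are populated.

```python
import base64
import json
import uuid


def build_kinesis_event(record_count, batch_id=None, padding=0):
    """Build a Kinesis-shaped test event with record_count records (hypothetical helper)."""
    batch_id = batch_id or str(uuid.uuid4())
    records = []
    for _ in range(record_count):
        body = {"batch": batch_id, "id": str(uuid.uuid4())}
        if padding:
            # Assumption: an extra property is acceptable and only makes the record larger.
            body["padding"] = "x" * padding
        data = base64.b64encode(json.dumps(body).encode("utf-8")).decode("utf-8")
        records.append(
            {
                "eventSource": "aws:kinesis",
                "eventName": "aws:kinesis:record",
                "kinesis": {"partitionKey": batch_id, "data": data},
            }
        )
    return {"Records": records}


# Hypothetical sizes: a single record, an assumed average batch, an assumed ESM maximum.
EVENTS = {size: build_kinesis_event(size) for size in (1, 25, 100)}
```

Substitute the batch size configured on your own event source mapping for the largest value so the test covers the true upper bound.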

[Figure: Unit Test Description]

Run the Unit Test

mock_test.py

In the unit test, all references and calls to the DynamoDB service are mocked using the aws-sdk-client-mock client. To run the unit tests:

# Run from the project directory serverless-test-samples/python-test-samples/kinesis-lambda-dynamodb
# Create and Activate a Python Virtual Environment
# One-time setup

pip3 install virtualenv
python3 -m venv venv
source ./venv/bin/activate

# install dependencies
pip3 install -r tests/requirements.txt --use-pep517

# run unit tests with mocks
python3 -m pytest -s tests/unit  -v
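To illustrate the idea of a mocked unit test, the sketch below replaces the DynamoDB table with a `MagicMock` from Python's standard `unittest.mock` module. This is a stand-in technique chosen to keep the example dependency-free, not the mocking library the project itself uses, and `write_items` is a hypothetical substitute for the function under test. It assumes the code writes through a boto3-style `table.batch_writer()` context manager.

```python
from unittest.mock import MagicMock


def write_items(table, items):
    """Hypothetical stand-in for the code under test: batch-write items to a table."""
    with table.batch_writer() as writer:
        for item in items:
            writer.put_item(Item=item)
    return len(items)


def test_write_items_uses_batch_writer():
    # The mock replaces the DynamoDB table, so no AWS calls are made.
    table = MagicMock()
    writer = table.batch_writer.return_value.__enter__.return_value

    count = write_items(table, [{"PK": "b1", "SK": "r1"}, {"PK": "b1", "SK": "r2"}])

    assert count == 2
    assert writer.put_item.call_count == 2
    writer.put_item.assert_any_call(Item={"PK": "b1", "SK": "r2"})
```

Because the table is injected as an argument, the same test body can be reused with events of different batch sizes.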



Integration Test

Integration Test description

To run integration tests in the cloud, we use an event listener pattern to capture the items processed by the SUT. Although the test could look directly at the DynamoDB table in the SUT, using a second table lets us capture timings and other metadata, and it makes the pattern more reusable across different types of streaming applications. The event listener should be deployed only in non-production environments, for the purposes of testing. In Step 1 in the diagram below, the test establishes a long polling pattern with DynamoDB. In Step 2, the test uses the Kinesis Data Streams APIs to send messages into the SUT. The streaming application processes those messages, and in Step 3 the Event Listener Lambda function receives the messages and writes them to the Event Listener DynamoDB table. The test receives the results via the long polling mechanism and examines them.
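The polling step can be sketched as a small loop with a deadline. This is a minimal illustration under stated assumptions: `poll_for_items` is a hypothetical helper, and `fetch_items` stands for whatever callable the test uses to read the event listener table (for example, a query filtered by the batch identifier).

```python
import time


def poll_for_items(fetch_items, expected_count, timeout_seconds=30.0, interval_seconds=1.0):
    """Call fetch_items() until it returns expected_count items or the timeout expires."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        items = fetch_items()
        if len(items) >= expected_count:
            return items
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"expected {expected_count} items, saw {len(items)} before timeout"
            )
        # Wait before polling again to avoid hammering the table.
        time.sleep(interval_seconds)
```

Raising on timeout, rather than returning a partial result, makes a stalled pipeline show up as a clear test failure instead of a confusing assertion on a short list.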

[Figure: Integration Test Description]

Run the Integration Tests

test_kinesis.py

For integration tests, deploy the full stack before testing:

# Run from the project directory serverless-test-samples/python-test-samples/kinesis-lambda-dynamodb

sam build
sam deploy --guided

The integration tests require a single environment variable, AWS_SAM_STACK_NAME: the name of the AWS CloudFormation stack that was deployed using the sam deploy command.

Set up the environment variables, replacing the <PLACEHOLDERS> with your values:

# Run from the project directory serverless-test-samples/python-test-samples/kinesis-lambda-dynamodb
# Set the environment variables AWS_SAM_STACK_NAME and (optionally) AWS_DEFAULT_REGION
# to match the name of the stack and the region where you will test

AWS_SAM_STACK_NAME=<stack-name> AWS_DEFAULT_REGION=<region_name> python -m pytest -s tests/integration -v
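Inside a test, the stack name is typically used to look up the deployed resources. The sketch below shows one possible approach and is not taken from the project's test code: `get_stack_outputs` is a hypothetical helper that accepts any client exposing the CloudFormation `describe_stacks` call, such as `boto3.client("cloudformation")`. The output keys it returns are whatever the deployed template defines.

```python
import os


def get_stack_outputs(cloudformation_client, environ=os.environ):
    """Return the deployed stack's outputs as a dict keyed by OutputKey (hypothetical helper)."""
    stack_name = environ.get("AWS_SAM_STACK_NAME")
    if not stack_name:
        raise ValueError("Set AWS_SAM_STACK_NAME to the name of the deployed stack")
    response = cloudformation_client.describe_stacks(StackName=stack_name)
    # A stack with no outputs omits the "Outputs" key entirely.
    outputs = response["Stacks"][0].get("Outputs", [])
    return {output["OutputKey"]: output["OutputValue"] for output in outputs}
```

Failing fast when the variable is missing gives a clearer error than a later lookup against an empty stack name.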

Cleanup

To remove the stack, use the command:

sam delete
