update CloudWatch logs transformer Lambda (#5)
* remove SAM deployment
* add a Makefile (just like other Lambdas)
* update CloudFormation to be invoked by Makefile
* change the names of some of the generated fields
* update code to be more Pythonic
kdgregory authored Aug 28, 2021
1 parent fd5ee2c commit f47eb7e
Showing 9 changed files with 292 additions and 358 deletions.
56 changes: 56 additions & 0 deletions lambda/cloudwatch-log-transform/Makefile
@@ -0,0 +1,56 @@
.PHONY: default build upload deploy clean

DEPLOY_DIR ?= $(PWD)
ARTIFACT ?= log_transform.zip

S3_KEY ?= $(ARTIFACT)

STACK_NAME ?= LogsTransformer

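# Build the JSON parameter array passed to `aws cloudformation create-stack` by the deploy target.
# S3_BUCKET, SOURCE_STREAM, and DEST_STREAM have no defaults and must be supplied on the command line.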
DEPLOY_PARAMETERS = [
DEPLOY_PARAMETERS += {"ParameterKey": "LambdaName", "ParameterValue": "$(STACK_NAME)"},
DEPLOY_PARAMETERS += {"ParameterKey": "SourceBucket", "ParameterValue": "$(S3_BUCKET)"},
DEPLOY_PARAMETERS += {"ParameterKey": "SourceKey", "ParameterValue": "$(S3_KEY)"},
DEPLOY_PARAMETERS += {"ParameterKey": "SourceStreamName", "ParameterValue": "$(SOURCE_STREAM)"},
DEPLOY_PARAMETERS += {"ParameterKey": "DestinationStreamName", "ParameterValue": "$(DEST_STREAM)"}
DEPLOY_PARAMETERS += ]


default: build

build:
zip $(DEPLOY_DIR)/$(ARTIFACT) *.py

upload: build
aws s3 cp $(DEPLOY_DIR)/$(ARTIFACT) s3://$(S3_BUCKET)/$(S3_KEY)

deploy: upload
aws cloudformation create-stack \
--stack-name $(STACK_NAME) \
--template-body file://cloudformation.yml \
--capabilities CAPABILITY_NAMED_IAM \
--parameters '$(DEPLOY_PARAMETERS)'

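# Look up the source stream name, stream ARN, and subscription role ARN from the deployed stack,
# then subscribe the given LOG_GROUP to the source stream.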
subscribe:
STREAM_NAME=$$(aws cloudformation describe-stacks \
--stack-name $(STACK_NAME) \
--query "Stacks[].Parameters[?ParameterKey=='SourceStreamName'].ParameterValue" \
--output text) ; \
STREAM_ARN=$$(aws cloudformation describe-stacks \
--stack-name $(STACK_NAME) \
--query "Stacks[].Outputs[?OutputKey=='SourceArnOutput'].OutputValue" \
--output text) ; \
ROLE_ARN=$$(aws cloudformation describe-stacks \
--stack-name $(STACK_NAME) \
--query "Stacks[].Outputs[?OutputKey=='SubscriptionRoleOutput'].OutputValue" \
--output text) ; \
aws logs put-subscription-filter \
--log-group-name $(LOG_GROUP) \
--filter-name "Kinesis-$${STREAM_NAME}" \
--filter-pattern "" \
--destination-arn $${STREAM_ARN} \
--role-arn $${ROLE_ARN}

clean:
rm -rf $(DEPLOY_DIR)/$(ARTIFACT)
rm -rf __pycache__
174 changes: 74 additions & 100 deletions lambda/cloudwatch-log-transform/README.md
@@ -1,140 +1,114 @@
This Lambda decomposes messages that have been written to a Kinesis stream by a CloudWatch
Logs subscription, writing them to a destination stream as individual log events. See [this
blog post](https://blog.kdgregory.com/2019/09/streaming-cloudwatch-logs-to.html) for more
information about why this is necessary.

The specific transformations are (see the sketch following this list):

* Multiple log events are broken out of the source Kinesis record and written as distinct
records on the destination stream.
* If a log event is not JSON, it is transformed into a JSON object containing the fields
`timestamp`, `message`, and `level`. The timestamp is the log event timestamp reported
by CloudWatch, and is formatted as an ISO-8601 datetime (e.g., "2021-08-23T11:15:12Z").
The level is always `INFO`.
* If the log event is already JSON, it is examined and a `timestamp` field is added if one
doesn't already exist, using the value/format above.
* The origin log group and log stream names are added, as a child object under the key
`source`. This object has two fields, `logGroup` and `logStream`.
* If the message appears to be a Lambda execution report, it is parsed, and the stats
are stored in a sub-object under the key `lambda`.
* If the message appears to be output from the [Lambda Python logging
library](https://docs.aws.amazon.com/lambda/latest/dg/python-logging.html#python-logging-lib),
it is parsed, the original timestamp and logged message are extracted, and the Lambda
request ID is stored in a child object under the key `lambda`.
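
To make this concrete, here is a minimal sketch of how a handler might decompose a subscription
record and apply the non-JSON, `timestamp`, and `source` transformations. It is illustrative
only: the function names are invented for this example, and the Lambda-report and
Python-logging parsing steps are omitted.

```
import base64
import gzip
import json
from datetime import datetime, timezone

def extract_log_events(kinesis_record):
    # CloudWatch Logs subscription payloads are gzipped, base64-encoded JSON
    payload = json.loads(gzip.decompress(base64.b64decode(kinesis_record['kinesis']['data'])))
    if payload.get('messageType') != 'DATA_MESSAGE':
        return []
    return [transform_log_event(event, payload['logGroup'], payload['logStream'])
            for event in payload['logEvents']]

def transform_log_event(event, log_group, log_stream):
    try:
        data = json.loads(event['message'])
        if not isinstance(data, dict):
            raise ValueError("not a JSON object")
    except ValueError:
        # non-JSON messages become a minimal JSON object
        data = {'message': event['message'], 'level': 'INFO'}
    if 'timestamp' not in data:
        # CloudWatch reports the event timestamp in milliseconds since the epoch
        ts = datetime.fromtimestamp(event['timestamp'] / 1000.0, tz=timezone.utc)
        data['timestamp'] = ts.strftime('%Y-%m-%dT%H:%M:%SZ')
    data['source'] = {'logGroup': log_group, 'logStream': log_stream}
    return data
```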

## Warnings and Caveats

* All CloudWatch log groups must be subscribed to a _single_ Kinesis stream, which is then
processed by this Lambda and written to a _single_ Kinesis destination stream.

* This function makes a _best-effort_ attempt to post messages to the destination stream
(sketched after this list): it will retry any individual messages that are rejected by the
destination stream (typically due to throttling at the shard level), until the Lambda times
out. Messages that are rejected due to "internal error" are logged and dropped. Any other
exception causes the function to abort (they typically indicate misconfiguration, and are
unrecoverable).

* You may also find duplicate messages: the Kinesis trigger will retry on any failed send.
If this is due to persistent throttling, then the messages that _were_ successfully sent
in a prior batch will be resent.
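
The following is a sketch of that best-effort loop, using `boto3`. It is an assumption-laden
illustration, not the deployed code: the function name, partition key choice, retry count, and
backoff are invented here (the real function keeps retrying until the Lambda times out).

```
import json
import os
import time

import boto3

kinesis = boto3.client('kinesis')

def put_with_retry(records, max_attempts=8):
    stream = os.environ['DESTINATION_STREAM_NAME']
    entries = [{'Data': json.dumps(rec).encode('utf-8'),
                'PartitionKey': rec['source']['logStream']}
               for rec in records]
    for attempt in range(max_attempts):
        response = kinesis.put_records(StreamName=stream, Records=entries)
        if response['FailedRecordCount'] == 0:
            return
        # keep the rejected entries (typically shard-level throttling) and try again;
        # anything rejected with an internal error is logged and dropped
        retries = []
        for entry, result in zip(entries, response['Records']):
            error_code = result.get('ErrorCode')
            if error_code == 'ProvisionedThroughputExceededException':
                retries.append(entry)
            elif error_code:
                print(f"dropping record: {error_code} - {result.get('ErrorMessage')}")
        entries = retries
        if not entries:
            return
        time.sleep(min(0.1 * 2 ** attempt, 5))
    raise RuntimeError(f"unable to send {len(entries)} records")
```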

## Lambda Configuration

General:

* Runtime: Python 3.7+
* Recommended Memory: 512 MB (for CPU; actual memory requirement is much lower)
* Recommended Timeout: 30 seconds


### Environment variables

* `DESTINATION_STREAM_NAME`: the name of the Kinesis stream where messages will be written.


### Permissions Required

* `AWSLambdaBasicExecutionRole`
* Source stream: `kinesis:DescribeStream`, `kinesis:GetRecords`, `kinesis:GetShardIterator`,
`kinesis:ListStreams`
* Destination stream: `kinesis:PutRecords`


## Building and Deploying

*Note:* The source and destination Kinesis streams must exist before deploying this Lambda.

The easiest way to build and deploy is with `make`. The provided Makefile has three targets
for the Lambda:

* `build`: builds the deployment bundle (`log_transform.zip`) and stores it in the project directory.
You would normally invoke this target only if you're making changes and want to upload manually.

* `upload`: builds the deployment bundle and then uploads it to an S3 bucket. You must provide
the name of the bucket when invoking this target; you can optionally give the bundle a different
name:

```
# option 1, use predefined key
make upload S3_BUCKET=my_deployment_bucket

# option 2, use explicit key
make upload S3_BUCKET=my_deployment_bucket S3_KEY=my_bundle_name.zip
```

* `deploy`: builds the deployment bundle, uploads it to S3, and then creates a CloudFormation
stack (by default named `LogsTransformer`, which is also the name of the created Lambda)
that creates the Lambda and all related resources. You must provide the names of the Kinesis
streams, and may override the stack (Lambda) name. The Makefile fills in the template's
parameters (`LambdaName`, `SourceBucket`, `SourceKey`, `SourceStreamName`, and
`DestinationStreamName`) from these variables.

```
# option 1: use defaults
make deploy S3_BUCKET=my_deployment_bucket SOURCE_STREAM=subscription_dest DEST_STREAM=log_aggregator

# option 2: specify stack name and deployment bundle
make deploy STACK_NAME=CloudWatchSubscriptionTransformer S3_BUCKET=my_deployment_bucket S3_KEY=my_bundle_name.zip SOURCE_STREAM=subscription_dest DEST_STREAM=log_aggregator
```

The event source mapping is hardcoded to start reading from the end of the stream,
with a maximum batch size of 100 records and a maximum delay of 30 seconds.

In addition to creating all resources for the Lambda, this stack also creates a role that allows
CloudWatch Logs to write to the source Kinesis stream. This role has the name `STACKNAME-SubscriptionRole`.

Finally, the Makefile also provides a target to subscribe a CloudWatch log group to a Kinesis
stream, using information from the created stack:

```
# option 1: use the default stack name
make subscribe LOG_GROUP=my_logs

# option 2: use a custom stack name
make subscribe STACK_NAME=CloudWatchSubscriptionTransformer LOG_GROUP=my_logs
```

This last target is implemented using the AWS CLI, not CloudFormation. You can subscribe as many
log groups as you'd like to a single Kinesis stream, but must use the Console to remove the
subscription.
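
For reference, the same lookup-and-subscribe logic could be written with `boto3`; this is a
hypothetical equivalent of the `subscribe` target, not code that ships with the project:

```
import boto3

def subscribe_log_group(log_group, stack_name="LogsTransformer"):
    cfn = boto3.client('cloudformation')
    stack = cfn.describe_stacks(StackName=stack_name)['Stacks'][0]
    outputs = {o['OutputKey']: o['OutputValue'] for o in stack['Outputs']}
    params = {p['ParameterKey']: p['ParameterValue'] for p in stack['Parameters']}
    boto3.client('logs').put_subscription_filter(
        logGroupName=log_group,
        filterName=f"Kinesis-{params['SourceStreamName']}",
        filterPattern="",                     # empty pattern forwards every event
        destinationArn=outputs['SourceArnOutput'],
        roleArn=outputs['SubscriptionRoleOutput'])
```
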
@@ -57,15 +57,17 @@ Resources:
- "kinesis:DescribeStream"
- "kinesis:GetShardIterator"
- "kinesis:GetRecords"
Resource: !Sub "arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/${SourceStreamName}"
-
PolicyName: "WriteToDestination"
PolicyDocument:
Version: "2012-10-17"
Statement:
Effect: "Allow"
Action:
  - "kinesis:PutRecords"
Resource: !Sub "arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/${DestinationStreamName}"


LambdaFunction:
Type: "AWS::Lambda::Function"
@@ -78,12 +80,13 @@ Resources:
Code:
S3Bucket: !Ref SourceBucket
S3Key: !Ref SourceKey
MemorySize: 512
Timeout: 30
Environment:
Variables:
DESTINATION_STREAM_NAME: !Ref DestinationStreamName


EventSource:
Type: "AWS::Lambda::EventSourceMapping"
Properties:
@@ -93,3 +96,39 @@ Resources:
StartingPosition: LATEST
BatchSize: 100
MaximumBatchingWindowInSeconds: 30


SubscriptionRole:
Type: "AWS::IAM::Role"
Properties:
RoleName: !Sub "${LambdaName}-SubscriptionRole"
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
Effect: "Allow"
Principal:
Service: !Sub "logs.${AWS::Region}.amazonaws.com"
Action: "sts:AssumeRole"
Policies:
-
PolicyName: "KinesisWriter"
PolicyDocument:
Version: "2012-10-17"
Statement:
Effect: "Allow"
Action:
- "kinesis:Describe*"
- "kinesis:CreateStream"
- "kinesis:Put*"
Resource: !Sub "arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/${SourceStreamName}"


Outputs:

SubscriptionRoleOutput:
Description: "The ARN of a role that allows CloudWatch Logs to write to the source stream"
Value: !GetAtt SubscriptionRole.Arn

SourceArnOutput:
Description: "The ARN of the source stream (exposed to make subscribing easier)"
Value: !Sub "arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/${SourceStreamName}"
41 changes: 0 additions & 41 deletions lambda/cloudwatch-log-transform/cloudformation/kinesis.yml

This file was deleted.

