diff --git a/lambda/cloudwatch-log-transform/Makefile b/lambda/cloudwatch-log-transform/Makefile new file mode 100644 index 0000000..3a66ace --- /dev/null +++ b/lambda/cloudwatch-log-transform/Makefile @@ -0,0 +1,56 @@ +.PHONY: default build upload deploy clean + +DEPLOY_DIR ?= $(PWD) +ARTIFACT ?= log_transform.zip + +S3_KEY ?= $(ARTIFACT) + +STACK_NAME ?= LogsTransformer + +DEPLOY_PARAMETERS = [ +DEPLOY_PARAMETERS += {"ParameterKey": "LambdaName", "ParameterValue": "$(STACK_NAME)"}, +DEPLOY_PARAMETERS += {"ParameterKey": "SourceBucket", "ParameterValue": "$(S3_BUCKET)"}, +DEPLOY_PARAMETERS += {"ParameterKey": "SourceKey", "ParameterValue": "$(S3_KEY)"}, +DEPLOY_PARAMETERS += {"ParameterKey": "SourceStreamName", "ParameterValue": "$(SOURCE_STREAM)"}, +DEPLOY_PARAMETERS += {"ParameterKey": "DestinationStreamName", "ParameterValue": "$(DEST_STREAM)"} +DEPLOY_PARAMETERS += ] + + +default: build + +build: + zip $(DEPLOY_DIR)/$(ARTIFACT) *.py + +upload: build + aws s3 cp $(DEPLOY_DIR)/$(ARTIFACT) s3://$(S3_BUCKET)/$(S3_KEY) + +deploy: upload + aws cloudformation create-stack \ + --stack-name $(STACK_NAME) \ + --template-body file://cloudformation.yml \ + --capabilities CAPABILITY_NAMED_IAM \ + --parameters '$(DEPLOY_PARAMETERS)' + +subscribe: + STREAM_NAME=$$(aws cloudformation describe-stacks \ + --stack-name $(STACK_NAME) \ + --query "Stacks[].Parameters[?ParameterKey=='SourceStreamName'].ParameterValue" \ + --output text) ; \ + STREAM_ARN=$$(aws cloudformation describe-stacks \ + --stack-name $(STACK_NAME) \ + --query "Stacks[].Outputs[?OutputKey=='SourceArnOutput'].OutputValue" \ + --output text) ; \ + ROLE_ARN=$$(aws cloudformation describe-stacks \ + --stack-name $(STACK_NAME) \ + --query "Stacks[].Outputs[?OutputKey=='SubscriptionRoleOutput'].OutputValue" \ + --output text) ; \ + aws logs put-subscription-filter \ + --log-group-name $(LOG_GROUP) \ + --filter-name "Kinesis-$${STREAM_NAME}" \ + --filter-pattern "" \ + --destination-arn $${STREAM_ARN} \ + --role-arn $${ROLE_ARN} + +clean: + rm -rf $(DEPLOY_DIR)/$(ARTIFACT) + rm -rf __pycache__ diff --git a/lambda/cloudwatch-log-transform/README.md b/lambda/cloudwatch-log-transform/README.md index 533115e..737a427 100644 --- a/lambda/cloudwatch-log-transform/README.md +++ b/lambda/cloudwatch-log-transform/README.md @@ -1,55 +1,58 @@ -This function is part of a pipeline that copies events from CloudWatch Logs into -Elasticsearch. The entire pipeline consists of the following stages: - -1. An application (typically a Lambda function) writes events to CloudWatch Logs. -2. A subscription on the log group copies records into a Kinesis stream. -3. This function reads the records from the source Kinesis stream, transforms them - if necessary, and writes them to another Kinesis stream. -4. Kinesis Firehose reads events from this second stream and writes them to - Elasticsearch. - -As part of step 3, this function performs the following transformations on the source -records: - -* If they're not already JSON, they're converted to JSON with `timestamp`, `message`, - and `level` fields. The timestamp is formatted as an ISO-8601 datetime, and the - level is always `INFO`. +This Lambda decomposes messages that have been written to a Kinesis stream by a CloudWatch +Logs subscription, writing them to a destination stream as individual log events. See [this +blog post](https://blog.kdgregory.com/2019/09/streaming-cloudwatch-logs-to.html) for more +information about why this is necessary. 
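+
+For context, each record on the source stream carries a payload from CloudWatch Logs that,
+once decoded, looks something like the following (values here are illustrative, and fields
+not used by this function are omitted):
+
+```
+{
+  "messageType": "DATA_MESSAGE",
+  "logGroup": "AppenderExample",
+  "logStream": "example-stream",
+  "logEvents": [
+    { "id": "0", "timestamp": 1629715512345, "message": "first log line" },
+    { "id": "1", "timestamp": 1629715512456, "message": "second log line" }
+  ]
+}
+```
+
+Each entry in `logEvents` becomes a separate record on the destination stream.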
+
+The specific transformations are:
+
+* Multiple log events are broken out of the source Kinesis record and written as distinct
+  records on the destination stream.
+* If a log event is not JSON, it is transformed into a JSON object containing the fields
+  `timestamp`, `message`, and `level`. The timestamp is the log event timestamp reported
+  by CloudWatch, and is formatted as an ISO-8601 datetime (eg, "2021-08-23T11:15:12Z").
+  The level is always `INFO`.
+* If the log event is already JSON, it is examined and a `timestamp` field is added if one
+  doesn't already exist, using the value/format above.
 * The origin log group and log stream names are added, as a child object under the key
-  `cloudwatch` (this object has two fields, `logGroup` and `logStream`).
+  `source`. This object has two fields, `logGroup` and `logStream`.
 * If the message appears to be a Lambda execution report, it is parsed, and the stats are
   stored in a sub-object under the key `lambda`.
-* If the message appears to by output from the Python logger, it is parsed, the original
-  timestamp and logged message are extracted, and the Lambda request ID is stored in a
-  child object under the key `lambda`.
+* If the message appears to be output from the [Lambda Python logging
+  library](https://docs.aws.amazon.com/lambda/latest/dg/python-logging.html#python-logging-lib),
+  it is parsed, the original timestamp and logged message are extracted, and the Lambda
+  request ID is stored in a child object under the key `lambda`.
+
+(A sample transformed event is shown below, following the configuration details.)
 
-## Warnings and Caveats
+## Warnings and Caveats
 
-This function makes a _best-effort_ attempt to post messages to the destination stream:
-it will retry any individual messages that are rejected by the destination stream
-(typically due to throttling at the shard level), until the Lambda times out. Messages that
-are rejected due to "internal error" are logged and dropped. Any other exception causes the
-function to abort (they typically indicate misconfiguration, and are unrecoverable).
+* All CloudWatch log groups must be subscribed to a _single_ Kinesis stream, which is then
+  processed by this Lambda and written to a _single_ Kinesis destination stream.
 
-You may also find duplicate messages: the Kinesis trigger will retry on any failed send.
-If this is due to persistent throttling, then the messages that _were_ successfully sent
-in a prior batch will be resent.
+* This function makes a _best-effort_ attempt to post messages to the destination stream:
+  it will retry any individual messages that are rejected by the destination stream
+  (typically due to throttling at the shard level), until the Lambda times out. Messages that
+  are rejected due to "internal error" are logged and dropped. Any other exception causes the
+  function to abort (they typically indicate misconfiguration, and are unrecoverable).
+* You may also find duplicate messages: the Kinesis trigger will retry on any failed send.
+  If this is due to persistent throttling, then the messages that _were_ successfully sent
+  in a prior batch will be resent.
 
-## Lambda Configuration
-Runtime: Python 3.x
+## Lambda Configuration
 
-Required Memory: 128 MB
+General:
 
-Recommended Timeout: 60 sec
+* Runtime: Python 3.7+
+* Recommended Memory: 512 MB (for CPU; actual memory requirement is much lower)
+* Recommended Timeout: 30 seconds
 
-### Environment variables
+### Environment variables
 
 * `DESTINATION_STREAM_NAME`: the name of the Kinesis stream where messages will be written.
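+
+As an illustration of the transformations described above, a plain-text log line might appear
+on the destination stream as something like this (values are illustrative; the exact timestamp
+format is whatever Python's `datetime.isoformat()` produces):
+
+```
+{
+  "timestamp": "2021-08-23T11:15:12.345000+00:00",
+  "level": "INFO",
+  "message": "first log line",
+  "source": {
+    "logGroup": "AppenderExample",
+    "logStream": "example-stream"
+  }
+}
+```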
-### Permissions Required
+### Permissions Required
 
 * `AWSLambdaBasicExecutionRole`
 * Source stream: `kinesis:DescribeStream`, `kinesis:GetRecords`, `kinesis:GetShardIterator`,
@@ -57,84 +60,55 @@ Recommended Timeout: 60 sec
 * Destination stream: `kinesis:PutRecords`
 
-## Deployment
-
-Deploying this function is a multi-step process, so I've created CloudFormation templates
-to help. I've also created a Serverless Application Model (SAM) template for the Lambda
-function.
-
-### Subscription Stream
-
-> Note: I assume that you already have a logging pipeline with destination stream, Firehose,
-  and Elasticsearch. If not, you can find CloudFormation templates to set up a pipeline
-  [here](https://github.com/kdgregory/log4j-aws-appenders/tree/master/examples/cloudformation).
-
-The Kinesis stream and CloudWatch subscription are created as two separate steps: the
-subscription can't be created until the stream has become active, and CloudFormation
-doesn't support tracking of resources (other than wait conditions, which require manual
-intervention).
+## Building and Deploying
 
-* The [Kinesis](cloudformation/kinesis.yml) template creates a single-shard Kinesis
-  stream and the IAM role that allows CloudWatch to write to that stream. The stream
-  name is specified via the `StreamName` parameter.
+*Note:* The source and destination Kinesis streams must exist before deploying this Lambda.
 
-* The [Subscription](cloudformation/subscription.yml) template subscribes a single
-  log group, specified with the `LogGroupName` parameter, to the Kinesis stream
-  specified with the `StreamName` parameter (the default values for this parameter
-  are the same in both templates).
+The easiest way to build and deploy is with `make`. The provided Makefile has three targets
+for the Lambda:
 
-  For actual use, you'll probably create multiple subscriptions; all can go to the
-  same stream (although you might need to increase the shard count to handle load).
-  In that case, simply replicate the subscription resource (giving each a unique
-  name), and hardcode the log group name rather than using a parameter.
+* `build`: builds the deployment bundle (`log_transform.zip`) and stores it in the project directory.
+  You would normally invoke this target only if you're making changes and want to upload manually.
 
-### Lambda (CloudFormation)
+* `upload`: builds the deployment bundle and then uploads it to an S3 bucket. You must provide
+  the name of the bucket when invoking this target; you can optionally give the bundle a different
+  name:
 
-The [Lambda](cloudformation/lambda.yml) template creates the Lambda function to
-transform log events and write them to Kinesis, the execution role that lets it
-do its job, and an event source that attaches it to the Kinesis stream created
-above. It uses for following parameters to control its operation:
+  ```
+  # option 1, use predefined key
+  make upload S3_BUCKET=my_deployment_bucket
 
-* `LambdaName`: The name of the function to create (default: `CloudWatchLogsTransformer`)
-* `SourceBucket: The S3 bucket where the deployment bundle can be found (see below).
-* `SourceKey: The path in that bucket for the deployment bundle (see below).
-* `SourceStreamName: The Kinesis stream that contains CloudWatch Logs events (which
-  you created above).
-* `DestinationStreamName: The Kinesis stream for transformed log messages (which you
-  created previously).
+
+  # option 2, use explicit key
+  make upload S3_BUCKET=my_deployment_bucket S3_KEY=my_bundle_name.zip
+  ```
 
-CloudFormation requires you to provide a deployment bundle for a Lambda function, even
-when it's just a single file. So, from the project directory, execute the following
-commands, replacing `YOUR_BUCKET_NAME` with an existing bucket that belongs to you:
+* `deploy`: builds the deployment bundle, uploads it to S3, and then creates a CloudFormation
+  stack (by default named `LogsTransformer`, which is also the name of the created Lambda)
+  that creates the Lambda and all related resources. You must provide the names of the Kinesis
+  streams, and may override the stack (Lambda) name.
 
-```
-zip /tmp/cloudwatch_logs_transformer.zip lambda_function.py
-
-aws s3 cp /tmp/cloudwatch_logs_transformer.zip s3://YOUR_BUCKET_NAME/cloudwatch_logs_transformer.zip
-```
+  ```
+  # option 1: use defaults
+  make deploy S3_BUCKET=my_deployment_bucket SOURCE_STREAM=subscription_dest DEST_STREAM=log_aggregator
 
-The event source mapping is hardcoded to start reading from the end of the stream,
-with a maximum batch size of 100 records and a maximum delay of 30 seconds.
+  # option 2: specify stack name and deployment bundle
+  make deploy STACK_NAME=CloudWatchSubscriptionTransformer S3_BUCKET=my_deployment_bucket S3_KEY=my_bundle_name.zip SOURCE_STREAM=subscription_dest DEST_STREAM=log_aggregator
+  ```
 
+In addition to creating all resources for the Lambda, this stack also creates a role that allows
+CloudWatch Logs to write to the Kinesis stream. This role has the name `STACKNAME-SubscriptionRole`.
 
-### Serverless Application Model (SAM)
-
-To avoid manually ZIPping and uploading the Lambda function, you can [install the SAM
-cli](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html),
-then execute the following commands from within the `sam` sub-directory (again, replacing
-`YOUR_BUCKET_NAME` with your actual bucket name):
+Finally, the Makefile also provides a target to subscribe a CloudWatch log group to a Kinesis
+stream, using information from the created stack:
 
 ```
-cd sam
-
-sam build
+# option 1: use the default stack name
+make subscribe LOG_GROUP=my_logs
 
-sam package --s3-bucket YOUR_BUCKET_NAME --output-template output.yaml
+# option 2: use a custom stack name
+make subscribe STACK_NAME=CloudWatchSubscriptionTransformer LOG_GROUP=my_logs
 ```
 
-You can then use the CloudFormation console to create the stack. This variant requires the
-same parameters as the CloudFormation variant, except the source bucket/key (because SAM
-will fill those in automatically).
-
-*Note:* SAM wants the function source code to be in a `src` directory. To avoid duplication,
-I've used a symlink. If you're running on Windows you will need to copy the file explicitly.
+This last target is implemented using the AWS CLI, not CloudFormation. You can subscribe as many
+log groups as you'd like to a single Kinesis stream, but you must use the Console to remove a
+subscription.
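+
+To verify the subscription, you can list the filters on the log group with the AWS CLI; the
+filter name created by the Makefile is "Kinesis-" followed by the source stream name:
+
+```
+aws logs describe-subscription-filters --log-group-name my_logs
+```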
diff --git a/lambda/cloudwatch-log-transform/cloudformation/lambda.yml b/lambda/cloudwatch-log-transform/cloudformation.yml
similarity index 66%
rename from lambda/cloudwatch-log-transform/cloudformation/lambda.yml
rename to lambda/cloudwatch-log-transform/cloudformation.yml
index aab1b98..be99438 100644
--- a/lambda/cloudwatch-log-transform/cloudformation/lambda.yml
+++ b/lambda/cloudwatch-log-transform/cloudformation.yml
@@ -57,15 +57,17 @@ Resources:
           - "kinesis:DescribeStream"
           - "kinesis:GetShardIterator"
           - "kinesis:GetRecords"
-          Resource: [ !Sub "arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/${SourceStreamName}" ]
+          Resource: !Sub "arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/${SourceStreamName}"
 
       - PolicyName: "WriteToDestination"
         PolicyDocument:
           Version: "2012-10-17"
           Statement:
             Effect: "Allow"
-            Action: [ "kinesis:PutRecords" ]
-            Resource: [ !Sub "arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/${DestinationStreamName}" ]
+            Action:
+              - "kinesis:PutRecords"
+            Resource: !Sub "arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/${DestinationStreamName}"
+
 
   LambdaFunction:
     Type: "AWS::Lambda::Function"
@@ -78,12 +80,13 @@ Resources:
       Code:
         S3Bucket: !Ref SourceBucket
         S3Key: !Ref SourceKey
-      MemorySize: 128
-      Timeout: 60
+      MemorySize: 512
+      Timeout: 30
       Environment:
         Variables:
           DESTINATION_STREAM_NAME: !Ref DestinationStreamName
 
+
   EventSource:
     Type: "AWS::Lambda::EventSourceMapping"
     Properties:
@@ -93,3 +96,39 @@ Resources:
       StartingPosition: LATEST
       BatchSize: 100
       MaximumBatchingWindowInSeconds: 30
+
+
+  SubscriptionRole:
+    Type: "AWS::IAM::Role"
+    Properties:
+      RoleName: !Sub "${LambdaName}-SubscriptionRole"
+      AssumeRolePolicyDocument:
+        Version: "2012-10-17"
+        Statement:
+          Effect: "Allow"
+          Principal:
+            Service: !Sub "logs.${AWS::Region}.amazonaws.com"
+          Action: "sts:AssumeRole"
+      Policies:
+        -
+          PolicyName: "KinesisWriter"
+          PolicyDocument:
+            Version: "2012-10-17"
+            Statement:
+              Effect: "Allow"
+              Action:
+                - "kinesis:Describe*"
+                - "kinesis:CreateStream"
+                - "kinesis:Put*"
+              Resource: !Sub "arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/${SourceStreamName}"
+
+
+Outputs:
+
+  SubscriptionRoleOutput:
+    Description: "The ARN of a role that allows CloudWatch Logs to write to the source stream"
+    Value: !GetAtt SubscriptionRole.Arn
+
+  SourceArnOutput:
+    Description: "The ARN of the source stream (exposed to make subscribing easier)"
+    Value: !Sub "arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/${SourceStreamName}"
diff --git a/lambda/cloudwatch-log-transform/cloudformation/kinesis.yml b/lambda/cloudwatch-log-transform/cloudformation/kinesis.yml
deleted file mode 100644
index 6c0463e..0000000
--- a/lambda/cloudwatch-log-transform/cloudformation/kinesis.yml
+++ /dev/null
@@ -1,41 +0,0 @@
-AWSTemplateFormatVersion: "2010-09-09"
-Description: "Creates a Kinesis stream"
-
-Parameters:
-
-  StreamName:
-    Description: "Name of the Kinesis stream"
-    Type: "String"
-    Default: "CloudWatchSubscriptionDestination"
-
-Resources:
-
-  KinesisStream:
-    Type: "AWS::Kinesis::Stream"
-    Properties:
-      Name: !Ref StreamName
-      ShardCount: 1
-
-  SubscriptionRole:
-    Type: "AWS::IAM::Role"
-    Properties:
-      RoleName: !Sub "${StreamName}-DeliveryRole"
-      AssumeRolePolicyDocument:
-        Version: "2012-10-17"
-        Statement:
-          Effect: "Allow"
-          Principal:
-            Service: !Sub "logs.${AWS::Region}.amazonaws.com"
-          Action: "sts:AssumeRole"
-      Policies:
-        -
-          PolicyName: "KinesisWriter"
-          PolicyDocument:
-            Version: "2012-10-17"
-            Statement:
-              Effect: "Allow"
-              Action:
-                - "kinesis:Describe*"
-                - "kinesis:CreateStream"
-                - "kinesis:Put*"
-              Resource: !GetAtt KinesisStream.Arn
diff --git a/lambda/cloudwatch-log-transform/cloudformation/subscription.yml b/lambda/cloudwatch-log-transform/cloudformation/subscription.yml
deleted file mode 100644
index a7d8686..0000000
--- a/lambda/cloudwatch-log-transform/cloudformation/subscription.yml
+++ /dev/null
@@ -1,24 +0,0 @@
-AWSTemplateFormatVersion: "2010-09-09"
-Description: "Subscribes a CloudWatch log group to a Kinesis stream"
-
-Parameters:
-
-  LogGroupName:
-    Description: "Name of the CloudWatch log group that will be subscribed (omit for no subscription)"
-    Type: "String"
-    Default: "AppenderExample"
-
-  StreamName:
-    Description: "Name of the Kinesis stream that will receive log events"
-    Type: "String"
-    Default: "CloudWatchSubscriptionDestination"
-
-Resources:
-
-  Subscription:
-    Type: "AWS::Logs::SubscriptionFilter"
-    Properties:
-      LogGroupName: !Ref LogGroupName
-      FilterPattern: ""
-      DestinationArn: !Sub "arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/${StreamName}"
-      RoleArn: !Sub "arn:aws:iam::${AWS::AccountId}:role/${StreamName}-DeliveryRole"
diff --git a/lambda/cloudwatch-log-transform/lambda_function.py b/lambda/cloudwatch-log-transform/lambda_function.py
index 8772321..4259db2 100644
--- a/lambda/cloudwatch-log-transform/lambda_function.py
+++ b/lambda/cloudwatch-log-transform/lambda_function.py
@@ -1,24 +1,24 @@
-# Copyright 2019 Keith D Gregory
-#
+# Copyright 2019-2021 Keith D Gregory
+#
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at
-#
+#
 # http://www.apache.org/licenses/LICENSE-2.0
-#
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-##
-## This function is attached to a Kinesis stream that is the destination for a
-## CloudWatch Logs subscription, to write the log events to a different stream.
-## Along the way it transforms the event to JSON ## (if it's not already) and
-## adds information about the log stream and (for Lambdas) execution times.
-##
-################################################################################
+
+""" This function is attached to a Kinesis stream that is the destination for a
+    CloudWatch Logs subscription, to write the log events to a different stream.
+    Along the way it transforms the event to JSON (if it's not already) and
+    adds information about the log stream and (for Lambdas) execution times.
+ """ + import base64 import boto3 @@ -31,32 +31,42 @@ from datetime import datetime, timezone + logging.basicConfig() logging.getLogger().setLevel(level=logging.INFO) -kinesisClient = boto3.client('kinesis') -kinesisStream = os.environ['DESTINATION_STREAM_NAME'] +## fail fast on bad configuration +kinesis_client = boto3.client('kinesis') +kinesis_stream = os.environ['DESTINATION_STREAM_NAME'] + -pythonLoggingRegex = re.compile(r'\[([A-Z]+)]\s+([0-9]{4}-.*Z)\s+([-0-9a-fA-F]{36})\s+(.*)') +python_logging_regex = re.compile(r'\[([A-Z]+)]\s+([0-9]{4}-.*Z)\s+([-0-9a-fA-F]{36})\s+(.*)') -lambdaStartRegex = re.compile(r'START RequestId:\s+([-0-9a-fA-F]{36})\s+Version:\s+(.+)') -lambdaFinishRegex = re.compile(r'END RequestId:\s+([-0-9a-fA-F]{36})') -lambdaReportRegex = re.compile(r'REPORT RequestId:\s+([-0-9a-fA-F]{36})\s+Duration:\s+([0-9.]+)\s+ms\s+Billed Duration:\s+([0-9]+)\s+ms\s+Memory Size:\s+([0-9]+)\s+MB\s+Max Memory Used:\s+([0-9]+)\s+MB') -lambdaExtendedRegex = re.compile(r'.*Init Duration:\s+([0-9.]+)\s+ms') -lambdaXRayRegex = re.compile(r'.*XRAY TraceId:\s+([0-9a-fA-F-]+)\s+SegmentId:\s+([0-9a-fA-F]+)\s+Sampled:\s+(true|false)') +lambda_start_regex = re.compile(r'START RequestId:\s+([-0-9a-fA-F]{36})\s+Version:\s+(.+)') +lambda_finish_regex = re.compile(r'END RequestId:\s+([-0-9a-fA-F]{36})') +lambda_report_regex = re.compile(r'REPORT RequestId:\s+([-0-9a-fA-F]{36})\s+Duration:\s+([0-9.]+)\s+ms\s+Billed Duration:\s+([0-9]+)\s+ms\s+Memory Size:\s+([0-9]+)\s+MB\s+Max Memory Used:\s+([0-9]+)\s+MB') +lambda_extended_regex = re.compile(r'.*Init Duration:\s+([0-9.]+)\s+ms') +lambda_xray_regex = re.compile(r'.*XRAY TraceId:\s+([0-9a-fA-F-]+)\s+SegmentId:\s+([0-9a-fA-F]+)\s+Sampled:\s+(true|false)') def lambda_handler(event, context): - outputMessages = [] + output_messages = extract_messages(event) + logging.info(f'total number of messages to output: {len(output_messages)}') + logging.debug(f'output messages: {json.dumps(output_messages)}') + write_to_kinesis(output_messages) + + +def extract_messages(event): + """ Source records may contain multiple log messages, so we can't just use a list + comprehension to extract them. We also want to skip control records, and any + records that we can't process for unforeseen reasons. Thus this function. 
+ """ + output_messages = [] for record in event['Records']: - outputMessages = outputMessages + process_input_record(record) - logging.info(f'total number of messages to output: {len(outputMessages)}') - logging.debug(f'output messages: {json.dumps(outputMessages)}') - write_to_kinesis(outputMessages) - -## each input record may be a data message with multiple log events, or a control message -## that indicates the start of processing; this function processes the first and ignores -## the second (as well as any other messages that might be on the stream) + output_messages += process_input_record(record) + return output_messages + + def process_input_record(record): try: payload = record['kinesis']['data'] @@ -64,25 +74,26 @@ def process_input_record(record): data = json.loads(decoded) message_type = data.get('messageType') if message_type == 'DATA_MESSAGE': - logGroup = data['logGroup'] - logStream = data['logStream'] + log_group = data['logGroup'] + log_stream = data['logStream'] events = data.get('logEvents', []) - logging.info(f'processing {len(events)} events from group "{logGroup}" / stream "{logStream}"') + logging.info(f'processing {len(events)} events from group "{log_group}" / stream "{log_stream}"') logging.debug(f'input events: {json.dumps(events)}') - return [transform_log_event(logGroup, logStream, event) for event in events] + return [transform_log_event(log_group, log_stream, event) for event in events] elif message_type == 'CONTROL_MESSAGE': - logging.info('skipping control message') + logging.debug('skipping control message') elif message_type: - logging.warn(f'unexpected message type: {message_type}') + logging.warning(f'unexpected message type: {message_type}') except: logging.error(f'failed to process record; keys = {record.keys()}', exc_info=True) # fall-through for any unprocessed messages (exception or unhandled message type) return [] -## turns the message into JSON if it isn't already, recognizing standard logging -## formats, and adding tracking fields -def transform_log_event(logGroup, logStream, event): +def transform_log_event(log_group, log_stream, event): + """ Turns the message into JSON if it isn't already, recognizing standard logging + formats; adds tracking fields. 
+ """ message = event.get('message', '').strip() result = try_parse_json(message) if not result: @@ -94,17 +105,15 @@ def transform_log_event(logGroup, logStream, event): 'level': 'INFO', 'message': message } - - result['cloudwatch'] = { - 'logGroup': logGroup, - 'logStream': logStream + + result['source'] = { + 'logGroup': log_group, + 'logStream': log_stream } opt_add_timestamp(result, event) return result -## attempts to parse the passed message as JSON, returning the parsed representation -## if successful; otherwise returns a JSON object with a single "message" element def try_parse_json(message): if message.startswith('{') and message.endswith('}'): try: @@ -113,10 +122,11 @@ def try_parse_json(message): pass -## attempts to parse the passed message as output from the Lambda Python logger, -## as documented here: https://docs.aws.amazon.com/lambda/latest/dg/python-logging.html def try_parse_python_log(message): - match = pythonLoggingRegex.match(message) + """ Attempts to parse the passed message as output from the Lambda Python logger, as + documented here: https://docs.aws.amazon.com/lambda/latest/dg/python-logging.html + """ + match = python_logging_regex.match(message) if match: return { 'level': match.group(1), @@ -126,25 +136,26 @@ def try_parse_python_log(message): } -## if the message matches one of the Lambda status messages, extracts relevant information def try_parse_lambda_status(message): + """ Attempts to parse the message as one of the standard Lambda status messages. + """ try: if message.startswith('START RequestId:'): - match = lambdaStartRegex.match(message) + match = lambda_start_regex.match(message) if match: data = { 'requestId': match.group(1), 'version': match.group(2) } elif message.startswith('END RequestId:'): - match = lambdaFinishRegex.match(message) + match = lambda_finish_regex.match(message) if match: data = { 'requestId': match.group(1) } elif message.startswith('REPORT RequestId:'): message = message.replace('\n', '\t') - match = lambdaReportRegex.match(message) + match = lambda_report_regex.match(message) if match: data = { 'requestId': match.group(1), @@ -153,16 +164,16 @@ def try_parse_lambda_status(message): 'maxMemoryMb': int(match.group(4)), 'usedMemoryMb': int(match.group(5)) } - # these next two appear to be rolling out on a per-region basis, - # so aren't part of the base regex - match = lambdaExtendedRegex.match(message) + # initialization stats are only reported for first invocation + match = lambda_extended_regex.match(message) if match: - data['initialDurationMs'] = float(match.group(1)) - match = lambdaXRayRegex.match(message) + data['initializationMs'] = float(match.group(1)) + # x-ray stats are only reported if enabled + match = lambda_xray_regex.match(message) if match: - data['XRayTraceId'] = match.group(1) - data['XRaySegment'] = match.group(2) - data['XRaySampled'] = match.group(3) + data['xrayTraceId'] = match.group(1) + data['xraySegment'] = match.group(2) + data['xraySampled'] = match.group(3) if data: return { 'level': 'INFO', @@ -173,19 +184,21 @@ def try_parse_lambda_status(message): pass - -## if the passed data field already has a "timestamp" element, it's returned unchanged -## otherwise the passed event timestamp is formatted and added to the message def opt_add_timestamp(data, event): + """ If the passed data field already has a "timestamp" element, it's returned + unchanged. Otherwise the log event's timestamp is formatted and added to + the message. 
+    """
     if data.get('timestamp'):
         return
     dt = datetime.fromtimestamp(event['timestamp'] / 1000.0, tz=timezone.utc)
     data['timestamp'] = dt.isoformat()
 
 
-## makes a best-effort attempt to write all messages to Kinesis, batching them
-## as needed to meet the limits of PutRecords
 def write_to_kinesis(listOfEvents):
+    """ Makes a best-effort attempt to write all messages to Kinesis, batching them
+        as needed to meet the limits of PutRecords.
+    """
     records = prepare_records(listOfEvents)
     while records:
         records = process_batch(records)
@@ -194,79 +207,80 @@ def write_to_kinesis(listOfEvents):
     return
 
 
-## packages the passed log events into records for PutRecords
 def prepare_records(listOfEvents):
     records = []
     for event in listOfEvents:
-        partitionKey = event.get('cloudwatch', {}).get('logStream', 'DEFAULT')
+        partition_key = event.get('source', {}).get('logStream', 'DEFAULT')
         records.append({
-            'PartitionKey': partitionKey,
+            'PartitionKey': partition_key,
             'Data': json.dumps(event)
         })
     return records
 
 
-## forms a batch from the provided records and attempts to send it; any records
-## that couldn't fit in the batch will be returned, as well as any that couldn't
-## be sent (we return unattempted records first to give them the best chance of
-## being sent if there are persistent errors)
 def process_batch(records):
-    toBeSent, toBeReturned = build_batch(records)
-    logging.info(f'sending batch of {len(toBeSent)} records with {len(toBeReturned)} remaining')
+    """ Forms a batch from the provided records and attempts to send it; any records
+        that won't fit in the batch will be returned, as well as any that couldn't
+        be sent (we return unattempted records first to give them the best chance of
+        being sent if there are persistent errors).
+    """
+    to_be_sent, to_be_returned = build_batch(records)
+    logging.info(f'sending batch of {len(to_be_sent)} records with {len(to_be_returned)} remaining')
     try:
-        response = kinesisClient.put_records(
-            StreamName=kinesisStream,
-            Records=toBeSent
+        response = kinesis_client.put_records(
+            StreamName=kinesis_stream,
+            Records=to_be_sent
         )
-        return process_response(response, toBeSent) + toBeReturned
-    except kinesisClient.exceptions.ProvisionedThroughputExceededException:
-        logging.warn(f'received throughput-exceeded on stream {kinesisStream}; retrying all messages')
-        return toBeSent + toBeReturned
+        return process_response(response, to_be_sent) + to_be_returned
+    except kinesis_client.exceptions.ProvisionedThroughputExceededException:
+        logging.warn(f'received throughput-exceeded on stream {kinesis_stream}; retrying all messages')
+        return to_be_sent + to_be_returned
 
 
-## creates a batch of records based on Kinesis limits; returns both the batch
-## and any remaining records that didn't fit in the batch
 def build_batch(records):
-    recCount = 0
-    byteCount = 0
-
-    while recCount < min(len(records), 500) and byteCount < 1048576:
-        record = records[recCount]
-        recCount += 1
-        byteCount += len(record['Data']) + len(record['PartitionKey'])
-
+    """ Creates a batch of records based on Kinesis limits; returns both the batch
+        and any remaining records that didn't fit in the batch.
+ """ + rec_count = 0 + byte_count = 0 + + while rec_count < min(len(records), 500) and byte_count < 1048576: + record = records[rec_count] + rec_count += 1 + byte_count += len(record['Data']) + len(record['PartitionKey']) + # we already added the record before we knew the size, so we'll compensate - if byteCount > 1048576: - recCount = recCount - 1 - + if byte_count > 1048576: + rec_count = rec_count - 1 + # this should never happen: it would require a max-size log event and a # long partition key - if recCount == 0: + if rec_count == 0: logging.warn('throwing out too-long message') return [], records[1:] - - return records[:recCount], records[recCount:] + + return records[:rec_count], records[rec_count:] -## examines the response from a send command, and returns those records that were -## rejected def process_response(response, records): + """ Examines the response from PutRecords, returning any records that couldn't be sent. + """ if response['FailedRecordCount'] == 0: return [] - + result = [] - droppedRecordCount = 0 + dropped_record_count = 0 for ii in range(len(response['Records'])): entry = response['Records'][ii] errorCode = entry.get('ErrorCode') if errorCode == 'ProvisionedThroughputExceededException': result.append(records[ii]) elif errorCode: - droppedRecordCount += 1 - - if droppedRecordCount > 0: - logging.warn(f'dropped {droppedRecordCount} records due to Kinesis internal errors') + dropped_record_count += 1 + + if dropped_record_count > 0: + logging.warn(f'dropped {dropped_record_count} records due to Kinesis internal errors') if len(result) > 0: logging.info(f'requeueing {len(result)} records due to throughput-exceeded') - + return result diff --git a/lambda/cloudwatch-log-transform/sam/src/lambda_function.py b/lambda/cloudwatch-log-transform/sam/src/lambda_function.py deleted file mode 120000 index b222124..0000000 --- a/lambda/cloudwatch-log-transform/sam/src/lambda_function.py +++ /dev/null @@ -1 +0,0 @@ -../../lambda_function.py \ No newline at end of file diff --git a/lambda/cloudwatch-log-transform/sam/src/requirements.txt b/lambda/cloudwatch-log-transform/sam/src/requirements.txt deleted file mode 100644 index e69de29..0000000 diff --git a/lambda/cloudwatch-log-transform/sam/template.yml b/lambda/cloudwatch-log-transform/sam/template.yml deleted file mode 100644 index b201c40..0000000 --- a/lambda/cloudwatch-log-transform/sam/template.yml +++ /dev/null @@ -1,83 +0,0 @@ -AWSTemplateFormatVersion: "2010-09-09" -Description: "A Lambda function that receives CloudWatch Logs events from one Kinesis stream and writes them to another" -Transform: "AWS::Serverless-2016-10-31" - -Parameters: - - LambdaName: - Description: "Name of the Lambda function to create" - Type: "String" - Default: "CloudWatchLogsTransformer" - - SourceStreamName: - Description: "Name of the source Kinesis stream" - Type: "String" - Default: "CloudWatchSubscriptionDestination" - - DestinationStreamName: - Description: "Name of the Kinesis stream that will receive transformed log events" - Type: "String" - Default: "AppenderExample" - -Resources: - - LambdaRole: - Type: "AWS::IAM::Role" - Properties: - Path: "/lambda/" - RoleName: !Sub "${LambdaName}-ExecutionRole" - AssumeRolePolicyDocument: - Version: "2012-10-17" - Statement: - Effect: "Allow" - Principal: - Service: "lambda.amazonaws.com" - Action: "sts:AssumeRole" - ManagedPolicyArns: - - "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" - Policies: - - - PolicyName: "ReadFromSource" - PolicyDocument: - Version: "2012-10-17" - 
Statement: - Effect: "Allow" - Action: - - "kinesis:ListStreams" - - "kinesis:DescribeStream" - - "kinesis:GetShardIterator" - - "kinesis:GetRecords" - Resource: [ !Sub "arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/${SourceStreamName}" ] - - - PolicyName: "WriteToDestination" - PolicyDocument: - Version: "2012-10-17" - Statement: - Effect: "Allow" - Action: [ "kinesis:PutRecords" ] - Resource: [ !Sub "arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/${DestinationStreamName}" ] - - LambdaFunction: - Type: "AWS::Serverless::Function" - Properties: - FunctionName: !Ref LambdaName - Description: "Responds to CloudWatch Logs events delivered via Kinesis stream" - Role: !GetAtt LambdaRole.Arn - Runtime: "python3.7" - CodeUri: "src/" - Handler: "lambda_function.lambda_handler" - MemorySize: 128 - Timeout: 60 - Environment: - Variables: - DESTINATION_STREAM_NAME: !Ref DestinationStreamName - - EventSource: - Type: "AWS::Lambda::EventSourceMapping" - Properties: - EventSourceArn: !Sub "arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/${SourceStreamName}" - FunctionName: !Ref LambdaFunction - Enabled: true - StartingPosition: LATEST - BatchSize: 100 - MaximumBatchingWindowInSeconds: 30