Skip to content

Commit

Permalink
refactor!: metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
jshlbrd committed Oct 6, 2023
1 parent 2ccd4d2 commit 5f91dad
Show file tree
Hide file tree
Showing 24 changed files with 286 additions and 235 deletions.
25 changes: 25 additions & 0 deletions build/config/substation.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,19 @@
type: 'meta_for_each',
settings: std.mergePatch(default, settings),
},
metrics: {
duration(settings=null): {
local default = {
name: null,
attributes: null,
destination: null,
transform: null,
},

type: 'meta_metrics_duration',
settings: std.mergePatch(default, settings),
},
},
pipe(settings=null): $.transform.meta.pipeline(settings=settings),
pipeline(settings=null): {
local default = {
Expand Down Expand Up @@ -881,6 +894,18 @@
type: 'utility_err',
settings: std.mergePatch(default, settings),
},
metrics: {
count(settings=null): {
local default = {
name: null,
attributes: null,
destination: null,
},

type: 'utility_metrics_count',
settings: std.mergePatch(default, settings),
},
},
secret(settings=null): {
local default = { secret: null },

Expand Down
35 changes: 1 addition & 34 deletions cmd/aws/lambda/substation/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@ import (
"context"
"encoding/json"
"strings"
"sync/atomic"
"time"

"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
"github.com/brexhq/substation"
"github.com/brexhq/substation/internal/aws/dynamodb"
"github.com/brexhq/substation/internal/channel"
"github.com/brexhq/substation/internal/metrics"
"github.com/brexhq/substation/message"
"github.com/brexhq/substation/transform"
"golang.org/x/sync/errgroup"
Expand All @@ -26,7 +24,7 @@ type dynamodbMetadata struct {
StreamViewType string `json:"streamViewType"`
}

//nolint: gocognit, gocyclo, cyclop // Ignore cognitive and cyclomatic complexity.
// nolint: gocognit, gocyclo, cyclop // Ignore cognitive and cyclomatic complexity.

Check failure on line 27 in cmd/aws/lambda/substation/dynamodb.go

View workflow job for this annotation

GitHub Actions / go

directive `// nolint: gocognit, gocyclo, cyclop // Ignore cognitive and cyclomatic complexity.` should be written without leading space as `//nolint: gocognit, gocyclo, cyclop // Ignore cognitive and cyclomatic complexity.` (nolintlint)
func dynamodbHandler(ctx context.Context, event events.DynamoDBEvent) error {
// Retrieve and load configuration.
conf, err := getConfig(ctx)
Expand All @@ -44,13 +42,6 @@ func dynamodbHandler(ctx context.Context, event events.DynamoDBEvent) error {
return err
}

// Application metrics.
var msgRecv, msgTran uint32
metric, err := metrics.New(ctx, cfg.Metrics)
if err != nil {
return err
}

ch := channel.New[*message.Message]()
group, ctx := errgroup.WithContext(ctx)

Expand Down Expand Up @@ -78,8 +69,6 @@ func dynamodbHandler(ctx context.Context, event events.DynamoDBEvent) error {
if m.IsControl() {
continue
}

atomic.AddUint32(&msgTran, 1)
}

return nil
Expand Down Expand Up @@ -233,7 +222,6 @@ func dynamodbHandler(ctx context.Context, event events.DynamoDBEvent) error {
}

ch.Send(msg)
atomic.AddUint32(&msgRecv, 1)
}

return nil
Expand All @@ -245,26 +233,5 @@ func dynamodbHandler(ctx context.Context, event events.DynamoDBEvent) error {
return err
}

// Generate metrics.
if err := metric.Generate(ctx, metrics.Data{
Name: "MessagesReceived",
Value: msgRecv,
Attributes: map[string]string{
"FunctionName": functionName,
},
}); err != nil {
return err
}

if err := metric.Generate(ctx, metrics.Data{
Name: "MessagesTransformed",
Value: msgTran,
Attributes: map[string]string{
"FunctionName": functionName,
},
}); err != nil {
return err
}

return nil
}
33 changes: 0 additions & 33 deletions cmd/aws/lambda/substation/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@ package main
import (
"context"
"encoding/json"
"sync/atomic"
"time"

"github.com/aws/aws-lambda-go/events"
"github.com/awslabs/kinesis-aggregation/go/deaggregator"
"github.com/brexhq/substation"
"github.com/brexhq/substation/internal/aws/kinesis"
"github.com/brexhq/substation/internal/channel"
"github.com/brexhq/substation/internal/metrics"
"github.com/brexhq/substation/message"
"github.com/brexhq/substation/transform"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -41,13 +39,6 @@ func kinesisHandler(ctx context.Context, event events.KinesisEvent) error {
return err
}

// Application metrics.
var msgRecv, msgTran uint32
metric, err := metrics.New(ctx, cfg.Metrics)
if err != nil {
return err
}

ch := channel.New[*message.Message]()
group, ctx := errgroup.WithContext(ctx)

Expand Down Expand Up @@ -75,8 +66,6 @@ func kinesisHandler(ctx context.Context, event events.KinesisEvent) error {
if m.IsControl() {
continue
}

atomic.AddUint32(&msgTran, 1)
}

return nil
Expand Down Expand Up @@ -132,7 +121,6 @@ func kinesisHandler(ctx context.Context, event events.KinesisEvent) error {
msg := message.New().SetData(record.Data).SetMetadata(metadata)

ch.Send(msg)
atomic.AddUint32(&msgRecv, 1)
}

return nil
Expand All @@ -144,26 +132,5 @@ func kinesisHandler(ctx context.Context, event events.KinesisEvent) error {
return err
}

// Generate metrics.
if err := metric.Generate(ctx, metrics.Data{
Name: "MessagesReceived",
Value: msgRecv,
Attributes: map[string]string{
"FunctionName": functionName,
},
}); err != nil {
return err
}

if err := metric.Generate(ctx, metrics.Data{
Name: "MessagesTransformed",
Value: msgTran,
Attributes: map[string]string{
"FunctionName": functionName,
},
}); err != nil {
return err
}

return nil
}
4 changes: 1 addition & 3 deletions cmd/aws/lambda/substation/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/aws/aws-lambda-go/lambda"
"github.com/brexhq/substation"
"github.com/brexhq/substation/config"
"github.com/brexhq/substation/internal/file"
)

Expand All @@ -27,8 +26,7 @@ var errLambdaInvalidHandler = fmt.Errorf("invalid handler")
type customConfig struct {
substation.Config

Concurrency int `json:"concurrency"`
Metrics config.Config `json:"metrics"`
Concurrency int `json:"concurrency"`
}

// getConfig contextually retrieves a Substation configuration.
Expand Down
66 changes: 1 addition & 65 deletions cmd/aws/lambda/substation/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"net/url"
"os"
"sync/atomic"
"time"

"github.com/aws/aws-lambda-go/events"
Expand All @@ -14,7 +13,6 @@ import (
"github.com/brexhq/substation/internal/aws/s3manager"
"github.com/brexhq/substation/internal/bufio"
"github.com/brexhq/substation/internal/channel"
"github.com/brexhq/substation/internal/metrics"
"github.com/brexhq/substation/message"
"github.com/brexhq/substation/transform"
"golang.org/x/sync/errgroup"
Expand All @@ -28,7 +26,7 @@ type s3Metadata struct {
ObjectSize int64 `json:"objectSize"`
}

//nolint: gocognit // Ignore cognitive complexity.
// nolint: gocognit // Ignore cognitive complexity.

Check failure on line 29 in cmd/aws/lambda/substation/s3.go

View workflow job for this annotation

GitHub Actions / go

directive `// nolint: gocognit // Ignore cognitive complexity.` should be written without leading space as `//nolint: gocognit // Ignore cognitive complexity.` (nolintlint)
func s3Handler(ctx context.Context, event events.S3Event) error {
// Retrieve and load configuration.
conf, err := getConfig(ctx)
Expand All @@ -46,13 +44,6 @@ func s3Handler(ctx context.Context, event events.S3Event) error {
return err
}

// Application metrics.
var msgRecv, msgTran uint32
metric, err := metrics.New(ctx, cfg.Metrics)
if err != nil {
return err
}

ch := channel.New[*message.Message]()
group, ctx := errgroup.WithContext(ctx)

Expand Down Expand Up @@ -80,8 +71,6 @@ func s3Handler(ctx context.Context, event events.S3Event) error {
if m.IsControl() {
continue
}

atomic.AddUint32(&msgTran, 1)
}

return nil
Expand Down Expand Up @@ -164,7 +153,6 @@ func s3Handler(ctx context.Context, event events.S3Event) error {
msg := message.New().SetData(b).SetMetadata(metadata)

ch.Send(msg)
atomic.AddUint32(&msgRecv, 1)
}

if err := scanner.Err(); err != nil {
Expand All @@ -181,27 +169,6 @@ func s3Handler(ctx context.Context, event events.S3Event) error {
return err
}

// Generate metrics.
if err := metric.Generate(ctx, metrics.Data{
Name: "MessagesReceived",
Value: msgRecv,
Attributes: map[string]string{
"FunctionName": functionName,
},
}); err != nil {
return err
}

if err := metric.Generate(ctx, metrics.Data{
Name: "MessagesTransformed",
Value: msgTran,
Attributes: map[string]string{
"FunctionName": functionName,
},
}); err != nil {
return err
}

return nil
}

Expand All @@ -223,13 +190,6 @@ func s3SnsHandler(ctx context.Context, event events.SNSEvent) error {
return err
}

// Application metrics.
var msgRecv, msgTran uint32
metric, err := metrics.New(ctx, cfg.Metrics)
if err != nil {
return err
}

ch := channel.New[*message.Message]()
group, ctx := errgroup.WithContext(ctx)

Expand Down Expand Up @@ -257,8 +217,6 @@ func s3SnsHandler(ctx context.Context, event events.SNSEvent) error {
if m.IsControl() {
continue
}

atomic.AddUint32(&msgTran, 1)
}

return nil
Expand Down Expand Up @@ -344,7 +302,6 @@ func s3SnsHandler(ctx context.Context, event events.SNSEvent) error {
msg := message.New().SetData(b).SetMetadata(metadata)

ch.Send(msg)
atomic.AddUint32(&msgRecv, 1)
}

if err := scanner.Err(); err != nil {
Expand All @@ -362,26 +319,5 @@ func s3SnsHandler(ctx context.Context, event events.SNSEvent) error {
return err
}

// Generate metrics.
if err := metric.Generate(ctx, metrics.Data{
Name: "MessagesReceived",
Value: msgRecv,
Attributes: map[string]string{
"FunctionName": functionName,
},
}); err != nil {
return err
}

if err := metric.Generate(ctx, metrics.Data{
Name: "MessagesTransformed",
Value: msgTran,
Attributes: map[string]string{
"FunctionName": functionName,
},
}); err != nil {
return err
}

return nil
}
Loading

0 comments on commit 5f91dad

Please sign in to comment.