chore(cmd): Add Kinesis Tap Development Application (#199)
* docs: Add Example Dev App for Reading from Kinesis

* chore: Move App
jshlbrd authored Jul 8, 2024
1 parent ea5a592 commit 0308c74
Showing 5 changed files with 346 additions and 4 deletions.
4 changes: 0 additions & 4 deletions cmd/README.md
@@ -6,10 +6,6 @@ This directory contains applications that run Substation. Applications are organ…
 
 Applications that run on AWS Lambda.
 
-## client/
-
-Applications that run on a client machine.
-
 ## development/
 
 Applications that are used for development.
52 changes: 52 additions & 0 deletions cmd/development/kinesis-tap/substation/README.md
@@ -0,0 +1,52 @@
# kinesis-tap/substation

`kinesis-tap` is a tool for tapping into and transforming data from an AWS Kinesis Data Stream in real-time with Substation.

This is intended as a Substation development aid, but it has other uses as well, such as:
- Previewing live data in a stream by printing it to the console (default behavior)
- Sampling live data from a stream and saving it to a local file
- Forwarding live data between data pipeline stages for testing

Warning: This is a development tool intended to provide temporary access to live data in a Kinesis Data Stream; if you need to process data from a Kinesis Data Stream with strict reliability guarantees, use the [AWS Lambda applications](/cmd/aws/lambda/).

## Usage

```
% ./substation -h
Usage of ./substation:
  -config string
        The Substation configuration file used to transform records (default "./config.json")
  -stream-name string
        The AWS Kinesis Data Stream to fetch records from
  -stream-offset string
        Determines the offset of the stream (earliest, latest) (default "earliest")
```
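
For example, a typical invocation that reads only new records from a stream and transforms them with the default configuration file (the stream name `my-stream` is a placeholder):
```
% ./substation -stream-name my-stream -stream-offset latest
```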

Use the `SUBSTATION_DEBUG=1` environment variable to enable debug logging:
```
% SUBSTATION_DEBUG=1 ./substation -stream-name my-stream
DEBU[0000] Retrieved active shards from Kinesis stream. count=2 stream=my-stream
DEBU[0001] Retrieved records from Kinesis shard. count=981 shard=0x140004a6f80 stream=my-stream
DEBU[0002] Retrieved records from Kinesis shard. count=1055 shard=0x140004a6fe0 stream=my-stream
DEBU[0003] Retrieved records from Kinesis shard. count=2333 shard=0x140004a6f80 stream=my-stream
DEBU[0003] Retrieved records from Kinesis shard. count=1110 shard=0x140004a6fe0 stream=my-stream
DEBU[0004] Retrieved records from Kinesis shard. count=2109 shard=0x140004a6f80 stream=my-stream
DEBU[0004] Retrieved records from Kinesis shard. count=1094 shard=0x140004a6fe0 stream=my-stream
^CDEBU[0004] Closed connections to the Kinesis stream.
DEBU[0004] Closed Substation pipeline.
DEBU[0004] Flushed Substation pipeline.
```

## Build

```
git clone https://github.com/brexhq/substation.git && \
cd substation/cmd/development/kinesis-tap/substation && \
go build .
```

## Authentication

`kinesis-tap` uses the AWS SDK for Go to authenticate with AWS. The SDK uses the same authentication methods as the AWS CLI, so you can use the same environment variables or configuration files to authenticate.

For more information, see the [AWS CLI documentation](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-configure.html).
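
For example, a named profile and region can be supplied through the standard AWS environment variables (the profile and region values below are placeholders):
```
% AWS_PROFILE=dev AWS_REGION=us-east-1 ./substation -stream-name my-stream
```
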
7 changes: 7 additions & 0 deletions cmd/development/kinesis-tap/substation/config.jsonnet
@@ -0,0 +1,7 @@
local sub = import '../../../../build/config/substation.libsonnet';

{
  transforms: [
    sub.tf.send.stdout(),
  ]
}
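
The configuration above only prints records to the console. As a sketch of the file-sampling use case mentioned in the README, the transform list can be extended; this assumes a `sub.tf.send.file` builder is available in the imported `substation.libsonnet`:
```
local sub = import '../../../../build/config/substation.libsonnet';

{
  transforms: [
    // Preview each record on the console.
    sub.tf.send.stdout(),
    // Also write each record to a local file. This builder and its default
    // settings are assumptions; adjust them to match your library version.
    sub.tf.send.file(),
  ]
}
```
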
264 changes: 264 additions & 0 deletions cmd/development/kinesis-tap/substation/main.go
@@ -0,0 +1,264 @@
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"io"
	"os"
	"os/signal"
	"runtime"
	"syscall"
	"time"

	"golang.org/x/sync/errgroup"

	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/awslabs/kinesis-aggregation/go/deaggregator"
	"github.com/brexhq/substation"
	"github.com/brexhq/substation/internal/aws"
	"github.com/brexhq/substation/internal/aws/kinesis"
	"github.com/brexhq/substation/internal/channel"
	"github.com/brexhq/substation/internal/file"
	"github.com/brexhq/substation/internal/log"
	"github.com/brexhq/substation/message"
)

type options struct {
	Config string

	// StreamName is the name of the Kinesis stream to read from.
	StreamName string
	// StreamOffset is the read offset of the stream (earliest, latest).
	StreamOffset string
}

// getConfig contextually retrieves a Substation configuration.
func getConfig(ctx context.Context, cfg string) (io.Reader, error) {
	path, err := file.Get(ctx, cfg)
	defer os.Remove(path)

	if err != nil {
		return nil, err
	}

	conf, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer conf.Close()

	buf := new(bytes.Buffer)
	if _, err := io.Copy(buf, conf); err != nil {
		return nil, err
	}

	return buf, nil
}

func main() {
	var opts options

	flag.StringVar(&opts.Config, "config", "./config.json", "The Substation configuration file used to transform records")
	flag.StringVar(&opts.StreamName, "stream-name", "", "The AWS Kinesis Data Stream to fetch records from")
	flag.StringVar(&opts.StreamOffset, "stream-offset", "earliest", "Determines the offset of the stream (earliest, latest)")
	flag.Parse()

	if err := run(context.Background(), opts); err != nil {
		panic(fmt.Errorf("main: %v", err))
	}
}

//nolint:gocognit // Ignore cognitive complexity.
func run(ctx context.Context, opts options) error {
	cfg := substation.Config{}
	c, err := getConfig(ctx, opts.Config)
	if err != nil {
		return err
	}

	if err := json.NewDecoder(c).Decode(&cfg); err != nil {
		return err
	}

	sub, err := substation.New(ctx, cfg)
	if err != nil {
		return err
	}

	ch := channel.New[*message.Message]()
	group, ctx := errgroup.WithContext(ctx)

	// Consumer group that transforms records using Substation
	// until the channel is closed by the producer group.
	group.Go(func() error {
		tfGroup, tfCtx := errgroup.WithContext(ctx)
		tfGroup.SetLimit(runtime.NumCPU())

		for message := range ch.Recv() {
			select {
			case <-ctx.Done():
				return ctx.Err()
			default:
			}

			msg := message
			tfGroup.Go(func() error {
				if _, err := sub.Transform(tfCtx, msg); err != nil {
					return err
				}

				return nil
			})
		}

		if err := tfGroup.Wait(); err != nil {
			return err
		}

		log.Debug("Closed Substation pipeline.")

		// ctrl messages flush the pipeline. This must be done
		// after all messages have been processed.
		ctrl := message.New().AsControl()
		if _, err := sub.Transform(tfCtx, ctrl); err != nil {
			return err
		}

		log.Debug("Flushed Substation pipeline.")

		return nil
	})

	// Producer group that fetches records from each shard in the
	// Kinesis stream until the context is cancelled by an interrupt
	// signal.
	group.Go(func() error {
		defer ch.Close() // Producer goroutines must close the channel when they are done.

		// The AWS client is configured using environment variables
		// or the default credentials file.
		client := kinesis.API{}
		client.Setup(aws.Config{})

		res, err := client.ListShards(ctx, opts.StreamName)
		if err != nil {
			return err
		}

		log.WithField("stream", opts.StreamName).WithField("count", len(res.Shards)).Debug("Retrieved active shards from Kinesis stream.")

		var iType string
		switch opts.StreamOffset {
		case "earliest":
			iType = "TRIM_HORIZON"
		case "latest":
			iType = "LATEST"
		default:
			return fmt.Errorf("invalid offset: %s", opts.StreamOffset)
		}

		// Each shard is read concurrently using a worker
		// pool managed by an errgroup that can be cancelled
		// by an interrupt signal.
		notifyCtx, cancel := signal.NotifyContext(ctx, syscall.SIGINT)
		defer cancel()

		recvGroup, recvCtx := errgroup.WithContext(notifyCtx)
		defer log.Debug("Closed connections to the Kinesis stream.")

		// This iterates over a snapshot of active shards in the
		// stream and will not be updated if shards are split or
		// merged. New shards can be identified in the response
		// from GetRecords, but this isn't implemented.
		//
		// Each shard is paginated until the end of the shard is
		// reached or the context is cancelled.
		for _, shard := range res.Shards {
			iterator, err := client.GetShardIterator(ctx, opts.StreamName, *shard.ShardId, iType)
			if err != nil {
				return err
			}

			recvGroup.Go(func() error {
				shardIterator := *iterator.ShardIterator

				for {
					select {
					case <-ctx.Done():
						return ctx.Err()
					default:
					}

					// GetRecords has a limit of 5 transactions per second
					// per shard, so this loop is designed to not overload
					// the API in case other consumers are reading from the
					// same shard.
					res, err := client.GetRecords(recvCtx, shardIterator)
					if err != nil {
						return err
					}

					if res.NextShardIterator == nil {
						log.WithField("stream", opts.StreamName).WithField("shard", shard.ShardId).Debug("Reached end of Kinesis shard.")

						break
					}
					shardIterator = *res.NextShardIterator

					if len(res.Records) == 0 {
						time.Sleep(500 * time.Millisecond)

						continue
					}

					deagg, err := deaggregator.DeaggregateRecords(res.Records)
					if err != nil {
						return err
					}

					log.WithField("stream", opts.StreamName).WithField("shard", shard.ShardId).WithField("count", len(deagg)).Debug("Retrieved records from Kinesis shard.")

					for _, record := range deagg {
						msg := message.New().SetData(record.Data)
						ch.Send(msg)
					}

					time.Sleep(500 * time.Millisecond)
				}

				return nil
			})
		}

		// Cancellation errors are expected when the errgroup
		// is interrupted. All other errors are returned to
		// the caller.
		if err := recvGroup.Wait(); err != nil {
			if awsErr, ok := err.(awserr.Error); ok {
				if awsErr.Code() == "RequestCanceled" {
					return nil
				}
			}

			if errors.Is(err, context.Canceled) {
				return nil
			}

			return err
		}

		return nil
	})

	// Wait for the producer and consumer groups to finish,
	// or an error from either group.
	if err := group.Wait(); err != nil {
		return err
	}

	return nil
}
23 changes: 23 additions & 0 deletions internal/aws/kinesis/kinesis.go
@@ -170,6 +170,29 @@ func (a *API) IsEnabled() bool {
	return a.Client != nil
}

// ListShards wraps the ListShardsWithContext API.
func (a *API) ListShards(ctx aws.Context, stream string) (*kinesis.ListShardsOutput, error) {
	return a.Client.ListShardsWithContext(ctx, &kinesis.ListShardsInput{
		StreamName: aws.String(stream),
	})
}

// GetShardIterator wraps the GetShardIteratorWithContext API.
func (a *API) GetShardIterator(ctx aws.Context, stream, shard, iteratorType string) (*kinesis.GetShardIteratorOutput, error) {
	return a.Client.GetShardIteratorWithContext(ctx, &kinesis.GetShardIteratorInput{
		ShardId:           aws.String(shard),
		ShardIteratorType: aws.String(iteratorType),
		StreamName:        aws.String(stream),
	})
}

// GetRecords wraps the GetRecordsWithContext API.
func (a *API) GetRecords(ctx aws.Context, iterator string) (*kinesis.GetRecordsOutput, error) {
	return a.Client.GetRecordsWithContext(ctx, &kinesis.GetRecordsInput{
		ShardIterator: aws.String(iterator),
	})
}

// PutRecords is a convenience wrapper for putting multiple records into a Kinesis stream.
func (a *API) PutRecords(ctx aws.Context, stream, partitionKey string, data [][]byte) (*kinesis.PutRecordsOutput, error) {
	var records []*kinesis.PutRecordsRequestEntry