From 0308c743c139496e947e8f928456fb10690adac8 Mon Sep 17 00:00:00 2001
From: Josh Liburdi
Date: Mon, 8 Jul 2024 10:55:27 -0700
Subject: [PATCH] chore(cmd): Add Kinesis Tap Development Application (#199)

* docs: Add Example Dev App for Reading from Kinesis

* chore: Move App
---
 cmd/README.md                                 |   4 -
 .../kinesis-tap/substation/README.md          |  52 ++++
 .../kinesis-tap/substation/config.jsonnet     |   7 +
 .../kinesis-tap/substation/main.go            | 264 ++++++++++++++++++
 internal/aws/kinesis/kinesis.go               |  23 ++
 5 files changed, 346 insertions(+), 4 deletions(-)
 create mode 100644 cmd/development/kinesis-tap/substation/README.md
 create mode 100644 cmd/development/kinesis-tap/substation/config.jsonnet
 create mode 100644 cmd/development/kinesis-tap/substation/main.go

diff --git a/cmd/README.md b/cmd/README.md
index 37d39e30..0a29d514 100644
--- a/cmd/README.md
+++ b/cmd/README.md
@@ -6,10 +6,6 @@ This directory contains applications that run Substation. Applications are organ
 
 Applications that run on AWS Lambda.
 
-## client/
-
-Applications that run on a client machine.
-
 ## development/
 
 Applications that are used for development.
diff --git a/cmd/development/kinesis-tap/substation/README.md b/cmd/development/kinesis-tap/substation/README.md
new file mode 100644
index 00000000..e96ca0ce
--- /dev/null
+++ b/cmd/development/kinesis-tap/substation/README.md
@@ -0,0 +1,52 @@
+# kinesis-tap/substation
+
+`kinesis-tap` is a tool for tapping into and transforming data from an AWS Kinesis Data Stream in real-time with Substation.
+
+This is intended as a Substation development aid, but it has other uses as well, such as:
+- Previewing live data in a stream by printing it to the console (default behavior)
+- Sampling live data from a stream and saving it to a local file
+- Forwarding live data between data pipeline stages for testing
+
+Warning: This is a development tool intended to provide temporary access to live data in a Kinesis Data Stream; if you need to process data from a Kinesis Data Stream with strict reliability guarantees, use the [AWS Lambda applications](/cmd/aws/lambda/).
+
+## Usage
+
+```
+% ./substation -h
+Usage of ./substation:
+  -config string
+        The Substation configuration file used to transform records (default "./config.json")
+  -stream-name string
+        The AWS Kinesis Data Stream to fetch records from
+  -stream-offset string
+        Determines the offset of the stream (earliest, latest) (default "earliest")
+```
+
+Use the `SUBSTATION_DEBUG=1` environment variable to enable debug logging:
+```
+% SUBSTATION_DEBUG=1 ./substation -stream-name my-stream
+DEBU[0000] Retrieved active shards from Kinesis stream.  count=2 stream=my-stream
+DEBU[0001] Retrieved records from Kinesis shard.  count=981 shard=0x140004a6f80 stream=my-stream
+DEBU[0002] Retrieved records from Kinesis shard.  count=1055 shard=0x140004a6fe0 stream=my-stream
+DEBU[0003] Retrieved records from Kinesis shard.  count=2333 shard=0x140004a6f80 stream=my-stream
+DEBU[0003] Retrieved records from Kinesis shard.  count=1110 shard=0x140004a6fe0 stream=my-stream
+DEBU[0004] Retrieved records from Kinesis shard.  count=2109 shard=0x140004a6f80 stream=my-stream
+DEBU[0004] Retrieved records from Kinesis shard.  count=1094 shard=0x140004a6fe0 stream=my-stream
+^CDEBU[0004] Closed connections to the Kinesis stream.
+DEBU[0004] Closed Substation pipeline.
+DEBU[0004] Flushed Substation pipeline.
+```
+
+## Build
+
+```
+git clone https://github.com/brexhq/substation.git && \
+cd substation/cmd/development/kinesis-tap/substation && \
+go build .
+```
+
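+Note: the `-config` flag expects compiled JSON (default `./config.json`). If you start from the `config.jsonnet` example in this directory, compile it to JSON first (for example, `jsonnet config.jsonnet > config.json` with the Jsonnet CLI).
+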
+## Authentication
+
+`kinesis-tap` uses the AWS SDK for Go to authenticate with AWS. The SDK uses the same authentication methods as the AWS CLI, so you can use the same environment variables or configuration files to authenticate.
+
+For more information, see the [AWS CLI documentation](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-configure.html).
diff --git a/cmd/development/kinesis-tap/substation/config.jsonnet b/cmd/development/kinesis-tap/substation/config.jsonnet
new file mode 100644
index 00000000..198853b4
--- /dev/null
+++ b/cmd/development/kinesis-tap/substation/config.jsonnet
@@ -0,0 +1,7 @@
+local sub = import '../../../../build/config/substation.libsonnet';
+
+{
+  transforms: [
+    sub.tf.send.stdout(),
+  ]
+}
diff --git a/cmd/development/kinesis-tap/substation/main.go b/cmd/development/kinesis-tap/substation/main.go
new file mode 100644
index 00000000..5425c403
--- /dev/null
+++ b/cmd/development/kinesis-tap/substation/main.go
@@ -0,0 +1,264 @@
+package main
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"errors"
+	"flag"
+	"fmt"
+	"io"
+	"os"
+	"os/signal"
+	"runtime"
+	"syscall"
+	"time"
+
+	"golang.org/x/sync/errgroup"
+
+	"github.com/aws/aws-sdk-go/aws/awserr"
+	"github.com/awslabs/kinesis-aggregation/go/deaggregator"
+	"github.com/brexhq/substation"
+	"github.com/brexhq/substation/internal/aws"
+	"github.com/brexhq/substation/internal/aws/kinesis"
+	"github.com/brexhq/substation/internal/channel"
+	"github.com/brexhq/substation/internal/file"
+	"github.com/brexhq/substation/internal/log"
+	"github.com/brexhq/substation/message"
+)
+
+type options struct {
+	Config string
+
+	// StreamName is the name of the Kinesis stream to read from.
+	StreamName string
+	// StreamOffset is the read offset of the stream (earliest, latest).
+	StreamOffset string
+}
+
+// getConfig contextually retrieves a Substation configuration.
+func getConfig(ctx context.Context, cfg string) (io.Reader, error) {
+	path, err := file.Get(ctx, cfg)
+	defer os.Remove(path)
+
+	if err != nil {
+		return nil, err
+	}
+
+	conf, err := os.Open(path)
+	if err != nil {
+		return nil, err
+	}
+	defer conf.Close()
+
+	buf := new(bytes.Buffer)
+	if _, err := io.Copy(buf, conf); err != nil {
+		return nil, err
+	}
+
+	return buf, nil
+}
+
+func main() {
+	var opts options
+
+	flag.StringVar(&opts.Config, "config", "./config.json", "The Substation configuration file used to transform records")
+	flag.StringVar(&opts.StreamName, "stream-name", "", "The AWS Kinesis Data Stream to fetch records from")
+	flag.StringVar(&opts.StreamOffset, "stream-offset", "earliest", "Determines the offset of the stream (earliest, latest)")
+	flag.Parse()
+
+	if err := run(context.Background(), opts); err != nil {
+		panic(fmt.Errorf("main: %v", err))
+	}
+}
+
+//nolint:gocognit // Ignore cognitive complexity.
+func run(ctx context.Context, opts options) error {
+	cfg := substation.Config{}
+	c, err := getConfig(ctx, opts.Config)
+	if err != nil {
+		return err
+	}
+
+	if err := json.NewDecoder(c).Decode(&cfg); err != nil {
+		return err
+	}
+
+	sub, err := substation.New(ctx, cfg)
+	if err != nil {
+		return err
+	}
+
+	ch := channel.New[*message.Message]()
+	group, ctx := errgroup.WithContext(ctx)
+
+	// Consumer group that transforms records using Substation
+	// until the channel is closed by the producer group.
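+	// Transforms run concurrently in a nested errgroup that is
+	// capped at runtime.NumCPU() goroutines via SetLimit.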
+	group.Go(func() error {
+		tfGroup, tfCtx := errgroup.WithContext(ctx)
+		tfGroup.SetLimit(runtime.NumCPU())
+
+		for message := range ch.Recv() {
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			default:
+			}
+
+			msg := message
+			tfGroup.Go(func() error {
+				if _, err := sub.Transform(tfCtx, msg); err != nil {
+					return err
+				}
+
+				return nil
+			})
+		}
+
+		if err := tfGroup.Wait(); err != nil {
+			return err
+		}
+
+		log.Debug("Closed Substation pipeline.")
+
+		// ctrl messages flush the pipeline. This must be done
+		// after all messages have been processed.
+		ctrl := message.New().AsControl()
+		if _, err := sub.Transform(tfCtx, ctrl); err != nil {
+			return err
+		}
+
+		log.Debug("Flushed Substation pipeline.")
+
+		return nil
+	})
+
+	// Producer group that fetches records from each shard in the
+	// Kinesis stream until the context is cancelled by an interrupt
+	// signal.
+	group.Go(func() error {
+		defer ch.Close() // Producer goroutines must close the channel when they are done.
+
+		// The AWS client is configured using environment variables
+		// or the default credentials file.
+		client := kinesis.API{}
+		client.Setup(aws.Config{})
+
+		res, err := client.ListShards(ctx, opts.StreamName)
+		if err != nil {
+			return err
+		}
+
+		log.WithField("stream", opts.StreamName).WithField("count", len(res.Shards)).Debug("Retrieved active shards from Kinesis stream.")
+
+		var iType string
+		switch opts.StreamOffset {
+		case "earliest":
+			iType = "TRIM_HORIZON"
+		case "latest":
+			iType = "LATEST"
+		default:
+			return fmt.Errorf("invalid offset: %s", opts.StreamOffset)
+		}
+
+		// Each shard is read concurrently using a worker
+		// pool managed by an errgroup that can be cancelled
+		// by an interrupt signal.
+		notifyCtx, cancel := signal.NotifyContext(ctx, syscall.SIGINT)
+		defer cancel()
+
+		recvGroup, recvCtx := errgroup.WithContext(notifyCtx)
+		defer log.Debug("Closed connections to the Kinesis stream.")
+
+		// This iterates over a snapshot of active shards in the
+		// stream and will not be updated if shards are split or
+		// merged. New shards can be identified in the response
+		// from GetRecords, but this isn't implemented.
+		//
+		// Each shard is paginated until the end of the shard is
+		// reached or the context is cancelled.
+		for _, shard := range res.Shards {
+			iterator, err := client.GetShardIterator(ctx, opts.StreamName, *shard.ShardId, iType)
+			if err != nil {
+				return err
+			}
+
+			recvGroup.Go(func() error {
+				shardIterator := *iterator.ShardIterator
+
+				for {
+					select {
+					case <-ctx.Done():
+						return ctx.Err()
+					default:
+					}
+
+					// GetRecords has a limit of 5 transactions per second
+					// per shard, so this loop is designed to not overload
+					// the API in case other consumers are reading from the
+					// same shard.
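+					// The 500 millisecond sleep below keeps this reader at
+					// roughly two GetRecords calls per second per shard,
+					// well under that shared limit.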
+					res, err := client.GetRecords(recvCtx, shardIterator)
+					if err != nil {
+						return err
+					}
+
+					if res.NextShardIterator == nil {
+						log.WithField("stream", opts.StreamName).WithField("shard", shard.ShardId).Debug("Reached end of Kinesis shard.")
+
+						break
+					}
+					shardIterator = *res.NextShardIterator
+
+					if len(res.Records) == 0 {
+						time.Sleep(500 * time.Millisecond)
+
+						continue
+					}
+
+					deagg, err := deaggregator.DeaggregateRecords(res.Records)
+					if err != nil {
+						return err
+					}
+
+					log.WithField("stream", opts.StreamName).WithField("shard", shard.ShardId).WithField("count", len(deagg)).Debug("Retrieved records from Kinesis shard.")
+
+					for _, record := range deagg {
+						msg := message.New().SetData(record.Data)
+						ch.Send(msg)
+					}
+
+					time.Sleep(500 * time.Millisecond)
+				}
+
+				return nil
+			})
+		}
+
+		// Cancellation errors are expected when the errgroup
+		// is interrupted. All other errors are returned to
+		// the caller.
+		if err := recvGroup.Wait(); err != nil {
+			if awsErr, ok := err.(awserr.Error); ok {
+				if awsErr.Code() == "RequestCanceled" {
+					return nil
+				}
+			}
+
+			if errors.Is(err, context.Canceled) {
+				return nil
+			}
+
+			return err
+		}
+
+		return nil
+	})
+
+	// Wait for the producer and consumer groups to finish,
+	// or an error from either group.
+	if err := group.Wait(); err != nil {
+		return err
+	}
+
+	return nil
+}
diff --git a/internal/aws/kinesis/kinesis.go b/internal/aws/kinesis/kinesis.go
index 9c2531af..a402e12a 100644
--- a/internal/aws/kinesis/kinesis.go
+++ b/internal/aws/kinesis/kinesis.go
@@ -170,6 +170,29 @@ func (a *API) IsEnabled() bool {
 	return a.Client != nil
 }
 
+// ListShards wraps the ListShardsWithContext API.
+func (a *API) ListShards(ctx aws.Context, stream string) (*kinesis.ListShardsOutput, error) {
+	return a.Client.ListShardsWithContext(ctx, &kinesis.ListShardsInput{
+		StreamName: aws.String(stream),
+	})
+}
+
+// GetShardIterator wraps the GetShardIteratorWithContext API.
+func (a *API) GetShardIterator(ctx aws.Context, stream, shard, iteratorType string) (*kinesis.GetShardIteratorOutput, error) {
+	return a.Client.GetShardIteratorWithContext(ctx, &kinesis.GetShardIteratorInput{
+		ShardId:           aws.String(shard),
+		ShardIteratorType: aws.String(iteratorType),
+		StreamName:        aws.String(stream),
+	})
+}
+
+// GetRecords wraps the GetRecordsWithContext API.
+func (a *API) GetRecords(ctx aws.Context, iterator string) (*kinesis.GetRecordsOutput, error) {
+	return a.Client.GetRecordsWithContext(ctx, &kinesis.GetRecordsInput{
+		ShardIterator: aws.String(iterator),
+	})
+}
+
 // PutRecords is a convenience wrapper for putting multiple records into a Kinesis stream.
 func (a *API) PutRecords(ctx aws.Context, stream, partitionKey string, data [][]byte) (*kinesis.PutRecordsOutput, error) {
 	var records []*kinesis.PutRecordsRequestEntry
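
Below is a minimal sketch, not part of the patch, of how the new `ListShards`, `GetShardIterator`, and `GetRecords` wrappers compose to read one batch from a single shard. It assumes the code is built inside this repository (the `internal` packages are not importable elsewhere) and that `my-stream` is a placeholder stream name; deaggregation, pagination, and throttling are omitted (see `main.go` above for the full pattern).

```
package main

import (
	"context"
	"fmt"

	"github.com/brexhq/substation/internal/aws"
	"github.com/brexhq/substation/internal/aws/kinesis"
)

func main() {
	ctx := context.Background()

	// Credentials and region are resolved from the environment or the
	// shared AWS config, the same way main.go configures its client.
	client := kinesis.API{}
	client.Setup(aws.Config{})

	// List the stream's shards, then read one batch from the first shard.
	shards, err := client.ListShards(ctx, "my-stream")
	if err != nil {
		panic(err)
	}
	if len(shards.Shards) == 0 {
		panic("stream has no shards")
	}

	iter, err := client.GetShardIterator(ctx, "my-stream", *shards.Shards[0].ShardId, "TRIM_HORIZON")
	if err != nil {
		panic(err)
	}

	out, err := client.GetRecords(ctx, *iter.ShardIterator)
	if err != nil {
		panic(err)
	}

	fmt.Println("records in first batch:", len(out.Records))
}
```

The `TRIM_HORIZON` iterator type corresponds to the tool's `earliest` offset; `LATEST` corresponds to `latest`.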