From a12ea82def8fe433ee4664c99c7f36ace12c7868 Mon Sep 17 00:00:00 2001 From: jbleduigou Date: Tue, 10 Oct 2023 20:51:59 +0200 Subject: [PATCH 1/2] feat: add support of Kinesis EFO Consumers --- docs/configuration/cloud-providers/aws.mdx | 1 + mocks/kinesis.go | 20 +++ policy.json | 1 + providers/aws/kinesis/streams.go | 46 ++++++ providers/aws/kinesis/streams_test.go | 164 +++++++++++++++++++++ 5 files changed, 232 insertions(+) create mode 100644 mocks/kinesis.go create mode 100644 providers/aws/kinesis/streams_test.go diff --git a/docs/configuration/cloud-providers/aws.mdx b/docs/configuration/cloud-providers/aws.mdx index a924962c5..fc1a827b9 100644 --- a/docs/configuration/cloud-providers/aws.mdx +++ b/docs/configuration/cloud-providers/aws.mdx @@ -33,6 +33,7 @@ sidebar_label: Amazon Web Services - IAM roles - IAM SAML providers - Kinesis Data Streams + - Kinesis EFO Consumers - KMS keys - Lambda functions - RDS clusters diff --git a/mocks/kinesis.go b/mocks/kinesis.go new file mode 100644 index 000000000..28f60b488 --- /dev/null +++ b/mocks/kinesis.go @@ -0,0 +1,20 @@ +package mocks + +import ( + "context" + + "github.com/aws/aws-sdk-go-v2/service/kinesis" + "github.com/stretchr/testify/mock" +) + +type KinesisClient struct { + mock.Mock +} + +func (_m *KinesisClient) ListStreamConsumers(ctx context.Context, params *kinesis.ListStreamConsumersInput, optFns ...func(*kinesis.Options)) (*kinesis.ListStreamConsumersOutput, error) { + ret := _m.Called(ctx, params, optFns) + if ret.Get(1) == nil { + return ret.Get(0).(*kinesis.ListStreamConsumersOutput), nil + } + return nil, ret.Get(1).(error) +} diff --git a/policy.json b/policy.json index c39f4dfd9..cd321e529 100644 --- a/policy.json +++ b/policy.json @@ -55,6 +55,7 @@ "iam:ListSAMLProviders", "iam:ListSAMLProviderTags", "kinesis:ListStreams", + "kinesis:ListStreamConsumers", "kms:ListKeys", "kms:ListResourceTags", "kms:DescribeKey", diff --git a/providers/aws/kinesis/streams.go b/providers/aws/kinesis/streams.go index 2a2c31a55..27d1c6c6c 100644 --- a/providers/aws/kinesis/streams.go +++ b/providers/aws/kinesis/streams.go @@ -9,10 +9,15 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/kinesis" + "github.com/aws/aws-sdk-go-v2/service/kinesis/types" . "github.com/tailwarden/komiser/models" . "github.com/tailwarden/komiser/providers" ) +type KinesisClient interface { + ListStreamConsumers(ctx context.Context, params *kinesis.ListStreamConsumersInput, optFns ...func(*kinesis.Options)) (*kinesis.ListStreamConsumersOutput, error) +} + func Streams(ctx context.Context, client ProviderClient) ([]Resource, error) { resources := make([]Resource, 0) var config kinesis.ListStreamsInput @@ -37,6 +42,11 @@ func Streams(ctx context.Context, client ProviderClient) ([]Resource, error) { FetchedAt: time.Now(), Link: fmt.Sprintf("https://%s.console.aws.amazon.com/kinesis/home?region=%s#/streams/details/%s", client.AWSClient.Region, client.AWSClient.Region, *stream.StreamName), }) + consumers, err := getStreamConsumers(kinesisClient, stream, client.Name, client.AWSClient.Region) + if err != nil { + return resources, err + } + resources = append(resources, consumers...) } if aws.ToString(output.NextToken) == "" { @@ -56,3 +66,39 @@ func Streams(ctx context.Context, client ProviderClient) ([]Resource, error) { return resources, nil } + +func getStreamConsumers(kinesisClient KinesisClient, stream types.StreamSummary, clientName, region string) ([]Resource, error) { + resources := make([]Resource, 0) + config := kinesis.ListStreamConsumersInput{ + StreamARN: aws.String(aws.ToString(stream.StreamARN)), + } + + for { + output, err := kinesisClient.ListStreamConsumers(context.Background(), &config) + if err != nil { + return resources, err + } + + for _, consumer := range output.Consumers { + resources = append(resources, Resource{ + Provider: "AWS", + Account: clientName, + Service: "Kinesis EFO Consumer", + ResourceId: *consumer.ConsumerARN, + Region: region, + Name: *consumer.ConsumerName, + Cost: 0, + CreatedAt: *consumer.ConsumerCreationTimestamp, + FetchedAt: time.Now(), + Link: fmt.Sprintf("https://%s.console.aws.amazon.com/kinesis/home?region=%s#/streams/details/%s/registeredConsumers/%s", region, region, aws.ToString(stream.StreamName), *consumer.ConsumerName), + }) + } + + if aws.ToString(output.NextToken) == "" { + break + } + config.NextToken = output.NextToken + } + + return resources, nil +} diff --git a/providers/aws/kinesis/streams_test.go b/providers/aws/kinesis/streams_test.go new file mode 100644 index 000000000..7f10e173b --- /dev/null +++ b/providers/aws/kinesis/streams_test.go @@ -0,0 +1,164 @@ +package kinesis + +import ( + "fmt" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/kinesis" + "github.com/aws/aws-sdk-go-v2/service/kinesis/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/tailwarden/komiser/mocks" + . "github.com/tailwarden/komiser/models" +) + +func Test_getStreamConsumers(t *testing.T) { + tests := []struct { + name string + stream types.StreamSummary + setupMock func(m *mocks.KinesisClient) + clientName string + region string + want []Resource + wantErr bool + }{ + { + name: "Should return one EFO consumer", + stream: types.StreamSummary{ + StreamARN: aws.String("arn:aws:kinesis:us-east-1:0123456789:stream/kinesis-data-stream"), + StreamName: aws.String("kinesis-data-stream"), + }, + setupMock: func(m *mocks.KinesisClient) { + m.On("ListStreamConsumers", mock.Anything, mock.Anything, mock.Anything).Return(&kinesis.ListStreamConsumersOutput{ + Consumers: []types.Consumer{ + { + ConsumerARN: aws.String("arn:aws:kinesis:us-east-1:0123456789:stream/kinesis-data-stream/consumer/kinesis-efo-consumer:1234567890"), + ConsumerCreationTimestamp: aws.Time(time.UnixMilli(1234567890)), + ConsumerName: aws.String("kinesis-efo-consumer"), + ConsumerStatus: types.ConsumerStatusActive, + }, + }, + }, nil).Once() + }, + clientName: "sandbox", + region: "us-east-1", + want: []Resource{ + { + Provider: "AWS", + Account: "sandbox", + Service: "Kinesis EFO Consumer", + ResourceId: "arn:aws:kinesis:us-east-1:0123456789:stream/kinesis-data-stream/consumer/kinesis-efo-consumer:1234567890", + Region: "us-east-1", + Name: "kinesis-efo-consumer", + Cost: 0, + CreatedAt: time.UnixMilli(1234567890), + FetchedAt: time.Now(), + Link: "https://us-east-1.console.aws.amazon.com/kinesis/home?region=us-east-1#/streams/details/kinesis-data-stream/registeredConsumers/kinesis-efo-consumer", + }, + }, + wantErr: false, + }, + { + name: "Should paginate using next token", + stream: types.StreamSummary{ + StreamARN: aws.String("arn:aws:kinesis:us-east-1:0123456789:stream/kinesis-data-stream"), + StreamName: aws.String("kinesis-data-stream"), + }, + setupMock: func(m *mocks.KinesisClient) { + m.On("ListStreamConsumers", mock.Anything, mock.Anything, mock.Anything).Return(&kinesis.ListStreamConsumersOutput{ + NextToken: aws.String("next-token"), + Consumers: []types.Consumer{ + { + ConsumerARN: aws.String("arn:aws:kinesis:us-east-1:0123456789:stream/kinesis-data-stream/consumer/kinesis-efo-consumer-1:1234567890"), + ConsumerCreationTimestamp: aws.Time(time.UnixMilli(1234567890)), + ConsumerName: aws.String("kinesis-efo-consumer-1"), + ConsumerStatus: types.ConsumerStatusActive, + }, + }, + }, nil).Once() + m.On("ListStreamConsumers", mock.Anything, mock.Anything, mock.Anything).Return(&kinesis.ListStreamConsumersOutput{ + Consumers: []types.Consumer{ + { + ConsumerARN: aws.String("arn:aws:kinesis:us-east-1:0123456789:stream/kinesis-data-stream/consumer/kinesis-efo-consumer-2:1234567890"), + ConsumerCreationTimestamp: aws.Time(time.UnixMilli(1234567890)), + ConsumerName: aws.String("kinesis-efo-consumer-2"), + ConsumerStatus: types.ConsumerStatusActive, + }, + }, + }, nil).Once() + }, + clientName: "sandbox", + region: "us-east-1", + want: []Resource{ + { + Provider: "AWS", + Account: "sandbox", + Service: "Kinesis EFO Consumer", + ResourceId: "arn:aws:kinesis:us-east-1:0123456789:stream/kinesis-data-stream/consumer/kinesis-efo-consumer-1:1234567890", + Region: "us-east-1", + Name: "kinesis-efo-consumer-1", + Cost: 0, + CreatedAt: time.UnixMilli(1234567890), + FetchedAt: time.Now(), + Link: "https://us-east-1.console.aws.amazon.com/kinesis/home?region=us-east-1#/streams/details/kinesis-data-stream/registeredConsumers/kinesis-efo-consumer-1", + }, + { + Provider: "AWS", + Account: "sandbox", + Service: "Kinesis EFO Consumer", + ResourceId: "arn:aws:kinesis:us-east-1:0123456789:stream/kinesis-data-stream/consumer/kinesis-efo-consumer-2:1234567890", + Region: "us-east-1", + Name: "kinesis-efo-consumer-2", + Cost: 0, + CreatedAt: time.UnixMilli(1234567890), + FetchedAt: time.Now(), + Link: "https://us-east-1.console.aws.amazon.com/kinesis/home?region=us-east-1#/streams/details/kinesis-data-stream/registeredConsumers/kinesis-efo-consumer-2", + }, + }, + wantErr: false, + }, + { + name: "Should return error if error with kinesis client", + stream: types.StreamSummary{ + StreamARN: aws.String("arn:aws:kinesis:us-east-1:0123456789:stream/kinesis-data-stream"), + StreamName: aws.String("kinesis-data-stream"), + }, + setupMock: func(m *mocks.KinesisClient) { + m.On("ListStreamConsumers", mock.Anything, mock.Anything, mock.Anything).Return(nil, fmt.Errorf("unit test error")).Once() + }, + clientName: "sandbox", + region: "us-east-1", + want: []Resource{}, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + kinesisClient := &mocks.KinesisClient{} + tt.setupMock(kinesisClient) + + got, err := getStreamConsumers(kinesisClient, tt.stream, tt.clientName, tt.region) + if (err != nil) != tt.wantErr { + t.Errorf("getStreamConsumers() error = %v, wantErr %v", err, tt.wantErr) + return + } + if len(got) != len(tt.want) { + t.Errorf("getStreamConsumers() incorrect lenght of resources got = %v, want %v", len(got), len(tt.want)) + } else { + for i := range got { + assert.Equalf(t, tt.want[i].Link, got[i].Link, "incorrect Link for resources") + assert.Equalf(t, tt.want[i].Provider, got[i].Provider, "incorrect Provider for resources") + assert.Equalf(t, tt.want[i].Account, got[i].Account, "incorrect Account for resources") + assert.Equalf(t, tt.want[i].Service, got[i].Service, "incorrect Service for resources") + assert.Equalf(t, tt.want[i].ResourceId, got[i].ResourceId, "incorrect ResourceId for resources") + assert.Equalf(t, tt.want[i].Region, got[i].Region, "incorrect Region for resources") + assert.Equalf(t, tt.want[i].Name, got[i].Name, "incorrect Name for resources") + assert.Equalf(t, tt.want[i].Cost, got[i].Cost, "incorrect Cost for resources") + } + } + kinesisClient.AssertExpectations(t) + }) + } +} From c577fd66c8e74fbeba9d3989919927e9b0858435 Mon Sep 17 00:00:00 2001 From: jbleduigou Date: Wed, 11 Oct 2023 08:28:38 +0200 Subject: [PATCH 2/2] fix: re-use the same context within private functions --- providers/aws/kinesis/streams.go | 6 +++--- providers/aws/kinesis/streams_test.go | 5 ++++- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/providers/aws/kinesis/streams.go b/providers/aws/kinesis/streams.go index 27d1c6c6c..de718e6d4 100644 --- a/providers/aws/kinesis/streams.go +++ b/providers/aws/kinesis/streams.go @@ -42,7 +42,7 @@ func Streams(ctx context.Context, client ProviderClient) ([]Resource, error) { FetchedAt: time.Now(), Link: fmt.Sprintf("https://%s.console.aws.amazon.com/kinesis/home?region=%s#/streams/details/%s", client.AWSClient.Region, client.AWSClient.Region, *stream.StreamName), }) - consumers, err := getStreamConsumers(kinesisClient, stream, client.Name, client.AWSClient.Region) + consumers, err := getStreamConsumers(ctx, kinesisClient, stream, client.Name, client.AWSClient.Region) if err != nil { return resources, err } @@ -67,14 +67,14 @@ func Streams(ctx context.Context, client ProviderClient) ([]Resource, error) { return resources, nil } -func getStreamConsumers(kinesisClient KinesisClient, stream types.StreamSummary, clientName, region string) ([]Resource, error) { +func getStreamConsumers(ctx context.Context, kinesisClient KinesisClient, stream types.StreamSummary, clientName, region string) ([]Resource, error) { resources := make([]Resource, 0) config := kinesis.ListStreamConsumersInput{ StreamARN: aws.String(aws.ToString(stream.StreamARN)), } for { - output, err := kinesisClient.ListStreamConsumers(context.Background(), &config) + output, err := kinesisClient.ListStreamConsumers(ctx, &config) if err != nil { return resources, err } diff --git a/providers/aws/kinesis/streams_test.go b/providers/aws/kinesis/streams_test.go index 7f10e173b..20c4790eb 100644 --- a/providers/aws/kinesis/streams_test.go +++ b/providers/aws/kinesis/streams_test.go @@ -1,6 +1,7 @@ package kinesis import ( + "context" "fmt" "testing" "time" @@ -136,10 +137,12 @@ func Test_getStreamConsumers(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + kinesisClient := &mocks.KinesisClient{} tt.setupMock(kinesisClient) - got, err := getStreamConsumers(kinesisClient, tt.stream, tt.clientName, tt.region) + got, err := getStreamConsumers(ctx, kinesisClient, tt.stream, tt.clientName, tt.region) if (err != nil) != tt.wantErr { t.Errorf("getStreamConsumers() error = %v, wantErr %v", err, tt.wantErr) return