Skip to content

Commit

Permalink
Merge pull request #1073 from jbleduigou/kinesis-efo-consumer
Browse files Browse the repository at this point in the history
feat: add support of Kinesis EFO Consumers
  • Loading branch information
mlabouardy authored Oct 11, 2023
2 parents 476b273 + c577fd6 commit 72bb651
Show file tree
Hide file tree
Showing 5 changed files with 235 additions and 0 deletions.
1 change: 1 addition & 0 deletions docs/configuration/cloud-providers/aws.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ sidebar_label: Amazon Web Services
- IAM roles
- IAM SAML providers
- Kinesis Data Streams
- Kinesis EFO Consumers
- KMS keys
- Lambda functions
- RDS clusters
Expand Down
20 changes: 20 additions & 0 deletions mocks/kinesis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package mocks

import (
"context"

"github.com/aws/aws-sdk-go-v2/service/kinesis"
"github.com/stretchr/testify/mock"
)

type KinesisClient struct {
mock.Mock
}

func (_m *KinesisClient) ListStreamConsumers(ctx context.Context, params *kinesis.ListStreamConsumersInput, optFns ...func(*kinesis.Options)) (*kinesis.ListStreamConsumersOutput, error) {
ret := _m.Called(ctx, params, optFns)
if ret.Get(1) == nil {
return ret.Get(0).(*kinesis.ListStreamConsumersOutput), nil
}
return nil, ret.Get(1).(error)
}
1 change: 1 addition & 0 deletions policy.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
"iam:ListSAMLProviders",
"iam:ListSAMLProviderTags",
"kinesis:ListStreams",
"kinesis:ListStreamConsumers",
"kms:ListKeys",
"kms:ListResourceTags",
"kms:DescribeKey",
Expand Down
46 changes: 46 additions & 0 deletions providers/aws/kinesis/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,15 @@ import (

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/kinesis"
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
. "github.com/tailwarden/komiser/models"
. "github.com/tailwarden/komiser/providers"
)

type KinesisClient interface {
ListStreamConsumers(ctx context.Context, params *kinesis.ListStreamConsumersInput, optFns ...func(*kinesis.Options)) (*kinesis.ListStreamConsumersOutput, error)
}

func Streams(ctx context.Context, client ProviderClient) ([]Resource, error) {
resources := make([]Resource, 0)
var config kinesis.ListStreamsInput
Expand All @@ -37,6 +42,11 @@ func Streams(ctx context.Context, client ProviderClient) ([]Resource, error) {
FetchedAt: time.Now(),
Link: fmt.Sprintf("https://%s.console.aws.amazon.com/kinesis/home?region=%s#/streams/details/%s", client.AWSClient.Region, client.AWSClient.Region, *stream.StreamName),
})
consumers, err := getStreamConsumers(ctx, kinesisClient, stream, client.Name, client.AWSClient.Region)
if err != nil {
return resources, err
}
resources = append(resources, consumers...)
}

if aws.ToString(output.NextToken) == "" {
Expand All @@ -56,3 +66,39 @@ func Streams(ctx context.Context, client ProviderClient) ([]Resource, error) {

return resources, nil
}

func getStreamConsumers(ctx context.Context, kinesisClient KinesisClient, stream types.StreamSummary, clientName, region string) ([]Resource, error) {
resources := make([]Resource, 0)
config := kinesis.ListStreamConsumersInput{
StreamARN: aws.String(aws.ToString(stream.StreamARN)),
}

for {
output, err := kinesisClient.ListStreamConsumers(ctx, &config)
if err != nil {
return resources, err
}

for _, consumer := range output.Consumers {
resources = append(resources, Resource{
Provider: "AWS",
Account: clientName,
Service: "Kinesis EFO Consumer",
ResourceId: *consumer.ConsumerARN,
Region: region,
Name: *consumer.ConsumerName,
Cost: 0,
CreatedAt: *consumer.ConsumerCreationTimestamp,
FetchedAt: time.Now(),
Link: fmt.Sprintf("https://%s.console.aws.amazon.com/kinesis/home?region=%s#/streams/details/%s/registeredConsumers/%s", region, region, aws.ToString(stream.StreamName), *consumer.ConsumerName),
})
}

if aws.ToString(output.NextToken) == "" {
break
}
config.NextToken = output.NextToken
}

return resources, nil
}
167 changes: 167 additions & 0 deletions providers/aws/kinesis/streams_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package kinesis

import (
"context"
"fmt"
"testing"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/kinesis"
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/tailwarden/komiser/mocks"
. "github.com/tailwarden/komiser/models"
)

func Test_getStreamConsumers(t *testing.T) {
tests := []struct {
name string
stream types.StreamSummary
setupMock func(m *mocks.KinesisClient)
clientName string
region string
want []Resource
wantErr bool
}{
{
name: "Should return one EFO consumer",
stream: types.StreamSummary{
StreamARN: aws.String("arn:aws:kinesis:us-east-1:0123456789:stream/kinesis-data-stream"),
StreamName: aws.String("kinesis-data-stream"),
},
setupMock: func(m *mocks.KinesisClient) {
m.On("ListStreamConsumers", mock.Anything, mock.Anything, mock.Anything).Return(&kinesis.ListStreamConsumersOutput{
Consumers: []types.Consumer{
{
ConsumerARN: aws.String("arn:aws:kinesis:us-east-1:0123456789:stream/kinesis-data-stream/consumer/kinesis-efo-consumer:1234567890"),
ConsumerCreationTimestamp: aws.Time(time.UnixMilli(1234567890)),
ConsumerName: aws.String("kinesis-efo-consumer"),
ConsumerStatus: types.ConsumerStatusActive,
},
},
}, nil).Once()
},
clientName: "sandbox",
region: "us-east-1",
want: []Resource{
{
Provider: "AWS",
Account: "sandbox",
Service: "Kinesis EFO Consumer",
ResourceId: "arn:aws:kinesis:us-east-1:0123456789:stream/kinesis-data-stream/consumer/kinesis-efo-consumer:1234567890",
Region: "us-east-1",
Name: "kinesis-efo-consumer",
Cost: 0,
CreatedAt: time.UnixMilli(1234567890),
FetchedAt: time.Now(),
Link: "https://us-east-1.console.aws.amazon.com/kinesis/home?region=us-east-1#/streams/details/kinesis-data-stream/registeredConsumers/kinesis-efo-consumer",
},
},
wantErr: false,
},
{
name: "Should paginate using next token",
stream: types.StreamSummary{
StreamARN: aws.String("arn:aws:kinesis:us-east-1:0123456789:stream/kinesis-data-stream"),
StreamName: aws.String("kinesis-data-stream"),
},
setupMock: func(m *mocks.KinesisClient) {
m.On("ListStreamConsumers", mock.Anything, mock.Anything, mock.Anything).Return(&kinesis.ListStreamConsumersOutput{
NextToken: aws.String("next-token"),
Consumers: []types.Consumer{
{
ConsumerARN: aws.String("arn:aws:kinesis:us-east-1:0123456789:stream/kinesis-data-stream/consumer/kinesis-efo-consumer-1:1234567890"),
ConsumerCreationTimestamp: aws.Time(time.UnixMilli(1234567890)),
ConsumerName: aws.String("kinesis-efo-consumer-1"),
ConsumerStatus: types.ConsumerStatusActive,
},
},
}, nil).Once()
m.On("ListStreamConsumers", mock.Anything, mock.Anything, mock.Anything).Return(&kinesis.ListStreamConsumersOutput{
Consumers: []types.Consumer{
{
ConsumerARN: aws.String("arn:aws:kinesis:us-east-1:0123456789:stream/kinesis-data-stream/consumer/kinesis-efo-consumer-2:1234567890"),
ConsumerCreationTimestamp: aws.Time(time.UnixMilli(1234567890)),
ConsumerName: aws.String("kinesis-efo-consumer-2"),
ConsumerStatus: types.ConsumerStatusActive,
},
},
}, nil).Once()
},
clientName: "sandbox",
region: "us-east-1",
want: []Resource{
{
Provider: "AWS",
Account: "sandbox",
Service: "Kinesis EFO Consumer",
ResourceId: "arn:aws:kinesis:us-east-1:0123456789:stream/kinesis-data-stream/consumer/kinesis-efo-consumer-1:1234567890",
Region: "us-east-1",
Name: "kinesis-efo-consumer-1",
Cost: 0,
CreatedAt: time.UnixMilli(1234567890),
FetchedAt: time.Now(),
Link: "https://us-east-1.console.aws.amazon.com/kinesis/home?region=us-east-1#/streams/details/kinesis-data-stream/registeredConsumers/kinesis-efo-consumer-1",
},
{
Provider: "AWS",
Account: "sandbox",
Service: "Kinesis EFO Consumer",
ResourceId: "arn:aws:kinesis:us-east-1:0123456789:stream/kinesis-data-stream/consumer/kinesis-efo-consumer-2:1234567890",
Region: "us-east-1",
Name: "kinesis-efo-consumer-2",
Cost: 0,
CreatedAt: time.UnixMilli(1234567890),
FetchedAt: time.Now(),
Link: "https://us-east-1.console.aws.amazon.com/kinesis/home?region=us-east-1#/streams/details/kinesis-data-stream/registeredConsumers/kinesis-efo-consumer-2",
},
},
wantErr: false,
},
{
name: "Should return error if error with kinesis client",
stream: types.StreamSummary{
StreamARN: aws.String("arn:aws:kinesis:us-east-1:0123456789:stream/kinesis-data-stream"),
StreamName: aws.String("kinesis-data-stream"),
},
setupMock: func(m *mocks.KinesisClient) {
m.On("ListStreamConsumers", mock.Anything, mock.Anything, mock.Anything).Return(nil, fmt.Errorf("unit test error")).Once()
},
clientName: "sandbox",
region: "us-east-1",
want: []Resource{},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()

kinesisClient := &mocks.KinesisClient{}
tt.setupMock(kinesisClient)

got, err := getStreamConsumers(ctx, kinesisClient, tt.stream, tt.clientName, tt.region)
if (err != nil) != tt.wantErr {
t.Errorf("getStreamConsumers() error = %v, wantErr %v", err, tt.wantErr)
return
}
if len(got) != len(tt.want) {
t.Errorf("getStreamConsumers() incorrect lenght of resources got = %v, want %v", len(got), len(tt.want))
} else {
for i := range got {
assert.Equalf(t, tt.want[i].Link, got[i].Link, "incorrect Link for resources")
assert.Equalf(t, tt.want[i].Provider, got[i].Provider, "incorrect Provider for resources")
assert.Equalf(t, tt.want[i].Account, got[i].Account, "incorrect Account for resources")
assert.Equalf(t, tt.want[i].Service, got[i].Service, "incorrect Service for resources")
assert.Equalf(t, tt.want[i].ResourceId, got[i].ResourceId, "incorrect ResourceId for resources")
assert.Equalf(t, tt.want[i].Region, got[i].Region, "incorrect Region for resources")
assert.Equalf(t, tt.want[i].Name, got[i].Name, "incorrect Name for resources")
assert.Equalf(t, tt.want[i].Cost, got[i].Cost, "incorrect Cost for resources")
}
}
kinesisClient.AssertExpectations(t)
})
}
}

0 comments on commit 72bb651

Please sign in to comment.