From 3e1f6c7a57045503c1f195cf400a938835e08ac6 Mon Sep 17 00:00:00 2001 From: leon Date: Thu, 28 Apr 2022 19:03:03 +0300 Subject: [PATCH 1/5] fix kinesis error handle --- streams/client.go | 69 +++------ streams/client_test.go | 308 ++++++++++++++++++++++------------------- 2 files changed, 188 insertions(+), 189 deletions(-) diff --git a/streams/client.go b/streams/client.go index 53779d9..b8656d5 100644 --- a/streams/client.go +++ b/streams/client.go @@ -8,7 +8,6 @@ import ( "time" "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/kinesis" "github.com/elastic/beats/libbeat/beat" @@ -132,30 +131,18 @@ func (client *client) publishBatch(b batch) ([]publisher.Event, error) { } } logp.Debug("kinesis", "mapped to records: %v", records) - res, err := client.putKinesisRecords(records) - failed := collectFailedEvents(res, okEvents) - if len(failed) == 0 { - if aerr, ok := err.(awserr.Error); ok { - switch aerr.Code() { - case kinesis.ErrCodeLimitExceededException: - case kinesis.ErrCodeProvisionedThroughputExceededException: - case kinesis.ErrCodeInternalFailureException: - logp.Info("putKinesisRecords failed (api level, not per-record failure). Will retry all records. error: %v", err) - failed = eventsFlattenLists(okEvents) - default: - logp.Warn("putKinesisRecords persistent failure. Will not retry. error: %v", err) - } - } - } - if err != nil || len(failed) > 0 { + err, ok := client.putKinesisRecords(records) + if !ok || err != nil { dur := client.backoff.Duration() - logp.Info("retrying %d events client.backoff.Duration %s on error: %v", len(failed), dur, err) + logp.Info("retrying %d events client.backoff.Duration %s on error: %v", len(records), dur, err) time.Sleep(dur) + return eventsFlattenLists(okEvents), err } else { client.backoff.Reset() } - return failed, err + return nil, nil } + func eventsFlattenLists(listOfListsEvents [][]publisher.Event) []publisher.Event { var flatEvents []publisher.Event for _, events := range listOfListsEvents { @@ -260,7 +247,6 @@ func (client *client) mapEventsGZip(events []publisher.Event) []batch { batchSize := 0 for i := range events { - // var gzippedBytes []byte event := events[i] buf, err := client.getBytesFromEvent(&event) if err != nil { @@ -340,7 +326,6 @@ func (client *client) mapEventsGZip(events []publisher.Event) []batch { dropped: dropped}) } records = append(records, record) - okEvents = append(okEvents, []publisher.Event{events[len(events)-1]}) } batches = append(batches, batch{ okEvents: okEvents, @@ -374,6 +359,7 @@ func (client *client) getBytesFromEvent(event *publisher.Event) ([]byte, error) logp.Critical("Unable to encode event: %v", err) return nil, err } + // See https://github.com/elastic/beats/blob/5a6630a8bc9b9caf312978f57d1d9193bdab1ac7/libbeat/outputs/kafka/client.go#L163-L164 // You need to copy the byte data like this. Otherwise you see strange issues like all the records sent in a same batch has the same Data. buf = make([]byte, len(serializedEvent)) @@ -430,42 +416,27 @@ func (client *client) mapBytes(buf []byte, event *publisher.Event) (*kinesis.Put return &kinesis.PutRecordsRequestEntry{Data: bufCP, PartitionKey: aws.String(partitionKey)}, nil } -func (client *client) putKinesisRecords(records []*kinesis.PutRecordsRequestEntry) (*kinesis.PutRecordsOutput, error) { +func (client *client) putKinesisRecords(records []*kinesis.PutRecordsRequestEntry) (error, bool) { request := kinesis.PutRecordsInput{ StreamName: &client.streamName, Records: records, } res, err := client.streams.PutRecords(&request) if err != nil { - return res, fmt.Errorf("failed to put records: %v", err) + return err, false } - return res, nil -} - -func collectFailedEvents(res *kinesis.PutRecordsOutput, events [][]publisher.Event) []publisher.Event { - if res.FailedRecordCount != nil && *res.FailedRecordCount > 0 { - failedEvents := make([]publisher.Event, 0) - records := res.Records - for i, r := range records { - if r == nil { - // See https://github.com/s12v/awsbeats/issues/27 for more info - logp.NewLogger("streams").Warn("no record returned from kinesis for event: ", events[i]) - continue - } - if r.ErrorCode == nil { - // logp.NewLogger("streams").Warn("skipping failed event with unexpected state: corresponding kinesis record misses error code: ", r) - continue - } - if *r.ErrorCode == "ProvisionedThroughputExceededException" { - logp.NewLogger("streams").Debug("throughput exceeded. will retry", r) - failedEvents = append(failedEvents, events[i]...) - } - if *r.ErrorCode != "" { - failedEvents = append(failedEvents, events[i]...) + allGood := true + for _, record := range res.Records { + if record != nil && record.ErrorCode != nil { + if *record.ErrorCode != "" { + allGood = false + if record.ErrorMessage != nil { + logp.Critical("Failed to put record to kinesis.code:%s msg :%s", *record.ErrorCode, *record.ErrorMessage) + } else { + logp.Critical("Failed to put record to kinesis: %v", *record.ErrorCode) + } } } - logp.Warn("Retrying %d events", len(failedEvents)) - return failedEvents } - return []publisher.Event{} + return nil, allGood } diff --git a/streams/client_test.go b/streams/client_test.go index 426d598..c175bfc 100644 --- a/streams/client_test.go +++ b/streams/client_test.go @@ -210,143 +210,133 @@ func TestPublishEvents(t *testing.T) { partitionKeyProvider: provider, observer: outputs.NewNilObserver(), batchSizeBytes: 5 * 1000 * 1000, + gzip: true, } event := publisher.Event{Content: beat.Event{Fields: common.MapStr{fieldForPartitionKey: expectedPartitionKey}}} - events := []publisher.Event{event} - - { - codecData := [][]byte{[]byte("boom")} - codecErr := []error{nil} - client.encoder = &StubCodec{dat: codecData, err: codecErr} - - putRecordsOut := []*kinesis.PutRecordsOutput{ - &kinesis.PutRecordsOutput{ - Records: []*kinesis.PutRecordsResultEntry{ - &kinesis.PutRecordsResultEntry{ - ErrorCode: aws.String(""), + testcases := []struct { + testname string + putRecordsOut []*kinesis.PutRecordsOutput + putRecordsErr []error + codecData [][]byte + codecErr []error + eventsToPublish []publisher.Event + remainingEvents int + }{ + { + testname: "success", + putRecordsOut: []*kinesis.PutRecordsOutput{ + { + Records: []*kinesis.PutRecordsResultEntry{ + { + ErrorCode: aws.String(""), + }, }, + FailedRecordCount: aws.Int64(0), }, - FailedRecordCount: aws.Int64(0), }, - } - putRecordsErr := []error{nil} - client.streams = &StubClient{out: putRecordsOut, err: putRecordsErr} - rest, err := client.publishEvents(events) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - if len(rest) != 0 { - t.Errorf("unexpected number of remaining events: %d", len(rest)) - } - } - - { - // An event that can't be encoded should be ignored without any error, but with some log. - codecData := [][]byte{[]byte("")} - codecErr := []error{fmt.Errorf("failed to encode")} - client.encoder = &StubCodec{dat: codecData, err: codecErr} - putRecordsOut := []*kinesis.PutRecordsOutput{ - &kinesis.PutRecordsOutput{ - Records: []*kinesis.PutRecordsResultEntry{ - &kinesis.PutRecordsResultEntry{ - ErrorCode: aws.String(""), + putRecordsErr: []error{nil}, + codecData: [][]byte{[]byte("boom")}, + codecErr: []error{nil}, + eventsToPublish: []publisher.Event{event}, + remainingEvents: 0, + }, + { // An event that can't be encoded should be ignored without any error, but with some log. + testname: "unable to encode", + putRecordsOut: []*kinesis.PutRecordsOutput{ + { + Records: []*kinesis.PutRecordsResultEntry{ + { + ErrorCode: aws.String(""), + }, }, + FailedRecordCount: aws.Int64(0), }, - FailedRecordCount: aws.Int64(0), }, - } - putRecordsErr := []error{nil} - client.streams = &StubClient{out: putRecordsOut, err: putRecordsErr} - - rest, err := client.publishEvents(events) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - if len(rest) != 0 { - t.Errorf("unexpected number of remaining events: %d", len(rest)) - } - } - - { - // Nil records returned by Kinesis should be ignored with some log - codecData := [][]byte{[]byte("boom")} - codecErr := []error{nil} - client.encoder = &StubCodec{dat: codecData, err: codecErr} - - putRecordsOut := []*kinesis.PutRecordsOutput{ - &kinesis.PutRecordsOutput{ - Records: []*kinesis.PutRecordsResultEntry{ - nil, + putRecordsErr: []error{nil}, + codecData: [][]byte{[]byte("")}, + codecErr: []error{fmt.Errorf("failed to encode")}, + eventsToPublish: []publisher.Event{event}, + remainingEvents: 0, + }, + { // Nil records returned by Kinesis should be ignored with some log + testname: "nil records returned by Kinesis", + putRecordsOut: []*kinesis.PutRecordsOutput{ + { + Records: []*kinesis.PutRecordsResultEntry{nil}, + FailedRecordCount: aws.Int64(1), }, - FailedRecordCount: aws.Int64(1), }, - } - putRecordsErr := []error{nil} - client.streams = &StubClient{out: putRecordsOut, err: putRecordsErr} - - rest, err := client.publishEvents(events) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - if len(rest) != 0 { - t.Errorf("unexpected number of remaining events: %d", len(rest)) - } - } - - { - // Records with nil error codes should be ignored with some log - codecData := [][]byte{[]byte("boom")} - codecErr := []error{nil} - client.encoder = &StubCodec{dat: codecData, err: codecErr} - - putRecordsOut := []*kinesis.PutRecordsOutput{ - &kinesis.PutRecordsOutput{ - Records: []*kinesis.PutRecordsResultEntry{ - &kinesis.PutRecordsResultEntry{ - ErrorCode: nil, + putRecordsErr: []error{nil}, + codecData: [][]byte{[]byte("boom")}, + codecErr: []error{nil}, + eventsToPublish: []publisher.Event{event}, + remainingEvents: 0, + }, + { // Records with nil error codes should be ignored with some log + testname: "nil error codes", + putRecordsOut: []*kinesis.PutRecordsOutput{ + { + Records: []*kinesis.PutRecordsResultEntry{ + {ErrorCode: nil}, }, + FailedRecordCount: aws.Int64(1), }, - FailedRecordCount: aws.Int64(1), }, - } - putRecordsErr := []error{nil} - client.streams = &StubClient{out: putRecordsOut, err: putRecordsErr} - - rest, err := client.publishEvents(events) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - if len(rest) != 0 { - t.Errorf("unexpected number of remaining events: %d", len(rest)) - } - } - - { - // Kinesis received the event but it was not persisted, probably due to underlying infrastructure failure - codecData := [][]byte{[]byte("boom")} - codecErr := []error{nil} - client.encoder = &StubCodec{dat: codecData, err: codecErr} - - putRecordsOut := []*kinesis.PutRecordsOutput{ - &kinesis.PutRecordsOutput{ - Records: []*kinesis.PutRecordsResultEntry{ - &kinesis.PutRecordsResultEntry{ - ErrorCode: aws.String("simulated_error"), + putRecordsErr: []error{nil}, + codecData: [][]byte{[]byte("boom")}, + codecErr: []error{nil}, + eventsToPublish: []publisher.Event{event}, + remainingEvents: 0, + }, + { // Kinesis received the event but it was not persisted, probably due to underlying infrastructure failure + testname: "event not persisted", + putRecordsOut: []*kinesis.PutRecordsOutput{ + { + Records: []*kinesis.PutRecordsResultEntry{ + { + ErrorCode: aws.String("simulated_error"), + }, }, + FailedRecordCount: aws.Int64(1), }, - FailedRecordCount: aws.Int64(1), }, - } - putRecordsErr := []error{nil} - client.streams = &StubClient{out: putRecordsOut, err: putRecordsErr} - - rest, err := client.publishEvents(events) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - if len(rest) != 1 { - t.Errorf("unexpected number of remaining events: %d", len(rest)) - } + putRecordsErr: []error{nil}, + codecData: [][]byte{[]byte("boom")}, + codecErr: []error{nil}, + eventsToPublish: []publisher.Event{event}, + remainingEvents: 1, + }, + { // Kinesis received the event but it was not persisted, due to throughput exceeded error + testname: "throughput exceeded", + putRecordsOut: []*kinesis.PutRecordsOutput{ + { + Records: []*kinesis.PutRecordsResultEntry{ + { + ErrorCode: aws.String("ProvisionedThroughputExceededException"), + }, + }, + FailedRecordCount: aws.Int64(1), + }, + }, + putRecordsErr: []error{nil}, + codecData: [][]byte{[]byte("boom")}, + codecErr: []error{nil}, + eventsToPublish: []publisher.Event{event}, + remainingEvents: 1, + }, + } + for _, tc := range testcases { + t.Run(tc.testname, func(t *testing.T) { + client.encoder = &StubCodec{dat: tc.codecData, err: tc.codecErr} + client.streams = &StubClient{out: tc.putRecordsOut, err: tc.putRecordsErr} + rest, err := client.publishEvents(tc.eventsToPublish) + if err != nil { + t.Errorf("unexpected error: %+v", err) + } + if len(rest) != tc.remainingEvents { + t.Errorf("unexpected number of remaining events got:%d want:%d", len(rest), tc.remainingEvents) + } + }) } } @@ -446,15 +436,46 @@ func TestTestPublishEventsBatch(t *testing.T) { func TestTestPublishLargeStream(t *testing.T) { origMaxSizeOfRecord := MAX_RECORD_SIZE + + putRecordsOutputGood := &kinesis.PutRecordsOutput{ + Records: []*kinesis.PutRecordsResultEntry{{ErrorCode: aws.String("")}}, + FailedRecordCount: aws.Int64(0), + } + putRecordsOutputBad := &kinesis.PutRecordsOutput{ + Records: []*kinesis.PutRecordsResultEntry{{ErrorCode: aws.String("balagan")}}, + FailedRecordCount: aws.Int64(1), + } cases := []struct { gzip bool expectedNumberOfBatches int expectedBatch0Size int expectedBatch1Size int + expectedNumberOfRest int + putRecordsOut []*kinesis.PutRecordsOutput name string }{ - {gzip: false, expectedNumberOfBatches: 8, expectedBatch0Size: 10, expectedBatch1Size: 7, name: "no_gzip"}, - {gzip: true, expectedNumberOfBatches: 2, expectedBatch0Size: 15, expectedBatch1Size: 13, name: "gzip"}, + {gzip: false, expectedNumberOfBatches: 8, expectedBatch0Size: 10, expectedBatch1Size: 7, name: "no_gzip", + putRecordsOut: []*kinesis.PutRecordsOutput{ + putRecordsOutputGood, + putRecordsOutputGood, + putRecordsOutputGood, + putRecordsOutputGood, + putRecordsOutputGood, + putRecordsOutputGood, + putRecordsOutputGood, + putRecordsOutputGood, + }, + }, + {gzip: true, expectedNumberOfBatches: 2, expectedBatch0Size: 15, expectedBatch1Size: 13, name: "gzip", + putRecordsOut: []*kinesis.PutRecordsOutput{ + putRecordsOutputGood, + putRecordsOutputGood, + }}, + {gzip: true, expectedNumberOfBatches: 2, expectedBatch0Size: 15, expectedBatch1Size: 13, name: "gzip_put_error", + putRecordsOut: []*kinesis.PutRecordsOutput{ + putRecordsOutputGood, + putRecordsOutputBad, + }, expectedNumberOfRest: 21}, } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { @@ -526,16 +547,8 @@ func TestTestPublishLargeStream(t *testing.T) { client.encoder = &StubCodec{dat: codecData, err: codecErr} - putRecordsOutputGood := &kinesis.PutRecordsOutput{ - Records: []*kinesis.PutRecordsResultEntry{{ErrorCode: aws.String("")}}, - FailedRecordCount: aws.Int64(0), - } - var putRecordsOut []*kinesis.PutRecordsOutput - for i := 0; i < tc.expectedNumberOfBatches; i++ { - putRecordsOut = append(putRecordsOut, putRecordsOutputGood) - } - putRecordsErr := make([]error, len(putRecordsOut)) - kinesisStub := &StubClient{out: putRecordsOut, err: putRecordsErr} + putRecordsErr := make([]error, len(tc.putRecordsOut)) + kinesisStub := &StubClient{out: tc.putRecordsOut, err: putRecordsErr} client.streams = kinesisStub for range codecData { @@ -545,24 +558,23 @@ func TestTestPublishLargeStream(t *testing.T) { fieldForPartitionKey: "expectedPartitionKey", }, }, - }, - ) + }) } rest, err := client.publishEvents(events) if err != nil { t.Errorf("unexpected error: %v", err) } - if len(rest) != 0 { + if len(rest) != tc.expectedNumberOfRest { t.Errorf("unexpected number of remaining events: %d", len(rest)) } if len(kinesisStub.calls) != tc.expectedNumberOfBatches { - t.Errorf("unexpected number of batches: %d", len(kinesisStub.calls)) + t.Errorf("unexpected number of batches. got:%d wanted:%d", len(kinesisStub.calls), tc.expectedNumberOfBatches) } if len(kinesisStub.calls[0].Records) != tc.expectedBatch0Size { - t.Errorf("unexpected number of records in batch 0 got: %d", len(kinesisStub.calls[0].Records)) + t.Errorf("unexpected number of records in batch 0. got:%d wanted:%d", len(kinesisStub.calls[0].Records), tc.expectedBatch0Size) } if len(kinesisStub.calls[1].Records) != tc.expectedBatch1Size { - t.Errorf("unexpected number of records in batch 1 got: %d", len(kinesisStub.calls[1].Records)) + t.Errorf("unexpected number of records in batch 1. got:%d wanted:%d", len(kinesisStub.calls[1].Records), tc.expectedBatch1Size) } content, err := ioutil.ReadFile(fmt.Sprint("../testdata/streams/TestTestPublishEventsBatchGZIP_", tc.name, ".golden")) if err != nil { @@ -606,8 +618,11 @@ func getStringFromKinesisStab(s *StubClient, is_gzip bool, t *testing.T) string if err != nil { t.Errorf("unable to read gzipped data error: %v", err) } - io.Copy(&buf, zr) + var midBuf bytes.Buffer + io.Copy(&midBuf, zr) zr.Close() + midStr := midBuf.String() + buf.WriteString(midStr) buf.WriteString("\n") } } @@ -662,3 +677,16 @@ func TestClient_String(t *testing.T) { t.Errorf("unexpected value '%v'", v) } } + +// func testCollectFailedEvents(t *testing.T) { +// events []publisher.Event = []publisher.Event{ +// failedEvents := collectFailedEvents(events) +// if len(failedEvents) != len(expectedFailedEvents) { +// t.Errorf("unexpected number of failed events: %d", len(failedEvents)) +// } +// for i, ev := range failedEvents { +// if !cmp.Equal(ev, expectedFailedEvents[i]) { +// t.Errorf("unexpected failed event: %v", ev) +// } +// } +// } From d8db8a32ec2f69a59313436e0ee5cbbf23abd345 Mon Sep 17 00:00:00 2001 From: leon Date: Thu, 28 Apr 2022 19:12:05 +0300 Subject: [PATCH 2/5] rm comments --- streams/client_test.go | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/streams/client_test.go b/streams/client_test.go index c175bfc..4a7ca02 100644 --- a/streams/client_test.go +++ b/streams/client_test.go @@ -677,16 +677,3 @@ func TestClient_String(t *testing.T) { t.Errorf("unexpected value '%v'", v) } } - -// func testCollectFailedEvents(t *testing.T) { -// events []publisher.Event = []publisher.Event{ -// failedEvents := collectFailedEvents(events) -// if len(failedEvents) != len(expectedFailedEvents) { -// t.Errorf("unexpected number of failed events: %d", len(failedEvents)) -// } -// for i, ev := range failedEvents { -// if !cmp.Equal(ev, expectedFailedEvents[i]) { -// t.Errorf("unexpected failed event: %v", ev) -// } -// } -// } From 656a85f8dff1850c69bb7707166b5b037281e9ea Mon Sep 17 00:00:00 2001 From: leon Date: Wed, 18 May 2022 18:46:30 +0300 Subject: [PATCH 3/5] expected fix in tests --- streams/client_test.go | 84 +++++++++++++++++++++--------------------- 1 file changed, 42 insertions(+), 42 deletions(-) diff --git a/streams/client_test.go b/streams/client_test.go index 4a7ca02..8251635 100644 --- a/streams/client_test.go +++ b/streams/client_test.go @@ -214,13 +214,13 @@ func TestPublishEvents(t *testing.T) { } event := publisher.Event{Content: beat.Event{Fields: common.MapStr{fieldForPartitionKey: expectedPartitionKey}}} testcases := []struct { - testname string - putRecordsOut []*kinesis.PutRecordsOutput - putRecordsErr []error - codecData [][]byte - codecErr []error - eventsToPublish []publisher.Event - remainingEvents int + testname string + putRecordsOut []*kinesis.PutRecordsOutput + mockPutRecordsErr []error + mockCodecData [][]byte + mockCodecErr []error + mockEventsToPublish []publisher.Event + expectedRemainingEvents int }{ { testname: "success", @@ -234,11 +234,11 @@ func TestPublishEvents(t *testing.T) { FailedRecordCount: aws.Int64(0), }, }, - putRecordsErr: []error{nil}, - codecData: [][]byte{[]byte("boom")}, - codecErr: []error{nil}, - eventsToPublish: []publisher.Event{event}, - remainingEvents: 0, + mockPutRecordsErr: []error{nil}, + mockCodecData: [][]byte{[]byte("boom")}, + mockCodecErr: []error{nil}, + mockEventsToPublish: []publisher.Event{event}, + expectedRemainingEvents: 0, }, { // An event that can't be encoded should be ignored without any error, but with some log. testname: "unable to encode", @@ -252,11 +252,11 @@ func TestPublishEvents(t *testing.T) { FailedRecordCount: aws.Int64(0), }, }, - putRecordsErr: []error{nil}, - codecData: [][]byte{[]byte("")}, - codecErr: []error{fmt.Errorf("failed to encode")}, - eventsToPublish: []publisher.Event{event}, - remainingEvents: 0, + mockPutRecordsErr: []error{nil}, + mockCodecData: [][]byte{[]byte("")}, + mockCodecErr: []error{fmt.Errorf("failed to encode")}, + mockEventsToPublish: []publisher.Event{event}, + expectedRemainingEvents: 0, }, { // Nil records returned by Kinesis should be ignored with some log testname: "nil records returned by Kinesis", @@ -266,11 +266,11 @@ func TestPublishEvents(t *testing.T) { FailedRecordCount: aws.Int64(1), }, }, - putRecordsErr: []error{nil}, - codecData: [][]byte{[]byte("boom")}, - codecErr: []error{nil}, - eventsToPublish: []publisher.Event{event}, - remainingEvents: 0, + mockPutRecordsErr: []error{nil}, + mockCodecData: [][]byte{[]byte("boom")}, + mockCodecErr: []error{nil}, + mockEventsToPublish: []publisher.Event{event}, + expectedRemainingEvents: 0, }, { // Records with nil error codes should be ignored with some log testname: "nil error codes", @@ -282,11 +282,11 @@ func TestPublishEvents(t *testing.T) { FailedRecordCount: aws.Int64(1), }, }, - putRecordsErr: []error{nil}, - codecData: [][]byte{[]byte("boom")}, - codecErr: []error{nil}, - eventsToPublish: []publisher.Event{event}, - remainingEvents: 0, + mockPutRecordsErr: []error{nil}, + mockCodecData: [][]byte{[]byte("boom")}, + mockCodecErr: []error{nil}, + mockEventsToPublish: []publisher.Event{event}, + expectedRemainingEvents: 0, }, { // Kinesis received the event but it was not persisted, probably due to underlying infrastructure failure testname: "event not persisted", @@ -300,11 +300,11 @@ func TestPublishEvents(t *testing.T) { FailedRecordCount: aws.Int64(1), }, }, - putRecordsErr: []error{nil}, - codecData: [][]byte{[]byte("boom")}, - codecErr: []error{nil}, - eventsToPublish: []publisher.Event{event}, - remainingEvents: 1, + mockPutRecordsErr: []error{nil}, + mockCodecData: [][]byte{[]byte("boom")}, + mockCodecErr: []error{nil}, + mockEventsToPublish: []publisher.Event{event}, + expectedRemainingEvents: 1, }, { // Kinesis received the event but it was not persisted, due to throughput exceeded error testname: "throughput exceeded", @@ -318,23 +318,23 @@ func TestPublishEvents(t *testing.T) { FailedRecordCount: aws.Int64(1), }, }, - putRecordsErr: []error{nil}, - codecData: [][]byte{[]byte("boom")}, - codecErr: []error{nil}, - eventsToPublish: []publisher.Event{event}, - remainingEvents: 1, + mockPutRecordsErr: []error{nil}, + mockCodecData: [][]byte{[]byte("boom")}, + mockCodecErr: []error{nil}, + mockEventsToPublish: []publisher.Event{event}, + expectedRemainingEvents: 1, }, } for _, tc := range testcases { t.Run(tc.testname, func(t *testing.T) { - client.encoder = &StubCodec{dat: tc.codecData, err: tc.codecErr} - client.streams = &StubClient{out: tc.putRecordsOut, err: tc.putRecordsErr} - rest, err := client.publishEvents(tc.eventsToPublish) + client.encoder = &StubCodec{dat: tc.mockCodecData, err: tc.mockCodecErr} + client.streams = &StubClient{out: tc.putRecordsOut, err: tc.mockPutRecordsErr} + rest, err := client.publishEvents(tc.mockEventsToPublish) if err != nil { t.Errorf("unexpected error: %+v", err) } - if len(rest) != tc.remainingEvents { - t.Errorf("unexpected number of remaining events got:%d want:%d", len(rest), tc.remainingEvents) + if len(rest) != tc.expectedRemainingEvents { + t.Errorf("unexpected number of remaining events got:%d want:%d", len(rest), tc.expectedRemainingEvents) } }) } From a42730896a87daeb8a6155d1472be1756d1c0edd Mon Sep 17 00:00:00 2001 From: leon Date: Wed, 18 May 2022 18:48:14 +0300 Subject: [PATCH 4/5] zip buf --- streams/client_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/streams/client_test.go b/streams/client_test.go index 8251635..341681b 100644 --- a/streams/client_test.go +++ b/streams/client_test.go @@ -618,10 +618,10 @@ func getStringFromKinesisStab(s *StubClient, is_gzip bool, t *testing.T) string if err != nil { t.Errorf("unable to read gzipped data error: %v", err) } - var midBuf bytes.Buffer - io.Copy(&midBuf, zr) + var zipBuf bytes.Buffer + io.Copy(&zipBuf, zr) zr.Close() - midStr := midBuf.String() + midStr := zipBuf.String() buf.WriteString(midStr) buf.WriteString("\n") } From 1c577e499615a173d8a4ef9a0c119740173c16ec Mon Sep 17 00:00:00 2001 From: leon Date: Thu, 19 May 2022 10:57:32 +0300 Subject: [PATCH 5/5] shutdown err --- ...blishEventsBatchGZIP_gzip_put_error.golden | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 testdata/streams/TestTestPublishEventsBatchGZIP_gzip_put_error.golden diff --git a/testdata/streams/TestTestPublishEventsBatchGZIP_gzip_put_error.golden b/testdata/streams/TestTestPublishEventsBatchGZIP_gzip_put_error.golden new file mode 100644 index 0000000..d42e3d8 --- /dev/null +++ b/testdata/streams/TestTestPublishEventsBatchGZIP_gzip_put_error.golden @@ -0,0 +1,58 @@ +batch 0 +record 0 +[cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc] +record 1 +[aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa,bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb,dddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd,eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee] +record 2 +[ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff,gggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggg] +record 3 +[hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh,iiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii,jjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjj] +record 4 +[kkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkk,llllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllll] +record 5 +[mmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmm,nnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnn] +record 6 +[oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo] +record 7 +[pppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppp] +record 8 +[qqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq,rrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrr] +record 9 +[ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss] +record 10 +[tttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttt,uuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu,vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv] +record 11 +[wwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwww,xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx] +record 12 +[yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy] +record 13 +[zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa] +record 14 +[bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb,cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc,dddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd] +batch 1 +record 0 +[eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee,ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff] +record 1 +[gggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggg] +record 2 +[hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh,iiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii,jjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjj] +record 3 +[kkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkk,llllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllll] +record 4 +[mmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmm,nnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnn] +record 5 +[oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo] +record 6 +[pppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppppp] +record 7 +[qqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq,rrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrr] +record 8 +[ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss] +record 9 +[tttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttt,uuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu,vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv] +record 10 +[wwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwww,xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx] +record 11 +[yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy] +record 12 +[zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz]