fix throughput err #9

Open · wants to merge 5 commits into base: master
69 changes: 20 additions & 49 deletions streams/client.go
@@ -8,7 +8,6 @@ import (
     "time"

     "github.com/aws/aws-sdk-go/aws"
-    "github.com/aws/aws-sdk-go/aws/awserr"
     "github.com/aws/aws-sdk-go/aws/session"
     "github.com/aws/aws-sdk-go/service/kinesis"
     "github.com/elastic/beats/libbeat/beat"
@@ -132,30 +131,18 @@ func (client *client) publishBatch(b batch) ([]publisher.Event, error) {
         }
     }
     logp.Debug("kinesis", "mapped to records: %v", records)
-    res, err := client.putKinesisRecords(records)
-    failed := collectFailedEvents(res, okEvents)
-    if len(failed) == 0 {
-        if aerr, ok := err.(awserr.Error); ok {
-            switch aerr.Code() {
-            case kinesis.ErrCodeLimitExceededException:
-            case kinesis.ErrCodeProvisionedThroughputExceededException:
-            case kinesis.ErrCodeInternalFailureException:
-                logp.Info("putKinesisRecords failed (api level, not per-record failure). Will retry all records. error: %v", err)
-                failed = eventsFlattenLists(okEvents)
-            default:
-                logp.Warn("putKinesisRecords persistent failure. Will not retry. error: %v", err)
-            }
-        }
-    }
-    if err != nil || len(failed) > 0 {
+    err, ok := client.putKinesisRecords(records)
+    if !ok || err != nil {
         dur := client.backoff.Duration()
-        logp.Info("retrying %d events client.backoff.Duration %s on error: %v", len(failed), dur, err)
+        logp.Info("retrying %d events client.backoff.Duration %s on error: %v", len(records), dur, err)
         time.Sleep(dur)
         return eventsFlattenLists(okEvents), err
     } else {
         client.backoff.Reset()
     }
-    return failed, err
+    return nil, nil
 }

 func eventsFlattenLists(listOfListsEvents [][]publisher.Event) []publisher.Event {
     var flatEvents []publisher.Event
     for _, events := range listOfListsEvents {
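With this change, any API-level error or per-record failure retries the whole batch after sleeping for client.backoff.Duration(), and a clean call resets the backoff. Below is a minimal, self-contained sketch of that sleep-and-reset pattern, assuming the github.com/jpillora/backoff package (its Duration/Reset API matches the calls in this diff); the loop and the simulated error are illustrative, not code from this PR.

```go
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/jpillora/backoff"
)

func main() {
	b := &backoff.Backoff{
		Min:    100 * time.Millisecond, // first retry delay
		Max:    10 * time.Second,       // ceiling for the delay
		Factor: 2,                      // exponential growth per consecutive failure
		Jitter: true,                   // spread retries out across many shippers
	}
	for attempt := 1; attempt <= 3; attempt++ {
		var err error
		if attempt < 3 {
			// Simulated failure standing in for a failed PutRecords call.
			err = errors.New("ProvisionedThroughputExceededException")
		}
		if err != nil {
			dur := b.Duration() // grows on every consecutive failure
			fmt.Printf("attempt %d failed, sleeping %s: %v\n", attempt, dur, err)
			time.Sleep(dur)
			continue
		}
		b.Reset() // a success starts the next failure back at Min
		break
	}
}
```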
@@ -260,7 +247,6 @@ func (client *client) mapEventsGZip(events []publisher.Event) []batch {
     batchSize := 0

     for i := range events {
-        // var gzippedBytes []byte
         event := events[i]
         buf, err := client.getBytesFromEvent(&event)
         if err != nil {
@@ -340,7 +326,6 @@ func (client *client) mapEventsGZip(events []publisher.Event) []batch {
                 dropped: dropped})
         }
         records = append(records, record)
-        okEvents = append(okEvents, []publisher.Event{events[len(events)-1]})
     }
     batches = append(batches, batch{
         okEvents: okEvents,
@@ -374,6 +359,7 @@ func (client *client) getBytesFromEvent(event *publisher.Event) ([]byte, error) {
         logp.Critical("Unable to encode event: %v", err)
         return nil, err
     }
+
     // See https://github.com/elastic/beats/blob/5a6630a8bc9b9caf312978f57d1d9193bdab1ac7/libbeat/outputs/kafka/client.go#L163-L164
     // You need to copy the byte data like this. Otherwise you see strange issues like all the records sent in a same batch has the same Data.
     buf = make([]byte, len(serializedEvent))
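The two comment lines kept in this hunk are the heart of getBytesFromEvent: the codec reuses its internal buffer between events, so storing the returned slice without copying makes every record in a batch alias the same bytes. A self-contained sketch of that failure mode and the copy that fixes it (all names here are illustrative, not taken from the PR):

```go
package main

import "fmt"

func main() {
	shared := make([]byte, 8) // stands in for the codec's reused buffer
	encode := func(s string) []byte {
		copy(shared, s)
		return shared[:len(s)] // returns a view into the shared buffer
	}

	var wrong, right [][]byte
	for _, s := range []string{"event-1", "event-2"} {
		b := encode(s)
		wrong = append(wrong, b) // every entry aliases the same backing array

		cp := make([]byte, len(b)) // the fix: copy before keeping the bytes
		copy(cp, b)
		right = append(right, cp)
	}
	fmt.Printf("without copy: %s %s\n", wrong[0], wrong[1]) // event-2 event-2
	fmt.Printf("with copy:    %s %s\n", right[0], right[1]) // event-1 event-2
}
```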
@@ -430,42 +416,27 @@ func (client *client) mapBytes(buf []byte, event *publisher.Event) (*kinesis.PutRecordsRequestEntry, error) {
     return &kinesis.PutRecordsRequestEntry{Data: bufCP, PartitionKey: aws.String(partitionKey)}, nil
 }

-func (client *client) putKinesisRecords(records []*kinesis.PutRecordsRequestEntry) (*kinesis.PutRecordsOutput, error) {
+func (client *client) putKinesisRecords(records []*kinesis.PutRecordsRequestEntry) (error, bool) {
     request := kinesis.PutRecordsInput{
         StreamName: &client.streamName,
         Records:    records,
     }
     res, err := client.streams.PutRecords(&request)
     if err != nil {
-        return res, fmt.Errorf("failed to put records: %v", err)
+        return err, false
     }
-    return res, nil
-}
-
-func collectFailedEvents(res *kinesis.PutRecordsOutput, events [][]publisher.Event) []publisher.Event {
-    if res.FailedRecordCount != nil && *res.FailedRecordCount > 0 {
-        failedEvents := make([]publisher.Event, 0)
-        records := res.Records
-        for i, r := range records {
-            if r == nil {
-                // See https://github.com/s12v/awsbeats/issues/27 for more info
-                logp.NewLogger("streams").Warn("no record returned from kinesis for event: ", events[i])
-                continue
-            }
-            if r.ErrorCode == nil {
-                // logp.NewLogger("streams").Warn("skipping failed event with unexpected state: corresponding kinesis record misses error code: ", r)
-                continue
-            }
-            if *r.ErrorCode == "ProvisionedThroughputExceededException" {
-                logp.NewLogger("streams").Debug("throughput exceeded. will retry", r)
-                failedEvents = append(failedEvents, events[i]...)
-            }
-            if *r.ErrorCode != "" {
-                failedEvents = append(failedEvents, events[i]...)
-            }
-        }
-        logp.Warn("Retrying %d events", len(failedEvents))
-        return failedEvents
-    }
-    return []publisher.Event{}
+    allGood := true
+    for _, record := range res.Records {
+        if record != nil && record.ErrorCode != nil {
+            if *record.ErrorCode != "" {
+                allGood = false
+                if record.ErrorMessage != nil {
+                    logp.Critical("Failed to put record to kinesis.code:%s msg :%s", *record.ErrorCode, *record.ErrorMessage)
+                } else {
+                    logp.Critical("Failed to put record to kinesis: %v", *record.ErrorCode)
+                }
+            }
+        }
+    }
+    return nil, allGood
 }
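For context on what both the removed collectFailedEvents and the new per-record loop inspect: PutRecords can succeed at the API level while individual entries fail, and the SDK reports this per record through ErrorCode/ErrorMessage with a FailedRecordCount summary. A hedged sketch of those semantics against aws-sdk-go v1 (the stream name and session wiring are placeholders):

```go
package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/kinesis"
)

func main() {
	svc := kinesis.New(session.Must(session.NewSession()))
	out, err := svc.PutRecords(&kinesis.PutRecordsInput{
		StreamName: aws.String("example-stream"), // illustrative name
		Records: []*kinesis.PutRecordsRequestEntry{
			{Data: []byte("payload"), PartitionKey: aws.String("pk-1")},
		},
	})
	if err != nil {
		// API-level failure: nothing was accepted, retry the whole batch.
		fmt.Println("request failed:", err)
		return
	}
	if out.FailedRecordCount != nil && *out.FailedRecordCount > 0 {
		// Partial failure: only entries with a non-nil ErrorCode failed,
		// e.g. "ProvisionedThroughputExceededException"; retry just those.
		for i, r := range out.Records {
			if r.ErrorCode != nil {
				fmt.Printf("record %d failed: %s\n", i, aws.StringValue(r.ErrorCode))
			}
		}
	}
}
```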