Skip to content

Commit

Permalink
fix: elastic error handler (#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
janiodev authored May 28, 2024
1 parent d402a59 commit 0ccfa50
Showing 1 changed file with 12 additions and 12 deletions.
24 changes: 12 additions & 12 deletions internal/impl/elasticsearch/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,19 +427,19 @@ func (e *Output) WriteBatch(ctx context.Context, msg service.MessageBatch) error

e.log.Errorf("Elasticsearch message '%v' rejected with status [%v]: %v\n", item.Id, item.Status, reason)
if !shouldRetry(item.Status) {
return fmt.Errorf("failed to send message '%v': %v", item.Id, reason)
msg[i].SetError(fmt.Errorf("failed to send message '%v': %v", item.Id, reason))
} else {
// IMPORTANT: i exactly matches the index of our source requests
// and when we re-run our bulk request with errored requests
// that must remain true.
sourceReq := requests[i]
bulkReq, err := e.buildBulkableRequest(sourceReq)
if err != nil {
return err
}
b.Add(bulkReq)
newRequests = append(newRequests, sourceReq)
}

// IMPORTANT: i exactly matches the index of our source requests
// and when we re-run our bulk request with errored requests
// that must remain true.
sourceReq := requests[i]
bulkReq, err := e.buildBulkableRequest(sourceReq)
if err != nil {
return err
}
b.Add(bulkReq)
newRequests = append(newRequests, sourceReq)
}
}
requests = newRequests
Expand Down

0 comments on commit 0ccfa50

Please sign in to comment.