From 0ccfa506bf8835d805ad2b54373ca40605f88f72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A2nio?= Date: Tue, 28 May 2024 20:08:15 +0100 Subject: [PATCH] fix: elastic error handler (#18) --- internal/impl/elasticsearch/output.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/internal/impl/elasticsearch/output.go b/internal/impl/elasticsearch/output.go index 837792ed60..3d1542ebf6 100644 --- a/internal/impl/elasticsearch/output.go +++ b/internal/impl/elasticsearch/output.go @@ -427,19 +427,19 @@ func (e *Output) WriteBatch(ctx context.Context, msg service.MessageBatch) error e.log.Errorf("Elasticsearch message '%v' rejected with status [%v]: %v\n", item.Id, item.Status, reason) if !shouldRetry(item.Status) { - return fmt.Errorf("failed to send message '%v': %v", item.Id, reason) + msg[i].SetError(fmt.Errorf("failed to send message '%v': %v", item.Id, reason)) + } else { + // IMPORTANT: i exactly matches the index of our source requests + // and when we re-run our bulk request with errored requests + // that must remain true. + sourceReq := requests[i] + bulkReq, err := e.buildBulkableRequest(sourceReq) + if err != nil { + return err + } + b.Add(bulkReq) + newRequests = append(newRequests, sourceReq) } - - // IMPORTANT: i exactly matches the index of our source requests - // and when we re-run our bulk request with errored requests - // that must remain true. - sourceReq := requests[i] - bulkReq, err := e.buildBulkableRequest(sourceReq) - if err != nil { - return err - } - b.Add(bulkReq) - newRequests = append(newRequests, sourceReq) } } requests = newRequests