From 1b60014315fa68a79a815c1e3a4acd8d67aeb781 Mon Sep 17 00:00:00 2001
From: juniorrhis1
Date: Mon, 25 Mar 2024 10:30:50 -0300
Subject: [PATCH] fix: Enable parallel requests in the http output (#5)

* fix: Enable parallel requests in the http output

* chore: Update docs

* test: Fix unit tests
---
 internal/impl/io/output_http_client.go        | 77 ++++++++++++++++++-
 .../docs/components/outputs/http_client.md    |  9 +++
 2 files changed, 82 insertions(+), 4 deletions(-)

diff --git a/internal/impl/io/output_http_client.go b/internal/impl/io/output_http_client.go
index ace8351eec..7b737578fe 100644
--- a/internal/impl/io/output_http_client.go
+++ b/internal/impl/io/output_http_client.go
@@ -2,7 +2,10 @@ package io
 
 import (
 	"context"
+	"errors"
+	"fmt"
 
+	"github.com/benthosdev/benthos/v4/internal/component"
 	"github.com/benthosdev/benthos/v4/internal/component/output"
 	"github.com/benthosdev/benthos/v4/internal/httpclient"
 	"github.com/benthosdev/benthos/v4/public/service"
@@ -27,6 +30,9 @@ It's possible to propagate the response from each HTTP request back to the input
 		service.NewBoolField("batch_as_multipart").
 			Description("Send message batches as a single request using [RFC1341](https://www.w3.org/Protocols/rfc1341/7_2_Multipart.html). If disabled messages in batches will be sent as individual requests.").
 			Advanced().Default(false),
+		service.NewBoolField("parallel").
+			Description("When processing batched messages and `batch_as_multipart` is false, whether to send the messages of the batch as parallel requests; otherwise they are sent serially.").
+			Advanced().Default(false),
 		service.NewBoolField("propagate_response").
 			Description("Whether responses from the server should be [propagated back](/docs/guides/sync_responses) to the input.").
 			Advanced().Default(false),
@@ -79,6 +85,7 @@ type httpClientWriter struct {
 	logURL           string
 	propResponse     bool
 	batchAsMultipart bool
+	parallel         bool
 }
@@ -123,12 +130,18 @@ func newHTTPClientOutputFromParsed(conf *service.ParsedConfig, mgr *service.Reso
 		return nil, err
 	}
 
+	parallel, err := conf.FieldBool("parallel")
+	if err != nil {
+		return nil, err
+	}
+
 	return &httpClientWriter{
 		client:           client,
 		log:              mgr.Logger(),
 		logURL:           logURL,
 		propResponse:     propResponse,
 		batchAsMultipart: batchAsMultipart,
+		parallel:         parallel,
 	}, nil
 }
@@ -139,12 +152,16 @@ func (h *httpClientWriter) Connect(ctx context.Context) error {
 
 func (h *httpClientWriter) WriteBatch(ctx context.Context, msg service.MessageBatch) error {
 	if len(msg) > 1 && !h.batchAsMultipart {
-		for _, v := range msg {
-			if err := h.WriteBatch(ctx, service.MessageBatch{v}); err != nil {
-				return err
+		if !h.parallel {
+			for _, v := range msg {
+				if err := h.WriteBatch(ctx, service.MessageBatch{v}); err != nil {
+					return err
+				}
 			}
+		} else {
+			// Multipart is disabled, so send each message of the batch as its own request in parallel.
+			return h.handleParallelRequests(ctx, msg)
 		}
-		return nil
 	}
 
 	resultMsg, err := h.client.Send(ctx, msg)
@@ -175,6 +192,58 @@ func (h *httpClientWriter) WriteBatch(ctx context.Context, msg service.MessageBa
 	return err
 }
 
+func (h *httpClientWriter) handleParallelRequests(ctx context.Context, msg service.MessageBatch) error {
+	results := make(service.MessageBatch, len(msg))
+	for i, p := range msg {
+		results[i] = p.Copy()
+	}
+	reqChan, resChan := make(chan int), make(chan error)
+
+	// Spawn one worker per message; each worker consumes indices from
+	// reqChan and sends the corresponding message as its own request.
+	for i := 0; i < len(msg); i++ {
+		go func() {
+			for index := range reqChan {
+				tmpMsg := service.MessageBatch{msg[index]}
+				result, err := h.client.Send(ctx, tmpMsg)
+				if err == nil && len(result) != 1 {
+					err = fmt.Errorf("unexpected response size: %v", len(result))
+				}
+				if err == nil {
+					mBytes, _ := result[0].AsBytes()
+					results[index].SetBytes(mBytes)
+					_ = result[0].MetaWalkMut(func(k string, v any) error {
+						results[index].MetaSetMut(k, v)
+						return nil
+					})
+				} else {
+					var hErr component.ErrUnexpectedHTTPRes
+					if ok := errors.As(err, &hErr); ok {
+						results[index].MetaSetMut("http_status_code", hErr.Code)
+					}
+					results[index].SetError(err)
+				}
+				resChan <- err
+			}
+		}()
+	}
+	go func() {
+		for i := 0; i < len(msg); i++ {
+			reqChan <- i
+		}
+	}()
+	// Collect one result per message, logging each failure and keeping the
+	// first error so the batch is not acknowledged on partial failure.
+	var firstErr error
+	for i := 0; i < len(msg); i++ {
+		if err := <-resChan; err != nil {
+			h.log.Errorf("HTTP parallel request to '%v' failed: %v", h.logURL, err)
+			if firstErr == nil {
+				firstErr = err
+			}
+		}
+	}
+	close(reqChan)
+
+	return firstErr
+}
+
 func (h *httpClientWriter) Close(ctx context.Context) error {
 	return h.client.Close(ctx)
 }
diff --git a/website/docs/components/outputs/http_client.md b/website/docs/components/outputs/http_client.md
index bf00015498..6178286f79 100644
--- a/website/docs/components/outputs/http_client.md
+++ b/website/docs/components/outputs/http_client.md
@@ -102,6 +102,7 @@ output:
     successful_on: []
     proxy_url: "" # No default (optional)
     batch_as_multipart: false
+    parallel: false
     propagate_response: false
     max_in_flight: 64
     batching:
@@ -705,6 +706,14 @@ Type: `string`
 
 Send message batches as a single request using [RFC1341](https://www.w3.org/Protocols/rfc1341/7_2_Multipart.html). If disabled messages in batches will be sent as individual requests.
 
+Type: `bool`
+Default: `false`
+
+### `parallel`
+
+When processing batched messages and `batch_as_multipart` is false, whether to send the messages of the batch as parallel requests; otherwise they are sent serially.
+
+
 Type: `bool`
 Default: `false`
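
Reviewer note: below is a minimal config sketch showing the new field in use. Only `parallel` (and its interaction with `batch_as_multipart`) comes from this patch; the endpoint URL, verb, and batching policy are illustrative placeholders.

```yaml
output:
  http_client:
    url: https://example.com/ingest # illustrative endpoint, not part of this patch
    verb: POST
    batch_as_multipart: false # parallel only takes effect when multipart is disabled
    parallel: true # new field: fan the batch out as concurrent per-message requests
    batching:
      count: 20 # illustrative batch size
      period: 1s
```

With this config a batch of up to 20 messages is dispatched as up to 20 concurrent requests instead of serially; individual failures are logged against the target URL and the first error is returned so the batch can be retried. With the default `parallel: false` the previous serial behaviour is unchanged.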