Skip to content

Commit

Permalink
fix: Enable parallel requests in the http output (#5)
Browse files Browse the repository at this point in the history
* fix: Enable parallel requests in the http output

* chore: update doc

* test: Resolve unit test
  • Loading branch information
juniorrhis1 authored Mar 25, 2024
1 parent c5c3b28 commit 1b60014
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 4 deletions.
77 changes: 73 additions & 4 deletions internal/impl/io/output_http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package io

import (
"context"
"errors"
"fmt"

"github.com/benthosdev/benthos/v4/internal/component"
"github.com/benthosdev/benthos/v4/internal/component/output"
"github.com/benthosdev/benthos/v4/internal/httpclient"
"github.com/benthosdev/benthos/v4/public/service"
Expand All @@ -27,6 +30,9 @@ It's possible to propagate the response from each HTTP request back to the input
service.NewBoolField("batch_as_multipart").
Description("Send message batches as a single request using [RFC1341](https://www.w3.org/Protocols/rfc1341/7_2_Multipart.html). If disabled messages in batches will be sent as individual requests.").
Advanced().Default(false),
service.NewBoolField("parallel").
Description("When processing batched messages and `batch_as_multipart` is false, whether to send messages of the batch in parallel, otherwise they are sent serially.").
Advanced().Default(false),
service.NewBoolField("propagate_response").
Description("Whether responses from the server should be [propagated back](/docs/guides/sync_responses) to the input.").
Advanced().Default(false),
Expand Down Expand Up @@ -79,6 +85,7 @@ type httpClientWriter struct {
logURL string
propResponse bool
batchAsMultipart bool
parallel bool
}

func newHTTPClientOutputFromParsed(conf *service.ParsedConfig, mgr *service.Resources) (*httpClientWriter, error) {
Expand Down Expand Up @@ -123,12 +130,18 @@ func newHTTPClientOutputFromParsed(conf *service.ParsedConfig, mgr *service.Reso
return nil, err
}

parallel, err := conf.FieldBool("parallel")
if err != nil {
return nil, err
}

return &httpClientWriter{
client: client,
log: mgr.Logger(),
logURL: logURL,
propResponse: propResponse,
batchAsMultipart: batchAsMultipart,
parallel: parallel,
}, nil
}

Expand All @@ -139,12 +152,16 @@ func (h *httpClientWriter) Connect(ctx context.Context) error {

func (h *httpClientWriter) WriteBatch(ctx context.Context, msg service.MessageBatch) error {
if len(msg) > 1 && !h.batchAsMultipart {
for _, v := range msg {
if err := h.WriteBatch(ctx, service.MessageBatch{v}); err != nil {
return err
if !h.parallel {
for _, v := range msg {
if err := h.WriteBatch(ctx, service.MessageBatch{v}); err != nil {
return err
}
}
} else {
// Hard, need to do parallel requests limited by max parallelism.
return h.handleParallelRequests(msg)
}
return nil
}

resultMsg, err := h.client.Send(ctx, msg)
Expand Down Expand Up @@ -175,6 +192,58 @@ func (h *httpClientWriter) WriteBatch(ctx context.Context, msg service.MessageBa
return err
}

// handleParallelRequests dispatches each message of the batch as its own HTTP
// request, with all requests in flight concurrently, and blocks until every
// request has completed. Responses and failures are recorded on copies of the
// messages, and the first request error encountered (if any) is returned so
// that a failing batch is nacked consistently with the serial path in
// WriteBatch.
func (h *httpClientWriter) handleParallelRequests(msg service.MessageBatch) error {
	if len(msg) == 0 {
		// Preserves the prior behaviour of rejecting an empty batch, but now
		// checked up front before any goroutines are spawned.
		return fmt.Errorf("HTTP response from '%v' was empty", h.logURL)
	}

	// NOTE(review): results is populated below but never consumed or returned;
	// presumably intended for response propagation — confirm before removing.
	results := make(service.MessageBatch, len(msg))
	for i, p := range msg {
		results[i] = p.Copy()
	}

	reqChan, resChan := make(chan int), make(chan error)

	// One worker per message gives full parallelism across the batch.
	for i := 0; i < len(msg); i++ {
		go func() {
			for index := range reqChan {
				// TODO(review): this should likely use the caller's context so
				// that shutdown cancels in-flight requests; plumbing it through
				// requires a signature change.
				result, err := h.client.Send(context.Background(), service.MessageBatch{msg[index]})
				if err == nil && len(result) != 1 {
					err = fmt.Errorf("unexpected response size: %v", len(result))
				}
				if err == nil {
					mBytes, _ := result[0].AsBytes()
					results[index].SetBytes(mBytes)
					_ = result[0].MetaWalkMut(func(k string, v any) error {
						results[index].MetaSetMut(k, v)
						return nil
					})
				} else {
					// Surface the HTTP status code as metadata when the error
					// carries one, then mark the message as errored.
					var hErr component.ErrUnexpectedHTTPRes
					if errors.As(err, &hErr) {
						results[index].MetaSetMut("http_status_code", hErr.Code)
					}
					results[index].SetError(err)
				}
				resChan <- err
			}
		}()
	}

	// Feed indices to the workers. The sender closes the channel so that the
	// workers' range loops terminate cleanly once all work is handed out.
	go func() {
		for i := 0; i < len(msg); i++ {
			reqChan <- i
		}
		close(reqChan)
	}()

	// Collect exactly one result per message. Previously errors were only
	// logged and the function unconditionally returned nil, silently acking
	// failed messages; the first failure is now surfaced to the caller.
	var firstErr error
	for i := 0; i < len(msg); i++ {
		if err := <-resChan; err != nil {
			h.log.Errorf("HTTP parallel request to '%v' failed: %v", h.logURL, err)
			if firstErr == nil {
				firstErr = err
			}
		}
	}
	return firstErr
}

// Close shuts down the underlying HTTP client, releasing any resources it
// holds. The context bounds how long the shutdown may take.
func (h *httpClientWriter) Close(ctx context.Context) error {
	return h.client.Close(ctx)
}
9 changes: 9 additions & 0 deletions website/docs/components/outputs/http_client.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ output:
successful_on: []
proxy_url: "" # No default (optional)
batch_as_multipart: false
parallel: false
propagate_response: false
max_in_flight: 64
batching:
Expand Down Expand Up @@ -705,6 +706,14 @@ Type: `string`
Send message batches as a single request using [RFC1341](https://www.w3.org/Protocols/rfc1341/7_2_Multipart.html). If disabled messages in batches will be sent as individual requests.


Type: `bool`
Default: `false`

### `parallel`

When processing batched messages and `batch_as_multipart` is false, whether to send messages of the batch in parallel, otherwise they are sent serially.


Type: `bool`
Default: `false`

Expand Down

0 comments on commit 1b60014

Please sign in to comment.