Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Enable parallel requests in the http output #5

Merged
merged 3 commits into from
Mar 25, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 72 additions & 4 deletions internal/impl/io/output_http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package io

import (
"context"
"errors"
"fmt"

"github.com/benthosdev/benthos/v4/internal/component"
"github.com/benthosdev/benthos/v4/internal/component/output"
"github.com/benthosdev/benthos/v4/internal/httpclient"
"github.com/benthosdev/benthos/v4/public/service"
Expand All @@ -27,6 +30,9 @@ It's possible to propagate the response from each HTTP request back to the input
service.NewBoolField("batch_as_multipart").
Description("Send message batches as a single request using [RFC1341](https://www.w3.org/Protocols/rfc1341/7_2_Multipart.html). If disabled messages in batches will be sent as individual requests.").
Advanced().Default(false),
service.NewBoolField("parallel").
juniorrhis1 marked this conversation as resolved.
Show resolved Hide resolved
Description("When processing batched messages and `batch_as_multipart` is false, whether to send messages of the batch in parallel, otherwise they are sent serially.").
Advanced().Default(false),
service.NewBoolField("propagate_response").
Description("Whether responses from the server should be [propagated back](/docs/guides/sync_responses) to the input.").
Advanced().Default(false),
Expand Down Expand Up @@ -79,6 +85,7 @@ type httpClientWriter struct {
logURL string
propResponse bool
batchAsMultipart bool
parallel bool
}

func newHTTPClientOutputFromParsed(conf *service.ParsedConfig, mgr *service.Resources) (*httpClientWriter, error) {
Expand Down Expand Up @@ -123,12 +130,18 @@ func newHTTPClientOutputFromParsed(conf *service.ParsedConfig, mgr *service.Reso
return nil, err
}

parallel, err := conf.FieldBool("parallel")
if err != nil {
return nil, err
}

return &httpClientWriter{
client: client,
log: mgr.Logger(),
logURL: logURL,
propResponse: propResponse,
batchAsMultipart: batchAsMultipart,
parallel: parallel,
}, nil
}

Expand All @@ -139,12 +152,15 @@ func (h *httpClientWriter) Connect(ctx context.Context) error {

func (h *httpClientWriter) WriteBatch(ctx context.Context, msg service.MessageBatch) error {
if len(msg) > 1 && !h.batchAsMultipart {
for _, v := range msg {
if err := h.WriteBatch(ctx, service.MessageBatch{v}); err != nil {
return err
if !h.parallel {
for _, v := range msg {
if err := h.WriteBatch(ctx, service.MessageBatch{v}); err != nil {
return err
}
}
}
return nil
// Hard, need to do parallel requests limited by max parallelism.
return h.handleParallelRequests(msg)
}

resultMsg, err := h.client.Send(ctx, msg)
Expand Down Expand Up @@ -175,6 +191,58 @@ func (h *httpClientWriter) WriteBatch(ctx context.Context, msg service.MessageBa
return err
}

func (h *httpClientWriter) handleParallelRequests(msg service.MessageBatch) error {
results := make(service.MessageBatch, len(msg))
for i, p := range msg {
results[i] = p.Copy()
}
reqChan, resChan := make(chan int), make(chan error)

for i := 0; i < len(msg); i++ {
go func() {
for index := range reqChan {
tmpMsg := service.MessageBatch{msg[index]}
result, err := h.client.Send(context.Background(), tmpMsg)
if err == nil && len(result) != 1 {
err = fmt.Errorf("unexpected response size: %v", len(result))
}
if err == nil {
mBytes, _ := result[0].AsBytes()
results[index].SetBytes(mBytes)
_ = result[0].MetaWalkMut(func(k string, v any) error {
results[index].MetaSetMut(k, v)
return nil
})
} else {
var hErr component.ErrUnexpectedHTTPRes
if ok := errors.As(err, &hErr); ok {
results[index].MetaSetMut("http_status_code", hErr.Code)
}
results[index].SetError(err)
}
resChan <- err
}
}()
}
go func() {
for i := 0; i < len(msg); i++ {
reqChan <- i
}
}()
for i := 0; i < len(msg); i++ {
if err := <-resChan; err != nil {
h.log.Errorf("HTTP parallel request to '%v' failed: %v", h.logURL, err)
}
}

close(reqChan)

if len(results) < 1 {
return fmt.Errorf("HTTP response from '%v' was empty", h.logURL)
}
return nil
}

func (h *httpClientWriter) Close(ctx context.Context) error {
return h.client.Close(ctx)
}
Loading