Skip to content

Commit

Permalink
Decouple request forwarding from delay
Browse files Browse the repository at this point in the history
Signed-off-by: Pablo Chacin <[email protected]>
  • Loading branch information
pablochacin committed Aug 8, 2023
1 parent 72c4e48 commit 020cbba
Showing 1 changed file with 61 additions and 39 deletions.
100 changes: 61 additions & 39 deletions pkg/agent/protocol/http/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,73 +98,95 @@ func (h *httpHandler) isExcluded(r *http.Request) bool {
return false
}

// forward forwards a request to the upstream URL.
// Request is performed immediately, but response won't be sent before the duration specified in delay.
func (h *httpHandler) forward(rw http.ResponseWriter, req *http.Request, delay time.Duration) {
timer := time.After(delay)

// forward forwards a request to the upstream URL and returns a function that
// copies the response to a ResponseWriter
func (h *httpHandler) forward(req *http.Request) func(rw http.ResponseWriter) {
upstreamReq := req.Clone(context.Background())
upstreamReq.Host = h.upstreamURL.Host
upstreamReq.URL.Host = h.upstreamURL.Host
upstreamReq.URL.Scheme = h.upstreamURL.Scheme
upstreamReq.RequestURI = "" // It is an error to set this field in an HTTP client request.

//nolint:bodyclose // it is closed in the returned functions
response, err := h.client.Do(req)
<-timer

// return a function that writes the upstream error
if err != nil {
rw.WriteHeader(http.StatusBadGateway)
_, _ = fmt.Fprint(rw, err)
return
}

defer func() {
// Fully consume and then close upstream response body.
_, _ = io.Copy(io.Discard, response.Body)
_ = response.Body.Close()
}()
return func(rw http.ResponseWriter) {
rw.WriteHeader(http.StatusBadGateway)
_, _ = fmt.Fprint(rw, err)

// Mirror headers.
for key, values := range response.Header {
for _, value := range values {
rw.Header().Add(key, value)
// Fully consume and then close upstream response body.
_, _ = io.Copy(io.Discard, response.Body)
_ = response.Body.Close()
}
}

// Mirror status code.
rw.WriteHeader(response.StatusCode)
// return a function that copies upstream response
return func(rw http.ResponseWriter) {
// Mirror headers.
for key, values := range response.Header {
for _, value := range values {
rw.Header().Add(key, value)
}
}

// ignore errors writing body, nothing to do.
_, _ = io.Copy(rw, response.Body)
}
// Mirror status code.
rw.WriteHeader(response.StatusCode)

// injectError waits sleeps the duration specified in delay and then writes the configured error downstream.
func (h *httpHandler) injectError(rw http.ResponseWriter, delay time.Duration) {
time.Sleep(delay)
// ignore errors writing body, nothing to do.
_, _ = io.Copy(rw, response.Body)
_ = response.Body.Close()
}
}

// injectError writes the configured error to a ResponseWriter
func (h *httpHandler) injectError(rw http.ResponseWriter) {
rw.WriteHeader(int(h.disruption.ErrorCode))
_, _ = rw.Write([]byte(h.disruption.ErrorBody))
}

func (h *httpHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
if h.isExcluded(req) {
//nolint:contextcheck // Unclear which context the linter requires us to propagate here.
h.forward(rw, req, 0)
return
}

func (h *httpHandler) delay() time.Duration {
delay := h.disruption.AverageDelay
if h.disruption.DelayVariation > 0 {
variation := int64(h.disruption.DelayVariation)
delay += time.Duration(variation - 2*rand.Int63n(variation))
}

if h.disruption.ErrorRate > 0 && rand.Float32() <= h.disruption.ErrorRate {
h.injectError(rw, delay)
return delay
}

func (h *httpHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
// if excluded, forward request and return response immediately
if h.isExcluded(req) {
//nolint:contextcheck // Unclear which context the linter requires us to propagate here.
h.forward(req)(rw)
return
}

//nolint:contextcheck // Unclear which context the linter requires us to propagate here.
h.forward(rw, req, delay)
// writer is used to write the response
var writer func(rw http.ResponseWriter)

// forward request
done := make(chan struct{})
go func() {
if h.disruption.ErrorRate > 0 && rand.Float32() <= h.disruption.ErrorRate {
writer = h.injectError
} else {
writer = h.forward(req)
}

done <- struct{}{}
}()

// wait for delay
<-time.After(h.delay())

// wait for upstream request
<-done

// return response
writer(rw)
}

// Start starts the execution of the proxy
Expand Down

0 comments on commit 020cbba

Please sign in to comment.