Skip to content

Commit

Permalink
filters/scheduler: handle errors (#2680)
Browse files Browse the repository at this point in the history
Make lifo, lifoGroup and fifo filters backed error aware and remove
cleanup from the proxy.

Follow up on #1054, #1086 and #2239
Updates #1238

Signed-off-by: Alexander Yastrebov <[email protected]>
  • Loading branch information
AlexanderYastrebov committed Nov 27, 2023
1 parent e48b7a8 commit f0c6ab5
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 25 deletions.
85 changes: 85 additions & 0 deletions filters/scheduler/cleanup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package scheduler

import (
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/zalando/skipper/filters"
"github.com/zalando/skipper/proxy"
"github.com/zalando/skipper/routing"
"github.com/zalando/skipper/routing/testdataclient"
"github.com/zalando/skipper/scheduler"
)

func TestCleanupOnBackendErrors(t *testing.T) {
doc := `
aroute: *
-> lifo(1, 1, "100ms")
-> lifoGroup("foo", 1, 1, "100ms")
-> lifo(2, 2, "200ms")
-> lifoGroup("bar", 1, 1, "100ms")
-> fifo(1, 1, "200ms")
-> "http://test.invalid"
`

dc, err := testdataclient.NewDoc(doc)
require.NoError(t, err)
defer dc.Close()

reg := scheduler.RegistryWith(scheduler.Options{})
defer reg.Close()

fr := make(filters.Registry)
fr.Register(NewLIFO())
fr.Register(NewLIFOGroup())
fr.Register(NewFifo())

ro := routing.Options{
SignalFirstLoad: true,
FilterRegistry: fr,
DataClients: []routing.DataClient{dc},
PostProcessors: []routing.PostProcessor{reg},
}

rt := routing.New(ro)
defer rt.Close()

<-rt.FirstLoad()

pr := proxy.WithParams(proxy.Params{
Routing: rt,
})
defer pr.Close()

ts := httptest.NewServer(pr)
defer ts.Close()

rsp, err := http.Get(ts.URL)
require.NoError(t, err)
rsp.Body.Close()

var route *routing.Route
{
req, err := http.NewRequest("GET", ts.URL, nil)
require.NoError(t, err)

route, _ = rt.Get().Do(req)
require.NotNil(t, route, "failed to lookup route")
}

for _, f := range route.Filters {
if qf, ok := f.Filter.(interface{ GetQueue() *scheduler.Queue }); ok {
status := qf.GetQueue().Status()
assert.Equal(t, scheduler.QueueStatus{ActiveRequests: 0, QueuedRequests: 0, Closed: false}, status)
} else if qf, ok := f.Filter.(interface{ GetQueue() *scheduler.FifoQueue }); ok {
status := qf.GetQueue().Status()
assert.Equal(t, scheduler.QueueStatus{ActiveRequests: 0, QueuedRequests: 0, Closed: false}, status)
} else {
t.Fatal("filter does not implement GetQueue()")
}
}
}
6 changes: 6 additions & 0 deletions filters/scheduler/fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,3 +150,9 @@ func (f *fifoFilter) Response(ctx filters.FilterContext) {
pending[last]()
ctx.StateBag()[fifoKey] = pending[:last]
}

// HandleErrorResponse is to opt-in for filters to get called
// Response(ctx) in case of errors via proxy. It has to return true to opt-in.
func (f *fifoFilter) HandleErrorResponse() bool {
return true
}
14 changes: 13 additions & 1 deletion filters/scheduler/lifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func (l *lifoFilter) Close() error {
// increase the number of inflight requests and respond to the caller,
// if the bounded queue returns an error. Status code by Error:
//
// - 503 if jobqueue.ErrQueueFull
// - 503 if jobqueue.ErrStackFull
// - 502 if jobqueue.ErrTimeout
func (l *lifoFilter) Request(ctx filters.FilterContext) {
request(l.GetQueue(), scheduler.LIFOKey, ctx)
Expand All @@ -255,6 +255,12 @@ func (l *lifoFilter) Response(ctx filters.FilterContext) {
response(scheduler.LIFOKey, ctx)
}

// HandleErrorResponse is to opt-in for filters to get called
// Response(ctx) in case of errors via proxy. It has to return true to opt-in.
func (l *lifoFilter) HandleErrorResponse() bool {
return true
}

func (l *lifoGroupFilter) Group() string {
return l.name
}
Expand Down Expand Up @@ -300,6 +306,12 @@ func (l *lifoGroupFilter) Response(ctx filters.FilterContext) {
response(scheduler.LIFOKey, ctx)
}

// HandleErrorResponse is to opt-in for filters to get called
// Response(ctx) in case of errors via proxy. It has to return true to opt-in.
func (l *lifoGroupFilter) HandleErrorResponse() bool {
return true
}

func request(q *scheduler.Queue, key string, ctx filters.FilterContext) {
if q == nil {
ctx.Logger().Warnf("Unexpected scheduler.Queue is nil for key %s", key)
Expand Down
24 changes: 0 additions & 24 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
"github.com/zalando/skipper/ratelimit"
"github.com/zalando/skipper/rfc"
"github.com/zalando/skipper/routing"
"github.com/zalando/skipper/scheduler"
"github.com/zalando/skipper/tracing"
)

Expand Down Expand Up @@ -1046,29 +1045,6 @@ func (p *Proxy) do(ctx *context) (err error) {
return errMaxLoopbacksReached
}

// this can be deleted after fixing
// https://github.com/zalando/skipper/issues/1238 problem
// happens if we get proxy errors, for example connect errors,
// which would block responses until fifo() timeouts.
defer func() {
stateBag := ctx.StateBag()

pendingFIFO, _ := stateBag[scheduler.FIFOKey].([]func())
for _, done := range pendingFIFO {
done()
}

pendingLIFO, _ := stateBag[scheduler.LIFOKey].([]func())
for _, done := range pendingLIFO {
done()
}

// Cleanup state bag to avoid double call of done()
// because do() could be called for loopback backend
delete(stateBag, scheduler.FIFOKey)
delete(stateBag, scheduler.LIFOKey)
}()

// proxy global setting
if !ctx.wasExecuted() {
if settings, retryAfter := p.limiters.Check(ctx.request); retryAfter > 0 {
Expand Down

0 comments on commit f0c6ab5

Please sign in to comment.