diff --git a/filters/scheduler/cleanup_test.go b/filters/scheduler/cleanup_test.go new file mode 100644 index 0000000000..a479173561 --- /dev/null +++ b/filters/scheduler/cleanup_test.go @@ -0,0 +1,85 @@ +package scheduler + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/zalando/skipper/filters" + "github.com/zalando/skipper/proxy" + "github.com/zalando/skipper/routing" + "github.com/zalando/skipper/routing/testdataclient" + "github.com/zalando/skipper/scheduler" +) + +func TestCleanupOnBackendErrors(t *testing.T) { + doc := ` + aroute: * + -> lifo(1, 1, "100ms") + -> lifoGroup("foo", 1, 1, "100ms") + -> lifo(2, 2, "200ms") + -> lifoGroup("bar", 1, 1, "100ms") + -> fifo(1, 1, "200ms") + -> "http://test.invalid" + ` + + dc, err := testdataclient.NewDoc(doc) + require.NoError(t, err) + defer dc.Close() + + reg := scheduler.RegistryWith(scheduler.Options{}) + defer reg.Close() + + fr := make(filters.Registry) + fr.Register(NewLIFO()) + fr.Register(NewLIFOGroup()) + fr.Register(NewFifo()) + + ro := routing.Options{ + SignalFirstLoad: true, + FilterRegistry: fr, + DataClients: []routing.DataClient{dc}, + PostProcessors: []routing.PostProcessor{reg}, + } + + rt := routing.New(ro) + defer rt.Close() + + <-rt.FirstLoad() + + pr := proxy.WithParams(proxy.Params{ + Routing: rt, + }) + defer pr.Close() + + ts := httptest.NewServer(pr) + defer ts.Close() + + rsp, err := http.Get(ts.URL) + require.NoError(t, err) + rsp.Body.Close() + + var route *routing.Route + { + req, err := http.NewRequest("GET", ts.URL, nil) + require.NoError(t, err) + + route, _ = rt.Get().Do(req) + require.NotNil(t, route, "failed to lookup route") + } + + for _, f := range route.Filters { + if qf, ok := f.Filter.(interface{ GetQueue() *scheduler.Queue }); ok { + status := qf.GetQueue().Status() + assert.Equal(t, scheduler.QueueStatus{ActiveRequests: 0, QueuedRequests: 0, Closed: false}, status) + } else if qf, ok := f.Filter.(interface{ GetQueue() *scheduler.FifoQueue }); ok { + status := qf.GetQueue().Status() + assert.Equal(t, scheduler.QueueStatus{ActiveRequests: 0, QueuedRequests: 0, Closed: false}, status) + } else { + t.Fatal("filter does not implement GetQueue()") + } + } +} diff --git a/filters/scheduler/fifo.go b/filters/scheduler/fifo.go index 040d92a3ad..eca58e59fe 100644 --- a/filters/scheduler/fifo.go +++ b/filters/scheduler/fifo.go @@ -150,3 +150,9 @@ func (f *fifoFilter) Response(ctx filters.FilterContext) { pending[last]() ctx.StateBag()[fifoKey] = pending[:last] } + +// HandleErrorResponse is to opt-in for filters to get called +// Response(ctx) in case of errors via proxy. It has to return true to opt-in. +func (f *fifoFilter) HandleErrorResponse() bool { + return true +} diff --git a/filters/scheduler/lifo.go b/filters/scheduler/lifo.go index 60db279a58..717580bcd9 100644 --- a/filters/scheduler/lifo.go +++ b/filters/scheduler/lifo.go @@ -243,7 +243,7 @@ func (l *lifoFilter) Close() error { // increase the number of inflight requests and respond to the caller, // if the bounded queue returns an error. Status code by Error: // -// - 503 if jobqueue.ErrQueueFull +// - 503 if jobqueue.ErrStackFull // - 502 if jobqueue.ErrTimeout func (l *lifoFilter) Request(ctx filters.FilterContext) { request(l.GetQueue(), scheduler.LIFOKey, ctx) @@ -255,6 +255,12 @@ func (l *lifoFilter) Response(ctx filters.FilterContext) { response(scheduler.LIFOKey, ctx) } +// HandleErrorResponse is to opt-in for filters to get called +// Response(ctx) in case of errors via proxy. It has to return true to opt-in. +func (l *lifoFilter) HandleErrorResponse() bool { + return true +} + func (l *lifoGroupFilter) Group() string { return l.name } @@ -300,6 +306,12 @@ func (l *lifoGroupFilter) Response(ctx filters.FilterContext) { response(scheduler.LIFOKey, ctx) } +// HandleErrorResponse is to opt-in for filters to get called +// Response(ctx) in case of errors via proxy. It has to return true to opt-in. +func (l *lifoGroupFilter) HandleErrorResponse() bool { + return true +} + func request(q *scheduler.Queue, key string, ctx filters.FilterContext) { if q == nil { ctx.Logger().Warnf("Unexpected scheduler.Queue is nil for key %s", key) diff --git a/proxy/proxy.go b/proxy/proxy.go index 2d2ef831d8..ba119efb3f 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -40,7 +40,6 @@ import ( "github.com/zalando/skipper/ratelimit" "github.com/zalando/skipper/rfc" "github.com/zalando/skipper/routing" - "github.com/zalando/skipper/scheduler" "github.com/zalando/skipper/tracing" ) @@ -1046,29 +1045,6 @@ func (p *Proxy) do(ctx *context) (err error) { return errMaxLoopbacksReached } - // this can be deleted after fixing - // https://github.com/zalando/skipper/issues/1238 problem - // happens if we get proxy errors, for example connect errors, - // which would block responses until fifo() timeouts. - defer func() { - stateBag := ctx.StateBag() - - pendingFIFO, _ := stateBag[scheduler.FIFOKey].([]func()) - for _, done := range pendingFIFO { - done() - } - - pendingLIFO, _ := stateBag[scheduler.LIFOKey].([]func()) - for _, done := range pendingLIFO { - done() - } - - // Cleanup state bag to avoid double call of done() - // because do() could be called for loopback backend - delete(stateBag, scheduler.FIFOKey) - delete(stateBag, scheduler.LIFOKey) - }() - // proxy global setting if !ctx.wasExecuted() { if settings, retryAfter := p.limiters.Check(ctx.request); retryAfter > 0 {