diff --git a/docs/reference/filters.md b/docs/reference/filters.md index 98fea702f5..fa2da89075 100644 --- a/docs/reference/filters.md +++ b/docs/reference/filters.md @@ -2819,6 +2819,31 @@ Example: fifo(100, 150, "10s") ``` +### fifoWithBody + +This Filter is similar to the [lifo](#lifo) filter in regards to +parameters and status codes. +Performance considerations are similar to [fifo](#fifo). + +The difference between fifo and fifoWithBody is that fifo will decrement +the concurrency as soon as the backend sent response headers and +fifoWithBody will decrement the concurrency if the response body was +served. Normally both are very similar, but if you have a fully async +component that serves multiple website fragments, this would decrement +concurrency too early. + +Parameters: + +* MaxConcurrency specifies how many goroutines are allowed to work on this queue (int) +* MaxQueueSize sets the queue size (int) +* Timeout sets the timeout to get request scheduled (time) + +Example: + +``` +fifoWithBody(100, 150, "10s") +``` + ### lifo This Filter changes skipper to handle the route with a bounded last in diff --git a/filters/builtin/builtin.go b/filters/builtin/builtin.go index 0609d87412..d4e1895bf4 100644 --- a/filters/builtin/builtin.go +++ b/filters/builtin/builtin.go @@ -216,6 +216,7 @@ func Filters() []filters.Spec { auth.NewForwardToken(), auth.NewForwardTokenField(), scheduler.NewFifo(), + scheduler.NewFifoWithBody(), scheduler.NewLIFO(), scheduler.NewLIFOGroup(), rfc.NewPath(), diff --git a/filters/filters.go b/filters/filters.go index 6caaaa7a41..9f9a1f333a 100644 --- a/filters/filters.go +++ b/filters/filters.go @@ -327,6 +327,7 @@ const ( SetDynamicBackendUrl = "setDynamicBackendUrl" ApiUsageMonitoringName = "apiUsageMonitoring" FifoName = "fifo" + FifoWithBodyName = "fifoWithBody" LifoName = "lifo" LifoGroupName = "lifoGroup" RfcPathName = "rfcPath" diff --git a/filters/scheduler/fifo.go b/filters/scheduler/fifo.go index eca58e59fe..50ceb95b65 100644 
--- a/filters/scheduler/fifo.go +++ b/filters/scheduler/fifo.go @@ -11,24 +11,31 @@ import ( "github.com/zalando/skipper/scheduler" ) -const ( - fifoKey string = "fifo" -) - type ( - fifoSpec struct{} + fifoSpec struct { + typ string + } fifoFilter struct { config scheduler.Config queue *scheduler.FifoQueue + typ string } ) func NewFifo() filters.Spec { - return &fifoSpec{} + return &fifoSpec{ + typ: filters.FifoName, + } +} + +func NewFifoWithBody() filters.Spec { + return &fifoSpec{ + typ: filters.FifoWithBodyName, + } } -func (*fifoSpec) Name() string { - return filters.FifoName +func (s *fifoSpec) Name() string { + return s.typ } // CreateFilter creates a fifoFilter, that will use a semaphore based @@ -65,6 +72,7 @@ func (s *fifoSpec) CreateFilter(args []interface{}) (filters.Filter, error) { } return &fifoFilter{ + typ: s.typ, config: scheduler.Config{ MaxConcurrency: cc, MaxQueueSize: qs, @@ -132,23 +140,29 @@ func (f *fifoFilter) Request(ctx filters.FilterContext) { } // ok - pending, _ := ctx.StateBag()[fifoKey].([]func()) - ctx.StateBag()[fifoKey] = append(pending, done) + pending, _ := ctx.StateBag()[f.typ].([]func()) + ctx.StateBag()[f.typ] = append(pending, done) } // Response will decrease the number of inflight requests to release // the concurrency reservation for the request. 
func (f *fifoFilter) Response(ctx filters.FilterContext) { - pending, ok := ctx.StateBag()[fifoKey].([]func()) - if !ok { - return - } - last := len(pending) - 1 - if last < 0 { - return + switch f.typ { + case filters.FifoName: + pending, ok := ctx.StateBag()[f.typ].([]func()) + if !ok { + return + } + last := len(pending) - 1 + if last < 0 { + return + } + pending[last]() + ctx.StateBag()[f.typ] = pending[:last] + + case filters.FifoWithBodyName: + // nothing to do here, handled in the proxy after copyStream() } - pending[last]() - ctx.StateBag()[fifoKey] = pending[:last] } // HandleErrorResponse is to opt-in for filters to get called diff --git a/filters/scheduler/fifo_test.go b/filters/scheduler/fifo_test.go index e23a410e66..c9f0093f20 100644 --- a/filters/scheduler/fifo_test.go +++ b/filters/scheduler/fifo_test.go @@ -1,14 +1,19 @@ package scheduler import ( + "bytes" "fmt" "io" "net/http" stdlibhttptest "net/http/httptest" + "strings" "testing" + "testing/iotest" "time" "github.com/opentracing/opentracing-go/mocktracer" + "github.com/sirupsen/logrus" + "github.com/zalando/skipper/eskip" "github.com/zalando/skipper/filters" "github.com/zalando/skipper/metrics/metricstest" @@ -19,7 +24,29 @@ import ( "github.com/zalando/skipper/scheduler" ) -func TestFifoCreateFilter(t *testing.T) { +func TestCreateFifoName(t *testing.T) { + for _, tt := range []struct { + name string + filterFunc func() filters.Spec + }{ + { + name: filters.FifoName, + filterFunc: NewFifo, + }, + { + name: filters.FifoWithBodyName, + filterFunc: NewFifoWithBody, + }, + } { + t.Run(tt.name, func(t *testing.T) { + if tt.filterFunc().Name() != tt.name { + t.Fatalf("got %q, want %q", tt.filterFunc().Name(), tt.name) + } + }) + } +} + +func TestCreateFifoFilter(t *testing.T) { for _, tt := range []struct { name string args []interface{} @@ -58,6 +85,33 @@ func TestFifoCreateFilter(t *testing.T) { Timeout: 1 * time.Second, }, }, + { + name: "fifo negative value arg1", + args: []interface{}{ + -3, + 
5, + "1s", + }, + wantParseErr: true, + }, + { + name: "fifo negative value arg2", + args: []interface{}{ + 3, + -5, + "1s", + }, + wantParseErr: true, + }, + { + name: "fifo too small value arg3", + args: []interface{}{ + 3, + 5, + "1ns", + }, + wantParseErr: true, + }, { name: "fifo wrong type arg1", args: []interface{}{ @@ -106,43 +160,239 @@ func TestFifoCreateFilter(t *testing.T) { }, } { t.Run(tt.name, func(t *testing.T) { - spec := &fifoSpec{} - ff, err := spec.CreateFilter(tt.args) - if err != nil && !tt.wantParseErr { - t.Fatalf("Failed to parse filter: %v", err) + for _, f := range []func() filters.Spec{NewFifo, NewFifoWithBody} { + spec := f() + ff, err := spec.CreateFilter(tt.args) + if err != nil && !tt.wantParseErr { + t.Fatalf("Failed to parse filter: %v", err) + } + if err == nil && tt.wantParseErr { + t.Fatal("Failed to get wanted error on create filter") + } + + if _, ok := ff.(*fifoFilter); !ok && err == nil { + t.Fatal("Failed to convert filter to *fifoFilter") + } } - if err == nil && tt.wantParseErr { - t.Fatal("Failed to get wanted error on create filter") + }) + } +} + +type flusher struct { + w http.ResponseWriter +} + +func (f *flusher) Flush() { + f.w.(http.Flusher).Flush() +} + +func (f *flusher) Unwrap() http.ResponseWriter { + return f.w +} + +func (f *flusher) Write(p []byte) (n int, err error) { + n, err = f.w.Write(p) + if err == nil { + f.Flush() + } + return +} + +type slowReader struct { + r io.Reader + d time.Duration +} + +func (sr *slowReader) Read(p []byte) (int, error) { + logrus.Infof("slowReader: %d", len(p)) + if len(p) == 0 { + return 0, nil + } + time.Sleep(sr.d) + n, err := sr.r.Read(p) + logrus.Infof("slowReader: %d %v", n, err) + return n, err +} + +func TestFifoWithBody(t *testing.T) { + for _, tt := range []struct { + name string + args []interface{} + backendTime time.Duration + responseSize int + wantErr bool + }{ + { + name: "fifoWithBody 1024B with 1 queue should be ok", + args: []interface{}{1, 1, "1s"}, + 
backendTime: 10 * time.Millisecond, + responseSize: 1024, + }, + { + name: "fifoWithBody 1024B with 0 queue should fail", + args: []interface{}{1, 0, "10ms"}, + backendTime: 50 * time.Millisecond, + responseSize: 1024, + wantErr: true, + }, + { + name: "fifoWithBody 2x 1024B with 1 queue should be ok", + args: []interface{}{1, 1, "1s"}, + backendTime: 10 * time.Millisecond, + responseSize: 2 * 1024, + }, + { + name: "fifoWithBody 2x 1024B with 0 queue should fail", + args: []interface{}{1, 0, "15ms"}, + backendTime: 10 * time.Millisecond, + responseSize: 2 * 1024, + wantErr: true, + }, + } { + t.Run(tt.name, func(t *testing.T) { + + backend := stdlibhttptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.Logf("backend path: %s", r.URL.Path) + buf := bytes.NewBufferString(strings.Repeat("A", tt.responseSize)) + halfReader := iotest.HalfReader(buf) + sr := &slowReader{ + d: 100 * time.Millisecond, + r: halfReader, + } + + w.WriteHeader(http.StatusOK) + // sleep here to test the difference between streaming response and not + time.Sleep(tt.backendTime) + // TODO: maybe better to do slow body streaming? + b := make([]byte, 1024) + io.CopyBuffer(&flusher{w}, sr, b) + })) + defer backend.Close() + + // proxy + metrics := &metricstest.MockMetrics{} + defer metrics.Close() + reg := scheduler.RegistryWith(scheduler.Options{ + Metrics: metrics, + EnableRouteFIFOMetrics: true, + }) + defer reg.Close() + fr := make(filters.Registry) + fr.Register(NewFifoWithBody()) + args := append(tt.args, backend.URL) + doc := fmt.Sprintf(`r: * -> fifoWithBody(%v, %v, "%v") -> "%s"`, args...) 
+ t.Logf("%s", doc) + dc, err := testdataclient.NewDoc(doc) + if err != nil { + t.Fatalf("Failed to create testdataclient: %v", err) } - if tt.wantParseErr { - return + defer dc.Close() + ro := routing.Options{ + SignalFirstLoad: true, + FilterRegistry: fr, + DataClients: []routing.DataClient{dc}, + PostProcessors: []routing.PostProcessor{reg}, } + rt := routing.New(ro) + defer rt.Close() + <-rt.FirstLoad() + tracer := &testTracer{MockTracer: mocktracer.New()} + pr := proxy.WithParams(proxy.Params{ + Routing: rt, + OpenTracing: &proxy.OpenTracingParams{Tracer: tracer}, + }) + defer pr.Close() + ts := stdlibhttptest.NewServer(pr) + defer ts.Close() - f, ok := ff.(*fifoFilter) - if !ok { - t.Fatal("Failed to convert filter to *fifoFilter") + // simple test + rsp, err := ts.Client().Get(ts.URL + "/test") + if err != nil { + t.Fatalf("Failed to get response from %s: %v", ts.URL, err) + } + defer rsp.Body.Close() + if rsp.StatusCode != http.StatusOK { + t.Fatalf("Failed to get valid response from endpoint: %d", rsp.StatusCode) + } + b, err := io.ReadAll(rsp.Body) + if err != nil { + t.Fatalf("Failed to read response body from: %v", err) + } + if len(b) != tt.responseSize { + t.Fatalf("Failed to read the size, got: %v, want: %v", len(b), tt.responseSize) } - // validate config - config := f.Config() - if config != tt.wantConfig { - t.Fatalf("Failed to get Config, got: %v, want: %v", config, tt.wantConfig) + t.Log("the streaming test") + // the streaming test + rspCH := make(chan *http.Response) + errCH := make(chan error) + defer func() { + close(rspCH) + close(errCH) + }() + waithCH := make(chan struct{}) + go func() { + rsp, err := ts.Client().Get(ts.URL + "/1") + t.Logf("rsp1: %s", rsp.Status) + close(waithCH) + if err != nil { + errCH <- err + } else { + rspCH <- rsp + } + }() + + <-waithCH + rsp2, err2 := ts.Client().Get(ts.URL + "/2") + t.Logf("rsp2: %s", rsp.Status) + if tt.wantErr { + n, err := io.Copy(io.Discard, rsp2.Body) + if n != 0 { + t.Fatalf("Failed to 
get error copied %d bytes, err: %v", n, err) + } + rsp2.Body.Close() + } else { + if err2 != nil { + t.Errorf("Failed to do 2nd request: %v", err2) + } else { + b, err2 := io.ReadAll(rsp2.Body) + if err2 != nil { + t.Errorf("Failed 2nd request to read body: %v", err2) + } + if len(b) != tt.responseSize { + t.Errorf("Failed 2nd request to get response size: %d, want: %d", len(b), tt.responseSize) + } + } } - if f.queue != f.GetQueue() { - t.Fatal("Failed to get expected queue") + + // read body from first request + select { + case err := <-errCH: + t.Fatalf("Failed to do request: %v", err) + case rsp := <-rspCH: + t.Logf("client1 got %s", rsp.Status) + b, err := io.ReadAll(rsp.Body) + if err != nil { + t.Fatalf("Failed to read body: %v", err) + } + if len(b) != tt.responseSize { + t.Fatalf("Failed to get response size: %d, want: %d", len(b), tt.responseSize) + } } + }) } } func TestFifo(t *testing.T) { for _, tt := range []struct { - name string - filter string - freq int - per time.Duration - backendTime time.Duration - wantOkRate float64 + name string + filter string + freq int + per time.Duration + backendTime time.Duration + clientTimeout time.Duration + wantOkRate float64 }{ { name: "fifo simple ok", @@ -153,28 +403,70 @@ func TestFifo(t *testing.T) { wantOkRate: 1.0, }, { - name: "fifo with reaching max concurrency and queue timeouts", - filter: `fifo(3, 5, "10ms")`, - freq: 200, - per: 100 * time.Millisecond, - backendTime: 10 * time.Millisecond, - wantOkRate: 0.1, + name: "fifoWithBody simple ok", + filter: `fifoWithBody(3, 5, "1s")`, + freq: 20, + per: 100 * time.Millisecond, + backendTime: 1 * time.Millisecond, + clientTimeout: time.Second, + wantOkRate: 1.0, }, { - name: "fifo with reaching max concurrency and queue full", - filter: `fifo(3, 5, "250ms")`, - freq: 200, - per: 100 * time.Millisecond, - backendTime: 100 * time.Millisecond, - wantOkRate: 0.0008, + name: "fifo simple client canceled", + filter: `fifo(3, 5, "1s")`, + freq: 20, + per: 100 * 
time.Millisecond, + backendTime: 1 * time.Millisecond, + clientTimeout: time.Nanosecond, + wantOkRate: 0, + }, + { + name: "fifoWithBody simple client canceled", + filter: `fifoWithBody(3, 5, "1s")`, + freq: 20, + per: 100 * time.Millisecond, + backendTime: 1 * time.Millisecond, + clientTimeout: time.Nanosecond, + wantOkRate: 0, + }, + { + name: "fifo with reaching max concurrency and queue timeouts", + filter: `fifo(3, 5, "10ms")`, + freq: 20, + per: 10 * time.Millisecond, + backendTime: 11 * time.Millisecond, + clientTimeout: time.Second, + wantOkRate: 0.005, + }, + { + name: "fifoWithBody with reaching max concurrency and queue timeouts", + filter: `fifoWithBody(3, 5, "10ms")`, + freq: 20, + per: 10 * time.Millisecond, + backendTime: 11 * time.Millisecond, + clientTimeout: time.Second, + wantOkRate: 0.005, + }, + { + name: "fifo with reaching max concurrency and queue full", + filter: `fifo(1, 1, "250ms")`, + freq: 200, + per: 100 * time.Millisecond, + backendTime: 100 * time.Millisecond, + clientTimeout: time.Second, + wantOkRate: 0.0008, + }, + { + name: "fifoWithBody with reaching max concurrency and queue full", + filter: `fifoWithBody(1, 1, "250ms")`, + freq: 200, + per: 100 * time.Millisecond, + backendTime: 100 * time.Millisecond, + clientTimeout: time.Second, + wantOkRate: 0.0008, }, } { t.Run(tt.name, func(t *testing.T) { - fs := NewFifo() - if fs.Name() != filters.FifoName { - t.Fatalf("Failed to get name got %s want %s", fs.Name(), filters.FifoName) - } - metrics := &metricstest.MockMetrics{} reg := scheduler.RegistryWith(scheduler.Options{ Metrics: metrics, @@ -183,7 +475,8 @@ func TestFifo(t *testing.T) { defer reg.Close() fr := make(filters.Registry) - fr.Register(fs) + fr.Register(NewFifo()) + fr.Register(NewFifoWithBody()) backend := stdlibhttptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { time.Sleep(tt.backendTime) @@ -235,18 +528,26 @@ func TestFifo(t *testing.T) { t.Fatalf("Failed to get valid response from 
endpoint: %d", rsp.StatusCode) } - const clientTimeout = 1 * time.Second - - va := httptest.NewVegetaAttacker(ts.URL, tt.freq, tt.per, clientTimeout) + va := httptest.NewVegetaAttacker(ts.URL, tt.freq, tt.per, tt.clientTimeout) va.Attack(io.Discard, 1*time.Second, tt.name) t.Logf("Success [0..1]: %0.2f", va.Success()) t.Logf("requests: %d", va.TotalRequests()) + count200, _ := va.CountStatus(200) + count499, _ := va.CountStatus(0) + count502, _ := va.CountStatus(502) + count503, _ := va.CountStatus(503) + t.Logf("status 200: %d", count200) + t.Logf("status 499: %d", count499) + t.Logf("status 502: %d", count502) + t.Logf("status 503: %d", count503) + got := va.TotalSuccess() want := tt.wantOkRate * float64(va.TotalRequests()) if got < want { t.Fatalf("OK rate too low got<want: %0.2f < %0.2f", got, want) } if count200 == 0 && tt.wantOkRate > 0 { t.Fatal("no OK") } @@ -266,30 +567,36 @@ func TestFifo(t *testing.T) { func TestFifoConstantRouteUpdates(t *testing.T) { for _, tt := range []struct { - name string - filter string - freq int - per time.Duration - updateRate time.Duration - backendTime time.Duration - wantOkRate float64 + name string + filter string + freq int + per time.Duration + updateRate time.Duration + backendTime time.Duration + clientTimeout time.Duration + wantOkRate float64 }{ { - name: "fifo simple ok", - filter: `fifo(3, 5, "1s")`, - freq: 20, - per: 100 * time.Millisecond, - updateRate: 25 * time.Millisecond, - backendTime: 1 * time.Millisecond, - wantOkRate: 1.0, + name: "fifo simple ok", + filter: `fifo(3, 5, "1s")`, + freq: 20, + per: 100 * time.Millisecond, + updateRate: 25 * time.Millisecond, + backendTime: 1 * time.Millisecond, + clientTimeout: time.Second, + wantOkRate: 1.0, + }, { + name: "fifoWithBody simple ok", + filter: `fifoWithBody(3, 5, "1s")`, + freq: 20, + per: 100 * time.Millisecond, + updateRate: 25 * time.Millisecond, + backendTime: 1 * time.Millisecond, + clientTimeout: time.Second, + wantOkRate: 1.0, }, } { t.Run(tt.name, func(t *testing.T) { - fs := NewFifo() - if fs.Name() != filters.FifoName
{ - t.Fatalf("Failed to get name got %s want %s", fs.Name(), filters.FifoName) - } - metrics := &metricstest.MockMetrics{} reg := scheduler.RegistryWith(scheduler.Options{ Metrics: metrics, @@ -298,7 +605,8 @@ func TestFifoConstantRouteUpdates(t *testing.T) { defer reg.Close() fr := make(filters.Registry) - fr.Register(fs) + fr.Register(NewFifo()) + fr.Register(NewFifoWithBody()) backend := stdlibhttptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { time.Sleep(tt.backendTime) @@ -312,7 +620,6 @@ func TestFifoConstantRouteUpdates(t *testing.T) { } doc := fmt.Sprintf(`aroute: * -> %s -> "%s"`, tt.filter, backend.URL) - dc, err := testdataclient.NewDoc(doc) if err != nil { t.Fatalf("Failed to create testdataclient: %v", err) @@ -351,7 +658,7 @@ func TestFifoConstantRouteUpdates(t *testing.T) { // run dataclient updates quit := make(chan struct{}) - newDoc := fmt.Sprintf(`aroute: * -> fifo(100, 200, "250ms") -> "%s"`, backend.URL) + newDoc := fmt.Sprintf(`aroute: * -> %s -> "%s"`, tt.filter, backend.URL) go func(q chan<- struct{}, updateRate time.Duration, doc1, doc2 string) { i := 0 for { @@ -371,9 +678,7 @@ func TestFifoConstantRouteUpdates(t *testing.T) { }(quit, tt.updateRate, doc, newDoc) - const clientTimeout = 1 * time.Second - - va := httptest.NewVegetaAttacker(ts.URL, tt.freq, tt.per, clientTimeout) + va := httptest.NewVegetaAttacker(ts.URL, tt.freq, tt.per, tt.clientTimeout) va.Attack(io.Discard, 1*time.Second, tt.name) quit <- struct{}{} diff --git a/proxy/proxy.go b/proxy/proxy.go index ba119efb3f..3498185683 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -1454,6 +1454,7 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { err := p.do(ctx) + // writeTimeout() filter if d, ok := ctx.StateBag()[filters.WriteTimeout].(time.Duration); ok { e := ctx.ResponseController().SetWriteDeadline(time.Now().Add(d)) if e != nil { @@ -1461,12 +1462,22 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { } 
} + // stream response body to client if err != nil { p.errorResponse(ctx, err) } else { p.serveResponse(ctx) } + // fifoWithBody() filter + if sbf, ok := ctx.StateBag()[filters.FifoWithBodyName]; ok { + if fs, ok := sbf.([]func()); ok { + for i := len(fs) - 1; i >= 0; i-- { + fs[i]() + } + } + } + if ctx.cancelBackendContext != nil { ctx.cancelBackendContext() } diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 5da0dd803e..e2e7de35e4 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -155,6 +155,20 @@ func (fq *fifoQueue) wait(ctx context.Context) (func(), error) { cnt := fq.counter fq.mu.RUnlock() + // check request context expired + // https://github.com/golang/go/issues/63615 + if err := ctx.Err(); err != nil { + switch err { + case context.DeadlineExceeded: + return nil, ErrQueueTimeout + case context.Canceled: + return nil, ErrClientCanceled + default: + // does not exist yet in Go stdlib as of Go1.18.4 + return nil, err + } + } + // handle queue all := cnt.Add(1) // queue full?