From fc54e44385ef2b87c9bc52cdc86a247a58ad0088 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sandor=20Sz=C3=BCcs?= Date: Fri, 13 Oct 2023 23:29:28 +0200 Subject: [PATCH 1/5] feature: filter fifoWithBody that works similar to fifo(), but release defered until body streaming to client was finished MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Sandor Szücs --- docs/reference/filters.md | 25 +++ filters/builtin/builtin.go | 1 + filters/filters.go | 4 + filters/scheduler/fifo.go | 61 ++++-- filters/scheduler/fifo_test.go | 334 ++++++++++++++++++++++++++------- proxy/proxy.go | 5 + 6 files changed, 348 insertions(+), 82 deletions(-) diff --git a/docs/reference/filters.md b/docs/reference/filters.md index 98fea702f5..fa2da89075 100644 --- a/docs/reference/filters.md +++ b/docs/reference/filters.md @@ -2819,6 +2819,31 @@ Example: fifo(100, 150, "10s") ``` +### fifoWithBody + +This Filter is similar to the [lifo](#lifo) filter in regards to +parameters and status codes. +Performance considerations are similar to [fifo](#fifo). + +The difference between fifo and fifoWithBody is that fifo will decrement +the concurrency as soon as the backend sent response headers and +fifoWithBody will decrement the concurrency if the response body was +served. Normally both are very similar, but if you have a fully async +component that serves multiple website fragments, this would decrement +concurrency too early. + +Parameters: + +* MaxConcurrency specifies how many goroutines are allowed to work on this queue (int) +* MaxQueueSize sets the queue size (int) +* Timeout sets the timeout to get request scheduled (time) + +Example: + +``` +fifoWithBody(100, 150, "10s") +``` + ### lifo This Filter changes skipper to handle the route with a bounded last in diff --git a/filters/builtin/builtin.go b/filters/builtin/builtin.go index 0609d87412..d4e1895bf4 100644 --- a/filters/builtin/builtin.go +++ b/filters/builtin/builtin.go @@ -216,6 +216,7 @@ func Filters() []filters.Spec { auth.NewForwardToken(), auth.NewForwardTokenField(), scheduler.NewFifo(), + scheduler.NewFifoWithBody(), scheduler.NewLIFO(), scheduler.NewLIFOGroup(), rfc.NewPath(), diff --git a/filters/filters.go b/filters/filters.go index 6caaaa7a41..c68cb93405 100644 --- a/filters/filters.go +++ b/filters/filters.go @@ -35,6 +35,9 @@ const ( // BackendRatelimit is the key used in the state bag to configure backend ratelimit in proxy BackendRatelimit = "backend:ratelimit" + + // FifoWithBody + FifoWithBody = "fifo:body:func" ) // FilterContext object providing state and information that is unique to a request. @@ -327,6 +330,7 @@ const ( SetDynamicBackendUrl = "setDynamicBackendUrl" ApiUsageMonitoringName = "apiUsageMonitoring" FifoName = "fifo" + FifoWithBodyName = "fifoWithBody" LifoName = "lifo" LifoGroupName = "lifoGroup" RfcPathName = "rfcPath" diff --git a/filters/scheduler/fifo.go b/filters/scheduler/fifo.go index eca58e59fe..a92619d3c0 100644 --- a/filters/scheduler/fifo.go +++ b/filters/scheduler/fifo.go @@ -15,20 +15,44 @@ const ( fifoKey string = "fifo" ) +type fifoType int + +const ( + fifo fifoType = iota + 1 + fifoWithBody +) + type ( - fifoSpec struct{} + fifoSpec struct { + typ fifoType + } fifoFilter struct { config scheduler.Config queue *scheduler.FifoQueue + typ fifoType } ) func NewFifo() filters.Spec { - return &fifoSpec{} + return &fifoSpec{ + typ: fifo, + } +} + +func NewFifoWithBody() filters.Spec { + return &fifoSpec{ + typ: fifoWithBody, + } } -func (*fifoSpec) Name() string { - return filters.FifoName +func (f *fifoSpec) Name() string { + switch f.typ { + case fifo: + return filters.FifoName + case fifoWithBody: + return filters.FifoWithBodyName + } + return "unknown" } // CreateFilter creates a fifoFilter, that will use a semaphore based @@ -65,6 +89,7 @@ func (s *fifoSpec) CreateFilter(args []interface{}) (filters.Filter, error) { } return &fifoFilter{ + typ: s.typ, config: scheduler.Config{ MaxConcurrency: cc, MaxQueueSize: qs, @@ -139,16 +164,28 @@ func (f *fifoFilter) Request(ctx filters.FilterContext) { // Response will decrease the number of inflight requests to release // the concurrency reservation for the request. func (f *fifoFilter) Response(ctx filters.FilterContext) { - pending, ok := ctx.StateBag()[fifoKey].([]func()) - if !ok { - return + g := f.createResponse(ctx) + switch f.typ { + case fifo: + g() + case fifoWithBody: + ctx.StateBag()[filters.FifoWithBody] = g } - last := len(pending) - 1 - if last < 0 { - return +} + +func (f *fifoFilter) createResponse(ctx filters.FilterContext) func() { + return func() { + pending, ok := ctx.StateBag()[fifoKey].([]func()) + if !ok { + return + } + last := len(pending) - 1 + if last < 0 { + return + } + pending[last]() + ctx.StateBag()[fifoKey] = pending[:last] } - pending[last]() - ctx.StateBag()[fifoKey] = pending[:last] } // HandleErrorResponse is to opt-in for filters to get called diff --git a/filters/scheduler/fifo_test.go b/filters/scheduler/fifo_test.go index e23a410e66..c76aa2b5f9 100644 --- a/filters/scheduler/fifo_test.go +++ b/filters/scheduler/fifo_test.go @@ -5,6 +5,7 @@ import ( "io" "net/http" stdlibhttptest "net/http/httptest" + "strings" "testing" "time" @@ -19,7 +20,29 @@ import ( "github.com/zalando/skipper/scheduler" ) -func TestFifoCreateFilter(t *testing.T) { +func TestCreateFifoName(t *testing.T) { + for _, tt := range []struct { + name string + filterFunc func() filters.Spec + }{ + { + name: filters.FifoName, + filterFunc: NewFifo, + }, + { + name: filters.FifoWithBodyName, + filterFunc: NewFifoWithBody, + }, + } { + t.Run(tt.name, func(t *testing.T) { + if tt.filterFunc().Name() != tt.name { + t.Fatalf("got %q, want %q", tt.filterFunc().Name(), tt.name) + } + }) + } +} + +func TestCreateFifoFilter(t *testing.T) { for _, tt := range []struct { name string args []interface{} @@ -58,6 +81,33 @@ func TestFifoCreateFilter(t *testing.T) { Timeout: 1 * time.Second, }, }, + { + name: "fifo negative value arg1", + args: []interface{}{ + -3, + 5, + "1s", + }, + wantParseErr: true, + }, + { + name: "fifo negative value arg2", + args: []interface{}{ + 3, + -5, + "1s", + }, + wantParseErr: true, + }, + { + name: "fifo too small value arg3", + args: []interface{}{ + 3, + 5, + "1ns", + }, + wantParseErr: true, + }, { name: "fifo wrong type arg1", args: []interface{}{ @@ -106,43 +156,160 @@ func TestFifoCreateFilter(t *testing.T) { }, } { t.Run(tt.name, func(t *testing.T) { - spec := &fifoSpec{} - ff, err := spec.CreateFilter(tt.args) - if err != nil && !tt.wantParseErr { - t.Fatalf("Failed to parse filter: %v", err) + for _, f := range []func() filters.Spec{NewFifo, NewFifoWithBody} { + spec := f() + ff, err := spec.CreateFilter(tt.args) + if err != nil && !tt.wantParseErr { + t.Fatalf("Failed to parse filter: %v", err) + } + if err == nil && tt.wantParseErr { + t.Fatal("Failed to get wanted error on create filter") + } + + if _, ok := ff.(*fifoFilter); !ok && err == nil { + t.Fatal("Failed to convert filter to *fifoFilter") + } } - if err == nil && tt.wantParseErr { - t.Fatal("Failed to get wanted error on create filter") + }) + } +} + +func TestFifoWithBody(t *testing.T) { + for _, tt := range []struct { + name string + args []interface{} + backendTime time.Duration + responseSize int + }{ + { + name: "fifoWithBody 1024", + args: []interface{}{1, 0, "1s"}, + backendTime: 10 * time.Millisecond, + responseSize: 1024, + }, + { + name: "fifoWithBody 100MB", + args: []interface{}{1, 0, "20ms"}, + backendTime: 10 * time.Millisecond, + responseSize: 100 * 1000 * 1024, + }, + } { + t.Run(tt.name, func(t *testing.T) { + + backend := stdlibhttptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + // sleep here to test the difference between streaming response and not + time.Sleep(tt.backendTime) + // TODO: maybe better to do slow body streaming? + w.Write([]byte(strings.Repeat("A", tt.responseSize))) + })) + defer backend.Close() + + // proxy + metrics := &metricstest.MockMetrics{} + reg := scheduler.RegistryWith(scheduler.Options{ + Metrics: metrics, + EnableRouteFIFOMetrics: true, + }) + defer reg.Close() + fr := make(filters.Registry) + fr.Register(NewFifoWithBody()) + args := append(tt.args, backend.URL) + doc := fmt.Sprintf(`r: * -> fifoWithBody(%v, %v, "%v") -> "%s"`, args...) + t.Logf("%s", doc) + dc, err := testdataclient.NewDoc(doc) + if err != nil { + t.Fatalf("Failed to create testdataclient: %v", err) } - if tt.wantParseErr { - return + defer dc.Close() + ro := routing.Options{ + SignalFirstLoad: true, + FilterRegistry: fr, + DataClients: []routing.DataClient{dc}, + PostProcessors: []routing.PostProcessor{reg}, } + rt := routing.New(ro) + defer rt.Close() + <-rt.FirstLoad() + tracer := &testTracer{MockTracer: mocktracer.New()} + pr := proxy.WithParams(proxy.Params{ + Routing: rt, + OpenTracing: &proxy.OpenTracingParams{Tracer: tracer}, + }) + defer pr.Close() + ts := stdlibhttptest.NewServer(pr) + defer ts.Close() - f, ok := ff.(*fifoFilter) - if !ok { - t.Fatal("Failed to convert filter to *fifoFilter") + // simple test + rsp, err := ts.Client().Get(ts.URL) + if err != nil { + t.Fatalf("Failed to get response from %s: %v", ts.URL, err) + } + defer rsp.Body.Close() + if rsp.StatusCode != http.StatusOK { + t.Fatalf("Failed to get valid response from endpoint: %d", rsp.StatusCode) } + b, err := io.ReadAll(rsp.Body) + if err != nil { + t.Fatalf("Failed to read response body from: %v", err) + } + if len(b) != tt.responseSize { + t.Fatalf("Failed to read the size, got: %v, want: %v", len(b), tt.responseSize) + } + + // the streaming test + rspCH := make(chan *http.Response) + errCH := make(chan error) + waithCH := make(chan struct{}) + go func() { + rsp, err := ts.Client().Get(ts.URL) + waithCH <- struct{}{} + if err != nil { + errCH <- err + } else { + rspCH <- rsp + } + }() - // validate config - config := f.Config() - if config != tt.wantConfig { - t.Fatalf("Failed to get Config, got: %v, want: %v", config, tt.wantConfig) + <-waithCH + rsp, err = ts.Client().Get(ts.URL) + if err != nil { + t.Fatalf("Failed to do 2nd request: %v", err) + } else { + b, err := io.ReadAll(rsp.Body) + if err != nil { + t.Fatalf("Failed 2nd request to read body: %v", err) + } + if len(b) != tt.responseSize { + t.Fatalf("Failed 2nd request to get response size: %d, want: %d", len(b), tt.responseSize) + } } - if f.queue != f.GetQueue() { - t.Fatal("Failed to get expected queue") + select { + case err := <-errCH: + t.Fatalf("Failed to do request: %v", err) + case rsp := <-rspCH: + b, err := io.ReadAll(rsp.Body) + if err != nil { + t.Fatalf("Failed to read body: %v", err) + } + if len(b) != tt.responseSize { + t.Fatalf("Failed to get response size: %d, want: %d", len(b), tt.responseSize) + } } + }) } } func TestFifo(t *testing.T) { for _, tt := range []struct { - name string - filter string - freq int - per time.Duration - backendTime time.Duration - wantOkRate float64 + name string + filter string + freq int + per time.Duration + backendTime time.Duration + clientTimeout time.Duration + wantOkRate float64 }{ { name: "fifo simple ok", @@ -153,28 +320,52 @@ func TestFifo(t *testing.T) { wantOkRate: 1.0, }, { - name: "fifo with reaching max concurrency and queue timeouts", - filter: `fifo(3, 5, "10ms")`, - freq: 200, - per: 100 * time.Millisecond, - backendTime: 10 * time.Millisecond, - wantOkRate: 0.1, + name: "fifoWithbody simple ok", + filter: `fifoWithbody(3, 5, "1s")`, + freq: 20, + per: 100 * time.Millisecond, + backendTime: 1 * time.Millisecond, + clientTimeout: time.Second, + wantOkRate: 1.0, }, { - name: "fifo with reaching max concurrency and queue full", - filter: `fifo(3, 5, "250ms")`, - freq: 200, - per: 100 * time.Millisecond, - backendTime: 100 * time.Millisecond, - wantOkRate: 0.0008, + name: "fifo with reaching max concurrency and queue timeouts", + filter: `fifo(3, 5, "10ms")`, + freq: 200, + per: 100 * time.Millisecond, + backendTime: 10 * time.Millisecond, + clientTimeout: time.Second, + wantOkRate: 0.1, + }, + { + name: "fifoWithbody with reaching max concurrency and queue timeouts", + filter: `fifoWithbody(3, 5, "10ms")`, + freq: 200, + per: 100 * time.Millisecond, + backendTime: 10 * time.Millisecond, + clientTimeout: time.Second, + wantOkRate: 0.1, + }, + { + name: "fifo with reaching max concurrency and queue full", + filter: `fifo(1, 1, "250ms")`, + freq: 200, + per: 100 * time.Millisecond, + backendTime: 100 * time.Millisecond, + clientTimeout: time.Second, + wantOkRate: 0.0008, + }, + { + name: "fifoWithBody with reaching max concurrency and queue full", + filter: `fifoWithBody(1, 1, "250ms")`, + freq: 200, + per: 100 * time.Millisecond, + backendTime: 100 * time.Millisecond, + clientTimeout: time.Second, + wantOkRate: 0.0008, }, } { t.Run(tt.name, func(t *testing.T) { - fs := NewFifo() - if fs.Name() != filters.FifoName { - t.Fatalf("Failed to get name got %s want %s", fs.Name(), filters.FifoName) - } - metrics := &metricstest.MockMetrics{} reg := scheduler.RegistryWith(scheduler.Options{ Metrics: metrics, @@ -183,7 +374,8 @@ func TestFifo(t *testing.T) { defer reg.Close() fr := make(filters.Registry) - fr.Register(fs) + fr.Register(NewFifo()) + fr.Register(NewFifoWithBody()) backend := stdlibhttptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { time.Sleep(tt.backendTime) @@ -235,9 +427,7 @@ func TestFifo(t *testing.T) { t.Fatalf("Failed to get valid response from endpoint: %d", rsp.StatusCode) } - const clientTimeout = 1 * time.Second - - va := httptest.NewVegetaAttacker(ts.URL, tt.freq, tt.per, clientTimeout) + va := httptest.NewVegetaAttacker(ts.URL, tt.freq, tt.per, tt.clientTimeout) va.Attack(io.Discard, 1*time.Second, tt.name) t.Logf("Success [0..1]: %0.2f", va.Success()) @@ -266,30 +456,36 @@ func TestFifo(t *testing.T) { func TestFifoConstantRouteUpdates(t *testing.T) { for _, tt := range []struct { - name string - filter string - freq int - per time.Duration - updateRate time.Duration - backendTime time.Duration - wantOkRate float64 + name string + filter string + freq int + per time.Duration + updateRate time.Duration + backendTime time.Duration + clientTimeout time.Duration + wantOkRate float64 }{ { - name: "fifo simple ok", - filter: `fifo(3, 5, "1s")`, - freq: 20, - per: 100 * time.Millisecond, - updateRate: 25 * time.Millisecond, - backendTime: 1 * time.Millisecond, - wantOkRate: 1.0, + name: "fifo simple ok", + filter: `fifo(3, 5, "1s")`, + freq: 20, + per: 100 * time.Millisecond, + updateRate: 25 * time.Millisecond, + backendTime: 1 * time.Millisecond, + clientTimeout: time.Second, + wantOkRate: 1.0, + }, { + name: "fifoWithBody simple ok", + filter: `fifoWithBody(3, 5, "1s")`, + freq: 20, + per: 100 * time.Millisecond, + updateRate: 25 * time.Millisecond, + backendTime: 1 * time.Millisecond, + clientTimeout: time.Second, + wantOkRate: 1.0, }, } { t.Run(tt.name, func(t *testing.T) { - fs := NewFifo() - if fs.Name() != filters.FifoName { - t.Fatalf("Failed to get name got %s want %s", fs.Name(), filters.FifoName) - } - metrics := &metricstest.MockMetrics{} reg := scheduler.RegistryWith(scheduler.Options{ Metrics: metrics, @@ -298,7 +494,8 @@ func TestFifoConstantRouteUpdates(t *testing.T) { defer reg.Close() fr := make(filters.Registry) - fr.Register(fs) + fr.Register(NewFifo()) + fr.Register(NewFifoWithBody()) backend := stdlibhttptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { time.Sleep(tt.backendTime) @@ -312,7 +509,6 @@ func TestFifoConstantRouteUpdates(t *testing.T) { } doc := fmt.Sprintf(`aroute: * -> %s -> "%s"`, tt.filter, backend.URL) - dc, err := testdataclient.NewDoc(doc) if err != nil { t.Fatalf("Failed to create testdataclient: %v", err) @@ -351,7 +547,7 @@ func TestFifoConstantRouteUpdates(t *testing.T) { // run dataclient updates quit := make(chan struct{}) - newDoc := fmt.Sprintf(`aroute: * -> fifo(100, 200, "250ms") -> "%s"`, backend.URL) + newDoc := fmt.Sprintf(`aroute: * -> %s -> "%s"`, tt.filter, backend.URL) go func(q chan<- struct{}, updateRate time.Duration, doc1, doc2 string) { i := 0 for { @@ -371,9 +567,7 @@ func TestFifoConstantRouteUpdates(t *testing.T) { }(quit, tt.updateRate, doc, newDoc) - const clientTimeout = 1 * time.Second - - va := httptest.NewVegetaAttacker(ts.URL, tt.freq, tt.per, clientTimeout) + va := httptest.NewVegetaAttacker(ts.URL, tt.freq, tt.per, tt.clientTimeout) va.Attack(io.Discard, 1*time.Second, tt.name) quit <- struct{}{} diff --git a/proxy/proxy.go b/proxy/proxy.go index ba119efb3f..69df8d5a13 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -1460,6 +1460,11 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx.Logger().Errorf("Failed to set write deadline: %v", e) } } + if sbf, ok := ctx.StateBag()[filters.FifoWithBody]; ok { + if f, ok := sbf.(func()); ok { + defer f() + } + } if err != nil { p.errorResponse(ctx, err) From 40b96b55e8308a81063762e55aee681049f19d60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sandor=20Sz=C3=BCcs?= Date: Tue, 17 Oct 2023 19:13:29 +0200 Subject: [PATCH 2/5] fix staticcheck MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Sandor Szücs --- filters/scheduler/fifo.go | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/filters/scheduler/fifo.go b/filters/scheduler/fifo.go index a92619d3c0..0abf801297 100644 --- a/filters/scheduler/fifo.go +++ b/filters/scheduler/fifo.go @@ -11,10 +11,6 @@ import ( "github.com/zalando/skipper/scheduler" ) -const ( - fifoKey string = "fifo" -) - type fifoType int const ( @@ -22,6 +18,16 @@ const ( fifoWithBody ) +func (t fifoType) String() string { + switch t { + case fifo: + return filters.FifoName + case fifoWithBody: + return filters.FifoWithBodyName + } + return "unknown" +} + type ( fifoSpec struct { typ fifoType @@ -45,14 +51,8 @@ func NewFifoWithBody() filters.Spec { } } -func (f *fifoSpec) Name() string { - switch f.typ { - case fifo: - return filters.FifoName - case fifoWithBody: - return filters.FifoWithBodyName - } - return "unknown" +func (s *fifoSpec) Name() string { + return s.typ.String() } // CreateFilter creates a fifoFilter, that will use a semaphore based @@ -157,8 +157,8 @@ func (f *fifoFilter) Request(ctx filters.FilterContext) { } // ok - pending, _ := ctx.StateBag()[fifoKey].([]func()) - ctx.StateBag()[fifoKey] = append(pending, done) + pending, _ := ctx.StateBag()[f.typ.String()].([]func()) + ctx.StateBag()[f.typ.String()] = append(pending, done) } // Response will decrease the number of inflight requests to release @@ -175,7 +175,7 @@ func (f *fifoFilter) Response(ctx filters.FilterContext) { func (f *fifoFilter) createResponse(ctx filters.FilterContext) func() { return func() { - pending, ok := ctx.StateBag()[fifoKey].([]func()) + pending, ok := ctx.StateBag()[f.typ.String()].([]func()) if !ok { return } @@ -184,7 +184,7 @@ func (f *fifoFilter) createResponse(ctx filters.FilterContext) func() { return } pending[last]() - ctx.StateBag()[fifoKey] = pending[:last] + ctx.StateBag()[f.typ.String()] = pending[:last] } } From 4b1b793f43427d55c503616135a14e6c02832afe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sandor=20Sz=C3=BCcs?= Date: Thu, 19 Oct 2023 12:27:54 +0200 Subject: [PATCH 3/5] fix: fifo() and fifoWithBody() with canceled requests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Both filters did not check for canceled context from request before semaphore.Acquire, see https://github.com/golang/go/issues/63615 Signed-off-by: Sandor Szücs --- filters/filters.go | 3 - filters/scheduler/fifo.go | 49 +++------- filters/scheduler/fifo_test.go | 163 +++++++++++++++++++++++++++------ proxy/proxy.go | 16 +++- scheduler/scheduler.go | 26 ++++++ 5 files changed, 187 insertions(+), 70 deletions(-) diff --git a/filters/filters.go b/filters/filters.go index c68cb93405..9f9a1f333a 100644 --- a/filters/filters.go +++ b/filters/filters.go @@ -35,9 +35,6 @@ const ( // BackendRatelimit is the key used in the state bag to configure backend ratelimit in proxy BackendRatelimit = "backend:ratelimit" - - // FifoWithBody - FifoWithBody = "fifo:body:func" ) // FilterContext object providing state and information that is unique to a request. diff --git a/filters/scheduler/fifo.go b/filters/scheduler/fifo.go index 0abf801297..50ceb95b65 100644 --- a/filters/scheduler/fifo.go +++ b/filters/scheduler/fifo.go @@ -11,48 +11,31 @@ import ( "github.com/zalando/skipper/scheduler" ) -type fifoType int - -const ( - fifo fifoType = iota + 1 - fifoWithBody -) - -func (t fifoType) String() string { - switch t { - case fifo: - return filters.FifoName - case fifoWithBody: - return filters.FifoWithBodyName - } - return "unknown" -} - type ( fifoSpec struct { - typ fifoType + typ string } fifoFilter struct { config scheduler.Config queue *scheduler.FifoQueue - typ fifoType + typ string } ) func NewFifo() filters.Spec { return &fifoSpec{ - typ: fifo, + typ: filters.FifoName, } } func NewFifoWithBody() filters.Spec { return &fifoSpec{ - typ: fifoWithBody, + typ: filters.FifoWithBodyName, } } func (s *fifoSpec) Name() string { - return s.typ.String() + return s.typ } // CreateFilter creates a fifoFilter, that will use a semaphore based @@ -157,25 +140,16 @@ func (f *fifoFilter) Request(ctx filters.FilterContext) { } // ok - pending, _ := ctx.StateBag()[f.typ.String()].([]func()) - ctx.StateBag()[f.typ.String()] = append(pending, done) + pending, _ := ctx.StateBag()[f.typ].([]func()) + ctx.StateBag()[f.typ] = append(pending, done) } // Response will decrease the number of inflight requests to release // the concurrency reservation for the request. func (f *fifoFilter) Response(ctx filters.FilterContext) { - g := f.createResponse(ctx) switch f.typ { - case fifo: - g() - case fifoWithBody: - ctx.StateBag()[filters.FifoWithBody] = g - } -} - -func (f *fifoFilter) createResponse(ctx filters.FilterContext) func() { - return func() { - pending, ok := ctx.StateBag()[f.typ.String()].([]func()) + case filters.FifoName: + pending, ok := ctx.StateBag()[f.typ].([]func()) if !ok { return } @@ -184,7 +158,10 @@ func (f *fifoFilter) createResponse(ctx filters.FilterContext) func() { return } pending[last]() - ctx.StateBag()[f.typ.String()] = pending[:last] + ctx.StateBag()[f.typ] = pending[:last] + + case filters.FifoWithBodyName: + // nothing to do here, handled in the proxy after copyStream() } } diff --git a/filters/scheduler/fifo_test.go b/filters/scheduler/fifo_test.go index c76aa2b5f9..cd27b72f13 100644 --- a/filters/scheduler/fifo_test.go +++ b/filters/scheduler/fifo_test.go @@ -1,15 +1,19 @@ package scheduler import ( + "bytes" "fmt" "io" "net/http" stdlibhttptest "net/http/httptest" "strings" "testing" + "testing/iotest" "time" "github.com/opentracing/opentracing-go/mocktracer" + "github.com/sirupsen/logrus" + "github.com/zalando/skipper/eskip" "github.com/zalando/skipper/filters" "github.com/zalando/skipper/metrics/metricstest" @@ -174,39 +178,100 @@ func TestCreateFifoFilter(t *testing.T) { } } +type flusher struct { + w http.ResponseWriter +} + +func (f *flusher) Flush() { + f.w.(http.Flusher).Flush() +} + +func (f *flusher) Unwrap() http.ResponseWriter { + return f.w +} + +func (f *flusher) Write(p []byte) (n int, err error) { + n, err = f.w.Write(p) + if err == nil { + f.Flush() + } + return +} + +type slowReader struct { + r io.Reader + d time.Duration +} + +func (sr *slowReader) Read(p []byte) (int, error) { + logrus.Infof("slowReader: %d", len(p)) + if len(p) == 0 { + return 0, nil + } + time.Sleep(sr.d) + n, err := sr.r.Read(p) + logrus.Infof("slowReader: %d %v", n, err) + return n, err +} + func TestFifoWithBody(t *testing.T) { for _, tt := range []struct { name string args []interface{} backendTime time.Duration responseSize int + wantErr bool }{ { - name: "fifoWithBody 1024", - args: []interface{}{1, 0, "1s"}, + name: "fifoWithBody 1024B with 1 queue should be ok", + args: []interface{}{1, 1, "1s"}, backendTime: 10 * time.Millisecond, responseSize: 1024, }, { - name: "fifoWithBody 100MB", - args: []interface{}{1, 0, "20ms"}, + name: "fifoWithBody 1024B with 0 queue should fail", + args: []interface{}{1, 0, "10ms"}, + backendTime: 50 * time.Millisecond, + responseSize: 1024, + wantErr: true, + }, + { + name: "fifoWithBody 2x 1024B with 1 queue should be ok", + args: []interface{}{1, 1, "1s"}, + backendTime: 10 * time.Millisecond, + responseSize: 2 * 1024, + }, + { + name: "fifoWithBody 2x 1024B with 0 queue should fail", + args: []interface{}{1, 0, "15ms"}, backendTime: 10 * time.Millisecond, - responseSize: 100 * 1000 * 1024, + responseSize: 2 * 1024, + wantErr: true, }, } { t.Run(tt.name, func(t *testing.T) { backend := stdlibhttptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.Logf("backend path: %s", r.URL.Path) + buf := bytes.NewBufferString(strings.Repeat("A", tt.responseSize)) + halfReader := iotest.HalfReader(buf) + sr := &slowReader{ + d: 100 * time.Millisecond, + r: halfReader, + } + w.WriteHeader(http.StatusOK) // sleep here to test the difference between streaming response and not time.Sleep(tt.backendTime) // TODO: maybe better to do slow body streaming? - w.Write([]byte(strings.Repeat("A", tt.responseSize))) + b := make([]byte, 1024) + io.CopyBuffer(&flusher{w}, sr, b) })) defer backend.Close() // proxy metrics := &metricstest.MockMetrics{} + defer metrics.Close() reg := scheduler.RegistryWith(scheduler.Options{ Metrics: metrics, EnableRouteFIFOMetrics: true, @@ -241,7 +306,7 @@ func TestFifoWithBody(t *testing.T) { defer ts.Close() // simple test - rsp, err := ts.Client().Get(ts.URL) + rsp, err := ts.Client().Get(ts.URL + "/test") if err != nil { t.Fatalf("Failed to get response from %s: %v", ts.URL, err) } @@ -257,13 +322,19 @@ func TestFifoWithBody(t *testing.T) { t.Fatalf("Failed to read the size, got: %v, want: %v", len(b), tt.responseSize) } + t.Log("the streaming test") // the streaming test rspCH := make(chan *http.Response) errCH := make(chan error) + defer func() { + close(rspCH) + close(errCH) + }() waithCH := make(chan struct{}) go func() { - rsp, err := ts.Client().Get(ts.URL) - waithCH <- struct{}{} + rsp, err := ts.Client().Get(ts.URL + "/1") + t.Logf("rsp1: %s", rsp.Status) + close(waithCH) if err != nil { errCH <- err } else { @@ -272,22 +343,34 @@ func TestFifoWithBody(t *testing.T) { }() <-waithCH - rsp, err = ts.Client().Get(ts.URL) - if err != nil { - t.Fatalf("Failed to do 2nd request: %v", err) - } else { - b, err := io.ReadAll(rsp.Body) - if err != nil { - t.Fatalf("Failed 2nd request to read body: %v", err) + rsp2, err2 := ts.Client().Get(ts.URL + "/2") + t.Logf("rsp2: %s", rsp.Status) + if tt.wantErr { + n, err := io.Copy(io.Discard, rsp2.Body) + if n != 0 { + t.Fatalf("Failed to get error copied %d bytes, err: %v", n, err) } - if len(b) != tt.responseSize { - t.Fatalf("Failed 2nd request to get response size: %d, want: %d", len(b), tt.responseSize) + rsp2.Body.Close() + } else { + if err2 != nil { + t.Errorf("Failed to do 2nd request: %v", err2) + } else { + b, err2 := io.ReadAll(rsp2.Body) + if err2 != nil { + t.Errorf("Failed 2nd request to read body: %v", err2) + } + if len(b) != tt.responseSize { + t.Errorf("Failed 2nd request to get response size: %d, want: %d", len(b), tt.responseSize) + } } } + + // read body from first request select { case err := <-errCH: t.Fatalf("Failed to do request: %v", err) case rsp := <-rspCH: + t.Logf("client1 got %s", rsp.Status) b, err := io.ReadAll(rsp.Body) if err != nil { t.Fatalf("Failed to read body: %v", err) @@ -328,23 +411,41 @@ func TestFifo(t *testing.T) { clientTimeout: time.Second, wantOkRate: 1.0, }, + { + name: "fifo simple client canceled", + filter: `fifo(3, 5, "1s")`, + freq: 20, + per: 100 * time.Millisecond, + backendTime: 1 * time.Millisecond, + clientTimeout: time.Nanosecond, + wantOkRate: 0, + }, + { + name: "fifoWithbody simple client canceled", + filter: `fifoWithbody(3, 5, "1s")`, + freq: 20, + per: 100 * time.Millisecond, + backendTime: 1 * time.Millisecond, + clientTimeout: time.Nanosecond, + wantOkRate: 0, + }, { name: "fifo with reaching max concurrency and queue timeouts", filter: `fifo(3, 5, "10ms")`, - freq: 200, - per: 100 * time.Millisecond, - backendTime: 10 * time.Millisecond, + freq: 20, + per: 10 * time.Millisecond, + backendTime: 11 * time.Millisecond, clientTimeout: time.Second, - wantOkRate: 0.1, + wantOkRate: 0.005, }, { name: "fifoWithbody with reaching max concurrency and queue timeouts", filter: `fifoWithbody(3, 5, "10ms")`, - freq: 200, - per: 100 * time.Millisecond, - backendTime: 10 * time.Millisecond, + freq: 20, + per: 10 * time.Millisecond, + backendTime: 11 * time.Millisecond, clientTimeout: time.Second, - wantOkRate: 0.1, + wantOkRate: 0.005, }, { name: "fifo with reaching max concurrency and queue full", @@ -432,11 +533,21 @@ func TestFifo(t *testing.T) { t.Logf("Success [0..1]: %0.2f", va.Success()) t.Logf("requests: %d", va.TotalRequests()) + count200, _ := va.CountStatus(200) + count499, _ := va.CountStatus(0) + count502, _ := va.CountStatus(502) + count503, _ := va.CountStatus(503) + t.Logf("status 200: %d", count200) + t.Logf("status 499: %d", count499) + t.Logf("status 502: %d", count502) + t.Logf("status 503: %d", count503) + got := va.TotalSuccess() want := tt.wantOkRate * float64(va.TotalRequests()) if got < want { t.Fatalf("OK rate too low got 0 { t.Fatal("no OK") diff --git a/proxy/proxy.go b/proxy/proxy.go index 69df8d5a13..3498185683 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -1454,24 +1454,30 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { err := p.do(ctx) + // writeTimeout() filter if d, ok := ctx.StateBag()[filters.WriteTimeout].(time.Duration); ok { e := ctx.ResponseController().SetWriteDeadline(time.Now().Add(d)) if e != nil { ctx.Logger().Errorf("Failed to set write deadline: %v", e) } } - if sbf, ok := ctx.StateBag()[filters.FifoWithBody]; ok { - if f, ok := sbf.(func()); ok { - defer f() - } - } + // stream response body to client if err != nil { p.errorResponse(ctx, err) } else { p.serveResponse(ctx) } + // fifoWtihBody() filter + if sbf, ok := ctx.StateBag()[filters.FifoWithBodyName]; ok { + if fs, ok := sbf.([]func()); ok { + for i := len(fs) - 1; i >= 0; i-- { + fs[i]() + } + } + } + if ctx.cancelBackendContext != nil { ctx.cancelBackendContext() } diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 5da0dd803e..14f9c68308 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -155,6 +155,20 @@ func (fq *fifoQueue) wait(ctx context.Context) (func(), error) { cnt := fq.counter fq.mu.RUnlock() + // check request context expired + // https://github.com/golang/go/issues/63615 + if err := ctx.Err(); err != nil { + switch err { + case context.DeadlineExceeded: + return nil, ErrQueueTimeout + case context.Canceled: + return nil, ErrClientCanceled + default: + // does not exist yet in Go stdlib as of Go1.18.4 + return nil, err + } + } + // handle queue all := cnt.Add(1) // queue full? @@ -179,6 +193,18 @@ func (fq *fifoQueue) wait(ctx context.Context) (func(), error) { // does not exist yet in Go stdlib as of Go1.18.4 return nil, err } + } else { + // semaphore will not fail on Acquire when context timed out, nor when it's canceled. + // see also: https://github.com/golang/go/issues/63615 + // The behavior can change as in the proposed change, so the code path on error would apply. + // If the proposed change was merge this code path below can likely be cleaned up. + select { + case <-c.Done(): + // We timed out in Acquire, so it never increased semaphore by one + cnt.Add(-1) + return nil, ErrQueueTimeout + default: + } } return func() { From f769033accf95901ffb5a550391ca0f74bf58581 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sandor=20Sz=C3=BCcs?= Date: Thu, 19 Oct 2023 14:15:47 +0200 Subject: [PATCH 4/5] cleanup: as tested and discussed MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Sandor Szücs --- scheduler/scheduler.go | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 14f9c68308..e2e7de35e4 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -193,18 +193,6 @@ func (fq *fifoQueue) wait(ctx context.Context) (func(), error) { // does not exist yet in Go stdlib as of Go1.18.4 return nil, err } - } else { - // semaphore will not fail on Acquire when context timed out, nor when it's canceled. - // see also: https://github.com/golang/go/issues/63615 - // The behavior can change as in the proposed change, so the code path on error would apply. - // If the proposed change was merge this code path below can likely be cleaned up. - select { - case <-c.Done(): - // We timed out in Acquire, so it never increased semaphore by one - cnt.Add(-1) - return nil, ErrQueueTimeout - default: - } } return func() { From c7bedd102c5a8dc53cb09c423480c3040260527c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sandor=20Sz=C3=BCcs?= Date: Thu, 19 Oct 2023 17:14:42 +0200 Subject: [PATCH 5/5] fix: test had filternames spelled wrong MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Sandor Szücs --- filters/scheduler/fifo_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/filters/scheduler/fifo_test.go b/filters/scheduler/fifo_test.go index cd27b72f13..c9f0093f20 100644 --- a/filters/scheduler/fifo_test.go +++ b/filters/scheduler/fifo_test.go @@ -403,8 +403,8 @@ func TestFifo(t *testing.T) { wantOkRate: 1.0, }, { - name: "fifoWithbody simple ok", - filter: `fifoWithbody(3, 5, "1s")`, + name: "fifoWithBody simple ok", + filter: `fifoWithBody(3, 5, "1s")`, freq: 20, per: 100 * time.Millisecond, backendTime: 1 * time.Millisecond, @@ -421,8 +421,8 @@ func TestFifo(t *testing.T) { wantOkRate: 0, }, { - name: "fifoWithbody simple client canceled", - filter: `fifoWithbody(3, 5, "1s")`, + name: "fifoWithBody simple client canceled", + filter: `fifoWithBody(3, 5, "1s")`, freq: 20, per: 100 * time.Millisecond, backendTime: 1 * time.Millisecond, @@ -439,8 +439,8 @@ func TestFifo(t *testing.T) { wantOkRate: 0.005, }, { - name: "fifoWithbody with reaching max concurrency and queue timeouts", - filter: `fifoWithbody(3, 5, "10ms")`, + name: "fifoWithBody with reaching max concurrency and queue timeouts", + filter: `fifoWithBody(3, 5, "10ms")`, freq: 20, per: 10 * time.Millisecond, backendTime: 11 * time.Millisecond,