Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: filter fifoWithBody #2676

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions filters/builtin/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ func Filters() []filters.Spec {
auth.NewForwardToken(),
auth.NewForwardTokenField(),
scheduler.NewFifo(),
scheduler.NewFifoWithBody(),
scheduler.NewLIFO(),
scheduler.NewLIFOGroup(),
rfc.NewPath(),
Expand Down
4 changes: 4 additions & 0 deletions filters/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ const (

// BackendRatelimit is the key used in the state bag to configure backend ratelimit in proxy
BackendRatelimit = "backend:ratelimit"

// FifoWithBody
FifoWithBody = "fifo:body:func"
)

// FilterContext object providing state and information that is unique to a request.
Expand Down Expand Up @@ -327,6 +330,7 @@ const (
SetDynamicBackendUrl = "setDynamicBackendUrl"
ApiUsageMonitoringName = "apiUsageMonitoring"
FifoName = "fifo"
FifoWithBodyName = "fifoWithBody"
LifoName = "lifo"
LifoGroupName = "lifoGroup"
RfcPathName = "rfcPath"
Expand Down
85 changes: 85 additions & 0 deletions filters/scheduler/cleanup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package scheduler

import (
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/zalando/skipper/filters"
"github.com/zalando/skipper/proxy"
"github.com/zalando/skipper/routing"
"github.com/zalando/skipper/routing/testdataclient"
"github.com/zalando/skipper/scheduler"
)

func TestCleanupOnBackendErrors(t *testing.T) {
doc := `
aroute: *
-> lifo(1, 1, "100ms")
-> lifoGroup("foo", 1, 1, "100ms")
-> lifo(2, 2, "200ms")
-> lifoGroup("bar", 1, 1, "100ms")
-> fifo(1, 1, "200ms")
-> "http://test.invalid"
`

dc, err := testdataclient.NewDoc(doc)
require.NoError(t, err)
defer dc.Close()

reg := scheduler.RegistryWith(scheduler.Options{})
defer reg.Close()

fr := make(filters.Registry)
fr.Register(NewLIFO())
fr.Register(NewLIFOGroup())
fr.Register(NewFifo())

ro := routing.Options{
SignalFirstLoad: true,
FilterRegistry: fr,
DataClients: []routing.DataClient{dc},
PostProcessors: []routing.PostProcessor{reg},
}

rt := routing.New(ro)
defer rt.Close()

<-rt.FirstLoad()

pr := proxy.WithParams(proxy.Params{
Routing: rt,
})
defer pr.Close()

ts := httptest.NewServer(pr)
defer ts.Close()

rsp, err := http.Get(ts.URL)
require.NoError(t, err)
rsp.Body.Close()

var route *routing.Route
{
req, err := http.NewRequest("GET", ts.URL, nil)
require.NoError(t, err)

route, _ = rt.Get().Do(req)
require.NotNil(t, route, "failed to lookup route")
}

for _, f := range route.Filters {
if qf, ok := f.Filter.(interface{ GetQueue() *scheduler.Queue }); ok {
status := qf.GetQueue().Status()
assert.Equal(t, scheduler.QueueStatus{ActiveRequests: 0, QueuedRequests: 0, Closed: false}, status)
} else if qf, ok := f.Filter.(interface{ GetQueue() *scheduler.FifoQueue }); ok {
status := qf.GetQueue().Status()
assert.Equal(t, scheduler.QueueStatus{ActiveRequests: 0, QueuedRequests: 0, Closed: false}, status)
} else {
t.Fatal("filter does not implement GetQueue()")
}
}
}
67 changes: 55 additions & 12 deletions filters/scheduler/fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,51 @@
fifoKey string = "fifo"
)

type fifoType int

const (
fifo fifoType = iota + 1
fifoWithBody
)
Comment on lines +20 to +23
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use filter name as spec.typ and have withBody bool in filter similar to

f := &histFilter{
response: s.typ == filters.HistogramResponseLatencyName,

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the enum style better than string.


type (
fifoSpec struct{}
fifoSpec struct {
typ fifoType
}
fifoFilter struct {
config scheduler.Config
queue *scheduler.FifoQueue
typ fifoType
}
)

func NewFifo() filters.Spec {
return &fifoSpec{}
return &fifoSpec{
typ: fifo,
}
}

func NewFifoWithBody() filters.Spec {
return &fifoSpec{
typ: fifoWithBody,
}
}

func (*fifoSpec) Name() string {
return filters.FifoName
func (f *fifoSpec) Name() string {
switch f.typ {
case fifo:
return filters.FifoName
case fifoWithBody:
return filters.FifoWithBodyName
}
return "unknown"
}

// CreateFilter creates a fifoFilter, that will use a semaphore based
// queue for handling requests to limit concurrency of a route. The first
// parameter is maxConcurrency the second maxQueueSize and the third
// timeout.
func (s *fifoSpec) CreateFilter(args []interface{}) (filters.Filter, error) {

Check failure on line 62 in filters/scheduler/fifo.go

View workflow job for this annotation

GitHub Actions / tests

methods on the same type should have the same receiver name (seen 1x "f", 1x "s") (ST1016)

Check failure on line 62 in filters/scheduler/fifo.go

View workflow job for this annotation

GitHub Actions / tests

methods on the same type should have the same receiver name (seen 1x "f", 1x "s") (ST1016)
if len(args) != 3 {
return nil, filters.ErrInvalidFilterParameters
}
Expand Down Expand Up @@ -65,6 +89,7 @@
}

return &fifoFilter{
typ: s.typ,
config: scheduler.Config{
MaxConcurrency: cc,
MaxQueueSize: qs,
Expand Down Expand Up @@ -139,14 +164,32 @@
// Response will decrease the number of inflight requests to release
// the concurrency reservation for the request.
func (f *fifoFilter) Response(ctx filters.FilterContext) {
pending, ok := ctx.StateBag()[fifoKey].([]func())
if !ok {
return
g := f.createResponse(ctx)
switch f.typ {
case fifo:
g()
case fifoWithBody:
ctx.StateBag()[filters.FifoWithBody] = g
Comment on lines +171 to +172
Copy link
Member

@AlexanderYastrebov AlexanderYastrebov Oct 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its not clear how multiple fifoWithBody would chain and how they would interplay with regular fifo.
I think you'd need to have the same logic stored under different keys and then run chain of hooks in two places in proxy depending on the key at proxy.do and proxy.ServeHTTP:

...
func (f *fifoFilter) createResponse(ctx filters.FilterContext, typ string) {
        pending, ok := ctx.StateBag()[typ].([]func())
        if !ok {
	        return
        }
        last := len(pending) - 1
        if last < 0 {
	        return
        }
        pending[last]()
        ctx.StateBag()[fifoKey] = pending[:last]
}

...
// existing in proxy.do
pendingFIFO, _ := stateBag[scheduler.FIFOKey].([]func())
for _, done := range pendingFIFO {
	done()
}
...
// in proxy.ServeHTTP
pendingFIFO, _ := stateBag[scheduler.FIFOWithBodyKey].([]func())
for _, done := range pendingFIFO {
	done()
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that's a good point. I also thought about that I need to do something like that.
What do you like more to support the copystream part? this one or the other?

Copy link
Member

@AlexanderYastrebov AlexanderYastrebov Oct 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can remove existing pendingFIFO cleanup from proxy.go see #2680

}
last := len(pending) - 1
if last < 0 {
return
}

func (f *fifoFilter) createResponse(ctx filters.FilterContext) func() {
return func() {
pending, ok := ctx.StateBag()[fifoKey].([]func())
if !ok {
return
}
last := len(pending) - 1
if last < 0 {
return
}
pending[last]()
ctx.StateBag()[fifoKey] = pending[:last]
}
pending[last]()
ctx.StateBag()[fifoKey] = pending[:last]
}

// HandleErrorResponse is to opt-in for filters to get called
// Response(ctx) in case of errors via proxy. It has to return true to opt-in.
func (f *fifoFilter) HandleErrorResponse() bool {
return true
}
Loading
Loading