Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: filter fifoWithBody #2685

Merged
merged 5 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions docs/reference/filters.md
Original file line number Diff line number Diff line change
Expand Up @@ -2819,6 +2819,31 @@ Example:
fifo(100, 150, "10s")
```

### fifoWithBody

This Filter is similar to the [lifo](#lifo) filter in regards to
parameters and status codes.
Performance considerations are similar to [fifo](#fifo).

The difference between fifo and fifoWithBody is that fifo will decrement
the concurrency as soon as the backend sent response headers and
fifoWithBody will decrement the concurrency if the response body was
served. Normally both are very similar, but if you have a fully async
component that serves multiple website fragments, this would decrement
concurrency too early.

Parameters:

* MaxConcurrency specifies how many goroutines are allowed to work on this queue (int)
* MaxQueueSize sets the queue size (int)
* Timeout sets the timeout to get request scheduled (time)

Example:

```
fifoWithBody(100, 150, "10s")
```

### lifo

This Filter changes skipper to handle the route with a bounded last in
Expand Down
1 change: 1 addition & 0 deletions filters/builtin/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ func Filters() []filters.Spec {
auth.NewForwardToken(),
auth.NewForwardTokenField(),
scheduler.NewFifo(),
scheduler.NewFifoWithBody(),
scheduler.NewLIFO(),
scheduler.NewLIFOGroup(),
rfc.NewPath(),
Expand Down
1 change: 1 addition & 0 deletions filters/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ const (
SetDynamicBackendUrl = "setDynamicBackendUrl"
ApiUsageMonitoringName = "apiUsageMonitoring"
FifoName = "fifo"
FifoWithBodyName = "fifoWithBody"
LifoName = "lifo"
LifoGroupName = "lifoGroup"
RfcPathName = "rfcPath"
Expand Down
52 changes: 33 additions & 19 deletions filters/scheduler/fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,31 @@ import (
"github.com/zalando/skipper/scheduler"
)

const (
fifoKey string = "fifo"
)

type (
fifoSpec struct{}
fifoSpec struct {
typ string
}
fifoFilter struct {
config scheduler.Config
queue *scheduler.FifoQueue
typ string
}
)

func NewFifo() filters.Spec {
return &fifoSpec{}
return &fifoSpec{
typ: filters.FifoName,
}
}

func NewFifoWithBody() filters.Spec {
return &fifoSpec{
typ: filters.FifoWithBodyName,
}
}

func (*fifoSpec) Name() string {
return filters.FifoName
func (s *fifoSpec) Name() string {
return s.typ
}

// CreateFilter creates a fifoFilter, that will use a semaphore based
Expand Down Expand Up @@ -65,6 +72,7 @@ func (s *fifoSpec) CreateFilter(args []interface{}) (filters.Filter, error) {
}

return &fifoFilter{
typ: s.typ,
config: scheduler.Config{
MaxConcurrency: cc,
MaxQueueSize: qs,
Expand Down Expand Up @@ -132,23 +140,29 @@ func (f *fifoFilter) Request(ctx filters.FilterContext) {
}

// ok
pending, _ := ctx.StateBag()[fifoKey].([]func())
ctx.StateBag()[fifoKey] = append(pending, done)
pending, _ := ctx.StateBag()[f.typ].([]func())
ctx.StateBag()[f.typ] = append(pending, done)
}

// Response will decrease the number of inflight requests to release
// the concurrency reservation for the request.
func (f *fifoFilter) Response(ctx filters.FilterContext) {
pending, ok := ctx.StateBag()[fifoKey].([]func())
if !ok {
return
}
last := len(pending) - 1
if last < 0 {
return
switch f.typ {
case filters.FifoName:
pending, ok := ctx.StateBag()[f.typ].([]func())
if !ok {
return
}
last := len(pending) - 1
if last < 0 {
return
}
pending[last]()
ctx.StateBag()[f.typ] = pending[:last]

case filters.FifoWithBodyName:
// nothing to do here, handled in the proxy after copyStream()
}
pending[last]()
ctx.StateBag()[fifoKey] = pending[:last]
}

// HandleErrorResponse is to opt-in for filters to get called
Expand Down
Loading