Skip to content

Commit

Permalink
feature: filter fifoWithBody (#2685)
Browse files Browse the repository at this point in the history
* feature: filter fifoWithBody that works similar to fifo(), but release deferred until body streaming to client was finished
* fix: fifo() and fifoWithBody() with canceled requests

Both filters did not check for canceled context from request before semaphore.Acquire, see golang/go#63615

Signed-off-by: Sandor Szücs <[email protected]>
  • Loading branch information
szuecs authored Oct 20, 2023
1 parent c7c8a9c commit 7f45e4e
Show file tree
Hide file tree
Showing 7 changed files with 460 additions and 89 deletions.
25 changes: 25 additions & 0 deletions docs/reference/filters.md
Original file line number Diff line number Diff line change
Expand Up @@ -2819,6 +2819,31 @@ Example:
fifo(100, 150, "10s")
```
### fifoWithBody
This Filter is similar to the [lifo](#lifo) filter in regards to
parameters and status codes.
Performance considerations are similar to [fifo](#fifo).
The difference between fifo and fifoWithBody is that fifo will decrement
the concurrency as soon as the backend sent response headers and
fifoWithBody will decrement the concurrency if the response body was
served. Normally both are very similar, but if you have a fully async
component that serves multiple website fragments, this would decrement
concurrency too early.
Parameters:
* MaxConcurrency specifies how many goroutines are allowed to work on this queue (int)
* MaxQueueSize sets the queue size (int)
* Timeout sets the timeout to get request scheduled (time)
Example:
```
fifoWithBody(100, 150, "10s")
```
### lifo
This Filter changes skipper to handle the route with a bounded last in
Expand Down
1 change: 1 addition & 0 deletions filters/builtin/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ func Filters() []filters.Spec {
auth.NewForwardToken(),
auth.NewForwardTokenField(),
scheduler.NewFifo(),
scheduler.NewFifoWithBody(),
scheduler.NewLIFO(),
scheduler.NewLIFOGroup(),
rfc.NewPath(),
Expand Down
1 change: 1 addition & 0 deletions filters/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ const (
SetDynamicBackendUrl = "setDynamicBackendUrl"
ApiUsageMonitoringName = "apiUsageMonitoring"
FifoName = "fifo"
FifoWithBodyName = "fifoWithBody"
LifoName = "lifo"
LifoGroupName = "lifoGroup"
RfcPathName = "rfcPath"
Expand Down
52 changes: 33 additions & 19 deletions filters/scheduler/fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,31 @@ import (
"github.com/zalando/skipper/scheduler"
)

const (
fifoKey string = "fifo"
)

type (
fifoSpec struct{}
fifoSpec struct {
typ string
}
fifoFilter struct {
config scheduler.Config
queue *scheduler.FifoQueue
typ string
}
)

func NewFifo() filters.Spec {
return &fifoSpec{}
return &fifoSpec{
typ: filters.FifoName,
}
}

func NewFifoWithBody() filters.Spec {
return &fifoSpec{
typ: filters.FifoWithBodyName,
}
}

func (*fifoSpec) Name() string {
return filters.FifoName
func (s *fifoSpec) Name() string {
return s.typ
}

// CreateFilter creates a fifoFilter, that will use a semaphore based
Expand Down Expand Up @@ -65,6 +72,7 @@ func (s *fifoSpec) CreateFilter(args []interface{}) (filters.Filter, error) {
}

return &fifoFilter{
typ: s.typ,
config: scheduler.Config{
MaxConcurrency: cc,
MaxQueueSize: qs,
Expand Down Expand Up @@ -132,23 +140,29 @@ func (f *fifoFilter) Request(ctx filters.FilterContext) {
}

// ok
pending, _ := ctx.StateBag()[fifoKey].([]func())
ctx.StateBag()[fifoKey] = append(pending, done)
pending, _ := ctx.StateBag()[f.typ].([]func())
ctx.StateBag()[f.typ] = append(pending, done)
}

// Response will decrease the number of inflight requests to release
// the concurrency reservation for the request.
func (f *fifoFilter) Response(ctx filters.FilterContext) {
pending, ok := ctx.StateBag()[fifoKey].([]func())
if !ok {
return
}
last := len(pending) - 1
if last < 0 {
return
switch f.typ {
case filters.FifoName:
pending, ok := ctx.StateBag()[f.typ].([]func())
if !ok {
return
}
last := len(pending) - 1
if last < 0 {
return
}
pending[last]()
ctx.StateBag()[f.typ] = pending[:last]

case filters.FifoWithBodyName:
// nothing to do here, handled in the proxy after copyStream()
}
pending[last]()
ctx.StateBag()[fifoKey] = pending[:last]
}

// HandleErrorResponse is to opt-in for filters to get called
Expand Down
Loading

0 comments on commit 7f45e4e

Please sign in to comment.