Skip to content

Commit

Permalink
[bugfix] Relax outgoing http request queue (#760)
Browse files Browse the repository at this point in the history
* add request queue trace logging

* fix misleading wording

* implement request slots per host per method

* undo formatting change (?)

* remove gratuitous trace logging

* rename done -> release
avoids confusion with ctx.Done
  • Loading branch information
tsmethurst authored Aug 27, 2022
1 parent c951ba1 commit 969c194
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 6 deletions.
19 changes: 13 additions & 6 deletions internal/httpclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ type Config struct {
// is available (context channels still respected)
type Client struct {
// client is the underlying net/http client; New sets its Timeout from
// cfg.Timeout.
client http.Client
// queue is a single buffered channel of slots that New sizes with
// cfg.MaxOpenConns.
// NOTE(review): this listing is a diff view with the +/- markers stripped,
// so this field is presumably the removed line that rc replaces — confirm
// against the repository before relying on it.
queue chan struct{}
// rc is the request queue that Do asks for a wait spot (keyed by request
// host and method) before it proceeds with a request.
rc *requestQueue
// bmax is set from cfg.MaxBodySize in New.
// NOTE(review): presumably the cap applied to response body sizes — the
// code that uses it is not visible here, confirm.
bmax int64
}

Expand Down Expand Up @@ -118,7 +118,9 @@ func New(cfg Config) *Client {

// Prepare client fields
c.bmax = cfg.MaxBodySize
c.queue = make(chan struct{}, cfg.MaxOpenConns)
c.rc = &requestQueue{
maxOpenConns: cfg.MaxOpenConns,
}
c.client.Timeout = cfg.Timeout

// Set underlying HTTP client roundtripper
Expand All @@ -143,13 +145,18 @@ func New(cfg Config) *Client {
// as the standard http.Client{}.Do() implementation except that response body will
// be wrapped by an io.LimitReader() to limit response body sizes.
func (c *Client) Do(req *http.Request) (*http.Response, error) {
// request a spot in the wait queue...
wait, release := c.rc.getWaitSpot(req.Host, req.Method)

// ... and wait our turn
select {
// Request context cancelled
case <-req.Context().Done():
// the request was canceled before we
// got to our turn: no need to release
return nil, req.Context().Err()
case wait <- struct{}{}:
// it's our turn!

// Slot in queue acquired
case c.queue <- struct{}{}:
// NOTE:
// Ideally here we would set the slot release to happen either
// on error return, or via callback from the response body closer.
Expand All @@ -160,7 +167,7 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
// that connections may not be closed until response body is closed.
// The current implementation will reduce the viability of denial of
// service attacks, but if there are future issues heed this advice :]
defer func() { <-c.queue }()
defer release()
}

// Firstly, ensure this is a valid request
Expand Down
68 changes: 68 additions & 0 deletions internal/httpclient/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
GoToSocial
Copyright (C) 2021-2022 GoToSocial Authors admin@gotosocial.org
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package httpclient

import (
"strings"
"sync"

"github.com/superseriousbusiness/gotosocial/internal/log"
)

// requestQueue hands out wait slots for outgoing http requests. It keeps one
// queue per remote host and, within each host, one queue per request method.
//
// The zero value of hostQueues is ready to use; maxOpenConns should be set to
// a positive number, because it is used as the buffer size of every method
// queue (a zero-sized queue would never accept a waiter).
type requestQueue struct {
	hostQueues   sync.Map // map of lowercased host -> *hostQueue
	maxOpenConns int      // max open conns per host per request method
}

// hostQueue holds the wait queues of a single remote host, one per method.
type hostQueue struct {
	slotsByMethod sync.Map // map of uppercased method -> chan interface{}
}

// getWaitSpot returns a wait channel and release function for http clients
// that want to do requests politely: that is, wait for their turn.
//
// To wait, a caller should do a select on an attempted insert into the
// returned wait channel. Once the insert succeeds, then the caller should
// proceed with the http request that pertains to the given host + method.
// It doesn't matter what's put into the wait channel, just any interface{}.
//
// When the caller is finished with their http request, they should free up the
// slot they were occupying in the wait queue, by calling the release function.
// release simply takes one entry out of the queue, so it must only be called
// by a caller whose insert succeeded, and only once: calling it without
// holding a slot either frees somebody else's slot or blocks.
//
// The reason for the caller needing to provide host and method, is that each
// remote host has a separate wait queue, and there's a separate wait queue
// per method for that host as well. This ensures that outgoing requests can
// still proceed for other hosts and methods while other requests are underway,
// while also preventing one host from being spammed with, for example, a
// shitload of GET requests all at once.
//
// Both host and method are matched case-insensitively, so "Example.org"/"get"
// and "example.org"/"GET" share the same queue.
func (rc *requestQueue) getWaitSpot(host string, method string) (wait chan<- interface{}, release func()) {
	// Host names are case-insensitive; normalise the key so that differently
	// cased spellings of one host cannot each claim their own set of slots.
	host = strings.ToLower(host)
	method = strings.ToUpper(method)

	// Look up first and only fall back to LoadOrStore on a miss: LoadOrStore
	// evaluates its value argument eagerly, which would otherwise allocate a
	// throwaway hostQueue (and, below, a throwaway buffered channel) on every
	// single request.
	hostQueueI, found := rc.hostQueues.Load(host)
	if !found {
		hostQueueI, _ = rc.hostQueues.LoadOrStore(host, new(hostQueue))
	}
	hq, ok := hostQueueI.(*hostQueue)
	if !ok {
		log.Panic("hostQueueI was not a *hostQueue")
	}

	waitSlotI, found := hq.slotsByMethod.Load(method)
	if !found {
		// LoadOrStore keeps whichever channel was stored first, so concurrent
		// first-time callers still end up sharing one queue.
		waitSlotI, _ = hq.slotsByMethod.LoadOrStore(method, make(chan interface{}, rc.maxOpenConns))
	}
	methodQueue, ok := waitSlotI.(chan interface{})
	if !ok {
		log.Panic("waitSlotI was not a chan interface{}")
	}

	return methodQueue, func() { <-methodQueue }
}
106 changes: 106 additions & 0 deletions internal/httpclient/queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
GoToSocial
Copyright (C) 2021-2022 GoToSocial Authors admin@gotosocial.org
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package httpclient

import (
"net/http"
"testing"
"time"

"github.com/stretchr/testify/suite"
)

// QueueTestSuite groups the tests for requestQueue. It embeds suite.Suite
// for the assertion helpers and carries no state of its own.
type QueueTestSuite struct {
suite.Suite
}

// TestQueue checks the slot accounting of requestQueue for one host:
//   - once maxOpenConns slots are taken for a host + method, a further wait
//     on that combination blocks until a slot is released;
//   - the method is matched case-insensitively ("post" shares POST's queue);
//   - other methods on the same host, and the same method on other hosts,
//     are not affected by a full queue.
func (suite *QueueTestSuite) TestQueue() {
	maxOpenConns := 5
	waitTimeout := 1 * time.Second

	rc := &requestQueue{
		maxOpenConns: maxOpenConns,
	}

	// fill all the open connections
	var release func()
	for i := 0; i < maxOpenConns; i++ {
		w, r := rc.getWaitSpot("example.org", http.MethodPost)
		w <- struct{}{}
		// keep overwriting, so that we end up
		// holding the last release function
		release = r
	}

	// try to wait again for the same host/method combo, it should timeout
	waitAgain, _ := rc.getWaitSpot("example.org", "post")

	select {
	case waitAgain <- struct{}{}:
		suite.FailNow("first wait did not time out")
	case <-time.After(waitTimeout):
		break
	}

	// now call the final release function that we saved earlier
	release()

	// try waiting again, it should work this time
	select {
	case waitAgain <- struct{}{}:
		break
	case <-time.After(waitTimeout):
		suite.FailNow("second wait timed out")
	}

	// the POST queue is now sitting on full
	suite.Len(waitAgain, maxOpenConns)

	// we should still be able to make a GET for the same host though
	getWait, getRelease := rc.getWaitSpot("example.org", http.MethodGet)
	select {
	case getWait <- struct{}{}:
		break
	case <-time.After(waitTimeout):
		suite.FailNow("get wait timed out")
	}

	// the GET queue has one request waiting
	suite.Len(getWait, 1)
	// clear it...
	getRelease()
	suite.Empty(getWait)

	// even though the POST queue for example.org is full, we
	// should still be able to make a POST request to another host :)
	waitForAnotherHost, _ := rc.getWaitSpot("somewhere.else", http.MethodPost)
	select {
	case waitForAnotherHost <- struct{}{}:
		break
	case <-time.After(waitTimeout):
		// distinct message, so a failure here is not
		// mistaken for the GET check further up
		suite.FailNow("wait for another host timed out")
	}

	suite.Len(waitForAnotherHost, 1)
}

func TestQueueTestSuite(t *testing.T) {
suite.Run(t, &QueueTestSuite{})
}

0 comments on commit 969c194

Please sign in to comment.