Skip to content

Commit

Permalink
add exponential backoff on post error
Browse files Browse the repository at this point in the history
  • Loading branch information
garmr-ulfr committed Aug 14, 2024
1 parent b17a9b9 commit 215d8d9
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 19 deletions.
8 changes: 7 additions & 1 deletion flashlight.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,13 @@ func (f *Flashlight) startConfigService() (services.StopFn, error) {
detour.SetCountry(nc)
}

proxyMap := f.convertNewProxyConfToOld(new.GetProxy().GetProxies())
pconfig := new.GetProxy()
if pconfig == nil || len(pconfig.GetProxies()) == 0 {
return // return early since there are no new proxy configs
}

log.Debug("Received new proxy configs")
proxyMap := f.convertNewProxyConfToOld(pconfig.GetProxies())
f.notifyProxyListeners(proxyMap, config.Fetched)
}

Expand Down
8 changes: 5 additions & 3 deletions services/bypass.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ type proxy struct {
name string
dfRoundTripper http.RoundTripper
proxyRoundTripper http.RoundTripper
sender *sender
toggle *atomic.Bool
userConfig common.UserConfig
}
Expand All @@ -167,10 +168,11 @@ func newProxy(
return &proxy{
ProxyConfig: pc,
name: name,
toggle: atomic.NewBool(mrand.Float32() < 0.5),
proxyRoundTripper: newProxyRoundTripper(name, pc, userConfig, dialer),
dfRoundTripper: proxied.Fronted(0),
sender: &sender{},
toggle: atomic.NewBool(mrand.Float32() < 0.5),
userConfig: userConfig,
proxyRoundTripper: newProxyRoundTripper(name, pc, userConfig, dialer),
}
}

Expand Down Expand Up @@ -237,7 +239,7 @@ func (p *proxy) sendToBypass() (int64, error) {
}

logger.Debugf("bypass: Sending traffic for bypass server: %v", p.name)
resp, sleep, err := post(
resp, sleep, err := p.sender.post(
endpoint,
bytes.NewReader(bypassBuf),
rt,
Expand Down
23 changes: 11 additions & 12 deletions services/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ type configService struct {
configHandler ConfigHandler
lastFetched time.Time

sender *sender

done chan struct{}
running bool
mu sync.Mutex
Expand All @@ -69,13 +71,11 @@ type ConfigHandler interface {
}

var (
// initialize variable so we don't have to lock mutex and check if it's nil every time someone
// calls GetClientConfig
_configService = &configService{}
_configService = &configService{sender: &sender{}}
)

// StartConfigService starts a new config service with the given options and returns a func to stop
// it. It will return an error if opts.OriginURL, opts.Rt, opts.Fetcher, or opts.OnConfig are nil.
// it. It will return an error if opts.OriginURL, opts.Rt, or opts.OnConfig are nil.
func StartConfigService(handler ConfigHandler, opts *ConfigOptions) (StopFn, error) {
_configService.mu.Lock()
defer _configService.mu.Unlock()
Expand Down Expand Up @@ -166,7 +166,7 @@ func (cs *configService) fetchConfig() (int64, error) {

logger.Debug("configservice: Received config")
curConf := cs.configHandler.GetConfig()
if curConf != nil && !configIsNew(curConf, newConf) {
if curConf != nil && !configIsNew(newConf) {
op.Set("config_changed", false)
logger.Debug("configservice: Config is unchanged")
return sleep, nil
Expand All @@ -190,7 +190,7 @@ func (cs *configService) fetch() (*apipb.ConfigResponse, int64, error) {
return nil, 0, fmt.Errorf("unable to marshal config request: %w", err)
}

resp, sleep, err := post(
resp, sleep, err := cs.sender.post(
cs.opts.OriginURL,
bytes.NewReader(buf),
cs.opts.RoundTripper,
Expand Down Expand Up @@ -246,10 +246,9 @@ func (cs *configService) newRequest() *apipb.ConfigRequest {
return confReq
}

// configIsNew returns true if country, proToken, or ip in currInfo differ from new or if new has
// proxy configs.
func configIsNew(cur, new *apipb.ConfigResponse) bool {
return cur.GetCountry() != new.GetCountry() ||
cur.GetProToken() != new.GetProToken() ||
len(new.GetProxy().GetProxies()) > 0
// configIsNew returns true if any fields contain values.
func configIsNew(new *apipb.ConfigResponse) bool {
// We only need to check if the fields we're interested in contain values because the server
// will only send us new values if they have changed.
return new.Country != "" || new.ProToken != "" || len(new.Proxy.Proxies) > 0
}
54 changes: 51 additions & 3 deletions services/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,63 @@ package services
import (
"fmt"
"io"
"math"
"net/http"
"strconv"
"time"

"github.com/getlantern/flashlight/v7/common"
)

// it is the caller's responsibility to read the response body to completion and close it
func post(
const (
retryWaitMillis = 100
maxRetryWait = 10 * time.Minute
)

// sender is a helper for sending post requests. If the request fails, sender calulates an
// exponential backoff time and return it as the sleep time.
type sender struct {
failCount int
atMaxRetryWait bool
}

// post posts data to the specified URL and returns the response body, as a ReadCloser, the sleep
// time in seconds, and any error that occurred.
//
// Note: it is the responsibility of the caller to read the ReadCloser to completion and close it.
func (s *sender) post(
originURL string,
buf io.Reader,
rt http.RoundTripper,
user common.UserConfig,
) (io.ReadCloser, int64, error) {
reader, sleepVal, err := s.post(originURL, buf, rt, user)
if err == nil {
s.failCount = 0
s.atMaxRetryWait = false
return reader, sleepVal, nil
}

if s.atMaxRetryWait {
// we've already reached the max wait time, so we don't need to perform the calculation again.
// we'll still increment the fail count to keep track of the number of failures
s.failCount++
return reader, int64(maxRetryWait.Seconds()), err
}

wait := time.Duration(math.Pow(2, float64(s.failCount)) * float64(retryWaitMillis))
wait *= time.Millisecond
s.failCount++

if wait > maxRetryWait {
s.atMaxRetryWait = true
return reader, int64(maxRetryWait.Seconds()), err
}

return reader, int64(wait.Seconds()), err
}

func (s *sender) doPost(
originURL string,
buf io.Reader,
rt http.RoundTripper,
Expand All @@ -23,7 +72,6 @@ func post(

common.AddCommonHeaders(user, req)
req.Header.Set("Content-Type", "application/x-protobuf")
req.Header.Set("Accept", "application/x-gzip")
// Prevents intermediate nodes (domain-fronters) from caching the content
req.Header.Set("Cache-Control", "no-cache")

Expand Down

0 comments on commit 215d8d9

Please sign in to comment.