From 215d8d9316882cf36427a1faeb0a0bfd5c88bbb3 Mon Sep 17 00:00:00 2001 From: garmr Date: Wed, 7 Aug 2024 14:52:56 -0700 Subject: [PATCH] add exponential backoff on post error --- flashlight.go | 8 ++++++- services/bypass.go | 8 ++++--- services/config.go | 23 ++++++++++---------- services/http.go | 54 +++++++++++++++++++++++++++++++++++++++++++--- 4 files changed, 74 insertions(+), 19 deletions(-) diff --git a/flashlight.go b/flashlight.go index 35bf61817..ae80edbb3 100644 --- a/flashlight.go +++ b/flashlight.go @@ -496,7 +496,13 @@ func (f *Flashlight) startConfigService() (services.StopFn, error) { detour.SetCountry(nc) } - proxyMap := f.convertNewProxyConfToOld(new.GetProxy().GetProxies()) + pconfig := new.GetProxy() + if pconfig == nil || len(pconfig.GetProxies()) == 0 { + return // return early since there are no new proxy configs + } + + log.Debug("Received new proxy configs") + proxyMap := f.convertNewProxyConfToOld(pconfig.GetProxies()) f.notifyProxyListeners(proxyMap, config.Fetched) } diff --git a/services/bypass.go b/services/bypass.go index f514412c3..5fbd2a1d0 100644 --- a/services/bypass.go +++ b/services/bypass.go @@ -153,6 +153,7 @@ type proxy struct { name string dfRoundTripper http.RoundTripper proxyRoundTripper http.RoundTripper + sender *sender toggle *atomic.Bool userConfig common.UserConfig } @@ -167,10 +168,11 @@ func newProxy( return &proxy{ ProxyConfig: pc, name: name, - toggle: atomic.NewBool(mrand.Float32() < 0.5), + proxyRoundTripper: newProxyRoundTripper(name, pc, userConfig, dialer), dfRoundTripper: proxied.Fronted(0), + sender: &sender{}, + toggle: atomic.NewBool(mrand.Float32() < 0.5), userConfig: userConfig, - proxyRoundTripper: newProxyRoundTripper(name, pc, userConfig, dialer), } } @@ -237,7 +239,7 @@ func (p *proxy) sendToBypass() (int64, error) { } logger.Debugf("bypass: Sending traffic for bypass server: %v", p.name) - resp, sleep, err := post( + resp, sleep, err := p.sender.post( endpoint, bytes.NewReader(bypassBuf), rt, diff --git a/services/config.go b/services/config.go index 03a95b2dc..168a261b0 100644 --- a/services/config.go +++ b/services/config.go @@ -55,6 +55,8 @@ type configService struct { configHandler ConfigHandler lastFetched time.Time + sender *sender + done chan struct{} running bool mu sync.Mutex @@ -69,13 +71,11 @@ type ConfigHandler interface { } var ( - // initialize variable so we don't have to lock mutex and check if it's nil every time someone - // calls GetClientConfig - _configService = &configService{} + _configService = &configService{sender: &sender{}} ) // StartConfigService starts a new config service with the given options and returns a func to stop -// it. It will return an error if opts.OriginURL, opts.Rt, opts.Fetcher, or opts.OnConfig are nil. +// it. It will return an error if opts.OriginURL, opts.Rt, or opts.OnConfig are nil. func StartConfigService(handler ConfigHandler, opts *ConfigOptions) (StopFn, error) { _configService.mu.Lock() defer _configService.mu.Unlock() @@ -166,7 +166,7 @@ func (cs *configService) fetchConfig() (int64, error) { logger.Debug("configservice: Received config") curConf := cs.configHandler.GetConfig() - if curConf != nil && !configIsNew(curConf, newConf) { + if curConf != nil && !configIsNew(newConf) { op.Set("config_changed", false) logger.Debug("configservice: Config is unchanged") return sleep, nil @@ -190,7 +190,7 @@ func (cs *configService) fetch() (*apipb.ConfigResponse, int64, error) { return nil, 0, fmt.Errorf("unable to marshal config request: %w", err) } - resp, sleep, err := post( + resp, sleep, err := cs.sender.post( cs.opts.OriginURL, bytes.NewReader(buf), cs.opts.RoundTripper, @@ -246,10 +246,9 @@ func (cs *configService) newRequest() *apipb.ConfigRequest { return confReq } -// configIsNew returns true if country, proToken, or ip in currInfo differ from new or if new has -// proxy configs. -func configIsNew(cur, new *apipb.ConfigResponse) bool { - return cur.GetCountry() != new.GetCountry() || - cur.GetProToken() != new.GetProToken() || - len(new.GetProxy().GetProxies()) > 0 +// configIsNew returns true if any fields contain values. +func configIsNew(new *apipb.ConfigResponse) bool { + // We only need to check if the fields we're interested in contain values because the server + // will only send us new values if they have changed. + return new.Country != "" || new.ProToken != "" || len(new.Proxy.Proxies) > 0 } diff --git a/services/http.go b/services/http.go index 78a69a63d..7cb3d880f 100644 --- a/services/http.go +++ b/services/http.go @@ -3,14 +3,63 @@ package services import ( "fmt" "io" + "math" "net/http" "strconv" + "time" "github.com/getlantern/flashlight/v7/common" ) -// it is the caller's responsibility to read the response body to completion and close it -func post( +const ( + retryWaitMillis = 100 + maxRetryWait = 10 * time.Minute +) + +// sender is a helper for sending post requests. If the request fails, sender calulates an +// exponential backoff time and return it as the sleep time. +type sender struct { + failCount int + atMaxRetryWait bool +} + +// post posts data to the specified URL and returns the response body, as a ReadCloser, the sleep +// time in seconds, and any error that occurred. +// +// Note: it is the responsibility of the caller to read the ReadCloser to completion and close it. +func (s *sender) post( + originURL string, + buf io.Reader, + rt http.RoundTripper, + user common.UserConfig, +) (io.ReadCloser, int64, error) { + reader, sleepVal, err := s.post(originURL, buf, rt, user) + if err == nil { + s.failCount = 0 + s.atMaxRetryWait = false + return reader, sleepVal, nil + } + + if s.atMaxRetryWait { + // we've already reached the max wait time, so we don't need to perform the calculation again. + // we'll still increment the fail count to keep track of the number of failures + s.failCount++ + return reader, int64(maxRetryWait.Seconds()), err + } + + wait := time.Duration(math.Pow(2, float64(s.failCount)) * float64(retryWaitMillis)) + wait *= time.Millisecond + s.failCount++ + + if wait > maxRetryWait { + s.atMaxRetryWait = true + return reader, int64(maxRetryWait.Seconds()), err + } + + return reader, int64(wait.Seconds()), err +} + +func (s *sender) doPost( originURL string, buf io.Reader, rt http.RoundTripper, @@ -23,7 +72,6 @@ func post( common.AddCommonHeaders(user, req) req.Header.Set("Content-Type", "application/x-protobuf") - req.Header.Set("Accept", "application/x-gzip") // Prevents intermediate nodes (domain-fronters) from caching the content req.Header.Set("Cache-Control", "no-cache")