Skip to content

Commit

Permalink
Merge branch 'main' into group-client-reqs
Browse files Browse the repository at this point in the history
  • Loading branch information
garmr-ulfr committed Nov 15, 2024
2 parents 2612e45 + 392245a commit b11f96c
Show file tree
Hide file tree
Showing 24 changed files with 3,318 additions and 9,727 deletions.
8 changes: 4 additions & 4 deletions chained/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (
"github.com/getlantern/idletiming"
gp "github.com/getlantern/proxy/v3"

"github.com/getlantern/flashlight/v7/bandit"
"github.com/getlantern/flashlight/v7/bandwidth"
"github.com/getlantern/flashlight/v7/common"
"github.com/getlantern/flashlight/v7/dialer"
"github.com/getlantern/flashlight/v7/domainrouting"
"github.com/getlantern/flashlight/v7/ops"
)
Expand Down Expand Up @@ -144,7 +144,7 @@ func (p *proxy) DialContext(ctx context.Context, network, addr string) (conn net
return nil, err == errUpstream, err
}

if network == bandit.NetworkConnect {
if network == dialer.NetworkConnect {
// only mark success if we did a CONNECT request because that involves a
// full round-trip to/from the proxy
p.markSuccess()
Expand Down Expand Up @@ -189,12 +189,12 @@ func dialOrigin(op *ops.Op, ctx context.Context, p *proxy, network, addr string)
// that we should send a CONNECT request and tunnel all traffic through
// that.
switch network {
case bandit.NetworkConnect:
case dialer.NetworkConnect:
log.Trace("Sending CONNECT request")
bconn := bufconn.Wrap(conn)
conn = bconn
err = p.sendCONNECT(op, addr, bconn)
case bandit.NetworkPersistent:
case dialer.NetworkPersistent:
log.Trace("Sending GET request to establish persistent HTTP connection")
err = p.initPersistentConnection(addr, conn)
}
Expand Down
4 changes: 2 additions & 2 deletions chained/multipath.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (

"github.com/getlantern/common/config"
"github.com/getlantern/errors"
"github.com/getlantern/flashlight/v7/bandit"
"github.com/getlantern/flashlight/v7/common"
"github.com/getlantern/flashlight/v7/dialer"
"github.com/getlantern/flashlight/v7/ops"
"github.com/getlantern/multipath"
)
Expand Down Expand Up @@ -49,7 +49,7 @@ func (impl *multipathImpl) FormatStats() []string {
return impl.dialer.(multipath.Stats).FormatStats()
}

func CreateMPDialer(configDir, endpoint string, ss map[string]*config.ProxyConfig, uc common.UserConfig) (bandit.Dialer, error) {
func CreateMPDialer(configDir, endpoint string, ss map[string]*config.ProxyConfig, uc common.UserConfig) (dialer.ProxyDialer, error) {
if len(ss) < 1 {
return nil, errors.New("no dialers")
}
Expand Down
14 changes: 7 additions & 7 deletions chained/persist.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import (
"sync"
"time"

"github.com/getlantern/flashlight/v7/bandit"
"github.com/getlantern/flashlight/v7/dialer"
)

var (
statsTrackingDialers []bandit.Dialer
statsTrackingDialers []dialer.ProxyDialer

statsMx sync.Mutex

Expand All @@ -22,7 +22,7 @@ var (

// TrackStatsFor enables periodic checkpointing of the given proxies' stats to
// disk.
func TrackStatsFor(dialers []bandit.Dialer, configDir string) {
func TrackStatsFor(dialers []dialer.ProxyDialer, configDir string) {
statsMx.Lock()

statsFilePath := filepath.Join(configDir, "proxystats.csv")
Expand All @@ -37,8 +37,8 @@ func TrackStatsFor(dialers []bandit.Dialer, configDir string) {
})
}

func applyExistingStats(statsFile string, dialers []bandit.Dialer) {
dialersMap := make(map[string]bandit.Dialer, len(dialers))
func applyExistingStats(statsFile string, dialers []dialer.ProxyDialer) {
dialersMap := make(map[string]dialer.ProxyDialer, len(dialers))
for _, d := range dialers {
dialersMap[d.Addr()] = d
}
Expand Down Expand Up @@ -135,7 +135,7 @@ func persistStats(statsFilePath string) {
for {
time.Sleep(15 * time.Second)
statsMx.Lock()
dialers := make([]bandit.Dialer, 0, len(statsTrackingDialers))
dialers := make([]dialer.ProxyDialer, 0, len(statsTrackingDialers))
for _, d := range statsTrackingDialers {
dialers = append(dialers, d)
}
Expand All @@ -144,7 +144,7 @@ func persistStats(statsFilePath string) {
}
}

func doPersistStats(statsFile string, dialers []bandit.Dialer) {
func doPersistStats(statsFile string, dialers []dialer.ProxyDialer) {

out, err := os.OpenFile(fmt.Sprintf("%v.tmp", statsFile), os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
if err != nil {
Expand Down
12 changes: 6 additions & 6 deletions chained/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"github.com/getlantern/mtime"
"github.com/getlantern/netx"

"github.com/getlantern/flashlight/v7/bandit"
"github.com/getlantern/flashlight/v7/common"
"github.com/getlantern/flashlight/v7/dialer"
"github.com/getlantern/flashlight/v7/domainrouting"
"github.com/getlantern/flashlight/v7/ops"
)
Expand Down Expand Up @@ -75,12 +75,12 @@ type nopCloser struct{}
func (c nopCloser) close() {}

// CreateDialers creates a list of Proxies (bandit.Dialer) with supplied server info.
func CreateDialers(configDir string, proxies map[string]*config.ProxyConfig, uc common.UserConfig) []bandit.Dialer {
func CreateDialers(configDir string, proxies map[string]*config.ProxyConfig, uc common.UserConfig) []dialer.ProxyDialer {
return lo.Values(CreateDialersMap(configDir, proxies, uc))
}

// CreateDialersMap creates a map of Proxies (bandit.Dialer) with supplied server info.
func CreateDialersMap(configDir string, proxies map[string]*config.ProxyConfig, uc common.UserConfig) map[string]bandit.Dialer {
func CreateDialersMap(configDir string, proxies map[string]*config.ProxyConfig, uc common.UserConfig) map[string]dialer.ProxyDialer {
groups := groupByMultipathEndpoint(proxies)

// We parallelize the creation of the dialers because some of them may take
Expand Down Expand Up @@ -120,17 +120,17 @@ func CreateDialersMap(configDir string, proxies map[string]*config.ProxyConfig,
}
}
wg.Wait()
mappedDialers := make(map[string]bandit.Dialer)
mappedDialers := make(map[string]dialer.ProxyDialer)
m.Range(func(k, v interface{}) bool {
mappedDialers[k.(string)] = v.(bandit.Dialer)
mappedDialers[k.(string)] = v.(dialer.ProxyDialer)
return true
})

return mappedDialers
}

// CreateDialer creates a Proxy (balancer.Dialer) with supplied server info.
func CreateDialer(configDir, name string, s *config.ProxyConfig, uc common.UserConfig) (bandit.Dialer, error) {
func CreateDialer(configDir, name string, s *config.ProxyConfig, uc common.UserConfig) (dialer.ProxyDialer, error) {
addr, transport, network, err := extractParams(s)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion chained/quic_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func newQUICImpl(name, addr string, pc *config.ProxyConfig, reportDialCore repor
}

disablePathMTUDiscovery := true
if ptSettingBool(pc, "path_mtu_discovery") == true {
if ptSettingBool(pc, "path_mtu_discovery") {
disablePathMTUDiscovery = false
}

Expand Down
92 changes: 63 additions & 29 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ import (
"github.com/getlantern/proxy/v3/filters"
"github.com/getlantern/shortcut"

"github.com/getlantern/flashlight/v7/bandit"
"github.com/getlantern/flashlight/v7/chained"
"github.com/getlantern/flashlight/v7/common"
"github.com/getlantern/flashlight/v7/dialer"
"github.com/getlantern/flashlight/v7/domainrouting"
"github.com/getlantern/flashlight/v7/ops"
"github.com/getlantern/flashlight/v7/stats"
Expand Down Expand Up @@ -105,7 +105,7 @@ type Client struct {
requestTimeout time.Duration

// Dialer that uses multi-armed bandit to select the best proxy to use.
dialer *bandit.BanditDialer
dialer *protectedDialer

proxy proxy.Proxy

Expand Down Expand Up @@ -173,14 +173,15 @@ func NewClient(
if err != nil {
return nil, errors.New("Unable to create rewrite LRU: %v", err)
}
banditDialer, err := bandit.New(bandit.Options{})
if err != nil {
return nil, errors.New("Unable to create bandit: %v", err)
}

client := &Client{
configDir: configDir,
requestTimeout: requestTimeout,
dialer: banditDialer,
configDir: configDir,
requestTimeout: requestTimeout,
dialer: &protectedDialer{
// This is just a placeholder dialer until we're able to fetch the
// actual proxy dialers from the config.
dialer: dialer.NoDialer(),
},
disconnected: disconnected,
proxyAll: proxyAll,
useShortcut: useShortcut,
Expand Down Expand Up @@ -292,7 +293,12 @@ func (client *Client) ListenAndServeHTTP(requestedAddr string, onListeningFn fun
}
return fmt.Errorf("unable to accept connection: %v", err)
}
go client.handle(conn)
go func(conn net.Conn) {
err := client.handle(conn)
if err != nil {
log.Errorf("Error handling connection: %v", err)
}
}(conn)
}
}

Expand Down Expand Up @@ -354,14 +360,15 @@ func (client *Client) Connect(dialCtx context.Context, downstreamReader io.Reade
// Configure updates the client's configuration. Configure can be called
// before or after ListenAndServe, and can be called multiple times. If
// no error occurred, then the new dialers are returned.
func (client *Client) Configure(proxies map[string]*commonconfig.ProxyConfig) []bandit.Dialer {
func (client *Client) Configure(proxies map[string]*commonconfig.ProxyConfig) []dialer.ProxyDialer {
log.Debug("Configure() called")
dialers, dialer, err := client.initDialers(proxies)
if err != nil {
log.Error(err)
return nil
}
client.dialer = dialer
client.dialer.set(dialer)
log.Debug("Reset dialer")
chained.PersistSessionStates(client.configDir)
chained.TrackStatsFor(dialers, client.configDir)
return dialers
Expand Down Expand Up @@ -427,11 +434,7 @@ func (client *Client) dial(ctx context.Context, isConnect bool, network, addr st
// * If the host or port is configured not proxyable, dial directly.
// * If the site is allowed by shortcut, dial directly. If it failed before the deadline, try proxying.
// * Try dial the site directly with 1/5th of the requestTimeout, then try proxying.
func (client *Client) doDial(
op *ops.Op,
ctx context.Context,
isCONNECT bool,
addr string,
func (client *Client) doDial(op *ops.Op, ctx context.Context, isCONNECT bool, addr string,
dnsResolutionMapForDirectDials map[string]string) (net.Conn, error) {

dialDirect := func(ctx context.Context, network, addr string) (net.Conn, error) {
Expand All @@ -449,7 +452,7 @@ func (client *Client) doDial(

dialProxied := func(ctx context.Context, _unused, addr string) (net.Conn, error) {
op.Set("remotely_proxied", true)
proto := bandit.NetworkPersistent
proto := dialer.NetworkPersistent
if isCONNECT {
// UGLY HACK ALERT! In this case, we know we need to send a CONNECT request
// to the chained server. We need to send that request from chained/dialer.go
Expand All @@ -458,13 +461,12 @@ func (client *Client) doDial(
// that is effectively always "tcp" in the end, but we look for this
// special "transport" in the dialer and send a CONNECT request in that
// case.
proto = bandit.NetworkConnect
}
start := time.Now()
conn, err := client.dialer.DialContext(ctx, proto, addr)
if log.IsTraceEnabled() {
log.Tracef("Dialing proxy takes %v for %s", time.Since(start), addr)
proto = dialer.NetworkConnect
}
defer func(start time.Time) {
log.Debugf("Dialing via the proxy takes %v for %s", time.Since(start), addr)
}(time.Now())
conn, err := client.dialer.get().DialContext(ctx, proto, addr)
if conn != nil {
conn = &proxiedConn{conn}
}
Expand Down Expand Up @@ -706,22 +708,35 @@ func errorResponse(_ *filters.ConnectionState, req *http.Request, _ bool, err er

// initDialers takes hosts from cfg.ChainedServers and it uses them to create a
// new dialer. Returns the new dialers.
func (client *Client) initDialers(proxies map[string]*commonconfig.ProxyConfig) ([]bandit.Dialer, *bandit.BanditDialer, error) {
func (client *Client) initDialers(proxies map[string]*commonconfig.ProxyConfig) ([]dialer.ProxyDialer, dialer.Dialer, error) {
if len(proxies) == 0 {
return nil, nil, fmt.Errorf("no chained servers configured, not initializing dialers")
}
log.Debug("initDialers called")
defer func(start time.Time) {
log.Debugf("initDialers took %v", time.Since(start))
}(time.Now())
configDir := client.configDir
chained.PersistSessionStates(configDir)
dialers := chained.CreateDialers(configDir, proxies, client.user)
dialer, err := bandit.New(bandit.Options{
dialer := dialer.New(&dialer.Options{
Dialers: dialers,
OnError: client.onDialError,
OnSuccess: func(dialer bandit.Dialer) {
OnSuccess: func(d dialer.ProxyDialer) {
client.onSucceedingProxy()
client.statsTracker.SetHasSucceedingProxy(true)
countryCode, country, city := d.Location()
previousStats := client.statsTracker.Latest()
if previousStats.CountryCode == "" || previousStats.CountryCode != countryCode {
client.statsTracker.SetActiveProxyLocation(
city,
country,
countryCode,
)
}
},
StatsTracker: client.statsTracker,
})
return dialers, dialer, err
return dialers, dialer, nil
}

// Creates a local server to capture client hello messages from the browser and
Expand All @@ -732,3 +747,22 @@ func (client *Client) cacheClientHellos() {
// Try to snag a hello from the browser.
chained.ActivelyObtainBrowserHello(ctx, client.configDir)
}

// protectedDialer protects a dialer.Dialer with a RWMutex. We can't use an atomic.Value here
// because dialer.Dialer is an interface.
type protectedDialer struct {
sync.RWMutex
dialer dialer.Dialer
}

func (pd *protectedDialer) get() dialer.Dialer {
pd.RLock()
defer pd.RUnlock()
return pd.dialer
}

func (pd *protectedDialer) set(dialer dialer.Dialer) {
pd.Lock()
defer pd.Unlock()
pd.dialer = dialer
}
Loading

0 comments on commit b11f96c

Please sign in to comment.