Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop old dialers when we get a new proxy config #1445

Merged
merged 4 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions dialer/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,37 @@ import (
"io"
"net"
"runtime/debug"
"sync/atomic"
"time"

"github.com/getlantern/golog"
)

var log = golog.LoggerFor("dialer")

var currentDialer atomic.Value

// New creates a new dialer that first tries to connect as quickly as possilbe while also
// optimizing for the fastest dialer.
func New(opts *Options) Dialer {
return NewTwoPhaseDialer(opts, func(opts *Options, existing Dialer) Dialer {
if currentDialer.Load() != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we assuming that there will only ever be one place New is called? (ignoring tests)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's only called when we get new configs

log.Debug("Closing existing dialer")
currentDialer.Load().(Dialer).Close()
}
d := NewTwoPhaseDialer(opts, func(opts *Options, existing Dialer) Dialer {
bandit, err := NewBandit(opts)
if err != nil {
log.Errorf("Unable to create bandit: %v", err)
return existing
}
return bandit
})
currentDialer.Store(d)
return d
}

// NoDialer returns a dialer that does nothing. This is useful during startup
// until a real dialer is available.
// when we don't yet have proxies to dial through.
func NoDialer() Dialer {
return &noDialer{}
}
Expand All @@ -48,6 +57,11 @@ func (d *noDialer) DialContext(ctx context.Context, network, addr string) (net.C
return nil, errors.New("no dialer available")
}

func (d *noDialer) Close() {}

// Make sure noDialer implements Dialer
var _ Dialer = (*noDialer)(nil)

const (
// NetworkConnect is a pseudo network name to instruct the dialer to establish
// a CONNECT tunnel to the proxy.
Expand Down Expand Up @@ -90,6 +104,9 @@ type Dialer interface {

// DialContext dials out to the domain or IP address representing a destination site.
DialContext(ctx context.Context, network, addr string) (net.Conn, error)

// Close closes the dialer and cleans up any resources
Close()
}

// hasSucceedingDialer checks whether or not any of the given dialers is able to successfully dial our proxies
Expand Down
33 changes: 27 additions & 6 deletions dialer/fastconnect.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,14 @@ type fastConnectDialer struct {

next func(*Options, Dialer) Dialer
opts *Options

// Create a channel for stopping connections to dialers
stopCh chan struct{}
}

// Make sure fastConnectDialer implements Dialer
var _ Dialer = (*fastConnectDialer)(nil)

func newFastConnectDialer(opts *Options, next func(opts *Options, existing Dialer) Dialer) *fastConnectDialer {
if opts.OnError == nil {
opts.OnError = func(error, bool) {}
Expand All @@ -45,6 +51,7 @@ func newFastConnectDialer(opts *Options, next func(opts *Options, existing Diale
opts: opts,
next: next,
topDialer: protectedDialer{},
stopCh: make(chan struct{}, 10),
}
}

Expand Down Expand Up @@ -76,6 +83,13 @@ func (fcd *fastConnectDialer) DialContext(ctx context.Context, network, addr str
return conn, err
}

func (fcd *fastConnectDialer) Close() {
// We don't call Stop on the Dialers themselves here because they are likely
// in use by other Dialers, such as the BanditDialer.
// Stop all dialing
fcd.stopCh <- struct{}{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would make stopCh buffered or just close it so this doesn't block until outerLoop tries to read from it again.

}

func (fcd *fastConnectDialer) onConnected(pd ProxyDialer, connectTime time.Duration) {
log.Debugf("Connected to %v", pd.Name())

Expand All @@ -99,13 +113,20 @@ func (fcd *fastConnectDialer) connectAll(dialers []ProxyDialer) {
return
}
log.Debugf("Dialing all dialers in parallel %#v", dialers)
// Loop until we're connected
for len(fcd.connected.dialers) < 2 {
fcd.parallelDial(dialers)
// Add jitter to avoid thundering herd
time.Sleep(time.Duration(rand.Intn(4000)) * time.Millisecond)
for {
// Loop until we're connected
if len(fcd.connected.dialers) < 2 {
fcd.parallelDial(dialers)
} else {
break
}
select {
case <-fcd.stopCh:
log.Debug("Stopping parallel dialing")
return
case <-time.After(time.Duration(rand.Intn(4000)) * time.Millisecond):
}
}

// At this point, we've tried all of the dialers, and they've all either
// succeeded or failed.

Expand Down
11 changes: 11 additions & 0 deletions dialer/two_phase_dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ type twoPhaseDialer struct {
activeDialer activeDialer
}

// Make sure twoPhaseDialer implements Dialer
var _ Dialer = (*twoPhaseDialer)(nil)

// NewTwoPhaseDialer creates a new dialer for checking proxy connectivity.
func NewTwoPhaseDialer(opts *Options, next func(opts *Options, existing Dialer) Dialer) Dialer {
log.Debugf("Creating new two phase dialer with %d dialers", len(opts.Dialers))
Expand Down Expand Up @@ -45,6 +48,14 @@ func (ccd *twoPhaseDialer) DialContext(ctx context.Context, network string, addr
return td.DialContext(ctx, network, addr)
}

// Close implements Dialer.
func (ccd *twoPhaseDialer) Close() {
td := ccd.activeDialer.get()
if td != nil {
td.Close()
}
}

// protectedDialer protects a dialer.Dialer with a RWMutex. We can't use an atomic.Value here
// because Dialer is an interface.
type activeDialer struct {
Expand Down
Loading