Skip to content

Commit

Permalink
fix: making Ready returns a <- chan error
Browse files Browse the repository at this point in the history
  • Loading branch information
WendelHime committed Nov 19, 2024
1 parent 3638a4b commit d35e60c
Show file tree
Hide file tree
Showing 20 changed files with 93 additions and 110 deletions.
23 changes: 16 additions & 7 deletions bandit/bandit.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,17 +202,24 @@ func (o *BanditDialer) chooseDialerForDomain(network, addr string) (Dialer, int)
chosenArm := o.bandit.SelectArm(rand.Float64())
var dialer Dialer
notAllFailing := hasNotFailing(o.dialers)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
for i := 0; i < (len(o.dialers) * 2); i++ {
dialer = o.dialers[chosenArm]
ready, err := dialer.IsReady()
if err != nil {
// If the dialer won't be ready since it returned an error, we need to
// choose another dialer
select {
case err := <-dialer.Ready():
if err != nil {
log.Errorf("dialer %q initialization failed: %w", dialer.Name(), err)
chosenArm = differentArm(chosenArm, len(o.dialers))
continue
}
case <-ctx.Done():
log.Errorf("dialer %q initialization timed out", dialer.Name())
chosenArm = differentArm(chosenArm, len(o.dialers))
continue
}

if (dialer.ConsecFailures() > 0 && notAllFailing) || !dialer.SupportsAddr(network, addr) || !ready {
if (dialer.ConsecFailures() > 0 && notAllFailing) || !dialer.SupportsAddr(network, addr) {
// If the chosen dialer has consecutive failures and there are other
// dialers that are succeeding, we should choose a different dialer.
//
Expand Down Expand Up @@ -363,8 +370,10 @@ type Dialer interface {
// connections created via this dialer.
DataRecv() uint64

// IsReady indicates when the dialer is ready for dialing
IsReady() (bool, error)
// Ready returns a channel which will have a value on it only when initialization
// of the dialer is complete. If initialization failed, the channel will have a non-nil
// error value.
Ready() <-chan error

// Stop stops background processing for this Dialer.
Stop()
Expand Down
4 changes: 2 additions & 2 deletions bandit/bandit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,8 @@ type tcpConnDialer struct {
server net.Conn
}

func (*tcpConnDialer) IsReady() (bool, error) {
return true, nil
func (*tcpConnDialer) Ready() <-chan error {
return nil
}

// DialProxy implements Dialer.
Expand Down
47 changes: 18 additions & 29 deletions bypass/bypass.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ type bypass struct {
// Start sends periodic traffic to the bypass server. The client periodically sends traffic to the server both via
// domain fronting and proxying to determine if proxies are blocked.
func Start(listen func(func(map[string]*commonconfig.ProxyConfig, config.Source)), configDir string, userConfig common.UserConfig) func() {
mrand.Seed(time.Now().UnixNano())
b := &bypass{
infos: make(map[string]*commonconfig.ProxyConfig),
proxies: make([]*proxy, 0),
Expand All @@ -70,8 +69,6 @@ func Start(listen func(func(map[string]*commonconfig.ProxyConfig, config.Source)
}

func (b *bypass) OnProxies(infos map[string]*commonconfig.ProxyConfig, configDir string, userConfig common.UserConfig) {
b.mxProxies.Lock()
defer b.mxProxies.Unlock()
b.reset()

// Some pluggable transports don't support bypass, filter these out here.
Expand All @@ -91,18 +88,11 @@ func (b *bypass) OnProxies(infos map[string]*commonconfig.ProxyConfig, configDir
continue
}

// if dialer is not ready, try to load it async
ready, err := dialer.IsReady()
if err != nil {
log.Errorf("dialer %q isn't ready and returned an error: %w", name, err)
continue
}
if !ready {
log.Debugf("dialer %q is not ready, starting in background", name)
readyCh := dialer.Ready()
if readyCh != nil {
go b.loadProxyAsync(name, config, configDir, userConfig, dialer)
continue
}

b.startProxy(name, config, configDir, userConfig, dialer)
}
}
Expand All @@ -112,35 +102,32 @@ func (b *bypass) loadProxyAsync(proxyName string, config *commonconfig.ProxyConf
defer cancel()
readyChan := make(chan struct{})
go func() {
for {
select {
case <-ctx.Done():
select {
case err := <-dialer.Ready():
if err != nil {
log.Errorf("dialer %q initialization failed: %w", proxyName, err)
cancel()
return
default:
time.Sleep(15 * time.Second)
ready, err := dialer.IsReady()
if err != nil {
log.Errorf("dialer %q isn't ready and returned an error: %w", proxyName, err)
cancel()
return
}
if ready {
b.startProxy(proxyName, config, configDir, userConfig, dialer)
readyChan <- struct{}{}
return
}
}
b.startProxy(proxyName, config, configDir, userConfig, dialer)
readyChan <- struct{}{}
return
case <-ctx.Done():
log.Errorf("proxy %q took to long to start: %w", proxyName, ctx.Err())
return
}
}()
select {
case <-readyChan:
log.Debugf("proxy ready!")
case <-ctx.Done():
log.Errorf("proxy %q took to long to get ready", proxyName)
log.Errorf("proxy %q took to long to start: %w", proxyName, ctx.Err())
}
}

func (b *bypass) startProxy(proxyName string, config *commonconfig.ProxyConfig, configDir string, userConfig common.UserConfig, dialer bandit.Dialer) {
b.mxProxies.Lock()
defer b.mxProxies.Unlock()
pc := chained.CopyConfig(config)
// Set the name in the info since we know it here.
pc.Name = proxyName
Expand All @@ -164,6 +151,8 @@ func (b *bypass) newProxy(name string, pc *commonconfig.ProxyConfig, configDir s
}

func (b *bypass) reset() {
b.mxProxies.Lock()
defer b.mxProxies.Unlock()
for _, v := range b.proxies {
v.stop()
}
Expand Down
4 changes: 2 additions & 2 deletions chained/algeneva_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ func (a *algenevaImpl) dialServer(op *ops.Op, ctx context.Context) (net.Conn, er
return conn, nil
}

func (*algenevaImpl) isReady() (bool, error) {
return true, nil
func (*algenevaImpl) ready() <-chan error {
return nil
}

// algenevaDialer is a algeneva.Dialer wrapper around a reportDialCore. algeneva accepts an optional
Expand Down
4 changes: 2 additions & 2 deletions chained/broflake_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ func newBroflakeImpl(pc *config.ProxyConfig, reportDialCore reportDialCoreFn) (p
}, nil
}

func (*broflakeImpl) isReady() (bool, error) {
return true, nil
func (*broflakeImpl) ready() <-chan error {
return nil
}

func (b *broflakeImpl) dialServer(op *ops.Op, ctx context.Context) (net.Conn, error) {
Expand Down
4 changes: 2 additions & 2 deletions chained/chained_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ func (impl *testImpl) dialServer(op *ops.Op, ctx context.Context) (net.Conn, err
return impl.d(ctx)
}

func (*testImpl) isReady() (bool, error) {
return true, nil
func (*testImpl) ready() <-chan error {
return nil
}

func newDialer(dialServer func(ctx context.Context) (net.Conn, error)) (func(network, addr string) (net.Conn, error), error) {
Expand Down
4 changes: 2 additions & 2 deletions chained/http_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ func (impl *httpImpl) dialServer(op *ops.Op, ctx context.Context) (net.Conn, err
return impl.dialCore(op, ctx, impl.addr)
}

func (*httpImpl) isReady() (bool, error) {
return true, nil
func (*httpImpl) ready() <-chan error {
return nil
}
4 changes: 2 additions & 2 deletions chained/https_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ func (impl *httpsImpl) dialServer(op *ops.Op, ctx context.Context) (net.Conn, er
return result.Conn, nil
}

func (*httpsImpl) isReady() (bool, error) {
return true, nil
func (*httpsImpl) ready() <-chan error {
return nil
}

func timeoutFor(ctx context.Context) time.Duration {
Expand Down
4 changes: 2 additions & 2 deletions chained/multipath.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ func (impl *multipathImpl) FormatStats() []string {
return impl.dialer.(multipath.Stats).FormatStats()
}

func (*multipathImpl) isReady() (bool, error) {
return true, nil
func (*multipathImpl) ready() <-chan error {
return nil
}

func CreateMPDialer(configDir, endpoint string, ss map[string]*config.ProxyConfig, uc common.UserConfig) (bandit.Dialer, error) {
Expand Down
4 changes: 2 additions & 2 deletions chained/multiplexed_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ func (impl *multiplexedImpl) dialServer(op *ops.Op, ctx context.Context) (net.Co
return impl.multiplexedDial(ctx, "", "")
}

func (impl *multiplexedImpl) isReady() (bool, error) {
return true, nil
func (impl *multiplexedImpl) ready() <-chan error {
return nil
}

// createMultiplexedProtocol configures a cmux multiplexing protocol
Expand Down
4 changes: 2 additions & 2 deletions chained/preconnecting_dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ func (pd *preconnectingDialer) dialServer(op *ops.Op, ctx context.Context) (conn
}
}

func (*preconnectingDialer) isReady() (bool, error) {
return true, nil
func (*preconnectingDialer) ready() <-chan error {
return nil
}

func (pd *preconnectingDialer) preconnectIfNecessary(op *ops.Op) {
Expand Down
6 changes: 3 additions & 3 deletions chained/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type proxyImpl interface {
// close releases the resources associated with the implementation, if any.
close()
// isReady returns if the implementation is ready to dial
isReady() (bool, error)
ready() <-chan error
}

// nopCloser is a mixin to implement a do-nothing close() method of proxyImpl.
Expand Down Expand Up @@ -587,6 +587,6 @@ func splitClientHello(hello []byte) [][]byte {
return splits
}

func (p *proxy) IsReady() (bool, error) {
return p.impl.isReady()
func (p *proxy) Ready() <-chan error {
return p.impl.ready()
}
4 changes: 2 additions & 2 deletions chained/quic_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,6 @@ func (impl *quicImpl) dialServer(op *ops.Op, ctx context.Context) (net.Conn, err
})
}

func (*quicImpl) isReady() (bool, error) {
return true, nil
func (*quicImpl) ready() <-chan error {
return nil
}
4 changes: 2 additions & 2 deletions chained/shadowsocks_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ func (impl *shadowsocksImpl) dialServer(op *ops.Op, ctx context.Context) (net.Co
})
}

func (*shadowsocksImpl) isReady() (bool, error) {
return true, nil
func (*shadowsocksImpl) ready() <-chan error {
return nil
}

// generateUpstream() creates a marker upstream address. This isn't an
Expand Down
4 changes: 2 additions & 2 deletions chained/starbridge_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ func newStarbridgeImpl(name, addr string, pc *config.ProxyConfig, reportDialCore
}, nil
}

func (*starbridge) isReady() (bool, error) {
return true, nil
func (*starbridge) ready() <-chan error {
return nil
}

// Adapted from https://github.com/OperatorFoundation/Starbridge-go/blob/v3.0.12/Starbridge/v3/starbridge.go#L237-L253
Expand Down
4 changes: 2 additions & 2 deletions chained/tlsmasq_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ func newTLSMasqImpl(configDir, name, addr string, pc *config.ProxyConfig, uc com
}, nil
}

func (*tlsMasqImpl) isReady() (bool, error) {
return true, nil
func (*tlsMasqImpl) ready() <-chan error {
return nil
}

func decodeUint16(s string) (uint16, error) {
Expand Down
Loading

0 comments on commit d35e60c

Please sign in to comment.