From d35e60c9d564358983bc6e3e59d02bd9e5775ceb Mon Sep 17 00:00:00 2001 From: WendelHime <6754291+WendelHime@users.noreply.github.com> Date: Tue, 19 Nov 2024 14:01:45 -0300 Subject: [PATCH] fix: making Ready returns a <- chan error --- bandit/bandit.go | 23 ++++++++---- bandit/bandit_test.go | 4 +-- bypass/bypass.go | 47 ++++++++++-------------- chained/algeneva_impl.go | 4 +-- chained/broflake_impl.go | 4 +-- chained/chained_test.go | 4 +-- chained/http_impl.go | 4 +-- chained/https_impl.go | 4 +-- chained/multipath.go | 4 +-- chained/multiplexed_impl.go | 4 +-- chained/preconnecting_dialer.go | 4 +-- chained/proxy.go | 6 ++-- chained/quic_impl.go | 4 +-- chained/shadowsocks_impl.go | 4 +-- chained/starbridge_impl.go | 4 +-- chained/tlsmasq_impl.go | 4 +-- chained/water_impl.go | 63 +++++++++++++-------------------- chained/water_impl_test.go | 4 +-- chained/wss_impl.go | 4 +-- client/client_test.go | 4 +-- 20 files changed, 93 insertions(+), 110 deletions(-) diff --git a/bandit/bandit.go b/bandit/bandit.go index 60ccc985b..7f93c14b8 100644 --- a/bandit/bandit.go +++ b/bandit/bandit.go @@ -202,17 +202,24 @@ func (o *BanditDialer) chooseDialerForDomain(network, addr string) (Dialer, int) chosenArm := o.bandit.SelectArm(rand.Float64()) var dialer Dialer notAllFailing := hasNotFailing(o.dialers) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() for i := 0; i < (len(o.dialers) * 2); i++ { dialer = o.dialers[chosenArm] - ready, err := dialer.IsReady() - if err != nil { - // If the dialer won't be ready since it returned an error, we need to - // choose another dialer + select { + case err := <-dialer.Ready(): + if err != nil { + log.Errorf("dialer %q initialization failed: %w", dialer.Name(), err) + chosenArm = differentArm(chosenArm, len(o.dialers)) + continue + } + case <-ctx.Done(): + log.Errorf("dialer %q initialization timed out", dialer.Name()) chosenArm = differentArm(chosenArm, len(o.dialers)) continue } - if (dialer.ConsecFailures() > 0 && notAllFailing) || !dialer.SupportsAddr(network, addr) || !ready { + if (dialer.ConsecFailures() > 0 && notAllFailing) || !dialer.SupportsAddr(network, addr) { // If the chosen dialer has consecutive failures and there are other // dialers that are succeeding, we should choose a different dialer. // @@ -363,8 +370,10 @@ type Dialer interface { // connections created via this dialer. DataRecv() uint64 - // IsReady indicates when the dialer is ready for dialing - IsReady() (bool, error) + // Ready returns a channel which will have a value on it only when initialization + // of the dialer is complete. If initialization failed, the channel will have a non-nil + // error value. + Ready() <-chan error // Stop stops background processing for this Dialer. Stop() diff --git a/bandit/bandit_test.go b/bandit/bandit_test.go index 41e5f6c82..9b95e3fb7 100644 --- a/bandit/bandit_test.go +++ b/bandit/bandit_test.go @@ -333,8 +333,8 @@ type tcpConnDialer struct { server net.Conn } -func (*tcpConnDialer) IsReady() (bool, error) { - return true, nil +func (*tcpConnDialer) Ready() <-chan error { + return nil } // DialProxy implements Dialer. diff --git a/bypass/bypass.go b/bypass/bypass.go index c20e2079a..608db4c86 100644 --- a/bypass/bypass.go +++ b/bypass/bypass.go @@ -58,7 +58,6 @@ type bypass struct { // Start sends periodic traffic to the bypass server. The client periodically sends traffic to the server both via // domain fronting and proxying to determine if proxies are blocked. func Start(listen func(func(map[string]*commonconfig.ProxyConfig, config.Source)), configDir string, userConfig common.UserConfig) func() { - mrand.Seed(time.Now().UnixNano()) b := &bypass{ infos: make(map[string]*commonconfig.ProxyConfig), proxies: make([]*proxy, 0), @@ -70,8 +69,6 @@ func Start(listen func(func(map[string]*commonconfig.ProxyConfig, config.Source) } func (b *bypass) OnProxies(infos map[string]*commonconfig.ProxyConfig, configDir string, userConfig common.UserConfig) { - b.mxProxies.Lock() - defer b.mxProxies.Unlock() b.reset() // Some pluggable transports don't support bypass, filter these out here. @@ -91,18 +88,11 @@ func (b *bypass) OnProxies(infos map[string]*commonconfig.ProxyConfig, configDir continue } - // if dialer is not ready, try to load it async - ready, err := dialer.IsReady() - if err != nil { - log.Errorf("dialer %q isn't ready and returned an error: %w", name, err) - continue - } - if !ready { - log.Debugf("dialer %q is not ready, starting in background", name) + readyCh := dialer.Ready() + if readyCh != nil { go b.loadProxyAsync(name, config, configDir, userConfig, dialer) continue } - b.startProxy(name, config, configDir, userConfig, dialer) } } @@ -112,35 +102,32 @@ func (b *bypass) loadProxyAsync(proxyName string, config *commonconfig.ProxyConf defer cancel() readyChan := make(chan struct{}) go func() { - for { - select { - case <-ctx.Done(): + select { + case err := <-dialer.Ready(): + if err != nil { + log.Errorf("dialer %q initialization failed: %w", proxyName, err) + cancel() return - default: - time.Sleep(15 * time.Second) - ready, err := dialer.IsReady() - if err != nil { - log.Errorf("dialer %q isn't ready and returned an error: %w", proxyName, err) - cancel() - return - } - if ready { - b.startProxy(proxyName, config, configDir, userConfig, dialer) - readyChan <- struct{}{} - return - } } + b.startProxy(proxyName, config, configDir, userConfig, dialer) + readyChan <- struct{}{} + return + case <-ctx.Done(): + log.Errorf("proxy %q took to long to start: %w", proxyName, ctx.Err()) + return } }() select { case <-readyChan: log.Debugf("proxy ready!") case <-ctx.Done(): - log.Errorf("proxy %q took to long to get ready", proxyName) + log.Errorf("proxy %q took to long to start: %w", proxyName, ctx.Err()) } } func (b *bypass) startProxy(proxyName string, config *commonconfig.ProxyConfig, configDir string, userConfig common.UserConfig, dialer bandit.Dialer) { + b.mxProxies.Lock() + defer b.mxProxies.Unlock() pc := chained.CopyConfig(config) // Set the name in the info since we know it here. pc.Name = proxyName @@ -164,6 +151,8 @@ func (b *bypass) newProxy(name string, pc *commonconfig.ProxyConfig, configDir s } func (b *bypass) reset() { + b.mxProxies.Lock() + defer b.mxProxies.Unlock() for _, v := range b.proxies { v.stop() } diff --git a/chained/algeneva_impl.go b/chained/algeneva_impl.go index 5d21d62f8..f9d0c63a9 100644 --- a/chained/algeneva_impl.go +++ b/chained/algeneva_impl.go @@ -73,8 +73,8 @@ func (a *algenevaImpl) dialServer(op *ops.Op, ctx context.Context) (net.Conn, er return conn, nil } -func (*algenevaImpl) isReady() (bool, error) { - return true, nil +func (*algenevaImpl) ready() <-chan error { + return nil } // algenevaDialer is a algeneva.Dialer wrapper around a reportDialCore. algeneva accepts an optional diff --git a/chained/broflake_impl.go b/chained/broflake_impl.go index db508a9dd..031f4f18f 100644 --- a/chained/broflake_impl.go +++ b/chained/broflake_impl.go @@ -55,8 +55,8 @@ func newBroflakeImpl(pc *config.ProxyConfig, reportDialCore reportDialCoreFn) (p }, nil } -func (*broflakeImpl) isReady() (bool, error) { - return true, nil +func (*broflakeImpl) ready() <-chan error { + return nil } func (b *broflakeImpl) dialServer(op *ops.Op, ctx context.Context) (net.Conn, error) { diff --git a/chained/chained_test.go b/chained/chained_test.go index 0055650e7..42e62948e 100644 --- a/chained/chained_test.go +++ b/chained/chained_test.go @@ -51,8 +51,8 @@ func (impl *testImpl) dialServer(op *ops.Op, ctx context.Context) (net.Conn, err return impl.d(ctx) } -func (*testImpl) isReady() (bool, error) { - return true, nil +func (*testImpl) ready() <-chan error { + return nil } func newDialer(dialServer func(ctx context.Context) (net.Conn, error)) (func(network, addr string) (net.Conn, error), error) { diff --git a/chained/http_impl.go b/chained/http_impl.go index 35beebe84..7fc13b823 100644 --- a/chained/http_impl.go +++ b/chained/http_impl.go @@ -23,6 +23,6 @@ func (impl *httpImpl) dialServer(op *ops.Op, ctx context.Context) (net.Conn, err return impl.dialCore(op, ctx, impl.addr) } -func (*httpImpl) isReady() (bool, error) { - return true, nil +func (*httpImpl) ready() <-chan error { + return nil } diff --git a/chained/https_impl.go b/chained/https_impl.go index c3ed13b89..c3e4d306d 100644 --- a/chained/https_impl.go +++ b/chained/https_impl.go @@ -106,8 +106,8 @@ func (impl *httpsImpl) dialServer(op *ops.Op, ctx context.Context) (net.Conn, er return result.Conn, nil } -func (*httpsImpl) isReady() (bool, error) { - return true, nil +func (*httpsImpl) ready() <-chan error { + return nil } func timeoutFor(ctx context.Context) time.Duration { diff --git a/chained/multipath.go b/chained/multipath.go index 3630da3c6..a3a931740 100644 --- a/chained/multipath.go +++ b/chained/multipath.go @@ -49,8 +49,8 @@ func (impl *multipathImpl) FormatStats() []string { return impl.dialer.(multipath.Stats).FormatStats() } -func (*multipathImpl) isReady() (bool, error) { - return true, nil +func (*multipathImpl) ready() <-chan error { + return nil } func CreateMPDialer(configDir, endpoint string, ss map[string]*config.ProxyConfig, uc common.UserConfig) (bandit.Dialer, error) { diff --git a/chained/multiplexed_impl.go b/chained/multiplexed_impl.go index d43375385..46c56dc01 100644 --- a/chained/multiplexed_impl.go +++ b/chained/multiplexed_impl.go @@ -51,8 +51,8 @@ func (impl *multiplexedImpl) dialServer(op *ops.Op, ctx context.Context) (net.Co return impl.multiplexedDial(ctx, "", "") } -func (impl *multiplexedImpl) isReady() (bool, error) { - return true, nil +func (impl *multiplexedImpl) ready() <-chan error { + return nil } // createMultiplexedProtocol configures a cmux multiplexing protocol diff --git a/chained/preconnecting_dialer.go b/chained/preconnecting_dialer.go index ade558d5e..ca2254b3c 100644 --- a/chained/preconnecting_dialer.go +++ b/chained/preconnecting_dialer.go @@ -73,8 +73,8 @@ func (pd *preconnectingDialer) dialServer(op *ops.Op, ctx context.Context) (conn } } -func (*preconnectingDialer) isReady() (bool, error) { - return true, nil +func (*preconnectingDialer) ready() <-chan error { + return nil } func (pd *preconnectingDialer) preconnectIfNecessary(op *ops.Op) { diff --git a/chained/proxy.go b/chained/proxy.go index 05e10f2d3..8740b8814 100644 --- a/chained/proxy.go +++ b/chained/proxy.go @@ -68,7 +68,7 @@ type proxyImpl interface { // close releases the resources associated with the implementation, if any. close() // isReady returns if the implementation is ready to dial - isReady() (bool, error) + ready() <-chan error } // nopCloser is a mixin to implement a do-nothing close() method of proxyImpl. @@ -587,6 +587,6 @@ func splitClientHello(hello []byte) [][]byte { return splits } -func (p *proxy) IsReady() (bool, error) { - return p.impl.isReady() +func (p *proxy) Ready() <-chan error { + return p.impl.ready() } diff --git a/chained/quic_impl.go b/chained/quic_impl.go index 663a9470f..7cea0dbc7 100644 --- a/chained/quic_impl.go +++ b/chained/quic_impl.go @@ -72,6 +72,6 @@ func (impl *quicImpl) dialServer(op *ops.Op, ctx context.Context) (net.Conn, err }) } -func (*quicImpl) isReady() (bool, error) { - return true, nil +func (*quicImpl) ready() <-chan error { + return nil } diff --git a/chained/shadowsocks_impl.go b/chained/shadowsocks_impl.go index 9c2ce94cd..85d132e5c 100644 --- a/chained/shadowsocks_impl.go +++ b/chained/shadowsocks_impl.go @@ -140,8 +140,8 @@ func (impl *shadowsocksImpl) dialServer(op *ops.Op, ctx context.Context) (net.Co }) } -func (*shadowsocksImpl) isReady() (bool, error) { - return true, nil +func (*shadowsocksImpl) ready() <-chan error { + return nil } // generateUpstream() creates a marker upstream address. This isn't an diff --git a/chained/starbridge_impl.go b/chained/starbridge_impl.go index d1eda762b..2736da74c 100644 --- a/chained/starbridge_impl.go +++ b/chained/starbridge_impl.go @@ -67,8 +67,8 @@ func newStarbridgeImpl(name, addr string, pc *config.ProxyConfig, reportDialCore }, nil } -func (*starbridge) isReady() (bool, error) { - return true, nil +func (*starbridge) ready() <-chan error { + return nil } // Adapted from https://github.com/OperatorFoundation/Starbridge-go/blob/v3.0.12/Starbridge/v3/starbridge.go#L237-L253 diff --git a/chained/tlsmasq_impl.go b/chained/tlsmasq_impl.go index 9f830b37b..f23b1c229 100644 --- a/chained/tlsmasq_impl.go +++ b/chained/tlsmasq_impl.go @@ -111,8 +111,8 @@ func newTLSMasqImpl(configDir, name, addr string, pc *config.ProxyConfig, uc com }, nil } -func (*tlsMasqImpl) isReady() (bool, error) { - return true, nil +func (*tlsMasqImpl) ready() <-chan error { + return nil } func decodeUint16(s string) (uint16, error) { diff --git a/chained/water_impl.go b/chained/water_impl.go index 2940e220e..9e2f9c789 100644 --- a/chained/water_impl.go +++ b/chained/water_impl.go @@ -25,8 +25,7 @@ type waterImpl struct { dialer water.Dialer errLoadingWASM error readyMutex sync.Locker - ready bool - finishedToLoad chan struct{} + readyChan chan error } var httpClient *http.Client @@ -42,44 +41,46 @@ func newWaterImpl(dir, addr string, pc *config.ProxyConfig, reportDialCore repor d := &waterImpl{ raddr: addr, reportDialCore: reportDialCore, - finishedToLoad: make(chan struct{}), + readyChan: make(chan error), readyMutex: new(sync.Mutex), } b64WASM := ptSetting(pc, "water_wasm") if b64WASM != "" { go func() { - defer d.finishedLoading() wasm, err := base64.StdEncoding.DecodeString(b64WASM) if err != nil { d.errLoadingWASM = log.Errorf("failed to decode water wasm: %w", err) + d.readyChan <- d.errLoadingWASM return } d.dialer, err = createDialer(ctx, wasm, transport) if err != nil { d.errLoadingWASM = log.Errorf("failed to create dialer: %w", err) + d.readyChan <- d.errLoadingWASM return } - d.setReady() + d.readyChan <- nil }() return d, nil } if wasmAvailableAt != "" { go func() { - defer d.finishedLoading() log.Debugf("Loading WASM for %q. If not available, it should try to download from the following URLs: %+v. The file should be available at: %q", transport, strings.Split(wasmAvailableAt, ","), dir) r, err := d.loadWASM(ctx, transport, dir, wasmAvailableAt) if err != nil { d.errLoadingWASM = log.Errorf("failed to read wasm: %w", err) + d.readyChan <- d.errLoadingWASM return } defer r.Close() b, err := io.ReadAll(r) if err != nil { d.errLoadingWASM = log.Errorf("failed to load wasm bytes: %w", err) + d.readyChan <- d.errLoadingWASM return } @@ -88,9 +89,10 @@ func newWaterImpl(dir, addr string, pc *config.ProxyConfig, reportDialCore repor d.dialer, err = createDialer(ctx, b, transport) if err != nil { d.errLoadingWASM = log.Errorf("failed to create dialer: %w", err) + d.readyChan <- d.errLoadingWASM return } - d.setReady() + d.readyChan <- nil }() } @@ -122,12 +124,6 @@ func (d *waterImpl) loadWASM(ctx context.Context, transport string, dir string, return r, nil } -func (d *waterImpl) finishedLoading() { - select { - case d.finishedToLoad <- struct{}{}: - } -} - func createDialer(ctx context.Context, wasm []byte, transport string) (water.Dialer, error) { cfg := &water.Config{ TransportModuleBin: wasm, @@ -141,40 +137,29 @@ func createDialer(ctx context.Context, wasm []byte, transport string) (water.Dia return dialer, nil } -func (d *waterImpl) isReady() (bool, error) { - d.readyMutex.Lock() - defer d.readyMutex.Unlock() - return d.ready && d.dialer != nil, d.errLoadingWASM -} - -func (d *waterImpl) setReady() { - d.readyMutex.Lock() - defer d.readyMutex.Unlock() - d.ready = true +func (d *waterImpl) close() { + close(d.readyChan) } -func (d *waterImpl) close() { - close(d.finishedToLoad) +func (d *waterImpl) ready() <-chan error { + return d.readyChan } func (d *waterImpl) dialServer(op *ops.Op, ctx context.Context) (net.Conn, error) { return d.reportDialCore(op, func() (net.Conn, error) { // if dialer is not ready, wait until WASM is downloaded or context timeout - ready, err := d.isReady() - if err != nil { - return nil, log.Errorf("failed to load WASM for dialer: %w", err) - } - - if !ready { - select { - case _, ok := <-d.finishedToLoad: - if !ok { - return nil, log.Error("dialer closed") - } - log.Debug("download finished!") - case <-ctx.Done(): - return nil, log.Errorf("context completed while waiting for WASM download: %w", ctx.Err()) + select { + case err, ok := <-d.readyChan: + if !ok { + return nil, log.Error("dialer closed") } + if err != nil { + return nil, log.Errorf("failed to load dialer: %w", err) + } + + log.Debug("dialer ready!") + case <-ctx.Done(): + return nil, log.Errorf("context completed while waiting for WASM download: %w", ctx.Err()) } // TODO: At water 0.7.0 (currently), the library is hanging onto the dial context diff --git a/chained/water_impl_test.go b/chained/water_impl_test.go index 4bcdb00b2..b234cab66 100644 --- a/chained/water_impl_test.go +++ b/chained/water_impl_test.go @@ -69,7 +69,7 @@ func TestNewWaterImpl(t *testing.T) { assert: func(t *testing.T, actual *waterImpl, err error) { require.NoError(t, err) require.NotNil(t, actual) - <-actual.finishedToLoad + <-actual.readyChan require.NotNil(t, actual.dialer) assert.NotNil(t, actual.reportDialCore) }, @@ -106,7 +106,7 @@ func TestNewWaterImpl(t *testing.T) { require.NoError(t, err) require.NotNil(t, actual) assert.NoError(t, actual.errLoadingWASM) - <-actual.finishedToLoad + <-actual.readyChan assert.NotNil(t, actual.dialer) assert.NotNil(t, actual.reportDialCore) }, diff --git a/chained/wss_impl.go b/chained/wss_impl.go index a9e71dde5..9aaf9586a 100644 --- a/chained/wss_impl.go +++ b/chained/wss_impl.go @@ -63,8 +63,8 @@ func (impl *wssImpl) close() { impl.dialer.Close() } -func (*wssImpl) isReady() (bool, error) { - return true, nil +func (*wssImpl) ready() <-chan error { + return nil } func (impl *wssImpl) dialServer(op *ops.Op, ctx context.Context) (net.Conn, error) { diff --git a/client/client_test.go b/client/client_test.go index 5adb2f455..a28f75d56 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -434,8 +434,8 @@ func (d *testDialer) DialProxy(ctx context.Context) (net.Conn, error) { return nil, fmt.Errorf("Not implemented") } -func (d *testDialer) IsReady() (bool, error) { - return true, nil +func (d *testDialer) Ready() <-chan error { + return nil } // Name returns the name for this Dialer