Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding IsReady to dialer interface #1436

Merged
merged 42 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
fb4a378
feat: adding IsReady to BanditDialer and isReady to proxy implementat…
WendelHime Nov 6, 2024
41c2f11
feat: only select dialer when ready to dial
WendelHime Nov 6, 2024
2b48657
fix: adding IsReady to test dialer
WendelHime Nov 6, 2024
55ffa64
fix: loading b64 wasm in background so channel doesn't lock and only …
WendelHime Nov 6, 2024
d39b38f
fix: removing err declaration
WendelHime Nov 6, 2024
9f0c6f0
fix: renaming channel and waiting dialer to be available before check…
WendelHime Nov 6, 2024
bd1fcf4
feat: if dialer is not ready, try to add to bypass again in 60s
WendelHime Nov 6, 2024
45e08c8
feat: adding log debug and continue when loading dialer in background
WendelHime Nov 6, 2024
76b38eb
fix: adopt suggestions and make IsReady return an error
WendelHime Nov 8, 2024
38dba03
feat: verifying IsReady error at bandit before checking if it's ready…
WendelHime Nov 8, 2024
f67faab
fix: updating bypass function for using and checking context timeout,…
WendelHime Nov 11, 2024
8555f52
feat: adding locker at package lever for avoiding downloading WASM fi…
WendelHime Nov 11, 2024
4812992
feat: add support for water multiplex
WendelHime Nov 13, 2024
2771c4c
fix: typo
WendelHime Nov 18, 2024
ddc0ce7
fix: checking error at bypass before using ready
WendelHime Nov 18, 2024
fa1391e
fix: canceling context if dialer IsReady returns an error
WendelHime Nov 18, 2024
5181511
fix: start proxy if dialer is ready
WendelHime Nov 18, 2024
681ae9e
fix: removing retry atomic bool and replace by select statement
WendelHime Nov 18, 2024
eeb9656
fix: removing if statement for checking if thereadyChain is closed an…
WendelHime Nov 18, 2024
2079bb0
fix: removing close calls for finishedToLoad; updated dialer closed m…
WendelHime Nov 19, 2024
3638a4b
fix: using a sync.Map for handling lockers per WATER transport
WendelHime Nov 19, 2024
d35e60c
fix: making Ready returns a <- chan error
WendelHime Nov 19, 2024
43dd35d
fix: checking if ready chan is nil
WendelHime Nov 19, 2024
b82c397
Merge branch 'main' of github.com:getlantern/flashlight into feat/144…
WendelHime Nov 19, 2024
04bb1c2
fix: adding on success to dialer
WendelHime Nov 19, 2024
810a92a
fix: deleting bandit package
WendelHime Nov 19, 2024
6aa256a
fix: removing errLoadingWASM var and send it directly to the channel
WendelHime Nov 19, 2024
e093e69
Merge branch 'main' of github.com:getlantern/flashlight into feat/144…
WendelHime Nov 25, 2024
a5e1ef8
feat: go mod tidy
WendelHime Nov 25, 2024
fafdfb9
fix: replace log by logger
WendelHime Nov 25, 2024
8f897ac
fix: add comment for making explicit IsReady can return a nil value i…
WendelHime Nov 26, 2024
8eba259
fix: replace sync.Map usage by creating water WASM lock map and a loc…
WendelHime Nov 26, 2024
7721192
fix: set default behavior instead of waiting for dialer to be ready a…
WendelHime Nov 26, 2024
8b1762d
fix: make water wasm map
WendelHime Nov 26, 2024
05a45ed
feat: broadcast ready status to all chan listeners
WendelHime Nov 28, 2024
6063f51
fix: update bandit message
WendelHime Nov 28, 2024
e281571
fix: using a buffered channel so we can simplify logic (replacing go …
WendelHime Nov 29, 2024
52d2b46
Merge branch 'main' of github.com:getlantern/flashlight into feat/144…
WendelHime Dec 3, 2024
337278f
fix: updating types.proto
WendelHime Dec 3, 2024
3866761
fix: adding fields to apipib legacy and update water impl connect opt…
WendelHime Dec 3, 2024
98d477b
fix: updating test
WendelHime Dec 3, 2024
8b3f92f
Merge branch 'main' of github.com:getlantern/flashlight into feat/144…
WendelHime Dec 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion bandit/bandit.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,12 +204,14 @@ func (o *BanditDialer) chooseDialerForDomain(network, addr string) (Dialer, int)
notAllFailing := hasNotFailing(o.dialers)
for i := 0; i < (len(o.dialers) * 2); i++ {
dialer = o.dialers[chosenArm]
if (dialer.ConsecFailures() > 0 && notAllFailing) || !dialer.SupportsAddr(network, addr) {
if (dialer.ConsecFailures() > 0 && notAllFailing) || !dialer.SupportsAddr(network, addr) || !dialer.IsReady() {
// If the chosen dialer has consecutive failures and there are other
// dialers that are succeeding, we should choose a different dialer.
//
// If the chosen dialer does not support the address, we should also
// choose a different dialer.
//
// If the dialer isn't ready we should also choose a another dialer.
WendelHime marked this conversation as resolved.
Show resolved Hide resolved
chosenArm = differentArm(chosenArm, len(o.dialers))
continue
}
Expand Down Expand Up @@ -353,6 +355,9 @@ type Dialer interface {
// connections created via this dialer.
DataRecv() uint64

// IsReady indicates when the dialer is ready for dialing
IsReady() bool

// Stop stops background processing for this Dialer.
Stop()

Expand Down
4 changes: 4 additions & 0 deletions bandit/bandit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,10 @@ type tcpConnDialer struct {
server net.Conn
}

func (*tcpConnDialer) IsReady() bool {
return true
}

// DialProxy implements Dialer.
func (t *tcpConnDialer) DialProxy(ctx context.Context) (net.Conn, error) {
if t.shouldFail {
Expand Down
41 changes: 30 additions & 11 deletions bypass/bypass.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,24 +84,43 @@ func (b *bypass) OnProxies(infos map[string]*commonconfig.ProxyConfig, configDir
}

dialers := chained.CreateDialersMap(configDir, supportedInfos, userConfig)
for k, v := range supportedInfos {
dialer := dialers[k]
for name, config := range supportedInfos {
dialer := dialers[name]
if dialer == nil {
log.Errorf("No dialer for %v", k)
log.Errorf("No dialer for %v", name)
continue
}

pc := chained.CopyConfig(v)
// Set the name in the info since we know it here.
pc.Name = k
// Kill the cert to avoid it taking up unnecessary space.
pc.Cert = ""
p := b.newProxy(k, pc, configDir, userConfig, dialer)
b.proxies = append(b.proxies, p)
go p.start()
// if dialer is not ready, try again in 60s
if !dialer.IsReady() {
log.Debugf("dialer %q is not ready, starting in background", name)
go func() {
for {
time.Sleep(60 * time.Second)
if dialer.IsReady() {
b.startProxy(name, config, configDir, userConfig, dialer)
break
}
}
}()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably have a way to stop this in the event a dialer never becomes ready or becomes ready after new proxies are already assigned. It's kind of unlikely a dialer is never ready, but if someone (probably me) forgets/unaware this goroutine is created, they might not add a timeout to the dialer themselves.

Maybe a chan or ctx that gets closed/canceled in Reset and is recreated if we need to wait for dialers again?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmmmm thanks for the suggestion! I'll make IsReady return an error too, so if err is nil and ok is false then it should try again.

Copy link
Contributor Author

@WendelHime WendelHime Nov 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this way we can also customize when to retry to start a proxy or call the func to create a implementation again depending of the returned error

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will still suffer from the same problem though; It's not guaranteed that isReady will eventually return an it's ready or an error. If, at some point, we were to update water to retry downloading the WASM when it fails, or add a new protocol that might not be ready immediately, the caller can't assume that the dialer itself isn't running an infinite loop. Or there might be a bug where isReady never returns true or a non nil error. The goroutines won't ever be cleaned up, leading to memory leaks that we may not catch.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmmm it makes sense, thanks catching this! I've updated the bypass code:

  1. Added a go routine function loadProxyAsync for creating a context that timeout after 10 min.
  2. The function also provides an atomic boolean for the internal go routine that actually check if the proxy is ready or not. So the loadProxyAsync waits until the context timeout or it receives a channel signal saying it's ready. When ready the internal go routine should stop looping with the flag marked as false and the ready channel is closed.

continue
}

b.startProxy(name, config, configDir, userConfig, dialer)
}
}

func (b *bypass) startProxy(proxyName string, config *commonconfig.ProxyConfig, configDir string, userConfig common.UserConfig, dialer bandit.Dialer) {
pc := chained.CopyConfig(config)
// Set the name in the info since we know it here.
pc.Name = proxyName
// Kill the cert to avoid it taking up unnecessary space.
pc.Cert = ""
p := b.newProxy(proxyName, pc, configDir, userConfig, dialer)
b.proxies = append(b.proxies, p)
go p.start()
}

func (b *bypass) newProxy(name string, pc *commonconfig.ProxyConfig, configDir string, userConfig common.UserConfig, dialer bandit.Dialer) *proxy {
return &proxy{
ProxyConfig: pc,
Expand Down
4 changes: 4 additions & 0 deletions chained/algeneva_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ func (a *algenevaImpl) dialServer(op *ops.Op, ctx context.Context) (net.Conn, er
return conn, nil
}

func (*algenevaImpl) isReady() bool {
return true
}

// algenevaDialer is a algeneva.Dialer wrapper around a reportDialCore. algeneva accepts an optional
// Dialer interface which it will use to dial the server and then wrap the resulting connection.
type algenevaDialer struct {
Expand Down
4 changes: 4 additions & 0 deletions chained/broflake_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ func newBroflakeImpl(pc *config.ProxyConfig, reportDialCore reportDialCoreFn) (p
}, nil
}

func (*broflakeImpl) isReady() bool {
return true
}

func (b *broflakeImpl) dialServer(op *ops.Op, ctx context.Context) (net.Conn, error) {
// TODO: I don't know what to do with 'op'
return b.QUICLayer.DialContext(ctx)
Expand Down
4 changes: 4 additions & 0 deletions chained/chained_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ func (impl *testImpl) dialServer(op *ops.Op, ctx context.Context) (net.Conn, err
return impl.d(ctx)
}

func (*testImpl) isReady() bool {
return true
}

func newDialer(dialServer func(ctx context.Context) (net.Conn, error)) (func(network, addr string) (net.Conn, error), error) {
p, err := newProxy("test", "addr:567", "proto", "netw", &config.ProxyConfig{
AuthToken: "token",
Expand Down
4 changes: 4 additions & 0 deletions chained/http_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,7 @@ func newHTTPImpl(addr string, dialCore coreDialer) proxyImpl {
func (impl *httpImpl) dialServer(op *ops.Op, ctx context.Context) (net.Conn, error) {
return impl.dialCore(op, ctx, impl.addr)
}

func (*httpImpl) isReady() bool {
return true
}
4 changes: 4 additions & 0 deletions chained/https_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ func (impl *httpsImpl) dialServer(op *ops.Op, ctx context.Context) (net.Conn, er
return result.Conn, nil
}

func (*httpsImpl) isReady() bool {
return true
}

func timeoutFor(ctx context.Context) time.Duration {
deadline, ok := ctx.Deadline()
if ok {
Expand Down
4 changes: 4 additions & 0 deletions chained/multipath.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ func (impl *multipathImpl) FormatStats() []string {
return impl.dialer.(multipath.Stats).FormatStats()
}

func (*multipathImpl) isReady() bool {
return true
}

func CreateMPDialer(configDir, endpoint string, ss map[string]*config.ProxyConfig, uc common.UserConfig) (bandit.Dialer, error) {
if len(ss) < 1 {
return nil, errors.New("no dialers")
Expand Down
4 changes: 4 additions & 0 deletions chained/multiplexed_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ func (impl *multiplexedImpl) dialServer(op *ops.Op, ctx context.Context) (net.Co
return impl.multiplexedDial(ctx, "", "")
}

func (impl *multiplexedImpl) isReady() bool {
return true
}

// createMultiplexedProtocol configures a cmux multiplexing protocol
// according to settings.
func createMultiplexedProtocol(s *config.ProxyConfig) (cmux.Protocol, error) {
Expand Down
4 changes: 4 additions & 0 deletions chained/preconnecting_dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ func (pd *preconnectingDialer) dialServer(op *ops.Op, ctx context.Context) (conn
}
}

func (*preconnectingDialer) isReady() bool {
return true
}

func (pd *preconnectingDialer) preconnectIfNecessary(op *ops.Op) {
pd.statsMutex.Lock()
defer pd.statsMutex.Unlock()
Expand Down
6 changes: 6 additions & 0 deletions chained/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ type proxyImpl interface {
dialServer(op *ops.Op, ctx context.Context) (net.Conn, error)
// close releases the resources associated with the implementation, if any.
close()
// isReady returns if the implementation is ready to dial
isReady() bool
}

// nopCloser is a mixin to implement a do-nothing close() method of proxyImpl.
Expand Down Expand Up @@ -583,3 +585,7 @@ func splitClientHello(hello []byte) [][]byte {
splits = append(splits, hello[start:])
return splits
}

func (p *proxy) IsReady() bool {
return p.impl.isReady()
}
4 changes: 4 additions & 0 deletions chained/quic_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,7 @@ func (impl *quicImpl) dialServer(op *ops.Op, ctx context.Context) (net.Conn, err
return conn, err
})
}

func (*quicImpl) isReady() bool {
return true
}
8 changes: 5 additions & 3 deletions chained/shadowsocks_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type shadowsocksImpl struct {
rng *mrand.Rand
rngmx sync.Mutex
tlsConfig *tls.Config
nopCloser
}

type PrefixSaltGen struct {
Expand Down Expand Up @@ -125,9 +126,6 @@ func newShadowsocksImpl(name, addr string, pc *config.ProxyConfig, reportDialCor
}, nil
}

func (impl *shadowsocksImpl) close() {
}

func (impl *shadowsocksImpl) dialServer(op *ops.Op, ctx context.Context) (net.Conn, error) {
return impl.reportDialCore(op, func() (net.Conn, error) {
conn, err := impl.client.DialStream(ctx, impl.generateUpstream())
Expand All @@ -142,6 +140,10 @@ func (impl *shadowsocksImpl) dialServer(op *ops.Op, ctx context.Context) (net.Co
})
}

func (*shadowsocksImpl) isReady() bool {
return true
}

// generateUpstream() creates a marker upstream address. This isn't an
// acutal upstream that will be dialed, it signals that the upstream
// should be determined by other methods. It's just a bit random just to
Expand Down
4 changes: 4 additions & 0 deletions chained/starbridge_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func newStarbridgeImpl(name, addr string, pc *config.ProxyConfig, reportDialCore
}, nil
}

func (*starbridge) isReady() bool {
return true
}

// Adapted from https://github.com/OperatorFoundation/Starbridge-go/blob/v3.0.12/Starbridge/v3/starbridge.go#L237-L253
func getClientConfig(serverPublicKey string) replicant.ClientConfig {
polishClientConfig := polish.DarkStarPolishClientConfig{
Expand Down
4 changes: 4 additions & 0 deletions chained/tlsmasq_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ func newTLSMasqImpl(configDir, name, addr string, pc *config.ProxyConfig, uc com
}, nil
}

func (*tlsMasqImpl) isReady() bool {
return true
}

func decodeUint16(s string) (uint16, error) {
b, err := hex.DecodeString(strings.TrimPrefix(s, "0x"))
if err != nil {
Expand Down
Loading
Loading