Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding IsReady to dialer interface #1436

Merged
merged 42 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
fb4a378
feat: adding IsReady to BanditDialer and isReady to proxy implementat…
WendelHime Nov 6, 2024
41c2f11
feat: only select dialer when ready to dial
WendelHime Nov 6, 2024
2b48657
fix: adding IsReady to test dialer
WendelHime Nov 6, 2024
55ffa64
fix: loading b64 wasm in background so channel doesn't lock and only …
WendelHime Nov 6, 2024
d39b38f
fix: removing err declaration
WendelHime Nov 6, 2024
9f0c6f0
fix: renaming channel and waiting dialer to be available before check…
WendelHime Nov 6, 2024
bd1fcf4
feat: if dialer is not ready, try to add to bypass again in 60s
WendelHime Nov 6, 2024
45e08c8
feat: adding log debug and continue when loading dialer in background
WendelHime Nov 6, 2024
76b38eb
fix: adopt suggestions and make IsReady return an error
WendelHime Nov 8, 2024
38dba03
feat: verifying IsReady error at bandit before checking if it's ready…
WendelHime Nov 8, 2024
f67faab
fix: updating bypass function for using and checking context timeout,…
WendelHime Nov 11, 2024
8555f52
feat: adding locker at package lever for avoiding downloading WASM fi…
WendelHime Nov 11, 2024
4812992
feat: add support for water multiplex
WendelHime Nov 13, 2024
2771c4c
fix: typo
WendelHime Nov 18, 2024
ddc0ce7
fix: checking error at bypass before using ready
WendelHime Nov 18, 2024
fa1391e
fix: canceling context if dialer IsReady returns an error
WendelHime Nov 18, 2024
5181511
fix: start proxy if dialer is ready
WendelHime Nov 18, 2024
681ae9e
fix: removing retry atomic bool and replace by select statement
WendelHime Nov 18, 2024
eeb9656
fix: removing if statement for checking if thereadyChain is closed an…
WendelHime Nov 18, 2024
2079bb0
fix: removing close calls for finishedToLoad; updated dialer closed m…
WendelHime Nov 19, 2024
3638a4b
fix: using a sync.Map for handling lockers per WATER transport
WendelHime Nov 19, 2024
d35e60c
fix: making Ready returns a <- chan error
WendelHime Nov 19, 2024
43dd35d
fix: checking if ready chan is nil
WendelHime Nov 19, 2024
b82c397
Merge branch 'main' of github.com:getlantern/flashlight into feat/144…
WendelHime Nov 19, 2024
04bb1c2
fix: adding on success to dialer
WendelHime Nov 19, 2024
810a92a
fix: deleting bandit package
WendelHime Nov 19, 2024
6aa256a
fix: removing errLoadingWASM var and send it directly to the channel
WendelHime Nov 19, 2024
e093e69
Merge branch 'main' of github.com:getlantern/flashlight into feat/144…
WendelHime Nov 25, 2024
a5e1ef8
feat: go mod tidy
WendelHime Nov 25, 2024
fafdfb9
fix: replace log by logger
WendelHime Nov 25, 2024
8f897ac
fix: add comment for making explicit IsReady can return a nil value i…
WendelHime Nov 26, 2024
8eba259
fix: replace sync.Map usage by creating water WASM lock map and a loc…
WendelHime Nov 26, 2024
7721192
fix: set default behavior instead of waiting for dialer to be ready a…
WendelHime Nov 26, 2024
8b1762d
fix: make water wasm map
WendelHime Nov 26, 2024
05a45ed
feat: broadcast ready status to all chan listeners
WendelHime Nov 28, 2024
6063f51
fix: update bandit message
WendelHime Nov 28, 2024
e281571
fix: using a buffered channel so we can simplify logic (replacing go …
WendelHime Nov 29, 2024
52d2b46
Merge branch 'main' of github.com:getlantern/flashlight into feat/144…
WendelHime Dec 3, 2024
337278f
fix: updating types.proto
WendelHime Dec 3, 2024
3866761
fix: adding fields to apipib legacy and update water impl connect opt…
WendelHime Dec 3, 2024
98d477b
fix: updating test
WendelHime Dec 3, 2024
8b3f92f
Merge branch 'main' of github.com:getlantern/flashlight into feat/144…
WendelHime Dec 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions chained/algeneva_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ func (a *algenevaImpl) dialServer(op *ops.Op, ctx context.Context) (net.Conn, er
return conn, nil
}

func (*algenevaImpl) ready() <-chan error {
return nil
}

// algenevaDialer is a algeneva.Dialer wrapper around a reportDialCore. algeneva accepts an optional
// Dialer interface which it will use to dial the server and then wrap the resulting connection.
type algenevaDialer struct {
Expand Down
4 changes: 4 additions & 0 deletions chained/broflake_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ func newBroflakeImpl(pc *config.ProxyConfig, reportDialCore reportDialCoreFn) (p
}, nil
}

func (*broflakeImpl) ready() <-chan error {
return nil
}

func (b *broflakeImpl) dialServer(op *ops.Op, ctx context.Context) (net.Conn, error) {
// TODO: I don't know what to do with 'op'
return b.QUICLayer.DialContext(ctx)
Expand Down
4 changes: 4 additions & 0 deletions chained/chained_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ func (impl *testImpl) dialServer(op *ops.Op, ctx context.Context) (net.Conn, err
return impl.d(ctx)
}

func (*testImpl) ready() <-chan error {
return nil
}

func newDialer(dialServer func(ctx context.Context) (net.Conn, error)) (func(network, addr string) (net.Conn, error), error) {
p, err := newProxy("test", "addr:567", "proto", "netw", &config.ProxyConfig{
AuthToken: "token",
Expand Down
4 changes: 4 additions & 0 deletions chained/http_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,7 @@ func newHTTPImpl(addr string, dialCore coreDialer) proxyImpl {
func (impl *httpImpl) dialServer(op *ops.Op, ctx context.Context) (net.Conn, error) {
return impl.dialCore(op, ctx, impl.addr)
}

func (*httpImpl) ready() <-chan error {
return nil
}
4 changes: 4 additions & 0 deletions chained/https_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ func (impl *httpsImpl) dialServer(op *ops.Op, ctx context.Context) (net.Conn, er
return result.Conn, nil
}

func (*httpsImpl) ready() <-chan error {
return nil
}

func timeoutFor(ctx context.Context) time.Duration {
deadline, ok := ctx.Deadline()
if ok {
Expand Down
4 changes: 4 additions & 0 deletions chained/multipath.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ func (impl *multipathImpl) FormatStats() []string {
return impl.dialer.(multipath.Stats).FormatStats()
}

func (*multipathImpl) ready() <-chan error {
return nil
}

func CreateMPDialer(configDir, endpoint string, ss map[string]*config.ProxyConfig, uc common.UserConfig) (dialer.ProxyDialer, error) {
if len(ss) < 1 {
return nil, errors.New("no dialers")
Expand Down
4 changes: 4 additions & 0 deletions chained/multiplexed_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ func (impl *multiplexedImpl) dialServer(op *ops.Op, ctx context.Context) (net.Co
return impl.multiplexedDial(ctx, "", "")
}

func (impl *multiplexedImpl) ready() <-chan error {
return nil
}

// createMultiplexedProtocol configures a cmux multiplexing protocol
// according to settings.
func createMultiplexedProtocol(s *config.ProxyConfig) (cmux.Protocol, error) {
Expand Down
4 changes: 4 additions & 0 deletions chained/preconnecting_dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ func (pd *preconnectingDialer) dialServer(op *ops.Op, ctx context.Context) (conn
}
}

func (*preconnectingDialer) ready() <-chan error {
return nil
}

func (pd *preconnectingDialer) preconnectIfNecessary(op *ops.Op) {
pd.statsMutex.Lock()
defer pd.statsMutex.Unlock()
Expand Down
11 changes: 9 additions & 2 deletions chained/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ type proxyImpl interface {
dialServer(op *ops.Op, ctx context.Context) (net.Conn, error)
// close releases the resources associated with the implementation, if any.
close()
// isReady returns if the implementation is ready to dial
ready() <-chan error
}

// nopCloser is a mixin to implement a do-nothing close() method of proxyImpl.
Expand Down Expand Up @@ -250,7 +252,8 @@ func createImpl(configDir, name, addr, transport string, s *config.ProxyConfig,
func isAMultiplexedTransport(transport string) bool {
return transport == "tlsmasq" ||
transport == "starbridge" ||
transport == "algeneva"
transport == "algeneva" ||
transport == "water"
}

// ForceProxy forces everything through the HTTP proxy at forceAddr using
Expand Down Expand Up @@ -339,7 +342,7 @@ func newProxy(name, addr, protocol, network string, s *config.ProxyConfig, uc co
name: name,
protocol: protocol,
network: network,
multiplexed: s.MultiplexedAddr != "",
multiplexed: s.MultiplexedAddr != "" || isAMultiplexedTransport(name),
addr: addr,
authToken: s.AuthToken,
user: uc,
Expand Down Expand Up @@ -583,3 +586,7 @@ func splitClientHello(hello []byte) [][]byte {
splits = append(splits, hello[start:])
return splits
}

func (p *proxy) Ready() <-chan error {
return p.impl.ready()
}
4 changes: 4 additions & 0 deletions chained/quic_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,7 @@ func (impl *quicImpl) dialServer(op *ops.Op, ctx context.Context) (net.Conn, err
return conn, err
})
}

func (*quicImpl) ready() <-chan error {
return nil
}
8 changes: 5 additions & 3 deletions chained/shadowsocks_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type shadowsocksImpl struct {
rng *mrand.Rand
rngmx sync.Mutex
tlsConfig *tls.Config
nopCloser
}

type PrefixSaltGen struct {
Expand Down Expand Up @@ -125,9 +126,6 @@ func newShadowsocksImpl(name, addr string, pc *config.ProxyConfig, reportDialCor
}, nil
}

func (impl *shadowsocksImpl) close() {
}

func (impl *shadowsocksImpl) dialServer(op *ops.Op, ctx context.Context) (net.Conn, error) {
return impl.reportDialCore(op, func() (net.Conn, error) {
conn, err := impl.client.DialStream(ctx, impl.generateUpstream())
Expand All @@ -142,6 +140,10 @@ func (impl *shadowsocksImpl) dialServer(op *ops.Op, ctx context.Context) (net.Co
})
}

func (*shadowsocksImpl) ready() <-chan error {
return nil
}

// generateUpstream() creates a marker upstream address. This isn't an
// acutal upstream that will be dialed, it signals that the upstream
// should be determined by other methods. It's just a bit random just to
Expand Down
4 changes: 4 additions & 0 deletions chained/starbridge_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func newStarbridgeImpl(name, addr string, pc *config.ProxyConfig, reportDialCore
}, nil
}

func (*starbridge) ready() <-chan error {
return nil
}

// Adapted from https://github.com/OperatorFoundation/Starbridge-go/blob/v3.0.12/Starbridge/v3/starbridge.go#L237-L253
func getClientConfig(serverPublicKey string) replicant.ClientConfig {
polishClientConfig := polish.DarkStarPolishClientConfig{
Expand Down
4 changes: 4 additions & 0 deletions chained/tlsmasq_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ func newTLSMasqImpl(configDir, name, addr string, pc *config.ProxyConfig, uc com
}, nil
}

func (*tlsMasqImpl) ready() <-chan error {
return nil
}

func decodeUint16(s string) (uint16, error) {
b, err := hex.DecodeString(strings.TrimPrefix(s, "0x"))
if err != nil {
Expand Down
116 changes: 74 additions & 42 deletions chained/water_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,85 +23,99 @@ type waterImpl struct {
raddr string
reportDialCore reportDialCoreFn
dialer water.Dialer
wgDownload *sync.WaitGroup
downloadErr error
nopCloser
readyChan chan error
}

var httpClient *http.Client

// waterLoadingWASMMutex prevents the WATER implementation to download/load the
// WASM file concurrently.
var waterTransportLocker sync.Map

func newWaterImpl(dir, addr string, pc *config.ProxyConfig, reportDialCore reportDialCoreFn) (*waterImpl, error) {
ctx := context.Background()
wasmAvailableAt := ptSetting(pc, "water_available_at")
transport := ptSetting(pc, "water_transport")
wg := new(sync.WaitGroup)
d := &waterImpl{
raddr: addr,
reportDialCore: reportDialCore,
wgDownload: wg,
readyChan: make(chan error),
}

b64WASM := ptSetting(pc, "water_wasm")
if b64WASM != "" {
var err error
wasm, err := base64.StdEncoding.DecodeString(b64WASM)
if err != nil {
return nil, fmt.Errorf("failed to decode water wasm: %w", err)
}
go func() {
wasm, err := base64.StdEncoding.DecodeString(b64WASM)
if err != nil {
d.readyChan <- log.Errorf("failed to decode water wasm: %w", err)
return
}

dialer, err := createDialer(ctx, wasm, transport)
if err != nil {
return nil, log.Errorf("failed to create dialer: %w", err)
}
d.dialer = dialer
d.dialer, err = createDialer(ctx, wasm, transport)
if err != nil {
d.readyChan <- log.Errorf("failed to create dialer: %w", err)
return
}
d.readyChan <- nil
}()
return d, nil
}

if wasmAvailableAt != "" {
wg.Add(1)
go func() {
defer wg.Done()
log.Debugf("Loading WASM for %q. If not available, it should try to download from the following URLs: %+v. The file should be available here: %s", transport, strings.Split(wasmAvailableAt, ","), dir)
vc := newWaterVersionControl(dir)
cli := httpClient
if cli == nil {
cli = proxied.ChainedThenDirectThenFrontedClient(1*time.Minute, "")
}
downloader, err := newWaterWASMDownloader(strings.Split(wasmAvailableAt, ","), cli)
if err != nil {
d.downloadErr = log.Errorf("failed to create wasm downloader: %w", err)
return
}
log.Debugf("Loading WASM for %q. If not available, it should try to download from the following URLs: %+v. The file should be available at: %q", transport, strings.Split(wasmAvailableAt, ","), dir)

r, err := vc.GetWASM(ctx, transport, downloader)
r, err := d.loadWASM(ctx, transport, dir, wasmAvailableAt)
if err != nil {
d.downloadErr = log.Errorf("failed to get wasm: %w", err)
d.readyChan <- log.Errorf("failed to read wasm: %w", err)
return
}
defer r.Close()

b, err := io.ReadAll(r)
if err != nil {
d.downloadErr = log.Errorf("failed to read wasm: %w", err)
d.readyChan <- log.Errorf("failed to load wasm bytes: %w", err)
return
}

if len(b) == 0 {
d.downloadErr = log.Errorf("received empty wasm")
return
}
log.Debugf("received wasm with %d bytes", len(b))

dialer, err := createDialer(ctx, b, transport)
d.dialer, err = createDialer(ctx, b, transport)
if err != nil {
d.downloadErr = log.Errorf("failed to create dialer: %w", err)
d.readyChan <- log.Errorf("failed to create dialer: %w", err)
return
}
d.dialer = dialer
d.readyChan <- nil
}()
}

return d, nil
}

func (d *waterImpl) loadWASM(ctx context.Context, transport string, dir string, wasmAvailableAt string) (io.ReadCloser, error) {
locker, ok := waterTransportLocker.Load(transport)
if !ok {
waterTransportLocker.Store(transport, new(sync.Mutex))
locker, _ = waterTransportLocker.Load(transport)
}
WendelHime marked this conversation as resolved.
Show resolved Hide resolved

locker.(sync.Locker).Lock()
defer locker.(sync.Locker).Unlock()
vc := newWaterVersionControl(dir)
cli := httpClient
if cli == nil {
cli = proxied.ChainedThenDirectThenFrontedClient(1*time.Minute, "")
}
downloader, err := newWaterWASMDownloader(strings.Split(wasmAvailableAt, ","), cli)
if err != nil {
return nil, log.Errorf("failed to create wasm downloader: %w", err)
}
r, err := vc.GetWASM(ctx, transport, downloader)
if err != nil {
return nil, log.Errorf("failed to get wasm: %w", err)
}
return r, nil
}

func createDialer(ctx context.Context, wasm []byte, transport string) (water.Dialer, error) {
cfg := &water.Config{
TransportModuleBin: wasm,
Expand All @@ -115,11 +129,29 @@ func createDialer(ctx context.Context, wasm []byte, transport string) (water.Dia
return dialer, nil
}

func (d *waterImpl) close() {
close(d.readyChan)
}

func (d *waterImpl) ready() <-chan error {
return d.readyChan
}

func (d *waterImpl) dialServer(op *ops.Op, ctx context.Context) (net.Conn, error) {
return d.reportDialCore(op, func() (net.Conn, error) {
d.wgDownload.Wait()
if d.dialer == nil {
return nil, log.Errorf("dialer not available: %w", d.downloadErr)
// if dialer is not ready, wait until WASM is downloaded or context timeout
select {
case err, ok := <-d.readyChan:
if !ok {
return nil, log.Error("dialer closed")
}
if err != nil {
return nil, log.Errorf("failed to load dialer: %w", err)
}

log.Debug("dialer ready!")
case <-ctx.Done():
return nil, log.Errorf("context completed while waiting for WASM download: %w", ctx.Err())
}

// TODO: At water 0.7.0 (currently), the library is hanging onto the dial context
Expand Down
11 changes: 8 additions & 3 deletions chained/water_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func TestNewWaterImpl(t *testing.T) {
assert: func(t *testing.T, actual *waterImpl, err error) {
require.NoError(t, err)
require.NotNil(t, actual)
<-actual.readyChan
require.NotNil(t, actual.dialer)
assert.NotNil(t, actual.reportDialCore)
},
Expand Down Expand Up @@ -101,10 +102,10 @@ func TestNewWaterImpl(t *testing.T) {
},
},
assert: func(t *testing.T, actual *waterImpl, err error) {
defer actual.close()
require.NoError(t, err)
require.NotNil(t, actual)
actual.wgDownload.Wait()
assert.NoError(t, actual.downloadErr)
<-actual.readyChan
assert.NotNil(t, actual.dialer)
assert.NotNil(t, actual.reportDialCore)
},
Expand Down Expand Up @@ -145,7 +146,10 @@ func TestWaterDialServer(t *testing.T) {

b64WASM := base64.StdEncoding.EncodeToString(wasm)

pc := &config.ProxyConfig{PluggableTransportSettings: map[string]string{"water_wasm": b64WASM}}
pc := &config.ProxyConfig{PluggableTransportSettings: map[string]string{
"water_wasm": b64WASM,
}}

testOp := ops.Begin("test")
defer testOp.End()

Expand Down Expand Up @@ -198,6 +202,7 @@ func TestWaterDialServer(t *testing.T) {
tt.setHTTPClient()
waterImpl, err := newWaterImpl(tt.givenConfigDir, tt.givenAddr, pc, tt.givenReportDialCore)
require.NoError(t, err)
defer waterImpl.close()
conn, err := waterImpl.dialServer(tt.givenOp, tt.givenCtx)
tt.assert(t, conn, err)
})
Expand Down
Loading
Loading