Skip to content

Commit

Permalink
adding IsReady to dialer interface (#1436)
Browse files Browse the repository at this point in the history
* feat: adding IsReady to BanditDialer and isReady to proxy implementations

* feat: only select dialer when ready to dial

* fix: adding IsReady to test dialer

* fix: loading b64 wasm in background so channel doesn't lock and only wait for download when ready is isn't ready

* fix: removing err declaration

* fix: renaming channel and waiting dialer to be available before checking at test

* feat: if dialer is not ready, try to add to bypass again in 60s

* feat: adding log debug and continue when loading dialer in background

* fix: adopt suggestions and make IsReady return an error

* feat: verifying IsReady error at bandit before checking if it's ready or not

* fix: updating bypass function for using and checking context timeout, if it happens it should stop retrying to load the proxy async

* feat: adding locker at package lever for avoiding downloading WASM file with data race

* feat: add support for water multiplex

* fix: typo

* fix: checking error at bypass before using ready

* fix: canceling context if dialer IsReady returns an error

* fix: start proxy if dialer is ready

* fix: removing retry atomic bool and replace by select statement

* fix: removing if statement for checking if thereadyChain is closed and removing close

* fix: removing close calls for finishedToLoad; updated dialer closed message and isReady now returns true based on flag and dialer different than nil

* fix: using a sync.Map for handling lockers per WATER transport

* fix: making Ready returns a <- chan error

* fix: checking if ready chan is nil

* fix: adding on success to dialer

* fix: deleting bandit package

* fix: removing errLoadingWASM var and send it directly to the channel

* feat: go mod tidy

* fix: replace log by logger

* fix: add comment for making explicit IsReady can return a nil value if initialization is not required and verifying at bypass even when loading async

* fix: replace sync.Map usage by creating water WASM lock map and a locker for map; also renaming httpClient to waterHTTPClient

* fix: set default behavior instead of waiting for dialer to be ready at bandit

* fix: make water wasm map

* feat: broadcast ready status to all chan listeners

* fix: update bandit message

* fix: using a buffered channel so we can simplify logic (replacing go routines, context timeouts)

* fix: updating types.proto

* fix: adding fields to apipib legacy and update water impl connect options

* fix: updating test
  • Loading branch information
WendelHime authored Dec 9, 2024
1 parent 3e74446 commit 30d5ac4
Show file tree
Hide file tree
Showing 26 changed files with 531 additions and 204 deletions.
11 changes: 9 additions & 2 deletions apipb/legacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"net"
"strconv"
"strings"
"time"

commonconfig "github.com/getlantern/common/config"
)
Expand Down Expand Up @@ -177,9 +178,15 @@ func ProxyToLegacyConfig(cfg *ProxyConnectConfig) (*commonconfig.ProxyConfig, er
}
case *ProxyConnectConfig_ConnectCfgWater:
legacy.PluggableTransport = "water"
duration, err := time.ParseDuration(pCfg.ConnectCfgWater.DownloadTimeout.String())
if err != nil {
duration = 5 * time.Minute
}
legacy.PluggableTransportSettings = map[string]string{
"water_wasm": base64.StdEncoding.EncodeToString(pCfg.ConnectCfgWater.Wasm),
"water_transport": pCfg.ConnectCfgWater.Transport,
"water_wasm": base64.StdEncoding.EncodeToString(pCfg.ConnectCfgWater.Wasm),
"water_transport": pCfg.ConnectCfgWater.Transport,
"wasm_available_at": strings.Join(pCfg.ConnectCfgWater.WasmAvailableAt, ","),
"download_timeout": duration.String(),
}

default:
Expand Down
294 changes: 165 additions & 129 deletions apipb/types.pb.go

Large diffs are not rendered by default.

9 changes: 8 additions & 1 deletion apipb/types.proto
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
syntax = "proto3";
import "google/protobuf/timestamp.proto";
import "google/protobuf/duration.proto";
option go_package = "github.com/getlantern/flashlight/apipb";

//
Expand Down Expand Up @@ -134,8 +135,14 @@ message ProxyConnectConfig {
}

message WATERConfig {
bytes wasm = 1;
// deprecated: use wasm_available_at instead
bytes wasm = 1 [deprecated = true];
string transport = 2;
// wasm_available_at provide a list of URLs that can be used for fetching the WASM module
// with different methods like a direct URL, a magnet link, etc.
repeated string wasm_available_at = 3;
// download_timeout specifies the time the HTTP client should wait to retrieve the WASM binary
google.protobuf.Duration download_timeout = 4;
}

oneof protocol_config {
Expand Down
4 changes: 4 additions & 0 deletions chained/algeneva_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ func (a *algenevaImpl) dialServer(op *ops.Op, ctx context.Context) (net.Conn, er
return conn, nil
}

func (*algenevaImpl) ready() <-chan error {
return nil
}

// algenevaDialer is a algeneva.Dialer wrapper around a reportDialCore. algeneva accepts an optional
// Dialer interface which it will use to dial the server and then wrap the resulting connection.
type algenevaDialer struct {
Expand Down
4 changes: 4 additions & 0 deletions chained/broflake_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ func newBroflakeImpl(pc *config.ProxyConfig, reportDialCore reportDialCoreFn) (p
}, nil
}

func (*broflakeImpl) ready() <-chan error {
return nil
}

func (b *broflakeImpl) dialServer(op *ops.Op, ctx context.Context) (net.Conn, error) {
// TODO: I don't know what to do with 'op'
return b.QUICLayer.DialContext(ctx)
Expand Down
4 changes: 4 additions & 0 deletions chained/chained_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ func (impl *testImpl) dialServer(op *ops.Op, ctx context.Context) (net.Conn, err
return impl.d(ctx)
}

func (*testImpl) ready() <-chan error {
return nil
}

func newDialer(dialServer func(ctx context.Context) (net.Conn, error)) (func(network, addr string) (net.Conn, error), error) {
p, err := newProxy("test", "addr:567", "proto", "netw", &config.ProxyConfig{
AuthToken: "token",
Expand Down
4 changes: 4 additions & 0 deletions chained/http_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,7 @@ func newHTTPImpl(addr string, dialCore coreDialer) proxyImpl {
func (impl *httpImpl) dialServer(op *ops.Op, ctx context.Context) (net.Conn, error) {
return impl.dialCore(op, ctx, impl.addr)
}

func (*httpImpl) ready() <-chan error {
return nil
}
4 changes: 4 additions & 0 deletions chained/https_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ func (impl *httpsImpl) dialServer(op *ops.Op, ctx context.Context) (net.Conn, er
return result.Conn, nil
}

func (*httpsImpl) ready() <-chan error {
return nil
}

func timeoutFor(ctx context.Context) time.Duration {
deadline, ok := ctx.Deadline()
if ok {
Expand Down
4 changes: 4 additions & 0 deletions chained/multipath.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ func (impl *multipathImpl) FormatStats() []string {
return impl.dialer.(multipath.Stats).FormatStats()
}

func (*multipathImpl) ready() <-chan error {
return nil
}

func CreateMPDialer(configDir, endpoint string, ss map[string]*config.ProxyConfig, uc common.UserConfig) (dialer.ProxyDialer, error) {
if len(ss) < 1 {
return nil, errors.New("no dialers")
Expand Down
4 changes: 4 additions & 0 deletions chained/multiplexed_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ func (impl *multiplexedImpl) dialServer(op *ops.Op, ctx context.Context) (net.Co
return impl.multiplexedDial(ctx, "", "")
}

func (impl *multiplexedImpl) ready() <-chan error {
return nil
}

// createMultiplexedProtocol configures a cmux multiplexing protocol
// according to settings.
func createMultiplexedProtocol(s *config.ProxyConfig) (cmux.Protocol, error) {
Expand Down
4 changes: 4 additions & 0 deletions chained/preconnecting_dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ func (pd *preconnectingDialer) dialServer(op *ops.Op, ctx context.Context) (conn
}
}

func (*preconnectingDialer) ready() <-chan error {
return nil
}

func (pd *preconnectingDialer) preconnectIfNecessary(op *ops.Op) {
pd.statsMutex.Lock()
defer pd.statsMutex.Unlock()
Expand Down
6 changes: 6 additions & 0 deletions chained/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ type proxyImpl interface {
dialServer(op *ops.Op, ctx context.Context) (net.Conn, error)
// close releases the resources associated with the implementation, if any.
close()
// isReady returns if the implementation is ready to dial
ready() <-chan error
}

// nopCloser is a mixin to implement a do-nothing close() method of proxyImpl.
Expand Down Expand Up @@ -584,3 +586,7 @@ func splitClientHello(hello []byte) [][]byte {
splits = append(splits, hello[start:])
return splits
}

func (p *proxy) Ready() <-chan error {
return p.impl.ready()
}
4 changes: 4 additions & 0 deletions chained/quic_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,7 @@ func (impl *quicImpl) dialServer(op *ops.Op, ctx context.Context) (net.Conn, err
return conn, err
})
}

func (*quicImpl) ready() <-chan error {
return nil
}
8 changes: 5 additions & 3 deletions chained/shadowsocks_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type shadowsocksImpl struct {
rng *mrand.Rand
rngmx sync.Mutex
tlsConfig *tls.Config
nopCloser
}

type PrefixSaltGen struct {
Expand Down Expand Up @@ -125,9 +126,6 @@ func newShadowsocksImpl(name, addr string, pc *config.ProxyConfig, reportDialCor
}, nil
}

func (impl *shadowsocksImpl) close() {
}

func (impl *shadowsocksImpl) dialServer(op *ops.Op, ctx context.Context) (net.Conn, error) {
return impl.reportDialCore(op, func() (net.Conn, error) {
conn, err := impl.client.DialStream(ctx, impl.generateUpstream())
Expand All @@ -142,6 +140,10 @@ func (impl *shadowsocksImpl) dialServer(op *ops.Op, ctx context.Context) (net.Co
})
}

func (*shadowsocksImpl) ready() <-chan error {
return nil
}

// generateUpstream() creates a marker upstream address. This isn't an
// acutal upstream that will be dialed, it signals that the upstream
// should be determined by other methods. It's just a bit random just to
Expand Down
4 changes: 4 additions & 0 deletions chained/starbridge_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func newStarbridgeImpl(name, addr string, pc *config.ProxyConfig, reportDialCore
}, nil
}

func (*starbridge) ready() <-chan error {
return nil
}

// Adapted from https://github.com/OperatorFoundation/Starbridge-go/blob/v3.0.12/Starbridge/v3/starbridge.go#L237-L253
func getClientConfig(serverPublicKey string) replicant.ClientConfig {
polishClientConfig := polish.DarkStarPolishClientConfig{
Expand Down
4 changes: 4 additions & 0 deletions chained/tlsmasq_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ func newTLSMasqImpl(configDir, name, addr string, pc *config.ProxyConfig, uc com
}, nil
}

func (*tlsMasqImpl) ready() <-chan error {
return nil
}

func decodeUint16(s string) (uint16, error) {
b, err := hex.DecodeString(strings.TrimPrefix(s, "0x"))
if err != nil {
Expand Down
Loading

0 comments on commit 30d5ac4

Please sign in to comment.