Skip to content

Commit

Permalink
Merge branch 'main' into group-client-reqs
Browse files Browse the repository at this point in the history
  • Loading branch information
garmr-ulfr committed Nov 12, 2024
2 parents cdb743d + b33595a commit 4e85451
Show file tree
Hide file tree
Showing 19 changed files with 3,780 additions and 3,479 deletions.
3 changes: 2 additions & 1 deletion chained/broflake_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ func makeBroflakeOptions(pc *config.ProxyConfig) (
// Broflake's HTTP client isn't currently configurable via PluggableTransportSettings, and so
// we just give it this domain fronted client in all cases
wo.HttpClient = &http.Client{
Transport: proxied.Fronted(masqueradeTimeout),
Transport: proxied.Fronted("broflake_fronted_roundtrip", masqueradeTimeout),
Timeout: 60 * time.Second,
}

// Override QUICLayerOptions defaults as applicable
Expand Down
106 changes: 69 additions & 37 deletions chained/water_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net"
"net/http"
"strings"
"sync"
"time"

"github.com/getlantern/common/config"
Expand All @@ -22,74 +23,105 @@ type waterImpl struct {
raddr string
reportDialCore reportDialCoreFn
dialer water.Dialer
wgDownload *sync.WaitGroup
downloadErr error
nopCloser
}

var httpClient *http.Client

func newWaterImpl(dir, addr string, pc *config.ProxyConfig, reportDialCore reportDialCoreFn) (*waterImpl, error) {
var wasm []byte
ctx := context.Background()
wasmAvailableAt := ptSetting(pc, "water_available_at")
transport := ptSetting(pc, "water_transport")
wg := new(sync.WaitGroup)
d := &waterImpl{
raddr: addr,
reportDialCore: reportDialCore,
wgDownload: wg,
}

b64WASM := ptSetting(pc, "water_wasm")
if b64WASM != "" {
var err error
wasm, err = base64.StdEncoding.DecodeString(b64WASM)
wasm, err := base64.StdEncoding.DecodeString(b64WASM)
if err != nil {
return nil, fmt.Errorf("failed to decode water wasm: %w", err)
}
}

ctx := context.Background()
wasmAvailableAt := ptSetting(pc, "water_available_at")
transport := ptSetting(pc, "water_transport")
if wasm == nil && wasmAvailableAt != "" {
vc := newWaterVersionControl(dir)
cli := httpClient
if cli == nil {
cli = proxied.ChainedThenDirectThenFrontedClient(1*time.Minute, "")
}
downloader, err := newWaterWASMDownloader(strings.Split(wasmAvailableAt, ","), cli)
if err != nil {
return nil, log.Errorf("failed to create wasm downloader: %w", err)
}

r, err := vc.GetWASM(ctx, transport, downloader)
dialer, err := createDialer(ctx, wasm, transport)
if err != nil {
return nil, log.Errorf("failed to get wasm: %w", err)
}
defer r.Close()

b, err := io.ReadAll(r)
if err != nil {
return nil, log.Errorf("failed to read wasm: %w", err)
}

if len(b) == 0 {
return nil, log.Errorf("received empty wasm")
return nil, log.Errorf("failed to create dialer: %w", err)
}
d.dialer = dialer
return d, nil
}

wasm = b
if wasmAvailableAt != "" {
wg.Add(1)
go func() {
defer wg.Done()
log.Debugf("Loading WASM for %q. If not available, it should try to download from the following URLs: %+v. The file should be available here: %s", transport, strings.Split(wasmAvailableAt, ","), dir)
vc := newWaterVersionControl(dir)
cli := httpClient
if cli == nil {
cli = proxied.ChainedThenDirectThenFrontedClient(1*time.Minute, "")
}
downloader, err := newWaterWASMDownloader(strings.Split(wasmAvailableAt, ","), cli)
if err != nil {
d.downloadErr = log.Errorf("failed to create wasm downloader: %w", err)
return
}

r, err := vc.GetWASM(ctx, transport, downloader)
if err != nil {
d.downloadErr = log.Errorf("failed to get wasm: %w", err)
return
}
defer r.Close()

b, err := io.ReadAll(r)
if err != nil {
d.downloadErr = log.Errorf("failed to read wasm: %w", err)
return
}

if len(b) == 0 {
d.downloadErr = log.Errorf("received empty wasm")
return
}

dialer, err := createDialer(ctx, b, transport)
if err != nil {
d.downloadErr = log.Errorf("failed to create dialer: %w", err)
}
d.dialer = dialer
}()
}

return d, nil
}

func createDialer(ctx context.Context, wasm []byte, transport string) (water.Dialer, error) {
cfg := &water.Config{
TransportModuleBin: wasm,
OverrideLogger: slog.New(newLogHandler(log, transport)),
}

dialer, err := water.NewDialerWithContext(ctx, cfg)
if err != nil {
return nil, fmt.Errorf("failed to create dialer: %w", err)
return nil, log.Errorf("failed to create dialer: %w", err)
}

return &waterImpl{
raddr: addr,
dialer: dialer,
reportDialCore: reportDialCore,
}, nil
return dialer, nil
}

func (d *waterImpl) dialServer(op *ops.Op, ctx context.Context) (net.Conn, error) {
return d.reportDialCore(op, func() (net.Conn, error) {
d.wgDownload.Wait()
if d.dialer == nil {
return nil, log.Errorf("dialer not available: %w", d.downloadErr)
}

// TODO: At water 0.7.0 (currently), the library is hanging onto the dial context
// beyond it's scope. If you cancel this context, all dialed connections with the context
// will be closed. This should not happen (only dials in progress should be affected).
Expand Down
4 changes: 3 additions & 1 deletion chained/water_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ func TestNewWaterImpl(t *testing.T) {
assert: func(t *testing.T, actual *waterImpl, err error) {
require.NoError(t, err)
require.NotNil(t, actual)
require.NotNil(t, actual.dialer)
actual.wgDownload.Wait()
assert.NoError(t, actual.downloadErr)
assert.NotNil(t, actual.dialer)
assert.NotNil(t, actual.reportDialCore)
},
setHTTPClient: func() {
Expand Down
4 changes: 4 additions & 0 deletions chained/water_version_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func newWaterVersionControl(dir string) *waterVersionControl {
func (vc *waterVersionControl) GetWASM(ctx context.Context, transport string, downloader waterWASMDownloader) (io.ReadCloser, error) {
path := filepath.Join(vc.dir, transport+".wasm")
var f io.ReadCloser
log.Debugf("trying to load file %q", path)
f, err := os.Open(path)
if err != nil && !os.IsNotExist(err) {
return nil, log.Errorf("failed to open file %s: %w", path, err)
Expand All @@ -48,6 +49,7 @@ func (vc *waterVersionControl) GetWASM(ctx context.Context, transport string, do
if err = vc.markUsed(transport); err != nil {
return nil, log.Errorf("failed to update WASM history: %w", err)
}
log.Debugf("WASM file loaded")

return f, nil
}
Expand Down Expand Up @@ -105,6 +107,7 @@ func (vc *waterVersionControl) cleanOutdated() error {
return nil
})
for _, path := range filesToBeDeleted {
log.Debugf("deleting file: %q", path)
wg.Add(1)
go func() {
defer wg.Done()
Expand All @@ -125,6 +128,7 @@ func (vc *waterVersionControl) cleanOutdated() error {

func (vc *waterVersionControl) downloadWASM(ctx context.Context, transport string, downloader waterWASMDownloader) (io.ReadCloser, error) {
outputPath := filepath.Join(vc.dir, transport+".wasm")
log.Debugf("downloading WASM file and writing at %q", outputPath)
f, err := os.Create(outputPath)
if err != nil {
return nil, log.Errorf("failed to create file %s: %w", transport, err)
Expand Down
3 changes: 2 additions & 1 deletion config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ func TestProductionGlobal(t *testing.T) {
testURL := common.GlobalURL // this should always point to the live production configuration (not staging etc)

expectedProviders := map[string]bool{
"akamai": true,
"akamai": true,
"cloudfront": true,
}

f := newHttpFetcher(newTestUserConfig(), &http.Transport{}, testURL)
Expand Down
Loading

0 comments on commit 4e85451

Please sign in to comment.