Skip to content

Commit

Permalink
loading water dialer async (#1429)
Browse files Browse the repository at this point in the history
* feat: adding wasm downloader

* fix: update tests package name

* fix: generating wasm downloader mock

* feat: adding magnet downloader

* feat: adding case for using magnet downloader

* fix: replacing magnetic references by magnet and also removing magnet skip

* fix: downgrade torrent version to 1.53.3 to avoid errors happening at confluence - anacrolix/confluence needs a new tag and we need to update that at lantern/replica and then here at lantern/flashlight

* fix: updating the way we're comparing hashes

* chore: go mod tidy

* feat: using DirectThenFronted proxy for fetching WASM and also update proxy vars

* feat: fix test and print error message

* feat: give life to the dial monster

* feat: release the beast

* fix: rename mocks.go to mocks_test.go

* fix: pass empty rootCA at this moment

* fix: removing hashsum parameter

* fix: replacing http client parameter by package var so I can mock it for tests

* fix: leaving mkdirtemp first parameter empty and set lantern-water-module dir name

* fix: update torrent to latest version

* fix: generating more mocks for torrent so we can unit test it without integration

* chore: removing test that was fetching random torrent

* chore: keeping torrent at v1.53.3

* fix: removing WithHTTPDownloader

* fix: adding garmr-ulfr suggestions from http-proxy PR

* feat: adding minimal version control for WASM files

* feat: using filename/transport as index for mapped wasm files

* feat: integrate version control with water wasm and replace direct downloader

* chore: sending writer directly instead of creating a temp buffer

* chore: sending config dir as parameter to water impl

* chore: small refactor for organizing download step

* feat: add support for WASM history

* feat: update history after loading WASM correctly

* feat: creating water dialer async

* chore: go mod tidy

* chore: running go generate

* fix: renaming versionControl to waterVersionControl

* fix: removing VersionControl unused interface

* fix: moving version control commit to GetWASM

* fix: moving WASMDownloader initialization from version control to water impl and receive downloader for fetching wasm

* fix: renaming structs to refer to water

* fix: renaming configDir to dir

* fix: updating mock test references

* fix: removing water_impl responsibility for setting the water version control dir

* fix: creating const at package level for outdated WASMs

* fix: removing csv dependency and making it simpler

* chore: go mod tidy

* fix: replace os.IsNotExist

* fix: grouping files to be deleted and delete them outside filepath.Walk

* fix: creating water dir if it doesn't exist

* fix: removing unused imports

* chore: adding some logs

* fix: adding downloadErr so we can understand what happened

* fix: re-organize dialer initialization

* fix: update test for asserting if there's no errors

* chore: removing TODO since behavior is acceptable
  • Loading branch information
WendelHime authored Oct 31, 2024
1 parent cc31811 commit 1bfc4a2
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 38 deletions.
106 changes: 69 additions & 37 deletions chained/water_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net"
"net/http"
"strings"
"sync"
"time"

"github.com/getlantern/common/config"
Expand All @@ -22,74 +23,105 @@ type waterImpl struct {
raddr string
reportDialCore reportDialCoreFn
dialer water.Dialer
wgDownload *sync.WaitGroup
downloadErr error
nopCloser
}

var httpClient *http.Client

func newWaterImpl(dir, addr string, pc *config.ProxyConfig, reportDialCore reportDialCoreFn) (*waterImpl, error) {
var wasm []byte
ctx := context.Background()
wasmAvailableAt := ptSetting(pc, "water_available_at")
transport := ptSetting(pc, "water_transport")
wg := new(sync.WaitGroup)
d := &waterImpl{
raddr: addr,
reportDialCore: reportDialCore,
wgDownload: wg,
}

b64WASM := ptSetting(pc, "water_wasm")
if b64WASM != "" {
var err error
wasm, err = base64.StdEncoding.DecodeString(b64WASM)
wasm, err := base64.StdEncoding.DecodeString(b64WASM)
if err != nil {
return nil, fmt.Errorf("failed to decode water wasm: %w", err)
}
}

ctx := context.Background()
wasmAvailableAt := ptSetting(pc, "water_available_at")
transport := ptSetting(pc, "water_transport")
if wasm == nil && wasmAvailableAt != "" {
vc := newWaterVersionControl(dir)
cli := httpClient
if cli == nil {
cli = proxied.ChainedThenDirectThenFrontedClient(1*time.Minute, "")
}
downloader, err := newWaterWASMDownloader(strings.Split(wasmAvailableAt, ","), cli)
if err != nil {
return nil, log.Errorf("failed to create wasm downloader: %w", err)
}

r, err := vc.GetWASM(ctx, transport, downloader)
dialer, err := createDialer(ctx, wasm, transport)
if err != nil {
return nil, log.Errorf("failed to get wasm: %w", err)
}
defer r.Close()

b, err := io.ReadAll(r)
if err != nil {
return nil, log.Errorf("failed to read wasm: %w", err)
}

if len(b) == 0 {
return nil, log.Errorf("received empty wasm")
return nil, log.Errorf("failed to create dialer: %w", err)
}
d.dialer = dialer
return d, nil
}

wasm = b
if wasmAvailableAt != "" {
wg.Add(1)
go func() {
defer wg.Done()
log.Debugf("Loading WASM for %q. If not available, it should try to download from the following URLs: %+v. The file should be available here: %s", transport, strings.Split(wasmAvailableAt, ","), dir)
vc := newWaterVersionControl(dir)
cli := httpClient
if cli == nil {
cli = proxied.ChainedThenDirectThenFrontedClient(1*time.Minute, "")
}
downloader, err := newWaterWASMDownloader(strings.Split(wasmAvailableAt, ","), cli)
if err != nil {
d.downloadErr = log.Errorf("failed to create wasm downloader: %w", err)
return
}

r, err := vc.GetWASM(ctx, transport, downloader)
if err != nil {
d.downloadErr = log.Errorf("failed to get wasm: %w", err)
return
}
defer r.Close()

b, err := io.ReadAll(r)
if err != nil {
d.downloadErr = log.Errorf("failed to read wasm: %w", err)
return
}

if len(b) == 0 {
d.downloadErr = log.Errorf("received empty wasm")
return
}

dialer, err := createDialer(ctx, b, transport)
if err != nil {
d.downloadErr = log.Errorf("failed to create dialer: %w", err)
}
d.dialer = dialer
}()
}

return d, nil
}

func createDialer(ctx context.Context, wasm []byte, transport string) (water.Dialer, error) {
cfg := &water.Config{
TransportModuleBin: wasm,
OverrideLogger: slog.New(newLogHandler(log, transport)),
}

dialer, err := water.NewDialerWithContext(ctx, cfg)
if err != nil {
return nil, fmt.Errorf("failed to create dialer: %w", err)
return nil, log.Errorf("failed to create dialer: %w", err)
}

return &waterImpl{
raddr: addr,
dialer: dialer,
reportDialCore: reportDialCore,
}, nil
return dialer, nil
}

func (d *waterImpl) dialServer(op *ops.Op, ctx context.Context) (net.Conn, error) {
return d.reportDialCore(op, func() (net.Conn, error) {
d.wgDownload.Wait()
if d.dialer == nil {
return nil, log.Errorf("dialer not available: %w", d.downloadErr)
}

// TODO: At water 0.7.0 (currently), the library is hanging onto the dial context
// beyond it's scope. If you cancel this context, all dialed connections with the context
// will be closed. This should not happen (only dials in progress should be affected).
Expand Down
4 changes: 3 additions & 1 deletion chained/water_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ func TestNewWaterImpl(t *testing.T) {
assert: func(t *testing.T, actual *waterImpl, err error) {
require.NoError(t, err)
require.NotNil(t, actual)
require.NotNil(t, actual.dialer)
actual.wgDownload.Wait()
assert.NoError(t, actual.downloadErr)
assert.NotNil(t, actual.dialer)
assert.NotNil(t, actual.reportDialCore)
},
setHTTPClient: func() {
Expand Down
4 changes: 4 additions & 0 deletions chained/water_version_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func newWaterVersionControl(dir string) *waterVersionControl {
func (vc *waterVersionControl) GetWASM(ctx context.Context, transport string, downloader waterWASMDownloader) (io.ReadCloser, error) {
path := filepath.Join(vc.dir, transport+".wasm")
var f io.ReadCloser
log.Debugf("trying to load file %q", path)
f, err := os.Open(path)
if err != nil && !os.IsNotExist(err) {
return nil, log.Errorf("failed to open file %s: %w", path, err)
Expand All @@ -48,6 +49,7 @@ func (vc *waterVersionControl) GetWASM(ctx context.Context, transport string, do
if err = vc.markUsed(transport); err != nil {
return nil, log.Errorf("failed to update WASM history: %w", err)
}
log.Debugf("WASM file loaded")

return f, nil
}
Expand Down Expand Up @@ -105,6 +107,7 @@ func (vc *waterVersionControl) cleanOutdated() error {
return nil
})
for _, path := range filesToBeDeleted {
log.Debugf("deleting file: %q", path)
wg.Add(1)
go func() {
defer wg.Done()
Expand All @@ -125,6 +128,7 @@ func (vc *waterVersionControl) cleanOutdated() error {

func (vc *waterVersionControl) downloadWASM(ctx context.Context, transport string, downloader waterWASMDownloader) (io.ReadCloser, error) {
outputPath := filepath.Join(vc.dir, transport+".wasm")
log.Debugf("downloading WASM file and writing at %q", outputPath)
f, err := os.Create(outputPath)
if err != nil {
return nil, log.Errorf("failed to create file %s: %w", transport, err)
Expand Down

0 comments on commit 1bfc4a2

Please sign in to comment.