Skip to content

Commit

Permalink
group config requests, make config/bypass services
Browse files Browse the repository at this point in the history
  • Loading branch information
garmr-ulfr committed Jul 16, 2024
1 parent 6dc5470 commit 159e65d
Show file tree
Hide file tree
Showing 6 changed files with 2,348 additions and 0 deletions.
257 changes: 257 additions & 0 deletions services/bypass.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
package services

// bypass periodically sends traffic to the bypass blocking detection server. The server uses the ratio
// between domain fronted and proxied traffic to determine if proxies are blocked. The client randomizes
// the intervals between calls to the server and also randomizes the length of requests.
import (
"bytes"
"context"
"io"
"net"
"net/http"
"sync"

mrand "math/rand"

"go.uber.org/atomic"
"google.golang.org/protobuf/proto"

commonconfig "github.com/getlantern/common/config"
"github.com/getlantern/golog"

"github.com/getlantern/flashlight/v7/apipb"
"github.com/getlantern/flashlight/v7/bandit"
"github.com/getlantern/flashlight/v7/chained"
"github.com/getlantern/flashlight/v7/common"
"github.com/getlantern/flashlight/v7/config"
"github.com/getlantern/flashlight/v7/ops"
"github.com/getlantern/flashlight/v7/proxied"
)

var (

// some pluggable transports don't work with bypass
unsupportedTransports = map[string]bool{
"broflake": true,
}
)

// The way lantern-cloud is configured, we need separate URLs for domain fronted vs proxied traffic.
const (
dfEndpoint = "https://iantem.io/api/v1/bypass"
proxyEndpoint = "https://api.iantem.io/v1/bypass"

// bypassMinWaitSeconds and maxRequestWaitSec are the minimum and maximum number of seconds to wait
// between sending bypass requests. The wait time can be extended by the server.
bypassMinWaitSeconds = 80
bypassMaxWaitSeconds = 200

// version is the bypass client version. It is not necessary to update this value on every
// change to bypass; this should only be updated when the backend needs to make decisions unique
// to a new version of bypass.
version int32 = 1
)

type bypassService struct {
infos map[string]*commonconfig.ProxyConfig
proxies []*proxy
mxProxies sync.Mutex
done chan struct{}
logger golog.Logger
}

// StartBypassService sends periodic traffic to the bypass server. The client periodically sends
// traffic to the server both via domain fronting and proxying to determine if proxies are blocked.
func StartBypassService(
listen func(func(map[string]*commonconfig.ProxyConfig, config.Source)),
configDir string,
userConfig common.UserConfig,
) *bypassService {
b := &bypassService{
infos: make(map[string]*commonconfig.ProxyConfig),
proxies: make([]*proxy, 0),
done: make(chan struct{}),
logger: golog.LoggerFor("bypassService"),
}
listen(func(infos map[string]*commonconfig.ProxyConfig, src config.Source) {
b.onProxies(infos, configDir, userConfig)
})
return b
}

func (b *bypassService) onProxies(
infos map[string]*commonconfig.ProxyConfig,
configDir string,
userConfig common.UserConfig,
) {
b.mxProxies.Lock()
defer b.mxProxies.Unlock()
b.Reset()

// Some pluggable transports don't support bypass, filter these out here.
supportedInfos := make(map[string]*commonconfig.ProxyConfig, len(infos))

for k, v := range infos {
if !unsupportedTransports[v.PluggableTransport] {
supportedInfos[k] = v
}
}

dialers := chained.CreateDialersMap(configDir, supportedInfos, userConfig)
for k, v := range supportedInfos {
dialer := dialers[k]
if dialer == nil {
b.logger.Errorf("No dialer for %v", k)
continue
}

pc := chained.CopyConfig(v)
// Set the name in the info since we know it here.
pc.Name = k
// Kill the cert to avoid it taking up unnecessary space.
pc.Cert = ""
p := newProxy(k, pc, configDir, userConfig, dialer, b.logger)
b.proxies = append(b.proxies, p)
go p.start(b.done)
}
}

func (b *bypassService) Reset() {
b.mxProxies.Lock()
close(b.done)
b.proxies = make([]*proxy, 0)
b.done = make(chan struct{})
b.mxProxies.Unlock()
}

type proxy struct {
*commonconfig.ProxyConfig
name string
dfRoundTripper http.RoundTripper
proxyRoundTripper http.RoundTripper
toggle *atomic.Bool
userConfig common.UserConfig
logger golog.Logger
}

func newProxy(
name string,
pc *commonconfig.ProxyConfig,
configDir string,
userConfig common.UserConfig,
dialer bandit.Dialer,
logger golog.Logger,
) *proxy {
return &proxy{
ProxyConfig: pc,
name: name,
toggle: atomic.NewBool(mrand.Float32() < 0.5),
dfRoundTripper: proxied.Fronted(0),
userConfig: userConfig,
proxyRoundTripper: newProxyRoundTripper(name, pc, userConfig, dialer, logger),
}
}

func (p *proxy) start(done <-chan struct{}) {
p.logger.Debugf("Starting bypass for proxy %v", p.name)
fn := func() int64 {
wait, _ := p.sendToBypass()
return wait
}
callRandomly(fn, bypassMinWaitSeconds, bypassMaxWaitSeconds, done, p.logger)
}

func (p *proxy) sendToBypass() (int64, error) {
op := ops.Begin("bypass_dial")
defer op.End()

// We alternate between domain fronting and proxying to ensure that, in aggregate, we
// send both equally. We avoid sending both a domain fronted and a proxied request
// in rapid succession to avoid the blocking detection itself being a signal.
var (
rt http.RoundTripper
endpoint string
fronted = p.toggle.Toggle()
)
if fronted {
p.logger.Debug("Using domain fronting")
rt = p.dfRoundTripper
endpoint = dfEndpoint
} else {
p.logger.Debug("Using proxy directly")
rt = p.proxyRoundTripper
endpoint = proxyEndpoint
}

op.Set("fronted", fronted)

// Just posting all the info about the server allows us to control these fields fully on the server
// side.
bypassRequest := &apipb.BypassRequest{
Config: &apipb.LegacyConnectConfig{
Name: p.ProxyConfig.Name,
Addr: p.ProxyConfig.Addr,
Cert: p.ProxyConfig.Cert,
PluggableTransport: p.ProxyConfig.PluggableTransport,
PluggableTransportSettings: p.ProxyConfig.PluggableTransportSettings,
Location: &apipb.LegacyConnectConfig_ProxyLocation{
City: p.ProxyConfig.Location.GetCity(),
Country: p.ProxyConfig.Location.GetCountry(),
CountryCode: p.ProxyConfig.Location.GetCountryCode(),
Latitude: p.ProxyConfig.Location.GetLatitude(),
Longitude: p.ProxyConfig.Location.GetLongitude(),
},
Track: p.ProxyConfig.Track,
Region: p.ProxyConfig.Region,
},
Version: version,
}

bypassBuf, err := proto.Marshal(bypassRequest)
if err != nil {
p.logger.Errorf("Unable to marshal chained server info: %v", err)
op.FailIf(err)
return 0, err
}

p.logger.Debugf("Sending traffic for bypass server: %v", p.name)
resp, sleep, err := post(
endpoint,
bytes.NewReader(bypassBuf),
rt,
p.userConfig,
p.logger,
)
if err != nil || resp == nil {
err = p.logger.Errorf("Unable to post chained server info: %v", err)
op.FailIf(err)
return 0, err
}

io.Copy(io.Discard, resp)
return sleep, nil
}

func newProxyRoundTripper(
name string,
info *commonconfig.ProxyConfig,
userConfig common.UserConfig,
dialer bandit.Dialer,
logger golog.Logger,
) http.RoundTripper {
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.Proxy = nil
transport.DialContext = func(ctx context.Context, network, addr string) (net.Conn, error) {
logger.Debugf("Dialing chained server at: %s", addr)
pc, _, err := dialer.DialContext(ctx, bandit.NetworkConnect, addr)
if err != nil {
logger.Errorf("Unable to dial chained server: %v", err)
} else {
logger.Debug("Successfully dialed chained server")
}

return pc, err
}

return transport
}
Loading

0 comments on commit 159e65d

Please sign in to comment.