Skip to content

Commit

Permalink
Merge branch 'sbruens/udp-split-serving' into sbruens/caddy-ws
Browse files Browse the repository at this point in the history
  • Loading branch information
sbruens committed Nov 28, 2024
2 parents 269c367 + 34a01d7 commit ba26dd6
Show file tree
Hide file tree
Showing 9 changed files with 689 additions and 523 deletions.
12 changes: 1 addition & 11 deletions caddy/shadowsocks_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ import (

const ssModuleName = "layer4.handlers.shadowsocks"

// Max UDP buffer size for the server code.
const serverUDPBufferSize = 64 * 1024

func init() {
caddy.RegisterModule(ModuleRegistration{
ID: ssModuleName,
Expand All @@ -50,7 +47,6 @@ type ShadowsocksHandler struct {
Keys []KeyConfig `json:"keys,omitempty"`

service outline.Service
buffer []byte
logger *slog.Logger
}

Expand Down Expand Up @@ -112,7 +108,6 @@ func (h *ShadowsocksHandler) Provision(ctx caddy.Context) error {
return err
}
h.service = service
h.buffer = make([]byte, serverUDPBufferSize)
return nil
}

Expand All @@ -122,12 +117,7 @@ func (h *ShadowsocksHandler) Handle(cx *layer4.Connection, _ layer4.Handler) err
case transport.StreamConn:
h.service.HandleStream(cx.Context, conn)
case net.Conn:
n, err := cx.Read(h.buffer)
if err != nil {
return err
}
pkt := h.buffer[:n]
h.service.HandlePacket(cx, pkt)
h.service.HandleAssociation(cx)
default:
return fmt.Errorf("failed to handle unknown connection type: %T", conn)
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/outline-ss-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) {
return err
}
slog.Info("UDP service started.", "address", pc.LocalAddr().String())
go service.PacketServe(pc, ssService.HandlePacket)
go service.PacketServe(pc, ssService.HandleAssociation, s.serverMetrics)
}

for _, serviceConfig := range config.Services {
Expand Down Expand Up @@ -271,7 +271,7 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) {
return err
}
slog.Info("UDP service started.", "address", pc.LocalAddr().String())
go service.PacketServe(pc, ssService.HandlePacket)
go service.PacketServe(pc, ssService.HandleAssociation, s.serverMetrics)
}
}
totalCipherCount += len(serviceConfig.Keys)
Expand Down
32 changes: 29 additions & 3 deletions cmd/outline-ss-server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package main
import (
"time"

"github.com/Jigsaw-Code/outline-ss-server/service"
"github.com/prometheus/client_golang/prometheus"
)

Expand All @@ -25,12 +26,15 @@ var now = time.Now

type serverMetrics struct {
// NOTE: New metrics need to be added to `newPrometheusServerMetrics()`, `Describe()` and `Collect()`.
buildInfo *prometheus.GaugeVec
accessKeys prometheus.Gauge
ports prometheus.Gauge
buildInfo *prometheus.GaugeVec
accessKeys prometheus.Gauge
ports prometheus.Gauge
addedNatEntries prometheus.Counter
removedNatEntries prometheus.Counter
}

var _ prometheus.Collector = (*serverMetrics)(nil)
var _ service.NATMetrics = (*serverMetrics)(nil)

// newPrometheusServerMetrics constructs a Prometheus metrics collector for server
// related metrics.
Expand All @@ -48,19 +52,33 @@ func newPrometheusServerMetrics() *serverMetrics {
Name: "ports",
Help: "Count of open ports",
}),
addedNatEntries: prometheus.NewCounter(prometheus.CounterOpts{
Subsystem: "udp",
Name: "nat_entries_added",
Help: "Entries added to the UDP NAT table",
}),
removedNatEntries: prometheus.NewCounter(prometheus.CounterOpts{
Subsystem: "udp",
Name: "nat_entries_removed",
Help: "Entries removed from the UDP NAT table",
}),
}
}

func (m *serverMetrics) Describe(ch chan<- *prometheus.Desc) {
m.buildInfo.Describe(ch)
m.accessKeys.Describe(ch)
m.ports.Describe(ch)
m.addedNatEntries.Describe(ch)
m.removedNatEntries.Describe(ch)
}

func (m *serverMetrics) Collect(ch chan<- prometheus.Metric) {
m.buildInfo.Collect(ch)
m.accessKeys.Collect(ch)
m.ports.Collect(ch)
m.addedNatEntries.Collect(ch)
m.removedNatEntries.Collect(ch)
}

func (m *serverMetrics) SetVersion(version string) {
Expand All @@ -71,3 +89,11 @@ func (m *serverMetrics) SetNumAccessKeys(numKeys int, ports int) {
m.accessKeys.Set(float64(numKeys))
m.ports.Set(float64(ports))
}

func (m *serverMetrics) AddNATEntry() {
m.addedNatEntries.Inc()
}

func (m *serverMetrics) RemoveNATEntry() {
m.removedNatEntries.Inc()
}
89 changes: 43 additions & 46 deletions internal/integration_test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,53 +261,50 @@ func TestRestrictedAddresses(t *testing.T) {
assert.ElementsMatch(t, testMetrics.statuses, expectedStatus)
}

// Stub metrics implementation for testing NAT behaviors.

type natTestMetrics struct {
natEntriesAdded int
}

var _ service.NATMetrics = (*natTestMetrics)(nil)

func (m *natTestMetrics) AddNATEntry() {
m.natEntriesAdded++
}
func (m *natTestMetrics) RemoveNATEntry() {}

// Metrics about one UDP packet.
type udpRecord struct {
clientAddr net.Addr
accessKey, status string
in, out int64
}

type fakeUDPConnMetrics struct {
clientAddr net.Addr
accessKey string
up, down []udpRecord
mu sync.Mutex
type fakeUDPAssocationMetrics struct {
accessKey string
up, down []udpRecord
mu sync.Mutex
}

var _ service.UDPConnMetrics = (*fakeUDPConnMetrics)(nil)
var _ service.UDPAssocationMetrics = (*fakeUDPAssocationMetrics)(nil)

func (m *fakeUDPConnMetrics) AddPacketFromClient(status string, clientProxyBytes, proxyTargetBytes int64) {
m.mu.Lock()
defer m.mu.Unlock()
m.up = append(m.up, udpRecord{m.clientAddr, m.accessKey, status, clientProxyBytes, proxyTargetBytes})
func (m *fakeUDPAssocationMetrics) AddAuthenticated(key string) {
m.accessKey = key
}

func (m *fakeUDPConnMetrics) AddPacketFromTarget(status string, targetProxyBytes, proxyClientBytes int64) {
func (m *fakeUDPAssocationMetrics) AddPacketFromClient(status string, clientProxyBytes, proxyTargetBytes int64) {
m.mu.Lock()
defer m.mu.Unlock()
m.down = append(m.down, udpRecord{m.clientAddr, m.accessKey, status, targetProxyBytes, proxyClientBytes})
m.up = append(m.up, udpRecord{m.accessKey, status, clientProxyBytes, proxyTargetBytes})
}

func (m *fakeUDPConnMetrics) RemoveNatEntry() {
// Not tested because it requires waiting for a long timeout.
}

// Fake metrics implementation for UDP
type fakeUDPMetrics struct {
connMetrics []fakeUDPConnMetrics
func (m *fakeUDPAssocationMetrics) AddPacketFromTarget(status string, targetProxyBytes, proxyClientBytes int64) {
m.mu.Lock()
defer m.mu.Unlock()
m.down = append(m.down, udpRecord{m.accessKey, status, targetProxyBytes, proxyClientBytes})
}

var _ service.UDPMetrics = (*fakeUDPMetrics)(nil)

func (m *fakeUDPMetrics) AddUDPNatEntry(clientAddr net.Addr, accessKey string) service.UDPConnMetrics {
cm := fakeUDPConnMetrics{
clientAddr: clientAddr,
accessKey: accessKey,
}
m.connMetrics = append(m.connMetrics, cm)
return &m.connMetrics[len(m.connMetrics)-1]
}
func (m *fakeUDPAssocationMetrics) AddClosed() {}

func TestUDPEcho(t *testing.T) {
echoConn, echoRunning := startUDPEchoServer(t)
Expand All @@ -321,12 +318,14 @@ func TestUDPEcho(t *testing.T) {
if err != nil {
t.Fatal(err)
}
testMetrics := &fakeUDPMetrics{}
proxy := service.NewPacketHandler(time.Hour, cipherList, testMetrics, &fakeShadowsocksMetrics{})
proxy := service.NewAssociationHandler(time.Hour, cipherList, &fakeShadowsocksMetrics{})

proxy.SetTargetIPValidator(allowAll)
done := make(chan struct{})
natMetrics := &natTestMetrics{}
associationMetrics := &fakeUDPAssocationMetrics{}
go func() {
service.PacketServe(proxyConn, proxy.Handle)
service.PacketServe(proxyConn, func(conn net.Conn) { proxy.Handle(conn, associationMetrics) }, natMetrics)
done <- struct{}{}
}()

Expand Down Expand Up @@ -372,22 +371,20 @@ func TestUDPEcho(t *testing.T) {
snapshot := cipherList.SnapshotForClientIP(netip.Addr{})
keyID := snapshot[0].Value.(*service.CipherEntry).ID

require.Lenf(t, testMetrics.connMetrics, 1, "Wrong NAT count")
require.Equal(t, natMetrics.natEntriesAdded, 1, "Wrong NAT count")

testMetrics.connMetrics[0].mu.Lock()
defer testMetrics.connMetrics[0].mu.Unlock()
associationMetrics.mu.Lock()
defer associationMetrics.mu.Unlock()

require.Lenf(t, testMetrics.connMetrics[0].up, 1, "Wrong number of packets sent")
record := testMetrics.connMetrics[0].up[0]
require.Equal(t, conn.LocalAddr(), record.clientAddr, "Bad upstream metrics")
require.Lenf(t, associationMetrics.up, 1, "Wrong number of packets sent")
record := associationMetrics.up[0]
require.Equal(t, keyID, record.accessKey, "Bad upstream metrics")
require.Equal(t, "OK", record.status, "Bad upstream metrics")
require.Greater(t, record.in, record.out, "Bad upstream metrics")
require.Equal(t, int64(N), record.out, "Bad upstream metrics")

require.Lenf(t, testMetrics.connMetrics[0].down, 1, "Wrong number of packets received")
record = testMetrics.connMetrics[0].down[0]
require.Equal(t, conn.LocalAddr(), record.clientAddr, "Bad downstream metrics")
require.Lenf(t, associationMetrics.down, 1, "Wrong number of packets received")
record = associationMetrics.down[0]
require.Equal(t, keyID, record.accessKey, "Bad downstream metrics")
require.Equal(t, "OK", record.status, "Bad downstream metrics")
require.Greater(t, record.out, record.in, "Bad downstream metrics")
Expand Down Expand Up @@ -552,11 +549,11 @@ func BenchmarkUDPEcho(b *testing.B) {
if err != nil {
b.Fatal(err)
}
proxy := service.NewPacketHandler(time.Hour, cipherList, &service.NoOpUDPMetrics{}, &fakeShadowsocksMetrics{})
proxy := service.NewAssociationHandler(time.Hour, cipherList, &fakeShadowsocksMetrics{})
proxy.SetTargetIPValidator(allowAll)
done := make(chan struct{})
go func() {
service.PacketServe(server, proxy.Handle)
service.PacketServe(server, func(conn net.Conn) { proxy.Handle(conn, &service.NoOpUDPAssocationMetrics{}) }, &natTestMetrics{})
done <- struct{}{}
}()

Expand Down Expand Up @@ -596,11 +593,11 @@ func BenchmarkUDPManyKeys(b *testing.B) {
if err != nil {
b.Fatal(err)
}
proxy := service.NewPacketHandler(time.Hour, cipherList, &service.NoOpUDPMetrics{}, &fakeShadowsocksMetrics{})
proxy := service.NewAssociationHandler(time.Hour, cipherList, &fakeShadowsocksMetrics{})
proxy.SetTargetIPValidator(allowAll)
done := make(chan struct{})
go func() {
service.PacketServe(proxyConn, proxy.Handle)
service.PacketServe(proxyConn, func(conn net.Conn) { proxy.Handle(conn, &service.NoOpUDPAssocationMetrics{}) }, &natTestMetrics{})
done <- struct{}{}
}()

Expand Down
Loading

0 comments on commit ba26dd6

Please sign in to comment.