From dd5f439b9043c8e8a0c1ac29dcd7aa829eab5f12 Mon Sep 17 00:00:00 2001 From: sbruens Date: Thu, 28 Nov 2024 00:56:32 -0500 Subject: [PATCH 1/8] feat: add WebSocket support to the existing `outline-ss-server` --- cmd/outline-ss-server/config.go | 80 +++++++++++++++++++----- cmd/outline-ss-server/config_example.yml | 9 +++ cmd/outline-ss-server/config_test.go | 26 ++++++++ cmd/outline-ss-server/main.go | 74 ++++++++++++++++++++++ 4 files changed, 174 insertions(+), 15 deletions(-) diff --git a/cmd/outline-ss-server/config.go b/cmd/outline-ss-server/config.go index d85fd72a..66825410 100644 --- a/cmd/outline-ss-server/config.go +++ b/cmd/outline-ss-server/config.go @@ -28,12 +28,30 @@ type ServiceConfig struct { type ListenerType string -const listenerTypeTCP ListenerType = "tcp" -const listenerTypeUDP ListenerType = "udp" +const ( + listenerTypeTCP ListenerType = "tcp" + listenerTypeUDP ListenerType = "udp" + listenerTypeWebSocket ListenerType = "websocket" +) + +type ConnectionType string + +const ( + connectionTypeStream ConnectionType = "stream" + connectionTypePacket ConnectionType = "packet" +) + +type ConfigOption struct { + Path string `yaml:"path"` + ConnectionType ConnectionType `yaml:"connection_type"` +} type ListenerConfig struct { Type ListenerType Address string + + // WebSocket config options + Options []ConfigOption `yaml:"options,omitempty"` } type KeyConfig struct { @@ -60,27 +78,59 @@ func (c *Config) Validate() error { existingListeners := make(map[string]bool) for _, serviceConfig := range c.Services { for _, lnConfig := range serviceConfig.Listeners { - // TODO: Support more listener types. - if lnConfig.Type != listenerTypeTCP && lnConfig.Type != listenerTypeUDP { + var key string + switch lnConfig.Type { + case listenerTypeTCP, listenerTypeUDP: + if err := validateAddress(lnConfig.Address); err != nil { + return err + } + key = fmt.Sprintf("%s/%s", lnConfig.Type, lnConfig.Address) + if _, exists := existingListeners[key]; exists { + return fmt.Errorf("listener of type `%s` with address `%s` already exists.", lnConfig.Type, lnConfig.Address) + } + + case listenerTypeWebSocket: + if err := validateAddress(lnConfig.Address); err != nil { + return err + } + if len(lnConfig.Options) == 0 { + return fmt.Errorf("listener type `%s` requires at least one option", lnConfig.Type) + } + for _, option := range lnConfig.Options { + if option.Path == "" { + return fmt.Errorf("listener type `%s` requires a `path` for each option", lnConfig.Type) + } + if option.ConnectionType != connectionTypeStream && option.ConnectionType != connectionTypePacket { + return fmt.Errorf("unsupported connection type: %s", option.ConnectionType) + } + key = fmt.Sprintf("%s/%s/%s", lnConfig.Type, lnConfig.Address, option.Path) + if _, exists := existingListeners[key]; exists { + return fmt.Errorf("listener of type `%s` with address `%s` and path `%s` already exists.", lnConfig.Type, lnConfig.Address, option.Path) + } + existingListeners[key] = true + } + + default: return fmt.Errorf("unsupported listener type: %s", lnConfig.Type) } - host, _, err := net.SplitHostPort(lnConfig.Address) - if err != nil { - return fmt.Errorf("invalid listener address `%s`: %v", lnConfig.Address, err) - } - if ip := net.ParseIP(host); ip == nil { - return fmt.Errorf("address must be IP, found: %s", host) - } - key := string(lnConfig.Type) + "/" + lnConfig.Address - if _, exists := existingListeners[key]; exists { - return fmt.Errorf("listener of type %s with address %s already exists.", lnConfig.Type, lnConfig.Address) - } + existingListeners[key] = true } } return nil } +func validateAddress(addr string) error { + host, _, err := net.SplitHostPort(addr) + if err != nil { + return fmt.Errorf("invalid listener address `%s`: %v", addr, err) + } + if ip := net.ParseIP(host); ip == nil { + return fmt.Errorf("address must be IP, found: %s", host) + } + return nil +} + // readConfig attempts to read a config from a filename and parses it as a [Config]. func readConfig(configData []byte) (*Config, error) { config := Config{} diff --git a/cmd/outline-ss-server/config_example.yml b/cmd/outline-ss-server/config_example.yml index 7131c4cb..2d4278b1 100644 --- a/cmd/outline-ss-server/config_example.yml +++ b/cmd/outline-ss-server/config_example.yml @@ -20,6 +20,15 @@ services: address: "[::]:9000" - type: udp address: "[::]:9000" + - type: websocket + address: "[::]:8000" + options: + # TCP over WebSocket + - path: "/tcp" + connection_type: "stream" + # UDP over WebSocket + - path: "/udp" + connection_type: "packet" keys: - id: user-0 cipher: chacha20-ietf-poly1305 diff --git a/cmd/outline-ss-server/config_test.go b/cmd/outline-ss-server/config_test.go index f183ff5a..ce40e249 100644 --- a/cmd/outline-ss-server/config_test.go +++ b/cmd/outline-ss-server/config_test.go @@ -79,6 +79,18 @@ func TestValidateConfigFails(t *testing.T) { }, }, }, + { + name: "WithWebSocketWithoutOptions", + cfg: &Config{ + Services: []ServiceConfig{ + ServiceConfig{ + Listeners: []ListenerConfig{ + ListenerConfig{Type: listenerTypeWebSocket, Address: "[::]:9000"}, + }, + }, + }, + }, + }, } for _, tc := range tests { @@ -99,6 +111,20 @@ func TestReadConfig(t *testing.T) { Listeners: []ListenerConfig{ ListenerConfig{Type: listenerTypeTCP, Address: "[::]:9000"}, ListenerConfig{Type: listenerTypeUDP, Address: "[::]:9000"}, + ListenerConfig{ + Type: listenerTypeWebSocket, + Address: "[::]:8000", + Options: []ConfigOption{ + { + Path: "/tcp", + ConnectionType: connectionTypeStream, + }, + { + Path: "/udp", + ConnectionType: connectionTypePacket, + }, + }, + }, }, Keys: []KeyConfig{ KeyConfig{"user-0", "chacha20-ietf-poly1305", "Secret0"}, diff --git a/cmd/outline-ss-server/main.go b/cmd/outline-ss-server/main.go index 35ab2a93..0656ca89 100644 --- a/cmd/outline-ss-server/main.go +++ b/cmd/outline-ss-server/main.go @@ -16,6 +16,7 @@ package main import ( "container/list" + "context" "flag" "fmt" "log/slog" @@ -27,6 +28,7 @@ import ( "syscall" "time" + "github.com/Jigsaw-Code/outline-sdk/transport" "github.com/Jigsaw-Code/outline-sdk/transport/shadowsocks" "github.com/Jigsaw-Code/outline-ss-server/ipinfo" outline_prometheus "github.com/Jigsaw-Code/outline-ss-server/prometheus" @@ -34,6 +36,7 @@ import ( "github.com/lmittmann/tint" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" + "golang.org/x/net/websocket" "golang.org/x/term" ) @@ -272,6 +275,52 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) { } slog.Info("UDP service started.", "address", pc.LocalAddr().String()) go service.PacketServe(pc, ssService.HandleAssociation, s.serverMetrics) + case listenerTypeWebSocket: + for _, option := range lnConfig.Options { + switch option.ConnectionType { + case connectionTypeStream: + http.HandleFunc(option.Path, func(w http.ResponseWriter, r *http.Request) { + fmt.Println("STREAM FOUND") + handler := func(wsConn *websocket.Conn) { + //defer wsConn.Close() + ctx, contextCancel := context.WithCancel(context.Background()) + defer contextCancel() + raddr, err := net.ResolveTCPAddr("tcp", r.RemoteAddr) + if err != nil { + slog.Error("failed to upgrade", "err", err) + w.WriteHeader(http.StatusBadGateway) + return + } + conn := &wsToStreamConn{&wrappedConn{Conn: wsConn, raddr: raddr}} + ssService.HandleStream(ctx, conn) + } + websocket.Handler(handler).ServeHTTP(w, r) + }) + case connectionTypePacket: + http.HandleFunc(option.Path, func(w http.ResponseWriter, r *http.Request) { + fmt.Println("PACKET FOUND") + handler := func(wsConn *websocket.Conn) { + raddr, err := net.ResolveUDPAddr("udp", r.RemoteAddr) + if err != nil { + slog.Error("failed to upgrade", "err", err) + w.WriteHeader(http.StatusBadGateway) + return + } + conn := &wrappedConn{Conn: wsConn, raddr: raddr} + ssService.HandleAssociation(conn) + } + websocket.Handler(handler).ServeHTTP(w, r) + }) + } + + slog.Info("WebSocket service started.", "address", lnConfig.Address, "path", option.Path) + } + go func() { + err := http.ListenAndServe(lnConfig.Address, nil) + if err != nil { + slog.Error("Failed to run HTTP server.", "err", err) + } + }() } } totalCipherCount += len(serviceConfig.Keys) @@ -339,6 +388,31 @@ func RunOutlineServer(filename string, natTimeout time.Duration, serverMetrics * return server, nil } +// wrappedConn overrides [websocket.Conn]'s remote address handling. +type wrappedConn struct { + *websocket.Conn + raddr net.Addr +} + +func (c wrappedConn) RemoteAddr() net.Addr { + return c.raddr +} + +// wsToStreamConn converts a [websocket.Conn] to a [transport.StreamConn]. +type wsToStreamConn struct { + net.Conn +} + +var _ transport.StreamConn = (*wsToStreamConn)(nil) + +func (c wsToStreamConn) CloseRead() error { + return c.Close() +} + +func (c wsToStreamConn) CloseWrite() error { + return nil +} + func main() { slog.SetDefault(slog.New(logHandler)) From e7695855d0304fea0b50e2e360160d1b72322552 Mon Sep 17 00:00:00 2001 From: sbruens Date: Tue, 3 Dec 2024 00:25:56 -0500 Subject: [PATCH 2/8] Fix config reloads. --- cmd/outline-ss-server/main.go | 38 ++++++++++++++++++++++++++--------- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/cmd/outline-ss-server/main.go b/cmd/outline-ss-server/main.go index 0656ca89..6c213377 100644 --- a/cmd/outline-ss-server/main.go +++ b/cmd/outline-ss-server/main.go @@ -24,6 +24,7 @@ import ( "net/http" "os" "os/signal" + "strings" "sync" "syscall" "time" @@ -59,6 +60,16 @@ func init() { ) } +type WebSocketStreamListener struct { + service.StreamListener +} + +var _ net.Listener = (*WebSocketStreamListener)(nil) + +func (t *WebSocketStreamListener) Accept() (net.Conn, error) { + return t.StreamListener.AcceptStream() +} + type OutlineServer struct { stopConfig func() error lnManager service.ListenerManager @@ -276,13 +287,16 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) { slog.Info("UDP service started.", "address", pc.LocalAddr().String()) go service.PacketServe(pc, ssService.HandleAssociation, s.serverMetrics) case listenerTypeWebSocket: + ln, err := lnSet.ListenStream(lnConfig.Address) + if err != nil { + return err + } + mux := http.NewServeMux() for _, option := range lnConfig.Options { switch option.ConnectionType { case connectionTypeStream: - http.HandleFunc(option.Path, func(w http.ResponseWriter, r *http.Request) { - fmt.Println("STREAM FOUND") + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { handler := func(wsConn *websocket.Conn) { - //defer wsConn.Close() ctx, contextCancel := context.WithCancel(context.Background()) defer contextCancel() raddr, err := net.ResolveTCPAddr("tcp", r.RemoteAddr) @@ -296,9 +310,9 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) { } websocket.Handler(handler).ServeHTTP(w, r) }) + mux.Handle(option.Path, http.StripPrefix(option.Path, handler)) case connectionTypePacket: - http.HandleFunc(option.Path, func(w http.ResponseWriter, r *http.Request) { - fmt.Println("PACKET FOUND") + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { handler := func(wsConn *websocket.Conn) { raddr, err := net.ResolveUDPAddr("udp", r.RemoteAddr) if err != nil { @@ -311,13 +325,15 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) { } websocket.Handler(handler).ServeHTTP(w, r) }) + mux.Handle(option.Path, http.StripPrefix(option.Path, handler)) } - - slog.Info("WebSocket service started.", "address", lnConfig.Address, "path", option.Path) + slog.Info("WebSocket service started.", "address", ln.Addr().String(), "path", option.Path) } + server := http.Server{Handler: mux} go func() { - err := http.ListenAndServe(lnConfig.Address, nil) - if err != nil { + defer server.Shutdown(context.Background()) + err := server.Serve(&WebSocketStreamListener{ln}) + if err != nil && err != http.ErrServerClosed && !isErrClosing(err) { slog.Error("Failed to run HTTP server.", "err", err) } }() @@ -362,6 +378,10 @@ func (s *OutlineServer) Stop() error { return nil } +func isErrClosing(err error) bool { + return strings.Contains(err.Error(), "use of closed network connection") +} + // RunOutlineServer starts an Outline server running, and returns the server or an error. func RunOutlineServer(filename string, natTimeout time.Duration, serverMetrics *serverMetrics, serviceMetrics service.ServiceMetrics, replayHistory int) (*OutlineServer, error) { server := &OutlineServer{ From 6eb828ef5b3f50903e5c08262ce92442e7c13933 Mon Sep 17 00:00:00 2001 From: sbruens Date: Wed, 4 Dec 2024 13:27:35 -0500 Subject: [PATCH 3/8] Fix StreamConn wrapper. --- cmd/outline-ss-server/main.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/cmd/outline-ss-server/main.go b/cmd/outline-ss-server/main.go index 6c213377..9a5faab1 100644 --- a/cmd/outline-ss-server/main.go +++ b/cmd/outline-ss-server/main.go @@ -305,7 +305,7 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) { w.WriteHeader(http.StatusBadGateway) return } - conn := &wsToStreamConn{&wrappedConn{Conn: wsConn, raddr: raddr}} + conn := &streamConn{&wrappedConn{Conn: wsConn, raddr: raddr}} ssService.HandleStream(ctx, conn) } websocket.Handler(handler).ServeHTTP(w, r) @@ -418,19 +418,18 @@ func (c wrappedConn) RemoteAddr() net.Addr { return c.raddr } -// wsToStreamConn converts a [websocket.Conn] to a [transport.StreamConn]. -type wsToStreamConn struct { +type streamConn struct { net.Conn } -var _ transport.StreamConn = (*wsToStreamConn)(nil) +var _ transport.StreamConn = (*streamConn)(nil) -func (c wsToStreamConn) CloseRead() error { +func (c *streamConn) CloseRead() error { return c.Close() } -func (c wsToStreamConn) CloseWrite() error { - return nil +func (c *streamConn) CloseWrite() error { + return c.Close() } func main() { From 831371a97b678e9c96169decca7e3d4f76945bbc Mon Sep 17 00:00:00 2001 From: sbruens Date: Tue, 10 Dec 2024 12:34:04 -0500 Subject: [PATCH 4/8] Move web server config into its own top-level config structure. --- cmd/outline-ss-server/config.go | 92 +++++++++-------- cmd/outline-ss-server/config_example.yml | 21 ++-- cmd/outline-ss-server/main.go | 125 +++++++++++++---------- 3 files changed, 132 insertions(+), 106 deletions(-) diff --git a/cmd/outline-ss-server/config.go b/cmd/outline-ss-server/config.go index 66825410..6c22efd7 100644 --- a/cmd/outline-ss-server/config.go +++ b/cmd/outline-ss-server/config.go @@ -22,59 +22,72 @@ import ( ) type ServiceConfig struct { - Listeners []ListenerConfig - Keys []KeyConfig + Listeners []ListenerConfig `yaml:"listeners"` + Keys []KeyConfig `yaml:"keys"` } type ListenerType string const ( - listenerTypeTCP ListenerType = "tcp" - listenerTypeUDP ListenerType = "udp" - listenerTypeWebSocket ListenerType = "websocket" + listenerTypeTCP ListenerType = "tcp" + listenerTypeUDP ListenerType = "udp" + listenerTypeWebsocketStream ListenerType = "websocket-stream" + listenerTypeWebsocketPacket ListenerType = "websocket-packet" ) -type ConnectionType string - -const ( - connectionTypeStream ConnectionType = "stream" - connectionTypePacket ConnectionType = "packet" -) - -type ConfigOption struct { - Path string `yaml:"path"` - ConnectionType ConnectionType `yaml:"connection_type"` +type WebServerConfig struct { + ID string `yaml:"id"` + Listeners []string `yaml:"listen"` } type ListenerConfig struct { - Type ListenerType - Address string - - // WebSocket config options - Options []ConfigOption `yaml:"options,omitempty"` + Type ListenerType `yaml:"type"` + Address string `yaml:"address,omitempty"` + WebServer string `yaml:"web_server,omitempty"` + Path string `yaml:"path,omitempty"` } type KeyConfig struct { - ID string - Cipher string - Secret string + ID string `yaml:"id"` + Cipher string `yaml:"cipher"` + Secret string `yaml:"secret"` } type LegacyKeyServiceConfig struct { KeyConfig `yaml:",inline"` - Port int + Port int `yaml:"port"` } type Config struct { - Services []ServiceConfig + Web struct { + Servers []WebServerConfig `yaml:"servers"` + } `yaml:"web"` + Services []ServiceConfig `yaml:"services"` // Deprecated: `keys` exists for backward compatibility. Prefer to configure // using the newer `services` format. - Keys []LegacyKeyServiceConfig + Keys []LegacyKeyServiceConfig `yaml:"keys"` } // Validate checks that the config is valid. func (c *Config) Validate() error { + existingWebServers := make(map[string]bool) + for _, srv := range c.Web.Servers { + if srv.ID == "" { + return fmt.Errorf("web server must have an ID") + } + if _, exists := existingWebServers[srv.ID]; exists { + return fmt.Errorf("web server with ID `%s` already exists", srv.ID) + } + existingWebServers[srv.ID] = true + + for _, addr := range srv.Listeners { + if err := validateAddress(addr); err != nil { + return fmt.Errorf("invalid listener for web server `%s`: %w", srv.ID, err) + } + } + } + existingListeners := make(map[string]bool) for _, serviceConfig := range c.Services { for _, lnConfig := range serviceConfig.Listeners { @@ -88,28 +101,17 @@ func (c *Config) Validate() error { if _, exists := existingListeners[key]; exists { return fmt.Errorf("listener of type `%s` with address `%s` already exists.", lnConfig.Type, lnConfig.Address) } - - case listenerTypeWebSocket: - if err := validateAddress(lnConfig.Address); err != nil { - return err + case listenerTypeWebsocketStream, listenerTypeWebsocketPacket: + if lnConfig.WebServer == "" { + return fmt.Errorf("listener type `%s` requires an http server reference", lnConfig.Type) } - if len(lnConfig.Options) == 0 { - return fmt.Errorf("listener type `%s` requires at least one option", lnConfig.Type) + if _, exists := existingWebServers[lnConfig.WebServer]; !exists { + return fmt.Errorf("listener type `%s` references unknown web server `%s`", lnConfig.Type, lnConfig.WebServer) } - for _, option := range lnConfig.Options { - if option.Path == "" { - return fmt.Errorf("listener type `%s` requires a `path` for each option", lnConfig.Type) - } - if option.ConnectionType != connectionTypeStream && option.ConnectionType != connectionTypePacket { - return fmt.Errorf("unsupported connection type: %s", option.ConnectionType) - } - key = fmt.Sprintf("%s/%s/%s", lnConfig.Type, lnConfig.Address, option.Path) - if _, exists := existingListeners[key]; exists { - return fmt.Errorf("listener of type `%s` with address `%s` and path `%s` already exists.", lnConfig.Type, lnConfig.Address, option.Path) - } - existingListeners[key] = true + key = fmt.Sprintf("%s/%s", lnConfig.Type, lnConfig.WebServer) + if _, exists := existingListeners[key]; exists { + return fmt.Errorf("listener of type `%s` with http server `%s` already exists.", lnConfig.Type, lnConfig.WebServer) } - default: return fmt.Errorf("unsupported listener type: %s", lnConfig.Type) } diff --git a/cmd/outline-ss-server/config_example.yml b/cmd/outline-ss-server/config_example.yml index 2d4278b1..54a3365d 100644 --- a/cmd/outline-ss-server/config_example.yml +++ b/cmd/outline-ss-server/config_example.yml @@ -12,6 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. +web: + servers: + - id: my_web_server + listen: + - "[::]:8000" + services: - listeners: # TODO(sbruens): Allow a string-based listener config, as a convenient short-form @@ -20,15 +26,12 @@ services: address: "[::]:9000" - type: udp address: "[::]:9000" - - type: websocket - address: "[::]:8000" - options: - # TCP over WebSocket - - path: "/tcp" - connection_type: "stream" - # UDP over WebSocket - - path: "/udp" - connection_type: "packet" + - type: websocket-stream + web_server: my_web_server + path: "/tcp" + - type: websocket-packet + web_server: my_web_server + path: "/udp" keys: - id: user-0 cipher: chacha20-ietf-poly1305 diff --git a/cmd/outline-ss-server/main.go b/cmd/outline-ss-server/main.go index 9a5faab1..db602cc7 100644 --- a/cmd/outline-ss-server/main.go +++ b/cmd/outline-ss-server/main.go @@ -17,6 +17,7 @@ package main import ( "container/list" "context" + "errors" "flag" "fmt" "log/slog" @@ -60,13 +61,13 @@ func init() { ) } -type WebSocketStreamListener struct { +type HTTPStreamListener struct { service.StreamListener } -var _ net.Listener = (*WebSocketStreamListener)(nil) +var _ net.Listener = (*HTTPStreamListener)(nil) -func (t *WebSocketStreamListener) Accept() (net.Conn, error) { +func (t *HTTPStreamListener) Accept() (net.Conn, error) { return t.StreamListener.AcceptStream() } @@ -194,6 +195,11 @@ func (ls *listenerSet) Len() int { return len(ls.listenerCloseFuncs) } +type connWithDone struct { + net.Conn + doneCh chan struct{} +} + func (s *OutlineServer) runConfig(config Config) (func() error, error) { startErrCh := make(chan error) stopErrCh := make(chan error) @@ -201,7 +207,7 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) { go func() { lnSet := &listenerSet{ - manager: s.lnManager, + manager: s.lnManager, listenerCloseFuncs: make(map[string]func() error), } defer func() { @@ -209,6 +215,29 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) { }() startErrCh <- func() error { + // Start configured web servers. + webServers := make(map[string]*http.ServeMux) + for _, srvConfig := range config.Web.Servers { + mux := http.NewServeMux() + for _, addr := range srvConfig.Listeners { + server := &http.Server{Addr: addr, Handler: mux} + ln, err := lnSet.ListenStream(addr) + if err != nil { + return fmt.Errorf("failed to listen on %s: %w", addr, err) + } + go func() { + defer server.Shutdown(context.Background()) + err := server.Serve(&HTTPStreamListener{ln}) + if err != nil && err != http.ErrServerClosed && !isErrClosing(err) { + slog.Error("Failed to run web server.", "err", err, "ID", srvConfig.ID) + } + }() + slog.Info("Web server started.", "ID", srvConfig.ID, "address", addr) + } + webServers[srvConfig.ID] = mux + } + + // Start legacy services. totalCipherCount := len(config.Keys) portCiphers := make(map[int]*list.List) // Values are *List of *CipherEntry. for _, keyConfig := range config.Keys { @@ -254,7 +283,8 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) { slog.Info("UDP service started.", "address", pc.LocalAddr().String()) go service.PacketServe(pc, ssService.HandleAssociation, s.serverMetrics) } - + + // Start services with listeners. for _, serviceConfig := range config.Services { ciphers, err := newCipherListFromConfig(serviceConfig) if err != nil { @@ -286,57 +316,48 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) { } slog.Info("UDP service started.", "address", pc.LocalAddr().String()) go service.PacketServe(pc, ssService.HandleAssociation, s.serverMetrics) - case listenerTypeWebSocket: - ln, err := lnSet.ListenStream(lnConfig.Address) - if err != nil { - return err + case listenerTypeWebsocketStream: + if _, exists := webServers[lnConfig.WebServer]; !exists { + return fmt.Errorf("listener type `%s` references unknown web server `%s`", lnConfig.Type, lnConfig.WebServer) } - mux := http.NewServeMux() - for _, option := range lnConfig.Options { - switch option.ConnectionType { - case connectionTypeStream: - handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - handler := func(wsConn *websocket.Conn) { - ctx, contextCancel := context.WithCancel(context.Background()) - defer contextCancel() - raddr, err := net.ResolveTCPAddr("tcp", r.RemoteAddr) - if err != nil { - slog.Error("failed to upgrade", "err", err) - w.WriteHeader(http.StatusBadGateway) - return - } - conn := &streamConn{&wrappedConn{Conn: wsConn, raddr: raddr}} - ssService.HandleStream(ctx, conn) - } - websocket.Handler(handler).ServeHTTP(w, r) - }) - mux.Handle(option.Path, http.StripPrefix(option.Path, handler)) - case connectionTypePacket: - handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - handler := func(wsConn *websocket.Conn) { - raddr, err := net.ResolveUDPAddr("udp", r.RemoteAddr) - if err != nil { - slog.Error("failed to upgrade", "err", err) - w.WriteHeader(http.StatusBadGateway) - return - } - conn := &wrappedConn{Conn: wsConn, raddr: raddr} - ssService.HandleAssociation(conn) - } - websocket.Handler(handler).ServeHTTP(w, r) - }) - mux.Handle(option.Path, http.StripPrefix(option.Path, handler)) + mux := webServers[lnConfig.WebServer] + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + handler := func(wsConn *websocket.Conn) { + ctx, contextCancel := context.WithCancel(context.Background()) + defer contextCancel() + raddr, err := net.ResolveTCPAddr("tcp", r.RemoteAddr) + if err != nil { + slog.Error("failed to upgrade", "err", err) + w.WriteHeader(http.StatusBadGateway) + return + } + conn := &streamConn{&wrappedConn{Conn: wsConn, raddr: raddr}} + ssService.HandleStream(ctx, conn) } - slog.Info("WebSocket service started.", "address", ln.Addr().String(), "path", option.Path) + websocket.Handler(handler).ServeHTTP(w, r) + }) + mux.Handle(lnConfig.Path, http.StripPrefix(lnConfig.Path, handler)) + case listenerTypeWebsocketPacket: + if _, exists := webServers[lnConfig.WebServer]; !exists { + return fmt.Errorf("listener type `%s` references unknown web server `%s`", lnConfig.Type, lnConfig.WebServer) } - server := http.Server{Handler: mux} - go func() { - defer server.Shutdown(context.Background()) - err := server.Serve(&WebSocketStreamListener{ln}) - if err != nil && err != http.ErrServerClosed && !isErrClosing(err) { - slog.Error("Failed to run HTTP server.", "err", err) + mux := webServers[lnConfig.WebServer] + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + handler := func(wsConn *websocket.Conn) { + raddr, err := net.ResolveUDPAddr("udp", r.RemoteAddr) + if err != nil { + slog.Error("failed to upgrade", "err", err) + w.WriteHeader(http.StatusBadGateway) + return + } + conn := &wrappedConn{Conn: wsConn, raddr: raddr} + ssService.HandleAssociation(conn) } - }() + websocket.Handler(handler).ServeHTTP(w, r) + }) + mux.Handle(lnConfig.Path, http.StripPrefix(lnConfig.Path, handler)) + default: + return errors.New("unsupported listener configuration") } } totalCipherCount += len(serviceConfig.Keys) From db09289ba3bca6e40a424f0b1dfd0b2c001ebc91 Mon Sep 17 00:00:00 2001 From: sbruens Date: Tue, 10 Dec 2024 12:40:29 -0500 Subject: [PATCH 5/8] Add a TODO to create a new `ClientConn` struct. --- cmd/outline-ss-server/main.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/outline-ss-server/main.go b/cmd/outline-ss-server/main.go index db602cc7..197098e9 100644 --- a/cmd/outline-ss-server/main.go +++ b/cmd/outline-ss-server/main.go @@ -429,6 +429,7 @@ func RunOutlineServer(filename string, natTimeout time.Duration, serverMetrics * return server, nil } +// TODO: Create a dedicated `ClientConn` struct with `ClientAddr` and `Conn`. // wrappedConn overrides [websocket.Conn]'s remote address handling. type wrappedConn struct { *websocket.Conn From 21cae177d982623aa590de8d86b1dd5fabe0a1e3 Mon Sep 17 00:00:00 2001 From: sbruens Date: Tue, 10 Dec 2024 13:09:23 -0500 Subject: [PATCH 6/8] Update tests for config validation. --- cmd/outline-ss-server/config.go | 15 +- cmd/outline-ss-server/config_test.go | 419 +++++++++++++++++++-------- 2 files changed, 311 insertions(+), 123 deletions(-) diff --git a/cmd/outline-ss-server/config.go b/cmd/outline-ss-server/config.go index 6c22efd7..f677d198 100644 --- a/cmd/outline-ss-server/config.go +++ b/cmd/outline-ss-server/config.go @@ -58,10 +58,12 @@ type LegacyKeyServiceConfig struct { Port int `yaml:"port"` } +type WebConfig struct { + Servers []WebServerConfig `yaml:"servers"` +} + type Config struct { - Web struct { - Servers []WebServerConfig `yaml:"servers"` - } `yaml:"web"` + Web WebConfig `yaml:"web"` Services []ServiceConfig `yaml:"services"` // Deprecated: `keys` exists for backward compatibility. Prefer to configure @@ -103,14 +105,17 @@ func (c *Config) Validate() error { } case listenerTypeWebsocketStream, listenerTypeWebsocketPacket: if lnConfig.WebServer == "" { - return fmt.Errorf("listener type `%s` requires an http server reference", lnConfig.Type) + return fmt.Errorf("listener type `%s` requires a `web_server`", lnConfig.Type) + } + if lnConfig.Path == "" { + return fmt.Errorf("listener type `%s` requires a `path`", lnConfig.Type) } if _, exists := existingWebServers[lnConfig.WebServer]; !exists { return fmt.Errorf("listener type `%s` references unknown web server `%s`", lnConfig.Type, lnConfig.WebServer) } key = fmt.Sprintf("%s/%s", lnConfig.Type, lnConfig.WebServer) if _, exists := existingListeners[key]; exists { - return fmt.Errorf("listener of type `%s` with http server `%s` already exists.", lnConfig.Type, lnConfig.WebServer) + return fmt.Errorf("listener of type `%s` with web server `%s` already exists.", lnConfig.Type, lnConfig.WebServer) } default: return fmt.Errorf("unsupported listener type: %s", lnConfig.Type) diff --git a/cmd/outline-ss-server/config_test.go b/cmd/outline-ss-server/config_test.go index ce40e249..7c71cf08 100644 --- a/cmd/outline-ss-server/config_test.go +++ b/cmd/outline-ss-server/config_test.go @@ -16,178 +16,361 @@ package main import ( "os" + "strings" "testing" "github.com/stretchr/testify/require" ) -func TestValidateConfigFails(t *testing.T) { - tests := []struct { - name string - cfg *Config - }{ - { - name: "WithUnknownListenerType", - cfg: &Config{ - Services: []ServiceConfig{ - ServiceConfig{ - Listeners: []ListenerConfig{ - ListenerConfig{Type: "foo", Address: "[::]:9000"}, +func TestConfigValidate(t *testing.T) { + t.Run("InvalidConfig", func(t *testing.T) { + tests := []struct { + name string + cfg *Config + errStr string + }{ + { + name: "UnknownListenerType", + cfg: &Config{ + Services: []ServiceConfig{ + ServiceConfig{ + Listeners: []ListenerConfig{ + ListenerConfig{Type: "foo", Address: "[::]:9000"}, + }, }, }, }, + errStr: "unsupported listener type", }, - }, - { - name: "WithInvalidListenerAddress", - cfg: &Config{ - Services: []ServiceConfig{ - ServiceConfig{ - Listeners: []ListenerConfig{ - ListenerConfig{Type: listenerTypeTCP, Address: "tcp/[::]:9000"}, + { + name: "InvalidListenerAddress", + cfg: &Config{ + Services: []ServiceConfig{ + ServiceConfig{ + Listeners: []ListenerConfig{ + ListenerConfig{Type: listenerTypeTCP, Address: "tcp/[::]:9000"}, + }, }, }, }, + errStr: "invalid listener address", }, - }, - { - name: "WithHostnameAddress", - cfg: &Config{ - Services: []ServiceConfig{ - ServiceConfig{ - Listeners: []ListenerConfig{ - ListenerConfig{Type: listenerTypeTCP, Address: "example.com:9000"}, + { + name: "HostnameAddress", + cfg: &Config{ + Services: []ServiceConfig{ + ServiceConfig{ + Listeners: []ListenerConfig{ + ListenerConfig{Type: listenerTypeTCP, Address: "example.com:9000"}, + }, }, }, }, + errStr: "address must be IP", }, - }, - { - name: "WithDuplicateListeners", - cfg: &Config{ - Services: []ServiceConfig{ - ServiceConfig{ - Listeners: []ListenerConfig{ - ListenerConfig{Type: listenerTypeTCP, Address: "[::]:9000"}, + { + name: "DuplicateListeners", + cfg: &Config{ + Services: []ServiceConfig{ + ServiceConfig{ + Listeners: []ListenerConfig{ + ListenerConfig{Type: listenerTypeTCP, Address: "[::]:9000"}, + }, + }, + ServiceConfig{ + Listeners: []ListenerConfig{ + ListenerConfig{Type: listenerTypeTCP, Address: "[::]:9000"}, + }, }, }, - ServiceConfig{ - Listeners: []ListenerConfig{ - ListenerConfig{Type: listenerTypeTCP, Address: "[::]:9000"}, + }, + errStr: "already exists", + }, + { + name: "WebServerMissingID", + cfg: &Config{ + Web: WebConfig{ + Servers: []WebServerConfig{ + { + Listeners: []string{"[::]:8000"}, + }, }, }, + Services: []ServiceConfig{}, }, + errStr: "web server must have an ID", }, - }, - { - name: "WithWebSocketWithoutOptions", - cfg: &Config{ - Services: []ServiceConfig{ - ServiceConfig{ - Listeners: []ListenerConfig{ - ListenerConfig{Type: listenerTypeWebSocket, Address: "[::]:9000"}, + { + name: "WebServerDuplicateID", + cfg: &Config{ + Web: WebConfig{ + Servers: []WebServerConfig{ + { + ID: "foo", + Listeners: []string{"[::]:8000"}, + }, + { + ID: "foo", + Listeners: []string{"[::]:8001"}, + }, }, }, + Services: []ServiceConfig{}, }, + errStr: "already exists", }, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - err := tc.cfg.Validate() - require.Error(t, err) - }) - } -} - -func TestReadConfig(t *testing.T) { - config, err := readConfigFile("./config_example.yml") - - require.NoError(t, err) - expected := Config{ - Services: []ServiceConfig{ - ServiceConfig{ - Listeners: []ListenerConfig{ - ListenerConfig{Type: listenerTypeTCP, Address: "[::]:9000"}, - ListenerConfig{Type: listenerTypeUDP, Address: "[::]:9000"}, - ListenerConfig{ - Type: listenerTypeWebSocket, - Address: "[::]:8000", - Options: []ConfigOption{ + { + name: "WebServerInvalidAddress", + cfg: &Config{ + Web: WebConfig{ + Servers: []WebServerConfig{ { - Path: "/tcp", - ConnectionType: connectionTypeStream, + ID: "foo", + Listeners: []string{":invalid"}, }, + }, + }, + Services: []ServiceConfig{}, + }, + errStr: "invalid listener for web server `foo`", + }, + { + name: "WebsocketListenerMissingWebServer", + cfg: &Config{ + Web: WebConfig{ + Servers: []WebServerConfig{ + { + ID: "foo", + Listeners: []string{"[::]:8000"}, + }, + }, + }, + Services: []ServiceConfig{ + { + Listeners: []ListenerConfig{ + { + Type: listenerTypeWebsocketStream, + Path: "/tcp", + }, + }, + }, + }, + }, + errStr: "requires a `web_server`", + }, + { + name: "WebsocketListenerUnknownWebServer", + cfg: &Config{ + Web: WebConfig{ + Servers: []WebServerConfig{ + { + ID: "foo", + Listeners: []string{"[::]:8000"}, + }, + }, + }, + Services: []ServiceConfig{ + { + Listeners: []ListenerConfig{ + { + Type: listenerTypeWebsocketStream, + WebServer: "unknown_server", + Path: "/tcp", + }, + }, + }, + }, + }, + errStr: "unknown web server `unknown_server`", + }, + { + name: "WebsocketListenerMissingPath", + cfg: &Config{ + Web: WebConfig{ + Servers: []WebServerConfig{ { - Path: "/udp", - ConnectionType: connectionTypePacket, + ID: "foo", + Listeners: []string{"[::]:8000"}, + }, + }, + }, + Services: []ServiceConfig{ + { + Listeners: []ListenerConfig{ + { + Type: listenerTypeWebsocketStream, + WebServer: "foo", + }, }, }, }, }, - Keys: []KeyConfig{ - KeyConfig{"user-0", "chacha20-ietf-poly1305", "Secret0"}, - KeyConfig{"user-1", "chacha20-ietf-poly1305", "Secret1"}, + errStr: "requires a `path`", + }, + { + name: "ListenerInvalidType", + cfg: &Config{ + Web: WebConfig{ + Servers: []WebServerConfig{ + { + ID: "foo", + Listeners: []string{"[::]:8000"}, + }, + }, + }, + Services: []ServiceConfig{ + { + Listeners: []ListenerConfig{ + { + Type: "invalid-type", + WebServer: "foo", + Path: "/tcp", + }, + }, + }, + }, }, + errStr: "unsupported listener type: invalid-type", }, - ServiceConfig{ - Listeners: []ListenerConfig{ - ListenerConfig{Type: listenerTypeTCP, Address: "[::]:9001"}, - ListenerConfig{Type: listenerTypeUDP, Address: "[::]:9001"}, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + err := tc.cfg.Validate() + require.Error(t, err) + if !isStrInError(err, tc.errStr) { + t.Errorf("Config.Validate() error=`%v`, expected=`%v`", err, tc.errStr) + } + }) + } + }) + + t.Run("ValidConfig", func(t *testing.T) { + config := Config{ + Web: WebConfig{ + Servers: []WebServerConfig{ + { + ID: "my_web_server", + Listeners: []string{"[::]:8000"}, + }, }, - Keys: []KeyConfig{ - KeyConfig{"user-2", "chacha20-ietf-poly1305", "Secret2"}, + }, + Services: []ServiceConfig{ + { + Listeners: []ListenerConfig{ + { + Type: listenerTypeWebsocketStream, + WebServer: "my_web_server", + Path: "/tcp", + }, + { + Type: listenerTypeWebsocketPacket, + WebServer: "my_web_server", + Path: "/udp", + }, + }, + Keys: []KeyConfig{ + { + ID: "user-0", + Cipher: "chacha20-ietf-poly1305", + Secret: "Secret0", + }, + }, }, }, - }, - } - require.Equal(t, expected, *config) + } + err := config.Validate() + require.NoError(t, err) + }) } -func TestReadConfigParsesDeprecatedFormat(t *testing.T) { - config, err := readConfigFile("./config_example.deprecated.yml") +func TestReadConfig(t *testing.T) { + + t.Run("ExampleFile", func(t *testing.T) { + config, err := readConfigFile("./config_example.yml") - require.NoError(t, err) - expected := Config{ - Keys: []LegacyKeyServiceConfig{ - LegacyKeyServiceConfig{ - KeyConfig: KeyConfig{ID: "user-0", Cipher: "chacha20-ietf-poly1305", Secret: "Secret0"}, - Port: 9000, + require.NoError(t, err) + expected := Config{ + Web: WebConfig{ + Servers: []WebServerConfig{ + WebServerConfig{ID: "my_web_server", Listeners: []string{"[::]:8000"}}, + }, }, - LegacyKeyServiceConfig{ - KeyConfig: KeyConfig{ID: "user-1", Cipher: "chacha20-ietf-poly1305", Secret: "Secret1"}, - Port: 9000, + Services: []ServiceConfig{ + ServiceConfig{ + Listeners: []ListenerConfig{ + ListenerConfig{Type: listenerTypeTCP, Address: "[::]:9000"}, + ListenerConfig{Type: listenerTypeUDP, Address: "[::]:9000"}, + ListenerConfig{Type: listenerTypeWebsocketStream, WebServer: "my_web_server", Path: "/tcp"}, + ListenerConfig{Type: listenerTypeWebsocketPacket, WebServer: "my_web_server", Path: "/udp"}, + }, + Keys: []KeyConfig{ + KeyConfig{"user-0", "chacha20-ietf-poly1305", "Secret0"}, + KeyConfig{"user-1", "chacha20-ietf-poly1305", "Secret1"}, + }, + }, + ServiceConfig{ + Listeners: []ListenerConfig{ + ListenerConfig{Type: listenerTypeTCP, Address: "[::]:9001"}, + ListenerConfig{Type: listenerTypeUDP, Address: "[::]:9001"}, + }, + Keys: []KeyConfig{ + KeyConfig{"user-2", "chacha20-ietf-poly1305", "Secret2"}, + }, + }, }, - LegacyKeyServiceConfig{ - KeyConfig: KeyConfig{ID: "user-2", Cipher: "chacha20-ietf-poly1305", Secret: "Secret2"}, - Port: 9001, + } + require.Equal(t, expected, *config) + }) + + t.Run("ParsesDeprecatedFormat", func(t *testing.T) { + config, err := readConfigFile("./config_example.deprecated.yml") + + require.NoError(t, err) + expected := Config{ + Keys: []LegacyKeyServiceConfig{ + LegacyKeyServiceConfig{ + KeyConfig: KeyConfig{ID: "user-0", Cipher: "chacha20-ietf-poly1305", Secret: "Secret0"}, + Port: 9000, + }, + LegacyKeyServiceConfig{ + KeyConfig: KeyConfig{ID: "user-1", Cipher: "chacha20-ietf-poly1305", Secret: "Secret1"}, + Port: 9000, + }, + LegacyKeyServiceConfig{ + KeyConfig: KeyConfig{ID: "user-2", Cipher: "chacha20-ietf-poly1305", Secret: "Secret2"}, + Port: 9001, + }, }, - }, - } - require.Equal(t, expected, *config) -} + } + require.Equal(t, expected, *config) + }) -func TestReadConfigFromEmptyFile(t *testing.T) { - file, _ := os.CreateTemp("", "empty.yaml") + t.Run("FromEmptyFile", func(t *testing.T) { + file, _ := os.CreateTemp("", "empty.yaml") - config, err := readConfigFile(file.Name()) + config, err := readConfigFile(file.Name()) - require.NoError(t, err) - require.ElementsMatch(t, Config{}, config) -} + require.NoError(t, err) + require.ElementsMatch(t, Config{}, config) + }) -func TestReadConfigFromIncorrectFormatFails(t *testing.T) { - file, _ := os.CreateTemp("", "empty.yaml") - file.WriteString("foo") + t.Run("FromIncorrectFormatFails", func(t *testing.T) { + file, _ := os.CreateTemp("", "empty.yaml") + file.WriteString("foo") - config, err := readConfigFile(file.Name()) + config, err := readConfigFile(file.Name()) - require.Error(t, err) - require.ElementsMatch(t, Config{}, config) + require.Error(t, err) + require.ElementsMatch(t, Config{}, config) + }) } func readConfigFile(filename string) (*Config, error) { configData, _ := os.ReadFile(filename) return readConfig(configData) } + +func isStrInError(err error, str string) bool { + return err != nil && strings.Contains(err.Error(), str) +} From e210fb09b61b97dd390e98fd06ca050080f1fb20 Mon Sep 17 00:00:00 2001 From: sbruens Date: Fri, 13 Dec 2024 13:58:19 -0500 Subject: [PATCH 7/8] Format. --- cmd/outline-ss-server/main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/outline-ss-server/main.go b/cmd/outline-ss-server/main.go index 197098e9..6e1c5eb8 100644 --- a/cmd/outline-ss-server/main.go +++ b/cmd/outline-ss-server/main.go @@ -207,7 +207,7 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) { go func() { lnSet := &listenerSet{ - manager: s.lnManager, + manager: s.lnManager, listenerCloseFuncs: make(map[string]func() error), } defer func() { @@ -283,7 +283,7 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) { slog.Info("UDP service started.", "address", pc.LocalAddr().String()) go service.PacketServe(pc, ssService.HandleAssociation, s.serverMetrics) } - + // Start services with listeners. for _, serviceConfig := range config.Services { ciphers, err := newCipherListFromConfig(serviceConfig) From 0c45bade27090685294c83b15806c4a6083a8e0b Mon Sep 17 00:00:00 2001 From: sbruens Date: Fri, 13 Dec 2024 14:05:08 -0500 Subject: [PATCH 8/8] Close the connection. --- cmd/outline-ss-server/main.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cmd/outline-ss-server/main.go b/cmd/outline-ss-server/main.go index 6e1c5eb8..80797389 100644 --- a/cmd/outline-ss-server/main.go +++ b/cmd/outline-ss-server/main.go @@ -323,6 +323,7 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) { mux := webServers[lnConfig.WebServer] handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { handler := func(wsConn *websocket.Conn) { + defer wsConn.Close() ctx, contextCancel := context.WithCancel(context.Background()) defer contextCancel() raddr, err := net.ResolveTCPAddr("tcp", r.RemoteAddr) @@ -344,6 +345,7 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) { mux := webServers[lnConfig.WebServer] handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { handler := func(wsConn *websocket.Conn) { + defer wsConn.Close() raddr, err := net.ResolveUDPAddr("udp", r.RemoteAddr) if err != nil { slog.Error("failed to upgrade", "err", err)