Skip to content

Commit

Permalink
Add config setting to control udp sockets count (#162)
Browse files Browse the repository at this point in the history
  • Loading branch information
streamer45 authored Nov 22, 2024
1 parent 2137f0c commit fc76bf6
Show file tree
Hide file tree
Showing 10 changed files with 73 additions and 29 deletions.
9 changes: 8 additions & 1 deletion config/config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,14 @@ turn.static_auth_secret = ""
# The expiration, in minutes, of the short-lived credentials generated for TURN servers.
turn.credentials_expiration_minutes = 1440

# udp_sockets_count controls the number of listening UDP sockets used for each local
# network address. A larger number can improve performance by reducing contention
# over a few file descriptors. At the same time, it will cause more file descriptors
# to be open. The default is a dynamic value that scales with the number of available CPUs with
# a constant multiplier of 100. E.g. On a 4 CPUs node, 400 sockets per local
# network address will be open.
# udp_sockets_count =

[store]
# A path to a directory the service will use to store persistent data such as registered client IDs and hashed credentials.
data_source = "/tmp/rtcd_db"
Expand All @@ -109,4 +117,3 @@ file_level = "DEBUG"
file_location = "rtcd.log"
# A boolean controlling whether to display colors when logging to the console.
enable_color = true

1 change: 1 addition & 0 deletions docs/env_config.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ RTCD_RTC_ICESERVERS Comma-separated list of
RTCD_RTC_TURNCONFIG_STATICAUTHSECRET String
RTCD_RTC_TURNCONFIG_CREDENTIALSEXPIRATIONMINUTES Integer
RTCD_RTC_ENABLEIPV6 True or False
RTCD_RTC_UDPSOCKETSCOUNT Integer
RTCD_STORE_DATASOURCE String
RTCD_LOGGER_ENABLECONSOLE True or False
RTCD_LOGGER_CONSOLEJSON True or False
Expand Down
1 change: 1 addition & 0 deletions service/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func (c *Config) SetDefaults() {
c.RTC.ICEPortUDP = 8443
c.RTC.ICEPortTCP = 8443
c.RTC.TURNConfig.CredentialsExpirationMinutes = 1440
c.RTC.UDPSocketsCount = rtc.GetDefaultUDPListeningSocketsCount()
c.Store.DataSource = "/tmp/rtcd_db"
c.Logger.EnableConsole = true
c.Logger.ConsoleJSON = false
Expand Down
5 changes: 3 additions & 2 deletions service/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,9 @@ func MakeDefaultCfg(tb testing.TB) *Config {
},
},
RTC: rtc.ServerConfig{
ICEPortUDP: 30444,
ICEPortTCP: 30444,
ICEPortUDP: 30444,
ICEPortTCP: 30444,
UDPSocketsCount: rtc.GetDefaultUDPListeningSocketsCount(),
},
Store: StoreConfig{
DataSource: dbDir,
Expand Down
11 changes: 11 additions & 0 deletions service/rtc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ type ServerConfig struct {
TURNConfig TURNConfig `toml:"turn"`
// EnableIPv6 specifies whether or not IPv6 should be used.
EnableIPv6 bool `toml:"enable_ipv6"`
// UDPSocketsCount controls the number of listening UDP sockets used for each local
// network address. A larger number can improve performance by reducing contention
// over a few file descriptors. At the same time, it will cause more file descriptors
// to be open. The default is a dynamic value that scales with the number of available CPUs with
// a constant multiplier of 100. E.g. On a 4 CPUs node, 400 sockets per local
// network address will be open.
UDPSocketsCount int `toml:"udp_sockets_count"`
}

func (c ServerConfig) IsValid() error {
Expand Down Expand Up @@ -62,6 +69,10 @@ func (c ServerConfig) IsValid() error {
return fmt.Errorf("invalid ICEHostPortOverride value: %w", err)
}

if c.UDPSocketsCount <= 0 {
return fmt.Errorf("invalid UDPSocketsCount value: should be greater than 0")
}

return nil
}

Expand Down
10 changes: 10 additions & 0 deletions service/rtc/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,22 @@ func TestServerConfigIsValid(t *testing.T) {
})
})

t.Run("invalid UDPSocketsCount", func(t *testing.T) {
var cfg ServerConfig
cfg.ICEPortUDP = 8443
cfg.ICEPortTCP = 8443
cfg.UDPSocketsCount = 0
err := cfg.IsValid()
require.EqualError(t, err, "invalid UDPSocketsCount value: should be greater than 0")
})

t.Run("valid", func(t *testing.T) {
var cfg ServerConfig
cfg.ICEAddressUDP = "127.0.0.1"
cfg.ICEPortUDP = 8443
cfg.ICEPortTCP = 8443
cfg.TURNConfig.CredentialsExpirationMinutes = 1440
cfg.UDPSocketsCount = 1
err := cfg.IsValid()
require.NoError(t, err)
})
Expand Down
6 changes: 3 additions & 3 deletions service/rtc/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const (
tcpSocketWriteBufferSize = 1024 * 1024 * 4 // 4MB
)

func getUDPListeningSocketsCount() int {
func GetDefaultUDPListeningSocketsCount() int {
// Originally we used runtime.NumCPU() but increased it as a result of v1 ceiling tests.
// The reason is that having just a few sockets caused significant lock contentions on
// the underlying file descriptors (at WriteToInet4 in internal/poll/fd_unix.go).
Expand Down Expand Up @@ -78,10 +78,10 @@ func getSystemIPs(log mlog.LoggerIFace, dualStack bool) ([]netip.Addr, error) {
return ips, nil
}

func createUDPConnsForAddr(log mlog.LoggerIFace, network, listenAddress string) ([]net.PacketConn, error) {
func createUDPConnsForAddr(log mlog.LoggerIFace, network, listenAddress string, socketsCount int) ([]net.PacketConn, error) {
var conns []net.PacketConn

for i := 0; i < getUDPListeningSocketsCount(); i++ {
for i := 0; i < socketsCount; i++ {
listenConfig := net.ListenConfig{
Control: func(_, _ string, c syscall.RawConn) error {
return c.Control(func(fd uintptr) {
Expand Down
8 changes: 4 additions & 4 deletions service/rtc/net_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ func TestCreateUDPConnsForAddr(t *testing.T) {
require.NotEmpty(t, ips)

for _, ip := range ips {
conns, err := createUDPConnsForAddr(log, "udp4", netip.AddrPortFrom(ip, 30443).String())
conns, err := createUDPConnsForAddr(log, "udp4", netip.AddrPortFrom(ip, 30443).String(), 45)
require.NoError(t, err)
require.Len(t, conns, getUDPListeningSocketsCount())
require.Len(t, conns, 45)
for _, conn := range conns {
require.NoError(t, conn.Close())
}
Expand All @@ -91,9 +91,9 @@ func TestCreateUDPConnsForAddr(t *testing.T) {
require.NotEmpty(t, ips)

for _, ip := range ips {
conns, err := createUDPConnsForAddr(log, "udp", netip.AddrPortFrom(ip, 30443).String())
conns, err := createUDPConnsForAddr(log, "udp", netip.AddrPortFrom(ip, 30443).String(), 45)
require.NoError(t, err)
require.Len(t, conns, getUDPListeningSocketsCount())
require.Len(t, conns, 45)
for _, conn := range conns {
require.NoError(t, conn.Close())
}
Expand Down
2 changes: 1 addition & 1 deletion service/rtc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ func (s *Server) initUDP(localIPs []netip.Addr, network string) error {
var udpMuxes []ice.UDPMux

initUDPMux := func(addr string) error {
conns, err := createUDPConnsForAddr(s.log, network, addr)
conns, err := createUDPConnsForAddr(s.log, network, addr, s.cfg.UDPSocketsCount)
if err != nil {
return fmt.Errorf("failed to create UDP connections: %w", err)
}
Expand Down
49 changes: 31 additions & 18 deletions service/rtc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ func setupServer(t *testing.T) (*Server, func()) {
require.NotNil(t, metrics)

cfg := ServerConfig{
ICEPortUDP: 30433,
ICEPortTCP: 30433,
ICEPortUDP: 30433,
ICEPortTCP: 30433,
UDPSocketsCount: GetDefaultUDPListeningSocketsCount(),
}

s, err := NewServer(cfg, log, metrics)
Expand Down Expand Up @@ -66,8 +67,9 @@ func TestNewServer(t *testing.T) {

t.Run("missing logger", func(t *testing.T) {
cfg := ServerConfig{
ICEPortUDP: 30433,
ICEPortTCP: 30433,
ICEPortUDP: 30433,
ICEPortTCP: 30433,
UDPSocketsCount: GetDefaultUDPListeningSocketsCount(),
}
s, err := NewServer(cfg, nil, metrics)
require.Error(t, err)
Expand All @@ -76,8 +78,9 @@ func TestNewServer(t *testing.T) {

t.Run("missing metrics", func(t *testing.T) {
cfg := ServerConfig{
ICEPortUDP: 30433,
ICEPortTCP: 30433,
ICEPortUDP: 30433,
ICEPortTCP: 30433,
UDPSocketsCount: GetDefaultUDPListeningSocketsCount(),
}
s, err := NewServer(cfg, log, nil)
require.Error(t, err)
Expand All @@ -86,8 +89,9 @@ func TestNewServer(t *testing.T) {

t.Run("valid", func(t *testing.T) {
cfg := ServerConfig{
ICEPortUDP: 30433,
ICEPortTCP: 30433,
ICEPortUDP: 30433,
ICEPortTCP: 30433,
UDPSocketsCount: GetDefaultUDPListeningSocketsCount(),
}
s, err := NewServer(cfg, log, metrics)
require.NoError(t, err)
Expand All @@ -107,8 +111,9 @@ func TestStartServer(t *testing.T) {
require.NotNil(t, metrics)

cfg := ServerConfig{
ICEPortUDP: 30433,
ICEPortTCP: 30433,
ICEPortUDP: 30433,
ICEPortTCP: 30433,
UDPSocketsCount: GetDefaultUDPListeningSocketsCount(),
}

t.Run("port unavailable", func(t *testing.T) {
Expand Down Expand Up @@ -159,8 +164,9 @@ func TestDraining(t *testing.T) {
}()

cfg := ServerConfig{
ICEPortUDP: 30433,
ICEPortTCP: 30433,
ICEPortUDP: 30433,
ICEPortTCP: 30433,
UDPSocketsCount: GetDefaultUDPListeningSocketsCount(),
}

metrics := perf.NewMetrics("rtcd", nil)
Expand Down Expand Up @@ -221,8 +227,9 @@ func TestInitSession(t *testing.T) {
require.NotNil(t, metrics)

cfg := ServerConfig{
ICEPortUDP: 30433,
ICEPortTCP: 30433,
ICEPortUDP: 30433,
ICEPortTCP: 30433,
UDPSocketsCount: GetDefaultUDPListeningSocketsCount(),
}

s, err := NewServer(cfg, log, metrics)
Expand Down Expand Up @@ -386,8 +393,9 @@ func TestCalls(t *testing.T) {
require.NotNil(t, metrics)

cfg := ServerConfig{
ICEPortUDP: 30433,
ICEPortTCP: 30433,
ICEPortUDP: 30433,
ICEPortTCP: 30433,
UDPSocketsCount: GetDefaultUDPListeningSocketsCount(),
}

s, err := NewServer(cfg, log, metrics)
Expand Down Expand Up @@ -479,8 +487,9 @@ func TestTCPCandidates(t *testing.T) {
require.NotNil(t, metrics)

serverCfg := ServerConfig{
ICEPortUDP: 30433,
ICEPortTCP: 30433,
ICEPortUDP: 30433,
ICEPortTCP: 30433,
UDPSocketsCount: GetDefaultUDPListeningSocketsCount(),
}

s, err := NewServer(serverCfg, log, metrics)
Expand Down Expand Up @@ -651,6 +660,7 @@ func TestICEHostPortOverride(t *testing.T) {
ICEPortUDP: 30433,
ICEPortTCP: 30433,
ICEHostPortOverride: "8443",
UDPSocketsCount: GetDefaultUDPListeningSocketsCount(),
}

candidatesCh := gatherCandidates(serverCfg, nil)
Expand All @@ -669,6 +679,7 @@ func TestICEHostPortOverride(t *testing.T) {
ICEPortTCP: 30433,
ICEHostOverride: "8.8.8.8",
ICEHostPortOverride: "8443",
UDPSocketsCount: GetDefaultUDPListeningSocketsCount(),
}

candidatesCh := gatherCandidates(serverCfg, nil)
Expand All @@ -692,6 +703,7 @@ func TestICEHostPortOverride(t *testing.T) {
ICEPortTCP: 30433,
ICEHostOverride: "8.8.8.8",
ICEHostPortOverride: "127.0.0.1/8443",
UDPSocketsCount: GetDefaultUDPListeningSocketsCount(),
}

candidatesCh := gatherCandidates(serverCfg, nil)
Expand All @@ -715,6 +727,7 @@ func TestICEHostPortOverride(t *testing.T) {
ICEPortTCP: 30433,
ICEHostPortOverride: "8443",
ICEHostOverride: "",
UDPSocketsCount: GetDefaultUDPListeningSocketsCount(),
}

publicIP := "8.8.8.8"
Expand Down

0 comments on commit fc76bf6

Please sign in to comment.