forked from l7mp/stunner
-
Notifications
You must be signed in to change notification settings - Fork 0
/
config.go
402 lines (337 loc) · 11.6 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
package stunner
import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"regexp"
"strconv"
"strings"
"time"
"github.com/fsnotify/fsnotify"
"github.com/pion/logging"
"github.com/pion/transport/v2"
"sigs.k8s.io/yaml"
"github.com/l7mp/stunner/internal/resolver"
"github.com/l7mp/stunner/pkg/apis/v1alpha1"
)
const confUpdatePeriod = 1 * time.Second
// Options defines various options for the STUNner server.
type Options struct {
// DryRun suppresses sideeffects: STUNner will not initialize listener sockets and bring up
// the TURN server, and it will not fire up the health-check and the metrics
// servers. Intended for testing, default is false.
DryRun bool
// SuppressRollback controls whether to rollback to the last working configuration after a
// failed reconciliation request. Default is false, which means to always do a rollback.
SuppressRollback bool
// LogLevel specifies the required loglevel for STUNner and each of its sub-objects, e.g.,
// "all:TRACE" will force maximal loglevel throughout, "all:ERROR,auth:TRACE,turn:DEBUG"
// will suppress all logs except in the authentication subsystem and the TURN protocol
// logic.
LogLevel string
// Resolver swaps the internal DNS resolver with a custom implementation. Intended for
// testing.
Resolver resolver.DnsResolver
// UDPListenerThreadNum determines the number of readloop threads spawned per UDP listener
// (default is 4, must be >0 integer). TURN allocations will be automatically load-balanced
// by the kernel UDP stack based on the client 5-tuple. This setting controls the maximum
// number of CPU cores UDP listeners can scale to. Note that all other listener protocol
// types (TCP, TLS and DTLS) use per-client threads, so this setting affects only UDP
// listeners. For more info see https://github.com/pion/turn/pull/295.
UDPListenerThreadNum int
// VNet will switch on testing mode, using a vnet.Net instance to run STUNner over an
// emulated data-plane.
Net transport.Net
}
// NewZeroConfig builds a zero configuration useful for bootstrapping STUNner. It starts with
// plaintext authentication and opens no listeners and clusters.
func NewZeroConfig() *v1alpha1.StunnerConfig {
return &v1alpha1.StunnerConfig{
ApiVersion: v1alpha1.ApiVersion,
Admin: v1alpha1.AdminConfig{},
Auth: v1alpha1.AuthConfig{
Type: "plaintext",
Realm: v1alpha1.DefaultRealm,
Credentials: map[string]string{
"username": "dummy-username",
"password": "dummy-password",
},
},
Listeners: []v1alpha1.ListenerConfig{},
Clusters: []v1alpha1.ClusterConfig{},
}
}
// NewDefaultConfig builds a default configuration from a TURN server URI. Example: the URI
// `turn://user:[email protected]:3478?transport=udp` will be parsed into a STUNner configuration with
// a server running on the localhost at UDP port 3478, with plain-text authentication using the
// username/password pair `user:pass`. Health-checks and metric scarping are disabled.
func NewDefaultConfig(uri string) (*v1alpha1.StunnerConfig, error) {
u, err := ParseUri(uri)
if err != nil {
return nil, fmt.Errorf("Invalid URI '%s': %s", uri, err)
}
if u.Username == "" || u.Password == "" {
return nil, fmt.Errorf("Username/password must be set: '%s'", uri)
}
h := ""
c := &v1alpha1.StunnerConfig{
ApiVersion: v1alpha1.ApiVersion,
Admin: v1alpha1.AdminConfig{
LogLevel: v1alpha1.DefaultLogLevel,
// MetricsEndpoint: "http://:8088",
HealthCheckEndpoint: &h,
},
Auth: v1alpha1.AuthConfig{
Type: "plaintext",
Realm: v1alpha1.DefaultRealm,
Credentials: map[string]string{
"username": u.Username,
"password": u.Password,
},
},
Listeners: []v1alpha1.ListenerConfig{{
Name: "default-listener",
Protocol: u.Protocol,
Addr: u.Address,
Port: u.Port,
Routes: []string{"allow-any"},
}},
Clusters: []v1alpha1.ClusterConfig{{
Name: "allow-any",
Type: "STATIC",
Endpoints: []string{"0.0.0.0/0"},
}},
}
if strings.ToUpper(u.Protocol) == "TLS" || strings.ToUpper(u.Protocol) == "DTLS" {
certPem, keyPem, err := GenerateSelfSignedKey()
if err != nil {
return nil, err
}
c.Listeners[0].Cert = string(certPem)
c.Listeners[0].Key = string(keyPem)
}
if err := c.Validate(); err != nil {
return nil, err
}
return c, nil
}
// LoadConfig loads a configuration from a file, substituting environment variables for
// placeholders in the configuration file. Returns the new configuration or error if load fails.
func LoadConfig(config string) (*v1alpha1.StunnerConfig, error) {
c, err := os.ReadFile(config)
if err != nil {
return nil, fmt.Errorf("could not read config: %s\n", err.Error())
}
// substitute environtment variables
// default port: STUNNER_PUBLIC_PORT -> STUNNER_PORT
re := regexp.MustCompile(`^[0-9]+$`)
port, ok := os.LookupEnv("STUNNER_PORT")
if !ok || (ok && port == "") || (ok && !re.Match([]byte(port))) {
publicPort := v1alpha1.DefaultPort
publicPortStr, ok := os.LookupEnv("STUNNER_PUBLIC_PORT")
if ok {
if p, err := strconv.Atoi(publicPortStr); err == nil {
publicPort = p
}
}
os.Setenv("STUNNER_PORT", fmt.Sprintf("%d", publicPort))
}
e := os.ExpandEnv(string(c))
s := v1alpha1.StunnerConfig{}
// try YAML first
if err = yaml.Unmarshal([]byte(e), &s); err != nil {
// if it fails, try to json
if errJ := json.Unmarshal([]byte(e), &s); err != nil {
return nil, fmt.Errorf("could not parse config file at '%s': "+
"YAML parse error: %s, JSON parse error: %s\n",
config, err.Error(), errJ.Error())
}
}
return &s, nil
}
// GetConfig returns the configuration of the running STUNner daemon.
func (s *Stunner) GetConfig() *v1alpha1.StunnerConfig {
s.log.Tracef("GetConfig")
// singletons, but we want to avoid panics when GetConfig is called on an uninitialized
// STUNner object
adminConf := v1alpha1.AdminConfig{}
if len(s.adminManager.Keys()) > 0 {
adminConf = *s.GetAdmin().GetConfig().(*v1alpha1.AdminConfig)
}
authConf := v1alpha1.AuthConfig{}
if len(s.authManager.Keys()) > 0 {
authConf = *s.GetAuth().GetConfig().(*v1alpha1.AuthConfig)
}
listeners := s.listenerManager.Keys()
clusters := s.clusterManager.Keys()
c := v1alpha1.StunnerConfig{
ApiVersion: s.version,
Admin: adminConf,
Auth: authConf,
Listeners: make([]v1alpha1.ListenerConfig, len(listeners)),
Clusters: make([]v1alpha1.ClusterConfig, len(clusters)),
}
for i, name := range listeners {
c.Listeners[i] = *s.GetListener(name).GetConfig().(*v1alpha1.ListenerConfig)
}
for i, name := range clusters {
c.Clusters[i] = *s.GetCluster(name).GetConfig().(*v1alpha1.ClusterConfig)
}
return &c
}
type Watcher struct {
// ConfigFile specifies the config file name to watch.
ConfigFile string
// ConfigChannel is used to return the configs read.
ConfigChannel chan<- v1alpha1.StunnerConfig
// Logger is a logger factory as returned by, e.g., stunner.GetLogger().
Logger logging.LoggerFactory
// Log is a leveled logger used to report progress. Either Logger or Log must be specified.
Log logging.LeveledLogger
}
// WatchConfig will watch a configuration file specified in the `Watcher.ConfigFile` parameter for
// changes and emit a new `StunnerConfig` on `Watcher.ConfigChannel` each time the file changes. If
// no file exists at the given path, then WatchConfig will periodically retry until the file
// appears. The configuration sent through the channel is not validated, make sure to check for
// syntax errors on the receiver side. Use the `context` to cancel the watcher.
func WatchConfig(ctx context.Context, w Watcher) error {
if w.ConfigChannel == nil {
return errors.New("uninitialized config channel")
}
if w.ConfigFile == "" {
return errors.New("uninitialized config file path")
}
if w.Log == nil {
w.Log = w.Logger.NewLogger("watch-config")
}
w.Log.Tracef("WatchConfig")
go func() {
for {
// try to watch
if ok := configWatcher(ctx, w); !ok {
return
}
if ok := tryWatchConfig(ctx, w); !ok {
return
}
}
}()
return nil
}
// tryWatchConfig runs a timer to look for the config file at the given path and returns it
// immediately once found. Returns true if further action is needed (configWatcher has to be
// started) or false on normal exit.
func tryWatchConfig(ctx context.Context, w Watcher) bool {
w.Log.Tracef("tryWatchConfig")
config := w.ConfigFile
ticker := time.NewTicker(confUpdatePeriod)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return false
case <-ticker.C:
w.Log.Debugf("trying to read config file %q from periodic timer",
config)
// check if config file exists and it is readable
if _, err := os.Stat(config); errors.Is(err, os.ErrNotExist) {
w.Log.Debugf("config file %q does not exist", config)
// report status in every 10th second
if time.Now().Second()%10 == 0 {
w.Log.Warnf("waiting for config file %q", config)
}
continue
}
return true
}
}
}
// configWatcher actually watches the config and emits the configs found on the specified
// channel. Returns true if further action is needed (tryWatachConfig is to be started) or false on
// normal exit.
func configWatcher(ctx context.Context, w Watcher) bool {
w.Log.Tracef("configWatcher")
prev := v1alpha1.StunnerConfig{}
// create a new watcher
watcher, err := fsnotify.NewWatcher()
if err != nil {
return true
}
defer watcher.Close()
config := w.ConfigFile
ch := w.ConfigChannel
if err := watcher.Add(config); err != nil {
w.Log.Debugf("could not add config file %q watcher: %s", config, err.Error())
return true
}
// emit an initial config
c, err := LoadConfig(config)
if err != nil {
w.Log.Warnf("could not load config file %q: %s", config, err.Error())
return true
}
w.Log.Debugf("config file successfully loaded from %q", config)
// send a deepcopy over the channel
copy := v1alpha1.StunnerConfig{}
c.DeepCopyInto(©)
ch <- copy
// save deepcopy so that we can filter repeated events
c.DeepCopyInto(&prev)
for {
select {
case <-ctx.Done():
return false
case e, ok := <-watcher.Events:
if !ok {
w.Log.Debug("config watcher event handler received invalid event")
return true
}
w.Log.Debugf("received watcher event: %s", e.String())
if e.Has(fsnotify.Remove) {
w.Log.Warnf("config file deleted %q, disabling watcher", e.Op.String())
if err := watcher.Remove(config); err != nil {
w.Log.Debugf("could not remove config file %q watcher: %s",
config, err.Error())
}
return true
}
if !e.Has(fsnotify.Write) {
w.Log.Debugf("unhandled notify op on config file %q (ignoring): %s",
e.Name, e.Op.String())
continue
}
w.Log.Debugf("loading configuration file: %s", config)
c, err = LoadConfig(config)
if err != nil {
// assume it is a YAML/JSON syntax error (LoadConfig does not
// validate): report and ignore
w.Log.Warnf("could not load config file %q: %s", config, err.Error())
continue
}
// suppress repeated events
if c.DeepEqual(&prev) {
w.Log.Debugf("ignoring recurrent notify event for the same config file")
continue
}
w.Log.Debugf("config file successfully loaded from %q", config)
copy := v1alpha1.StunnerConfig{}
c.DeepCopyInto(©)
ch <- copy
// save deepcopy so that we can filter repeated events
c.DeepCopyInto(&prev)
case err, ok := <-watcher.Errors:
if !ok {
w.Log.Debugf("config watcher error handler received invalid error")
return true
}
w.Log.Debugf("watcher error, deactivating watcher: %s", err.Error())
if err := watcher.Remove(config); err != nil {
w.Log.Debugf("could not remove config file %q watcher: %s",
config, err.Error())
}
return true
}
}
}