forked from YutaroHayakawa/go-ra
-
Notifications
You must be signed in to change notification settings - Fork 0
/
daemon.go
200 lines (167 loc) · 5.1 KB
/
daemon.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of go-ra
package ra
import (
"context"
"log/slog"
"sort"
"sync"
"time"
)
// Daemon is the main struct for the ra daemon
type Daemon struct {
initialConfig *Config
reloadCh chan *Config
logger *slog.Logger
socketConstructor socketCtor
deviceWatcher deviceWatcher
advertisers map[string]*advertiser
advertisersLock sync.RWMutex
}
// NewDaemon creates a new Daemon instance with the provided configuration and
// options. It returns ValidationErrors if the configuration is invalid.
func NewDaemon(config *Config, opts ...DaemonOption) (*Daemon, error) {
// Take a copy of the new configuration. c.validate() will modify it to
// set default values.
c := config.deepCopy()
// Validate the configuration first
if err := c.defaultAndValidate(); err != nil {
return nil, err
}
d := &Daemon{
initialConfig: c,
reloadCh: make(chan *Config),
logger: slog.Default(),
socketConstructor: newSocket,
deviceWatcher: newDeviceWatcher(),
advertisers: map[string]*advertiser{},
}
for _, opt := range opts {
opt(d)
}
return d, nil
}
// Run starts the daemon and blocks until the context is cancelled
func (d *Daemon) Run(ctx context.Context) {
d.logger.Info("Starting daemon")
// Current desired configuration
config := d.initialConfig
reload:
// Main loop
for {
var (
toAdd []*InterfaceConfig
toUpdate []*advertiser
toRemove []*advertiser
)
// We may modify the advertiser map from now
d.advertisersLock.Lock()
// Cache the interface => config mapping for later use
ifaceConfigs := map[string]*InterfaceConfig{}
// Find out which advertiser to add, update and remove
for _, c := range config.Interfaces {
if advertiser, ok := d.advertisers[c.Name]; !ok {
toAdd = append(toAdd, c)
} else {
toUpdate = append(toUpdate, advertiser)
}
ifaceConfigs[c.Name] = c
}
for name, advertiser := range d.advertisers {
if _, ok := ifaceConfigs[name]; !ok {
toRemove = append(toRemove, advertiser)
}
}
// Add new per-interface jobs
for _, c := range toAdd {
d.logger.Info("Adding new RA sender", slog.String("interface", c.Name))
advertiser := newAdvertiser(c, d.socketConstructor, d.deviceWatcher, d.logger)
go advertiser.run(ctx)
d.advertisers[c.Name] = advertiser
}
// Update (reload) existing workers
for _, advertiser := range toUpdate {
iface := advertiser.initialConfig.Name
d.logger.Info("Updating RA sender", slog.String("interface", iface))
// Set timeout to guarantee progress
timeout, cancelTimeout := context.WithTimeout(ctx, time.Second*3)
advertiser.reload(timeout, ifaceConfigs[iface])
cancelTimeout()
}
// Remove unnecessary workers
for _, advertiser := range toRemove {
iface := advertiser.initialConfig.Name
d.logger.Info("Deleting RA sender", slog.String("interface", iface))
advertiser.stop()
delete(d.advertisers, iface)
}
d.advertisersLock.Unlock()
// Wait for the events
for {
select {
case newConfig := <-d.reloadCh:
d.logger.Info("Reloading configuration")
config = newConfig
continue reload
case <-ctx.Done():
d.logger.Info("Shutting down daemon")
return
}
}
}
}
// Reload reloads the configuration of the daemon. The context passed to this
// function is used to cancel the potentially long-running operations during
// the reload process. Currently, the result of the unsucecssful or cancelled
// reload is undefined and the daemon may be running with either the old or the
// new configuration or both. It returns ValidationErrors if the configuration
// is invalid.
func (d *Daemon) Reload(ctx context.Context, newConfig *Config) error {
// Take a copy of the new configuration. c.validate() will modify it to
// set default values.
c := newConfig.deepCopy()
if err := c.defaultAndValidate(); err != nil {
return err
}
select {
case d.reloadCh <- c:
case <-ctx.Done():
return ctx.Err()
}
return nil
}
// Status returns the current status of the daemon
func (d *Daemon) Status() *Status {
d.advertisersLock.RLock()
ifaceStatus := []*InterfaceStatus{}
for _, advertiser := range d.advertisers {
ifaceStatus = append(ifaceStatus, advertiser.status())
}
d.advertisersLock.RUnlock()
sort.Slice(ifaceStatus, func(i, j int) bool {
return ifaceStatus[i].Name < ifaceStatus[j].Name
})
return &Status{Interfaces: ifaceStatus}
}
// DaemonOption is an optional parameter for the Daemon constructor
type DaemonOption func(*Daemon)
// WithLogger overrides the default logger with the provided one.
func WithLogger(l *slog.Logger) DaemonOption {
return func(d *Daemon) {
d.logger = l
}
}
// withSocketConstructor overrides the default socket constructor with the
// provided one. For testing purposes only.
func withSocketConstructor(c socketCtor) DaemonOption {
return func(d *Daemon) {
d.socketConstructor = c
}
}
// withDeviceWatcher overrides the default device watcher with the provided
// one. For testing purposes only.
func withDeviceWatcher(w deviceWatcher) DaemonOption {
return func(d *Daemon) {
d.deviceWatcher = w
}
}