-
Notifications
You must be signed in to change notification settings - Fork 4
/
broadcast.go
61 lines (51 loc) · 1.02 KB
/
broadcast.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package main
import "context"
type Listener chan<- ServerList
type broadcast struct {
ctx context.Context
src <-chan ServerList
state ServerList
tgts map[Listener]bool
add chan Listener
rem chan Listener
}
func newBroadcast(ctx context.Context, src <-chan ServerList) *broadcast {
b := &broadcast{
ctx: ctx,
src: filterEqualServerList(src),
tgts: make(map[Listener]bool),
add: make(chan Listener),
rem: make(chan Listener),
}
go b.run()
return b
}
func (b *broadcast) addListener(listener Listener) {
b.add <- listener
}
func (b *broadcast) remListener(listener Listener) {
b.rem <- listener
}
func (b *broadcast) run() {
for {
select {
case <-b.ctx.Done():
close(b.add)
close(b.rem)
return
case l := <-b.add:
b.tgts[l] = true
// send initial state if present once on registration
if len(b.state) > 0 {
l <- b.state
}
case l := <-b.rem:
delete(b.tgts, l)
case state := <-b.src:
b.state = state
for tgt := range b.tgts {
tgt <- state
}
}
}
}