-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbroadcaster.go
70 lines (56 loc) · 1.47 KB
/
broadcaster.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package broadcasting
import "sync"
// Broadcaster implements a Publisher.
//
// It keeps one channel per registered listener and copies every value passed
// to Send onto each of them. The zero value is usable: the channel map is
// allocated lazily by Listen and listener channels are then unbuffered.
// A Broadcaster contains a mutex and must not be copied after first use.
type Broadcaster[T any] struct {
	// bufferSize is the capacity given to each listener channel created by
	// Listen; 0 yields unbuffered channels.
	bufferSize int

	// mutex is held by Listen, Send and Close while they touch the fields below.
	mutex sync.Mutex
	// channels holds the send side of every listener channel, keyed by
	// listener ID. It is nil until the first call to Listen.
	channels map[uint]chan<- T
	// nextID is the counter from which listener IDs (the keys of channels)
	// are drawn.
	nextID uint
	// closed is set by Close; once true, Send panics and Listen returns
	// listeners whose channel is already closed.
	closed bool
}
// NewBroadcaster allocates an open Broadcaster with no listeners.
//
// channelSize is stored as the buffer capacity used for every listener
// channel that Listen creates later; 0 means the channels are unbuffered.
// The value is not validated here.
func NewBroadcaster[T any](channelSize int) *Broadcaster[T] {
	bcast := new(Broadcaster[T])
	bcast.bufferSize = channelSize
	return bcast
}
// Listen registers a new listener on the broadcaster and returns it.
//
// Each call creates a fresh channel with the broadcaster's configured buffer
// size, stores its send side in the channel map under a newly allocated ID,
// and passes the ID, the channel and the broadcaster to NewListener. If the
// broadcaster has already been closed, the channel is closed before it is
// handed out, so receiving from it yields the zero value immediately.
//
// IDs are drawn from a strictly increasing counter and are never reused, even
// if an entry is later deleted from the channel map. A stale ID can therefore
// never refer to a newer listener's channel.
//
// Listen takes the broadcaster's mutex, so it waits for any Send or Close
// that is in progress.
func (bcast *Broadcaster[T]) Listen() *Listener[T] {
	bcast.mutex.Lock()
	defer bcast.mutex.Unlock()

	// Allocate the map lazily so the zero-value Broadcaster is usable.
	if bcast.channels == nil {
		bcast.channels = make(map[uint]chan<- T)
	}

	// Always advance the counter: an ID that has been handed out once must
	// not be given to a second listener.
	id := bcast.nextID
	bcast.nextID++

	channel := make(chan T, bcast.bufferSize)
	bcast.channels[id] = channel
	if bcast.closed {
		// Late listeners observe the closed state right away.
		close(channel)
	}
	return NewListener(id, channel, bcast)
}
// Send delivers data to every registered listener channel.
//
// The channels are visited in map iteration order, which is unspecified.
// Each delivery is a plain channel send, so Send blocks until every listener
// channel has either buffered or received the value. The broadcaster's mutex
// is held for the whole call, so Listen, Close and other Send calls wait
// until it returns.
//
// Send panics with "broadcast: send after close" if the broadcaster has
// already been closed.
func (bcast *Broadcaster[T]) Send(data T) {
	bcast.mutex.Lock()
	defer bcast.mutex.Unlock() // deferred so the lock is released on the panic path as well

	if bcast.closed {
		panic("broadcast: send after close")
	}

	for _, sink := range bcast.channels {
		sink <- data
	}
}
// Close marks the broadcaster as closed and closes every channel currently
// registered in it, after which Send panics.
//
// Only the first call has any effect; later calls return without touching
// the channels again, so Close may be called more than once. The closed
// channels are left in the map.
func (bcast *Broadcaster[T]) Close() {
	bcast.mutex.Lock()
	defer bcast.mutex.Unlock()

	if bcast.closed {
		// Already shut down: closing the channels a second time would panic.
		return
	}

	bcast.closed = true
	for _, sink := range bcast.channels {
		close(sink)
	}
}