forked from redis/rueidis
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpubsub.go
156 lines (140 loc) · 3 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package rueidis
import "sync"
// PubSubMessage represent a pubsub message from redis
type PubSubMessage struct {
// Pattern is only available with pmessage.
Pattern string
// Channel is the channel the message belongs to
Channel string
// Message is the message content
Message string
}
// PubSubSubscription represent a pubsub "subscribe", "unsubscribe", "psubscribe" or "punsubscribe" event.
type PubSubSubscription struct {
// Kind is "subscribe", "unsubscribe", "psubscribe" or "punsubscribe"
Kind string
// Channel is the event subject.
Channel string
// Count is the current number of subscriptions for connection.
Count int64
}
// PubSubHooks can be registered into DedicatedClient to process pubsub messages without using Client.Receive
type PubSubHooks struct {
// OnMessage will be called when receiving "message" and "pmessage" event.
OnMessage func(m PubSubMessage)
// OnSubscription will be called when receiving "subscribe", "unsubscribe", "psubscribe" and "punsubscribe" event.
OnSubscription func(s PubSubSubscription)
}
func (h *PubSubHooks) isZero() bool {
return h.OnMessage == nil && h.OnSubscription == nil
}
func newSubs() *subs {
return &subs{chs: make(map[string]chs), sub: make(map[int]*sub)}
}
type subs struct {
mu sync.RWMutex
chs map[string]chs
sub map[int]*sub
cnt int
}
type chs struct {
sub map[int]*sub
cnf bool
}
type sub struct {
cs []string
ch chan PubSubMessage
}
func (s *subs) Publish(channel string, msg PubSubMessage) {
s.mu.RLock()
if s.chs != nil {
for _, sb := range s.chs[channel].sub {
sb.ch <- msg
}
}
s.mu.RUnlock()
}
func (s *subs) Subscribe(channels []string) (ch chan PubSubMessage, cancel func()) {
s.mu.Lock()
if s.chs != nil {
s.cnt++
ch = make(chan PubSubMessage, 16)
sb := &sub{cs: channels, ch: ch}
id := s.cnt
s.sub[id] = sb
for _, channel := range channels {
c := s.chs[channel].sub
if c == nil {
c = make(map[int]*sub)
s.chs[channel] = chs{sub: c, cnf: false}
}
c[id] = sb
}
cancel = func() {
go func() {
for range ch {
}
}()
s.mu.Lock()
if s.chs != nil {
s.remove(id)
}
s.mu.Unlock()
}
}
s.mu.Unlock()
return ch, cancel
}
func (s *subs) remove(id int) {
if sb := s.sub[id]; sb != nil {
for _, channel := range sb.cs {
if c := s.chs[channel].sub; c != nil {
delete(c, id)
}
}
close(sb.ch)
delete(s.sub, id)
}
}
func (s *subs) Confirm(channel string) {
s.mu.Lock()
if s.chs != nil {
c := s.chs[channel]
c.cnf = true
s.chs[channel] = c
}
s.mu.Unlock()
}
func (s *subs) Confirmed() (count int) {
s.mu.RLock()
if s.chs != nil {
for _, c := range s.chs {
if c.cnf {
count++
}
}
}
s.mu.RUnlock()
return
}
func (s *subs) Unsubscribe(channel string) {
s.mu.Lock()
if s.chs != nil {
for id := range s.chs[channel].sub {
s.remove(id)
}
delete(s.chs, channel)
}
s.mu.Unlock()
}
func (s *subs) Close() {
var sbs map[int]*sub
s.mu.Lock()
sbs = s.sub
s.chs = nil
s.sub = nil
s.mu.Unlock()
for _, sb := range sbs {
close(sb.ch)
}
}