forked from lightninglabs/neutrino
-
Notifications
You must be signed in to change notification settings - Fork 0
/
blocksubscriptions.go
259 lines (218 loc) · 6.84 KB
/
blocksubscriptions.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
// NOTE: THIS API IS UNSTABLE AND WILL BE MOVED TO ITS OWN PACKAGE OR REFACTORED
// OUT.
package neutrino
import (
"fmt"
"github.com/btcsuite/btcd/wire"
)
// messageType describes the type of blockMessage. The values used in this
// file are the connectBasic and disconnect constants; sendMsgToSubscriber
// panics on any other value.
type messageType int
// The messageType values below are assigned sequentially via iota, so
// connectBasic is 0 and disconnect is 1.
const (
// connectBasic is a type of notification sent whenever we connect a
// new set of basic filter headers to the end of the main chain.
// Messages of this type are forwarded on a subscription's
// onConnectBasic channel.
connectBasic messageType = iota
// disconnect is a type of filter notification that is sent whenever a
// block is disconnected from the end of the main chain. Messages of
// this type are forwarded on a subscription's onDisconnect channel.
disconnect
)
// blockMessage is a notification from the block manager to a block
// subscription's goroutine to be forwarded on via the appropriate channel.
type blockMessage struct {
// header is the block header the notification refers to. The
// subscription handler dereferences it and sends the header by value
// to the subscriber.
header *wire.BlockHeader
// msgType selects which of the subscriber's channels the header is
// forwarded on.
msgType messageType
}
// blockSubscription allows a client to subscribe to and unsubscribe from block
// connect and disconnect notifications.
// TODO(aakselrod): Move this to its own package so that the subscriber can't
// access internals, in particular the notifyBlock and intQuit members.
type blockSubscription struct {
// onConnectBasic receives the header of every connectBasic message. If
// it is nil, sendMsgToSubscriber does not forward connectBasic
// messages to this subscription.
onConnectBasic chan<- wire.BlockHeader
// onDisconnect receives the header of every disconnect message. If it
// is nil, sendMsgToSubscriber does not forward disconnect messages to
// this subscription.
onDisconnect chan<- wire.BlockHeader
// quit is supplied by the subscriber. Once it is readable, pending
// sends to this subscription are abandoned and the handler goroutine
// returns.
quit <-chan struct{}
// notifyBlock is the unbuffered channel over which sendMsgToSubscriber
// hands messages to the subscriptionHandler goroutine.
notifyBlock chan *blockMessage
// intQuit is closed by unsubscribeBlockMsgs to stop the handler
// goroutine and release any sender blocked on notifyBlock.
intQuit chan struct{}
}
// sendSubscribedMsg sends all block subscribers a message if they request this
// type. The subscriber set is read-locked for the duration of the fan-out, so
// subscriptions cannot be added or removed while a message is being
// delivered.
//
// TODO(aakselrod): Refactor so we're able to handle more message types in new
// package.
func (s *ChainService) sendSubscribedMsg(bm *blockMessage) {
	// The unlock is deferred so that the read lock is still released if
	// sendMsgToSubscriber panics on an invalid message type; otherwise
	// every later subscribe or unsubscribe call would block forever.
	s.mtxSubscribers.RLock()
	defer s.mtxSubscribers.RUnlock()

	for sub := range s.blockSubscribers {
		sendMsgToSubscriber(sub, bm)
	}
}
// sendMsgToSubscriber hands bm to the subscription's handler goroutine,
// provided the subscriber supplied a channel for that kind of notification.
// It panics if bm carries a message type other than connectBasic or
// disconnect.
func sendMsgToSubscriber(sub *blockSubscription, bm *blockMessage) {
	// Work out whether the subscriber asked for this kind of update, i.e.
	// whether the matching outbound channel is non-nil.
	var wanted bool
	switch bm.msgType {
	case connectBasic:
		wanted = sub.onConnectBasic != nil

	case disconnect:
		wanted = sub.onDisconnect != nil

	default:
		// TODO: Return a useful error when factored out into its own
		// package.
		panic("invalid message type")
	}

	// Nothing to do for subscribers that aren't interested.
	if !wanted {
		return
	}

	// Block until the handler accepts the message, or until the
	// subscription is torn down from either side.
	select {
	case <-sub.intQuit:
	case <-sub.quit:
	case sub.notifyBlock <- bm:
	}
}
// subscribeBlockMsg handles adding block subscriptions to the ChainService.
// The best known height to the caller should be passed in, such that we can
// send a backlog of notifications to the caller if they're behind the current
// best tip.
//
// Connect notifications are forwarded on onConnectBasic and disconnect
// notifications on onDisconnect; a nil channel means that kind of
// notification is not forwarded. A signal on quit stops the subscription's
// handler goroutine.
//
// On success the returned subscription is registered with the ChainService
// and its handler goroutine is running. On failure nil is returned, the
// subscription is deregistered and its handler goroutine is stopped.
//
// TODO(aakselrod): move this to its own package and refactor so that we're not
// modifying an object held by the caller.
func (s *ChainService) subscribeBlockMsg(bestHeight uint32,
	onConnectBasic, onDisconnect chan<- wire.BlockHeader,
	quit <-chan struct{}) (*blockSubscription, error) {

	subscription := blockSubscription{
		onConnectBasic: onConnectBasic,
		onDisconnect:   onDisconnect,
		quit:           quit,
		notifyBlock:    make(chan *blockMessage),
		intQuit:        make(chan struct{}),
	}

	// At this point, we'll check to see if we need to deliver any backlog
	// notifications, as it's possible that the caller is subscribing right
	// after a new set of blocks has been connected.
	err := s.blockManager.SynchronizeFilterHeaders(func(filterHeaderTip uint32) error {
		s.mtxSubscribers.Lock()
		defer s.mtxSubscribers.Unlock()

		// The handler must be running before any backlog is sent,
		// since notifyBlock is unbuffered and the sends below would
		// otherwise block.
		s.blockSubscribers[&subscription] = struct{}{}
		go subscription.subscriptionHandler()

		// If the best height matches the filter header tip, then we're
		// done and don't need to proceed any further.
		if filterHeaderTip == bestHeight {
			return nil
		}

		log.Debugf("Delivering backlog block notifications from "+
			"height=%v, to height=%v", bestHeight, filterHeaderTip)

		// Otherwise, we need to read block headers from disk to
		// deliver a backlog to the caller before we proceed. We'll use
		// this synchronization method to ensure the filter header
		// state doesn't change until we're finished catching up the
		// caller.
		for currentHeight := bestHeight + 1; currentHeight <= filterHeaderTip; currentHeight++ {
			blockHeader, err := s.BlockHeaders.FetchHeaderByHeight(
				currentHeight,
			)
			if err != nil {
				return fmt.Errorf("unable to read header at "+
					"height: %v: %v", currentHeight, err)
			}

			sendMsgToSubscriber(&subscription, &blockMessage{
				msgType: connectBasic,
				header:  blockHeader,
			})
		}

		return nil
	})
	if err != nil {
		// The caller gets no handle to the subscription on failure and
		// so can never unsubscribe it. Tear it down here, otherwise it
		// would stay registered and its handler goroutine would keep
		// queueing notifications nobody reads. This is safe even if
		// the closure never ran: the delete is then a no-op and
		// intQuit is closed exactly once.
		s.unsubscribeBlockMsgs(&subscription)

		return nil, err
	}

	return &subscription, nil
}
// unsubscribeBlockMsgs handles removing block subscriptions from the
// ChainService. It deregisters the subscription, signals its handler
// goroutine to stop by closing intQuit, and then discards any notification
// that is still immediately receivable on notifyBlock.
//
// TODO(aakselrod): move this to its own package and refactor so that we're
// not depending on the caller to not modify the argument between subscribe and
// unsubscribe.
func (s *ChainService) unsubscribeBlockMsgs(subscription *blockSubscription) {
	// Deregister under the write lock so no further messages are fanned
	// out to this subscription.
	s.mtxSubscribers.Lock()
	delete(s.blockSubscribers, subscription)
	s.mtxSubscribers.Unlock()

	// Tell the handler goroutine, and any sender currently blocked on
	// notifyBlock, that the subscription is gone.
	close(subscription.intQuit)

	// Drain the inbound notification channel, stopping as soon as a
	// receive would block.
	for {
		select {
		case <-subscription.notifyBlock:
			continue
		default:
			return
		}
	}
}
// subscriptionHandler must be run as a goroutine and queues notification
// messages from the chain service to the subscriber.
//
// Messages arriving on s.notifyBlock are forwarded, in arrival order, to
// s.onConnectBasic or s.onDisconnect depending on their type. While the
// subscriber is not ready to receive, further incoming messages are appended
// to an in-memory queue that has no upper bound. A message that has no
// destination (nil target channel or unrecognised type) is dropped. The
// handler returns once s.quit or s.intQuit is signalled.
func (s *blockSubscription) subscriptionHandler() {
	// Start with a small queue; it will grow if needed.
	ntfns := make([]*blockMessage, 0, 5)

	// next is the message currently awaiting delivery, or nil if there is
	// none.
	var next *blockMessage

	// Try to send on the specified channel. If a new message arrives while
	// we try to send, queue it and continue with the loop. If a quit
	// signal is sent, let the loop know by returning false.
	selectChan := func(notify chan<- wire.BlockHeader) bool {
		if notify == nil {
			// There is nowhere to deliver this message, so drop
			// it. Leaving it pending would make the main loop
			// spin on it forever without ever reading
			// s.notifyBlock again.
			next = nil

			select {
			case <-s.quit:
				return false
			case <-s.intQuit:
				return false
			default:
				return true
			}
		}

		select {
		case notify <- *next.header:
			next = nil
			return true

		case queueMsg := <-s.notifyBlock:
			ntfns = append(ntfns, queueMsg)
			return true

		case <-s.quit:
			return false

		case <-s.intQuit:
			return false
		}
	}

	// Loop until we get a signal on s.quit or s.intQuit.
	for {
		// A message is pending: try to deliver it on the channel that
		// matches its type.
		if next != nil {
			// If selectChan returns false, we were signalled on
			// s.quit or s.intQuit.
			switch next.msgType {
			case connectBasic:
				if !selectChan(s.onConnectBasic) {
					return
				}

			case disconnect:
				if !selectChan(s.onDisconnect) {
					return
				}

			default:
				// A message of unknown type can't be routed
				// anywhere; discard it rather than looping on
				// it without ever checking the quit channels.
				next = nil
			}

			continue
		}

		// Nothing is pending, so see if we can get a notification
		// from the queue.
		if len(ntfns) > 0 {
			next = ntfns[0]
			ntfns[0] = nil // Set to nil to avoid GC leak.
			ntfns = ntfns[1:]

			continue
		}

		// The queue is empty too, so wait for a notification on
		// s.notifyBlock or quit if signalled.
		select {
		case next = <-s.notifyBlock:
		case <-s.quit:
			return
		case <-s.intQuit:
			return
		}
	}
}