-
Notifications
You must be signed in to change notification settings - Fork 997
/
observer.go
149 lines (129 loc) · 4.24 KB
/
observer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package raft
import (
"sync/atomic"
"time"
)
// Observation is sent along the given channel to observers when an event occurs.
type Observation struct {
// Raft holds the Raft instance generating the observation.
Raft *Raft
// Data holds observation-specific data. Possible types are
// RequestVoteRequest
// RaftState
// PeerObservation
// LeaderObservation
Data interface{}
}
// LeaderObservation is used for the data when leadership changes.
type LeaderObservation struct {
// DEPRECATED The LeaderAddr field should now be used
Leader ServerAddress
LeaderAddr ServerAddress
LeaderID ServerID
}
// PeerObservation is sent to observers when peers change.
type PeerObservation struct {
Removed bool
Peer Server
}
// FailedHeartbeatObservation is sent when a node fails to heartbeat with the leader
type FailedHeartbeatObservation struct {
PeerID ServerID
LastContact time.Time
}
// ResumedHeartbeatObservation is sent when a node resumes to heartbeat with the leader following failures
type ResumedHeartbeatObservation struct {
PeerID ServerID
}
// nextObserverId is used to provide a unique ID for each observer to aid in
// deregistration.
var nextObserverID uint64
// FilterFn is a function that can be registered in order to filter observations.
// The function reports whether the observation should be included - if
// it returns false, the observation will be filtered out.
type FilterFn func(o *Observation) bool
// Observer describes what to do with a given observation.
type Observer struct {
// numObserved and numDropped are performance counters for this observer.
// 64 bit types must be 64 bit aligned to use with atomic operations on
// 32 bit platforms, so keep them at the top of the struct.
numObserved uint64
numDropped uint64
// channel receives observations.
channel chan Observation
// blocking, if true, will cause Raft to block when sending an observation
// to this observer. This should generally be set to false.
blocking bool
// filter will be called to determine if an observation should be sent to
// the channel.
filter FilterFn
// id is the ID of this observer in the Raft map.
id uint64
}
// NewObserver creates a new observer that can be registered
// to make observations on a Raft instance. Observations
// will be sent on the given channel if they satisfy the
// given filter.
//
// If blocking is true, the observer will block when it can't
// send on the channel, otherwise it may discard events.
func NewObserver(channel chan Observation, blocking bool, filter FilterFn) *Observer {
return &Observer{
channel: channel,
blocking: blocking,
filter: filter,
id: atomic.AddUint64(&nextObserverID, 1),
}
}
// GetNumObserved returns the number of observations.
func (or *Observer) GetNumObserved() uint64 {
return atomic.LoadUint64(&or.numObserved)
}
// GetNumDropped returns the number of dropped observations due to blocking.
func (or *Observer) GetNumDropped() uint64 {
return atomic.LoadUint64(&or.numDropped)
}
// RegisterObserver registers a new observer.
func (r *Raft) RegisterObserver(or *Observer) {
r.observersLock.Lock()
defer r.observersLock.Unlock()
r.observers[or.id] = or
}
// DeregisterObserver deregisters an observer.
func (r *Raft) DeregisterObserver(or *Observer) {
r.observersLock.Lock()
defer r.observersLock.Unlock()
delete(r.observers, or.id)
}
// observe sends an observation to every observer.
func (r *Raft) observe(o interface{}) {
// In general observers should not block. But in any case this isn't
// disastrous as we only hold a read lock, which merely prevents
// registration / deregistration of observers.
r.observersLock.RLock()
defer r.observersLock.RUnlock()
for _, or := range r.observers {
// It's wasteful to do this in the loop, but for the common case
// where there are no observers we won't create any objects.
ob := Observation{Raft: r, Data: o}
if or.filter != nil && !or.filter(&ob) {
continue
}
if or.channel == nil {
continue
}
if or.blocking {
or.channel <- ob
atomic.AddUint64(&or.numObserved, 1)
} else {
select {
case or.channel <- ob:
atomic.AddUint64(&or.numObserved, 1)
default:
atomic.AddUint64(&or.numDropped, 1)
}
}
}
}