forked from docker-archive/leadership
-
Notifications
You must be signed in to change notification settings - Fork 1
/
candidate.go
153 lines (126 loc) · 3.14 KB
/
candidate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package leadership
import (
"context"
"sync"
"time"
"github.com/kvtools/valkeyrie/store"
)
const (
defaultLockTTL = 20 * time.Second
)
// Candidate runs the leader election algorithm asynchronously
type Candidate struct {
client store.Store
key string
node string
electedCh chan bool
lock sync.RWMutex
lockTTL time.Duration
leader bool
stopCh chan struct{}
stopRenew chan struct{}
resignCh chan bool
errCh chan error
}
// NewCandidate creates a new Candidate
func NewCandidate(client store.Store, key, node string, ttl time.Duration) *Candidate {
return &Candidate{
client: client,
key: key,
node: node,
leader: false,
lockTTL: ttl,
resignCh: make(chan bool),
stopCh: make(chan struct{}),
}
}
// IsLeader returns true if the candidate is currently a leader.
func (c *Candidate) IsLeader() bool {
c.lock.RLock()
defer c.lock.RUnlock()
return c.leader
}
// RunForElection starts the leader election algorithm. Updates in status are
// pushed through the ElectedCh channel.
//
// ElectedCh is used to get a channel which delivers signals on
// acquiring or losing leadership. It sends true if we become
// the leader, and false if we lose it.
func (c *Candidate) RunForElection(ctx context.Context) (<-chan bool, <-chan error) {
c.electedCh = make(chan bool)
c.errCh = make(chan error)
go c.campaign(ctx)
return c.electedCh, c.errCh
}
// Stop running for election.
func (c *Candidate) Stop() {
close(c.stopCh)
}
// Resign forces the candidate to step-down and try again.
// If the candidate is not a leader, it doesn't have any effect.
// Candidate will retry immediately to acquire the leadership. If no-one else
// took it, then the Candidate will end up being a leader again.
func (c *Candidate) Resign() {
c.lock.Lock()
defer c.lock.Unlock()
if c.leader {
c.resignCh <- true
}
}
func (c *Candidate) update(status bool) {
c.lock.Lock()
defer c.lock.Unlock()
c.leader = status
c.electedCh <- status
}
func (c *Candidate) initLock(ctx context.Context) (store.Locker, error) {
// Give up on the lock session if
// we recovered from a store failure
if c.stopRenew != nil {
close(c.stopRenew)
}
lockOpts := &store.LockOptions{
Value: []byte(c.node),
}
if c.lockTTL != defaultLockTTL {
lockOpts.TTL = c.lockTTL
}
lockOpts.RenewLock = make(chan struct{})
c.stopRenew = lockOpts.RenewLock
lock, err := c.client.NewLock(ctx, c.key, lockOpts)
return lock, err
}
func (c *Candidate) campaign(ctx context.Context) {
defer close(c.electedCh)
defer close(c.errCh)
for {
// Start as a follower.
c.update(false)
lock, err := c.initLock(ctx)
if err != nil {
c.errCh <- err
return
}
lostCh, err := lock.Lock(ctx)
if err != nil {
c.errCh <- err
return
}
// Hooray! We acquired the lock therefore we are the new leader.
c.update(true)
select {
case <-c.resignCh:
// We were asked to resign, give up the lock and go back
// campaigning.
_ = lock.Unlock(ctx)
case <-c.stopCh:
// Give up the leadership and quit.
if c.leader {
_ = lock.Unlock(ctx)
}
return
case <-lostCh:
// We lost the lock. Someone else is the leader, try again.
}
}
}