-
Notifications
You must be signed in to change notification settings - Fork 23
/
rwlock.go
389 lines (329 loc) · 11.5 KB
/
rwlock.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
/*
Copyright 2015 Datawise Systems Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Utility that implements distributed read/write locks using etcd.
//
// Assumptions:
// 1) Each lock request is added to a per lock queue in etcd. The queue is
// implemented using atomic in-order keys in etcd with a queue-ttl. The
// queue-ttl is set by the queue-ttl flag (default 30 seconds). This is the
// maximum amount of time a lock request can be waiting in queue.
// 2) If a node queues a lock request and goes away, queue-ttl expiry will
// ensure that the next set of lock requests can be served.
// 3) lock-ttl is specified in the lock request. This controls how long a
// lockholder can hang on to the lock. If a lockholder dies, the lock-ttl
// expiry ensures that the lock is released in a reasonable time.
// 4) Fairness is ensured by never letting reads leapfrog writes ahead of them.
//
// Implementation notes:
// 1) Each lock request adds a key to the wait queue for the lock. The value
// stored contains the type of operation(Read/Write) and the ID of the
// requestor. The key has a queue-ttl associated with it.
// 2) A lock can be assigned if all the queue entries ahead of the requestor
// are non-conflicting. Reads don't conflict with each other. Writes conflict
// with both Reads and Writes.
// 3) Every time a watch event is received with expire or delete action, the
// queue is reevaluated to see if the lock can be granted.
// 4) When a lock is granted, the corresponding key in the queue is refreshed
// with lock-ttl.
// 5) The queue entry is deleted in the unlock function or on ttl expiry.
package utils
import (
"encoding/json"
"errors"
"flag"
"path"
"github.com/coreos/go-etcd/etcd"
"github.com/golang/glog"
"github.com/satori/go.uuid"
)
// Possible errors returned by Lock/Unlock.
var (
// ErrLockID is returned by rwLock when GetLockID reports an error.
ErrLockID = errors.New("Can't get lock id")
// ErrLockExpired is returned when the waiter sees an expire/delete event
// for its own queue entry, or when refreshing the entry's ttl fails.
ErrLockExpired = errors.New("Lock request expired")
// ErrLockReqLost is returned by tryLock when the request's handle is not
// found while scanning the lock queue.
ErrLockReqLost = errors.New("Lock request lost")
// ErrLockEnq is returned by enqLock when the in-order create fails.
ErrLockEnq = errors.New("Error enqueuing lock request")
// ErrLockGet is returned by tryLock when the lock queue cannot be read.
ErrLockGet = errors.New("Error reading lock queue")
// ErrLockDelete is returned by rwUnlock when deleting the queue entry fails.
ErrLockDelete = errors.New("Error deleting lock")
// ErrLockMarshal is returned when a LockState cannot be encoded as JSON.
ErrLockMarshal = errors.New("Marshal error")
// ErrLockUnmarshal is returned by tryLock when a queue entry's value
// cannot be decoded into a LockState.
ErrLockUnmarshal = errors.New("Unmarshal error")
)
// QTTL is the "queue-ttl" command line flag (default 30). Its value is the
// ttl, in seconds, passed to CreateInOrder when a lock request is enqueued.
var QTTL = flag.Uint64("queue-ttl", 30, "Timeout in seconds for a lock queue entry")
var (
// lockID is generated once, when the package is initialized, from a
// version 4 UUID.
lockID = uuid.NewV4().String()
// GetLockID returns identity of a lock requester. The default
// implementation returns the package-level lockID and a nil error. It is
// a variable rather than a function declaration, so it can be reassigned
// (presumably to let callers or tests supply their own id - confirm).
GetLockID = func() (string, error) {
return lockID, nil
}
)
const (
// kLocksDir is the etcd directory that holds all lock queues. The queue
// of a lock lives at path.Join(kLocksDir, name), and each request is a
// key directly below it.
kLocksDir = "/rwlocks" // Locks directory
)
// LockType selects the kind of lock being requested: read or write.
type LockType int
// LockHandle identifies one lock request. It is the last path element of
// the queue key created for the request and is what the unlock functions
// use to find the key to delete.
type LockHandle string
const (
// LockTypeRead requests a read lock.
LockTypeRead LockType = iota
// LockTypeWrite requests a write lock.
LockTypeWrite
)
// LockTypes maps each LockType to its string name. The name is what gets
// stored in LockState.LockType in etcd and is also used in log messages.
var LockTypes = map[LockType]string{
LockTypeRead: "read-lock",
LockTypeWrite: "write-lock",
}
// LockState is used to track lock holders/requests for a given lock and is
// stored as the value in etcd. It is encoded with encoding/json using the
// field names below as keys.
type LockState struct {
// LockType is the string name of the request type, taken from LockTypes.
LockType string
// Id is the requester identity obtained from GetLockID.
Id string
}
// RLock acquires a read lock. It delegates to rwLock with LockTypeRead.
//
// Inputs:
// client - Registry (etcd client) object.
// name - name of the lock.
// ttl - time needed to complete the work after acquiring lock. If the
// lock is not released within this time by calling RUnlock, it is
// automatically released.
// Outputs:
// LockHandle - lock handle to provide to RUnlock.
// error - any etcd errors or ttl expiry.
func RLock(client Registry, name string, ttl uint64) (LockHandle, error) {
return rwLock(client, LockTypeRead, name, ttl)
}
// WLock acquires a write lock. It delegates to rwLock with LockTypeWrite.
//
// Inputs:
// client - Registry (etcd client) object.
// name - name of the lock.
// ttl - time needed to complete the work after acquiring lock. If the
// lock is not released within this time by calling WUnlock, it is
// automatically released.
// Outputs:
// LockHandle - lock handle to provide to WUnlock.
// error - any etcd errors or ttl expiry.
func WLock(client Registry, name string, ttl uint64) (LockHandle, error) {
return rwLock(client, LockTypeWrite, name, ttl)
}
// RUnlock is used to unlock a read lock. name is the lock name and handle is
// the value returned by RLock. It delegates to rwUnlock and returns its
// error unchanged.
func RUnlock(client Registry, name string, handle LockHandle) error {
return rwUnlock(client, name, handle)
}
// WUnlock is used to unlock a write lock. name is the lock name and handle is
// the value returned by WLock. It delegates to rwUnlock and returns its
// error unchanged.
func WUnlock(client Registry, name string, handle LockHandle) error {
return rwUnlock(client, name, handle)
}
// rwLock enqueues a lock request of the given type for the lock called name
// and blocks until the request is granted or fails.
//
// Inputs:
//   client   - Registry used for all etcd operations.
//   lockType - LockTypeRead or LockTypeWrite.
//   name     - name of the lock.
//   ttl      - ttl applied to the queue entry once the lock is granted.
// Outputs:
//   LockHandle - handle of the granted request, to be passed to unlock.
//   error      - ErrLockID if the requester id cannot be obtained, otherwise
//                whatever enqLock or waitForLock returned.
func rwLock(client Registry, lockType LockType, name string, ttl uint64) (LockHandle,
	error) {
	// id is the value to be stored against the lock request.
	id, err := GetLockID()
	if err != nil {
		glog.Errorf("Unable to get lock id for lock %s: %s", name, err.Error())
		return "", ErrLockID
	}
	glog.V(2).Infof("%s %s for %s with ttl %d", LockTypes[lockType], name,
		id, ttl)

	// Enqueue the lock request.
	handle, err := enqLock(client, lockType, name, id)
	if err != nil {
		return "", err
	}

	// Wait until the lock is acquired or ttl expires.
	if err := waitForLock(client, lockType, name, handle, id, ttl); err != nil {
		// The caller gets no handle on failure and so cannot unlock. Remove
		// the queue entry here (best effort) so that it does not hold up the
		// requests queued behind it until its ttl runs out. The entry may
		// already be gone, hence a delete failure is only logged.
		if _, derr := client.Delete(path.Join(kLocksDir, name, handle),
			false); derr != nil {
			glog.V(2).Infof("Could not remove queue entry %s/%s after "+
				"failed wait: %s", name, handle, derr.Error())
		}
		return "", err
	}
	return LockHandle(handle), nil
}
// rwUnlock releases the lock called name by deleting the queue entry that
// belongs to handle. It returns ErrLockDelete if the delete fails and nil
// otherwise.
func rwUnlock(client Registry, name string, handle LockHandle) error {
	glog.V(2).Infof("Unlock called for lock %s, handle %s", name, handle)

	entry := path.Join(kLocksDir, name, string(handle))
	_, err := client.Delete(entry, false)
	if err == nil {
		return nil
	}
	glog.Errorf("Failed to unlock %s/%s with error %s", name,
		handle, err.Error())
	return ErrLockDelete
}
// enqLock appends a request of the given type to the queue of the lock
// called name. The stored value is the JSON form of a LockState carrying the
// type name and the requester id, and the entry is created with the
// queue-ttl.
//
// It returns the handle of the new entry (the last element of its key), or
// ErrLockMarshal / ErrLockEnq when encoding or the create fails.
func enqLock(client Registry, lockType LockType, name string, id string) (string, error) {
	// Value to write to etcd.
	payload, err := json.Marshal(&LockState{
		LockType: LockTypes[lockType],
		Id:       id,
	})
	if err != nil {
		glog.Errorf("Failed to marshal lock %s with id %s: %s",
			name, id, err.Error())
		return "", ErrLockMarshal
	}

	// CreateInOrder atomically inserts at the tail of the lock queue.
	queueDir := path.Join(kLocksDir, name)
	resp, err := client.CreateInOrder(queueDir, string(payload), *QTTL)
	if err != nil {
		glog.Errorf("Error enqueuing lock %s with id %s: %s",
			name, id, err.Error())
		return "", ErrLockEnq
	}

	_, handle := path.Split(resp.Node.Key)
	glog.V(2).Infof("Got handle %s for lock %s with id %s", handle, name, id)
	return handle, nil
}
// waitForLock blocks until the queued request identified by handle is
// granted the lock called name, or until the request fails.
//
// A watch on the lock queue is started in a separate goroutine and stopped
// when this function returns. The queue is evaluated once right away and
// then again after every "expire" or "delete" event seen by the watch.
//
// It returns nil once the lock is granted, ErrLockExpired if the event
// concerns this request's own queue entry, or the error from tryLock.
func waitForLock(client Registry, lockType LockType, name string, handle string,
	id string, ttl uint64) error {
	// Setup the watch to look for Unlock, ttl expiry events.
	watchCh := make(chan *etcd.Response, 1)
	watchFailCh := make(chan bool, 1)
	watchStopCh := make(chan bool, 1)
	// The channel is passed as an argument so that the goroutine does not
	// read watchCh, which is reassigned in the loop below.
	go func(ch chan *etcd.Response) {
		glog.V(2).Infof("Starting watch for %s %s with handle %s",
			LockTypes[lockType], name, handle)
		watch(client, name, ch, watchStopCh, watchFailCh)
	}(watchCh)
	defer func() {
		glog.V(2).Infof("Stopping watch for %s %s with handle %s",
			LockTypes[lockType], name, handle)
		watchStopCh <- true
	}()

	// Check to see if the lock can be acquired.
	if acquired, err := tryLock(client, lockType, name, handle, id, ttl); err != nil {
		return err
	} else if acquired {
		glog.V(2).Infof("Got %s %s with handle %s", LockTypes[lockType],
			name, handle)
		return nil
	}

	// Need to wait until a lock expires or is released.
	for {
		select {
		case resp, ok := <-watchCh:
			if !ok {
				// The channel was closed. Receiving from a closed channel
				// never blocks, so keeping it in the select would spin
				// until the failure signal shows up. A nil channel blocks
				// forever, which parks this case until the watchFailCh
				// branch installs a fresh channel.
				glog.Info("Got nil resp in watch channel")
				watchCh = nil
				continue
			}
			if resp == nil {
				glog.Info("Got nil resp in watch channel")
				continue
			}
			glog.V(2).Infof("Watch resp action is %s for %s %s with "+
				"handle %s", resp.Action, LockTypes[lockType],
				name, handle)

			// Only an unlock or a ttl expiry can change who is eligible.
			if resp.Action != "expire" && resp.Action != "delete" {
				continue
			}

			// If the event is about this request's own entry, the request
			// is gone. PrevNode is checked for nil so that a response
			// without it cannot crash the waiter.
			if resp.PrevNode != nil {
				if _, h := path.Split(resp.PrevNode.Key); h == handle {
					// TODO: Delete should not happen when waiting.
					glog.Errorf("Lock request for %s with handle "+
						"%s expired", name, handle)
					return ErrLockExpired
				}
			}

			// Some other entry left the queue. See if the request can go
			// through now.
			if acquired, err := tryLock(client, lockType, name,
				handle, id, ttl); err != nil {
				return err
			} else if acquired {
				glog.V(2).Infof("Got %s %s with handle %s",
					LockTypes[lockType], name, handle)
				return nil
			}
		case <-watchFailCh:
			// etcd client watch closes the channel, hence the need
			// to recreate.
			// NOTE(review): events that happen while the watch is down are
			// not replayed and the queue is not re-evaluated here; consider
			// whether a tryLock is needed after a restart.
			watchCh = make(chan *etcd.Response, 1)
			// Restart the watch.
			go watch(client, name, watchCh, watchStopCh, watchFailCh)
		}
	}
}
// tryLock reads the queue of the lock called name and decides whether the
// request identified by handle can be granted right now.
//
// A write request is granted only when it is the first entry of the queue.
// A read request is granted when every entry from the head of the queue up
// to and including its own is a read. On a grant the entry's ttl is
// refreshed with ttl before returning.
//
// Returns (true, nil) on a grant and (false, nil) when the request has to
// keep waiting. Errors: ErrLockGet if the queue cannot be read,
// ErrLockUnmarshal if an entry cannot be decoded, ErrLockReqLost if the scan
// ends without reaching the handle, or the error from refreshTTL.
func tryLock(client Registry, lockType LockType, name string, handle string,
	id string, ttl uint64) (bool, error) {
	// Fetch the lock queue; the entries come back in sorted order.
	queue, err := client.Get(path.Join(kLocksDir, name), true, true)
	if err != nil {
		glog.Errorf("Failed to read lock queue for %s with error %s",
			name, err.Error())
		return false, ErrLockGet
	}

	wantWrite := lockType == LockTypeWrite
	for pos, entry := range queue.Node.Nodes {
		var queued LockState
		if uerr := json.Unmarshal([]byte(entry.Value), &queued); uerr != nil {
			glog.Errorf("Unmarshal of lock %s failed with error %s",
				name, uerr.Error())
			return false, ErrLockUnmarshal
		}
		_, entryHandle := path.Split(entry.Key)
		mine := entryHandle == handle

		switch {
		case wantWrite && pos == 0:
			// A write must be at the head of the queue to go through.
			if queued.LockType != LockTypes[LockTypeWrite] || !mine {
				return false, nil
			}
		case queued.LockType != LockTypes[LockTypeRead]:
			// A non-read entry sits at or ahead of this request.
			return false, nil
		case !mine:
			// A read ahead of this request does not conflict; keep going.
			continue
		}

		// Reached this request's own entry without a conflict: granted.
		if rerr := refreshTTL(client, lockType, name, handle, id,
			ttl); rerr != nil {
			return false, rerr
		}
		return true, nil
	}
	return false, ErrLockReqLost
}
// refreshTTL rewrites the queue entry identified by handle with the same
// LockState value (type name and requester id) and the given ttl.
//
// It returns ErrLockMarshal if the value cannot be encoded, ErrLockExpired
// if the update fails, and nil otherwise.
func refreshTTL(client Registry, lockType LockType, name string, handle string,
	id string, ttl uint64) error {
	// Value to write to etcd.
	payload, err := json.Marshal(&LockState{
		LockType: LockTypes[lockType],
		Id:       id,
	})
	if err != nil {
		glog.Errorf("Failed to marshal lock %s with id %s: %s",
			name, id, err.Error())
		return ErrLockMarshal
	}

	entry := path.Join(kLocksDir, name, handle)
	_, err = client.Update(entry, string(payload), ttl)
	if err == nil {
		return nil
	}
	glog.Errorf("Failed to refresh TTL lock %s with id %s: %s",
		name, id, err.Error())
	return ErrLockExpired
}
// watch runs a recursive watch on the queue of the lock called name. Watch
// is a blocking call, so this is launched in a separate goroutine.
//
// Inputs:
//   watchCh     - channel handed to the client to deliver watch responses.
//   watchStopCh - channel handed to the client to stop the watch.
//   watchFailCh - receives true when the watch ends for any reason other
//                 than being stopped by the user, so the waiter can
//                 restart it.
func watch(client Registry, name string, watchCh chan *etcd.Response,
	watchStopCh chan bool, watchFailCh chan bool) {
	_, err := client.Watch(path.Join(kLocksDir, name), 0, true, watchCh,
		watchStopCh)
	if IsEtcdWatchStoppedByUser(err) {
		return
	}
	// Guard the nil case: calling err.Error() on a nil error would panic.
	if err != nil {
		glog.Errorf("Watch returned err %s for key %s", err.Error(), name)
	} else {
		glog.Errorf("Watch returned without error for key %s", name)
	}
	watchFailCh <- true
}