latches.go

package latches

import (
	"sync"

	"github.com/pingcap-incubator/tinykv/kv/transaction/mvcc"
)

// Latching provides atomicity of TinyKV commands. This should not be confused with SQL transactions, which provide
// atomicity for multiple TinyKV commands. For example, consider two commit commands: these write to multiple keys/CFs,
// so if they race, it is possible for inconsistent data to be written. By latching the keys each command might write,
// we ensure that the two commands will not race to write the same keys.
//
// A latch is a per-key lock. There is only one latch per user key, not one per CF or one for each encoded key. Latches
// are only needed for writing. Only one thread can hold a latch at a time, and all keys that a command might write
// must be locked at once.
//
// Latching is implemented using a single map which maps keys to a Go WaitGroup. Access to this map is guarded by a
// mutex to ensure that latching is atomic and consistent. Since the mutex is a global lock, it would cause intolerable
// contention in a real system.
type Latches struct {
	// Before modifying any property of a key, the thread must hold the latch for that key. latchMap maps each latched
	// key to a WaitGroup. Threads that find a key locked should wait on that WaitGroup.
	latchMap map[string]*sync.WaitGroup
	// Mutex to guard latchMap. A thread must hold this mutex while it makes any change to latchMap.
	latchGuard sync.Mutex
	// An optional validation function, only used for testing.
	Validation func(txn *mvcc.MvccTxn, keys [][]byte)
}

// NewLatches creates a new Latches object for managing a database's latches. There should only be one such object,
// shared between all threads.
func NewLatches() *Latches {
	l := new(Latches)
	l.latchMap = make(map[string]*sync.WaitGroup)
	return l
}

// AcquireLatches tries to lock all latches specified by keys. If this succeeds, nil is returned. If any of the keys
// are locked, then AcquireLatches returns a WaitGroup which the thread can use to be woken when the lock is free.
func (l *Latches) AcquireLatches(keysToLatch [][]byte) *sync.WaitGroup {
	l.latchGuard.Lock()
	defer l.latchGuard.Unlock()

	// Check none of the keys we want to write are locked.
	for _, key := range keysToLatch {
		if latchWg, ok := l.latchMap[string(key)]; ok {
			// Return a wait group to wait on.
			return latchWg
		}
	}

	// All latches are available, lock them all with a new wait group.
	wg := new(sync.WaitGroup)
	wg.Add(1)
	for _, key := range keysToLatch {
		l.latchMap[string(key)] = wg
	}

	return nil
}

// ReleaseLatches releases the latches for all keys in keysToUnlatch. It will wake up any threads blocked on one of the
// latches. All keys in keysToUnlatch must have been locked together in one call to AcquireLatches.
func (l *Latches) ReleaseLatches(keysToUnlatch [][]byte) {
	l.latchGuard.Lock()
	defer l.latchGuard.Unlock()

	first := true
	for _, key := range keysToUnlatch {
		if first {
			// All keys latched in one AcquireLatches call share a single WaitGroup, so signalling it once is
			// enough to wake every thread waiting on any of these keys.
			wg := l.latchMap[string(key)]
			wg.Done()
			first = false
		}
		delete(l.latchMap, string(key))
	}
}

// WaitForLatches attempts to lock all keys in keysToLatch using AcquireLatches. If a latch is already locked, then
// WaitForLatches will wait for it to become unlocked, then try again. Therefore WaitForLatches may block for an
// unbounded length of time.
func (l *Latches) WaitForLatches(keysToLatch [][]byte) {
	for {
		wg := l.AcquireLatches(keysToLatch)
		if wg == nil {
			return
		}
		wg.Wait()
	}
}

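// exampleCommandUsage is an illustrative sketch and not part of the upstream tinykv file: it shows how a command
// handler might use Latches around a multi-key write. The function name and the elided write are hypothetical; a real
// caller would perform its writes through its own storage layer.
func exampleCommandUsage(l *Latches, keys [][]byte) {
	// Block until every key this command might write is latched, so two commands cannot race on the same keys.
	l.WaitForLatches(keys)
	// Always release the latches so that any blocked commands are woken, even if the write returns early.
	defer l.ReleaseLatches(keys)

	// ... perform the multi-key write here, e.g. build and flush an mvcc.MvccTxn ...
}
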
// Validate calls the function in Validation, if it exists.
func (l *Latches) Validate(txn *mvcc.MvccTxn, latched [][]byte) {
	if l.Validation != nil {
		l.Validation(txn, latched)
	}
}

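// exampleValidationHook is likewise an illustrative sketch, not part of the upstream file: it shows how a test might
// install the Validation hook to observe which keys were latched when a command ran. The record callback is a
// hypothetical stand-in; a real test would assert that txn only writes keys covered by the latched set.
func exampleValidationHook(l *Latches, record func(keys [][]byte)) {
	l.Validation = func(txn *mvcc.MvccTxn, keys [][]byte) {
		// Hand the latched keys to the test; txn is also available for deeper checks.
		record(keys)
	}
}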