-
Notifications
You must be signed in to change notification settings - Fork 5
/
mvmemory.go
150 lines (127 loc) · 3.74 KB
/
mvmemory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package block_stm
import (
"sync/atomic"
storetypes "cosmossdk.io/store/types"
)
type (
// keys are sorted
Locations []Key
MultiLocations map[int]Locations
)
// MVMemory implements `Algorithm 2 The MVMemory module`
type MVMemory struct {
storage MultiStore
scheduler *Scheduler
stores map[storetypes.StoreKey]int
data []MVStore
lastWrittenLocations []atomic.Pointer[MultiLocations]
lastReadSet []atomic.Pointer[MultiReadSet]
}
func NewMVMemory(
block_size int, stores map[storetypes.StoreKey]int,
storage MultiStore, scheduler *Scheduler,
) *MVMemory {
return NewMVMemoryWithEstimates(block_size, stores, storage, scheduler, nil)
}
func NewMVMemoryWithEstimates(
block_size int, stores map[storetypes.StoreKey]int,
storage MultiStore, scheduler *Scheduler, estimates []MultiLocations,
) *MVMemory {
data := make([]MVStore, len(stores))
for key, i := range stores {
data[i] = NewMVStore(key)
}
mv := &MVMemory{
storage: storage,
scheduler: scheduler,
stores: stores,
data: data,
lastWrittenLocations: make([]atomic.Pointer[MultiLocations], block_size),
lastReadSet: make([]atomic.Pointer[MultiReadSet], block_size),
}
// init with pre-estimates
for txn, est := range estimates {
mv.rcuUpdateWrittenLocations(TxnIndex(txn), est)
mv.ConvertWritesToEstimates(TxnIndex(txn))
}
return mv
}
func (mv *MVMemory) Record(version TxnVersion, view *MultiMVMemoryView) bool {
newLocations := view.ApplyWriteSet(version)
wroteNewLocation := mv.rcuUpdateWrittenLocations(version.Index, newLocations)
mv.lastReadSet[version.Index].Store(view.ReadSet())
return wroteNewLocation
}
// newLocations are sorted
func (mv *MVMemory) rcuUpdateWrittenLocations(txn TxnIndex, newLocations MultiLocations) bool {
var wroteNewLocation bool
prevLocations := mv.readLastWrittenLocations(txn)
for i, newLoc := range newLocations {
prevLoc, ok := prevLocations[i]
if !ok {
if len(newLocations[i]) > 0 {
wroteNewLocation = true
}
continue
}
DiffOrderedList(prevLoc, newLoc, func(key Key, is_new bool) bool {
if is_new {
wroteNewLocation = true
} else {
mv.data[i].Delete(key, txn)
}
return true
})
}
// delete all the keys in un-touched stores
for i, prevLoc := range prevLocations {
if _, ok := newLocations[i]; ok {
continue
}
for _, key := range prevLoc {
mv.data[i].Delete(key, txn)
}
}
mv.lastWrittenLocations[txn].Store(&newLocations)
return wroteNewLocation
}
func (mv *MVMemory) ConvertWritesToEstimates(txn TxnIndex) {
for i, locations := range mv.readLastWrittenLocations(txn) {
for _, key := range locations {
mv.data[i].WriteEstimate(key, txn)
}
}
}
func (mv *MVMemory) ValidateReadSet(txn TxnIndex) bool {
// Invariant: at least one `Record` call has been made for `txn`
rs := *mv.lastReadSet[txn].Load()
for store, readSet := range rs {
if !mv.data[store].ValidateReadSet(txn, readSet) {
return false
}
}
return true
}
func (mv *MVMemory) readLastWrittenLocations(txn TxnIndex) MultiLocations {
p := mv.lastWrittenLocations[txn].Load()
if p != nil {
return *p
}
return nil
}
func (mv *MVMemory) WriteSnapshot(storage MultiStore) {
for name, i := range mv.stores {
mv.data[i].SnapshotToStore(storage.GetStore(name))
}
}
// View creates a view for a particular transaction.
func (mv *MVMemory) View(txn TxnIndex) *MultiMVMemoryView {
return NewMultiMVMemoryView(mv.stores, mv.newMVView, txn)
}
func (mv *MVMemory) newMVView(name storetypes.StoreKey, txn TxnIndex) MVView {
i := mv.stores[name]
return NewMVView(i, mv.storage.GetStore(name), mv.GetMVStore(i), mv.scheduler, txn)
}
func (mv *MVMemory) GetMVStore(i int) MVStore {
return mv.data[i]
}