package shardedmap

import (
	"sync"
)

// Implementation: This is a sharded map, so the cost of locking is
// distributed with the data instead of funneling through a single lock.
// The optimal number of shards probably depends on the number of system
// cores, but we provide a general default.
type StrMap struct {
	shardCount uint64 // Don't alter after creation, no mutex guards this
	mutexes    []sync.RWMutex
	maps       []map[string]interface{}
}
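
// As a rough illustration (an assumption for this sketch, not guidance from
// the original file), a shard count of a few times the CPU count is a common
// starting point:
//
//	sm := NewStrMap(runtime.NumCPU() * 4)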

// NewStrMap returns a StrMap with the given number of shards. A shardCount
// of zero or less falls back to the package default (defaultShards, defined
// elsewhere in this package).
func NewStrMap(shardCount int) *StrMap {
	if shardCount <= 0 {
		shardCount = defaultShards
	}
	sm := &StrMap{
		shardCount: uint64(shardCount),
		mutexes:    make([]sync.RWMutex, shardCount),
		maps:       make([]map[string]interface{}, shardCount),
	}
	for i := range sm.maps {
		sm.maps[i] = make(map[string]interface{})
	}
	return sm
}
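
// pickShard maps a key to a shard index. memHashString is the package's
// string hash function, defined in a sibling file of this package.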
func (sm *StrMap) pickShard(key string) uint64 {
	return memHashString(key) % sm.shardCount
}

// Store sets the value for a key.
func (sm *StrMap) Store(key string, value interface{}) {
	shard := sm.pickShard(key)
	sm.mutexes[shard].Lock()
	sm.maps[shard][key] = value
	sm.mutexes[shard].Unlock()
}

// Load returns the value stored for a key, and a boolean reporting whether
// the key was present.
func (sm *StrMap) Load(key string) (interface{}, bool) {
	shard := sm.pickShard(key)
	sm.mutexes[shard].RLock()
	value, ok := sm.maps[shard][key]
	sm.mutexes[shard].RUnlock()
	return value, ok
}

// LoadOrStore returns the existing value for the key if present. Otherwise,
// it stores and returns the given value. The loaded result is true if the
// value was loaded, false if it was stored.
func (sm *StrMap) LoadOrStore(key string, value interface{}) (actual interface{}, loaded bool) {
	shard := sm.pickShard(key)
	sm.mutexes[shard].RLock()
	// Fast path assuming the value has a somewhat high chance of already
	// being there.
	if actual, loaded = sm.maps[shard][key]; loaded {
		sm.mutexes[shard].RUnlock()
		return
	}
	sm.mutexes[shard].RUnlock()
	// The read lock was released, so another goroutine may have stored the
	// key in the meantime; check again under the write lock.
	sm.mutexes[shard].Lock()
	if actual, loaded = sm.maps[shard][key]; loaded {
		sm.mutexes[shard].Unlock()
		return
	}
	sm.maps[shard][key] = value
	sm.mutexes[shard].Unlock()
	return value, loaded
}
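
// Illustrative sketch (not from the original file): LoadOrStore makes
// "get or create" safe under concurrency. Every caller ends up with the same
// stored pointer, whichever goroutine won the race to store it:
//
//	v, _ := sm.LoadOrStore("counter", new(int64))
//	ctr := v.(*int64) // the same *int64 for every concurrent caller
//	atomic.AddInt64(ctr, 1)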

// Delete removes the value for a key, if any.
func (sm *StrMap) Delete(key string) {
	shard := sm.pickShard(key)
	sm.mutexes[shard].Lock()
	delete(sm.maps[shard], key)
	sm.mutexes[shard].Unlock()
}

// Range is modeled after sync.Map.Range. It calls f sequentially for each key
// and value present in each of the shards in the map. If f returns false,
// Range stops the iteration.
//
// No key will be visited more than once, but if any value is inserted
// concurrently, Range may or may not visit it. Similarly, if a value is
// modified concurrently, Range may visit the previous or newest version of
// that value.
func (sm *StrMap) Range(f func(key string, value interface{}) bool) {
	for shard := range sm.mutexes {
		sm.mutexes[shard].RLock()
		for key, value := range sm.maps[shard] {
			if !f(key, value) {
				sm.mutexes[shard].RUnlock()
				return
			}
		}
		sm.mutexes[shard].RUnlock()
	}
}

// ConcRange ranges concurrently over all the shards, calling f sequentially
// over each shard's keys and values. If f returns false, iteration stops on
// that shard (but the other shards continue until completion). Since each
// shard is visited in its own goroutine, f may be called concurrently and
// must be safe for concurrent use.
//
// No key will be visited more than once, but if any value is inserted
// concurrently, ConcRange may or may not visit it. Similarly, if a value is
// modified concurrently, ConcRange may visit the previous or newest version
// of that value.
func (sm *StrMap) ConcRange(f func(key string, value interface{}) bool) {
	var wg sync.WaitGroup
	wg.Add(int(sm.shardCount))
	for shard := range sm.mutexes {
		go func(shard int) {
			defer wg.Done()
			sm.mutexes[shard].RLock()
			defer sm.mutexes[shard].RUnlock()
			for key, value := range sm.maps[shard] {
				if !f(key, value) {
					return
				}
			}
		}(shard)
	}
	wg.Wait()
}
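
// Illustrative sketch (not from the original file): because f runs once per
// shard in separate goroutines, any state it touches must be synchronized,
// e.g. counting entries with an atomic:
//
//	var n int64
//	sm.ConcRange(func(key string, value interface{}) bool {
//		atomic.AddInt64(&n, 1)
//		return true
//	})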

// AsyncRange is exactly like ConcRange, but doesn't wait until all shards are
// done. This is usually fine, although calls that appear to happen
// "sequentially" on the same goroutine might observe values from either
// before or after the AsyncRange call, which can be surprising behaviour.
// When that's not desirable, use ConcRange.
func (sm *StrMap) AsyncRange(f func(key string, value interface{}) bool) {
	for shard := range sm.mutexes {
		go func(shard int) {
			sm.mutexes[shard].RLock()
			defer sm.mutexes[shard].RUnlock()
			for key, value := range sm.maps[shard] {
				if !f(key, value) {
					return
				}
			}
		}(shard)
	}
}
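
// Illustrative usage sketch (not part of the original file; the shard count
// of 32 is an arbitrary choice for the example):
//
//	sm := NewStrMap(32)
//	sm.Store("answer", 42)
//	if v, ok := sm.Load("answer"); ok {
//		fmt.Println(v) // 42
//	}
//	actual, loaded := sm.LoadOrStore("answer", 7) // actual == 42, loaded == true
//	sm.Range(func(key string, value interface{}) bool {
//		fmt.Println(key, value)
//		return true // returning false stops the iteration early
//	})
//	sm.Delete("answer")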