This repository has been archived by the owner on Jul 16, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
store.go
213 lines (173 loc) · 4.68 KB
/
store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
package mapstore
import (
"bytes"
"errors"
"os"
"sync"
kubeerr "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// Interface defines the required methods to satisfy the Manager implementation.
//
// It is the minimal key/value contract: values are opaque byte slices addressed
// by string keys.
type Interface interface {
// Keys returns the names of all stored keys.
Keys() ([]string, error)
// Get returns the value stored under key. Manager reports ErrKeyNotFound
// when the key is absent.
Get(key string) ([]byte, error)
// Set stores value under key.
Set(key string, value []byte) error
// Delete removes key from the store.
Delete(key string) error
// Truncate removes all stored data.
Truncate() error
}
// AdvancedInterface defines the required methods and a few optional methods for the Manager implementation.
//
// It embeds Interface and adds access to the whole data map plus an
// unconditional write.
type AdvancedInterface interface {
Interface
// Raw returns the complete key/value map.
Raw() (map[string][]byte, error)
// ForceSet stores value under key; Manager's implementation skips the
// "value unchanged" check that Set performs.
ForceSet(key string, value []byte) error
}
// Compile-time assertions that *Manager satisfies both of this package's
// interfaces; the build fails here if a method is missing or mistyped.
var (
	_ Interface         = (*Manager)(nil)
	_ AdvancedInterface = (*Manager)(nil)
)
// Manager is a thread safe key value store backed by a Kubernetes ConfigMap.
//
// The exported methods take the embedded lock themselves: read lock for
// Keys/Get/Raw, write lock for Set/ForceSet/Delete/Truncate.
type Manager struct {
// Embedded lock acquired by every exported method.
*sync.RWMutex
// Name of the ConfigMap passed to the client on every read and write.
configMapName string
// Client used to read and write the ConfigMap.
client *kubeClient
// When true, reads are served from internalCache instead of the client.
cacheEnabled bool
// In-memory data map; seeded in New from the ConfigMap's BinaryData when
// caching is enabled, otherwise left as an empty map.
internalCache map[string][]byte
}
// New builds a Manager for the ConfigMap cmName in namespace cmNamespace.
//
// The Kubernetes client is obtained via getKubeClient; the second argument to
// that call is true when the environment variable named by
// clusterUseLocalClusterEnv is set to "1".
//
// When cacheInternally is true the ConfigMap is fetched (or created) up front
// and its BinaryData, if non-nil, becomes the Manager's in-memory cache.
// Otherwise the cache starts out as an empty map and is not consulted on reads.
//
// An error from either the client setup or the initial ConfigMap fetch is
// returned unchanged together with a nil Manager.
func New(cmNamespace, cmName string, cacheInternally bool) (*Manager, error) {
	useLocalCluster := os.Getenv(clusterUseLocalClusterEnv) == "1"
	client, err := getKubeClient(cmNamespace, useLocalCluster)
	if err != nil {
		return nil, err
	}

	manager := &Manager{
		RWMutex:       &sync.RWMutex{},
		configMapName: cmName,
		client:        client,
		cacheEnabled:  cacheInternally,
		internalCache: map[string][]byte{},
	}

	// Without caching there is nothing to preload.
	if !cacheInternally {
		return manager, nil
	}

	// Seed the cache from the existing (or freshly created) ConfigMap.
	cm, err := client.getOrCreateConfigMap(cmName)
	if err != nil {
		return nil, err
	}
	if cm.BinaryData != nil {
		manager.internalCache = cm.BinaryData
	}
	return manager, nil
}
// getMapData returns the key/value data the Manager currently operates on.
//
// With caching enabled, the in-memory cache map itself (not a copy) is
// returned and the client is not called. Otherwise the data is fetched through
// the client: a Kubernetes status error whose reason is "not found" is treated
// as an empty store, while every other error is returned to the caller.
//
// The returned map is never nil when the error is nil. This method performs no
// locking of its own; the callers in this file hold the Manager's lock.
func (k *Manager) getMapData() (map[string][]byte, error) {
	if k.cacheEnabled {
		return k.internalCache, nil
	}

	data, err := k.client.get(k.configMapName)
	if err != nil {
		// Only a "not found" status may be ignored. Anything else (including
		// errors that are not a *StatusError at all, such as transport
		// failures) must surface; otherwise a later write would replace the
		// real ConfigMap contents with a nearly empty map.
		var statusErr *kubeerr.StatusError
		if !errors.As(err, &statusErr) || statusErr.Status().Reason != v1.StatusReasonNotFound {
			return nil, err
		}
	}

	// If data hasn't been set yet, hand back an empty, writable map.
	if data == nil {
		data = map[string][]byte{}
	}
	return data, nil
}
// Keys lists the name of every key currently present in the store.
//
// The read lock is held for the duration of the call. The order of the
// returned names follows Go map iteration and is therefore unspecified. Any
// error from loading the data is returned with a nil slice.
func (k *Manager) Keys() ([]string, error) {
	k.RLock()
	defer k.RUnlock()

	entries, err := k.getMapData()
	if err != nil {
		return nil, err
	}

	// Collect the names into a slice sized for the whole map.
	names := make([]string, 0, len(entries))
	for name := range entries {
		names = append(names, name)
	}
	return names, nil
}
// Get looks up key and returns the value stored under it.
//
// The read lock is held for the duration of the call. ErrKeyNotFound is
// returned when the key is absent; an error from loading the data is returned
// unchanged. The returned slice is the stored slice itself, not a copy.
func (k *Manager) Get(key string) ([]byte, error) {
	k.RLock()
	defer k.RUnlock()

	entries, err := k.getMapData()
	if err != nil {
		return nil, err
	}

	if value, found := entries[key]; found {
		return value, nil
	}
	return nil, ErrKeyNotFound
}
// Raw exposes the complete key/value map behind the store.
//
// The read lock is held only while the map is obtained. With caching enabled
// the returned map is the Manager's own cache rather than a copy.
func (k *Manager) Raw() (map[string][]byte, error) {
	k.RLock()
	defer k.RUnlock()

	raw, err := k.getMapData()
	if err != nil {
		return nil, err
	}
	return raw, nil
}
// Set stores value under key, skipping the write entirely when the key already
// holds identical bytes. The write lock is held for the duration of the call.
func (k *Manager) Set(key string, value []byte) error {
	k.Lock()
	defer k.Unlock()

	const force = false
	return k.set(key, value, force)
}
// ForceSet stores value under key and always performs the write, even when the
// key already holds identical bytes. The write lock is held for the duration
// of the call.
func (k *Manager) ForceSet(key string, value []byte) error {
	k.Lock()
	defer k.Unlock()

	const force = true
	return k.set(key, value, force)
}
// set writes value under key and persists the whole map through the client.
//
// When force is false and key already holds bytes equal to value, nothing is
// written and nil is returned. When force is true the write always happens.
//
// If the write fails, the map entry is restored to its previous state before
// the error is returned. This matters when caching is enabled, because the map
// being modified is then the Manager's live cache and would otherwise diverge
// from the ConfigMap.
//
// The caller must already hold the write lock.
func (k *Manager) set(key string, value []byte, force bool) error {
	dataMap, err := k.getMapData()
	if err != nil {
		return err
	}

	// Remember the previous entry for both the equality check and rollback.
	prev, existed := dataMap[key]
	if !force && existed && bytes.Equal(prev, value) {
		return nil
	}

	dataMap[key] = value

	if err := k.client.set(k.configMapName, dataMap); err != nil {
		// Undo the in-memory change so a failed write leaves no trace.
		if existed {
			dataMap[key] = prev
		} else {
			delete(dataMap, key)
		}
		return err
	}
	return nil
}
// Delete removes key from the store and persists the resulting map through the
// client. The write is performed even when key was not present.
//
// The write lock is held for the duration of the call. If the write fails, a
// previously existing entry is put back before the error is returned, so that
// with caching enabled the in-memory cache does not lose a key the ConfigMap
// still holds.
func (k *Manager) Delete(key string) error {
	k.Lock()
	defer k.Unlock()

	dataMap, err := k.getMapData()
	if err != nil {
		return err
	}

	// Keep the old entry so it can be restored if the write fails.
	prev, existed := dataMap[key]
	delete(dataMap, key)

	if err := k.client.set(k.configMapName, dataMap); err != nil {
		if existed {
			dataMap[key] = prev
		}
		return err
	}
	return nil
}
// Truncate removes all data by writing an empty map to the ConfigMap.
//
// The write lock is held for the duration of the call. The in-memory cache
// (when enabled) is replaced with an empty map only after the write succeeds;
// on failure the cache is left untouched and the error is returned, so the
// cache keeps matching what the ConfigMap still contains.
func (k *Manager) Truncate() error {
	k.Lock()
	defer k.Unlock()

	// Persist first; the cache must not get ahead of the ConfigMap.
	if err := k.client.set(k.configMapName, map[string][]byte{}); err != nil {
		return err
	}

	if k.cacheEnabled {
		k.internalCache = map[string][]byte{}
	}
	return nil
}