This repository has been archived by the owner on Jul 16, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
kubernetes.go
164 lines (129 loc) · 4.23 KB
/
kubernetes.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package mapstore
import (
"context"
"errors"
"os"
"path/filepath"
"time"
corev1 "k8s.io/api/core/v1"
kubeerr "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth" // Import auth for local cluster configs.
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
const (
clusterUseLocalClusterEnv = "USE_LOCAL_CLUSTER"
contextTimeout = 10 * time.Second
)
var singleton *kubeClient
// kubeClient wires up the connection to the cluster.
type kubeClient struct {
client kubernetes.Interface
namespace string
}
func getKubeClient(namespace string, userLocalCluster bool) (*kubeClient, error) {
// Try to return the singleton first.
if singleton != nil {
return singleton, nil
}
// Otherwise we need to setup the client and set the singleton.
var err error
var config *rest.Config
// Determine if we should connect via our `~/.kube/config` credentials or the embedded config values.
if userLocalCluster {
cfg := filepath.Join(os.Getenv("HOME"), ".kube", "config")
config, err = clientcmd.BuildConfigFromFlags("", cfg)
if err != nil {
return nil, err
}
} else {
config, err = rest.InClusterConfig()
if err != nil {
return nil, err
}
}
// Create the clientset based on the config.
var clientset *kubernetes.Clientset
clientset, err = kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
// Set the singleton.
singleton = &kubeClient{clientset, namespace}
return singleton, nil
}
func (k *kubeClient) getDefaultTimeoutContext() (context.Context, context.CancelFunc) {
return context.WithTimeout(context.Background(), contextTimeout)
}
func (k *kubeClient) getConfigMap(name string) (*corev1.ConfigMap, error) {
ctx, cancel := k.getDefaultTimeoutContext()
defer cancel()
return k.client.CoreV1().ConfigMaps(k.namespace).Get(ctx, name, v1.GetOptions{})
}
func (k *kubeClient) getOrCreateConfigMap(name string) (*corev1.ConfigMap, error) {
getCtx, getCancel := k.getDefaultTimeoutContext()
defer getCancel()
// Attempt to fetch the existing ConfigMap.
configMap, err := k.client.CoreV1().ConfigMaps(k.namespace).Get(getCtx, name, v1.GetOptions{})
// If no error was returned and we have valid ConfigMap, return it.
if err == nil && configMap != nil {
return configMap, nil
}
// But if we had an error other than StatusReasonNotFound, return it.
var statusErr *kubeerr.StatusError
if ok := errors.As(err, &statusErr); ok && statusErr.Status().Reason != v1.StatusReasonNotFound {
return nil, err
}
// Looks like we need to create the ConfigMap.
configMap = &corev1.ConfigMap{
ObjectMeta: v1.ObjectMeta{
Name: name,
Namespace: k.namespace,
},
}
createCtx, createCancel := k.getDefaultTimeoutContext()
defer createCancel()
return k.client.CoreV1().ConfigMaps(k.namespace).Create(createCtx, configMap, v1.CreateOptions{})
}
func (k *kubeClient) get(name string) (map[string][]byte, error) {
cm, err := k.getConfigMap(name)
if err != nil {
return nil, err
}
return cm.BinaryData, err
}
func (k *kubeClient) set(name string, binaryData map[string][]byte) error {
updateCtx, updateCancel := k.getDefaultTimeoutContext()
defer updateCancel()
// Attempt to update if it exists.
if configMap, err := k.getConfigMap(name); err == nil {
configMap.BinaryData = binaryData
_, updateErr := k.client.CoreV1().ConfigMaps(k.namespace).Update(updateCtx, configMap, v1.UpdateOptions{})
return updateErr
}
// Doesn't exists, create it instead.
configMap := &corev1.ConfigMap{
ObjectMeta: v1.ObjectMeta{
Name: name,
Namespace: k.namespace,
},
BinaryData: binaryData,
}
creatCtx, createCancel := k.getDefaultTimeoutContext()
defer createCancel()
_, err := k.client.CoreV1().ConfigMaps(k.namespace).Create(creatCtx, configMap, v1.CreateOptions{})
return err
}
func (k *kubeClient) delete(name string) error {
ctx, cancel := k.getDefaultTimeoutContext()
defer cancel()
err := k.client.CoreV1().ConfigMaps(k.namespace).Delete(ctx, name, v1.DeleteOptions{})
// We can safely ignore not found errors.
var statusErr *kubeerr.StatusError
if ok := errors.As(err, &statusErr); ok && statusErr.Status().Reason == v1.StatusReasonNotFound {
return nil
}
return err
}