From fff02f1d84f214904fcd38dc2140343742db63c7 Mon Sep 17 00:00:00 2001 From: Abdelrahman Ahmed <16365652+abahmed@users.noreply.github.com> Date: Thu, 25 Jul 2024 04:08:13 +0300 Subject: [PATCH] =?UTF-8?q?=F0=9F=9A=80=20support=20node=20monitoring=20(#?= =?UTF-8?q?333)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 🐛 fix ignore failed graceful shutdown * 🚀 support node monitoring * 🚀 support node monitoring --- README.md | 9 +++++ config/config.go | 10 +++++ config/defaultConfig.go | 3 ++ go.mod | 16 ++++---- go.sum | 32 ++++++++-------- handler/handler.go | 5 ++- handler/processNode.go | 43 +++++++++++++++++++++ handler/processPod.go | 11 +++++- main.go | 2 +- storage/memory/memory.go | 18 +++++++++ storage/memory/memory_test.go | 46 +++++++++++++++++++++++ storage/storage.go | 4 ++ watcher/start.go | 71 +++++++++++++++++++++++++++++++---- watcher/watcher.go | 19 ++++------ 14 files changed, 240 insertions(+), 49 deletions(-) create mode 100644 handler/processNode.go diff --git a/README.md b/README.md index 7a432328..bdbf564f 100644 --- a/README.md +++ b/README.md @@ -121,6 +121,15 @@ kubectl apply -f https://raw.githubusercontent.com/abahmed/kwatch/v0.9.5/deploy/ | `pvcMonitor.interval` | the frequency (in minutes) to check pvc usage in the cluster (default: 15) | | `pvcMonitor.threshold` | the percentage of accepted pvc usage. if current usage exceeds this value, it will send a notification (default: 80) | + +### Node Monitor + +| Parameter | Description | +|:-----------------------------|:------------------------------------------- | +| `nodeMonitor.enabled` | to enable or disable node monitoring (default: true) | + + + ### Alerts #### Slack diff --git a/config/config.go b/config/config.go index f3aaa2ec..465fa4d1 100644 --- a/config/config.go +++ b/config/config.go @@ -14,6 +14,9 @@ type Config struct { // PvcMonitor configuration PvcMonitor PvcMonitor `yaml:"pvcMonitor"` + // NodeMonitor configuration + NodeMonitor NodeMonitor `yaml:"nodeMonitor"` + // MaxRecentLogLines optional max tail log lines in messages, // if it's not provided it will get all log lines MaxRecentLogLines int64 `yaml:"maxRecentLogLines"` @@ -107,3 +110,10 @@ type PvcMonitor struct { // By default, this value is 80 Threshold float64 `yaml:"threshold"` } + +// NodeMonitor confing struct +type NodeMonitor struct { + // Enabled if set to true, it will enable node watcher + // By default, this value is true + Enabled bool `yaml:"enabled"` +} diff --git a/config/defaultConfig.go b/config/defaultConfig.go index 618c51f1..78577bd4 100644 --- a/config/defaultConfig.go +++ b/config/defaultConfig.go @@ -11,5 +11,8 @@ func DefaultConfig() *Config { Interval: 5, Threshold: 80, }, + NodeMonitor: NodeMonitor{ + Enabled: true, + }, } } diff --git a/go.mod b/go.mod index a2359fa2..672b45ee 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/abahmed/kwatch -go 1.22.4 +go 1.22.5 require ( github.com/bwmarrin/discordgo v0.28.1 @@ -41,12 +41,12 @@ require ( github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/spf13/pflag v1.0.5 // indirect - golang.org/x/crypto v0.24.0 // indirect - golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 - golang.org/x/net v0.26.0 // indirect + golang.org/x/crypto v0.25.0 // indirect + golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 + golang.org/x/net v0.27.0 // indirect golang.org/x/oauth2 v0.21.0 // indirect - golang.org/x/sys v0.21.0 // indirect - golang.org/x/term v0.21.0 // indirect + golang.org/x/sys v0.22.0 // indirect + golang.org/x/term v0.22.0 // indirect golang.org/x/text v0.16.0 // indirect golang.org/x/time v0.5.0 // indirect google.golang.org/protobuf v1.34.2 // indirect @@ -54,8 +54,8 @@ require ( gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect k8s.io/klog/v2 v2.130.1 // indirect - k8s.io/kube-openapi v0.0.0-20240620174524-b456828f718b // indirect - k8s.io/utils v0.0.0-20240502163921-fe8a2dddb1d0 // indirect + k8s.io/kube-openapi v0.0.0-20240709000822-3c01b740850f // indirect + k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect sigs.k8s.io/yaml v1.4.0 // indirect diff --git a/go.sum b/go.sum index 0b87f3dd..618bcbe0 100644 --- a/go.sum +++ b/go.sum @@ -94,10 +94,10 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= -golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI= -golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM= -golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 h1:yixxcjnhBmY0nkL253HFVIm0JsFHwrHdT3Yh6szTnfY= -golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8/go.mod h1:jj3sYF3dwk5D+ghuXyeI3r5MFf+NT2An6/9dOA95KSI= +golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30= +golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M= +golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8= +golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= @@ -105,8 +105,8 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= -golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= +golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= +golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= golang.org/x/oauth2 v0.21.0 h1:tsimM75w1tF/uws5rbeHzIWxEqElMehnc+iW793zsZs= golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -117,11 +117,11 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= -golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= +golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/term v0.21.0 h1:WVXCp+/EBEHOj53Rvu+7KiT/iElMrO8ACK16SMZ3jaA= -golang.org/x/term v0.21.0/go.mod h1:ooXLefLobQVslOqselCNF4SxFAaoS6KujMbsGzSDmX0= +golang.org/x/term v0.22.0 h1:BbsgPEJULsl2fV/AT3v15Mjva5yXKQDyKf+TbDz7QJk= +golang.org/x/term v0.22.0/go.mod h1:F3qCibpT5AMpCRfhfT53vVJwhLtIVHhB9XDjfFvnMI4= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= @@ -132,8 +132,8 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.22.0 h1:gqSGLZqv+AI9lIQzniJ0nZDRG5GBPsSi+DRNHWNz6yA= -golang.org/x/tools v0.22.0/go.mod h1:aCwcsjqvq7Yqt6TNyX7QMU2enbQ/Gt0bo6krSeEri+c= +golang.org/x/tools v0.23.0 h1:SGsXPZ+2l4JsgaCKkx+FQ9YZ5XEtA1GZYuoDjenLjvg= +golang.org/x/tools v0.23.0/go.mod h1:pnu6ufv6vQkll6szChhK3C3L/ruaIv5eBeztNG8wtsI= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -164,10 +164,10 @@ k8s.io/client-go v0.30.3 h1:bHrJu3xQZNXIi8/MoxYtZBBWQQXwy16zqJwloXXfD3k= k8s.io/client-go v0.30.3/go.mod h1:8d4pf8vYu665/kUbsxWAQ/JDBNWqfFeZnvFiVdmx89U= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= -k8s.io/kube-openapi v0.0.0-20240620174524-b456828f718b h1:Q9xmGWBvOGd8UJyccgpYlLosk/JlfP3xQLNkQlHJeXw= -k8s.io/kube-openapi v0.0.0-20240620174524-b456828f718b/go.mod h1:UxDHUPsUwTOOxSU+oXURfFBcAS6JwiRXTYqYwfuGowc= -k8s.io/utils v0.0.0-20240502163921-fe8a2dddb1d0 h1:jgGTlFYnhF1PM1Ax/lAlxUPE+KfCIXHaathvJg1C3ak= -k8s.io/utils v0.0.0-20240502163921-fe8a2dddb1d0/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= +k8s.io/kube-openapi v0.0.0-20240709000822-3c01b740850f h1:2sXuKesAYbRHxL3aE2PN6zX/gcJr22cjrsej+W784Tc= +k8s.io/kube-openapi v0.0.0-20240709000822-3c01b740850f/go.mod h1:UxDHUPsUwTOOxSU+oXURfFBcAS6JwiRXTYqYwfuGowc= +k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 h1:pUdcCO1Lk/tbT5ztQWOBi5HBgbBP1J8+AsQnQCKsi8A= +k8s.io/utils v0.0.0-20240711033017-18e509b52bc8/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+sGiqlzvrtq4= diff --git a/handler/handler.go b/handler/handler.go index d8aa6d15..b380ebc4 100644 --- a/handler/handler.go +++ b/handler/handler.go @@ -5,12 +5,13 @@ import ( "github.com/abahmed/kwatch/config" "github.com/abahmed/kwatch/filter" "github.com/abahmed/kwatch/storage" - corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes" ) type Handler interface { - ProcessPod(evType string, pod *corev1.Pod) + ProcessPod(evType string, obj runtime.Object) + ProcessNode(evType string, obj runtime.Object) } type handler struct { diff --git a/handler/processNode.go b/handler/processNode.go new file mode 100644 index 00000000..fc014391 --- /dev/null +++ b/handler/processNode.go @@ -0,0 +1,43 @@ +package handler + +import ( + "fmt" + + "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" +) + +func (h *handler) ProcessNode(eventType string, obj runtime.Object) { + if obj == nil { + return + } + + node, ok := obj.(*corev1.Node) + if !ok { + logrus.Warnf("failed to cast event to node object: %v", obj) + return + } + + if eventType == "DELETED" { + h.memory.DelNode(node.Name) + return + } + + for _, c := range node.Status.Conditions { + if c.Type == corev1.NodeReady { + if c.Status == corev1.ConditionFalse && !h.memory.HasNode(node.Name) { + logrus.Printf("node %s is not ready: %s", node.Name, c.Reason) + h.alertManager.Notify(fmt.Sprintf("Node %s is not ready: %s - %s", + node.Name, + c.Reason, + c.Message, + )) + h.memory.AddNode(node.Name) + } else if c.Status == corev1.ConditionTrue { + h.memory.DelNode(node.Name) + } + } + } + +} diff --git a/handler/processPod.go b/handler/processPod.go index 56af6863..07385d6b 100644 --- a/handler/processPod.go +++ b/handler/processPod.go @@ -5,10 +5,17 @@ import ( "github.com/abahmed/kwatch/util" "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" ) -func (h *handler) ProcessPod(eventType string, pod *corev1.Pod) { - if pod == nil { +func (h *handler) ProcessPod(eventType string, obj runtime.Object) { + if obj == nil { + return + } + + pod, ok := obj.(*corev1.Pod) + if !ok { + logrus.Warnf("failed to cast event to pod object: %v", obj) return } diff --git a/main.go b/main.go index 630d4a55..b76534f0 100644 --- a/main.go +++ b/main.go @@ -54,7 +54,7 @@ func main() { ) // start watcher - watcher.Start(client, config, h.ProcessPod) + watcher.Start(client, config, h) } func setLogFormatter(formatter string) { diff --git a/storage/memory/memory.go b/storage/memory/memory.go index a0bef40d..1bba4563 100644 --- a/storage/memory/memory.go +++ b/storage/memory/memory.go @@ -8,12 +8,14 @@ import ( type memory struct { smap sync.Map + nmap sync.Map } // NewMemory returns new Memory object func NewMemory() storage.Storage { return &memory{ smap: sync.Map{}, + nmap: sync.Map{}, } } @@ -87,3 +89,19 @@ func (m *memory) GetPodContainer(namespace, podKey, containerKey string) *storag func (*memory) getKey(namespace, pod string) string { return namespace + "/" + pod } + +// AddNode stores node with key +func (m *memory) AddNode(nodeKey string) { + m.nmap.Store(nodeKey, true) +} + +// HasNode checks if node is stored +func (m *memory) HasNode(nodeKey string) bool { + _, ok := m.nmap.Load(nodeKey) + return ok +} + +// AddNode deletes node with key +func (m *memory) DelNode(nodeKey string) { + m.nmap.Delete(nodeKey) +} diff --git a/storage/memory/memory_test.go b/storage/memory/memory_test.go index 3db9652f..094b962b 100644 --- a/storage/memory/memory_test.go +++ b/storage/memory/memory_test.go @@ -125,3 +125,49 @@ func TestDelPod(t *testing.T) { t.Errorf("expected not to find pod test") } } + +func TestAddNode(t *testing.T) { + mem := &memory{ + nmap: sync.Map{}, + } + + mem.AddNode("default-node-1") + mem.AddNode("default-node-2") + + if _, ok := mem.nmap.Load("default-node-1"); !ok { + t.Errorf("expected to find node default-node-1") + } +} + +func TestHasNode(t *testing.T) { + mem := &memory{ + nmap: sync.Map{}, + } + + mem.AddNode("default-node-1") + mem.AddNode("default-node-2") + + if !mem.HasNode(("default-node-1")) { + t.Errorf("expected to find node default-node-1") + } + + if mem.HasNode("default-node-3") { + t.Errorf("expected not to find node default-node-3") + } +} + +func TestDelNode(t *testing.T) { + mem := &memory{ + smap: sync.Map{}, + } + + mem.AddNode("default-node-1") + mem.AddNode("default-node-2") + + mem.DelNode("default-node-1") + mem.DelNode("default-node-2") + + if _, ok := mem.nmap.Load("default-node-1"); ok { + t.Errorf("expected not to find node default-node-1") + } +} diff --git a/storage/storage.go b/storage/storage.go index c09db911..954bc026 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -19,4 +19,8 @@ type Storage interface { DelPod(namespace, podKey string) HasPodContainer(namespace, podKey, containerKey string) bool GetPodContainer(namespace, podKey, containerKey string) *ContainerState + + AddNode(nodeKey string) + HasNode(nodeKey string) bool + DelNode(nodeKey string) } diff --git a/watcher/start.go b/watcher/start.go index 46e57b6c..bce99924 100644 --- a/watcher/start.go +++ b/watcher/start.go @@ -4,8 +4,9 @@ import ( "context" "github.com/abahmed/kwatch/config" - corev1 "k8s.io/api/core/v1" + "github.com/abahmed/kwatch/handler" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" @@ -18,7 +19,52 @@ import ( func Start( client kubernetes.Interface, config *config.Config, - handleFunc func(string, *corev1.Pod)) { + handler handler.Handler) { + + watchers := []*Watcher{ + newPodWatcher(client, config, handler.ProcessPod), + } + + if config.NodeMonitor.Enabled { + watchers = append(watchers, newNodeWatcher(client, handler.ProcessNode)) + } + + stopCh := make(chan struct{}) + defer close(stopCh) + + for idx := range watchers { + go watchers[idx].run(stopCh) + } + + <-stopCh +} + +// newNodeWatcher creates watcher for nodes +func newNodeWatcher( + client kubernetes.Interface, + handler func(evType string, obj runtime.Object), +) *Watcher { + watchFunc := + func(options metav1.ListOptions) (watch.Interface, error) { + return client.CoreV1().Nodes().Watch( + context.Background(), + metav1.ListOptions{}, + ) + } + + return newWatcher( + "node", + watchFunc, + handler, + ) +} + +// newPodWatcher creates watcher for pods +func newPodWatcher( + client kubernetes.Interface, + config *config.Config, + handler func(evType string, obj runtime.Object), +) *Watcher { namespace := metav1.NamespaceAll if len(config.AllowedNamespaces) == 1 { namespace = config.AllowedNamespaces[0] @@ -32,20 +78,29 @@ func Start( ) } + return newWatcher( + "pod", + watchFunc, + handler, + ) +} + +// newWatcher creates watcher with provided name, watch, and handle functions +func newWatcher( + name string, + watchFunc func(options metav1.ListOptions) (watch.Interface, error), + handleFunc func(string, runtime.Object), +) *Watcher { watcher, _ := toolsWatch.NewRetryWatcher( "1", &cache.ListWatch{WatchFunc: watchFunc}, ) - w := &Watcher{ + return &Watcher{ + name: name, watcher: watcher, queue: workqueue.New(), handlerFunc: handleFunc, } - - stopCh := make(chan struct{}) - defer close(stopCh) - - w.run(stopCh) } diff --git a/watcher/watcher.go b/watcher/watcher.go index 21fa86c1..b3dc56a2 100644 --- a/watcher/watcher.go +++ b/watcher/watcher.go @@ -4,7 +4,7 @@ import ( "time" "github.com/sirupsen/logrus" - corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" toolsWatch "k8s.io/client-go/tools/watch" @@ -13,13 +13,14 @@ import ( type watcherEvent struct { eventType string - pod *corev1.Pod + obj runtime.Object } type Watcher struct { + name string watcher *toolsWatch.RetryWatcher queue *workqueue.Type - handlerFunc func(string, *corev1.Pod) + handlerFunc func(string, runtime.Object) } // run starts the watcher @@ -27,7 +28,7 @@ func (w *Watcher) run(stopCh chan struct{}) { defer utilruntime.HandleCrash() defer w.queue.ShutDown() - logrus.Info("starting pod watcher") + logrus.Infof("starting %s watcher", w.name) go wait.Until(w.processEvents, time.Second, stopCh) go wait.Until(w.runWorker, time.Second, stopCh) @@ -41,15 +42,9 @@ func (w *Watcher) processEvents() { } for event := range w.watcher.ResultChan() { - pod, ok := event.Object.(*corev1.Pod) - if !ok { - logrus.Warnf("failed to cast event to pod object: %v", event.Object) - continue - } - w.queue.Add(watcherEvent{ eventType: string(event.Type), - pod: pod.DeepCopy(), + obj: event.Object.DeepCopyObject(), }) } } @@ -74,7 +69,7 @@ func (w *Watcher) processNextItem() bool { return true } - w.handlerFunc(ev.eventType, ev.pod) + w.handlerFunc(ev.eventType, ev.obj) return true }