diff --git a/README.md b/README.md index 45390f78..c2ce2448 100644 --- a/README.md +++ b/README.md @@ -62,7 +62,7 @@ kubectl apply -f https://raw.githubusercontent.com/abahmed/kwatch/main/deploy/de ### Cleanup ```shell -kubectl delete -f config.yaml +kubectl delete -f https://raw.githubusercontent.com/abahmed/kwatch/main/deploy/config.yaml kubectl delete -f https://raw.githubusercontent.com/abahmed/kwatch/main/deploy/deploy.yaml ``` diff --git a/client/client.go b/client/client.go index 7dd7abbe..8c878639 100644 --- a/client/client.go +++ b/client/client.go @@ -38,5 +38,7 @@ func Create() kubernetes.Interface { logrus.Fatalf("cannot create kubernetes client: %v", err) } + logrus.Debugf("created kubernetes client successfully") + return clientset } diff --git a/controller/controller.go b/controller/controller.go index 3b069d5b..706466ec 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -38,17 +38,20 @@ func Start() { kclient := client.Create() // create rate limiting queue - queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) + queue := + workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) indexer, informer := cache.NewIndexerInformer( &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - //options.FieldSelector = "status.phase!=Running" - return kclient.CoreV1().Pods(v1.NamespaceAll).List(context.TODO(), options) + ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + return kclient.CoreV1(). + Pods(v1.NamespaceAll). + List(context.TODO(), opts) }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - //options.FieldSelector = "status.phase!=Running" - return kclient.CoreV1().Pods(v1.NamespaceAll).Watch(context.TODO(), options) + WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + return kclient.CoreV1(). + Pods(v1.NamespaceAll). + Watch(context.TODO(), opts) }, }, &v1.Pod{}, @@ -69,8 +72,8 @@ func Start() { } }, DeleteFunc: func(obj interface{}) { - // IndexerInformer uses a delta queue, therefore for deletes we have to use this - // key function. + // IndexerInformer uses a delta queue, therefore for deletes + // we have to use this key function. key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err == nil { logrus.Debugf("received delete for Pod %s\n", key) @@ -79,7 +82,7 @@ func Start() { }, }, cache.Indexers{}) - con := Controller{ + controller := Controller{ name: "pod-crash", informer: informer, indexer: indexer, @@ -91,7 +94,7 @@ func Start() { stopCh := make(chan struct{}) defer close(stopCh) - con.run(constant.NumWorkers, stopCh) + controller.run(constant.NumWorkers, stopCh) } // run starts the controller @@ -104,7 +107,8 @@ func (c *Controller) run(workers int, stopCh chan struct{}) { go c.informer.Run(stopCh) if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) { - utilruntime.HandleError(errors.New("timed out waiting for caches to sync")) + utilruntime.HandleError( + errors.New("timed out waiting for caches to sync")) return } @@ -112,8 +116,13 @@ func (c *Controller) run(workers int, stopCh chan struct{}) { // send notification to providers for _, prv := range c.providers { - if err := prv.SendMessage(constant.WelcomeMsg); err != nil { - logrus.Errorf("failed to send msg with %s: %s", prv.Name(), err.Error()) + err := + prv.SendMessage(constant.WelcomeMsg) + if err != nil { + logrus.Errorf( + "failed to send msg with %s: %s", + prv.Name(), + err.Error()) } } @@ -160,12 +169,16 @@ func (c *Controller) processNextItem() bool { func (c *Controller) processItem(key string) error { obj, exists, err := c.indexer.GetByKey(key) if err != nil { - logrus.Errorf("failed to fetch object %s from store: %s", key, err.Error()) + logrus.Errorf( + "failed to fetch object %s from store: %s", + key, + err.Error()) return err } if !exists { - // Below we will warm up our cache with a Pod, so that we will see a delete for one pod + // Below we will warm up our cache with a Pod, so that we will see + // a delete for one pod logrus.Infof("pod %s does not exist anymore\n", key) // Clean up intervals if possible @@ -182,7 +195,16 @@ func (c *Controller) processItem(key string) error { for _, container := range pod.Status.ContainerStatuses { // filter running containers - if container.Ready || container.State.Waiting == nil { + if container.Ready || + (container.State.Waiting == nil && + container.State.Terminated == nil) { + continue + } + + if (container.State.Waiting != nil && + container.State.Waiting.Reason == "ContainerCreating") || + (container.State.Terminated != nil && + container.State.Terminated.Reason == "Completed") { continue } @@ -201,13 +223,22 @@ func (c *Controller) processItem(key string) error { container.RestartCount > 0) // get events for this pod - eventsString := util.GetPodEventsStr(c.kclient, pod.Name, pod.Namespace) + eventsString := + util.GetPodEventsStr(c.kclient, pod.Name, pod.Namespace) + + // get reason according to state + reason := "Unknown" + if container.State.Waiting != nil { + reason = container.State.Waiting.Reason + } else if container.State.Terminated != nil { + reason = container.State.Terminated.Reason + } evnt := event.Event{ Name: pod.Name, Container: container.Name, Namespace: pod.Namespace, - Reason: container.State.Waiting.Reason, + Reason: reason, Logs: logs, Events: eventsString, } diff --git a/main.go b/main.go index 5159ab6c..61f82198 100644 --- a/main.go +++ b/main.go @@ -4,12 +4,15 @@ import ( "os" "path/filepath" + "github.com/abahmed/kwatch/constant" "github.com/abahmed/kwatch/controller" "github.com/sirupsen/logrus" "github.com/spf13/viper" ) func main() { + logrus.Infof(constant.WelcomeMsg) + // initialize configuration configFile := os.Getenv("CONFIG_FILE") if len(configFile) == 0 { diff --git a/provider/slack.go b/provider/slack.go index b723c7f2..288fa7bb 100644 --- a/provider/slack.go +++ b/provider/slack.go @@ -12,15 +12,27 @@ import ( const ( footer = "" - defaultTitle = ":red_circle: kwatch detected crash in a pod" + defaultTitle = ":red_circle: kwatch detected a crash in pod" defaultText = "There is an issue with container in a pod!" ) -type slack struct{} +type slack struct { + webhook string +} // NewSlack returns new Slack object func NewSlack() Provider { - return &slack{} + url := viper.GetString("providers.slack.webhook") + + if len(url) == 0 { + logrus.Warnf("initializing slack with empty webhook url") + } else { + logrus.Infof("initializing slack with webhook url: %s", url) + } + + return &slack{ + webhook: url, + } } // Name returns name of the provider @@ -33,8 +45,7 @@ func (s *slack) SendEvent(ev *event.Event) error { logrus.Debugf("sending to slack event: %v", ev) // check config - url := viper.GetString("providers.slack.webhook") - if len(url) == 0 { + if len(s.webhook) == 0 { return errors.New("webhook url is empty") } @@ -106,19 +117,18 @@ func (s *slack) SendEvent(ev *event.Event) error { } // send message - return slackClient.PostWebhook(url, &msg) + return slackClient.PostWebhook(s.webhook, &msg) } // SendMessage sends text message to the provider func (s *slack) SendMessage(msg string) error { // check config - url := viper.GetString("providers.slack.webhook") - if len(url) == 0 { + if len(s.webhook) == 0 { return errors.New("webhook url is empty") } sMsg := slackClient.WebhookMessage{ Text: msg, } - return slackClient.PostWebhook(url, &sMsg) + return slackClient.PostWebhook(s.webhook, &sMsg) } diff --git a/util/util.go b/util/util.go index 7ae1b648..5ec918ad 100644 --- a/util/util.go +++ b/util/util.go @@ -67,16 +67,17 @@ func GetPodContainerLogs( // try to decode response var status metav1.Status - if parseErr := json.Unmarshal(logs, &status); parseErr == nil { + parseErr := json.Unmarshal(logs, &status) + if parseErr == nil { return status.Message - } else { - logrus.Warnf( - "failed to parse logs for container %s in pod %s@%s: %s", - name, - container, - namespace, - parseErr.Error()) } + + logrus.Warnf( + "failed to parse logs for container %s in pod %s@%s: %s", + name, + container, + namespace, + parseErr.Error()) } return string(logs)