Skip to content

Commit

Permalink
Fix exclude reasons (#4)
Browse files: browse the repository at this point in the history
* feature: detect new pod crashes

* fix: update config.yaml

* fixes alert for ContainerCreating and Completed

* fix welcome message
  • Loading branch information
abahmed authored Nov 30, 2021
1 parent 398aefb commit c195691
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 37 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ kubectl apply -f https://raw.githubusercontent.com/abahmed/kwatch/main/deploy/de
### Cleanup

```shell
kubectl delete -f config.yaml
kubectl delete -f https://raw.githubusercontent.com/abahmed/kwatch/main/deploy/config.yaml
kubectl delete -f https://raw.githubusercontent.com/abahmed/kwatch/main/deploy/deploy.yaml
```

Expand Down
2 changes: 2 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,7 @@ func Create() kubernetes.Interface {
logrus.Fatalf("cannot create kubernetes client: %v", err)
}

logrus.Debugf("created kubernetes client successfully")

return clientset
}
69 changes: 50 additions & 19 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,20 @@ func Start() {
kclient := client.Create()

// create rate limiting queue
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
queue :=
workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

indexer, informer := cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
//options.FieldSelector = "status.phase!=Running"
return kclient.CoreV1().Pods(v1.NamespaceAll).List(context.TODO(), options)
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
return kclient.CoreV1().
Pods(v1.NamespaceAll).
List(context.TODO(), opts)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
//options.FieldSelector = "status.phase!=Running"
return kclient.CoreV1().Pods(v1.NamespaceAll).Watch(context.TODO(), options)
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
return kclient.CoreV1().
Pods(v1.NamespaceAll).
Watch(context.TODO(), opts)
},
},
&v1.Pod{},
Expand All @@ -69,8 +72,8 @@ func Start() {
}
},
DeleteFunc: func(obj interface{}) {
// IndexerInformer uses a delta queue, therefore for deletes we have to use this
// key function.
// IndexerInformer uses a delta queue, therefore for deletes
// we have to use this key function.
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
logrus.Debugf("received delete for Pod %s\n", key)
Expand All @@ -79,7 +82,7 @@ func Start() {
},
}, cache.Indexers{})

con := Controller{
controller := Controller{
name: "pod-crash",
informer: informer,
indexer: indexer,
Expand All @@ -91,7 +94,7 @@ func Start() {
stopCh := make(chan struct{})
defer close(stopCh)

con.run(constant.NumWorkers, stopCh)
controller.run(constant.NumWorkers, stopCh)
}

// run starts the controller
Expand All @@ -104,16 +107,22 @@ func (c *Controller) run(workers int, stopCh chan struct{}) {
go c.informer.Run(stopCh)

if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
utilruntime.HandleError(errors.New("timed out waiting for caches to sync"))
utilruntime.HandleError(
errors.New("timed out waiting for caches to sync"))
return
}

logrus.Infof("%s controller synced and ready", c.name)

// send notification to providers
for _, prv := range c.providers {
if err := prv.SendMessage(constant.WelcomeMsg); err != nil {
logrus.Errorf("failed to send msg with %s: %s", prv.Name(), err.Error())
err :=
prv.SendMessage(constant.WelcomeMsg)
if err != nil {
logrus.Errorf(
"failed to send msg with %s: %s",
prv.Name(),
err.Error())
}
}

Expand Down Expand Up @@ -160,12 +169,16 @@ func (c *Controller) processNextItem() bool {
func (c *Controller) processItem(key string) error {
obj, exists, err := c.indexer.GetByKey(key)
if err != nil {
logrus.Errorf("failed to fetch object %s from store: %s", key, err.Error())
logrus.Errorf(
"failed to fetch object %s from store: %s",
key,
err.Error())
return err
}

if !exists {
// Below we will warm up our cache with a Pod, so that we will see a delete for one pod
// Below we will warm up our cache with a Pod, so that we will see
// a delete for one pod
logrus.Infof("pod %s does not exist anymore\n", key)

// Clean up intervals if possible
Expand All @@ -182,7 +195,16 @@ func (c *Controller) processItem(key string) error {

for _, container := range pod.Status.ContainerStatuses {
// filter running containers
if container.Ready || container.State.Waiting == nil {
if container.Ready ||
(container.State.Waiting == nil &&
container.State.Terminated == nil) {
continue
}

if (container.State.Waiting != nil &&
container.State.Waiting.Reason == "ContainerCreating") ||
(container.State.Terminated != nil &&
container.State.Terminated.Reason == "Completed") {
continue
}

Expand All @@ -201,13 +223,22 @@ func (c *Controller) processItem(key string) error {
container.RestartCount > 0)

// get events for this pod
eventsString := util.GetPodEventsStr(c.kclient, pod.Name, pod.Namespace)
eventsString :=
util.GetPodEventsStr(c.kclient, pod.Name, pod.Namespace)

// get reason according to state
reason := "Unknown"
if container.State.Waiting != nil {
reason = container.State.Waiting.Reason
} else if container.State.Terminated != nil {
reason = container.State.Terminated.Reason
}

evnt := event.Event{
Name: pod.Name,
Container: container.Name,
Namespace: pod.Namespace,
Reason: container.State.Waiting.Reason,
Reason: reason,
Logs: logs,
Events: eventsString,
}
Expand Down
3 changes: 3 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ import (
"os"
"path/filepath"

"github.com/abahmed/kwatch/constant"
"github.com/abahmed/kwatch/controller"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
)

func main() {
logrus.Infof(constant.WelcomeMsg)

// initialize configuration
configFile := os.Getenv("CONFIG_FILE")
if len(configFile) == 0 {
Expand Down
28 changes: 19 additions & 9 deletions provider/slack.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,27 @@ import (

const (
footer = "<https://github.com/abahmed/kwatch|kwatch>"
defaultTitle = ":red_circle: kwatch detected crash in a pod"
defaultTitle = ":red_circle: kwatch detected a crash in pod"
defaultText = "There is an issue with container in a pod!"
)

type slack struct{}
type slack struct {
webhook string
}

// NewSlack returns new Slack object
func NewSlack() Provider {
return &slack{}
url := viper.GetString("providers.slack.webhook")

if len(url) == 0 {
logrus.Warnf("initializing slack with empty webhook url")
} else {
logrus.Infof("initializing slack with webhook url: %s", url)
}

return &slack{
webhook: url,
}
}

// Name returns name of the provider
Expand All @@ -33,8 +45,7 @@ func (s *slack) SendEvent(ev *event.Event) error {
logrus.Debugf("sending to slack event: %v", ev)

// check config
url := viper.GetString("providers.slack.webhook")
if len(url) == 0 {
if len(s.webhook) == 0 {
return errors.New("webhook url is empty")
}

Expand Down Expand Up @@ -106,19 +117,18 @@ func (s *slack) SendEvent(ev *event.Event) error {
}

// send message
return slackClient.PostWebhook(url, &msg)
return slackClient.PostWebhook(s.webhook, &msg)
}

// SendMessage sends text message to the provider
func (s *slack) SendMessage(msg string) error {
// check config
url := viper.GetString("providers.slack.webhook")
if len(url) == 0 {
if len(s.webhook) == 0 {
return errors.New("webhook url is empty")
}

sMsg := slackClient.WebhookMessage{
Text: msg,
}
return slackClient.PostWebhook(url, &sMsg)
return slackClient.PostWebhook(s.webhook, &sMsg)
}
17 changes: 9 additions & 8 deletions util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,17 @@ func GetPodContainerLogs(

// try to decode response
var status metav1.Status
if parseErr := json.Unmarshal(logs, &status); parseErr == nil {
parseErr := json.Unmarshal(logs, &status)
if parseErr == nil {
return status.Message
} else {
logrus.Warnf(
"failed to parse logs for container %s in pod %s@%s: %s",
name,
container,
namespace,
parseErr.Error())
}

logrus.Warnf(
"failed to parse logs for container %s in pod %s@%s: %s",
name,
container,
namespace,
parseErr.Error())
}

return string(logs)
Expand Down

0 comments on commit c195691

Please sign in to comment.