Skip to content

Commit

Permalink
feature: detect new pod crashes (#3)
Browse files Browse the repository at this point in the history
* feature: detect new pod crashes

* fix: update config.yaml
  • Loading branch information
abahmed authored Nov 29, 2021
1 parent b0cf8d4 commit 398aefb
Show file tree
Hide file tree
Showing 10 changed files with 150 additions and 81 deletions.
23 changes: 23 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
## Contributing to kwatch

:tada: Anyone can contribute to kwatch. Newcomers are always welcome to contribute to kwatch, and we are happy to offer help to newcomers.
Before making changes, please first discuss the change you want to make through [Discord](https://discord.gg/kzJszdKmJ7)


### There are many ways to contribute:

+ [Suggest new features to be implemented](https://github.com/abahmed/kwatch/issues)
+ [Report issues](https://github.com/abahmed/kwatch/issues)
+ [Improve Documentation](https://github.com/abahmed/kwatch)
+ [Fix issues](https://github.com/abahmed/kwatch/issues)


### Code Contribution

If you wish to work on an issue, please comment on the issue that you want to work on it. This is to prevent duplicated efforts on the same issue.


Contributions to kwatch should be made in the form of pull requests to the **main** branch. Each pull request will be reviewed by someone with permission to land patches. After reviewing the patch, it could be landed in the master branch or given feedback for changes.

### Code of Conduct
We expect everyone to follow the [Code Of Conduct](./CODE_OF_CONDUCT.md)
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ kubectl apply -f https://raw.githubusercontent.com/abahmed/kwatch/main/deploy/de

| Parameter | Description |Required |
|:--------------------------|:----------------------------------------- |:-------------- |
| `maxRecentLogLines` | Max tail log lines in messages | No |
| `providers.slack.webhook` | Slack webhook URL | Yes |
| `providers.slack.title` | Customized title in slack message | No |
| `providers.slack.text` | Customized text in slack message | No |
Expand Down
2 changes: 2 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"k8s.io/client-go/util/homedir"
)

// Create returns kubernetes client after initializing it with in-cluster, or
// out of cluster config
func Create() kubernetes.Interface {
// try to use in cluster config
config, err := rest.InClusterConfig()
Expand Down
1 change: 1 addition & 0 deletions config.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
maxRecentLogLines: 0
providers:
slack:
webhook: ""
6 changes: 3 additions & 3 deletions constant/constant.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package constant

// WelcomeMsg
// WelcomeMsg is used to be sent to all providers when kwatch starts
const WelcomeMsg = ":tada: kwatch just started!"

// NumRequeues
// NumRequeues indicates number of retries when worker fails to handle item
const NumRequeues = 5

// NumWorkers
// NumWorkers is the number concurrent workers that consume items for the queue
const NumWorkers = 4
85 changes: 45 additions & 40 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,17 @@ import (
"k8s.io/client-go/util/workqueue"
)

// Controller holds necessary
type Controller struct {
name string
informer cache.Controller
indexer cache.Indexer
kclient kubernetes.Interface
queue workqueue.RateLimitingInterface
serverStartTime time.Time
providers []provider.Provider
name string
informer cache.Controller
indexer cache.Indexer
kclient kubernetes.Interface
queue workqueue.RateLimitingInterface
providers []provider.Provider
}

// Start creates an instance of controller after initialization and runs it
func Start() {
// create kubernetes client
kclient := client.Create()
Expand All @@ -42,11 +43,11 @@ func Start() {
indexer, informer := cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = "status.phase!=Running"
//options.FieldSelector = "status.phase!=Running"
return kclient.CoreV1().Pods(v1.NamespaceAll).List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.FieldSelector = "status.phase!=Running"
//options.FieldSelector = "status.phase!=Running"
return kclient.CoreV1().Pods(v1.NamespaceAll).Watch(context.TODO(), options)
},
},
Expand Down Expand Up @@ -90,14 +91,14 @@ func Start() {
stopCh := make(chan struct{})
defer close(stopCh)

con.Run(constant.NumWorkers, stopCh)
con.run(constant.NumWorkers, stopCh)
}

func (c *Controller) Run(workers int, stopCh chan struct{}) {
// run starts the controller
func (c *Controller) run(workers int, stopCh chan struct{}) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()

c.serverStartTime = time.Now().Local()
logrus.Infof("starting %s controller", c.name)

go c.informer.Run(stopCh)
Expand Down Expand Up @@ -179,41 +180,45 @@ func (c *Controller) processItem(key string) error {
return nil
}

// ignore messages happened before starting controller
if pod.CreationTimestamp.Sub(c.serverStartTime).Seconds() <= 0 {
return nil
}

for _, container := range pod.Status.ContainerStatuses {
// filter running containers
if container.Ready || container.State.Waiting == nil {
continue
}

switch container.State.Waiting.Reason {
case "CrashLoopBackOff":
case "ImagePullBackOff":
case "ErrImagePull":
// retrieve logs of container
logs := util.GetPodContainerLogs(c.kclient, pod.Name, container.Name, pod.Namespace)

// get only failed events
eventsString := util.GetPodFailedEvents(c.kclient, pod.Name, pod.Namespace)

evnt := event.Event{
Name: pod.Name,
Container: container.Name,
Namespace: pod.Namespace,
Reason: container.State.Waiting.Reason,
Logs: logs,
Events: eventsString,
}
logrus.Debugf(
"processing container %s in pod %s@%s",
container.Name,
pod.Name,
pod.Namespace)

// get logs for this container
logs := util.GetPodContainerLogs(
c.kclient,
pod.Name,
container.Name,
pod.Namespace,
container.RestartCount > 0)

// get events for this pod
eventsString := util.GetPodEventsStr(c.kclient, pod.Name, pod.Namespace)

evnt := event.Event{
Name: pod.Name,
Container: container.Name,
Namespace: pod.Namespace,
Reason: container.State.Waiting.Reason,
Logs: logs,
Events: eventsString,
}

// send notification to providers
for _, prv := range c.providers {
if err := prv.SendEvent(&evnt); err != nil {
logrus.Errorf("failed to send event with %s: %s", prv.Name(), err.Error())
}
// send notification to providers
for _, prv := range c.providers {
if err := prv.SendEvent(&evnt); err != nil {
logrus.Errorf(
"failed to send event with %s: %s",
prv.Name(),
err.Error())
}
}
}
Expand Down
1 change: 1 addition & 0 deletions deploy/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ metadata:
namespace: kwatch
data:
config.yaml: |
maxRecentLogLines: <optional_number_of_lines>
providers:
slack:
webhook: <webhook_url>
1 change: 1 addition & 0 deletions event/event.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package event

// Event used to represent info needed by providers to send messages
type Event struct {
Name string
Container string
Expand Down
43 changes: 23 additions & 20 deletions provider/slack.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,30 @@ import (

"github.com/abahmed/kwatch/event"
"github.com/sirupsen/logrus"
"github.com/slack-go/slack"
slackClient "github.com/slack-go/slack"
"github.com/spf13/viper"
)

const (
Footer = "<https://github.com/abahmed/kwatch|kwatch>"
DefaultTitle = ":red_circle: Pod Crash"
DefaultText = "There is an issue with your pod!"
footer = "<https://github.com/abahmed/kwatch|kwatch>"
defaultTitle = ":red_circle: kwatch detected crash in a pod"
defaultText = "There is an issue with container in a pod!"
)

type Slack struct{}
type slack struct{}

// NewSlack returns new Slack object
func NewSlack() Provider {
return &Slack{}
return &slack{}
}

func (s *Slack) Name() string {
// Name returns name of the provider
func (s *slack) Name() string {
return "Slack"
}

func (s *Slack) SendEvent(ev *event.Event) error {
// SendEvent sends event to the provider
func (s *slack) SendEvent(ev *event.Event) error {
logrus.Debugf("sending to slack event: %v", ev)

// check config
Expand All @@ -37,7 +39,7 @@ func (s *Slack) SendEvent(ev *event.Event) error {
}

// initialize fields with basic info
fields := []slack.AttachmentField{
fields := []slackClient.AttachmentField{
{
Title: "Name",
Value: ev.Name,
Expand All @@ -63,7 +65,7 @@ func (s *Slack) SendEvent(ev *event.Event) error {
// add events part if it exists
events := strings.TrimSpace(ev.Events)
if len(events) > 0 {
fields = append(fields, slack.AttachmentField{
fields = append(fields, slackClient.AttachmentField{
Title: ":mag: Events",
Value: "```\n" + events + "```",
})
Expand All @@ -72,7 +74,7 @@ func (s *Slack) SendEvent(ev *event.Event) error {
// add logs part if it exists
logs := strings.TrimSpace(ev.Logs)
if len(logs) > 0 {
fields = append(fields, slack.AttachmentField{
fields = append(fields, slackClient.AttachmentField{
Title: ":memo: Logs",
Value: "```\n" + logs + "```",
})
Expand All @@ -81,41 +83,42 @@ func (s *Slack) SendEvent(ev *event.Event) error {
// use custom title if it's provided, otherwise use default
title := viper.GetString("providers.slack.title")
if len(title) == 0 {
title = DefaultTitle
title = defaultTitle
}

// use custom text if it's provided, otherwise use default
text := viper.GetString("providers.slack.text")
if len(text) == 0 {
text = DefaultText
text = defaultText
}

msg := slack.WebhookMessage{
Attachments: []slack.Attachment{
msg := slackClient.WebhookMessage{
Attachments: []slackClient.Attachment{
{
Color: "danger",
Title: title,
Text: text,
Fields: fields,
MarkdownIn: []string{"fields"},
Footer: Footer,
Footer: footer,
},
},
}

// send message
return slack.PostWebhook(url, &msg)
return slackClient.PostWebhook(url, &msg)
}

func (s *Slack) SendMessage(msg string) error {
// SendMessage sends text message to the provider
func (s *slack) SendMessage(msg string) error {
// check config
url := viper.GetString("providers.slack.webhook")
if len(url) == 0 {
return errors.New("webhook url is empty")
}

sMsg := slack.WebhookMessage{
sMsg := slackClient.WebhookMessage{
Text: msg,
}
return slack.PostWebhook(url, &sMsg)
return slackClient.PostWebhook(url, &sMsg)
}
Loading

0 comments on commit 398aefb

Please sign in to comment.