Skip to content

Commit

Permalink
🚀 support storage to avoid duplicate notifications (#20)
Browse files Browse the repository at this point in the history
* support storage to avoid duplicate notifications

* update configs
  • Loading branch information
abahmed authored Dec 22, 2021
1 parent 2c630a3 commit bb2dacb
Show file tree
Hide file tree
Showing 16 changed files with 141 additions and 28 deletions.
32 changes: 23 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,13 @@
</a>
</p>

**kwatch** helps you monitor all changes in your Kubernetes(K8s) cluster, detects crashes in your running apps in realtime, and publishes notifications to your channels (Slack, Discord, etc.)
**kwatch** helps you monitor all changes in your Kubernetes(K8s) cluster, detects crashes in your running apps in realtime, and publishes notifications to your channels (Slack, Discord, etc.) instantly

## Contribute & Support
+ Add a [GitHub Star](https://github.com/abahmed/kwatch/stargazers)
+ [Suggest new features, ideas and optimizations](https://github.com/abahmed/kwatch/issues)
+ [Report issues](https://github.com/abahmed/kwatch/issues)

## Screenshots

<p align="center">
<img src="https://raw.githubusercontent.com/abahmed/kwatch/main/assets/demo.png" width="60%"/>
</p>

## Getting Started

Expand Down Expand Up @@ -61,7 +56,11 @@ kubectl apply -f https://raw.githubusercontent.com/abahmed/kwatch/v0.0.7/deploy/

#### Slack

If you want to enable Slack, provide the webhook with optional text and title.
<p>
<img src="./assets/slack.png" width="30%"/>
</p>

If you want to enable Slack, provide the webhook with optional text and title


| Parameter | Description |
Expand All @@ -72,8 +71,11 @@ If you want to enable Slack, provide the webhook with optional text and title.

#### Discord

If you want to enable Discord, provide the webhook with optional text and title.
<p>
<img src="./assets/discord.png" width="30%"/>
</p>

If you want to enable Discord, provide the webhook with optional text and title

| Parameter | Description |
|:---------------------------------|:------------------------------------------- |
Expand All @@ -83,7 +85,11 @@ If you want to enable Discord, provide the webhook with optional text and title.

#### PagerDuty

If you want to enable PagerDuty, provide the integration key.
<p>
<img src="./assets/pagerduty.png" width="50%"/>
</p>

If you want to enable PagerDuty, provide the integration key

| Parameter | Description |
|:---------------------------------|:------------------------------------------- |
Expand All @@ -96,6 +102,14 @@ kubectl delete -f https://raw.githubusercontent.com/abahmed/kwatch/v0.0.7/deploy
kubectl delete -f https://raw.githubusercontent.com/abahmed/kwatch/v0.0.7/deploy/deploy.yaml
```

## Who uses kwatch?

**kwatch** is being used by multiple entities including, but not limited to

[<img src="./assets/users/trella.png"/>](https://www.trella.app)

If you want to add your entity, [open issue](https://github.com/abahmed/kwatch/issues) to add it

## Contributors

<a href="https://github.com/abahmed/kwatch/graphs/contributors">
Expand Down
Binary file modified assets/.DS_Store
Binary file not shown.
Binary file added assets/discord.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added assets/pagerduty.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
File renamed without changes
Binary file added assets/users/trella.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
3 changes: 1 addition & 2 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,4 @@ alert:
discord:
webhook: ""
namespaces:
- default

- default
47 changes: 35 additions & 12 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package controller
import (
"errors"
"fmt"
"github.com/spf13/viper"
"time"

"github.com/spf13/viper"

"github.com/abahmed/kwatch/constant"
"github.com/abahmed/kwatch/event"
"github.com/abahmed/kwatch/provider"
"github.com/abahmed/kwatch/storage"
"github.com/abahmed/kwatch/util"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
Expand All @@ -27,6 +29,7 @@ type Controller struct {
kclient kubernetes.Interface
queue workqueue.RateLimitingInterface
providers []provider.Provider
store storage.Storage
}

// run starts the controller
Expand Down Expand Up @@ -104,6 +107,8 @@ func (c *Controller) processItem(key string) error {
// a delete for one pod
logrus.Infof("pod %s does not exist anymore\n", key)

c.store.DelPod(key)

// Clean up intervals if possible
return nil
}
Expand All @@ -115,17 +120,20 @@ func (c *Controller) processItem(key string) error {
// to avoid re-queuing it
return nil
}
// filer by namespaces in config

// filter by namespaces in config if specified
namespaces := viper.GetStringSlice("namespaces")
if len(namespaces) != 0 && !util.IsStrInSlice(pod.Namespace, namespaces) {
logrus.Info(pod.Namespace, "skip namespace %s as not selected in configuration")
logrus.Infof("skip namespace %s as not selected in configuration", pod.Namespace)
return nil
}

for _, container := range pod.Status.ContainerStatuses {
// filter running containers
if container.Ready ||
(container.State.Waiting == nil &&
container.State.Terminated == nil) {
c.store.DelPodContainer(key, container.Name)
continue
}

Expand All @@ -136,32 +144,44 @@ func (c *Controller) processItem(key string) error {
continue
}

// if reported, continue
if c.store.HasPodContainer(key, container.Name) {
continue
}

logrus.Debugf(
"processing container %s in pod %s@%s",
container.Name,
pod.Name,
pod.Namespace)

// get reason according to state
reason := "Unknown"
if container.State.Waiting != nil {
reason = container.State.Waiting.Reason
} else if container.State.Terminated != nil {
reason = container.State.Terminated.Reason
}

// get logs for this container
previous := true
if reason == "Error" {
previous = false
} else if container.RestartCount > 0 {
previous = true
}

logs := util.GetPodContainerLogs(
c.kclient,
pod.Name,
container.Name,
pod.Namespace,
container.RestartCount > 0)
previous)

// get events for this pod
eventsString :=
util.GetPodEventsStr(c.kclient, pod.Name, pod.Namespace)

// get reason according to state
reason := "Unknown"
if container.State.Waiting != nil {
reason = container.State.Waiting.Reason
} else if container.State.Terminated != nil {
reason = container.State.Terminated.Reason
}

evnt := event.Event{
Name: pod.Name,
Container: container.Name,
Expand All @@ -171,6 +191,9 @@ func (c *Controller) processItem(key string) error {
Events: eventsString,
}

// save container as it's reported to avoid duplication
c.store.AddPodContainer(key, container.Name)

// send event to providers
util.SendProvidersEvent(c.providers, evnt)
}
Expand Down
2 changes: 2 additions & 0 deletions controller/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/abahmed/kwatch/client"
"github.com/abahmed/kwatch/constant"
"github.com/abahmed/kwatch/storage"
"github.com/abahmed/kwatch/util"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -72,6 +73,7 @@ func Start() {
queue: queue,
kclient: kclient,
providers: util.GetProviders(),
store: storage.NewMemory(),
}

stopCh := make(chan struct{})
Expand Down
2 changes: 0 additions & 2 deletions deploy/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,3 @@ data:
webhook: <webhook_url>
namespaces:
- <optional_namespace>
:
2 changes: 1 addition & 1 deletion provider/discord.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type discord struct {
webhook string
}

// NewDiscord returns new Discord object
// NewDiscord returns new Discord instance
func NewDiscord(url string) Provider {
if len(url) == 0 {
logrus.Warnf("initializing discord with empty webhook url")
Expand Down
1 change: 1 addition & 0 deletions provider/pagerduty.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type pagerduty struct {
integrationKey string
}

// NewPagerDuty returns new PagerDuty instance
func NewPagerDuty(integrationKey string) Provider {
if len(integrationKey) == 0 {
logrus.Warnf("initializing pagerduty with an empty integration key")
Expand Down
2 changes: 1 addition & 1 deletion provider/slack.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type slack struct {
webhook string
}

// NewSlack returns new Slack object
// NewSlack returns new Slack instance
func NewSlack(url string) Provider {
if len(url) == 0 {
logrus.Warnf("initializing slack with empty webhook url")
Expand Down
65 changes: 65 additions & 0 deletions storage/memory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package storage

import (
"sync"

"github.com/sirupsen/logrus"
)

type memory struct {
smap sync.Map
}

// NewMemory returns new Memory object
func NewMemory() Storage {
return &memory{
smap: sync.Map{},
}
}

// AddPodContainer attaches container to pod to mark it has an error
func (m *memory) AddPodContainer(podKey, containerKey string) {
if v, ok := m.smap.Load(podKey); ok {
containers := v.(map[string]bool)
containers[containerKey] = true

m.smap.Store(podKey, containers)
return
}
m.smap.Store(podKey, map[string]bool{containerKey: true})
}

// Delete deletes pod with all its containers
func (m *memory) DelPod(key string) {
logrus.Info("del called: " + key)

m.smap.Delete(key)
}

// DelPodContainer detaches container from pod to mark error is resolved
func (m *memory) DelPodContainer(podKey, containerKey string) {
v, ok := m.smap.Load(podKey)
if !ok {
return
}

containers := v.(map[string]bool)
delete(containers, containerKey)

m.smap.Store(podKey, containers)
}

// HasPodContainer checks if container is attached to given pod or not
func (m *memory) HasPodContainer(podKey, containerKey string) bool {
v, ok := m.smap.Load(podKey)
if !ok {
return false
}

containers := v.(map[string]bool)
if _, ok := containers[containerKey]; ok {
return true
}

return false
}
8 changes: 8 additions & 0 deletions storage/storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package storage

type Storage interface {
AddPodContainer(podKey, containerKey string)
DelPodContainer(podKey, containerKey string)
DelPod(podKey string)
HasPodContainer(podKey, containerKey string) bool
}
5 changes: 4 additions & 1 deletion util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/abahmed/kwatch/event"
"strings"

"github.com/abahmed/kwatch/event"

"github.com/abahmed/kwatch/provider"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
Expand Down Expand Up @@ -116,6 +117,7 @@ func GetProviders() []provider.Provider {

// SendProvidersMsg sends string msg to all providers
func SendProvidersMsg(p []provider.Provider, msg string) {
logrus.Infof("sending message: %s", msg)
for _, prv := range p {
err :=
prv.SendMessage(msg)
Expand All @@ -130,6 +132,7 @@ func SendProvidersMsg(p []provider.Provider, msg string) {

// SendProvidersEvent sends event to all providers
func SendProvidersEvent(p []provider.Provider, event event.Event) {
logrus.Infof("sending event: %+v", event)
for _, prv := range p {
if err := prv.SendEvent(&event); err != nil {
logrus.Errorf(
Expand Down

0 comments on commit bb2dacb

Please sign in to comment.