Skip to content

Commit

Permalink
add cluster name to the event when export to stdout
Browse files Browse the repository at this point in the history
Signed-off-by: wanjunlei <[email protected]>
  • Loading branch information
wanjunlei committed Nov 28, 2023
1 parent 216699b commit e04ce1e
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 21 deletions.
19 changes: 16 additions & 3 deletions pkg/exporter/kube_events_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type K8sEventSource struct {
inf cache.SharedIndexInformer
sinkers []types.Sinker
mutex sync.Mutex

cluster string
}

func (s *K8sEventSource) ReloadConfig(c *config.ExporterConfig) {
Expand All @@ -49,9 +51,9 @@ func (s *K8sEventSource) ReloadConfig(c *config.ExporterConfig) {

for _, w := range c.Sinks.Webhooks {
if w.Url != "" {
sinkers = append(sinkers, &sinks.WebhookSinker{Url: w.Url, Cluster: s.getClusterName()})
sinkers = append(sinkers, &sinks.WebhookSinker{Url: w.Url})
} else if w.Service != nil {
sinkers = append(sinkers, &sinks.WebhookSinker{Cluster: s.getClusterName(), Url: fmt.Sprintf("http://%s.%s.svc:%d/%s",
sinkers = append(sinkers, &sinks.WebhookSinker{Url: fmt.Sprintf("http://%s.%s.svc:%d/%s",
w.Service.Name, w.Service.Namespace, *w.Service.Port, w.Service.Path)})
}
}
Expand Down Expand Up @@ -155,12 +157,21 @@ func (s *K8sEventSource) sinkEvents(ctx context.Context) {
s.workqueue.Done(evt)
}
}()

events := types.Events{}
for _, e := range evts {
events.KubeEvents = append(events.KubeEvents, &types.ExtendedEvent{
Event: e,
Cluster: s.cluster,
})
}

evtSinkers := s.getSinkers()
if len(evtSinkers) == 0 {
return
}
for _, sinker := range evtSinkers {
if err = sinker.Sink(ctx, evts); err != nil {
if err = sinker.Sink(ctx, events); err != nil {
err = fmt.Errorf("error sinking events: %v", err)
klog.Error(err)
return
Expand Down Expand Up @@ -196,5 +207,7 @@ func NewKubeEventSource(client *kubernetes.Clientset) *K8sEventSource {
},
})

s.cluster = s.getClusterName()

return s
}
6 changes: 3 additions & 3 deletions pkg/exporter/sinks/stdout.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ import (
"encoding/json"
"fmt"

v1 "k8s.io/api/core/v1"
"github.com/kubesphere/kube-events/pkg/exporter/types"
)

type StdoutSinker struct {
}

func (s *StdoutSinker) Sink(ctx context.Context, evts []*v1.Event) error {
for _, evt := range evts {
func (s *StdoutSinker) Sink(ctx context.Context, evts types.Events) error {
for _, evt := range evts.KubeEvents {
bs, err := json.Marshal(evt)
if err != nil {
return err
Expand Down
18 changes: 5 additions & 13 deletions pkg/exporter/sinks/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,20 @@ import (
"fmt"
"net/http"

"github.com/kubesphere/kube-events/pkg/exporter/types"

"github.com/kubesphere/kube-events/pkg/util"
v1 "k8s.io/api/core/v1"
)

type WebhookSinker struct {
Url string
Cluster string
}

type extendedEvent struct {
*v1.Event `json:",inline"`
Cluster string `json:"cluster,omitempty"`
}

func (s *WebhookSinker) Sink(ctx context.Context, evts []*v1.Event) error {
func (s *WebhookSinker) Sink(ctx context.Context, evts types.Events) error {
var buf bytes.Buffer
for _, evt := range evts {
extendedEvt := extendedEvent{
Event: evt,
Cluster: s.Cluster,
}
if bs, err := json.Marshal(extendedEvt); err != nil {
for _, evt := range evts.KubeEvents {
if bs, err := json.Marshal(evt); err != nil {
return err
} else if _, err := buf.Write(bs); err != nil {
return err
Expand Down
9 changes: 7 additions & 2 deletions pkg/exporter/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@ import (
)

type Events struct {
KubeEvents []*v1.Event `json:"kubeEvents"`
KubeEvents []*ExtendedEvent `json:"kubeEvents"`
}

type ExtendedEvent struct {
*v1.Event `json:",inline"`
Cluster string `json:"cluster,omitempty"`
}

type Sinker interface {
Sink(ctx context.Context, events []*v1.Event) error
Sink(ctx context.Context, events Events) error
}

0 comments on commit e04ce1e

Please sign in to comment.