Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support new event version #171

Merged
merged 2 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/exporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func init() {
flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
flag.StringVar(&configFile, "config.file", "", "Event exporter configuration file path")
flag.StringVar(&util.EventVersion, "event.version", util.EventVersionOld, "event version, eventsv1 or corev1")
}

func main() {
Expand All @@ -42,6 +43,8 @@ func main() {
klog.Fatal("Error building kubernetes clientset: ", e)
}

go util.SetClusterName(kclient)

ctx, cancel := context.WithCancel(context.Background())
wg, ctx := errgroup.WithContext(ctx)

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
k8s.io/apimachinery v0.27.4
k8s.io/client-go v12.0.0+incompatible
k8s.io/klog v1.0.0
k8s.io/klog/v2 v2.90.1
sigs.k8s.io/controller-runtime v0.14.6
sigs.k8s.io/yaml v1.4.0
)
Expand Down Expand Up @@ -114,7 +115,6 @@ require (
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apiextensions-apiserver v0.26.1 // indirect
k8s.io/component-base v0.26.1 // indirect
k8s.io/klog/v2 v2.90.1 // indirect
k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f // indirect
k8s.io/utils v0.0.0-20230209194617-a36077c30491 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
Expand Down
33 changes: 24 additions & 9 deletions pkg/exporter/kube_events_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ import (
"fmt"
"sync"

"github.com/kubesphere/kube-events/pkg/util"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

imports not sort

eventsv1 "k8s.io/api/events/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/kubesphere/kube-events/pkg/config"
"github.com/kubesphere/kube-events/pkg/exporter/sinks"
"github.com/kubesphere/kube-events/pkg/exporter/types"
Expand Down Expand Up @@ -104,7 +109,7 @@ func (s *K8sEventSource) waitForCacheSync(stopc <-chan struct{}) error {
return nil
}

func (s *K8sEventSource) drainEvents() (evts []*corev1.Event, shutdown bool) {
func (s *K8sEventSource) drainEvents() (evts []client.Object, shutdown bool) {
var (
i = 0
m = s.workqueue.Len()
Expand All @@ -116,7 +121,7 @@ func (s *K8sEventSource) drainEvents() (evts []*corev1.Event, shutdown bool) {
var obj interface{}
obj, shutdown = s.workqueue.Get()
if obj != nil {
evts = append(evts, obj.(*corev1.Event))
evts = append(evts, obj.(client.Object))
}
i++
if i >= m {
Expand Down Expand Up @@ -150,7 +155,7 @@ func (s *K8sEventSource) sinkEvents(ctx context.Context) {
} else if numRequeues := s.workqueue.NumRequeues(evt); numRequeues >= maxRetries {
s.workqueue.Forget(evt)
klog.Infof("Dropping event %s/%s out of the queue because of failing %d times: %v\n",
evt.Namespace, evt.Name, numRequeues, err)
evt.GetNamespace(), evt.GetName(), numRequeues, err)
} else {
s.workqueue.AddRateLimited(evt)
}
Expand All @@ -162,7 +167,7 @@ func (s *K8sEventSource) sinkEvents(ctx context.Context) {
for _, e := range evts {
events.KubeEvents = append(events.KubeEvents, &types.ExtendedEvent{
Event: e,
Cluster: s.cluster,
Cluster: util.GetCluster(),
})
}

Expand All @@ -185,7 +190,8 @@ func (s *K8sEventSource) enqueueEvent(obj interface{}) {
if obj == nil {
return
}
evt, ok := obj.(*corev1.Event)

evt, ok := obj.(client.Object)
if ok {
evt.SetManagedFields(nil) // set it nil because it is quite verbose
s.workqueue.Add(evt)
Expand All @@ -197,17 +203,26 @@ func NewKubeEventSource(client *kubernetes.Clientset) *K8sEventSource {
client: client,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "events"),
}
lw := cache.NewListWatchFromClient(client.CoreV1().RESTClient(),
"events", metav1.NamespaceAll, fields.Everything())
s.inf = cache.NewSharedIndexInformer(lw, &corev1.Event{}, 0, cache.Indexers{})
var eventType runtime.Object
var lw *cache.ListWatch
if util.EventVersion == util.EventVersionNew {
eventType = &eventsv1.Event{}
lw = cache.NewListWatchFromClient(client.EventsV1().RESTClient(),
"events", metav1.NamespaceAll, fields.Everything())
} else {
eventType = &corev1.Event{}
lw = cache.NewListWatchFromClient(client.CoreV1().RESTClient(),
"events", metav1.NamespaceAll, fields.Everything())
}
s.inf = cache.NewSharedIndexInformer(lw, eventType, 0, cache.Indexers{})
s.inf.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: s.enqueueEvent,
UpdateFunc: func(old, new interface{}) {
s.enqueueEvent(new)
},
})

s.cluster = s.getClusterName()
s.cluster = util.GetCluster()

return s
}
7 changes: 3 additions & 4 deletions pkg/exporter/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,16 @@ package types

import (
"context"

v1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type Events struct {
KubeEvents []*ExtendedEvent `json:"kubeEvents"`
}

type ExtendedEvent struct {
*v1.Event `json:",inline"`
Cluster string `json:"cluster,omitempty"`
Event client.Object `json:",inline"`
Cluster string `json:"cluster,omitempty"`
}

type Sinker interface {
Expand Down
53 changes: 53 additions & 0 deletions pkg/util/annoutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package util

import (
"context"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
)

const (
EventVersionOld = "corev1"
EventVersionNew = "eventsv1"
)

var cluster string
var EventVersion string

func SetClusterName(client *kubernetes.Clientset) {
setCluster(client)
t := time.NewTicker(60 * time.Second)
defer t.Stop()
for {
select {
Copy link
Member

@benjaminhuo benjaminhuo Aug 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

成功一次还需要继续 loop 吗
民生银行那个是怎么出错的可以重现吗,他的情况我理解如果第一次没有成功从 annotation获取到 cluster name,retry 到成功就可以了。

会有 cluster 改名的情况吗,很少吧

@wanjunlei @Gentleelephant

case <-t.C:
if cluster != "" {
return
}
setCluster(client)
klog.Infof("current cluster is [%s]", GetCluster())
}
}

}

func setCluster(client *kubernetes.Clientset) {

ns, err := client.CoreV1().Namespaces().Get(context.Background(), "kubesphere-system", metav1.GetOptions{})
if err != nil {
klog.Errorf("get namespace kubesphere-system error: %s", err)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function needs to exit here, otherwise, ns will be nil and the following code will report an error.

return
}

if ns.Annotations != nil {
cluster = ns.Annotations["cluster.kubesphere.io/name"]
}

}

func GetCluster() string {
return cluster
}