Skip to content

Commit

Permalink
Merge pull request #11 from 123shang60/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
123shang60 authored May 12, 2022
2 parents 617ca98 + 4b5012e commit 45bf91c
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 6 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/codeql-analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ jobs:

# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v1
uses: github/codeql-action/init@v2
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
Expand All @@ -53,7 +53,7 @@ jobs:
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
uses: github/codeql-action/autobuild@v1
uses: github/codeql-action/autobuild@v2

# ℹ️ Command-line programs to run using the OS shell.
# 📚 https://git.io/JvXDl
Expand All @@ -67,4 +67,4 @@ jobs:
# make release

- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v1
uses: github/codeql-action/analyze@v2
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

# Image URL to use all building/pushing image targets
IMG ?= controller:latest
PLATFORM ?= linux/amd64,linux/arm,linux/arm64,linux/ppc64le,linux/s390x
# ENVTEST_K8S_VERSION refers to the version of kubebuilder assets to be downloaded by envtest binary.
ENVTEST_K8S_VERSION = 1.23

Expand Down Expand Up @@ -79,7 +80,7 @@ docker-push: ## Push docker image with the manager.

.PHONY: container
container: ## Build and Push image with all platform with the manager
docker buildx build -f Dockerfile -t ${IMG} --platform=linux/amd64,linux/arm,linux/arm64,linux/ppc64le,linux/s390x --push .
docker buildx build -f Dockerfile -t ${IMG} --platform=${PLATFORM} --push .

##@ Deployment

Expand Down
12 changes: 10 additions & 2 deletions controllers/flinksession_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"
flinkv1 "github.com/123shang60/flink-session-operator/api/v1"
"github.com/cnf/structhash"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -166,6 +167,7 @@ func (r *FlinkSessionReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&flinkv1.FlinkSession{}).
Watches(&source.Kind{Type: &corev1.Service{}}, &FlinkSessionServiceHandler{}).
Watches(&source.Kind{Type: &batchv1.Job{}}, &FlinkSessionJobHandler{}).
Complete(r)
}

Expand Down Expand Up @@ -197,9 +199,13 @@ func (r *FlinkSessionReconciler) updateExternalResources(session *flinkv1.FlinkS
klog.Info("自动清理 ha 及状态后端功能关闭,跳过相关流程!")
r.Recorder.Eventf(session, corev1.EventTypeWarning, "FlinkSession Update", "External Resources Clean skip!")
}
// 更新时只清理不成功的 job ,防止误删
// 清理不成功,只告警但继续执行
if err := r.cleanBootJob(session, 0); err != nil {
r.Recorder.Eventf(session, corev1.EventTypeWarning, "FlinkSession Update", "Clean Job Error: %s", err.Error())
}

err := r.commitBootJob(session)
if err != nil {
if err := r.commitBootJob(session); err != nil {
r.Recorder.Eventf(session, corev1.EventTypeWarning, "FlinkSession Update", "Commit Job Error: %s", err.Error())
return err
}
Expand Down Expand Up @@ -244,5 +250,7 @@ func (r *FlinkSessionReconciler) deleteExternalResources(session *flinkv1.FlinkS
func (r *FlinkSessionReconciler) updateSelfStatus(session *flinkv1.FlinkSession) error {
// 忽略错误
r.updateNodePort(session)
// 更新状态只清理成功的 job
r.cleanBootJob(session, 1)
return nil
}
42 changes: 42 additions & 0 deletions controllers/flinksession_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"time"
)
Expand Down Expand Up @@ -157,3 +158,44 @@ func (r *FlinkSessionReconciler) commitBootJob(session *flinkv1.FlinkSession) er

return nil
}

func (r *FlinkSessionReconciler) cleanBootJob(session *flinkv1.FlinkSession, success int32) error {
jobList := batchv1.JobList{}
if err := r.List(context.Background(), &jobList, client.MatchingLabels{
"flink": "flink-session-operator",
}, client.InNamespace(session.GetNamespace())); err != nil {
klog.Error("获取列表失败!", err)
return err
} else {
for _, job := range jobList.Items {
if job.Status.Succeeded == success {
for _, reference := range job.GetObjectMeta().GetOwnerReferences() {
if reference.APIVersion == `flink.shang12360.cn/v1` &&
reference.Kind == `FlinkSession` &&
reference.Name == session.Name {
jobName := job.Name
if err := r.Delete(context.Background(), &job); err != nil {
klog.Error("删除job失败!", err)
} else {
klog.Info("清理 job :", jobName)
}

if err := r.Delete(context.Background(), &apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: jobName,
Namespace: session.Namespace,
},
}); err != nil {
klog.Error("删除job configmap失败!", err)
} else {
klog.Info("清理 job configmap:", jobName)
}

break
}
}
}
}
}
return nil
}
87 changes: 87 additions & 0 deletions controllers/flinksession_jobs_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package controllers

import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

type FlinkSessionJobHandler struct {
}

// Create implements EventHandler.
func (e *FlinkSessionJobHandler) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
if evt.Object == nil {
klog.Error("CreateEvent received with no metadata", "event", evt)
return
}
if job, ok := evt.Object.(*batchv1.Job); ok {
for _, reference := range job.GetObjectMeta().GetOwnerReferences() {
if reference.APIVersion == `flink.shang12360.cn/v1` &&
reference.Kind == `FlinkSession` {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: reference.Name,
Namespace: job.GetNamespace(),
}})
}
}
}
}

// Update implements EventHandler.
func (e *FlinkSessionJobHandler) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
switch {
case evt.ObjectNew != nil:
if job, ok := evt.ObjectNew.(*batchv1.Job); ok {
for _, reference := range job.GetObjectMeta().GetOwnerReferences() {
if reference.APIVersion == `flink.shang12360.cn/v1` &&
reference.Kind == `FlinkSession` {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: reference.Name,
Namespace: job.GetNamespace(),
}})
}
}
}
case evt.ObjectOld != nil:
if svc, ok := evt.ObjectOld.(*corev1.Service); ok {
klog.Info("old", svc)
}
default:
klog.Error("UpdateEvent received with no metadata", "event", evt)
}
}

// Delete implements EventHandler.
func (e *FlinkSessionJobHandler) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
if evt.Object == nil {
klog.Error("DeleteEvent received with no metadata", "event", evt)
return
}
//if svc, ok := evt.Object.(*corev1.Service); ok {
// klog.Info("del", svc)
//}
}

// Generic implements EventHandler.
func (e *FlinkSessionJobHandler) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) {
if evt.Object == nil {
klog.Error("GenericEvent received with no metadata", "event", evt)
return
}
if job, ok := evt.Object.(*batchv1.Job); ok {
for _, reference := range job.GetObjectMeta().GetOwnerReferences() {
if reference.APIVersion == `flink.shang12360.cn/v1` &&
reference.Kind == `FlinkSession` {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: reference.Name,
Namespace: job.GetNamespace(),
}})
}
}
}
}

0 comments on commit 45bf91c

Please sign in to comment.