diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 86713ae1..85a27219 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -1,4 +1,3 @@ - --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole @@ -6,174 +5,175 @@ metadata: creationTimestamp: null name: manager-role rules: -- apiGroups: - - flinkoperator.k8s.io - resources: - - flinkclusters - verbs: - - get - - list - - watch - - create - - update - - patch - - delete -- apiGroups: - - flinkoperator.k8s.io - resources: - - flinkclusters/status - verbs: - - get - - update - - patch -- apiGroups: - - apps - resources: - - deployments - - statefulsets - verbs: - - get - - list - - watch - - create - - update - - patch - - delete -- apiGroups: - - apps - resources: - - deployments/status - - statefulsets/status - verbs: - - get -- apiGroups: - - "" - resources: - - pods - - secrets - verbs: - - create - - get - - list - - watch - - patch -- apiGroups: - - apps - resources: - - controllerrevisions - verbs: - - get - - list - - watch - - create - - update - - patch - - delete -- apiGroups: - - "" - resources: - - pods/status - verbs: - - get -- apiGroups: - - "" - resources: - - services - verbs: - - get - - list - - watch - - create - - update - - patch - - delete -- apiGroups: - - "" - resources: - - services/status - verbs: - - get -- apiGroups: - - "" - resources: - - events - verbs: - - get - - list - - watch - - create - - update - - patch - - delete -- apiGroups: - - "" - resources: - - events/status - verbs: - - get -- apiGroups: - - "" - resources: - - configmaps - verbs: - - get - - list - - watch - - create - - update - - patch - - delete -- apiGroups: - - batch - resources: - - jobs - verbs: - - get - - list - - watch - - create - - update - - patch - - delete -- apiGroups: - - batch - resources: - - jobs/status - verbs: - - get -- apiGroups: - - extensions - resources: - - ingresses - verbs: - - get - - list - - watch - - create - - update - - patch - - delete -- apiGroups: - - extensions - resources: - - ingresses/status - verbs: - - get -- apiGroups: - - admissionregistration.k8s.io - resources: - - mutatingwebhookconfigurations - - validatingwebhookconfigurations - verbs: - - get - - create - - update - - patch -- apiGroups: - - scheduling.volcano.sh - resources: - - podgroups - verbs: - - get - - create - - update + - apiGroups: + - flinkoperator.k8s.io + resources: + - flinkclusters + verbs: + - get + - list + - watch + - create + - update + - patch + - delete + - apiGroups: + - flinkoperator.k8s.io + resources: + - flinkclusters/status + verbs: + - get + - update + - patch + - apiGroups: + - apps + resources: + - deployments + - statefulsets + verbs: + - get + - list + - watch + - create + - update + - patch + - delete + - apiGroups: + - apps + resources: + - deployments/status + - statefulsets/status + verbs: + - get + - apiGroups: + - "" + resources: + - pods + - secrets + verbs: + - create + - get + - list + - watch + - patch + - apiGroups: + - apps + resources: + - controllerrevisions + verbs: + - get + - list + - watch + - create + - update + - patch + - delete + - apiGroups: + - "" + resources: + - pods/status + verbs: + - get + - apiGroups: + - "" + resources: + - services + - persistentvolumeclaims + verbs: + - get + - list + - watch + - create + - update + - patch + - delete + - apiGroups: + - "" + resources: + - services/status + verbs: + - get + - apiGroups: + - "" + resources: + - events + verbs: + - get + - list + - watch + - create + - update + - patch + - delete + - apiGroups: + - "" + resources: + - events/status + verbs: + - get + - apiGroups: + - "" + resources: + - configmaps + verbs: + - get + - list + - watch + - create + - update + - patch + - delete + - apiGroups: + - batch + resources: + - jobs + verbs: + - get + - list + - watch + - create + - update + - patch + - delete + - apiGroups: + - batch + resources: + - jobs/status + verbs: + - get + - apiGroups: + - extensions + resources: + - ingresses + verbs: + - get + - list + - watch + - create + - update + - patch + - delete + - apiGroups: + - extensions + resources: + - ingresses/status + verbs: + - get + - apiGroups: + - admissionregistration.k8s.io + resources: + - mutatingwebhookconfigurations + - validatingwebhookconfigurations + verbs: + - get + - create + - update + - patch + - apiGroups: + - scheduling.volcano.sh + resources: + - podgroups + verbs: + - get + - create + - update diff --git a/controllers/flinkcluster_controller.go b/controllers/flinkcluster_controller.go index 057bbd0c..fd5c9322 100644 --- a/controllers/flinkcluster_controller.go +++ b/controllers/flinkcluster_controller.go @@ -51,6 +51,7 @@ type FlinkClusterReconciler struct { // +kubebuilder:rbac:groups=apps,resources=deployments/status,verbs=get // +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=apps,resources=statefulsets/status,verbs=get +// +kubebuilder:rbac:groups=core,resources=persistentvolumeclaims,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch // +kubebuilder:rbac:groups=core,resources=pods/status,verbs=get // +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete diff --git a/controllers/flinkcluster_converter.go b/controllers/flinkcluster_converter.go index 2e68fe33..2e496e13 100644 --- a/controllers/flinkcluster_converter.go +++ b/controllers/flinkcluster_converter.go @@ -214,15 +214,6 @@ func getDesiredJobManagerStatefulSet( ServiceAccountName: getServiceAccountName(serviceAccount), } - var pvcs []corev1.PersistentVolumeClaim - if jobManagerSpec.VolumeClaimTemplates != nil { - pvcs = make([]corev1.PersistentVolumeClaim, len(jobManagerSpec.VolumeClaimTemplates)) - for i, pvc := range jobManagerSpec.VolumeClaimTemplates { - pvc.ObjectMeta.OwnerReferences = []metav1.OwnerReference{ToOwnerReference(flinkCluster)} - pvcs[i] = pvc - } - } - var jobManagerStatefulSet = &appsv1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ Namespace: clusterNamespace, @@ -234,7 +225,7 @@ func getDesiredJobManagerStatefulSet( Replicas: jobManagerSpec.Replicas, Selector: &metav1.LabelSelector{MatchLabels: podLabels}, ServiceName: jobManagerStatefulSetName, - VolumeClaimTemplates: pvcs, + VolumeClaimTemplates: jobManagerSpec.VolumeClaimTemplates, Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: podLabels, @@ -522,15 +513,6 @@ func getDesiredTaskManagerStatefulSet( ServiceAccountName: getServiceAccountName(serviceAccount), } - var pvcs []corev1.PersistentVolumeClaim - if taskManagerSpec.VolumeClaimTemplates != nil { - pvcs = make([]corev1.PersistentVolumeClaim, len(taskManagerSpec.VolumeClaimTemplates)) - for i, pvc := range taskManagerSpec.VolumeClaimTemplates { - pvc.ObjectMeta.OwnerReferences = []metav1.OwnerReference{ToOwnerReference(flinkCluster)} - pvcs[i] = pvc - } - } - var taskManagerStatefulSet = &appsv1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ Namespace: clusterNamespace, @@ -543,7 +525,7 @@ func getDesiredTaskManagerStatefulSet( Replicas: &taskManagerSpec.Replicas, Selector: &metav1.LabelSelector{MatchLabels: podLabels}, ServiceName: taskManagerStatefulSetName, - VolumeClaimTemplates: pvcs, + VolumeClaimTemplates: taskManagerSpec.VolumeClaimTemplates, PodManagementPolicy: "Parallel", Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ diff --git a/controllers/flinkcluster_converter_test.go b/controllers/flinkcluster_converter_test.go index e46542fb..6497ac08 100644 --- a/controllers/flinkcluster_converter_test.go +++ b/controllers/flinkcluster_converter_test.go @@ -608,15 +608,6 @@ func TestGetDesiredClusterState(t *testing.T) { { ObjectMeta: metav1.ObjectMeta{ Name: "pvc-test", - OwnerReferences: []metav1.OwnerReference{ - { - APIVersion: "flinkoperator.k8s.io/v1beta1", - Kind: "FlinkCluster", - Name: "flinkjobcluster-sample", - Controller: &controller, - BlockOwnerDeletion: &blockOwnerDeletion, - }, - }, }, Spec: v1.PersistentVolumeClaimSpec{ AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, diff --git a/controllers/flinkcluster_observer.go b/controllers/flinkcluster_observer.go index b6c38963..8c48fcb3 100644 --- a/controllers/flinkcluster_observer.go +++ b/controllers/flinkcluster_observer.go @@ -49,21 +49,22 @@ type ClusterStateObserver struct { // ObservedClusterState holds observed state of a cluster. type ObservedClusterState struct { - cluster *v1beta1.FlinkCluster - revisions []*appsv1.ControllerRevision - configMap *corev1.ConfigMap - jmStatefulSet *appsv1.StatefulSet - jmService *corev1.Service - jmIngress *extensionsv1beta1.Ingress - tmStatefulSet *appsv1.StatefulSet - job *batchv1.Job - jobPod *corev1.Pod - flinkJobStatus FlinkJobStatus - flinkJobSubmitLog *FlinkJobSubmitLog - savepoint *flinkclient.SavepointStatus - revisionStatus *RevisionStatus - savepointErr error - observeTime time.Time + cluster *v1beta1.FlinkCluster + revisions []*appsv1.ControllerRevision + configMap *corev1.ConfigMap + jmStatefulSet *appsv1.StatefulSet + jmService *corev1.Service + jmIngress *extensionsv1beta1.Ingress + tmStatefulSet *appsv1.StatefulSet + persistentVolumeClaims *corev1.PersistentVolumeClaimList + job *batchv1.Job + jobPod *corev1.Pod + flinkJobStatus FlinkJobStatus + flinkJobSubmitLog *FlinkJobSubmitLog + savepoint *flinkclient.SavepointStatus + revisionStatus *RevisionStatus + savepointErr error + observeTime time.Time } type FlinkJobStatus struct { @@ -196,6 +197,10 @@ func (observer *ClusterStateObserver) observe( // Savepoint observe error do not affect deploy reconciliation loop. observer.observeSavepoint(observed) + var pvcs = new(corev1.PersistentVolumeClaimList) + observer.observePersistentVolumeClaims(pvcs) + observed.persistentVolumeClaims = pvcs + // (Optional) job. err = observer.observeJob(observed) @@ -524,6 +529,31 @@ func (observer *ClusterStateObserver) observeJobPod( return nil } +func (observer *ClusterStateObserver) observePersistentVolumeClaims( + observedClaims *corev1.PersistentVolumeClaimList) error { + var log = observer.log + var clusterNamespace = observer.request.Namespace + var clusterName = observer.request.Name + var selector = labels.SelectorFromSet(map[string]string{"cluster": clusterName}) + + var err = observer.k8sClient.List( + observer.context, + observedClaims, + client.InNamespace(clusterNamespace), + client.MatchingLabelsSelector{Selector: selector}) + if err != nil { + if client.IgnoreNotFound(err) != nil { + log.Error(err, "Failed to get persistent volume claim list") + return err + } + log.Info("Observed persistent volume claim list", "state", "nil") + } else { + log.Info("Observed persistent volume claim list", "state", *observedClaims) + } + + return nil +} + type RevisionStatus struct { currentRevision *appsv1.ControllerRevision nextRevision *appsv1.ControllerRevision diff --git a/controllers/flinkcluster_reconciler.go b/controllers/flinkcluster_reconciler.go index b45ce669..4f987116 100644 --- a/controllers/flinkcluster_reconciler.go +++ b/controllers/flinkcluster_reconciler.go @@ -109,7 +109,15 @@ func (reconciler *ClusterReconciler) reconcile() (ctrl.Result, error) { return ctrl.Result{}, err } + err = reconciler.reconcilePersistentVolumeClaims() + if err != nil { + return ctrl.Result{}, err + } + result, err := reconciler.reconcileJob() + if err != nil { + return ctrl.Result{}, err + } return result, nil } @@ -444,6 +452,51 @@ func (reconciler *ClusterReconciler) deleteConfigMap( return err } +func (reconciler *ClusterReconciler) reconcilePersistentVolumeClaims() error { + observed := reconciler.observed + pvcs := observed.persistentVolumeClaims + jm := observed.jmStatefulSet + tm := observed.tmStatefulSet + + for _, pvc := range pvcs.Items { + if c, ok := pvc.Labels["component"]; ok && c == "jobmanager" && jm != nil { + reconciler.reconcilePersistentVolumeClaim(&pvc, jm) + } + if c, ok := pvc.Labels["component"]; ok && c == "taskmanager" && tm != nil { + reconciler.reconcilePersistentVolumeClaim(&pvc, tm) + } + } + + return nil +} + +func (reconciler *ClusterReconciler) reconcilePersistentVolumeClaim(pvc *corev1.PersistentVolumeClaim, sset *appsv1.StatefulSet) error { + var log = reconciler.log + ctx := reconciler.context + k8sClient := reconciler.k8sClient + + if len(pvc.GetOwnerReferences()) != 0 { + return nil + } + + patch := fmt.Sprintf( + `{"metadata":{"ownerReferences":[{"apiVersion":"%s","kind":"%s","name":"%s","uid":"%s","controller":true,"blockOwnerDeletion":false}],"uid":"%s"}}`, + sset.APIVersion, + sset.Kind, + sset.GetName(), + sset.GetUID(), + pvc.GetUID(), + ) + err := k8sClient.Patch(ctx, pvc, client.RawPatch(types.MergePatchType, []byte(patch))) + if err != nil { + log.Error(err, "Failed to update PersistentVolumeClaim") + } else { + log.Info("PersistentVolumeClaim patched") + } + + return err +} + func (reconciler *ClusterReconciler) reconcileJob() (ctrl.Result, error) { var log = reconciler.log var desiredJob = reconciler.desired.Job