fix: hscale is always running if failed pods exist before doing hscale (#3305)
wangyelei authored May 18, 2023
1 parent b5abe41 commit 414601b
Showing 2 changed files with 76 additions and 52 deletions.
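In short: before this change, a horizontal-scaling OpsRequest could stay in Running indefinitely when some pods of the component had already failed, and were deleted and re-created, before the scale-in started, because those re-created pods never produced a completed progressDetail. The commit extracts the duplicated component/minReadySeconds lookup into getCompImplAndMinReadySeconds and adds a second pass to handleScaleDownProgress that also counts such re-created pods once they become available again (or are definitively marked failed). The sketch below illustrates only that counting idea, using simplified stand-in types rather than the actual KubeBlocks API shown in the diff that follows.

package main

import "fmt"

type progressStatus string

const (
	processingStatus progressStatus = "Processing"
	succeedStatus    progressStatus = "Succeed"
	failedStatus     progressStatus = "Failed"
)

// pod is a stand-in for a corev1.Pod plus the availability check
// currComponent.PodIsAvailable(pod, minReadySeconds).
type pod struct {
	name      string
	available bool
}

func isCompleted(s progressStatus) bool { return s == succeedStatus || s == failedStatus }

// countRecreatedPods mirrors the second pass added to handleScaleDownProgress: pods that
// already have a progressDetail (they failed and were deleted before the scale-in) are
// re-evaluated, so the OpsRequest can reach its expected progress count. The real code
// additionally marks the detail Failed, and still counts it, when the component phase is
// failed or abnormal.
func countRecreatedPods(pods []pod, details map[string]progressStatus) int32 {
	var completed int32
	for _, p := range pods {
		status, ok := details[p.name]
		if !ok {
			continue // no progressDetail recorded for this pod; nothing to count
		}
		if isCompleted(status) {
			completed++
			continue
		}
		if p.available {
			details[p.name] = succeedStatus
			completed++
			continue
		}
		details[p.name] = processingStatus // still waiting for the re-created pod
	}
	return completed
}

func main() {
	details := map[string]progressStatus{"mysql-1": processingStatus}
	fmt.Println(countRecreatedPods([]pod{{name: "mysql-1", available: true}}, details)) // 1
}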
81 changes: 48 additions & 33 deletions controllers/apps/operations/ops_progress_util.go
@@ -248,14 +248,7 @@ func handleStatefulSetProgress(reqCtx intctrlutil.RequestCtx,
podList *corev1.PodList,
pgRes progressResource,
compStatus *appsv1alpha1.OpsRequestComponentStatus) (int32, error) {
currComponent, err := components.NewComponentByType(cli,
opsRes.Cluster, pgRes.clusterComponent, *pgRes.clusterComponentDef)
if err != nil {
return 0, err
}
var componentName = pgRes.clusterComponent.Name
minReadySeconds, err := util.GetComponentStsMinReadySeconds(reqCtx.Ctx,
cli, *opsRes.Cluster, componentName)
currComponent, minReadySeconds, err := getCompImplAndMinReadySeconds(reqCtx, cli, opsRes, pgRes)
if err != nil {
return 0, err
}
@@ -280,6 +273,23 @@ func handleStatefulSetProgress(reqCtx intctrlutil.RequestCtx,
return completedCount, err
}

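// getCompImplAndMinReadySeconds returns the typed component implementation and the
// minReadySeconds of the component's StatefulSet, both of which are needed to judge
// whether a pod is available.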
func getCompImplAndMinReadySeconds(reqCtx intctrlutil.RequestCtx,
cli client.Client,
opsRes *OpsResource,
pgRes progressResource) (types.Component, int32, error) {
currComponent, err := components.NewComponentByType(cli,
opsRes.Cluster, pgRes.clusterComponent, *pgRes.clusterComponentDef)
if err != nil {
return nil, 0, err
}
minReadySeconds, err := util.GetComponentStsMinReadySeconds(reqCtx.Ctx,
cli, *opsRes.Cluster, pgRes.clusterComponent.Name)
if err != nil {
return nil, 0, err
}
return currComponent, minReadySeconds, nil
}

// handlePendingProgressDetail handles the pending progressDetail and sets it to progressDetails.
func handlePendingProgressDetail(opsRes *OpsResource,
compStatus *appsv1alpha1.OpsRequestComponentStatus,
@@ -435,7 +445,7 @@ func handleComponentProgressForScalingReplicas(reqCtx intctrlutil.RequestCtx,
expectProgressCount = dValue * -1
}
if !isScaleOut {
completedCount, err = handleScaleDownProgress(opsRes, pgRes, podList, compStatus)
completedCount, err = handleScaleDownProgress(reqCtx, cli, opsRes, pgRes, podList, compStatus)
expectProgressCount = getFinalExpectCount(compStatus, expectProgressCount)
return expectProgressCount, completedCount, err
}
@@ -455,14 +465,7 @@ func handleScaleOutProgress(reqCtx intctrlutil.RequestCtx,
pgRes progressResource,
podList *corev1.PodList,
compStatus *appsv1alpha1.OpsRequestComponentStatus) (int32, error) {
var componentName = pgRes.clusterComponent.Name
currComponent, err := components.NewComponentByType(cli,
opsRes.Cluster, pgRes.clusterComponent, *pgRes.clusterComponentDef)
if err != nil {
return 0, err
}
minReadySeconds, err := util.GetComponentWorkloadMinReadySeconds(reqCtx.Ctx,
cli, *opsRes.Cluster, pgRes.clusterComponentDef.WorkloadType, componentName)
currComponent, minReadySeconds, err := getCompImplAndMinReadySeconds(reqCtx, cli, opsRes, pgRes)
if err != nil {
return 0, err
}
@@ -476,30 +479,19 @@ func handleScaleOutProgress(reqCtx intctrlutil.RequestCtx,
progressDetail := appsv1alpha1.ProgressStatusDetail{ObjectKey: objectKey}
if currComponent.PodIsAvailable(&v, minReadySeconds) {
completedCount += 1
message := fmt.Sprintf("Successfully created pod: %s in Component: %s", objectKey, componentName)
progressDetail.SetStatusAndMessage(appsv1alpha1.SucceedProgressStatus, message)
setComponentStatusProgressDetail(opsRes.Recorder, opsRes.OpsRequest,
&compStatus.ProgressDetails, progressDetail)
pgRes.opsMessageKey = "created"
handleSucceedProgressDetail(opsRes, pgRes, compStatus, progressDetail)
continue
}

if util.IsFailedOrAbnormal(compStatus.Phase) {
// means the pod is failed.
podMessage := getFailedPodMessage(opsRes.Cluster, componentName, &v)
message := fmt.Sprintf("Failed to create pod: %s in Component: %s, message: %s", objectKey, componentName, podMessage)
progressDetail.SetStatusAndMessage(appsv1alpha1.FailedProgressStatus, message)
completedCount += 1
} else {
progressDetail.SetStatusAndMessage(appsv1alpha1.ProcessingProgressStatus, "Start to create pod: "+objectKey)
}
setComponentStatusProgressDetail(opsRes.Recorder, opsRes.OpsRequest,
&compStatus.ProgressDetails, progressDetail)
completedCount += handleFailedOrProcessingProgressDetail(opsRes, pgRes, compStatus, progressDetail, &v)
}
return completedCount, nil
}

// handleScaleDownProgress handles the progressDetails of scaled down replicas.
func handleScaleDownProgress(
reqCtx intctrlutil.RequestCtx,
cli client.Client,
opsRes *OpsResource,
pgRes progressResource,
podList *corev1.PodList,
@@ -537,6 +529,29 @@ func handleScaleDownProgress(
setComponentStatusProgressDetail(opsRes.Recorder, opsRes.OpsRequest,
&compStatus.ProgressDetails, progressDetail)
}
// handle pods that are re-created because they failed before the horizontal scaling started.
currComponent, minReadySeconds, err := getCompImplAndMinReadySeconds(reqCtx, cli, opsRes, pgRes)
if err != nil {
return 0, err
}
for _, v := range podList.Items {
objectKey := getProgressObjectKey(constant.PodKind, v.Name)
progressDetail := findStatusProgressDetail(compStatus.ProgressDetails, objectKey)
if progressDetail == nil {
continue
}
if isCompletedProgressStatus(progressDetail.Status) {
completedCount += 1
continue
}
pgRes.opsMessageKey = "re-create"
if currComponent.PodIsAvailable(&v, minReadySeconds) {
completedCount += 1
handleSucceedProgressDetail(opsRes, pgRes, compStatus, *progressDetail)
continue
}
completedCount += handleFailedOrProcessingProgressDetail(opsRes, pgRes, compStatus, *progressDetail, &v)
}
return completedCount, nil
}

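Both scaling paths above now route the per-pod status decision through handleSucceedProgressDetail and handleFailedOrProcessingProgressDetail. Their bodies are not part of this diff; the snippet below only sketches the decision they appear to encode, inferred from the inline logic this commit replaces, with a simplified stand-in signature rather than the actual helper API.

package main

import "fmt"

// progressStatusFor sketches the inferred per-pod decision: an available pod succeeds,
// a pod in a failed/abnormal component is marked Failed but still counts as completed
// (so the operation can finish), and anything else is still Processing.
func progressStatusFor(podAvailable, componentFailedOrAbnormal bool) (status string, completedDelta int32) {
	switch {
	case podAvailable:
		return "Succeed", 1
	case componentFailedOrAbnormal:
		return "Failed", 1
	default:
		return "Processing", 0
	}
}

func main() {
	fmt.Println(progressStatusFor(false, true)) // Failed 1
}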
47 changes: 28 additions & 19 deletions controllers/apps/operations/ops_progress_util_test.go
@@ -20,7 +20,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
package operations

import (
"fmt"
"time"

. "github.com/onsi/ginkgo/v2"
@@ -122,8 +121,7 @@ var _ = Describe("Ops ProgressDetails", func() {

By("create restart ops and pods of consensus component")
opsRes.OpsRequest = createRestartOpsObj(clusterName, "restart-"+randomStr)
mockComponentIsOperating(opsRes.Cluster, appsv1alpha1.SpecReconcilingClusterCompPhase, consensusComp, statelessComp) // appsv1alpha1.RebootingPhase
// TODO: add RebootingPhase status condition
mockComponentIsOperating(opsRes.Cluster, appsv1alpha1.SpecReconcilingClusterCompPhase, consensusComp, statelessComp)
podList := initConsensusPods(ctx, k8sClient, opsRes, clusterName)

By("mock restart OpsRequest is Running")
@@ -136,13 +134,11 @@ var _ = Describe("Ops ProgressDetails", func() {

By("test the progressDetails when stateless pod updates during restart operation")
Expect(opsRes.OpsRequest.Status.Components[statelessComp].Phase).Should(Equal(appsv1alpha1.SpecReconcilingClusterCompPhase)) // appsv1alpha1.RebootingPhase
// TODO: check RebootingPhase status condition
testProgressDetailsWithStatelessPodUpdating(reqCtx, opsRes)

By("create horizontalScaling operation to test the progressDetails when scaling down the replicas")
opsRes.OpsRequest = createHorizontalScaling(clusterName, 1)
opsRes.OpsRequest = createHorizontalScaling(clusterName, 2)
mockComponentIsOperating(opsRes.Cluster, appsv1alpha1.SpecReconcilingClusterCompPhase, consensusComp) // appsv1alpha1.HorizontalScalingPhase
// TODO: add HorizontalScalingPhase status condition
initClusterForOps(opsRes)

By("mock HorizontalScaling OpsRequest phase is running")
@@ -153,25 +149,40 @@ var _ = Describe("Ops ProgressDetails", func() {
_, err = GetOpsManager().Do(reqCtx, k8sClient, opsRes)
Expect(err).ShouldNot(HaveOccurred())

By("mock the pod is terminating")
pod := &podList[0]
pod.Kind = constant.PodKind
testk8s.MockPodIsTerminating(ctx, testCtx, pod)
By("mock the pod is terminating, pod[0] is target pod to delete. and mock pod[1] is failed and deleted by stateful controller")
for i := 0; i < 2; i++ {
pod := &podList[i]
pod.Kind = constant.PodKind
testk8s.MockPodIsTerminating(ctx, testCtx, pod)
_, _ = GetOpsManager().Reconcile(reqCtx, k8sClient, opsRes)
Expect(getProgressDetailStatus(opsRes, consensusComp, pod)).Should(Equal(appsv1alpha1.ProcessingProgressStatus))

}
By("mock the target pod is deleted and progressDetail status should be succeed")
targetPod := &podList[0]
testk8s.RemovePodFinalizer(ctx, testCtx, targetPod)
_, _ = GetOpsManager().Reconcile(reqCtx, k8sClient, opsRes)
Expect(getProgressDetailStatus(opsRes, consensusComp, pod)).Should(Equal(appsv1alpha1.ProcessingProgressStatus))
Expect(getProgressDetailStatus(opsRes, consensusComp, targetPod)).Should(Equal(appsv1alpha1.SucceedProgressStatus))
Expect(opsRes.OpsRequest.Status.Progress).Should(Equal("1/2"))

By("mock the pod is deleted and progressDetail status should be succeed")
By("mock the pod[1] to re-create")
pod := &podList[1]
testk8s.RemovePodFinalizer(ctx, testCtx, pod)
testapps.MockConsensusComponentStsPod(&testCtx, nil, clusterName, consensusComp,
pod.Name, "Follower", "ReadWrite")
// expect the progress is 2/2
_, _ = GetOpsManager().Reconcile(reqCtx, k8sClient, opsRes)
Expect(getProgressDetailStatus(opsRes, consensusComp, pod)).Should(Equal(appsv1alpha1.SucceedProgressStatus))
Expect(opsRes.OpsRequest.Status.Progress).Should(Equal("1/2"))
Expect(getProgressDetailStatus(opsRes, consensusComp, targetPod)).Should(Equal(appsv1alpha1.SucceedProgressStatus))
Expect(opsRes.OpsRequest.Status.Progress).Should(Equal("2/2"))

By("create horizontalScaling operation to test the progressDetails when scaling up the replicas ")
initClusterForOps(opsRes)
expectClusterComponentReplicas := int32(2)
Expect(testapps.ChangeObj(&testCtx, opsRes.Cluster, func(lcluster *appsv1alpha1.Cluster) {
lcluster.Spec.ComponentSpecs[1].Replicas = expectClusterComponentReplicas
})).ShouldNot(HaveOccurred())
// ops uses the startTimestamp to make decisions; the start time should not equal the pod createTime during testing.
time.Sleep(time.Second)
opsRes.OpsRequest = createHorizontalScaling(clusterName, 3)
// update ops phase to Running first
_, err = GetOpsManager().Do(reqCtx, k8sClient, opsRes)
@@ -182,13 +193,11 @@ var _ = Describe("Ops ProgressDetails", func() {
Expect(err).ShouldNot(HaveOccurred())

By("test the progressDetails when scaling up replicas")
podName := fmt.Sprintf("%s-%s-%d", clusterName, consensusComp, 0)
testapps.MockConsensusComponentStsPod(&testCtx, nil, clusterName, consensusComp,
podName, "leader", "ReadWrite")
pod = &corev1.Pod{}
Expect(k8sClient.Get(ctx, client.ObjectKey{Name: podName, Namespace: testCtx.DefaultNamespace}, pod)).Should(Succeed())
targetPod.Name, "leader", "ReadWrite")
Expect(k8sClient.Get(ctx, client.ObjectKey{Name: targetPod.Name, Namespace: testCtx.DefaultNamespace}, targetPod)).Should(Succeed())
_, _ = GetOpsManager().Reconcile(reqCtx, k8sClient, opsRes)
Expect(getProgressDetailStatus(opsRes, consensusComp, pod)).Should(Equal(appsv1alpha1.SucceedProgressStatus))
Expect(getProgressDetailStatus(opsRes, consensusComp, targetPod)).Should(Equal(appsv1alpha1.SucceedProgressStatus))
Expect(opsRes.OpsRequest.Status.Progress).Should(Equal("1/1"))
})

