Skip to content

Commit

Permalink
modify status for decommission
Browse files Browse the repository at this point in the history
catpineapple committed Dec 4, 2024
1 parent 36a8ff7 commit fd931ea
Showing 3 changed files with 39 additions and 39 deletions.
1 change: 1 addition & 0 deletions api/disaggregated/v1/types.go
Original file line number Diff line number Diff line change
@@ -304,6 +304,7 @@ const (

//Scaling represents service in Scaling.
Scaling Phase = "Scaling"
Decommissioning Phase = "Decommissioning"
ScaleDownFailed Phase = "ScaleDownFailed"
ResumeFailed Phase = "ResumeFailed"
SuspendFailed Phase = "SuspendFailed"
12 changes: 6 additions & 6 deletions pkg/common/utils/resource/decommission.go
Original file line number Diff line number Diff line change
@@ -26,19 +26,19 @@ type DecommissionPhase string
const (
Decommissioned DecommissionPhase = "Decommissioned"
Decommissioning DecommissionPhase = "Decommissioning"
DecommissionPhaseSteady DecommissionPhase = "Steady"
DecommissionBefore DecommissionPhase = "DecommissionBefore"
DecommissionPhaseUnknown DecommissionPhase = "Unknown"
)

type DecommissionDetail struct {
type ComputeNodeStatusCounts struct {
AllBackendsSize int
UnDecommissionedCount int
DecommissioningCount int
DecommissionedCount int
BeKeepAmount int
}

func ConstructDecommissionDetail(allBackends []*mysql.Backend, cgKeepAmount int32) DecommissionDetail {
func ConstructComputeNodeStatusCounts(allBackends []*mysql.Backend, cgKeepAmount int32) ComputeNodeStatusCounts {
var unDecommissionedCount, decommissioningCount, decommissionedCount int

for i := range allBackends {
@@ -54,7 +54,7 @@ func ConstructDecommissionDetail(allBackends []*mysql.Backend, cgKeepAmount int3
}
}

return DecommissionDetail{
return ComputeNodeStatusCounts{
AllBackendsSize: len(allBackends),
UnDecommissionedCount: unDecommissionedCount,
DecommissioningCount: decommissioningCount,
@@ -63,9 +63,9 @@ func ConstructDecommissionDetail(allBackends []*mysql.Backend, cgKeepAmount int3
}
}

func (d *DecommissionDetail) GetDecommissionDetailStatus() DecommissionPhase {
func (d *ComputeNodeStatusCounts) GetDecommissionStatus() DecommissionPhase {
if d.DecommissioningCount == 0 && d.DecommissionedCount == 0 && d.UnDecommissionedCount > d.BeKeepAmount {
return DecommissionPhaseSteady
return DecommissionBefore
}
if d.UnDecommissionedCount == d.BeKeepAmount && d.DecommissioningCount > 0 {
return Decommissioning
Original file line number Diff line number Diff line change
@@ -191,7 +191,7 @@ func (dcgs *DisaggregatedComputeGroupsController) reconcileStatefulset(ctx conte
break
}
}
scaleType := getScaleType(st, &est, cgStatus.Phase)
scaleType := getOperationType(st, &est, cgStatus.Phase)

switch scaleType {
//case "resume":
@@ -208,30 +208,31 @@ func (dcgs *DisaggregatedComputeGroupsController) reconcileStatefulset(ctx conte
cgKeepAmount := *cg.Replicas
cgName := cluster.GetCGName(cg)

decommissionPhase, err := dcgs.decommissionProgressCheck(cluster, sqlClient, cgName, cgKeepAmount)
if err != nil {
return &sc.Event{Type: sc.EventWarning, Reason: sc.CGSqlExecFailed, Message: err.Error()}, err
}

switch decommissionPhase {
case resource.DecommissionPhaseSteady:
err = dcgs.decommissionBENodesBySQL(sqlClient, cgName, cgKeepAmount)
if cluster.Spec.EnableDecommission {
decommissionPhase, err := dcgs.decommissionProgressCheck(sqlClient, cgName, cgKeepAmount)
if err != nil {
cgStatus.Phase = dv1.ScaleDownFailed
klog.Errorf("ScaleDownBE decommissionBENodesBySQL failed, err:%s ", err.Error())
return &sc.Event{Type: sc.EventWarning, Reason: sc.CGSqlExecFailed, Message: err.Error()},
err
return &sc.Event{Type: sc.EventWarning, Reason: sc.CGSqlExecFailed, Message: err.Error()}, err
}

switch decommissionPhase {
case resource.DecommissionBefore:
err = dcgs.decommissionBENodesBySQL(sqlClient, cgName, cgKeepAmount)
if err != nil {
cgStatus.Phase = dv1.ScaleDownFailed
klog.Errorf("ScaleDownBE decommissionBENodesBySQL failed, err:%s ", err.Error())
return &sc.Event{Type: sc.EventWarning, Reason: sc.CGSqlExecFailed, Message: err.Error()},
err
}
cgStatus.Phase = dv1.Decommissioning
return nil, errors.New("")
case resource.Decommissioning, resource.DecommissionPhaseUnknown:
cgStatus.Phase = dv1.Decommissioning
klog.Infof("ScaleDownBE decommissionBENodesBySQL in progress")
return nil, errors.New("")
case resource.Decommissioned:
dcgs.scaledOutBENodesBySQL(sqlClient, cgName, cgKeepAmount)
}
cgStatus.Phase = dv1.Scaling
return nil, errors.New("")
case resource.Decommissioning:
cgStatus.Phase = dv1.Scaling
klog.Infof("ScaleDownBE decommissionBENodesBySQL in progress")
return nil, errors.New("")
case resource.Decommissioned:
dcgs.scaledOutBENodesBySQL(sqlClient, cgName, cgKeepAmount)
default:
// default is DecommissionPhaseUnknown, drop be , not decommission
} else { // not decommission , drop node
if err := dcgs.scaledOutBENodesBySQL(sqlClient, cgName, cgKeepAmount); err != nil {
cgStatus.Phase = dv1.ScaleDownFailed
klog.Errorf("ScaleDownBE scaledOutBENodesBySQL failed, err:%s ", err.Error())
@@ -252,8 +253,9 @@ func (dcgs *DisaggregatedComputeGroupsController) reconcileStatefulset(ctx conte
return nil, nil
}

func getScaleType(st, est *appv1.StatefulSet, phase dv1.Phase) string {
if *(st.Spec.Replicas) < *(est.Spec.Replicas) || phase == dv1.ScaleDownFailed {
func getOperationType(st, est *appv1.StatefulSet, phase dv1.Phase) string {
//Should not check 'phase == dv1.Ready', because the default value of the state initialization is Reconciling in the new Reconcile
if *(st.Spec.Replicas) < *(est.Spec.Replicas) || phase == dv1.Decommissioning || phase == dv1.ScaleDownFailed {
return "scaleDown"
}
return ""
@@ -277,7 +279,7 @@ func (dcgs *DisaggregatedComputeGroupsController) initialCGStatus(ddc *dv1.Doris
if cgss[i].UniqueId == uniqueId {
if cgss[i].Phase == dv1.ScaleDownFailed || cgss[i].Phase == dv1.Suspended ||
cgss[i].Phase == dv1.SuspendFailed || cgss[i].Phase == dv1.ResumeFailed ||
cgss[i].Phase == dv1.Scaling {
cgss[i].Phase == dv1.Scaling || cgss[i].Phase == dv1.Decommissioning {
defaultStatus.Phase = cgss[i].Phase
}
defaultStatus.SuspendReplicas = cgss[i].SuspendReplicas
@@ -644,16 +646,13 @@ func (dcgs *DisaggregatedComputeGroupsController) getMasterSqlClient(ctx context
}

// isDecommissionProgressFinished check decommission status
// if not start decomission or decommission succeed return true
func (dcgs *DisaggregatedComputeGroupsController) decommissionProgressCheck(cluster *dv1.DorisDisaggregatedCluster, masterDBClient *mysql.DB, cgName string, cgKeepAmount int32) (resource.DecommissionPhase, error) {
if !cluster.Spec.EnableDecommission {
return resource.DecommissionPhaseUnknown, nil
}
// if not start decommission or decommission succeed return true
func (dcgs *DisaggregatedComputeGroupsController) decommissionProgressCheck(masterDBClient *mysql.DB, cgName string, cgKeepAmount int32) (resource.DecommissionPhase, error) {
allBackends, err := masterDBClient.GetBackendsByCGName(cgName)
if err != nil {
klog.Errorf("decommissionProgressCheck failed, ShowBackends err:%s", err.Error())
return resource.DecommissionPhaseUnknown, err
}
decommissionDetail := resource.ConstructDecommissionDetail(allBackends, cgKeepAmount)
return decommissionDetail.GetDecommissionDetailStatus(), nil
decommissionDetail := resource.ConstructComputeNodeStatusCounts(allBackends, cgKeepAmount)
return decommissionDetail.GetDecommissionStatus(), nil
}

0 comments on commit fd931ea

Please sign in to comment.