Skip to content

Commit

Permalink
modify decommission
Browse files Browse the repository at this point in the history
  • Loading branch information
catpineapple committed Dec 9, 2024
1 parent 8c08871 commit 2d6d9de
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 38 deletions.
4 changes: 0 additions & 4 deletions api/disaggregated/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,10 +295,6 @@ type Phase string

const (
Ready Phase = "Ready"
//Upgrading represents the spec of the service changed, service in smoothing upgrade.
Upgrading Phase = "Upgrading"
//Failed represents service failed to start, can't be accessed.
Failed Phase = "Failed"
//Creating represents service in creating stage.
Reconciling Phase = "Reconciling"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,12 +176,12 @@ func (dcgs *DisaggregatedComputeGroupsController) reconcileStatefulset(ctx conte

err := dcgs.preApplyStatefulSet(ctx, st, &est, cluster, cg)
if err != nil {
if skipApplyStatefulset(err) {
return nil, nil
}
klog.Errorf("disaggregatedComputeGroupsController reconcileStatefulset preApplyStatefulSet namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error())
return &sc.Event{Type: sc.EventWarning, Reason: sc.CGSqlExecFailed, Message: err.Error()}, err
}
if skipApplyStatefulset(cluster, cg) {
return nil, nil
}

if err := k8s.ApplyStatefulSet(ctx, dcgs.K8sclient, st, func(st, est *appv1.StatefulSet) bool {
return resource.StatefulsetDeepEqualWithKey(st, est, dv1.DisaggregatedSpecHashValueAnnotation, false)
Expand Down Expand Up @@ -209,9 +209,7 @@ func (dcgs *DisaggregatedComputeGroupsController) initialCGStatus(ddc *dv1.Doris

for i := range cgss {
if cgss[i].UniqueId == uniqueId {
if cgss[i].Phase == dv1.ScaleDownFailed || cgss[i].Phase == dv1.Suspended ||
cgss[i].Phase == dv1.SuspendFailed || cgss[i].Phase == dv1.ResumeFailed ||
cgss[i].Phase == dv1.Scaling || cgss[i].Phase == dv1.Decommissioning {
if cgss[i].Phase != dv1.Ready {
defaultStatus.Phase = cgss[i].Phase
}
defaultStatus.SuspendReplicas = cgss[i].SuspendReplicas
Expand Down Expand Up @@ -292,22 +290,23 @@ func (dcgs *DisaggregatedComputeGroupsController) ClearResources(ctx context.Con
}
if !cleared {
eCGs = append(eCGs, clearCGs[i])
} else {
// drop compute group
cgName := strings.ReplaceAll(cgs.UniqueId, "_", "-")
cgKeepAmount := int32(0)
sqlClient, err := dcgs.getMasterSqlClient(ctx, dcgs.K8sclient, ddc)
if err != nil {
klog.Errorf("computeGroupSync ClearResources dropCGBySQLClient getMasterSqlClient failed: %s", err.Error())
dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync dropCGBySQLClient failed: "+err.Error())
}
defer sqlClient.Close()
err = dcgs.scaledOutBENodesByDrop(sqlClient, cgName, cgKeepAmount)
if err != nil {
klog.Errorf("computeGroupSync ClearResources dropCGBySQLClient failed: %s", err.Error())
dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync dropCGBySQLClient failed: "+err.Error())
}
continue
}
// drop compute group
cgName := strings.ReplaceAll(cgs.UniqueId, "_", "-")
cgKeepAmount := int32(0)
sqlClient, err := dcgs.getMasterSqlClient(ctx, dcgs.K8sclient, ddc)
if err != nil {
klog.Errorf("computeGroupSync ClearResources dropCGBySQLClient getMasterSqlClient failed: %s", err.Error())
dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync dropCGBySQLClient failed: "+err.Error())
}
defer sqlClient.Close()
err = dcgs.scaledOutBENodesByDrop(sqlClient, cgName, cgKeepAmount)
if err != nil {
klog.Errorf("computeGroupSync ClearResources dropCGBySQLClient failed: %s", err.Error())
dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync dropCGBySQLClient failed: "+err.Error())
}

}

for i := range eCGs {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package computegroups

import (
"context"
"errors"
dv1 "github.com/apache/doris-operator/api/disaggregated/v1"
"github.com/apache/doris-operator/pkg/common/utils/k8s"
"github.com/apache/doris-operator/pkg/common/utils/mysql"
Expand All @@ -31,8 +30,6 @@ import (
"strings"
)

const decommissioningMessage = "decommissionBENodes in progress"

func (dcgs *DisaggregatedComputeGroupsController) preApplyStatefulSet(ctx context.Context, st, est *appv1.StatefulSet, cluster *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup) error {
var cgStatus *dv1.ComputeGroupStatus
uniqueId := cg.UniqueId
Expand All @@ -50,6 +47,8 @@ func (dcgs *DisaggregatedComputeGroupsController) preApplyStatefulSet(ctx contex
if err != nil {
return err
}
default:
// default do nothing, not need pre ApplyStatefulSet
}

return nil
Expand All @@ -68,22 +67,22 @@ func (dcgs *DisaggregatedComputeGroupsController) scaleOut(ctx context.Context,
cgName := cluster.GetCGName(cg)

if cluster.Spec.EnableDecommission {
if err := dcgs.scaledOutBENodesByDecommission(cgStatus, sqlClient, cgName, cgKeepAmount); err != nil {
if err := dcgs.scaledOutBENodesByDecommission(cluster, cgStatus, sqlClient, cgName, cgKeepAmount); err != nil {
return err
}
} else { // not decommission , drop node
if err := dcgs.scaledOutBENodesByDrop(sqlClient, cgName, cgKeepAmount); err != nil {
cgStatus.Phase = dv1.ScaleDownFailed
klog.Errorf("ScaleOut scaledOutBENodesByDrop failed, err:%s ", err.Error())
klog.Errorf("ScaleOut scaledOutBENodesByDrop ddcName:%s, namespace:%s, computeGroupName:%s, drop nodes failed:%s ", cluster.Name, cluster.Namespace, cgName, err.Error())
return err
}
cgStatus.Phase = dv1.Scaling
}
cgStatus.Phase = dv1.Scaling
// return nil will apply sts
return nil
}

func (dcgs *DisaggregatedComputeGroupsController) scaledOutBENodesByDecommission(cgStatus *dv1.ComputeGroupStatus, sqlClient *mysql.DB, cgName string, cgKeepAmount int32) error {
func (dcgs *DisaggregatedComputeGroupsController) scaledOutBENodesByDecommission(cluster *dv1.DorisDisaggregatedCluster, cgStatus *dv1.ComputeGroupStatus, sqlClient *mysql.DB, cgName string, cgKeepAmount int32) error {
decommissionPhase, err := dcgs.decommissionProgressCheck(sqlClient, cgName, cgKeepAmount)
if err != nil {
return err
Expand All @@ -93,18 +92,19 @@ func (dcgs *DisaggregatedComputeGroupsController) scaledOutBENodesByDecommission
err = dcgs.decommissionBENodes(sqlClient, cgName, cgKeepAmount)
if err != nil {
cgStatus.Phase = dv1.ScaleDownFailed
klog.Errorf("scaledOutBENodesByDecommission failed, err:%s ", err.Error())
klog.Errorf("scaledOutBENodesByDecommission ddcName:%s, namespace:%s, computeGroupName:%s , Decommission failed, err:%s ", cluster.Name, cluster.Namespace, cgName, err.Error())
return err
}
cgStatus.Phase = dv1.Decommissioning
return errors.New(decommissioningMessage)
return nil
case resource.Decommissioning, resource.DecommissionPhaseUnknown:
cgStatus.Phase = dv1.Decommissioning
klog.Infof("scaledOutBENodesByDecommission in progress")
return errors.New(decommissioningMessage)
klog.Infof("scaledOutBENodesByDecommission ddcName:%s, namespace:%s, computeGroupName:%s, Decommission in progress", cluster.Name, cluster.Namespace, cgName)
return nil
case resource.Decommissioned:
dcgs.scaledOutBENodesByDrop(sqlClient, cgName, cgKeepAmount)
}
cgStatus.Phase = dv1.Scaling
return nil
}

Expand Down Expand Up @@ -229,8 +229,17 @@ func getScaledOutBENode(
return dropNodes, nil
}

func skipApplyStatefulset(err error) bool {
if err == nil || err.Error() == decommissioningMessage {
func skipApplyStatefulset(ddc *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup) bool {
var cgStatus *dv1.ComputeGroupStatus
uniqueId := cg.UniqueId
for i := range ddc.Status.ComputeGroupStatuses {
if ddc.Status.ComputeGroupStatuses[i].UniqueId == uniqueId {
cgStatus = &ddc.Status.ComputeGroupStatuses[i]
break
}
}

if cgStatus.Phase == dv1.Decommissioning {
return true
}
return false
Expand Down

0 comments on commit 2d6d9de

Please sign in to comment.