Skip to content

Commit

Permalink
ddc decommission cg
Browse files Browse the repository at this point in the history
  • Loading branch information
catpineapple committed Dec 3, 2024
1 parent c29e991 commit d5102b9
Show file tree
Hide file tree
Showing 4 changed files with 229 additions and 52 deletions.
5 changes: 5 additions & 0 deletions api/disaggregated/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ type DorisDisaggregatedClusterSpec struct {
// the name of secret that type is `kubernetes.io/basic-auth` and contains keys username, password for management doris node in cluster as fe, be register.
// the password key is `password`. the username defaults to `root` and is omitempty.
AuthSecret string `json:"authSecret,omitempty"`

// EnableDecommission selects how be nodes are removed when a compute group is scaled down. the default value is false.
// if true, the be nodes are decommissioned before they are dropped.
// if false, the be nodes are dropped directly.
EnableDecommission bool `json:"enableDecommission,omitempty"`
}

type MetaService struct {
Expand Down
61 changes: 61 additions & 0 deletions pkg/common/utils/resource/decommission.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package resource

import (
"github.com/apache/doris-operator/pkg/common/utils/mysql"
)

// DecommissionPhase names the decommission state that GetDecommissionDetailStatus
// derives from the backend counts held in a DecommissionDetail.
type DecommissionPhase string

const (
// Decommissioned: the undecommissioned count equals the keep amount and every
// remaining backend has finished decommissioning (no tablets left).
Decommissioned DecommissionPhase = "Decommissioned"
// Decommissioning: the undecommissioned count equals the keep amount and at
// least one decommissioned backend still holds tablets.
Decommissioning DecommissionPhase = "Decommissioning"
// DecommissionPhaseSteady: no backend is decommissioning or decommissioned and
// more backends than the keep amount are still undecommissioned.
DecommissionPhaseSteady DecommissionPhase = "Steady"
// DecommissionPhaseUnknown: any combination of counts that matches none of the
// phases above.
DecommissionPhaseUnknown DecommissionPhase = "Unknown"
)

// DecommissionDetail holds the backend counts, grouped by decommission state,
// that ConstructDecommissionDetail collects and GetDecommissionDetailStatus evaluates.
type DecommissionDetail struct {
// AllBackendsSize is the number of backends that were inspected.
AllBackendsSize int
// UnDecommissionedCount is the number of backends whose SystemDecommissioned flag is false.
UnDecommissionedCount int
// DecommissioningCount is the number of backends flagged SystemDecommissioned whose TabletNum is not 0.
DecommissioningCount int
// DecommissionedCount is the number of backends flagged SystemDecommissioned whose TabletNum is 0.
DecommissionedCount int
// BeKeepAmount is the number of backends that should be kept (the cgKeepAmount passed to ConstructDecommissionDetail).
BeKeepAmount int
}

// ConstructDecommissionDetail tallies allBackends by decommission state and
// stores cgKeepAmount as the number of backends to keep.
//
// A backend that is not flagged SystemDecommissioned counts as undecommissioned;
// a flagged backend counts as decommissioned once its TabletNum is 0 and as
// decommissioning otherwise.
func ConstructDecommissionDetail(allBackends []*mysql.Backend, cgKeepAmount int32) DecommissionDetail {
	detail := DecommissionDetail{
		AllBackendsSize: len(allBackends),
		BeKeepAmount:    int(cgKeepAmount),
	}

	for _, be := range allBackends {
		switch {
		case !be.SystemDecommissioned:
			detail.UnDecommissionedCount++
		case be.TabletNum == 0:
			detail.DecommissionedCount++
		default:
			detail.DecommissioningCount++
		}
	}

	return detail
}

// GetDecommissionDetailStatus maps the recorded backend counts to a phase:
//   - DecommissionPhaseSteady when more backends than the keep amount are
//     undecommissioned and none is decommissioning or decommissioned;
//   - Decommissioning when exactly the keep amount is undecommissioned and at
//     least one backend is still decommissioning;
//   - Decommissioned when exactly the keep amount is undecommissioned and all
//     other backends are decommissioned;
//   - DecommissionPhaseUnknown otherwise.
func (d *DecommissionDetail) GetDecommissionDetailStatus() DecommissionPhase {
	remaining, keep := d.UnDecommissionedCount, d.BeKeepAmount
	untouched := d.DecommissioningCount == 0 && d.DecommissionedCount == 0

	switch {
	case remaining > keep && untouched:
		return DecommissionPhaseSteady
	case remaining != keep:
		// Every remaining phase requires exactly the keep amount to be left.
		return DecommissionPhaseUnknown
	case d.DecommissioningCount > 0:
		return Decommissioning
	case remaining+d.DecommissionedCount == d.AllBackendsSize:
		return Decommissioned
	}
	return DecommissionPhaseUnknown
}
13 changes: 8 additions & 5 deletions pkg/controller/disaggregated_cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,13 +230,13 @@ func (dc *DisaggregatedClusterReconciler) Reconcile(ctx context.Context, req rec
//display new status.
disRes, disErr := func() (ctrl.Result, error) {
//reorganize status.
if res, err = dc.reorganizeStatus(&ddc); err != nil {
return res, err
if rsRes, rsErr := dc.reorganizeStatus(&ddc); rsErr != nil {
return rsRes, rsErr
}

//update cr or status
if res, err = dc.updateObjectORStatus(ctx, &ddc, hv); err != nil {
return res, err
if updRes, updErr := dc.updateObjectORStatus(ctx, &ddc, hv); updErr != nil {
return updRes, updErr
}

return ctrl.Result{}, nil
Expand All @@ -247,7 +247,10 @@ func (dc *DisaggregatedClusterReconciler) Reconcile(ctx context.Context, req rec
res = disRes
}

return res, err
if msg != "" {
return res, errors.New(msg)
}
return res, nil
}

func (dc *DisaggregatedClusterReconciler) clearUnusedResources(ctx context.Context, ddc *dv1.DorisDisaggregatedCluster) (ctrl.Result, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,32 @@ func (dcgs *DisaggregatedComputeGroupsController) Sync(ctx context.Context, obj
return errors.New("validating compute group failed")
}

var errs []error
cgs := ddc.Spec.ComputeGroups
for i, _ := range cgs {

if event, err := dcgs.computeGroupSync(ctx, ddc, &cgs[i]); err != nil {
if event != nil {
dcgs.K8srecorder.Event(ddc, string(event.Type), string(event.Reason), event.Message)
}
klog.Errorf("disaggregatedComputeGroupsController computeGroups sync failed, compute group Uniqueid %s sync failed, err=%s", cgs[i].UniqueId, sc.EventString(event))
errs = append(errs, err)
if err.Error() != "" {
klog.Errorf("disaggregatedComputeGroupsController computeGroups sync failed, compute group Uniqueid %s sync failed, err=%s", cgs[i].UniqueId, sc.EventString(event))
}
}
}

if len(errs) != 0 {
msgHead := fmt.Sprintf("disaggregatedComputeGroupsController sync namespace: %s ,ddc name: %s, compute group has the following error: ", ddc.Namespace, ddc.Name)
msg := ""
for _, err := range errs {
msg += err.Error()
}
if msg != "" {
return errors.New(msgHead + msg)
}
// msg is "" , means Decommissioning
return errors.New("scaleDown Decommissioning, will Reconcile again, may not be an error, if you meet this error, please ignore it")
}

return nil
Expand Down Expand Up @@ -143,13 +160,14 @@ func (dcgs *DisaggregatedComputeGroupsController) computeGroupSync(ctx context.C
return event, err
}
event, err = dcgs.reconcileStatefulset(ctx, st, ddc, cg)
if err != nil {
if err != nil && err.Error() != "" {
klog.Errorf("disaggregatedComputeGroupsController reconcile statefulset namespace %s name %s failed, err=%s", st.Namespace, st.Name, err.Error())
}

return event, err
}

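// reconcileStatefulset returns an event and an error; an error whose message is empty means a be decommission is still in progress, and callers skip logging it as a failure.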
func (dcgs *DisaggregatedComputeGroupsController) reconcileStatefulset(ctx context.Context, st *appv1.StatefulSet, cluster *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup) (*sc.Event, error) {
var est appv1.StatefulSet
if err := dcgs.K8sclient.Get(ctx, types.NamespacedName{Namespace: st.Namespace, Name: st.Name}, &est); apierrors.IsNotFound(err) {
Expand All @@ -175,27 +193,60 @@ func (dcgs *DisaggregatedComputeGroupsController) reconcileStatefulset(ctx conte
}
scaleType := getScaleType(st, &est, cgStatus.Phase)

if err := k8s.ApplyStatefulSet(ctx, dcgs.K8sclient, st, func(st, est *appv1.StatefulSet) bool {
return resource.StatefulsetDeepEqualWithKey(st, est, dv1.DisaggregatedSpecHashValueAnnotation, false)
}); err != nil {
klog.Errorf("disaggregatedComputeGroupsController reconcileStatefulset apply statefulset namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error())
return &sc.Event{Type: sc.EventWarning, Reason: sc.CGApplyResourceFailed, Message: err.Error()}, err
}

switch scaleType {
//case "resume":
//case "suspend":
case "scaleDown":
cgKeepAmount := *cg.Replicas
cgName := cluster.GetCGName(cg)
if err := dcgs.scaledOutBENodesBySQL(ctx, dcgs.K8sclient, cluster, cgName, cgKeepAmount); err != nil {
cgStatus.Phase = dv1.ScaleDownFailed
klog.Errorf("ScaleDownBE failed, err:%s ", err.Error())
sqlClient, err := dcgs.getMasterSqlClient(ctx, dcgs.K8sclient, cluster)
if err != nil {
klog.Errorf("scaleDown getMasterSqlClient failed, get fe master node connection err:%s", err.Error())
return &sc.Event{Type: sc.EventWarning, Reason: sc.CGSqlExecFailed, Message: err.Error()},
err
}
defer sqlClient.Close()

cgKeepAmount := *cg.Replicas
cgName := cluster.GetCGName(cg)

decommissionPhase, err := dcgs.decommissionProgressCheck(cluster, sqlClient, cgName, cgKeepAmount)
if err != nil {
return &sc.Event{Type: sc.EventWarning, Reason: sc.CGSqlExecFailed, Message: err.Error()}, err
}

switch decommissionPhase {
case resource.DecommissionPhaseSteady:
err = dcgs.decommissionBENodesBySQL(sqlClient, cgName, cgKeepAmount)
if err != nil {
cgStatus.Phase = dv1.ScaleDownFailed
klog.Errorf("ScaleDownBE decommissionBENodesBySQL failed, err:%s ", err.Error())
return &sc.Event{Type: sc.EventWarning, Reason: sc.CGSqlExecFailed, Message: err.Error()},
err
}
cgStatus.Phase = dv1.Scaling
return nil, errors.New("")
case resource.Decommissioning:
cgStatus.Phase = dv1.Scaling
klog.Infof("ScaleDownBE decommissionBENodesBySQL in progress")
return nil, errors.New("")
case resource.Decommissioned:
dcgs.scaledOutBENodesBySQL(sqlClient, cgName, cgKeepAmount)
default:
// default is DecommissionPhaseUnknown, drop be , not decommission
if err := dcgs.scaledOutBENodesBySQL(sqlClient, cgName, cgKeepAmount); err != nil {
cgStatus.Phase = dv1.ScaleDownFailed
klog.Errorf("ScaleDownBE scaledOutBENodesBySQL failed, err:%s ", err.Error())
return &sc.Event{Type: sc.EventWarning, Reason: sc.CGSqlExecFailed, Message: err.Error()},
err
}
}
cgStatus.Phase = dv1.Scaling
}

if err := k8s.ApplyStatefulSet(ctx, dcgs.K8sclient, st, func(st, est *appv1.StatefulSet) bool {
return resource.StatefulsetDeepEqualWithKey(st, est, dv1.DisaggregatedSpecHashValueAnnotation, false)
}); err != nil {
klog.Errorf("disaggregatedComputeGroupsController reconcileStatefulset apply statefulset namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error())
return &sc.Event{Type: sc.EventWarning, Reason: sc.CGApplyResourceFailed, Message: err.Error()}, err
}

return nil, nil
Expand Down Expand Up @@ -311,7 +362,13 @@ func (dcgs *DisaggregatedComputeGroupsController) ClearResources(ctx context.Con
// drop compute group
cgName := strings.ReplaceAll(cgs.UniqueId, "_", "-")
cgKeepAmount := int32(0)
err := dcgs.scaledOutBENodesBySQL(ctx, dcgs.K8sclient, ddc, cgName, cgKeepAmount)
sqlClient, err := dcgs.getMasterSqlClient(ctx, dcgs.K8sclient, ddc)
if err != nil {
klog.Errorf("computeGroupSync ClearResources dropCGBySQLClient getMasterSqlClient failed: %s", err.Error())
dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync dropCGBySQLClient failed: "+err.Error())
}
defer sqlClient.Close()
err = dcgs.scaledOutBENodesBySQL(sqlClient, cgName, cgKeepAmount)
if err != nil {
klog.Errorf("computeGroupSync ClearResources dropCGBySQLClient failed: %s", err.Error())
dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync dropCGBySQLClient failed: "+err.Error())
Expand Down Expand Up @@ -485,12 +542,79 @@ func (dcgs *DisaggregatedComputeGroupsController) updateCGStatus(ddc *dv1.DorisD
return nil
}

// getScaledOutBENode returns the backends of compute group cgName that fall
// outside the kept range: those whose pod ordinal is greater than or equal to
// cgKeepAmount.
//
// The ordinal is the text after the last "-" in the first dot-separated label
// of the backend host. An error is returned when the backends cannot be listed
// or when that text is not an integer.
func getScaledOutBENode(
	masterDBClient *mysql.DB,
	cgName string,
	cgKeepAmount int32) ([]*mysql.Backend, error) {

	backends, err := masterDBClient.GetBackendsByCGName(cgName)
	if err != nil {
		klog.Errorf("scaledOutBEPreprocessing failed, ShowBackends err:%s", err.Error())
		return nil, err
	}

	keep := int(cgKeepAmount)
	var dropNodes []*mysql.Backend
	for _, be := range backends {
		// First label of the host, then whatever follows its last "-".
		podName := strings.Split(be.Host, ".")[0]
		ordinalText := podName[strings.LastIndex(podName, "-")+1:]

		ordinal, convErr := strconv.Atoi(ordinalText)
		if convErr != nil {
			klog.Errorf("scaledOutBEPreprocessing splitCGIDArr can not split host : %s,err:%s", be.Host, convErr.Error())
			return nil, convErr
		}
		if ordinal < keep {
			continue
		}
		dropNodes = append(dropNodes, be)
	}
	return dropNodes, nil
}

// scaledOutBENodesBySQL drops, through masterDBClient, every backend of compute
// group cgName that getScaledOutBENode selects for cgKeepAmount (pod ordinal
// greater than or equal to cgKeepAmount). It returns nil without issuing a drop
// when no backend is selected, and returns the underlying error when listing or
// dropping fails.
//
// NOTE(review): ctx, k8sclient and cluster are not used in the body, and the
// callers visible in this file pass only (client, cgName, cgKeepAmount) —
// confirm the parameter list against the real file.
func (dcgs *DisaggregatedComputeGroupsController) scaledOutBENodesBySQL(
ctx context.Context, k8sclient client.Client,
cluster *dv1.DorisDisaggregatedCluster,
masterDBClient *mysql.DB,
cgName string,
cgKeepAmount int32) error {

dropNodes, err := getScaledOutBENode(masterDBClient, cgName, cgKeepAmount)
if err != nil {
klog.Errorf("scaledOutBENodesBySQL getScaledOutBENode failed, err:%s ", err.Error())
return err
}

// nothing beyond the keep amount, so there is nothing to drop.
if len(dropNodes) == 0 {
return nil
}
err = masterDBClient.DropBE(dropNodes)
if err != nil {
klog.Errorf("scaledOutBENodesBySQL DropBENodes failed, err:%s ", err.Error())
return err
}
return nil
}

// decommissionBENodesBySQL starts decommissioning, through masterDBClient, every
// backend of compute group cgName that getScaledOutBENode selects for
// cgKeepAmount (pod ordinal greater than or equal to cgKeepAmount).
//
// It returns nil without issuing any statement when no backend is selected, and
// returns the underlying error when listing the backends or the decommission
// call fails.
func (dcgs *DisaggregatedComputeGroupsController) decommissionBENodesBySQL(
	masterDBClient *mysql.DB,
	cgName string,
	cgKeepAmount int32) error {

	decommissionNodes, err := getScaledOutBENode(masterDBClient, cgName, cgKeepAmount)
	if err != nil {
		klog.Errorf("decommissionBENodesBySQL getScaledOutBENode failed, err:%s ", err.Error())
		return err
	}

	// nothing beyond the keep amount, so there is nothing to decommission.
	if len(decommissionNodes) == 0 {
		return nil
	}
	if err = masterDBClient.DecommissionBE(decommissionNodes); err != nil {
		// Name the call that actually failed; the message used to say "DropBENodes".
		klog.Errorf("decommissionBENodesBySQL DecommissionBE failed, err:%s ", err.Error())
		return err
	}
	return nil
}

func (dcgs *DisaggregatedComputeGroupsController) getMasterSqlClient(ctx context.Context, k8sclient client.Client, cluster *dv1.DorisDisaggregatedCluster) (*mysql.DB, error) {
// get user and password
secret, _ := k8s.GetSecret(ctx, k8sclient, cluster.Namespace, cluster.Spec.AuthSecret)
adminUserName, password := resource.GetDorisLoginInformation(secret)
Expand All @@ -513,39 +637,23 @@ func (dcgs *DisaggregatedComputeGroupsController) scaledOutBENodesBySQL(
// Connect to the master and run the SQL statement of system admin, because it is not excluded that the user can shrink be and fe at the same time
masterDBClient, err := mysql.NewDorisMasterSqlDB(dbConf)
if err != nil {
klog.Errorf("dropNodeBySQLClient NewDorisMasterSqlDB failed, get fe node connection err:%s", err.Error())
return err
}
defer masterDBClient.Close()

allBackends, err := masterDBClient.GetBackendsByCGName(cgName)
if err != nil {
klog.Errorf("dropNodeBySQLClient failed, ShowBackends err:%s", err.Error())
return err
}

var dropNodes []*mysql.Backend
for i := range allBackends {
node := allBackends[i]
split := strings.Split(node.Host, ".")
splitCGIDArr := strings.Split(split[0], "-")
podNum, err := strconv.Atoi(splitCGIDArr[len(splitCGIDArr)-1])
if err != nil {
klog.Errorf("dropNodeBySQLClient splitCGIDArr can not split host : %s,err:%s", node.Host, err.Error())
return err
}
if podNum >= int(cgKeepAmount) {
dropNodes = append(dropNodes, node)
}
klog.Errorf("getMasterSqlClient NewDorisMasterSqlDB failed, get fe node connection err:%s", err.Error())
return nil, err
}
return masterDBClient, nil
}

if len(dropNodes) == 0 {
return nil
// decommissionProgressCheck reports the decommission phase of the backends in compute group cgName.
// if cluster.Spec.EnableDecommission is false it returns DecommissionPhaseUnknown without querying the backends;
// otherwise it lists the backends by compute group name and evaluates them against cgKeepAmount.
// NOTE(review): the DropBE call and the single-value `return err` / `return nil` lines below do not fit this
// function's (DecommissionPhase, error) signature and look like leftovers of the removed drop logic — confirm against the real file.
func (dcgs *DisaggregatedComputeGroupsController) decommissionProgressCheck(cluster *dv1.DorisDisaggregatedCluster, masterDBClient *mysql.DB, cgName string, cgKeepAmount int32) (resource.DecommissionPhase, error) {
if !cluster.Spec.EnableDecommission {
return resource.DecommissionPhaseUnknown, nil
}
err = masterDBClient.DropBE(dropNodes)
allBackends, err := masterDBClient.GetBackendsByCGName(cgName)
if err != nil {
klog.Errorf("dropNodeBySQLClient DropBENodes failed, err:%s ", err.Error())
return err
klog.Errorf("decommissionProgressCheck failed, ShowBackends err:%s", err.Error())
return resource.DecommissionPhaseUnknown, err
}
return nil
decommissionDetail := resource.ConstructDecommissionDetail(allBackends, cgKeepAmount)
return decommissionDetail.GetDecommissionDetailStatus(), nil
}

0 comments on commit d5102b9

Please sign in to comment.