diff --git a/api/disaggregated/v1/types.go b/api/disaggregated/v1/types.go
index d82b2c2..7d7ae44 100644
--- a/api/disaggregated/v1/types.go
+++ b/api/disaggregated/v1/types.go
@@ -39,6 +39,11 @@ type DorisDisaggregatedClusterSpec struct {
 	// the name of secret that type is `kubernetes.io/basic-auth` and contains keys username, password for management doris node in cluster as fe, be register.
 	// the password key is `password`. the username defaults to `root` and is omitempty.
 	AuthSecret string `json:"authSecret,omitempty"`
+
+	// EnableDecommission controls how BE nodes are removed when a compute group is scaled down. Defaults to false.
+	// If true, surplus BE nodes are decommissioned first, so tablets are migrated off before the nodes are removed.
+	// If false, surplus BE nodes are dropped immediately.
+	EnableDecommission bool `json:"enableDecommission,omitempty"`
 }
 
 type MetaService struct {
diff --git a/pkg/common/utils/resource/decommission.go b/pkg/common/utils/resource/decommission.go
new file mode 100644
index 0000000..762bc57
--- /dev/null
+++ b/pkg/common/utils/resource/decommission.go
@@ -0,0 +1,66 @@
+package resource
+
+import (
+	"github.com/apache/doris-operator/pkg/common/utils/mysql"
+)
+
+type DecommissionPhase string
+
+const (
+	Decommissioned           DecommissionPhase = "Decommissioned"
+	Decommissioning          DecommissionPhase = "Decommissioning"
+	DecommissionPhaseSteady  DecommissionPhase = "Steady"
+	DecommissionPhaseUnknown DecommissionPhase = "Unknown"
+)
+
+// DecommissionDetail summarizes the decommission state of all backends in a compute group.
+type DecommissionDetail struct {
+	AllBackendsSize       int
+	UnDecommissionedCount int
+	DecommissioningCount  int
+	DecommissionedCount   int
+	BeKeepAmount          int
+}
+
+func ConstructDecommissionDetail(allBackends []*mysql.Backend, cgKeepAmount int32) DecommissionDetail {
+	var unDecommissionedCount, decommissioningCount, decommissionedCount int
+
+	for i := range allBackends {
+		node := allBackends[i]
+		if !node.SystemDecommissioned {
+			unDecommissionedCount++
+		} else {
+			if node.TabletNum == 0 {
+				decommissionedCount++
+			} else {
+				decommissioningCount++
+			}
+		}
+	}
+
+	return DecommissionDetail{
+		AllBackendsSize:       len(allBackends),
+		UnDecommissionedCount: unDecommissionedCount,
+		DecommissioningCount:  decommissioningCount,
+		DecommissionedCount:   decommissionedCount,
+		BeKeepAmount:          int(cgKeepAmount),
+	}
+}
+
+// GetDecommissionDetailStatus derives the decommission phase from the counts:
+// Steady means no decommission is in flight and there are surplus backends;
+// Decommissioning means surplus backends are still migrating tablets off;
+// Decommissioned means all surplus backends are empty and safe to drop.
+func (d *DecommissionDetail) GetDecommissionDetailStatus() DecommissionPhase {
+	if d.DecommissioningCount == 0 && d.DecommissionedCount == 0 && d.UnDecommissionedCount > d.BeKeepAmount {
+		return DecommissionPhaseSteady
+	}
+	if d.UnDecommissionedCount == d.BeKeepAmount && d.DecommissioningCount > 0 {
+		return Decommissioning
+	}
+
+	if d.UnDecommissionedCount == d.BeKeepAmount && d.UnDecommissionedCount+d.DecommissionedCount == d.AllBackendsSize {
+		return Decommissioned
+	}
+	return DecommissionPhaseUnknown
+}
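To make the new phase machinery concrete, here is a minimal, runnable sketch of the same bookkeeping. The `backend` struct is a stand-in carrying only the two fields the calculation reads (the real type is `mysql.Backend` from `pkg/common/utils/mysql`), and `phase` mirrors `ConstructDecommissionDetail` plus `GetDecommissionDetailStatus`. It walks a scale-down from five BEs to three through the Steady, Decommissioning, and Decommissioned phases:

```go
package main

import "fmt"

// backend stands in for mysql.Backend with only the fields the
// decommission bookkeeping reads.
type backend struct {
	SystemDecommissioned bool
	TabletNum            int
}

// phase mirrors ConstructDecommissionDetail + GetDecommissionDetailStatus.
func phase(nodes []backend, keep int) string {
	var unDecommissioned, decommissioning, decommissioned int
	for _, n := range nodes {
		switch {
		case !n.SystemDecommissioned:
			unDecommissioned++
		case n.TabletNum == 0:
			decommissioned++
		default:
			decommissioning++
		}
	}
	switch {
	case decommissioning == 0 && decommissioned == 0 && unDecommissioned > keep:
		return "Steady" // nothing in flight yet, more BEs than we want to keep
	case unDecommissioned == keep && decommissioning > 0:
		return "Decommissioning" // surplus BEs still draining tablets
	case unDecommissioned == keep && unDecommissioned+decommissioned == len(nodes):
		return "Decommissioned" // surplus BEs are empty and safe to drop
	}
	return "Unknown"
}

func main() {
	// Scale a compute group from 5 BEs down to 3.
	nodes := []backend{{false, 10}, {false, 10}, {false, 10}, {false, 10}, {false, 10}}
	fmt.Println(phase(nodes, 3)) // Steady: decommission has not been issued yet

	// The operator marks the two surplus BEs; they still hold tablets.
	nodes[3].SystemDecommissioned = true
	nodes[4].SystemDecommissioned = true
	fmt.Println(phase(nodes, 3)) // Decommissioning

	// Tablets finish migrating off the surplus BEs.
	nodes[3].TabletNum = 0
	nodes[4].TabletNum = 0
	fmt.Println(phase(nodes, 3)) // Decommissioned: dropping them is now safe
}
```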
diff --git a/pkg/controller/disaggregated_cluster_controller.go b/pkg/controller/disaggregated_cluster_controller.go
index d8565c2..68b3ea8 100644
--- a/pkg/controller/disaggregated_cluster_controller.go
+++ b/pkg/controller/disaggregated_cluster_controller.go
@@ -230,13 +230,13 @@ func (dc *DisaggregatedClusterReconciler) Reconcile(ctx context.Context, req rec
 	//display new status.
 	disRes, disErr := func() (ctrl.Result, error) {
 		//reorganize status.
-		if res, err = dc.reorganizeStatus(&ddc); err != nil {
-			return res, err
+		if rsRes, rsErr := dc.reorganizeStatus(&ddc); rsErr != nil {
+			return rsRes, rsErr
 		}
 
 		//update cr or status
-		if res, err = dc.updateObjectORStatus(ctx, &ddc, hv); err != nil {
-			return res, err
+		if updRes, updErr := dc.updateObjectORStatus(ctx, &ddc, hv); updErr != nil {
+			return updRes, updErr
 		}
 
 		return ctrl.Result{}, nil
@@ -247,7 +247,10 @@ func (dc *DisaggregatedClusterReconciler) Reconcile(ctx context.Context, req rec
 		res = disRes
 	}
 
-	return res, err
+	if msg != "" {
+		return res, errors.New(msg)
+	}
+	return res, nil
 }
 
 func (dc *DisaggregatedClusterReconciler) clearUnusedResources(ctx context.Context, ddc *dv1.DorisDisaggregatedCluster) (ctrl.Result, error) {
diff --git a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go
index cb0193d..7fdfc57 100644
--- a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go
+++ b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go
@@ -81,6 +81,7 @@ func (dcgs *DisaggregatedComputeGroupsController) Sync(ctx context.Context, obj
 		return errors.New("validating compute group failed")
 	}
 
+	var errs []error
 	cgs := ddc.Spec.ComputeGroups
 	for i, _ := range cgs {
@@ -88,8 +89,24 @@ func (dcgs *DisaggregatedComputeGroupsController) Sync(ctx context.Context, obj
 			if event != nil {
 				dcgs.K8srecorder.Event(ddc, string(event.Type), string(event.Reason), event.Message)
 			}
-			klog.Errorf("disaggregatedComputeGroupsController computeGroups sync failed, compute group Uniqueid %s sync failed, err=%s", cgs[i].UniqueId, sc.EventString(event))
+			errs = append(errs, err)
+			if err.Error() != "" {
+				klog.Errorf("disaggregatedComputeGroupsController computeGroups sync failed, compute group UniqueId %s sync failed, err=%s", cgs[i].UniqueId, sc.EventString(event))
+			}
 		}
 	}
 
+	if len(errs) != 0 {
+		msgHead := fmt.Sprintf("disaggregatedComputeGroupsController sync namespace: %s, ddc name: %s, compute groups have the following errors: ", ddc.Namespace, ddc.Name)
+		msg := ""
+		for _, err := range errs {
+			msg += err.Error()
+		}
+		if msg != "" {
+			return errors.New(msgHead + msg)
+		}
+		// an empty msg means every collected error was the decommissioning sentinel; requeue without reporting a failure
+		return errors.New("scaleDown: decommission in progress, reconcile will run again; this is expected and can be ignored")
+	}
+
 	return nil
@@ -143,13 +160,14 @@ func (dcgs *DisaggregatedComputeGroupsController) computeGroupSync(ctx context.C
 		return event, err
 	}
 	event, err = dcgs.reconcileStatefulset(ctx, st, ddc, cg)
-	if err != nil {
+	if err != nil && err.Error() != "" {
 		klog.Errorf("disaggregatedComputeGroupsController reconcile statefulset namespace %s name %s failed, err=%s", st.Namespace, st.Name, err.Error())
 	}
 
 	return event, err
 }
 
+// reconcileStatefulset applies the compute group's StatefulSet and drives scale down; an error with an empty message is the decommissioning sentinel, which callers requeue on without logging.
 func (dcgs *DisaggregatedComputeGroupsController) reconcileStatefulset(ctx context.Context, st *appv1.StatefulSet, cluster *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup) (*sc.Event, error) {
 	var est appv1.StatefulSet
 	if err := dcgs.K8sclient.Get(ctx, types.NamespacedName{Namespace: st.Namespace, Name: st.Name}, &est); apierrors.IsNotFound(err) {
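The aggregation above leans on a sentinel: `errors.New("")` travels up the ordinary error path to force a requeue while decommission is draining, without being logged or reported as a failure. A self-contained sketch of the pattern; `syncGroup` and `errInProgress` are hypothetical names, not operator APIs:

```go
package main

import (
	"errors"
	"fmt"
)

// errInProgress is the empty-message sentinel: callers treat it as
// "requeue and check again later", not as a failure worth logging.
var errInProgress = errors.New("")

// syncGroup is a stand-in for computeGroupSync.
func syncGroup(draining bool) error {
	if draining {
		return errInProgress // decommission is still migrating tablets
	}
	return nil
}

func main() {
	var errs []error
	for _, draining := range []bool{true, true} {
		if err := syncGroup(draining); err != nil {
			errs = append(errs, err)
		}
	}

	// Mirror the aggregation in Sync: concatenate messages; if the result
	// is empty, every collected error was the in-progress sentinel.
	msg := ""
	for _, err := range errs {
		msg += err.Error()
	}
	if len(errs) != 0 && msg == "" {
		fmt.Println("decommission in progress, reconcile will run again")
	} else if msg != "" {
		fmt.Println("real failures:", msg)
	}
}
```

A typed sentinel compared with `errors.Is` would survive future message changes better than testing `err.Error() != ""`, but the empty-message convention flows through the existing `(*sc.Event, error)` signatures unchanged.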
@@ -175,27 +193,62 @@
 	}
 
 	scaleType := getScaleType(st, &est, cgStatus.Phase)
-	if err := k8s.ApplyStatefulSet(ctx, dcgs.K8sclient, st, func(st, est *appv1.StatefulSet) bool {
-		return resource.StatefulsetDeepEqualWithKey(st, est, dv1.DisaggregatedSpecHashValueAnnotation, false)
-	}); err != nil {
-		klog.Errorf("disaggregatedComputeGroupsController reconcileStatefulset apply statefulset namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error())
-		return &sc.Event{Type: sc.EventWarning, Reason: sc.CGApplyResourceFailed, Message: err.Error()}, err
-	}
-
 	switch scaleType {
 	//case "resume":
 	//case "suspend":
 	case "scaleDown":
-		cgKeepAmount := *cg.Replicas
-		cgName := cluster.GetCGName(cg)
-		if err := dcgs.scaledOutBENodesBySQL(ctx, dcgs.K8sclient, cluster, cgName, cgKeepAmount); err != nil {
-			cgStatus.Phase = dv1.ScaleDownFailed
-			klog.Errorf("ScaleDownBE failed, err:%s ", err.Error())
+		sqlClient, err := dcgs.getMasterSqlClient(ctx, dcgs.K8sclient, cluster)
+		if err != nil {
+			klog.Errorf("scaleDown getMasterSqlClient failed, get fe master node connection err:%s", err.Error())
 			return &sc.Event{Type: sc.EventWarning, Reason: sc.CGSqlExecFailed, Message: err.Error()}, err
 		}
+		defer sqlClient.Close()
+
+		cgKeepAmount := *cg.Replicas
+		cgName := cluster.GetCGName(cg)
+
+		decommissionPhase, err := dcgs.decommissionProgressCheck(cluster, sqlClient, cgName, cgKeepAmount)
+		if err != nil {
+			return &sc.Event{Type: sc.EventWarning, Reason: sc.CGSqlExecFailed, Message: err.Error()}, err
+		}
+
+		switch decommissionPhase {
+		case resource.DecommissionPhaseSteady:
+			err = dcgs.decommissionBENodesBySQL(sqlClient, cgName, cgKeepAmount)
+			if err != nil {
+				cgStatus.Phase = dv1.ScaleDownFailed
+				klog.Errorf("ScaleDownBE decommissionBENodesBySQL failed, err:%s ", err.Error())
+				return &sc.Event{Type: sc.EventWarning, Reason: sc.CGSqlExecFailed, Message: err.Error()}, err
+			}
+			cgStatus.Phase = dv1.Scaling
+			return nil, errors.New("")
+		case resource.Decommissioning:
+			cgStatus.Phase = dv1.Scaling
+			klog.Infof("ScaleDownBE decommission in progress")
+			return nil, errors.New("")
+		case resource.Decommissioned:
+			if err := dcgs.scaledOutBENodesBySQL(sqlClient, cgName, cgKeepAmount); err != nil {
+				cgStatus.Phase = dv1.ScaleDownFailed
+				klog.Errorf("ScaleDownBE scaledOutBENodesBySQL failed, err:%s ", err.Error())
+				return &sc.Event{Type: sc.EventWarning, Reason: sc.CGSqlExecFailed, Message: err.Error()}, err
+			}
+		default:
+			// DecommissionPhaseUnknown: decommission is disabled or the state is unreadable, so drop the BEs directly instead of decommissioning
+			if err := dcgs.scaledOutBENodesBySQL(sqlClient, cgName, cgKeepAmount); err != nil {
+				cgStatus.Phase = dv1.ScaleDownFailed
+				klog.Errorf("ScaleDownBE scaledOutBENodesBySQL failed, err:%s ", err.Error())
+				return &sc.Event{Type: sc.EventWarning, Reason: sc.CGSqlExecFailed, Message: err.Error()}, err
+			}
+		}
 		cgStatus.Phase = dv1.Scaling
+	}
 
+	if err := k8s.ApplyStatefulSet(ctx, dcgs.K8sclient, st, func(st, est *appv1.StatefulSet) bool {
+		return resource.StatefulsetDeepEqualWithKey(st, est, dv1.DisaggregatedSpecHashValueAnnotation, false)
+	}); err != nil {
+		klog.Errorf("disaggregatedComputeGroupsController reconcileStatefulset apply statefulset namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error())
+		return &sc.Event{Type: sc.EventWarning, Reason: sc.CGApplyResourceFailed, Message: err.Error()}, err
 	}
 
 	return nil, nil
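With the `ApplyStatefulSet` call moved below the switch, the StatefulSet (and therefore the pod count) is only shrunk once the SQL side has been settled for the current phase. Condensed into a decision table, the scale-down logic looks like this; the `Phase` type mirrors `resource.DecommissionPhase` and the action strings are illustrative only:

```go
package main

import "fmt"

// Phase mirrors resource.DecommissionPhase.
type Phase string

const (
	Steady          Phase = "Steady"
	Decommissioning Phase = "Decommissioning"
	Decommissioned  Phase = "Decommissioned"
	Unknown         Phase = "Unknown"
)

// decideScaleDown condenses the switch in reconcileStatefulset.
// requeue=true corresponds to returning the empty-message sentinel so the
// StatefulSet is not shrunk while tablets are still draining.
func decideScaleDown(p Phase) (action string, requeue bool) {
	switch p {
	case Steady:
		return "issue decommission for surplus BEs", true
	case Decommissioning:
		return "wait for tablets to drain", true
	case Decommissioned:
		return "drop the now-empty BEs, then apply the StatefulSet", false
	default: // Unknown: enableDecommission is false or state is unreadable
		return "drop surplus BEs directly, then apply the StatefulSet", false
	}
}

func main() {
	for _, p := range []Phase{Steady, Decommissioning, Decommissioned, Unknown} {
		action, requeue := decideScaleDown(p)
		fmt.Printf("%-15s -> %s (requeue: %v)\n", p, action, requeue)
	}
}
```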
@@ -311,7 +364,15 @@ func (dcgs *DisaggregatedComputeGroupsController) ClearResources(ctx context.Con
 		// drop compute group
 		cgName := strings.ReplaceAll(cgs.UniqueId, "_", "-")
 		cgKeepAmount := int32(0)
-		err := dcgs.scaledOutBENodesBySQL(ctx, dcgs.K8sclient, ddc, cgName, cgKeepAmount)
+		sqlClient, err := dcgs.getMasterSqlClient(ctx, dcgs.K8sclient, ddc)
+		if err != nil {
+			klog.Errorf("ClearResources getMasterSqlClient failed: %s", err.Error())
+			dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "ClearResources getMasterSqlClient failed: "+err.Error())
+		}
+		// guard against a nil client when the fe connection could not be established
+		if sqlClient != nil {
+			defer sqlClient.Close()
+			err = dcgs.scaledOutBENodesBySQL(sqlClient, cgName, cgKeepAmount)
+		}
 		if err != nil {
 			klog.Errorf("computeGroupSync ClearResources dropCGBySQLClient failed: %s", err.Error())
 			dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync dropCGBySQLClient failed: "+err.Error())
@@ -485,12 +546,81 @@ func (dcgs *DisaggregatedComputeGroupsController) updateCGStatus(ddc *dv1.DorisD
 	return nil
 }
 
+// getScaledOutBENode returns the surplus backends of a compute group: those whose
+// StatefulSet pod ordinal (parsed from the backend host name) is >= cgKeepAmount.
+func getScaledOutBENode(
+	masterDBClient *mysql.DB,
+	cgName string,
+	cgKeepAmount int32) ([]*mysql.Backend, error) {
+
+	allBackends, err := masterDBClient.GetBackendsByCGName(cgName)
+	if err != nil {
+		klog.Errorf("getScaledOutBENode failed, ShowBackends err:%s", err.Error())
+		return nil, err
+	}
+
+	var dropNodes []*mysql.Backend
+	for i := range allBackends {
+		node := allBackends[i]
+		split := strings.Split(node.Host, ".")
+		splitCGIDArr := strings.Split(split[0], "-")
+		podNum, err := strconv.Atoi(splitCGIDArr[len(splitCGIDArr)-1])
+		if err != nil {
+			klog.Errorf("getScaledOutBENode can not parse pod ordinal from host: %s, err:%s", node.Host, err.Error())
+			return nil, err
+		}
+		if podNum >= int(cgKeepAmount) {
+			dropNodes = append(dropNodes, node)
+		}
+	}
+	return dropNodes, nil
+}
+
 func (dcgs *DisaggregatedComputeGroupsController) scaledOutBENodesBySQL(
-	ctx context.Context, k8sclient client.Client,
-	cluster *dv1.DorisDisaggregatedCluster,
+	masterDBClient *mysql.DB,
 	cgName string,
 	cgKeepAmount int32) error {
 
+	dropNodes, err := getScaledOutBENode(masterDBClient, cgName, cgKeepAmount)
+	if err != nil {
+		klog.Errorf("scaledOutBENodesBySQL getScaledOutBENode failed, err:%s ", err.Error())
+		return err
+	}
+
+	if len(dropNodes) == 0 {
+		return nil
+	}
+	err = masterDBClient.DropBE(dropNodes)
+	if err != nil {
+		klog.Errorf("scaledOutBENodesBySQL DropBE failed, err:%s ", err.Error())
+		return err
+	}
+	return nil
+}
+
+func (dcgs *DisaggregatedComputeGroupsController) decommissionBENodesBySQL(
+	masterDBClient *mysql.DB,
+	cgName string,
+	cgKeepAmount int32) error {
+
+	dropNodes, err := getScaledOutBENode(masterDBClient, cgName, cgKeepAmount)
+	if err != nil {
+		klog.Errorf("decommissionBENodesBySQL getScaledOutBENode failed, err:%s ", err.Error())
+		return err
+	}
+
+	if len(dropNodes) == 0 {
+		return nil
+	}
+	err = masterDBClient.DecommissionBE(dropNodes)
+	if err != nil {
+		klog.Errorf("decommissionBENodesBySQL DecommissionBE failed, err:%s ", err.Error())
+		return err
+	}
+	return nil
+}
+
+func (dcgs *DisaggregatedComputeGroupsController) getMasterSqlClient(ctx context.Context, k8sclient client.Client, cluster *dv1.DorisDisaggregatedCluster) (*mysql.DB, error) {
 	// get user and password
 	secret, _ := k8s.GetSecret(ctx, k8sclient, cluster.Namespace, cluster.Spec.AuthSecret)
 	adminUserName, password := resource.GetDorisLoginInformation(secret)
@@ -513,39 +643,23 @@ func (dcgs *DisaggregatedComputeGroupsController) scaledOutBENodesBySQL(
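`getScaledOutBENode` relies on StatefulSet naming: a pod's hostname begins with `<statefulset-name>-<ordinal>`, so the trailing number of the first DNS label identifies the pod, and any BE whose ordinal is `>= cgKeepAmount` is surplus. A standalone sketch of that selection; the host names are hypothetical examples of the layout:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// ordinal extracts the StatefulSet pod ordinal from a BE host name,
// e.g. "mycluster-cg1-3.mycluster-cg1.doris.svc.cluster.local" -> 3.
func ordinal(host string) (int, error) {
	podName := strings.Split(host, ".")[0]   // "mycluster-cg1-3"
	parts := strings.Split(podName, "-")     // ["mycluster", "cg1", "3"]
	return strconv.Atoi(parts[len(parts)-1]) // trailing ordinal
}

func main() {
	hosts := []string{
		"mycluster-cg1-0.mycluster-cg1.doris.svc.cluster.local",
		"mycluster-cg1-1.mycluster-cg1.doris.svc.cluster.local",
		"mycluster-cg1-2.mycluster-cg1.doris.svc.cluster.local",
		"mycluster-cg1-3.mycluster-cg1.doris.svc.cluster.local",
	}
	keep := 2 // cgKeepAmount
	for _, h := range hosts {
		n, err := ordinal(h)
		if err != nil {
			fmt.Println("unparseable host:", h)
			continue
		}
		if n >= keep {
			fmt.Println("surplus, to decommission or drop:", h)
		}
	}
}
```

Because the highest ordinals are always selected, the SQL-side removal stays consistent with how the StatefulSet deletes pods when `replicas` is later reduced.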
 	// Connect to the master and run the SQL statement of system admin, because it is not excluded that the user can shrink be and fe at the same time
 	masterDBClient, err := mysql.NewDorisMasterSqlDB(dbConf)
 	if err != nil {
-		klog.Errorf("dropNodeBySQLClient NewDorisMasterSqlDB failed, get fe node connection err:%s", err.Error())
-		return err
-	}
-	defer masterDBClient.Close()
-
-	allBackends, err := masterDBClient.GetBackendsByCGName(cgName)
-	if err != nil {
-		klog.Errorf("dropNodeBySQLClient failed, ShowBackends err:%s", err.Error())
-		return err
-	}
-
-	var dropNodes []*mysql.Backend
-	for i := range allBackends {
-		node := allBackends[i]
-		split := strings.Split(node.Host, ".")
-		splitCGIDArr := strings.Split(split[0], "-")
-		podNum, err := strconv.Atoi(splitCGIDArr[len(splitCGIDArr)-1])
-		if err != nil {
-			klog.Errorf("dropNodeBySQLClient splitCGIDArr can not split host : %s,err:%s", node.Host, err.Error())
-			return err
-		}
-		if podNum >= int(cgKeepAmount) {
-			dropNodes = append(dropNodes, node)
-		}
+		klog.Errorf("getMasterSqlClient NewDorisMasterSqlDB failed, get fe node connection err:%s", err.Error())
+		return nil, err
 	}
+	return masterDBClient, nil
+}
 
-	if len(dropNodes) == 0 {
-		return nil
+// decommissionProgressCheck reports the decommission phase of a compute group's backends.
+// When spec.enableDecommission is false it returns DecommissionPhaseUnknown, so the caller drops BE nodes directly.
+func (dcgs *DisaggregatedComputeGroupsController) decommissionProgressCheck(cluster *dv1.DorisDisaggregatedCluster, masterDBClient *mysql.DB, cgName string, cgKeepAmount int32) (resource.DecommissionPhase, error) {
+	if !cluster.Spec.EnableDecommission {
+		return resource.DecommissionPhaseUnknown, nil
 	}
-	err = masterDBClient.DropBE(dropNodes)
+	allBackends, err := masterDBClient.GetBackendsByCGName(cgName)
 	if err != nil {
-		klog.Errorf("dropNodeBySQLClient DropBENodes failed, err:%s ", err.Error())
-		return err
+		klog.Errorf("decommissionProgressCheck failed, ShowBackends err:%s", err.Error())
+		return resource.DecommissionPhaseUnknown, err
 	}
-	return nil
+	decommissionDetail := resource.ConstructDecommissionDetail(allBackends, cgKeepAmount)
+	return decommissionDetail.GetDecommissionDetailStatus(), nil
 }
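The `mysql` helpers `DecommissionBE` and `DropBE` are not part of this diff. For orientation, the two operations correspond to Doris's `ALTER SYSTEM` statements, roughly as sketched below; how the real helpers build their SQL is an assumption here, and the endpoints are hypothetical:

```go
package main

import (
	"fmt"
	"strings"
)

// quote renders backend endpoints for an ALTER SYSTEM statement,
// e.g. `"host1:9050", "host2:9050"`.
func quote(hostPorts []string) string {
	return `"` + strings.Join(hostPorts, `", "`) + `"`
}

func main() {
	surplus := []string{
		"mycluster-cg1-2.mycluster-cg1.doris.svc:9050",
		"mycluster-cg1-3.mycluster-cg1.doris.svc:9050",
	}

	// Graceful path (enableDecommission: true): tablets migrate off first.
	fmt.Println("ALTER SYSTEM DECOMMISSION BACKEND " + quote(surplus))

	// Direct path (enableDecommission: false): immediate removal.
	// DROPP (with the doubled P) is Doris's actual keyword here.
	fmt.Println("ALTER SYSTEM DROPP BACKEND " + quote(surplus))
}
```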