Skip to content

Commit

Permalink
add rolling restart
Browse files Browse the repository at this point in the history
  • Loading branch information
catpineapple committed Nov 21, 2024
1 parent 820c6a8 commit 5e70cdb
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 28 deletions.
7 changes: 3 additions & 4 deletions api/doris/v1/doriscluster_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ import (
const (
//ComponentsResourceHash the component hash
ComponentResourceHash string = "app.doris.components/hash"

FERestartAt string = "apache.doris.fe/restartedAt"
BERestartAt string = "apache.doris.be/restartedAt"
)

// the labels key
Expand All @@ -43,10 +46,6 @@ const (
OwnerReference string = "app.doris.ownerreference/name"

ServiceRoleForCluster string = "app.doris.service/role"

FERestartAt string = "apache.doris.fe/restartedAt"
BERestartAt string = "apache.doris.be/restartedAt"
DorisRollingRestartAt string = "apache.doris.org/restartedAt"
)

type ServiceRole string
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/sub_controller/be/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (be *Controller) UpdateComponentStatus(cluster *v1.DorisCluster) error {
return nil
}

return be.ClassifyPodsByStatus(cluster.Namespace, cluster.Status.BEStatus, v1.GenerateStatefulSetSelector(cluster, v1.Component_BE), *cluster.Spec.BeSpec.Replicas)
return be.ClassifyPodsByStatus(cluster.Namespace, cluster.Status.BEStatus, v1.GenerateStatefulSetSelector(cluster, v1.Component_BE), *cluster.Spec.BeSpec.Replicas, v1.Component_BE)
}

func (be *Controller) ClearResources(ctx context.Context, dcr *v1.DorisCluster) (bool, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/sub_controller/broker/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (bk *Controller) UpdateComponentStatus(cluster *v1.DorisCluster) error {

cluster.Status.BrokerStatus = bs
bs.AccessService = v1.GenerateExternalServiceName(cluster, v1.Component_Broker)
return bk.ClassifyPodsByStatus(cluster.Namespace, bs, v1.GenerateStatefulSetSelector(cluster, v1.Component_Broker), *cluster.Spec.BrokerSpec.Replicas)
return bk.ClassifyPodsByStatus(cluster.Namespace, bs, v1.GenerateStatefulSetSelector(cluster, v1.Component_Broker), *cluster.Spec.BrokerSpec.Replicas, v1.Component_Broker)

}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/sub_controller/cn/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (cn *Controller) UpdateComponentStatus(cluster *dorisv1.DorisCluster) error

replicas := *est.Spec.Replicas
cs.AccessService = dorisv1.GenerateExternalServiceName(cluster, dorisv1.Component_CN)
return cn.ClassifyPodsByStatus(cluster.Namespace, &cs.ComponentStatus, dorisv1.GenerateStatefulSetSelector(cluster, dorisv1.Component_CN), replicas)
return cn.ClassifyPodsByStatus(cluster.Namespace, &cs.ComponentStatus, dorisv1.GenerateStatefulSetSelector(cluster, dorisv1.Component_CN), replicas, dorisv1.Component_CN)
}

// autoscaler represents start autoscaler or not.
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/sub_controller/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ var (
MSServiceDeletedFailed EventReason = "MSServiceDeletedFailed"
MSStatefulsetDeleteFailed EventReason = "MSStatefulsetDeleteFailed"
FDBAddressNotConfiged EventReason = "FDBAddressNotConfiged"
RestartParamIllegal EventReason = "RestartParamIllegal"
RollingRestart EventReason = "BERestarting"
RestartParameterIllegal EventReason = "RestartParameterIllegal"
RollingRestart EventReason = "Restarting"
)

type Event struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/sub_controller/fe/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (fc *Controller) UpdateComponentStatus(cluster *v1.DorisCluster) error {
return nil
}

return fc.ClassifyPodsByStatus(cluster.Namespace, cluster.Status.FEStatus, v1.GenerateStatefulSetSelector(cluster, v1.Component_FE), *cluster.Spec.FeSpec.Replicas)
return fc.ClassifyPodsByStatus(cluster.Namespace, cluster.Status.FEStatus, v1.GenerateStatefulSetSelector(cluster, v1.Component_FE), *cluster.Spec.FeSpec.Replicas, v1.Component_FE)
}

// New construct a FeController.
Expand Down
45 changes: 27 additions & 18 deletions pkg/controller/sub_controller/sub_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,16 @@ type SubDefaultController struct {
func (d *SubDefaultController) CheckRestartTimeAndInject(dcr *dorisv1.DorisCluster, componentType dorisv1.ComponentType) bool {
var baseSpec *dorisv1.BaseSpec
var restartedAt string
var restartAnnotationsKey string
switch componentType {
case dorisv1.Component_FE:
baseSpec = &dcr.Spec.FeSpec.BaseSpec
restartedAt = dcr.Annotations[dorisv1.FERestartAt]
restartAnnotationsKey = dorisv1.FERestartAt
case dorisv1.Component_BE:
baseSpec = &dcr.Spec.BeSpec.BaseSpec
restartedAt = dcr.Annotations[dorisv1.BERestartAt]
restartAnnotationsKey = dorisv1.BERestartAt
default:
klog.Errorf("CheckRestartTimeAndInject dorisClusterName %s, namespace %s componentType %s not supported.", dcr.Name, dcr.Namespace, componentType)
}
Expand All @@ -80,7 +83,7 @@ func (d *SubDefaultController) CheckRestartTimeAndInject(dcr *dorisv1.DorisClust
if err != nil {
checkErr := fmt.Errorf("CheckRestartTimeAndInject error: time format is incorrect. dorisClusterName: %s, namespace: %s, componentType %s, wrong parse 'restartedAt': %s , error: %s", dcr.Name, dcr.Namespace, componentType, restartedAt, err.Error())
klog.Error(checkErr.Error())
d.K8srecorder.Event(dcr, string(EventWarning), string(RestartParamIllegal), checkErr.Error())
d.K8srecorder.Event(dcr, string(EventWarning), string(RestartParameterIllegal), checkErr.Error())
return false
}

Expand All @@ -89,48 +92,54 @@ func (d *SubDefaultController) CheckRestartTimeAndInject(dcr *dorisv1.DorisClust
if restartAt.After(parseTime) {
checkErr := fmt.Errorf("CheckRestartTimeAndInject The time has expired, dorisClusterName: %s, namespace: %s, componentType %s, wrong parse 'restartedAt': %s : The time has expired, if you want to restart doris, please set a future time", dcr.Name, dcr.Namespace, componentType, restartedAt)
klog.Error(checkErr.Error())
d.K8srecorder.Event(dcr, string(EventWarning), string(RestartParamIllegal), checkErr.Error())
d.K8srecorder.Event(dcr, string(EventWarning), string(RestartParameterIllegal), checkErr.Error())
return false
}

klog.Infof("CheckRestartTime successed, DCR %s in namespace %s, will restart %s ", dcr.Name, dcr.Namespace, componentType)
d.K8srecorder.Event(
dcr,
string(EventNormal),
string(RollingRestart),
fmt.Sprintf("CheckRestartTime successed, DCR %s in namespace %s, restart %s ", dcr.Name, dcr.Namespace, componentType),
)

// check passed, set annotations to doriscluster baseSpec
if baseSpec.Annotations == nil {
baseSpec.Annotations = make(map[string]string)
}
baseSpec.Annotations[dorisv1.DorisRollingRestartAt] = restartedAt
baseSpec.Annotations[restartAnnotationsKey] = restartedAt
return true
}

// UpdateStatus update the component status on src.
func (d *SubDefaultController) UpdateStatus(namespace string, status *dorisv1.ComponentStatus, labels map[string]string, replicas int32) error {
return d.ClassifyPodsByStatus(namespace, status, labels, replicas)
func (d *SubDefaultController) UpdateStatus(namespace string, status *dorisv1.ComponentStatus, labels map[string]string, replicas int32, componentType dorisv1.ComponentType) error {
return d.ClassifyPodsByStatus(namespace, status, labels, replicas, componentType)
}

func (d *SubDefaultController) ClassifyPodsByStatus(namespace string, status *dorisv1.ComponentStatus, labels map[string]string, replicas int32) error {
func (d *SubDefaultController) ClassifyPodsByStatus(namespace string, status *dorisv1.ComponentStatus, labels map[string]string, replicas int32, componentType dorisv1.ComponentType) error {
var podList corev1.PodList
if err := d.K8sclient.List(context.Background(), &podList, client.InNamespace(namespace), client.MatchingLabels(labels)); err != nil {
return err
}

var creatings, readys, faileds []string
var firstRestartAnnotation string
var firstRestartAnnotation, restartAnnotationsKey string
podmap := make(map[string]corev1.Pod)
if len(podList.Items) > 0 {
firstRestartAnnotation = podList.Items[0].Annotations[dorisv1.DorisRollingRestartAt]

if len(podList.Items) == 0 {
status.RunningMembers = readys
status.FailedMembers = faileds
status.CreatingMembers = creatings
return nil
}

switch componentType {
case dorisv1.Component_FE:
restartAnnotationsKey = dorisv1.FERestartAt
case dorisv1.Component_BE:
restartAnnotationsKey = dorisv1.BERestartAt
}
firstRestartAnnotation = podList.Items[0].Annotations[restartAnnotationsKey]

//get all pod status that controlled by st.
stsRollingRestartAnnotationsSameCheck := true
for _, pod := range podList.Items {
stsRollingRestartAnnotationsSameCheck = stsRollingRestartAnnotationsSameCheck && pod.Annotations[dorisv1.DorisRollingRestartAt] == firstRestartAnnotation
if pod.Annotations[restartAnnotationsKey] != firstRestartAnnotation {
stsRollingRestartAnnotationsSameCheck = false
}
podmap[pod.Name] = pod
if ready := k8s.PodIsReady(&pod.Status); ready {
readys = append(readys, pod.Name)
Expand Down

0 comments on commit 5e70cdb

Please sign in to comment.