Skip to content

Commit

Permalink
feat(dbm-services): 重启服务后重载重启前的运行中的模拟执行任务 TencentBlueKing#8123
Browse files Browse the repository at this point in the history
  • Loading branch information
ymakedaq authored and iSecloud committed Nov 27, 2024
1 parent 4c14cd2 commit ec0f506
Show file tree
Hide file tree
Showing 10 changed files with 264 additions and 96 deletions.
8 changes: 2 additions & 6 deletions dbm-services/go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,8 @@ github.com/blang/semver v3.5.1+incompatible h1:cQNTCjp13qL8KC3Nbxr/y2Bqb63oX6wdn
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
github.com/boombuler/barcode v1.0.0 h1:s1TvRnXwL2xJRaccrdcBQMZxq6X7DvsMogtmJeHDdrc=
github.com/bshuster-repo/logrus-logstash-hook v0.4.1 h1:pgAtgj+A31JBVtEHu2uHuEx0n+2ukqUJnS2vVe5pQNA=
github.com/bsm/ginkgo/v2 v2.7.0/go.mod h1:AiKlXPm7ItEHNc/2+OkrNG4E0ITzojb9/xWzvQ9XZ9w=
github.com/bsm/gomega v1.26.0/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs=
github.com/bugsnag/bugsnag-go v0.0.0-20141110184014-b1d153021fcd h1:rFt+Y/IK1aEZkEHchZRSq9OQbsSzIT/OrI8YFFmRIng=
github.com/bugsnag/osext v0.0.0-20130617224835-0dd3f918b21b h1:otBG+dV+YK+Soembjv71DPz3uX/V/6MMlSyD9JBQ6kQ=
Expand Down Expand Up @@ -1116,8 +1118,6 @@ github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQL
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
github.com/fatih/color v1.14.1/go.mod h1:2oHN61fhTpgcxD3TSWCgKDiH1+x4OiDVVGH8WlgGZGg=
github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/fogleman/gg v1.3.0 h1:/7zJX8F6AaYQc57WQCyN9cAIz+4bCJGO9B+dyW29am8=
github.com/form3tech-oss/jwt-go v3.2.5+incompatible h1:/l4kBbb4/vGSsdtB5nUe8L7B9mImVMaBPw9L/0TBHU8=
github.com/fsouza/fake-gcs-server v1.17.0 h1:OeH75kBZcZa3ZE+zz/mFdJ2btt9FgqfjI7gIh9+5fvk=
Expand Down Expand Up @@ -1199,8 +1199,6 @@ github.com/google/s2a-go v0.1.7 h1:60BLSyTrOV4/haCDW4zb1guZItoSq8foHCXrAnjBo/o=
github.com/google/s2a-go v0.1.7/go.mod h1:50CgR4k1jNlWBu4UfS4AcfhVe1r6pdZPygJ3R8F0Qdw=
github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4=
github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/enterprise-certificate-proxy v0.2.0/go.mod h1:8C0jb7/mgJe/9KK8Lm7X9ctZC2t60YyIpYEI16jx0Qg=
github.com/googleapis/enterprise-certificate-proxy v0.2.1/go.mod h1:AwSRAtLfXpU5Nm3pW+v7rGDHp09LsPtGY9MduiEsR9k=
github.com/googleapis/enterprise-certificate-proxy v0.2.3 h1:yk9/cqRKtT9wXZSsRH9aurXEpJX+U6FLtpYTdC3R06k=
Expand Down Expand Up @@ -1453,8 +1451,6 @@ github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e h1:aoZm08cpOy4WuID//EZDgc
github.com/posener/complete v1.2.3 h1:NP0eAhjcjImqslEwo/1hq7gpajME0fTLTezBKDqfXqo=
github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s=
github.com/pquerna/cachecontrol v0.0.0-20171018203845-0dec1b30a021 h1:0XM1XL/OFFJjXsYXlG30spTkV/E9+gmd5GD1w2HE8xM=
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
github.com/prometheus/client_golang v1.4.0/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU=
github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_model v0.3.0/go.mod h1:LDGWKZIo7rky3hgvBe+caln+Dr3dPggB5dvjtD7w9+w=
Expand Down
7 changes: 7 additions & 0 deletions dbm-services/mysql/db-simulation/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type AppConfig struct {
TdbctlPodResource TdbctlPodResource `yaml:"tdbctlPodResource"`
SimulationNodeLables []LabelItem `yaml:"simulationNodeLables"`
SimulationtaintLables []LabelItem `yaml:"simulationtaintLables"`
Redis RedisDb `yaml:"redis"`
}

// BkRepoConfig bkrepo config
Expand Down Expand Up @@ -106,6 +107,12 @@ type ImgConfig struct {
Image string `yaml:"image"`
}

// RedisDb redis
type RedisDb struct {
Addr string `yaml:"addr"`
Password string `yaml:"password"`
}

func init() {
viper.AutomaticEnv()
GAppConfig.ListenAddr = "0.0.0.0:80"
Expand Down
241 changes: 223 additions & 18 deletions dbm-services/mysql/db-simulation/app/service/simulation_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,36 @@
package service

import (
"context"
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
"os"
"regexp"
"slices"
"strings"
"time"

"github.com/bsm/redislock"
"github.com/redis/go-redis/v9"
"github.com/samber/lo"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

util "dbm-services/common/go-pubpkg/cmutil"
"dbm-services/common/go-pubpkg/logger"
"dbm-services/mysql/db-simulation/app"
"dbm-services/mysql/db-simulation/app/config"
"dbm-services/mysql/db-simulation/model"
)

// DelPod 控制运行模拟执行后是否删除拉起的Pod的开关
// 用于保留现场排查问题
var DelPod = true

// HeartbeatInterval 心跳时间间隔
var HeartbeatInterval = 15

// BaseParam 请求模拟执行的基础参数
type BaseParam struct {
//nolint
Expand All @@ -49,6 +61,24 @@ type BaseParam struct {
ExcuteObjects []ExcuteSQLFileObjV2 `json:"execute_objects" binding:"gt=0,dive,required"`
}

// BuildTendbPodName build tendb pod name
func (b BaseParam) BuildTendbPodName() string {
podName := fmt.Sprintf("tendb-%s-%s", strings.ToLower(b.MySQLVersion),
replaceUnderSource(b.TaskId))
return podName
}

// BuildSpiderPodName build spider pod name
func (b BaseParam) BuildSpiderPodName() string {
podName := fmt.Sprintf("spider-%s-%s", strings.ToLower(b.MySQLVersion),
replaceUnderSource(b.TaskId))
return podName
}

func replaceUnderSource(str string) string {
return strings.ReplaceAll(str, "_", "-")
}

// BuildStartArgs mysql pod start args
func (b BaseParam) BuildStartArgs() []string {
if len(b.MySQLStartConfigs) == 0 {
Expand Down Expand Up @@ -114,13 +144,12 @@ var SpiderTaskChan chan SimulationTask
// CtrlChan 并发控制
var ctrlChan chan struct{}

var rdb *redis.Client

func init() {
TaskChan = make(chan SimulationTask, 100)
SpiderTaskChan = make(chan SimulationTask, 100)
ctrlChan = make(chan struct{}, 30)
}

func init() {
timer := time.NewTicker(60 * time.Second)
go func() {
for {
Expand All @@ -134,6 +163,128 @@ func init() {
}
}
}()
logger.Info("redis addr %s", config.GAppConfig.Redis.Addr)
rdb = redis.NewClient(&redis.Options{
Addr: config.GAppConfig.Redis.Addr,
Password: config.GAppConfig.Redis.Password,
DB: 0,
})
go func() {
// 等待一个周期的原因,避免重载执行正常的任务
time.Sleep(time.Duration(HeartbeatInterval) * time.Second)
reloadRunningTaskFromdb()
rdb.Close()
}()
}

// ReloadParam reload running task from db
type ReloadParam struct {
BaseParam
SpiderVersion *string `json:"spider_version,omitempty"`
}

// reloadRunningTaskFromdb 重载重启服务前运行的任务 加锁是避免多个服务同时启动,避免相同任务被重载
func reloadRunningTaskFromdb() {
key := "simulation:reload:lock"
locker := redislock.New(rdb)
ctx := context.Background()
rlock, err := locker.Obtain(ctx, key, 60*time.Second, nil)
if err != nil {
logger.Error("obtain lock failed %v", err)
return
}
defer func() {
// nolint
rlock.Release(ctx)
}()
var tks []model.TbSimulationTask
if err := model.DB.Model(model.TbSimulationTask{}).Where(
//nolint
"phase not in (?) and create_time > DATE_SUB(NOW(),INTERVAL 6 HOUR) and time_to_sec(timediff(heartbeat_time,now())) > ? ",
[]string{model.PhaseDone, model.PhaseReloading}, HeartbeatInterval).Scan(&tks).Error; err != nil {
logger.Error("get running task failed %s", err.Error())
return
}
if len(tks) == 0 {
logger.Info("no need reload running task")
return
}
for _, tk := range tks {
var err error
var req model.TbRequestRecord
var p ReloadParam
var podName string
if err = model.DB.Model(model.TbRequestRecord{}).Where("request_id = ? ", tk.RequestID).Find(&req).
Error; err != nil {
logger.Error("get request content failed %s", err.Error())
//nolint
model.CompleteTask(tk.TaskId, tk.MySQLVersion, model.TaskFailed, "", "", "")
continue
}
if err = json.Unmarshal([]byte(req.RequestBody), &p); err != nil {
logger.Error("get request content failed %s", err.Error())
//nolint
model.CompleteTask(tk.TaskId, tk.MySQLVersion, model.TaskFailed, "", "", "")
continue
}
if p.SpiderVersion == nil {
podName = p.BuildTendbPodName()
} else {
podName = p.BuildSpiderPodName()
}
// delete old pod
var gracePeriodSeconds int64
if slices.Contains([]string{model.PhaseCreatePod, model.PhaseLoadSchema, model.PhaseRunning}, tk.Phase) {
err = Kcs.Cli.CoreV1().Pods(Kcs.Namespace).Delete(context.TODO(), podName, metav1.DeleteOptions{
GracePeriodSeconds: &gracePeriodSeconds,
})
if err != nil {
logger.Error("delete pod failed %s", err.Error())
//nolint
model.CompleteTask(tk.TaskId, tk.MySQLVersion, model.TaskFailed, "", "", "")
continue
}
}
// wait pod delete
time.Sleep(3 * time.Second)
logger.Info("loading task %s", tk.TaskId)
model.UpdatePhase(tk.TaskId, tk.MySQLVersion, model.PhaseReloading)
if p.SpiderVersion != nil {
SpiderTaskChan <- p.BuildTsk(tk.RequestID)
} else {
TaskChan <- p.BuildTsk(tk.RequestID)
}
}
}

// BuildTsk build read load task
func (r ReloadParam) BuildTsk(requestId string) (tsk SimulationTask) {
tsk = SimulationTask{
RequestId: requestId,
DbPodSets: NewDbPodSets(),
BaseParam: &r.BaseParam,
Version: r.MySQLVersion,
}
version := r.MySQLVersion
img, err := GetImgFromMySQLVersion(version)
if err != nil {
logger.Error("GetImgFromMySQLVersion %s failed:%s", version, err.Error())
return
}
tsk.DbImage = img
if r.SpiderVersion != nil {
tsk.SpiderImage, tsk.TdbCtlImage = GetSpiderAndTdbctlImg(*r.SpiderVersion, LatestVersion)
}
tsk.BaseInfo = &MySQLPodBaseInfo{
PodName: fmt.Sprintf("tendb-%s-%s", strings.ToLower(version),
replaceUnderSource(r.TaskId)),
Lables: map[string]string{"task_id": replaceUnderSource(r.TaskId),
"request_id": requestId},
RootPwd: r.TaskId[0:4],
Args: r.BuildStartArgs(),
Charset: r.MySQLCharSet,
}
return tsk
}

func run(task SimulationTask, tkType string) {
Expand All @@ -154,6 +305,21 @@ func run(task SimulationTask, tkType string) {
return
}
}()
doneChan := make(chan struct{})
ticker := time.NewTicker(time.Duration(HeartbeatInterval) * time.Second)
go func() {
for {
select {
case <-ticker.C:
model.UpdateHeartbeat(task.TaskId)
case <-doneChan:
logger.Info("simulation run done")
return
}
}
}()
// 关闭协程
defer func() { ticker.Stop(); doneChan <- struct{}{} }()
xlogger := task.getXlogger()
// create Pod
model.UpdatePhase(task.TaskId, task.MySQLVersion, model.PhaseCreatePod)
Expand Down Expand Up @@ -201,21 +367,7 @@ func (t *SimulationTask) getDbsExcludeSysDb() (err error) {
func (t *SimulationTask) SimulationRun(containerName string, xlogger *logger.Logger) (sstdout, sstderr string,
err error) {
logger.Info("will execute in %s", containerName)
doneChan := make(chan struct{})
ticker := time.NewTicker(30 * time.Second)
go func() {
for {
select {
case <-ticker.C:
model.UpdateHeartbeat(t.TaskId, sstderr, sstdout)
case <-doneChan:
logger.Info("simulation run done")
return
}
}
}()
// 关闭协程
defer func() { ticker.Stop(); doneChan <- struct{}{} }()

model.UpdatePhase(t.TaskId, t.MySQLVersion, model.PhaseLoadSchema)
// Load schema SQL
sstdout, sstderr, err = t.loadSchemaSQL(containerName)
Expand Down Expand Up @@ -383,3 +535,56 @@ func (t *SimulationTask) executeMultFilesObject(e ExcuteSQLFileObjV2, containerN
}
return
}

// GetImgFromMySQLVersion 根据版本获取模拟执行运行的镜像配置
func GetImgFromMySQLVersion(version string) (img string, err error) {
img, errx := model.GetImageName("mysql", version)
if errx == nil {
logger.Info("get image from db img config: %s", img)
return img, nil
}
switch {
case regexp.MustCompile("5.5").MatchString(version):
return config.GAppConfig.Image.Tendb55Img, nil
case regexp.MustCompile("5.6").MatchString(version):
return config.GAppConfig.Image.Tendb56Img, nil
case regexp.MustCompile("5.7").MatchString(version):
return config.GAppConfig.Image.Tendb57Img, nil
case regexp.MustCompile("8.0").MatchString(version):
return config.GAppConfig.Image.Tendb80Img, nil
default:
return "", fmt.Errorf("not match any version")
}
}

// GetSpiderAndTdbctlImg TODO
func GetSpiderAndTdbctlImg(spiderVersion, tdbctlVersion string) (spiderImg, tdbctlImg string) {
return getSpiderImg(spiderVersion), getTdbctlImg(tdbctlVersion)
}

const (
// LatestVersion latest version
LatestVersion = "latest"
)

func getSpiderImg(version string) (img string) {
if lo.IsEmpty(version) {
version = LatestVersion
}
img, errx := model.GetImageName("spider", version)
if errx == nil {
return img
}
return config.GAppConfig.Image.SpiderImg
}

func getTdbctlImg(version string) (img string) {
if lo.IsEmpty(version) {
version = LatestVersion
}
img, errx := model.GetImageName("tdbctl", version)
if errx == nil {
return img
}
return config.GAppConfig.Image.TdbCtlImg
}
4 changes: 4 additions & 0 deletions dbm-services/mysql/db-simulation/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ require (
)

require (
github.com/bsm/redislock v0.9.4 // indirect
github.com/bytedance/sonic v1.10.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect
github.com/chenzhuoyu/iasm v0.9.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
Expand Down Expand Up @@ -61,6 +64,7 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/redis/go-redis/v9 v9.0.3 // indirect
github.com/spf13/afero v1.9.5 // indirect
github.com/spf13/cast v1.5.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
Expand Down
Loading

0 comments on commit ec0f506

Please sign in to comment.