Skip to content

Commit

Permalink
feat: task server support multi tenant
Browse files Browse the repository at this point in the history
--story=120702788
  • Loading branch information
wcy00000000000000 committed Nov 27, 2024
1 parent ae5065c commit 654276c
Show file tree
Hide file tree
Showing 20 changed files with 508 additions and 369 deletions.
5 changes: 0 additions & 5 deletions src/common/definitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -1457,11 +1457,6 @@ const (
BKBizDefault = "bizdefault"
)

const (
// MetaDataSynchronizeFlagField synchronize flag
MetaDataSynchronizeFlagField = "flag"
)

const (
// AttributePlaceHolderMaxLength TODO
AttributePlaceHolderMaxLength = 2000
Expand Down
89 changes: 68 additions & 21 deletions src/common/index/collections/apitask.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ package collections

import (
"configcenter/src/common"
"configcenter/src/common/metadata"
"configcenter/src/storage/dal/types"

"go.mongodb.org/mongo-driver/bson"
Expand All @@ -24,47 +25,93 @@ func init() {
// 先注册未规范化的索引,如果索引出现冲突旧,删除未规范化的索引
registerIndexes(common.BKTableNameAPITask, deprecatedAPITaskIndexes)
registerIndexes(common.BKTableNameAPITask, commAPITaskIndexes)
registerIndexes(common.BKTableNameAPITaskSyncHistory, apiTaskSyncHistoryIndexes)

}

// TODO:
// 新加和修改后的索引,索引名字一定要用对应的前缀,CCLogicUniqueIdxNamePrefix|common.CCLogicIndexNamePrefix
var commAPITaskIndexes = []types.Index{}

// deprecated 未规范化前的索引,只允许删除不允许新加和修改,
var deprecatedAPITaskIndexes = []types.Index{
var commAPITaskIndexes = []types.Index{
{
Name: "idx_taskID",
Keys: bson.D{{
"task_id", 1},
Name: common.CCLogicIndexNamePrefix + "lastTime",
Keys: bson.D{{common.LastTimeField, -1}},
Background: true,
// delete redundant tasks from 6 months ago
ExpireAfterSeconds: 6 * 30 * 24 * 60 * 60,
},
{
Name: common.CCLogicIndexNamePrefix + "taskType_status_createTime",
Keys: bson.D{
{common.BKTaskTypeField, 1},
{common.BKStatusField, 1},
{common.CreateTimeField, 1},
},
Unique: true,
Background: true,
},
{
Name: "idx_name_status_createTime",
Name: common.CCLogicUniqueIdxNamePrefix + "tenantID_taskType_instID_extra",
Keys: bson.D{
{"create_time", 1},
{"name", 1},
{"status", 1},
{common.TenantID, 1},
{common.BKTaskTypeField, 1},
{common.BKInstIDField, 1},
{metadata.APITaskExtraField, 1},
},
Background: true,
Unique: true,
PartialFilterExpression: map[string]interface{}{
common.BKStatusField: map[string]interface{}{
common.BKDBIN: []metadata.APITaskStatus{metadata.APITaskStatusNew, metadata.APITaskStatusWaitExecute,
metadata.APITaskStatusExecute},
},
},
},
{
Name: "idx_status_lastTime",
Name: common.CCLogicUniqueIdxNamePrefix + "tenantID_instID_taskType_createTime",
Keys: bson.D{
{"status", 1},
{"last_time", 1},
{common.TenantID, 1},
{common.BKInstIDField, 1},
{common.BKTaskTypeField, 1},
{common.CreateTimeField, -1},
},
Background: true,
},
}

var apiTaskSyncHistoryIndexes = []types.Index{
{
Name: common.CCLogicIndexNamePrefix + "lastTime",
Keys: bson.D{{common.LastTimeField, -1}},
Background: true,
// delete redundant tasks from 6 months ago
ExpireAfterSeconds: 6 * 30 * 24 * 60 * 60,
},
{
Name: "idx_name_flag_createTime",
Name: common.CCLogicUniqueIdxNamePrefix + "tenantID_taskID_taskType",
Keys: bson.D{
{"name", 1},
{"flag", 1},
{"create_time", 1},
{common.TenantID, 1},
{common.BKTaskIDField, 1},
{common.BKTaskTypeField, 1},
},
Background: true,
},
{
Name: common.CCLogicUniqueIdxNamePrefix + "tenantID_instID_taskType_createTime",
Keys: bson.D{
{common.TenantID, 1},
{common.BKInstIDField, 1},
{common.BKTaskTypeField, 1},
{common.CreateTimeField, -1},
},
Background: true,
},
}

// deprecated 未规范化前的索引,只允许删除不允许新加和修改,
var deprecatedAPITaskIndexes = []types.Index{
{
Name: "idx_taskID",
Keys: bson.D{{
"task_id", 1},
},
Unique: true,
Background: true,
},
}
7 changes: 7 additions & 0 deletions src/common/index/collections/deprecatedindexname.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,13 @@ var deprecatedIndexName = map[string][]string{
"idx_name_status_createTime",
"idx_status_lastTime",
"idx_name_flag_createTime",
"idx_flag_status_createTime",
"idx_lastTime_status",
"idx_lastTime",
},
common.BKTableNameAPITaskSyncHistory: {
"idx_instID_flag_createTime",
"idx_lastTime",
},
common.BKTableNameBaseProcess: {
"bk_biz_id_1",
Expand Down
4 changes: 2 additions & 2 deletions src/common/metadata/taskserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type APITaskDetail struct {
// Detail 子任务详情列表
Detail []APISubTaskDetail `json:"detail,omitempty" bson:"detail"`
// TenantID 租户ID
TenantID string `json:"tenant_id,omitempty" bson:"tenant_id"`
TenantID string `json:"-" bson:"tenant_id"`
// CreateTime 任务创建时间
CreateTime time.Time `json:"create_time,omitempty" bson:"create_time"`
// LastTime 任务最后更新时间
Expand Down Expand Up @@ -95,7 +95,7 @@ type APITaskSyncStatus struct {
// LastTime 任务最后更新时间
LastTime time.Time `json:"last_time,omitempty" bson:"last_time"`
// TenantID 开发商ID
TenantID string `json:"tenant_id,omitempty" bson:"tenant_id"`
TenantID string `json:"-" bson:"tenant_id"`
}

// APITaskStatus task status type
Expand Down
27 changes: 20 additions & 7 deletions src/common/tablenames.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,13 +272,15 @@ func GetInstObjIDByTableName(collectionName, tenantID string) (string, error) {
}

var platformTableMap = map[string]struct{}{
BKTableNameSystem: {},
BKTableNameIDgenerator: {},
BKTableNameTenant: {},
BKTableNameTenantTemplate: {},
BKTableNamePlatformAuditLog: {},
BKTableNameWatchToken: {},
BKTableNameLastWatchEvent: {},
BKTableNameSystem: {},
BKTableNameIDgenerator: {},
BKTableNameTenant: {},
BKTableNameTenantTemplate: {},
BKTableNamePlatformAuditLog: {},
BKTableNameWatchToken: {},
BKTableNameLastWatchEvent: {},
BKTableNameAPITask: {},
BKTableNameAPITaskSyncHistory: {},
}

// IsPlatformTable returns if the target table is a platform table
Expand All @@ -287,6 +289,17 @@ func IsPlatformTable(tableName string) bool {
return exists
}

var platformTableWithTenantMap = map[string]struct{}{
BKTableNameAPITask: {},
BKTableNameAPITaskSyncHistory: {},
}

// IsPlatformTableWithTenant returns if the target table is a platform table with tenant id field
func IsPlatformTableWithTenant(tableName string) bool {
_, exists := platformTableWithTenantMap[tableName]
return exists
}

// PlatformTables returns platform tables
func PlatformTables() []string {
tables := make([]string, 0)
Expand Down
13 changes: 13 additions & 0 deletions src/scene_server/admin_server/logics/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"configcenter/src/scene_server/admin_server/app/options"
"configcenter/src/scene_server/admin_server/upgrader"
"configcenter/src/storage/dal"
"configcenter/src/storage/dal/mongo/local"
"configcenter/src/storage/dal/mongo/sharding"
"configcenter/src/storage/dal/types"
"configcenter/src/thirdparty/monitor"
Expand Down Expand Up @@ -210,6 +211,18 @@ func (st *shardingDBTable) syncDBTableIndexes(ctx context.Context) error {
}

blog.Infof("start sync table(%s) index, rid: %s", tableName, platDBTable.rid)

if common.IsPlatformTableWithTenant(tableName) {
err := st.db.ExecForAllDB(func(db local.DB) error {
dbTableInst := &dbTable{db: db, rid: "platform-with-tenant-" + st.rid}
return dbTableInst.syncIndexesToDB(ctx, tableName, indexes, deprecatedIndexNames[tableName])
})
if err != nil {
blog.Warnf("sync table (%s) index failed. err: %v, rid: %s", tableName, err, platDBTable.rid)
continue
}
}

if err := platDBTable.syncIndexesToDB(ctx, tableName, indexes, deprecatedIndexNames[tableName]); err != nil {
blog.Warnf("sync table (%s) index failed. err: %v, rid: %s", tableName, err, platDBTable.rid)
continue
Expand Down
2 changes: 0 additions & 2 deletions src/scene_server/task_server/app/app.go

This file was deleted.

38 changes: 19 additions & 19 deletions src/scene_server/task_server/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* limitations under the License.
*/

// Package app starts task server
package app

import (
Expand All @@ -27,11 +28,12 @@ import (
"configcenter/src/scene_server/task_server/logics"
tasksvc "configcenter/src/scene_server/task_server/service"
"configcenter/src/storage/dal/redis"
"configcenter/src/storage/driver/mongodb"

"github.com/emicklei/go-restful/v3"
)

// Run TODO
// Run starts task server
func Run(ctx context.Context, cancel context.CancelFunc, op *options.ServerOption) error {
svrInfo, err := types.NewServerInfo(op.ServConf)
if err != nil {
Expand All @@ -56,21 +58,18 @@ func Run(ctx context.Context, cancel context.CancelFunc, op *options.ServerOptio
}
configReady := false
for sleepCnt := 0; sleepCnt < common.APPConfigWaitTime; sleepCnt++ {
if nil != taskSrv.Config {
if taskSrv.Config != nil {
configReady = true
break
}
blog.Infof("waiting for config ready ...")
time.Sleep(time.Second)
}
if false == configReady {
if !configReady {
blog.Infof("waiting config timeout.")
return errors.New("configuration item not found")
}
taskSrv.Config.Mongo, err = engine.WithMongo()
if err != nil {
return err
}

taskSrv.Config.Redis, err = engine.WithRedis()
if err != nil {
return err
Expand All @@ -80,29 +79,30 @@ func Run(ctx context.Context, cancel context.CancelFunc, op *options.ServerOptio
blog.Errorf("new redis client failed, err: %s", err.Error())
return fmt.Errorf("new redis client failed, err: %s", err.Error())
}
db, err := taskSrv.Config.Mongo.GetMongoClient()

taskSrv.Config.Mongo, err = engine.WithMongo()
if err != nil {
return err
}
cryptoConf, err := cc.Crypto("crypto")
if err != nil {
blog.Errorf("new mongo client failed, err: %s", err.Error())
return fmt.Errorf("new mongo client failed, err: %s", err.Error())
return fmt.Errorf("get crypto config failed, err: %v", err)
}

initErr := db.InitTxnManager(cacheDB)
if initErr != nil {
blog.Errorf("init txn manager failed, err: %v", initErr)
return initErr
if err = mongodb.SetShardingCli("", &taskSrv.Config.Mongo, cryptoConf); err != nil {
return fmt.Errorf("new mongo client failed, err: %v", err)
}
if initErr := mongodb.Dal().InitTxnManager(cacheDB); initErr != nil {
return fmt.Errorf("init txn manager failed, err: %v", initErr)
}

service.Engine = engine
service.Config = taskSrv.Config
service.CacheDB = cacheDB
service.DB = db
taskSrv.Core = engine
service.Logics = logics.NewLogics(engine.CoreAPI, db)
service.Logics = logics.NewLogics(engine.CoreAPI)
taskSrv.Service = service

// cron job delete history task
go taskSrv.Service.TimerDeleteHistoryTask(ctx)

if err := backbone.StartServer(ctx, cancel, engine, service.WebService(), true); err != nil {
blog.Errorf("start backbone failed, err: %+v", err)
return err
Expand Down
5 changes: 1 addition & 4 deletions src/scene_server/task_server/logics/logics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,16 @@ package logics

import (
"configcenter/src/apimachinery"
"configcenter/src/storage/dal"
)

// Logics TODO
type Logics struct {
CoreAPI apimachinery.ClientSetInterface
db dal.RDB
}

// NewLogics get logics handle
func NewLogics(coreAPI apimachinery.ClientSetInterface, db dal.RDB) *Logics {
func NewLogics(coreAPI apimachinery.ClientSetInterface) *Logics {
return &Logics{
CoreAPI: coreAPI,
db: db,
}
}
Loading

0 comments on commit 654276c

Please sign in to comment.