feat: optimize worker selection and recovery mechanism TencentBlueKing#311
flyy1012 committed Oct 23, 2024
1 parent 4e26792 commit 86888ea
Showing 5 changed files with 168 additions and 71 deletions.
5 changes: 2 additions & 3 deletions src/backend/booster/bk_dist/booster/pkg/booster.go
@@ -24,7 +24,6 @@ import (
"time"

"github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/env"
dcFile "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/file"
dcProtocol "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/protocol"
dcPump "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/pump"
dcSDK "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/sdk"
@@ -630,7 +629,7 @@ func (b *Booster) unregisterWork() error {
return nil
}

func (b *Booster) sendAdditionFile() {
/*func (b *Booster) sendAdditionFile() {
if b.config.Works.Local || b.config.Works.Degraded {
return
}
@@ -679,7 +678,7 @@ func (b *Booster) sendAdditionFile() {
return
}
blog.Infof("booster: finish send addition files: %v", b.config.Works.AdditionFiles)
}
}*/

func (b *Booster) runWorks(
ctx context.Context,
214 changes: 154 additions & 60 deletions src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
@@ -40,7 +40,7 @@ const (
// corkMaxSize = 1024 * 1024 * 1024
largeFileSize = 1024 * 1024 * 100 // 100MB

fileMaxFailCount = 5
//fileMaxFailCount = 5
)

// NewMgr get a new Remote Mgr
@@ -73,7 +73,8 @@ func NewMgr(pCtx context.Context, work *types.Work) types.RemoteMgr {
conf: work.Config(),
resourceCheckTick: 5 * time.Second,
workerCheckTick: 5 * time.Second,
toolChainRetryTick: 10 * time.Second,
toolChainRetryTick: 6 * time.Second,
fileRetryCheckTick: 7 * time.Second,
sendCorkTick: 10 * time.Millisecond,
corkSize: corkSize,
corkMaxSize: corkMaxSize,
@@ -116,6 +117,7 @@ type Mgr struct {

resourceCheckTick time.Duration
workerCheckTick time.Duration
fileRetryCheckTick time.Duration
toolChainRetryTick time.Duration
lastUsed uint64 // only accurate to second now
lastApplied uint64 // only accurate to second now
@@ -166,7 +168,7 @@ func (fsm *fileSendMap) matchOrInsert(desc dcSDK.FileDesc, query bool) (*types.F
if ci.Match(desc) {
//if worker is send failed before, try to send it again
if ci.SendStatus == types.FileSendFailed && !query {
blog.Debugf("file: retry send file %s, fail count %d", desc.FilePath, ci.FailCount)
blog.Debugf("file: retry send file %s", desc.FilePath)
ci.SendStatus = types.FileSending
return ci, false
}
@@ -216,7 +218,7 @@ func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc) []matchResult {
fileMatched := true
//if file is send failed before, try to send it again
if ci.SendStatus == types.FileSendFailed {
blog.Debugf("file: retry send file %s, fail count %d", desc.FilePath, ci.FailCount)
blog.Debugf("file: retry send file %s", desc.FilePath)
fileMatched = false
ci.SendStatus = types.FileSending
}
@@ -259,7 +261,6 @@ func (fsm *fileSendMap) updateStatus(desc dcSDK.FileDesc, status types.FileSendS
FileMode: desc.Filemode,
LinkTarget: desc.LinkTarget,
SendStatus: status,
FailCount: 0,
}

c, ok := fsm.cache[desc.FilePath]
@@ -272,12 +273,6 @@ func (fsm *fileSendMap) updateStatus(desc dcSDK.FileDesc, status types.FileSendS
for _, ci := range *c {
if ci.Match(desc) {
ci.SendStatus = status
if status == types.FileSendFailed {
ci.FailCount++
}
if status == types.FileSendSucceed {
ci.FailCount = 0
}
return
}
}
@@ -286,7 +281,7 @@ func (fsm *fileSendMap) updateStatus(desc dcSDK.FileDesc, status types.FileSendS
return
}

func (fsm *fileSendMap) hasReachedFailCount(descs []dcSDK.FileDesc) bool {
/*func (fsm *fileSendMap) hasReachedFailCount(descs []dcSDK.FileDesc) bool {
fsm.RLock()
defer fsm.RUnlock()
@@ -307,8 +302,57 @@ func (fsm *fileSendMap) hasReachedFailCount(descs []dcSDK.FileDesc) bool {
}
}
return false
}*/

func (fsm *fileSendMap) isFilesSendFailed(descs []dcSDK.FileDesc) bool {
fsm.RLock()
defer fsm.RUnlock()

if fsm.cache == nil {
return false
}
for _, desc := range descs {
c, ok := fsm.cache[desc.FilePath]
if !ok || c == nil || len(*c) == 0 {
continue
}
for _, ci := range *fsm.cache[desc.FilePath] {
if ci.Match(desc) {
if ci.SendStatus == types.FileSendFailed {
return true
}
}
}
}

return false
}
func (fsm *fileSendMap) getFailFiles() []dcSDK.FileDesc {
fsm.RLock()
defer fsm.RUnlock()

failFiles := make([]dcSDK.FileDesc, 0)
for _, v := range fsm.cache {
for _, ci := range *v {
if ci.SendStatus == types.FileSendFailed {
failFiles = append(failFiles, dcSDK.FileDesc{
FilePath: ci.FullPath,
Compresstype: dcProtocol.CompressLZ4,
FileSize: ci.Size,
Lastmodifytime: ci.LastModifyTime,
Md5: "",
Targetrelativepath: ci.TargetRelativePath,
Filemode: ci.FileMode,
LinkTarget: ci.LinkTarget,
NoDuplicated: true,
//Priority: dcSDK.GetPriority(ci),
})
}
}
}
return failFiles
}
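
The two helpers added above are the core of the new recovery path: isFilesSendFailed reports whether any of a command's input files previously failed to reach a host, and getFailFiles rebuilds dcSDK.FileDesc entries (with NoDuplicated set) for every cached file whose last send failed, so they can be resent as a batch. A minimal, self-contained sketch of that scan-and-rebuild idea, using simplified stand-in types rather than the real types.FileInfo / dcSDK.FileDesc:

package main

import "fmt"

// fileInfo is a simplified stand-in for the cached per-file send record.
type fileInfo struct {
	path   string
	failed bool
}

// collectFailed mirrors getFailFiles: walk the per-path cache and
// rebuild the list of files whose last send attempt failed.
func collectFailed(cache map[string][]*fileInfo) []string {
	out := make([]string, 0)
	for _, infos := range cache {
		for _, ci := range infos {
			if ci.failed {
				out = append(out, ci.path)
			}
		}
	}
	return out
}

func main() {
	cache := map[string][]*fileInfo{
		"/src/a.o": {{path: "/src/a.o", failed: true}},
		"/src/b.o": {{path: "/src/b.o", failed: false}},
	}
	// A periodic retry ticker would resend only the entries returned here.
	fmt.Println(collectFailed(cache))
}
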

// Init do the initialization for remote manager
// !! only call once !!
@@ -335,8 +379,9 @@ func (m *Mgr) Init() {
if m.conf.AutoResourceMgr {
go m.resourceCheck(ctx)
}

go m.workerCheck(ctx)
if m.work.ID() != "" {
go m.workerCheck(ctx)
}

if m.conf.SendCork {
m.sendCorkChan = make(chan bool, 1000)
@@ -479,14 +524,19 @@ func (m *Mgr) resourceCheck(ctx context.Context) {
func (m *Mgr) workerCheck(ctx context.Context) {
blog.Infof("remote: run worker check tick for work: %s", m.work.ID())
ticker := time.NewTicker(m.workerCheckTick)
fileRetryTicker := time.NewTicker(m.fileRetryCheckTick)
toolChainRetryTicker := time.NewTicker(m.toolChainRetryTick)

defer ticker.Stop()
defer fileRetryTicker.Stop()
defer toolChainRetryTicker.Stop()

for {
select {
case <-ctx.Done():
blog.Infof("remote: run worker check for work(%s) canceled by context", m.work.ID())
return

//recover dead worker
case <-ticker.C:
handler := m.remoteWorker.Handler(0, nil, nil, nil)
for _, w := range m.resource.GetDeadWorkers() {
@@ -501,7 +551,24 @@ func (m *Mgr) workerCheck(ctx context.Context) {
}(w)

}

// retry send fail file for all workers
case <-fileRetryTicker.C:
blog.Debugf("remote: try to retry send fail file for work(%s)", m.work.ID())
for _, h := range m.work.Resource().GetHosts() {
go m.retrySendFiles(h)
}
//retry failed tool chain for all workers, recover disable worker
case <-toolChainRetryTicker.C:
blog.Debugf("remote: try to retry send tool chain for work(%s)", m.work.ID())
handler := m.remoteWorker.Handler(0, nil, nil, nil)
for _, h := range m.work.Resource().GetHosts() {
go m.retrySendToolChain(handler, &types.RemoteTaskExecuteRequest{
Pid: 0,
Server: h,
Sandbox: &dcSyscall.Sandbox{Dir: ""},
Stats: &dcSDK.ControllerJobStats{},
})
}
}
}
}
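
The reworked workerCheck drives three independent periodic jobs from a single goroutine: dead-worker recovery on workerCheckTick, failed-file resends on fileRetryCheckTick, and tool-chain resends on toolChainRetryTick. Below is a minimal, self-contained sketch of that multi-ticker select pattern; the durations mirror the new defaults set in NewMgr (5s / 7s / 6s), while the printed actions stand in for the real handlers.

package main

import (
	"context"
	"fmt"
	"time"
)

func workerCheck(ctx context.Context) {
	workerTicker := time.NewTicker(5 * time.Second)          // recover dead workers
	fileRetryTicker := time.NewTicker(7 * time.Second)       // resend failed files
	toolChainRetryTicker := time.NewTicker(6 * time.Second)  // resend failed tool chains

	defer workerTicker.Stop()
	defer fileRetryTicker.Stop()
	defer toolChainRetryTicker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-workerTicker.C:
			fmt.Println("check and recover dead workers")
		case <-fileRetryTicker.C:
			fmt.Println("retry failed file sends per host")
		case <-toolChainRetryTicker.C:
			fmt.Println("retry failed tool chains and re-enable workers")
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()
	workerCheck(ctx)
}

Compared with the previous design, where ExecuteTask spawned a per-request goroutine that slept and looped up to toolChainRetryTimes inside retrySendToolChain, the ticker-driven approach turns each retry into one bounded attempt per tick; the rewritten retrySendToolChain near the end of this diff reflects that.
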
@@ -524,6 +591,22 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas
defer dcSDK.StatsTimeNow(&req.Stats.RemoteWorkLeaveTime)
m.work.Basic().UpdateJobStats(req.Stats)

servers := m.work.Resource().GetHosts()
m.fileSendMutex.Lock()
for _, c := range req.Req.Commands {
for server, f := range m.fileSendMap {
if f.isFilesSendFailed(c.Inputfiles) {
for _, s := range servers {
if s.Server == server {
req.BanWorkerList = append(req.BanWorkerList, s)
break
}
}
}
}
}
m.fileSendMutex.Unlock()
blog.Debugf("remote: try to execute remote task for work(%s) from pid(%d) with ban worker list %v", m.work.ID(), req.Pid, req.BanWorkerList)
// if there is a large file over 100MB, use it as a criterion when selecting the host
fpath, _ := getMaxSizeFile(req, m.largeFileSize)
req.Server = m.lockSlots(dcSDK.JobUsageRemoteExe, fpath, req.BanWorkerList)
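
The new block at the top of ExecuteTask walks every host's fileSendMap and appends any host that still has a failed input file for the command to req.BanWorkerList, so the lockSlots call above skips it until a later periodic retry clears the failure. The filtering idea, reduced to a hypothetical helper with simplified types (illustration only, not code from this commit):

package main

import "fmt"

// banHosts returns the hosts to skip for one command: those whose send
// map reports a previously failed input file. Hypothetical helper with
// simplified types, for illustration only.
func banHosts(hosts []string, failedByHost map[string]map[string]bool, inputs []string) []string {
	banned := make([]string, 0)
	for _, h := range hosts {
		for _, in := range inputs {
			if failedByHost[h][in] {
				banned = append(banned, h)
				break
			}
		}
	}
	return banned
}

func main() {
	failed := map[string]map[string]bool{"10.0.0.2": {"main.cpp": true}}
	fmt.Println(banHosts([]string{"10.0.0.1", "10.0.0.2"}, failed, []string{"main.cpp"}))
}
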
@@ -558,13 +641,13 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas
m.work.ID(), req.Pid, req.Server.Server, err, req.Server.Server)

m.resource.DisableWorker(req.Server)
m.retrySendToolChain(handler, req)
//m.retrySendToolChain(handler, req)
return nil, err
}

if m.isFilesAlreadySendFailed(req.Server.Server, req.Req.Commands) {
/*if m.isFilesAlreadySendFailed(req.Server.Server, req.Req.Commands) {
return nil, fmt.Errorf("remote: no need to send files for work(%s) from pid(%d) to server(%s)", m.work.ID(), req.Pid, req.Server.Server)
}
}*/
remoteDirs, err := m.ensureFilesWithPriority(handler, req.Pid, req.Sandbox, getFileDetailsFromExecuteRequest(req))
if err != nil {
req.BanWorkerList = append(req.BanWorkerList, req.Server)
@@ -640,8 +723,34 @@ func (m *Mgr) SendFiles(req *types.RemoteTaskSendFileRequest) ([]string, error)
)
}

func (m *Mgr) retrySendFiles(h *dcProtocol.Host) {
m.fileSendMutex.Lock()
sendMap := m.fileSendMap[h.Server]
if sendMap == nil {
m.fileSendMutex.Unlock()
return
}
m.fileSendMutex.Unlock()
failFiles := sendMap.getFailFiles()
if len(failFiles) == 0 {
return
}

blog.Debugf("remote: try to retry send fail file for work(%s) with fail files %v", m.work.ID(), failFiles)
if _, err := m.SendFiles(&types.RemoteTaskSendFileRequest{
Pid: 0,
Req: failFiles,
Server: h,
Sandbox: &dcSyscall.Sandbox{Dir: ""},
Stats: &dcSDK.ControllerJobStats{},
}); err != nil {
blog.Errorf("mgr: send remote file failed: %v", err)
}
blog.Debugf("remote: try to retry send fail file for work(%s) succeed", m.work.ID())
}
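
A small detail in retrySendFiles above: fileSendMutex is held only long enough to look up the per-host fileSendMap, and is released before getFailFiles and the actual SendFiles call, so a slow resend never blocks other work on the manager-wide lock. A minimal sketch of that look-up-under-lock pattern (registry and hostState are illustrative names, not types from this codebase):

package main

import (
	"fmt"
	"sync"
)

// hostState is an illustrative stand-in for a per-host fileSendMap.
type hostState struct{ failedFiles []string }

type registry struct {
	mu     sync.Mutex
	byHost map[string]*hostState
}

// lookup copies the per-host pointer under the lock and releases it
// immediately, so the slow resend below never holds the shared lock.
func (r *registry) lookup(host string) *hostState {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.byHost[host]
}

func main() {
	r := &registry{byHost: map[string]*hostState{"10.0.0.1": {failedFiles: []string{"a.o"}}}}
	if st := r.lookup("10.0.0.1"); st != nil {
		// the slow network resend would happen here, outside the lock
		fmt.Println("would resend:", st.failedFiles)
	}
}
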

// check if files send to remote worker failed and no need to send again
func (m *Mgr) isFilesAlreadySendFailed(server string, commands []dcSDK.BKCommand) bool {
/*func (m *Mgr) isFilesAlreadySendFailed(server string, commands []dcSDK.BKCommand) bool {
m.fileSendMutex.Lock()
target, ok := m.fileSendMap[server]
if !ok {
@@ -656,7 +765,7 @@ func (m *Mgr) isFilesAlreadySendFailed(server string, commands []dcSDK.BKCommand
}
}
return false
}
}*/

func (m *Mgr) ensureFilesWithPriority(
handler dcSDK.RemoteWorkerHandler,
@@ -768,7 +877,7 @@ func (m *Mgr) ensureFiles(
if f.CompressedSize == -1 || f.FileSize == -1 {
t = dcSDK.FilterRuleHandleDefault
}

blog.Debugf("remote: ensure file %s and match rule %d", fd.File.FilePath, t)
servers := make([]*dcProtocol.Host, 0, 0)
switch t {
case dcSDK.FilterRuleHandleDefault:
@@ -1307,47 +1416,32 @@ func (m *Mgr) getFailedFileCollectionByHost(server string) ([]*types.FileCollect
return fcs, true, nil
}

// retry send failed tool chain
func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.RemoteTaskExecuteRequest) {
if m.resource.CanWorkerRetry(req.Server) {
go func(handler dcSDK.RemoteWorkerHandler, req types.RemoteTaskExecuteRequest) {
for i := 0; i < toolChainRetryTimes; {
fileCollections, isNotTeminated, err := m.getFailedFileCollectionByHost(req.Server.Server)
if !isNotTeminated {
time.Sleep(m.toolChainRetryTick)
continue
}
i++
blog.Infof("remote: retry to send tool chain for work(%s) for the %dth times from pid(%d) to server(%s)",
m.work.ID(), i, req.Pid, req.Server.Server)
if err != nil || len(fileCollections) == 0 {
if err != nil {
blog.Errorf("remote: get failed file collection by host(%s) failed: %v", req.Server.Server, err)
} else {
blog.Errorf("remote: retry to send tool chain for work(%s) from pid(%d) to server(%s)", m.work.ID(), req.Pid, req.Server.Server)
}
time.Sleep(m.toolChainRetryTick)
continue
}

if err := m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections); err != nil {
blog.Errorf("remote: retry to send tool chain for work(%s) for the %dth times from pid(%d) to server(%s), "+
"send tool chain files failed: %v", m.work.ID(), i, req.Pid, req.Server.Server, err)
time.Sleep(m.toolChainRetryTick)
} else {
// enable worker
m.resource.EnableWorker(req.Server)
blog.Infof("remote: success to retry to send tool chain for work(%s) from pid(%d) to server(%s)", m.work.ID(), req.Pid, req.Server.Server)
return
}
fileCollections, isNotTeminated, err := m.getFailedFileCollectionByHost(req.Server.Server)
if err != nil {
blog.Infof("remote: retry send tool chain for work(%s) from pid(%d) to server(%s) failed: %v", m.work.ID(), req.Pid, req.Server.Server, err)
return
}
if !isNotTeminated {
blog.Infof("remote: retry send tool chain for work(%s) from pid(%d) all file collection in host(%s) is not finished", m.work.ID(), req.Pid, req.Server.Server)
return
}
if len(fileCollections) == 0 {
//blog.Infof("remote: retry send tool chain for work(%s) from pid(%d) to server(%s) failed: no filecollection found", m.work.ID(), req.Pid, req.Server.Server)
return
}

}
m.resource.SetWorkerStatus(req.Server, RetryFailed)
blog.Errorf("remote: already retry to send tool chain for work(%s) for %d times from pid(%d) to server(%s) failed, keep worker disable", m.work.ID(), toolChainRetryTimes, req.Pid, req.Server.Server)
}(handler, *req)
} else {
blog.Infof("remote: worker(%s) is alreay retry to send tool chain for work(%s) from pid(%d) to server(%s)",
req.Server.Server, m.work.ID(), req.Pid, req.Server.Server)
blog.Infof("remote: retry send tool chain for work(%s) from pid(%d) to server(%s)",
m.work.ID(), req.Pid, req.Server.Server)
if err := m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections); err != nil {
blog.Errorf("remote: retry send tool chain for work(%s) from pid(%d) to server(%s), "+
"send tool chain files failed: %v", m.work.ID(), req.Pid, req.Server.Server, err)
return
}
// enable worker
m.resource.EnableWorker(req.Server)
blog.Infof("remote: retry send tool chain for work(%s) from pid(%d) to server(%s) succeed", m.work.ID(), req.Pid, req.Server.Server)

}

[diffs for the remaining 3 changed files not loaded]