Skip to content

Commit

Permalink
feat: worker选取及恢复机制优化 TencentBlueKing#311
Browse files Browse the repository at this point in the history
  • Loading branch information
flyy1012 committed Oct 30, 2024
1 parent 7828ae1 commit 9656ca0
Showing 1 changed file with 42 additions and 28 deletions.
70 changes: 42 additions & 28 deletions src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func (fsm *fileSendMap) matchOrInsert(desc dcSDK.FileDesc, query bool) (*types.F
if !ok || c == nil || len(*c) == 0 {
infoList := []*types.FileInfo{info}
fsm.cache[desc.FilePath] = &infoList
fsm.updateFailStatus(info)
return info, false
}

Expand All @@ -171,13 +172,15 @@ func (fsm *fileSendMap) matchOrInsert(desc dcSDK.FileDesc, query bool) (*types.F
if ci.SendStatus == types.FileSendFailed && !query {
blog.Debugf("file: retry send file %s", desc.FilePath)
ci.SendStatus = types.FileSending
fsm.updateFailStatus(ci)
return ci, false
}
return ci, true
}
}

*c = append(*c, info)
fsm.updateFailStatus(info)
return info, false
}

Expand Down Expand Up @@ -210,6 +213,7 @@ func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc) []matchResult {
info: info,
match: false,
})
fsm.updateFailStatus(info)
continue
}

Expand All @@ -222,6 +226,7 @@ func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc) []matchResult {
blog.Debugf("file: retry send file %s", desc.FilePath)
fileMatched = false
ci.SendStatus = types.FileSending
fsm.updateFailStatus(ci)
}
result = append(result, matchResult{
info: ci,
Expand All @@ -240,26 +245,37 @@ func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc) []matchResult {
info: info,
match: false,
})
fsm.updateFailStatus(info)
}

return result
}

func (fsm *fileSendMap) updateFailStatus(info *types.FileInfo) {
blog.Debugf("file: update failed files with:%v", fsm.failedFiles)
if fsm.failedFiles == nil {
fsm.failedFiles = make(map[string]*[]*types.FileInfo)
}
fc, ok := fsm.failedFiles[info.FullPath]
if !ok || fc == nil {
infoList := []*types.FileInfo{info}
fsm.failedFiles[info.FullPath] = &infoList
//blog.Debugf("file: update failed files in status %s with:%v", info.SendStatus, fsm.failedFiles)
if info.SendStatus == types.FileSendFailed { //if file send failed
if fsm.failedFiles == nil {
fsm.failedFiles = make(map[string]*[]*types.FileInfo)
}
fc, ok := fsm.failedFiles[info.FullPath]
if !ok || fc == nil {
infoList := []*types.FileInfo{info}
fsm.failedFiles[info.FullPath] = &infoList
blog.Debugf("file: update failed files with:%v", *fsm.failedFiles[info.FullPath])
return
}
blog.Debugf("file: update failed files with:%v", *fsm.failedFiles[info.FullPath])
*fc = append(*fc, info)
} else { //if file send succeed or sending
if fsm.failedFiles == nil {
return //not update
}
if _, ok := fsm.failedFiles[info.FullPath]; !ok {
return //not update
}
delete(fsm.failedFiles, info.FullPath)
blog.Debugf("file: update failed files with:%v", fsm.failedFiles)
return
}

blog.Debugf("file: update failed files with:%v", fsm.failedFiles)
*fc = append(*fc, info)
}

func (fsm *fileSendMap) updateStatus(desc dcSDK.FileDesc, status types.FileSendStatus) {
Expand All @@ -285,27 +301,21 @@ func (fsm *fileSendMap) updateStatus(desc dcSDK.FileDesc, status types.FileSendS
if !ok || c == nil || len(*c) == 0 {
infoList := []*types.FileInfo{info}
fsm.cache[desc.FilePath] = &infoList
if status == types.FileSendFailed {
fsm.updateFailStatus(info)
}
fsm.updateFailStatus(info)
return
}

for _, ci := range *c {
if ci.Match(desc) {
ci.SendStatus = status
//update failed files
if status == types.FileSendFailed {
fsm.updateFailStatus(info)
}
fsm.updateFailStatus(info)
return
}
}

*c = append(*c, info)
if status == types.FileSendFailed {
fsm.updateFailStatus(info)
}
fsm.updateFailStatus(info)
}

func (fsm *fileSendMap) isFilesSendFailed(descs []dcSDK.FileDesc) bool {
Expand Down Expand Up @@ -551,8 +561,8 @@ func (m *Mgr) workerCheck(ctx context.Context) {
}
// retry send fail file for all workers
case <-fileRetryTicker.C:
blog.Debugf("remote: try to retry send fail file for work(%s)", m.work.ID())
for _, h := range m.work.Resource().GetHosts() {
blog.Debugf("remote: try to retry send fail file for work(%s) to server %s", m.work.ID(), h.Server)
go m.retrySendFiles(h)
}
//retry failed tool chain for all workers, recover disable worker
Expand Down Expand Up @@ -753,18 +763,22 @@ func (m *Mgr) retrySendFiles(h *dcProtocol.Host) {
if len(failFiles) == 0 {
return
}

blog.Debugf("remote: try to retry send fail file for work(%s) with fail files %v", m.work.ID(), failFiles)
var testStr []string
for _, r := range failFiles {
testStr = append(testStr, r.FilePath)
}
blog.Debugf("remote: try to retry send fail file for work(%s) from pid(%d) to server %s with fail files %v", m.work.ID(), 1, h.Server, strings.Join(testStr, ","))
if _, err := m.SendFiles(&types.RemoteTaskSendFileRequest{
Pid: 0,
Pid: 1,
Req: failFiles,
Server: h,
Sandbox: &dcSyscall.Sandbox{Dir: ""},
Stats: &dcSDK.ControllerJobStats{},
}); err != nil {
blog.Errorf("mgr: send remote file failed: %v", err)
blog.Errorf("remote: try to retry send fail file for work(%s) from pid(%d) to server %s failed: %v", m.work.ID(), 1, h.Server, err)
return
}
blog.Debugf("remote: try to retry send fail file for work(%s) succeed", m.work.ID())
blog.Debugf("remote: success to retry send fail file for work(%s) from pid(%d) to server %s with fail files %v", m.work.ID(), 1, h.Server, len(sendMap.getFailFiles()))
}

// check if files send to remote worker failed and no need to send again
Expand Down Expand Up @@ -1039,7 +1053,7 @@ func (m *Mgr) ensureFiles(
_ = m.appendCorkFiles(server, needSendCorkFiles)

// notify send
m.sendCorkChan <- true
//m.sendCorkChan <- true
}
} else {
// 单个文件发送模式
Expand Down

0 comments on commit 9656ca0

Please sign in to comment.