Skip to content

Commit

Permalink
feat: worker选取及恢复机制优化 TencentBlueKing#311
Browse files Browse the repository at this point in the history
  • Loading branch information
flyy1012 committed Oct 24, 2024
1 parent 86888ea commit 27ea2f5
Showing 1 changed file with 11 additions and 37 deletions.
48 changes: 11 additions & 37 deletions src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,30 +281,6 @@ func (fsm *fileSendMap) updateStatus(desc dcSDK.FileDesc, status types.FileSendS
return
}

/*func (fsm *fileSendMap) hasReachedFailCount(descs []dcSDK.FileDesc) bool {
fsm.RLock()
defer fsm.RUnlock()
if fsm.cache == nil {
return false
}
for _, desc := range descs {
c, ok := fsm.cache[desc.FilePath]
if !ok || c == nil || len(*c) == 0 {
continue
}
for _, ci := range *fsm.cache[desc.FilePath] {
if ci.Match(desc) {
if ci.FailCount > fileMaxFailCount {
return true
}
}
}
}
return false
}*/

func (fsm *fileSendMap) isFilesSendFailed(descs []dcSDK.FileDesc) bool {
fsm.RLock()
defer fsm.RUnlock()
Expand All @@ -317,7 +293,7 @@ func (fsm *fileSendMap) isFilesSendFailed(descs []dcSDK.FileDesc) bool {
if !ok || c == nil || len(*c) == 0 {
continue
}
for _, ci := range *fsm.cache[desc.FilePath] {
for _, ci := range *c {
if ci.Match(desc) {
if ci.SendStatus == types.FileSendFailed {
return true
Expand All @@ -341,12 +317,11 @@ func (fsm *fileSendMap) getFailFiles() []dcSDK.FileDesc {
Compresstype: dcProtocol.CompressLZ4,
FileSize: ci.Size,
Lastmodifytime: ci.LastModifyTime,
Md5: "",
Md5: ci.Md5,
Targetrelativepath: ci.TargetRelativePath,
Filemode: ci.FileMode,
LinkTarget: ci.LinkTarget,
NoDuplicated: true,
//Priority: dcSDK.GetPriority(ci),
})
}
}
Expand Down Expand Up @@ -591,21 +566,19 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas
defer dcSDK.StatsTimeNow(&req.Stats.RemoteWorkLeaveTime)
m.work.Basic().UpdateJobStats(req.Stats)

servers := m.work.Resource().GetHosts()
m.fileSendMutex.Lock()
for _, c := range req.Req.Commands {
for server, f := range m.fileSendMap {
for _, s := range m.work.Resource().GetHosts() {
m.fileSendMutex.Lock()
f := m.fileSendMap[s.Server]
m.fileSendMutex.Unlock()
if f == nil {
continue
}
if f.isFilesSendFailed(c.Inputfiles) {
for _, s := range servers {
if s.Server == server {
req.BanWorkerList = append(req.BanWorkerList, s)
break
}
}
req.BanWorkerList = append(req.BanWorkerList, s)
}
}
}
m.fileSendMutex.Unlock()
blog.Debugf("remote: try to execute remote task for work(%s) from pid(%d) with ban worker list %v", m.work.ID(), req.Pid, req.BanWorkerList)
// 如果有超过100MB的大文件,则在选择host时,作为选择条件
fpath, _ := getMaxSizeFile(req, m.largeFileSize)
Expand Down Expand Up @@ -728,6 +701,7 @@ func (m *Mgr) retrySendFiles(h *dcProtocol.Host) {
sendMap := m.fileSendMap[h.Server]
if sendMap == nil {
m.fileSendMutex.Unlock()
blog.Errorf("remote: send file for work(%s) with no send map", m.work.ID())
return
}
m.fileSendMutex.Unlock()
Expand Down

0 comments on commit 27ea2f5

Please sign in to comment.