Skip to content

Commit

Permalink
feat: worker选取及恢复机制优化 TencentBlueKing#311
Browse files Browse the repository at this point in the history
  • Loading branch information
flyy1012 committed Oct 28, 2024
1 parent e151518 commit b591ee7
Showing 1 changed file with 35 additions and 15 deletions.
50 changes: 35 additions & 15 deletions src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ type Mgr struct {

// fileSendMap tracks per-file send state. Its methods take the embedded
// RWMutex before touching the maps (write lock in updateStatus, read lock in
// isFilesSendFailed).
type fileSendMap struct {
sync.RWMutex
// cache maps a file path (desc.FilePath) to the list of FileInfo records
// seen for that path.
cache map[string]*[]*types.FileInfo
cache map[string]*[]*types.FileInfo
// failedFiles holds the records whose send failed. It is written keyed by
// FileInfo.FullPath and looked up by desc.FilePath.
// NOTE(review): presumably FullPath always equals the desc.FilePath it was
// built from — confirm, otherwise lookups will miss.
failedFiles map[string]*[]*types.FileInfo
}

func (fsm *fileSendMap) matchOrInsert(desc dcSDK.FileDesc, query bool) (*types.FileInfo, bool) {
Expand Down Expand Up @@ -244,6 +245,23 @@ func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc) []matchResult {
return result
}

// updateFailStatus records info in the failedFiles index under its FullPath,
// lazily creating the index on first use.
//
// It does not lock: the caller in this file (updateStatus) already holds the
// write lock when calling it, and sync.RWMutex is not reentrant, so taking
// the lock here would deadlock. Any new caller must hold fsm.Lock() as well.
//
// A nil info is ignored.
func (fsm *fileSendMap) updateFailStatus(info *types.FileInfo) {
	if info == nil {
		return
	}

	if fsm.failedFiles == nil {
		fsm.failedFiles = make(map[string]*[]*types.FileInfo)
	}

	fc, ok := fsm.failedFiles[info.FullPath]
	if !ok || fc == nil {
		infoList := []*types.FileInfo{info}
		fc = &infoList
		fsm.failedFiles[info.FullPath] = fc
	} else {
		*fc = append(*fc, info)
	}

	// Log only the entry that changed. Formatting the whole map with %v
	// prints pointer addresses and walks every entry while the lock is held.
	blog.Debugf("file: update failed files with:%s, failed records for this path:%d",
		info.FullPath, len(*fc))
}

func (fsm *fileSendMap) updateStatus(desc dcSDK.FileDesc, status types.FileSendStatus) {
fsm.Lock()
defer fsm.Unlock()
Expand All @@ -267,37 +285,44 @@ func (fsm *fileSendMap) updateStatus(desc dcSDK.FileDesc, status types.FileSendS
if !ok || c == nil || len(*c) == 0 {
infoList := []*types.FileInfo{info}
fsm.cache[desc.FilePath] = &infoList
if status == types.FileSendFailed {
fsm.updateFailStatus(info)
}
return
}

for _, ci := range *c {
if ci.Match(desc) {
ci.SendStatus = status
//update failed files
if status == types.FileSendFailed {
fsm.updateFailStatus(info)
}
return
}
}

*c = append(*c, info)
return
if status == types.FileSendFailed {
fsm.updateFailStatus(info)
}
}

func (fsm *fileSendMap) isFilesSendFailed(descs []dcSDK.FileDesc) bool {
fsm.RLock()
defer fsm.RUnlock()

if fsm.cache == nil {
if fsm.failedFiles == nil {
return false
}
for _, desc := range descs {
c, ok := fsm.cache[desc.FilePath]
c, ok := fsm.failedFiles[desc.FilePath]
if !ok || c == nil || len(*c) == 0 {
continue
}
for _, ci := range *c {
if ci.Match(desc) {
if ci.SendStatus == types.FileSendFailed {
return true
}
return true
}
}
}
Expand Down Expand Up @@ -629,14 +654,9 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas
m.work.ID(), req.Pid, req.Server.Server, err, req.Server.Server)

m.resource.DisableWorker(req.Server)
//m.retrySendToolChain(handler, req)
return nil, err
}

/*if m.isFilesAlreadySendFailed(req.Server.Server, req.Req.Commands) {
return nil, fmt.Errorf("remote: no need to send files for work(%s) from pid(%d) to server(%s)", m.work.ID(), req.Pid, req.Server.Server)
}*/

ret, err = checkHttpConn(req)
if err != nil {
return ret, err
Expand Down Expand Up @@ -727,7 +747,7 @@ func (m *Mgr) retrySendFiles(h *dcProtocol.Host) {
sendMap := m.fileSendMap[h.Server]
if sendMap == nil {
m.fileSendMutex.Unlock()
blog.Errorf("remote: send file for work(%s) with no send map", m.work.ID())
blog.Infof("remote: send file for work(%s) with no send map", m.work.ID())
return
}
m.fileSendMutex.Unlock()
Expand Down Expand Up @@ -839,8 +859,8 @@ func (m *Mgr) ensureFiles(
settings := m.work.Basic().Settings()
blog.Infof("remote: try to ensure multi %d files for work(%s) from pid(%d) dir(%s) to server",
len(fileDetails), m.work.ID(), pid, sandbox.Dir)
blog.Debugf("remote: try to ensure multi %d files for work(%s) from pid(%d) dir(%s) to server: %v",
len(fileDetails), m.work.ID(), pid, sandbox.Dir, fileDetails)
//blog.Debugf("remote: try to ensure multi %d files for work(%s) from pid(%d) dir(%s) to server: %v",
// len(fileDetails), m.work.ID(), pid, sandbox.Dir, fileDetails)
rules := settings.FilterRules

// pump模式下,一次编译依赖的可能有上千个文件,现在的流程会随机的添加到cork发送队列
Expand Down

0 comments on commit b591ee7

Please sign in to comment.