From b591ee77a7a8999f9675fc10958375b9025466f6 Mon Sep 17 00:00:00 2001 From: yanafu Date: Mon, 28 Oct 2024 17:13:10 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20worker=E9=80=89=E5=8F=96=E5=8F=8A?= =?UTF-8?q?=E6=81=A2=E5=A4=8D=E6=9C=BA=E5=88=B6=E4=BC=98=E5=8C=96=20#311?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/pkg/manager/remote/mgr.go | 50 +++++++++++++------ 1 file changed, 35 insertions(+), 15 deletions(-) diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go index ea9a534c..604243b6 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go @@ -135,7 +135,8 @@ type Mgr struct { type fileSendMap struct { sync.RWMutex - cache map[string]*[]*types.FileInfo + cache map[string]*[]*types.FileInfo + failedFiles map[string]*[]*types.FileInfo } func (fsm *fileSendMap) matchOrInsert(desc dcSDK.FileDesc, query bool) (*types.FileInfo, bool) { @@ -244,6 +245,23 @@ func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc) []matchResult { return result } +func (fsm *fileSendMap) updateFailStatus(info *types.FileInfo) { + blog.Debugf("file: update failed files with:%v", fsm.failedFiles) + if fsm.failedFiles == nil { + fsm.failedFiles = make(map[string]*[]*types.FileInfo) + } + fc, ok := fsm.failedFiles[info.FullPath] + if !ok || fc == nil { + infoList := []*types.FileInfo{info} + fsm.failedFiles[info.FullPath] = &infoList + blog.Debugf("file: update failed files with:%v", fsm.failedFiles) + return + } + + blog.Debugf("file: update failed files with:%v", fsm.failedFiles) + *fc = append(*fc, info) +} + func (fsm *fileSendMap) updateStatus(desc dcSDK.FileDesc, status types.FileSendStatus) { fsm.Lock() defer fsm.Unlock() @@ -267,37 +285,44 @@ func (fsm *fileSendMap) updateStatus(desc dcSDK.FileDesc, status types.FileSendS if !ok || c == nil || len(*c) == 0 { infoList := []*types.FileInfo{info} fsm.cache[desc.FilePath] = &infoList + if status == types.FileSendFailed { + fsm.updateFailStatus(info) + } return } for _, ci := range *c { if ci.Match(desc) { ci.SendStatus = status + //update failed files + if status == types.FileSendFailed { + fsm.updateFailStatus(info) + } return } } *c = append(*c, info) - return + if status == types.FileSendFailed { + fsm.updateFailStatus(info) + } } func (fsm *fileSendMap) isFilesSendFailed(descs []dcSDK.FileDesc) bool { fsm.RLock() defer fsm.RUnlock() - if fsm.cache == nil { + if fsm.failedFiles == nil { return false } for _, desc := range descs { - c, ok := fsm.cache[desc.FilePath] + c, ok := fsm.failedFiles[desc.FilePath] if !ok || c == nil || len(*c) == 0 { continue } for _, ci := range *c { if ci.Match(desc) { - if ci.SendStatus == types.FileSendFailed { - return true - } + return true } } } @@ -629,14 +654,9 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas m.work.ID(), req.Pid, req.Server.Server, err, req.Server.Server) m.resource.DisableWorker(req.Server) - //m.retrySendToolChain(handler, req) return nil, err } - /*if m.isFilesAlreadySendFailed(req.Server.Server, req.Req.Commands) { - return nil, fmt.Errorf("remote: no need to send files for work(%s) from pid(%d) to server(%s)", m.work.ID(), req.Pid, req.Server.Server) - }*/ - ret, err = checkHttpConn(req) if err != nil { return ret, err @@ -727,7 +747,7 @@ func (m *Mgr) retrySendFiles(h *dcProtocol.Host) { sendMap := m.fileSendMap[h.Server] if sendMap == nil { m.fileSendMutex.Unlock() - blog.Errorf("remote: send file for work(%s) with no send map", m.work.ID()) + blog.Infof("remote: send file for work(%s) with no send map", m.work.ID()) return } m.fileSendMutex.Unlock() @@ -839,8 +859,8 @@ func (m *Mgr) ensureFiles( settings := m.work.Basic().Settings() blog.Infof("remote: try to ensure multi %d files for work(%s) from pid(%d) dir(%s) to server", len(fileDetails), m.work.ID(), pid, sandbox.Dir) - blog.Debugf("remote: try to ensure multi %d files for work(%s) from pid(%d) dir(%s) to server: %v", - len(fileDetails), m.work.ID(), pid, sandbox.Dir, fileDetails) + //blog.Debugf("remote: try to ensure multi %d files for work(%s) from pid(%d) dir(%s) to server: %v", + // len(fileDetails), m.work.ID(), pid, sandbox.Dir, fileDetails) rules := settings.FilterRules // pump模式下,一次编译依赖的可能有上千个文件,现在的流程会随机的添加到cork发送队列