From 27ea2f5eba2611baaac82e78db5673d1c5024554 Mon Sep 17 00:00:00 2001 From: yanafu Date: Thu, 24 Oct 2024 09:42:46 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20worker=E9=80=89=E5=8F=96=E5=8F=8A?= =?UTF-8?q?=E6=81=A2=E5=A4=8D=E6=9C=BA=E5=88=B6=E4=BC=98=E5=8C=96=20#311?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/pkg/manager/remote/mgr.go | 48 +++++-------------- 1 file changed, 11 insertions(+), 37 deletions(-) diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go index 6a0566b8..f7e4cd44 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go @@ -281,30 +281,6 @@ func (fsm *fileSendMap) updateStatus(desc dcSDK.FileDesc, status types.FileSendS return } -/*func (fsm *fileSendMap) hasReachedFailCount(descs []dcSDK.FileDesc) bool { - fsm.RLock() - defer fsm.RUnlock() - - if fsm.cache == nil { - return false - } - for _, desc := range descs { - c, ok := fsm.cache[desc.FilePath] - if !ok || c == nil || len(*c) == 0 { - continue - } - for _, ci := range *fsm.cache[desc.FilePath] { - if ci.Match(desc) { - if ci.FailCount > fileMaxFailCount { - return true - } - } - } - } - - return false -}*/ - func (fsm *fileSendMap) isFilesSendFailed(descs []dcSDK.FileDesc) bool { fsm.RLock() defer fsm.RUnlock() @@ -317,7 +293,7 @@ func (fsm *fileSendMap) isFilesSendFailed(descs []dcSDK.FileDesc) bool { if !ok || c == nil || len(*c) == 0 { continue } - for _, ci := range *fsm.cache[desc.FilePath] { + for _, ci := range *c { if ci.Match(desc) { if ci.SendStatus == types.FileSendFailed { return true @@ -341,12 +317,11 @@ func (fsm *fileSendMap) getFailFiles() []dcSDK.FileDesc { Compresstype: dcProtocol.CompressLZ4, FileSize: ci.Size, Lastmodifytime: ci.LastModifyTime, - Md5: "", + Md5: ci.Md5, Targetrelativepath: ci.TargetRelativePath, Filemode: ci.FileMode, LinkTarget: ci.LinkTarget, NoDuplicated: true, - //Priority: dcSDK.GetPriority(ci), }) } } @@ -591,21 +566,19 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas defer dcSDK.StatsTimeNow(&req.Stats.RemoteWorkLeaveTime) m.work.Basic().UpdateJobStats(req.Stats) - servers := m.work.Resource().GetHosts() - m.fileSendMutex.Lock() for _, c := range req.Req.Commands { - for server, f := range m.fileSendMap { + for _, s := range m.work.Resource().GetHosts() { + m.fileSendMutex.Lock() + f := m.fileSendMap[s.Server] + m.fileSendMutex.Unlock() + if f == nil { + continue + } if f.isFilesSendFailed(c.Inputfiles) { - for _, s := range servers { - if s.Server == server { - req.BanWorkerList = append(req.BanWorkerList, s) - break - } - } + req.BanWorkerList = append(req.BanWorkerList, s) } } } - m.fileSendMutex.Unlock() blog.Debugf("remote: try to execute remote task for work(%s) from pid(%d) with ban worker list %v", m.work.ID(), req.Pid, req.BanWorkerList) // 如果有超过100MB的大文件,则在选择host时,作为选择条件 fpath, _ := getMaxSizeFile(req, m.largeFileSize) @@ -728,6 +701,7 @@ func (m *Mgr) retrySendFiles(h *dcProtocol.Host) { sendMap := m.fileSendMap[h.Server] if sendMap == nil { m.fileSendMutex.Unlock() + blog.Errorf("remote: send file for work(%s) with no send map", m.work.ID()) return } m.fileSendMutex.Unlock()