From 9656ca0f9c3390eeedcff5048eeed11ea7344086 Mon Sep 17 00:00:00 2001 From: yanafu Date: Wed, 30 Oct 2024 09:39:49 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20worker=E9=80=89=E5=8F=96=E5=8F=8A?= =?UTF-8?q?=E6=81=A2=E5=A4=8D=E6=9C=BA=E5=88=B6=E4=BC=98=E5=8C=96=20#311?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/pkg/manager/remote/mgr.go | 70 +++++++++++-------- 1 file changed, 42 insertions(+), 28 deletions(-) diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go index 3c72d120..4432321f 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go @@ -162,6 +162,7 @@ func (fsm *fileSendMap) matchOrInsert(desc dcSDK.FileDesc, query bool) (*types.F if !ok || c == nil || len(*c) == 0 { infoList := []*types.FileInfo{info} fsm.cache[desc.FilePath] = &infoList + fsm.updateFailStatus(info) return info, false } @@ -171,6 +172,7 @@ func (fsm *fileSendMap) matchOrInsert(desc dcSDK.FileDesc, query bool) (*types.F if ci.SendStatus == types.FileSendFailed && !query { blog.Debugf("file: retry send file %s", desc.FilePath) ci.SendStatus = types.FileSending + fsm.updateFailStatus(ci) return ci, false } return ci, true @@ -178,6 +180,7 @@ func (fsm *fileSendMap) matchOrInsert(desc dcSDK.FileDesc, query bool) (*types.F } *c = append(*c, info) + fsm.updateFailStatus(info) return info, false } @@ -210,6 +213,7 @@ func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc) []matchResult { info: info, match: false, }) + fsm.updateFailStatus(info) continue } @@ -222,6 +226,7 @@ func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc) []matchResult { blog.Debugf("file: retry send file %s", desc.FilePath) fileMatched = false ci.SendStatus = types.FileSending + fsm.updateFailStatus(ci) } result = append(result, matchResult{ info: ci, @@ -240,26 +245,37 @@ func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc) []matchResult { info: info, match: false, }) + fsm.updateFailStatus(info) } return result } func (fsm *fileSendMap) updateFailStatus(info *types.FileInfo) { - blog.Debugf("file: update failed files with:%v", fsm.failedFiles) - if fsm.failedFiles == nil { - fsm.failedFiles = make(map[string]*[]*types.FileInfo) - } - fc, ok := fsm.failedFiles[info.FullPath] - if !ok || fc == nil { - infoList := []*types.FileInfo{info} - fsm.failedFiles[info.FullPath] = &infoList + //blog.Debugf("file: update failed files in status %s with:%v", info.SendStatus, fsm.failedFiles) + if info.SendStatus == types.FileSendFailed { //if file send failed + if fsm.failedFiles == nil { + fsm.failedFiles = make(map[string]*[]*types.FileInfo) + } + fc, ok := fsm.failedFiles[info.FullPath] + if !ok || fc == nil { + infoList := []*types.FileInfo{info} + fsm.failedFiles[info.FullPath] = &infoList + blog.Debugf("file: update failed files with:%v", *fsm.failedFiles[info.FullPath]) + return + } + blog.Debugf("file: update failed files with:%v", *fsm.failedFiles[info.FullPath]) + *fc = append(*fc, info) + } else { //if file send succeed or sending + if fsm.failedFiles == nil { + return //not update + } + if _, ok := fsm.failedFiles[info.FullPath]; !ok { + return //not update + } + delete(fsm.failedFiles, info.FullPath) blog.Debugf("file: update failed files with:%v", fsm.failedFiles) - return } - - blog.Debugf("file: update failed files with:%v", fsm.failedFiles) - *fc = append(*fc, info) } func (fsm *fileSendMap) updateStatus(desc dcSDK.FileDesc, status types.FileSendStatus) { @@ -285,9 +301,7 @@ func (fsm *fileSendMap) updateStatus(desc dcSDK.FileDesc, status types.FileSendS if !ok || c == nil || len(*c) == 0 { infoList := []*types.FileInfo{info} fsm.cache[desc.FilePath] = &infoList - if status == types.FileSendFailed { - fsm.updateFailStatus(info) - } + fsm.updateFailStatus(info) return } @@ -295,17 +309,13 @@ func (fsm *fileSendMap) updateStatus(desc dcSDK.FileDesc, status types.FileSendS if ci.Match(desc) { ci.SendStatus = status //update failed files - if status == types.FileSendFailed { - fsm.updateFailStatus(info) - } + fsm.updateFailStatus(info) return } } *c = append(*c, info) - if status == types.FileSendFailed { - fsm.updateFailStatus(info) - } + fsm.updateFailStatus(info) } func (fsm *fileSendMap) isFilesSendFailed(descs []dcSDK.FileDesc) bool { @@ -551,8 +561,8 @@ func (m *Mgr) workerCheck(ctx context.Context) { } // retry send fail file for all workers case <-fileRetryTicker.C: - blog.Debugf("remote: try to retry send fail file for work(%s)", m.work.ID()) for _, h := range m.work.Resource().GetHosts() { + blog.Debugf("remote: try to retry send fail file for work(%s) to server %s", m.work.ID(), h.Server) go m.retrySendFiles(h) } //retry failed tool chain for all workers, recover disable worker @@ -753,18 +763,22 @@ func (m *Mgr) retrySendFiles(h *dcProtocol.Host) { if len(failFiles) == 0 { return } - - blog.Debugf("remote: try to retry send fail file for work(%s) with fail files %v", m.work.ID(), failFiles) + var testStr []string + for _, r := range failFiles { + testStr = append(testStr, r.FilePath) + } + blog.Debugf("remote: try to retry send fail file for work(%s) from pid(%d) to server %s with fail files %v", m.work.ID(), 1, h.Server, strings.Join(testStr, ",")) if _, err := m.SendFiles(&types.RemoteTaskSendFileRequest{ - Pid: 0, + Pid: 1, Req: failFiles, Server: h, Sandbox: &dcSyscall.Sandbox{Dir: ""}, Stats: &dcSDK.ControllerJobStats{}, }); err != nil { - blog.Errorf("mgr: send remote file failed: %v", err) + blog.Errorf("remote: try to retry send fail file for work(%s) from pid(%d) to server %s failed: %v", m.work.ID(), 1, h.Server, err) + return } - blog.Debugf("remote: try to retry send fail file for work(%s) succeed", m.work.ID()) + blog.Debugf("remote: success to retry send fail file for work(%s) from pid(%d) to server %s with fail files %v", m.work.ID(), 1, h.Server, len(sendMap.getFailFiles())) } // check if files send to remote worker failed and no need to send again @@ -1039,7 +1053,7 @@ func (m *Mgr) ensureFiles( _ = m.appendCorkFiles(server, needSendCorkFiles) // notify send - m.sendCorkChan <- true + //m.sendCorkChan <- true } } else { // 单个文件发送模式