diff --git a/src/backend/booster/bk_dist/booster/pkg/booster.go b/src/backend/booster/bk_dist/booster/pkg/booster.go
index d6492e98..4aef6524 100644
--- a/src/backend/booster/bk_dist/booster/pkg/booster.go
+++ b/src/backend/booster/bk_dist/booster/pkg/booster.go
@@ -24,7 +24,6 @@ import (
 	"time"
 
 	"github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/env"
-	dcFile "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/file"
 	dcProtocol "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/protocol"
 	dcPump "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/pump"
 	dcSDK "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/sdk"
@@ -630,7 +629,7 @@ func (b *Booster) unregisterWork() error {
 	return nil
 }
 
-func (b *Booster) sendAdditionFile() {
+/*func (b *Booster) sendAdditionFile() {
 	if b.config.Works.Local || b.config.Works.Degraded {
 		return
 	}
@@ -679,7 +678,7 @@ func (b *Booster) sendAdditionFile() {
 		return
 	}
 	blog.Infof("booster: finish send addition files: %v", b.config.Works.AdditionFiles)
-}
+}*/
 
 func (b *Booster) runWorks(
 	ctx context.Context,
diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
index 2c51bca2..6a0566b8 100644
--- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
+++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
@@ -40,7 +40,7 @@ const (
 	// corkMaxSize = 1024 * 1024 * 1024
 	largeFileSize = 1024 * 1024 * 100 // 100MB
 
-	fileMaxFailCount = 5
+	//fileMaxFailCount = 5
 )
 
 // NewMgr get a new Remote Mgr
@@ -73,7 +73,8 @@ func NewMgr(pCtx context.Context, work *types.Work) types.RemoteMgr {
 		conf:               work.Config(),
 		resourceCheckTick:  5 * time.Second,
 		workerCheckTick:    5 * time.Second,
-		toolChainRetryTick: 10 * time.Second,
+		toolChainRetryTick: 6 * time.Second,
+		fileRetryCheckTick: 7 * time.Second,
 		sendCorkTick:       10 * time.Millisecond,
 		corkSize:           corkSize,
 		corkMaxSize:        corkMaxSize,
@@ -116,6 +117,7 @@ type Mgr struct {
 
 	resourceCheckTick time.Duration
 	workerCheckTick   time.Duration
+	fileRetryCheckTick time.Duration
 	toolChainRetryTick time.Duration
 	lastUsed          uint64 // only accurate to second now
 	lastApplied       uint64 // only accurate to second now
@@ -166,7 +168,7 @@ func (fsm *fileSendMap) matchOrInsert(desc dcSDK.FileDesc, query bool) (*types.F
 		if ci.Match(desc) {
 			//if worker is send failed before, try to send it again
 			if ci.SendStatus == types.FileSendFailed && !query {
-				blog.Debugf("file: retry send file %s, fail count %d", desc.FilePath, ci.FailCount)
+				blog.Debugf("file: retry send file %s", desc.FilePath)
 				ci.SendStatus = types.FileSending
 				return ci, false
 			}
@@ -216,7 +218,7 @@ func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc) []matchResult {
 			fileMatched := true
 			//if file is send failed before, try to send it again
 			if ci.SendStatus == types.FileSendFailed {
-				blog.Debugf("file: retry send file %s, fail count %d", desc.FilePath, ci.FailCount)
+				blog.Debugf("file: retry send file %s", desc.FilePath)
 				fileMatched = false
 				ci.SendStatus = types.FileSending
 			}
@@ -259,7 +261,6 @@ func (fsm *fileSendMap) updateStatus(desc dcSDK.FileDesc, status types.FileSendS
 		FileMode:       desc.Filemode,
 		LinkTarget:     desc.LinkTarget,
 		SendStatus:     status,
-		FailCount:      0,
 	}
 
 	c, ok := fsm.cache[desc.FilePath]
@@ -272,12 +273,6 @@ func (fsm *fileSendMap) updateStatus(desc dcSDK.FileDesc, status types.FileSendS
 	for _, ci := range *c {
 		if ci.Match(desc) {
 			ci.SendStatus = status
-			if status == types.FileSendFailed {
-				ci.FailCount++
-			}
-			if status == types.FileSendSucceed {
-				ci.FailCount = 0
-			}
 			return
 		}
 	}
@@ -286,7 +281,7 @@ func (fsm *fileSendMap) updateStatus(desc dcSDK.FileDesc, status types.FileSendS
 	return
 }
 
-func (fsm *fileSendMap) hasReachedFailCount(descs []dcSDK.FileDesc) bool {
+/*func (fsm *fileSendMap) hasReachedFailCount(descs []dcSDK.FileDesc) bool {
 	fsm.RLock()
 	defer fsm.RUnlock()
 
@@ -307,8 +302,57 @@
 		}
 	}
 
+	return false
+}*/
+
+func (fsm *fileSendMap) isFilesSendFailed(descs []dcSDK.FileDesc) bool {
+	fsm.RLock()
+	defer fsm.RUnlock()
+
+	if fsm.cache == nil {
+		return false
+	}
+	for _, desc := range descs {
+		c, ok := fsm.cache[desc.FilePath]
+		if !ok || c == nil || len(*c) == 0 {
+			continue
+		}
+		for _, ci := range *c {
+			if ci.Match(desc) {
+				if ci.SendStatus == types.FileSendFailed {
+					return true
+				}
+			}
+		}
+	}
 	return false
 }
 
+func (fsm *fileSendMap) getFailFiles() []dcSDK.FileDesc {
+	fsm.RLock()
+	defer fsm.RUnlock()
+
+	failFiles := make([]dcSDK.FileDesc, 0)
+	for _, v := range fsm.cache {
+		for _, ci := range *v {
+			if ci.SendStatus == types.FileSendFailed {
+				failFiles = append(failFiles, dcSDK.FileDesc{
+					FilePath:           ci.FullPath,
+					Compresstype:       dcProtocol.CompressLZ4,
+					FileSize:           ci.Size,
+					Lastmodifytime:     ci.LastModifyTime,
+					Md5:                "",
+					Targetrelativepath: ci.TargetRelativePath,
+					Filemode:           ci.FileMode,
+					LinkTarget:         ci.LinkTarget,
+					NoDuplicated:       true,
+					//Priority:           dcSDK.GetPriority(ci),
+				})
+			}
+		}
+	}
+	return failFiles
+}
 
 // Init do the initialization for remote manager
 // !! only call once !!
@@ -335,8 +379,9 @@ func (m *Mgr) Init() {
 	if m.conf.AutoResourceMgr {
 		go m.resourceCheck(ctx)
 	}
-
-	go m.workerCheck(ctx)
+	if m.work.ID() != "" {
+		go m.workerCheck(ctx)
+	}
 
 	if m.conf.SendCork {
 		m.sendCorkChan = make(chan bool, 1000)
@@ -479,14 +524,19 @@ func (m *Mgr) resourceCheck(ctx context.Context) {
 func (m *Mgr) workerCheck(ctx context.Context) {
 	blog.Infof("remote: run worker check tick for work: %s", m.work.ID())
 	ticker := time.NewTicker(m.workerCheckTick)
+	fileRetryTicker := time.NewTicker(m.fileRetryCheckTick)
+	toolChainRetryTicker := time.NewTicker(m.toolChainRetryTick)
+	defer ticker.Stop()
+	defer fileRetryTicker.Stop()
+	defer toolChainRetryTicker.Stop()
 
 	for {
 		select {
 		case <-ctx.Done():
 			blog.Infof("remote: run worker check for work(%s) canceled by context", m.work.ID())
 			return
-
+		//recover dead worker
 		case <-ticker.C:
 			handler := m.remoteWorker.Handler(0, nil, nil, nil)
 			for _, w := range m.resource.GetDeadWorkers() {
@@ -501,7 +551,24 @@ func (m *Mgr) workerCheck(ctx context.Context) {
 				}(w)
 			}
-
+		// retry send fail file for all workers
+		case <-fileRetryTicker.C:
+			blog.Debugf("remote: try to retry send fail file for work(%s)", m.work.ID())
+			for _, h := range m.work.Resource().GetHosts() {
+				go m.retrySendFiles(h)
+			}
+		//retry failed tool chain for all workers, recover disable worker
+		case <-toolChainRetryTicker.C:
+			blog.Debugf("remote: try to retry send tool chain for work(%s)", m.work.ID())
+			handler := m.remoteWorker.Handler(0, nil, nil, nil)
+			for _, h := range m.work.Resource().GetHosts() {
+				go m.retrySendToolChain(handler, &types.RemoteTaskExecuteRequest{
+					Pid:     0,
+					Server:  h,
+					Sandbox: &dcSyscall.Sandbox{Dir: ""},
+					Stats:   &dcSDK.ControllerJobStats{},
+				})
+			}
 		}
 	}
 }
@@ -524,6 +591,22 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas
 	defer dcSDK.StatsTimeNow(&req.Stats.RemoteWorkLeaveTime)
 	m.work.Basic().UpdateJobStats(req.Stats)
+
+	servers := m.work.Resource().GetHosts()
+	m.fileSendMutex.Lock()
+	for _, c := range req.Req.Commands {
+		for server, f := range m.fileSendMap {
+			if f.isFilesSendFailed(c.Inputfiles) {
+				for _, s := range servers {
+					if s.Server == server {
+						req.BanWorkerList = append(req.BanWorkerList, s)
+						break
+					}
+				}
+			}
+		}
+	}
+	m.fileSendMutex.Unlock()
+
+	blog.Debugf("remote: try to execute remote task for work(%s) from pid(%d) with ban worker list %v",
+		m.work.ID(), req.Pid, req.BanWorkerList)
 	// if there is a large file over 100MB, take it into account when choosing the host
 	fpath, _ := getMaxSizeFile(req, m.largeFileSize)
 	req.Server = m.lockSlots(dcSDK.JobUsageRemoteExe, fpath, req.BanWorkerList)
@@ -558,13 +641,13 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas
 			m.work.ID(), req.Pid, req.Server.Server, err, req.Server.Server)
 		m.resource.DisableWorker(req.Server)
-		m.retrySendToolChain(handler, req)
+		//m.retrySendToolChain(handler, req)
 		return nil, err
 	}
 
-	if m.isFilesAlreadySendFailed(req.Server.Server, req.Req.Commands) {
+	/*if m.isFilesAlreadySendFailed(req.Server.Server, req.Req.Commands) {
 		return nil, fmt.Errorf("remote: no need to send files for work(%s) from pid(%d) to server(%s)",
 			m.work.ID(), req.Pid, req.Server.Server)
-	}
+	}*/
 
 	remoteDirs, err := m.ensureFilesWithPriority(handler, req.Pid, req.Sandbox, getFileDetailsFromExecuteRequest(req))
 	if err != nil {
 		req.BanWorkerList = append(req.BanWorkerList, req.Server)
@@ -640,8 +723,35 @@ func (m *Mgr) SendFiles(req *types.RemoteTaskSendFileRequest) ([]string, error)
 	)
 }
 
+func (m *Mgr) retrySendFiles(h *dcProtocol.Host) {
+	m.fileSendMutex.Lock()
+	sendMap := m.fileSendMap[h.Server]
+	if sendMap == nil {
+		m.fileSendMutex.Unlock()
+		return
+	}
+	m.fileSendMutex.Unlock()
+
+	failFiles := sendMap.getFailFiles()
+	if len(failFiles) == 0 {
+		return
+	}
+
+	blog.Debugf("remote: try to retry send fail file for work(%s) with fail files %v", m.work.ID(), failFiles)
+	if _, err := m.SendFiles(&types.RemoteTaskSendFileRequest{
+		Pid:     0,
+		Req:     failFiles,
+		Server:  h,
+		Sandbox: &dcSyscall.Sandbox{Dir: ""},
+		Stats:   &dcSDK.ControllerJobStats{},
+	}); err != nil {
+		blog.Errorf("remote: retry send fail file for work(%s) failed: %v", m.work.ID(), err)
+		return
+	}
+	blog.Debugf("remote: retry send fail file for work(%s) succeed", m.work.ID())
+}
+
 // check if files send to remote worker failed and no need to send again
-func (m *Mgr) isFilesAlreadySendFailed(server string, commands []dcSDK.BKCommand) bool {
+/*func (m *Mgr) isFilesAlreadySendFailed(server string, commands []dcSDK.BKCommand) bool {
 	m.fileSendMutex.Lock()
 	target, ok := m.fileSendMap[server]
 	if !ok {
@@ -656,7 +766,7 @@ func (m *Mgr) isFilesAlreadySendFailed(server string, commands []dcSDK.BKCommand
 		}
 	}
 	return false
-}
+}*/
 
 func (m *Mgr) ensureFilesWithPriority(
 	handler dcSDK.RemoteWorkerHandler,
@@ -768,7 +878,7 @@ func (m *Mgr) ensureFiles(
 			if f.CompressedSize == -1 || f.FileSize == -1 {
 				t = dcSDK.FilterRuleHandleDefault
 			}
-
+			blog.Debugf("remote: ensure file %s and match rule %d", fd.File.FilePath, t)
 			servers := make([]*dcProtocol.Host, 0, 0)
 			switch t {
 			case dcSDK.FilterRuleHandleDefault:
@@ -1307,47 +1417,32 @@ func (m *Mgr) getFailedFileCollectionByHost(server string) ([]*types.FileCollect
 	return fcs, true, nil
 }
 
+// retry send failed tool chain
 func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.RemoteTaskExecuteRequest) {
-	if m.resource.CanWorkerRetry(req.Server) {
-		go func(handler dcSDK.RemoteWorkerHandler, req types.RemoteTaskExecuteRequest) {
-			for i := 0; i < toolChainRetryTimes; {
-				fileCollections, isNotTeminated, err := m.getFailedFileCollectionByHost(req.Server.Server)
-				if !isNotTeminated {
-					time.Sleep(m.toolChainRetryTick)
-					continue
-				}
-				i++
-				blog.Infof("remote: retry to send tool chain for work(%s) for the %dth times from pid(%d) to server(%s)",
-					m.work.ID(), i, req.Pid, req.Server.Server)
-				if err != nil || len(fileCollections) == 0 {
-					if err != nil {
-						blog.Errorf("remote: get failed file collection by host(%s) failed: %v", req.Server.Server, err)
-					} else {
-						blog.Errorf("remote: retry to send tool chain for work(%s) from pid(%d) to server(%s)", m.work.ID(), req.Pid, req.Server.Server)
-					}
-					time.Sleep(m.toolChainRetryTick)
-					continue
-				}
-
-				if err := m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections); err != nil {
-					blog.Errorf("remote: retry to send tool chain for work(%s) for the %dth times from pid(%d) to server(%s), "+
-						"send tool chain files failed: %v", m.work.ID(), i, req.Pid, req.Server.Server, err)
-					time.Sleep(m.toolChainRetryTick)
-				} else {
-					// enable worker
-					m.resource.EnableWorker(req.Server)
-					blog.Infof("remote: success to retry to send tool chain for work(%s) from pid(%d) to server(%s)", m.work.ID(), req.Pid, req.Server.Server)
-					return
-				}
+	fileCollections, isNotTerminated, err := m.getFailedFileCollectionByHost(req.Server.Server)
+	if err != nil {
+		blog.Infof("remote: retry send tool chain for work(%s) from pid(%d) to server(%s) failed: %v", m.work.ID(), req.Pid, req.Server.Server, err)
+		return
+	}
+	if !isNotTerminated {
+		blog.Infof("remote: retry send tool chain for work(%s) from pid(%d): file collections on host(%s) are not finished yet", m.work.ID(), req.Pid, req.Server.Server)
+		return
+	}
+	if len(fileCollections) == 0 {
+		//blog.Infof("remote: retry send tool chain for work(%s) from pid(%d) to server(%s) failed: no filecollection found", m.work.ID(), req.Pid, req.Server.Server)
+		return
+	}
-
-			}
-			m.resource.SetWorkerStatus(req.Server, RetryFailed)
-			blog.Errorf("remote: already retry to send tool chain for work(%s) for %d times from pid(%d) to server(%s) failed, keep worker disable", m.work.ID(), toolChainRetryTimes, req.Pid, req.Server.Server)
-		}(handler, *req)
-	} else {
-		blog.Infof("remote: worker(%s) is alreay retry to send tool chain for work(%s) from pid(%d) to server(%s)",
-			req.Server.Server, m.work.ID(), req.Pid, req.Server.Server)
+	blog.Infof("remote: retry send tool chain for work(%s) from pid(%d) to server(%s)",
+		m.work.ID(), req.Pid, req.Server.Server)
+	if err := m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections); err != nil {
+		blog.Errorf("remote: retry send tool chain for work(%s) from pid(%d) to server(%s), "+
+			"send tool chain files failed: %v", m.work.ID(), req.Pid, req.Server.Server, err)
+		return
 	}
+	// enable worker
+	m.resource.EnableWorker(req.Server)
+	blog.Infof("remote: retry send tool chain for work(%s) from pid(%d) to server(%s) succeed", m.work.ID(), req.Pid, req.Server.Server)
 }
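The reworked `workerCheck` above multiplexes three periodic jobs over one goroutine: dead-worker recovery on `ticker`, failed-file retry on `fileRetryTicker`, and tool-chain retry on `toolChainRetryTicker`. A minimal, self-contained sketch of that multi-ticker select pattern follows; the durations and job bodies are illustrative placeholders, not values from this patch.

```go
// Sketch of a multi-ticker check loop as used by workerCheck.
// All durations and job bodies are placeholders.
package main

import (
	"context"
	"fmt"
	"time"
)

func checkLoop(ctx context.Context) {
	workerTicker := time.NewTicker(50 * time.Millisecond)
	fileRetryTicker := time.NewTicker(70 * time.Millisecond)
	toolChainRetryTicker := time.NewTicker(60 * time.Millisecond)
	defer workerTicker.Stop()
	defer fileRetryTicker.Stop()
	defer toolChainRetryTicker.Stop()

	for {
		select {
		case <-ctx.Done():
			fmt.Println("check loop canceled by context")
			return
		case <-workerTicker.C:
			fmt.Println("recover dead workers") // placeholder for the real job
		case <-fileRetryTicker.C:
			fmt.Println("retry failed files") // placeholder for the real job
		case <-toolChainRetryTicker.C:
			fmt.Println("retry failed tool chains") // placeholder for the real job
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()
	checkLoop(ctx)
}
```

In the patch itself each retry job is launched as its own goroutine (`go m.retrySendFiles(h)`, `go m.retrySendToolChain(...)`), so a slow host cannot stall the ticker loop.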
diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/slotbyworkeroffer.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/slotbyworkeroffer.go
index 7bcbb79d..bd3b545d 100644
--- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/slotbyworkeroffer.go
+++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/slotbyworkeroffer.go
@@ -282,7 +282,7 @@ func (wo *workerOffer) EnableWorker(host *dcProtocol.Host) {
 	blog.Infof("remote slot: total slot:%d after enable host:%v",
 		wo.validWorkerNum, *host)
 }
 
-func (wo *workerOffer) CanWorkerRetry(host *dcProtocol.Host) bool {
+/*func (wo *workerOffer) CanWorkerRetry(host *dcProtocol.Host) bool {
 	if host == nil {
 		return false
 	}
@@ -310,7 +310,7 @@ func (wo *workerOffer) CanWorkerRetry(host *dcProtocol.Host) bool {
 	}
 
 	return false
-}
+}*/
 
 func (wo *workerOffer) SetWorkerStatus(host *dcProtocol.Host, s Status) {
 	wo.workerLock.Lock()
diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go
index 5b48381e..1e309d32 100644
--- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go
+++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go
@@ -28,7 +28,7 @@ type RemoteSlotMgr interface {
 	RecoverDeadWorker(w *worker)
 	DisableWorker(host *dcProtocol.Host)
 	EnableWorker(host *dcProtocol.Host)
-	CanWorkerRetry(host *dcProtocol.Host) bool // check if worker can retry, if can set worker status to retrying
+	//CanWorkerRetry(host *dcProtocol.Host) bool // check if worker can retry, if can set worker status to retrying
 	SetWorkerStatus(host *dcProtocol.Host, status Status)
 	Lock(usage dcSDK.JobUsage, f string, banWorkerList []*dcProtocol.Host) *dcProtocol.Host
 	Unlock(usage dcSDK.JobUsage, host *dcProtocol.Host)
@@ -299,7 +299,7 @@ func (wr *resource) EnableWorker(host *dcProtocol.Host) {
 	blog.Infof("remote slot: total slot:%d after enable host:%v", wr.totalSlots, *host)
 }
 
-func (wr *resource) CanWorkerRetry(host *dcProtocol.Host) bool {
+/*func (wr *resource) CanWorkerRetry(host *dcProtocol.Host) bool {
 	if host == nil {
 		return false
 	}
@@ -328,7 +328,7 @@ func (wr *resource) CanWorkerRetry(host *dcProtocol.Host) bool {
 	}
 
 	return false
-}
+}*/
 
 func (wr *resource) SetWorkerStatus(host *dcProtocol.Host, s Status) {
 	wr.workerLock.Lock()
@@ -566,7 +566,7 @@ func (wr *resource) getWorkerWithMostFreeSlots(banWorkerList []*dcProtocol.Host)
 }
 
 // large files take priority
-func (wr *resource) getWorkerLargeFileFirst(f string) *worker {
+func (wr *resource) getWorkerLargeFileFirst(f string, banWorkerList []*dcProtocol.Host) *worker {
 	var w *worker
 	max := 0
 	inlargequeue := false
@@ -574,6 +574,16 @@
 		if worker.disabled || worker.dead {
 			continue
 		}
+		banned := false
+		for _, host := range banWorkerList {
+			if worker.host.Equal(host) {
+				banned = true
+				break
+			}
+		}
+		if banned {
+			continue
+		}
 
 		free := worker.totalSlots - worker.occupiedSlots
@@ -618,7 +628,7 @@ func (wr *resource) occupyWorkerSlots(f string, banWorkerList []*dcProtocol.Host
 	if f == "" {
 		w = wr.getWorkerWithMostFreeSlots(banWorkerList)
 	} else {
-		w = wr.getWorkerLargeFileFirst(f)
+		w = wr.getWorkerLargeFileFirst(f, banWorkerList)
 	}
 
 	if w == nil {
diff --git a/src/backend/booster/bk_dist/controller/pkg/types/manager.go b/src/backend/booster/bk_dist/controller/pkg/types/manager.go
index f37cb0c3..c35d4f3f 100644
--- a/src/backend/booster/bk_dist/controller/pkg/types/manager.go
+++ b/src/backend/booster/bk_dist/controller/pkg/types/manager.go
@@ -280,7 +280,6 @@ type FileInfo struct {
 	FileMode       uint32         `json:"file_mode"`
 	LinkTarget     string         `json:"link_target"`
 	SendStatus     FileSendStatus `json:"send_status"`
-	FailCount      int            `json:"fail_count"`
 }
 
 // Match check if the FileDesc is point to some file as this FileInfo
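One note on the ban-list check added to `getWorkerLargeFileFirst`: a bare `continue` inside the inner `range` over `banWorkerList` would only advance that inner loop, leaving the banned worker eligible, which is why the hunk above records the match in a flag before skipping the worker in the outer loop. A standalone sketch of the pattern, using simplified stand-in types rather than the real `worker`/`dcProtocol.Host`:

```go
// Sketch of skipping ban-listed workers during selection.
// The types here are hypothetical stand-ins for the patch's types.
package main

import "fmt"

type host struct{ addr string }

type worker struct {
	host host
	free int
}

// pickWorker returns the non-banned worker with the most free slots.
func pickWorker(workers []worker, ban []host) *worker {
	var best *worker
	for i := range workers {
		banned := false
		for _, b := range ban {
			if workers[i].host == b {
				banned = true
				break
			}
		}
		if banned {
			continue // skips the banned worker in the outer loop
		}
		if best == nil || workers[i].free > best.free {
			best = &workers[i]
		}
	}
	return best
}

func main() {
	workers := []worker{{host{"a"}, 3}, {host{"b"}, 9}, {host{"c"}, 5}}
	best := pickWorker(workers, []host{{"b"}})
	fmt.Println(best.host.addr) // "c": worker "b" has the most free slots but is banned
}
```

A labeled `continue` on the outer loop would work equally well; the flag form matches the surrounding code's style.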