diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
index a1697d3f..a441ba24 100644
--- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
+++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
@@ -72,7 +72,7 @@ func NewMgr(pCtx context.Context, work *types.Work) types.RemoteMgr {
 		conf:              work.Config(),
 		resourceCheckTick: 5 * time.Second,
 		workerCheckTick:   5 * time.Second,
-		retryCheckTick:    3 * time.Second,
+		retryCheckTick:    10 * time.Second,
 		sendCorkTick:      10 * time.Millisecond,
 		corkSize:          corkSize,
 		corkMaxSize:       corkMaxSize,
@@ -388,7 +388,7 @@ func (fsm *fileSendMap) isFilesSendFailed(descs []dcSDK.FileDesc) bool {
 		}
 		for _, ci := range *c {
 			if ci.Match(desc) {
-				return ci.SendStatus == types.FileSendFailed
+				return ci.SendStatus != types.FileSendSucceed
 			}
 		}
 	}
@@ -626,51 +626,73 @@ func (m *Mgr) workerCheck(ctx context.Context) {
 }
 
 func (m *Mgr) retrySendToolChains(ctx context.Context) {
+	ticker := time.NewTicker(m.workerCheckTick)
+	defer ticker.Stop()
+
+	var workerStatus sync.Map
 	for {
 		select {
 		case <-ctx.Done():
 			blog.Infof("remote: run worker check for work(%s) canceled by context", m.work.ID())
 			return
-		default:
-			var wg sync.WaitGroup
+		case <-ticker.C:
 			handler := m.remoteWorker.Handler(0, nil, nil, nil)
 			hosts := m.work.Resource().GetHosts()
+			count := 0
+			wg := make(chan string, len(hosts))
 			for _, h := range hosts {
-				blog.Debugf("remote: try to retry send tool chain for work(%s) to server %s", m.work.ID(), h.Server)
-				fileCollections, err := m.getFailedFileCollectionByHost(h.Server)
-				if err != nil {
-					blog.Infof("remote: retry send tool chain for work(%s) to server(%s) failed: %v", m.work.ID(), h.Server, err)
+				workerNeedRetry := true
+				if v, ok := workerStatus.Load(h.Server); ok {
+					workerNeedRetry = v.(bool)
+				}
+				if !workerNeedRetry {
 					continue
 				}
+				fileCollections := m.getFailedFileCollectionByHost(h.Server)
 				if len(fileCollections) == 0 {
 					continue
 				}
-				wg.Add(1)
+				workerStatus.Store(h.Server, false)
+				count++
 				go m.retrySendToolChain(handler, &types.RemoteTaskExecuteRequest{
 					Pid:     0,
 					Server:  h,
 					Sandbox: &dcSyscall.Sandbox{Dir: ""},
 					Stats:   &dcSDK.ControllerJobStats{},
-				}, fileCollections, &wg)
+				}, fileCollections, wg)
 			}
-			wg.Wait() // 等待所有 goroutine 完成
-			time.Sleep(m.workerCheckTick)
+			go func() {
+				for i := 0; i < count; i++ {
+					host := <-wg
+					workerStatus.Store(host, true)
+				}
+			}()
 		}
 	}
 }
 
 func (m *Mgr) retryFailFiles(ctx context.Context) {
+	ticker := time.NewTicker(m.retryCheckTick)
+	defer ticker.Stop()
+
+	var workerStatus sync.Map
 	for {
 		select {
 		case <-ctx.Done():
 			blog.Infof("remote: run worker check for work(%s) canceled by context", m.work.ID())
 			return
-		default:
-			var wg sync.WaitGroup
+		case <-ticker.C:
 			hosts := m.work.Resource().GetHosts()
-			//wg.Add(len(hosts))
+			wg := make(chan string, len(hosts))
+			count := 0
 			for _, h := range hosts {
-				blog.Debugf("remote: try to retry send fail file for work(%s) to server %s", m.work.ID(), h.Server)
+				workerNeedRetry := true
+				if v, ok := workerStatus.Load(h.Server); ok {
+					workerNeedRetry = v.(bool)
+				}
+				if !workerNeedRetry {
+					continue
+				}
 				m.failFileSendMutex.Lock()
 				sendMap := m.failFileSendMap[h.Server]
 				if sendMap == nil {
@@ -684,11 +706,16 @@ func (m *Mgr) retryFailFiles(ctx context.Context) {
 				if len(failFiles) == 0 {
 					continue
 				}
-				wg.Add(1)
-				go m.retrySendFiles(h, failFiles, &wg)
+				workerStatus.Store(h.Server, false)
+				count++
+				go m.retrySendFiles(h, failFiles, wg)
 			}
-			wg.Wait() // 等待所有 goroutine 完成
-			time.Sleep(m.retryCheckTick)
+			go func() {
+				for i := 0; i < count; i++ {
+					host := <-wg
+					workerStatus.Store(host, true)
+				}
+			}()
 		}
 	}
 }
@@ -880,21 +907,21 @@ func (m *Mgr) SendFiles(req *types.RemoteTaskSendFileRequest) ([]string, error)
 	)
 }
 
-func (m *Mgr) retrySendFiles(h *dcProtocol.Host, failFiles []dcSDK.FileDesc, wg *sync.WaitGroup) {
-	defer wg.Done() // 在函数结束时调用 Done
-
+func (m *Mgr) retrySendFiles(h *dcProtocol.Host, failFiles []dcSDK.FileDesc, host chan string) {
 	blog.Debugf("remote: try to retry send fail file for work(%s) from pid(%d) to server %s with fail files %v", m.work.ID(), 1, h.Server, len(failFiles))
-	if _, err := m.SendFiles(&types.RemoteTaskSendFileRequest{
+	_, err := m.SendFiles(&types.RemoteTaskSendFileRequest{
 		Pid:     1,
 		Req:     failFiles,
 		Server:  h,
 		Sandbox: &dcSyscall.Sandbox{Dir: ""},
 		Stats:   &dcSDK.ControllerJobStats{},
-	}); err != nil {
+	})
+	if err != nil {
 		blog.Errorf("remote: try to retry send fail file for work(%s) from pid(%d) to server %s failed: %v", m.work.ID(), 1, h.Server, err)
-		return
+	} else {
+		blog.Debugf("remote: success to retry send fail file for work(%s) from pid(%d) to server %s with fail files %v", m.work.ID(), 1, h.Server, len(failFiles))
 	}
-	blog.Debugf("remote: success to retry send fail file for work(%s) from pid(%d) to server %s with fail files %v", m.work.ID(), 1, h.Server, len(failFiles))
+	host <- h.Server
 }
 
 func (m *Mgr) ensureFilesWithPriority(
@@ -997,7 +1024,7 @@ func (m *Mgr) ensureFiles(
 
 		_, t, _ := rules.Satisfy(fd.File.FilePath)
 
-		blog.Debugf("remote: ensure file %s and match rule %d", fd.File.FilePath, t)
+		//blog.Debugf("remote: ensure file %s and match rule %d", fd.File.FilePath, t)
 		if f.AllDistributed {
 			t = dcSDK.FilterRuleHandleAllDistribution
 		}
@@ -1040,7 +1067,7 @@ func (m *Mgr) ensureFiles(
 			}
 			r = append(r, f.Targetrelativepath)
 
-			blog.Debugf("remote: debug ensure into fd.Servers")
+			//blog.Debugf("remote: debug ensure into fd.Servers")
 			for _, s := range fd.Servers {
 				if s == nil {
 					continue
@@ -1151,7 +1178,7 @@ func (m *Mgr) ensureFiles(
 				_ = m.appendCorkFiles(server, needSendCorkFiles)
 
 				// notify send
-				//m.sendCorkChan <- true
+				m.sendCorkChan <- true
 			}
 		} else {
 			// 单个文件发送模式
@@ -1441,9 +1468,7 @@ func (m *Mgr) checkBatchCache(
 // checkOrLockFile 检查目标file的sendStatus, 如果已经被发送, 则返回当前状态和true; 如果没有被发送过, 则将其置于sending, 并返回false
 func (m *Mgr) checkOrLockSendFile(server string, desc dcSDK.FileDesc) (types.FileSendStatus, bool) {
 	t1 := time.Now().Local()
-	m.fileSendMutex.Lock()
-
 	t2 := time.Now().Local()
 	if d1 := t2.Sub(t1); d1 > 50*time.Millisecond {
 		// blog.Debugf("check cache lock wait too long server(%s): %s", server, d1.String())
 	}
@@ -1580,13 +1605,14 @@ func (m *Mgr) sendToolchain(handler dcSDK.RemoteWorkerHandler, req *types.Remote
 }
 
 // getFailedFileCollectionByHost 返回失败文件集合
-func (m *Mgr) getFailedFileCollectionByHost(server string) ([]*types.FileCollectionInfo, error) {
+func (m *Mgr) getFailedFileCollectionByHost(server string) []*types.FileCollectionInfo {
 	m.fileCollectionSendMutex.RLock()
 	defer m.fileCollectionSendMutex.RUnlock()
 
 	target, ok := m.fileCollectionSendMap[server]
 	if !ok {
-		return nil, fmt.Errorf("remote: no found host(%s) in file send cache", server)
+		blog.Infof("remote: no found host(%s) in file send cache", server)
+		return nil
 	}
 	fcs := make([]*types.FileCollectionInfo, 0)
 	for _, re := range *target {
@@ -1595,13 +1621,11 @@ func (m *Mgr) getFailedFileCollectionByHost(server string) ([]*types.FileCollect
 			fcs = append(fcs, re)
 		}
 	}
-	return fcs, nil
+	return fcs
 }
 
 // retry send failed tool chain
-func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.RemoteTaskExecuteRequest, fileCollections []*types.FileCollectionInfo, wg *sync.WaitGroup) {
-	defer wg.Done() // 在函数结束时调用 Done
-
+func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.RemoteTaskExecuteRequest, fileCollections []*types.FileCollectionInfo, host chan string) {
 	blog.Infof("remote: retry send tool chain for work(%s) from pid(%d) to server(%s)", m.work.ID(), req.Pid, req.Server.Server)
 
 	if err := m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections); err != nil {
@@ -1611,6 +1635,7 @@ func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.R
 	}
 
 	// enable worker
 	m.resource.EnableWorker(req.Server)
+	host <- req.Server.Server
 	blog.Infof("remote: success to retry send tool chain for work(%s) from pid(%d) to server(%s)", m.work.ID(), req.Pid, req.Server.Server)
 }
diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go
index 6273946e..127441a8 100644
--- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go
+++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go
@@ -513,12 +513,16 @@ func (wr *resource) getWorkerWithMostFreeSlots(banWorkerList []*dcProtocol.Host)
 		if worker.disabled || worker.dead {
 			continue
 		}
+		matched := false
 		for _, host := range banWorkerList {
 			if worker.host.Equal(host) {
-				continue
+				matched = true
+				break
 			}
 		}
-
+		if matched {
+			continue
+		}
 		free := worker.totalSlots - worker.occupiedSlots
 		if free >= max {
 			max = free
@@ -542,12 +546,17 @@ func (wr *resource) getWorkerLargeFileFirst(f string, banWorkerList []*dcProtoco
 		if worker.disabled || worker.dead {
 			continue
 		}
+		matched := false
 		for _, host := range banWorkerList {
 			if worker.host.Equal(host) {
-				continue
+				matched = true
+				break
 			}
 		}
+		if matched {
+			continue
+		}
 
 		free := worker.totalSlots - worker.occupiedSlots
 
 		// 在资源空闲时,大文件优先
@@ -666,13 +675,14 @@ func (wr *resource) getSlot(msg lockWorkerMessage) {
 	if wr.occupiedSlots < wr.totalSlots || wr.totalSlots <= 0 {
 		set := wr.getUsageSet(usage)
 		if wr.isIdle(set) {
-			set.occupied++
-			wr.occupiedSlots++
-			blog.Infof("remote slot: total slots:%d occupied slots:%d, remote slot available",
-				wr.totalSlots, wr.occupiedSlots)
-
-			msg.result <- wr.occupyWorkerSlots(msg.largeFile, msg.banWorkerList)
-			satisfied = true
+			if h := wr.occupyWorkerSlots(msg.largeFile, msg.banWorkerList); h != nil {
+				set.occupied++
+				wr.occupiedSlots++
+				blog.Infof("remote slot: total slots:%d occupied slots:%d, remote slot available",
+					wr.totalSlots, wr.occupiedSlots)
+				msg.result <- h
+				satisfied = true
+			}
 		}
 	}
 
@@ -688,24 +698,25 @@ func (wr *resource) putSlot(msg lockWorkerMessage) {
 	wr.freeWorkerSlots(msg.toward)
 
 	wr.occupiedSlots--
+	blog.Debugf("remote slot: free slot for worker %v, %v", wr.occupiedSlots, wr.totalSlots)
 	usage := msg.jobUsage
 	set := wr.getUsageSet(usage)
 	set.occupied--
 
 	// check whether other waiting is satisfied now
 	if wr.waitingList.Len() > 0 {
+		blog.Debugf("remote slot: free slot for worker %v, %v", wr.occupiedSlots, wr.waitingList.Len())
 		for e := wr.waitingList.Front(); e != nil; e = e.Next() {
 			msg := e.Value.(*lockWorkerMessage)
 			set := wr.getUsageSet(msg.jobUsage)
 			if wr.isIdle(set) {
-				set.occupied++
-				wr.occupiedSlots++
-
-				msg.result <- wr.occupyWorkerSlots(msg.largeFile, []*dcProtocol.Host{})
-
-				wr.waitingList.Remove(e)
-
-				break
+				if h := wr.occupyWorkerSlots(msg.largeFile, msg.banWorkerList); h != nil {
+					set.occupied++
+					wr.occupiedSlots++
+					msg.result <- h
+					wr.waitingList.Remove(e)
+					break
+				}
 			}
 		}
 	}
diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/worker.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/worker.go
index 496b612c..2523d70e 100644
--- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/worker.go
+++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/worker.go
@@ -80,11 +80,13 @@ type worker struct {
 
 func (wr *worker) occupySlot() error {
 	wr.occupiedSlots++
+	blog.Debugf("worker: occupy slot for worker %s, %v, %v", wr.host.Server, wr.occupiedSlots, wr.totalSlots)
 	return nil
 }
 
 func (wr *worker) freeSlot() error {
 	wr.occupiedSlots--
+	blog.Debugf("worker: free slot for worker %s, %v, %v", wr.host.Server, wr.occupiedSlots, wr.totalSlots)
 	return nil
 }
 
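
Note on the retry loops in mgr.go: the patch replaces the per-round sync.WaitGroup plus time.Sleep with a time.Ticker-driven loop, a sync.Map that marks workers whose previous retry is still in flight, and a buffered channel through which each retry goroutine reports the worker it finished, so one slow worker no longer stalls the next round. The sketch below is a minimal, self-contained illustration of that pattern, not the controller's code: the names retryLoop, retryOne, servers, failedItems and busy are invented for the example, and it deletes the sync.Map entry on completion instead of storing true/false as the patch does.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// retryOne resends the failed items for one worker and reports which worker finished.
func retryOne(server string, items []string, done chan<- string) {
	// ... the real resend work would happen here ...
	fmt.Printf("retried %d items on %s\n", len(items), server)
	done <- server // done is buffered, so this send never blocks
}

// retryLoop runs one retry round per tick, skipping workers whose previous retry
// has not finished yet.
func retryLoop(ctx context.Context, tick time.Duration,
	servers func() []string, failedItems func(string) []string) {
	ticker := time.NewTicker(tick)
	defer ticker.Stop()

	var busy sync.Map // server -> struct{} while a retry goroutine is running
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			hosts := servers()
			done := make(chan string, len(hosts))
			count := 0
			for _, s := range hosts {
				if _, running := busy.Load(s); running {
					continue // previous retry for this worker is still in flight
				}
				items := failedItems(s)
				if len(items) == 0 {
					continue
				}
				busy.Store(s, struct{}{})
				count++
				go retryOne(s, items, done)
			}
			// collect completions in the background so the ticker loop never blocks
			go func(n int) {
				for i := 0; i < n; i++ {
					busy.Delete(<-done)
				}
			}(count)
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	retryLoop(ctx, time.Second,
		func() []string { return []string{"worker-1", "worker-2"} },
		func(string) []string { return []string{"a.o", "b.o"} })
}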
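
Note on the slots.go hunks: the old code's continue sat inside the inner loop over banWorkerList, so it only skipped the current banned entry and a banned worker could still be picked; the patch introduces a matched flag and skips the worker in the outer loop. The stand-alone sketch below shows the same fix using a labeled continue, which is an equivalent alternative to the flag; the worker and host types and the pickWorker helper are simplified stand-ins for illustration only.

package main

import "fmt"

type host struct{ server string }

type worker struct {
	host      host
	freeSlots int
}

// pickWorker returns the non-banned worker with the most free slots, or nil.
func pickWorker(workers []worker, banned []host) *worker {
	var best *worker
workerLoop:
	for i := range workers {
		for _, b := range banned {
			if workers[i].host == b {
				// skip this worker entirely, not just this banned entry
				continue workerLoop
			}
		}
		if best == nil || workers[i].freeSlots > best.freeSlots {
			best = &workers[i]
		}
	}
	return best
}

func main() {
	workers := []worker{{host{"w1"}, 8}, {host{"w2"}, 5}}
	// w1 has more free slots but is banned, so w2 is selected.
	fmt.Println(pickWorker(workers, []host{{"w1"}}).host.server)
}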