diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
index 6b681f4b..a3a391a4 100644
--- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
+++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
@@ -72,8 +72,7 @@ func NewMgr(pCtx context.Context, work *types.Work) types.RemoteMgr {
 		conf:                work.Config(),
 		resourceCheckTick:   5 * time.Second,
 		workerCheckTick:     5 * time.Second,
-		toolChainRetryTick:  6 * time.Second,
-		fileRetryCheckTick:  7 * time.Second,
+		retryCheckTick:      3 * time.Second,
 		sendCorkTick:        10 * time.Millisecond,
 		corkSize:            corkSize,
 		corkMaxSize:         corkMaxSize,
@@ -117,13 +116,12 @@ type Mgr struct {
 
 	conf *config.ServerConfig
 
-	resourceCheckTick  time.Duration
-	workerCheckTick    time.Duration
-	fileRetryCheckTick time.Duration
-	toolChainRetryTick time.Duration
-	lastUsed           uint64 // only accurate to second now
-	lastApplied        uint64 // only accurate to second now
-	remotejobs         int64  // save job number which using remote worker
+	resourceCheckTick time.Duration
+	workerCheckTick   time.Duration
+	retryCheckTick    time.Duration
+	lastUsed          uint64 // only accurate to second now
+	lastApplied       uint64 // only accurate to second now
+	remotejobs        int64  // save job number which using remote worker
 
 	sendCorkTick time.Duration
 	sendCorkChan chan bool
@@ -299,7 +297,7 @@ func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc) []matchResult {
 	return result
 }
 
-func (fsm *fileSendMap) updateFailStatus(desc dcSDK.FileDesc, status types.FileSendStatus, server string) {
+func (fsm *fileSendMap) updateFailStatus(desc dcSDK.FileDesc, status types.FileSendStatus) {
 	if status == types.FileSendSucceed && !desc.Retry {
 		return
 	}
@@ -404,7 +402,13 @@ func (fsm *fileSendMap) getFailFiles() []dcSDK.FileDesc {
 
 	failFiles := make([]dcSDK.FileDesc, 0)
 	for _, v := range fsm.cache {
+		if v == nil || len(*v) == 0 {
+			continue
+		}
 		for _, ci := range *v {
+			if ci.SendStatus != types.FileSendFailed {
+				continue
+			}
 			failFiles = append(failFiles, dcSDK.FileDesc{
 				FilePath:     ci.FullPath,
 				Compresstype: dcProtocol.CompressLZ4,
@@ -449,6 +453,8 @@ func (m *Mgr) Init() {
 	}
 	if m.work.ID() != "" {
 		go m.workerCheck(ctx)
+		go m.retryFailFiles(ctx)
+		go m.retrySendToolChains(ctx)
 	}
 
 	if m.conf.SendCork {
@@ -592,12 +598,8 @@ func (m *Mgr) resourceCheck(ctx context.Context) {
 func (m *Mgr) workerCheck(ctx context.Context) {
 	blog.Infof("remote: run worker check tick for work: %s", m.work.ID())
 	ticker := time.NewTicker(m.workerCheckTick)
-	fileRetryTicker := time.NewTicker(m.fileRetryCheckTick)
-	toolChainRetryTicker := time.NewTicker(m.toolChainRetryTick)
 
 	defer ticker.Stop()
-	defer fileRetryTicker.Stop()
-	defer toolChainRetryTicker.Stop()
 
 	for {
 		select {
@@ -619,24 +621,51 @@ func (m *Mgr) workerCheck(ctx context.Context) {
 				}(w)
 			}
 
-		// retry send fail file for all workers
-		case <-fileRetryTicker.C:
-			for _, h := range m.work.Resource().GetHosts() {
-				blog.Debugf("remote: try to retry send fail file for work(%s) to server %s", m.work.ID(), h.Server)
-				go m.retrySendFiles(h)
-			}
-		//retry failed tool chain for all workers, recover disable worker
-		case <-toolChainRetryTicker.C:
-			blog.Debugf("remote: try to retry send tool chain for work(%s)", m.work.ID())
+		}
+	}
+}
+
+func (m *Mgr) retrySendToolChains(ctx context.Context) {
+	for {
+		select {
+		case <-ctx.Done():
+			blog.Infof("remote: run worker check for work(%s) canceled by context", m.work.ID())
+			return
+		default:
+			var wg sync.WaitGroup
 			handler := m.remoteWorker.Handler(0, nil, nil, nil)
-			for _, h := range m.work.Resource().GetHosts() {
+			hosts := m.work.Resource().GetHosts()
+			wg.Add(len(hosts))
+			for _, h := range hosts {
+				blog.Debugf("remote: try to retry send tool chain for work(%s) to server %s", m.work.ID(), h.Server)
 				go m.retrySendToolChain(handler, &types.RemoteTaskExecuteRequest{
 					Pid:     0,
 					Server:  h,
 					Sandbox: &dcSyscall.Sandbox{Dir: ""},
 					Stats:   &dcSDK.ControllerJobStats{},
-				})
+				}, &wg)
+			}
+			wg.Wait() // wait for all per-host goroutines to finish
+			time.Sleep(m.workerCheckTick)
+		}
+	}
+}
+
+func (m *Mgr) retryFailFiles(ctx context.Context) {
+	for {
+		select {
+		case <-ctx.Done():
+			blog.Infof("remote: run worker check for work(%s) canceled by context", m.work.ID())
+		default:
+			var wg sync.WaitGroup
+			hosts := m.work.Resource().GetHosts()
+			wg.Add(len(hosts))
+			for _, h := range hosts {
+				blog.Debugf("remote: try to retry send fail file for work(%s) to server %s", m.work.ID(), h.Server)
+				go m.retrySendFiles(h, &wg)
 			}
+			wg.Wait() // wait for all per-host goroutines to finish
+			time.Sleep(m.retryCheckTick)
 		}
 	}
 }
@@ -810,7 +839,9 @@ func (m *Mgr) SendFiles(req *types.RemoteTaskSendFileRequest) ([]string, error)
 	)
 }
 
-func (m *Mgr) retrySendFiles(h *dcProtocol.Host) {
+func (m *Mgr) retrySendFiles(h *dcProtocol.Host, wg *sync.WaitGroup) {
+	defer wg.Done() // call Done when this function returns
+
 	m.failFileSendMutex.Lock()
 	sendMap := m.failFileSendMap[h.Server]
 	if sendMap == nil {
@@ -1474,7 +1505,7 @@ func (m *Mgr) updateSendFile(server string, desc dcSDK.FileDesc, status types.Fi
 	}
 	m.failFileSendMutex.Unlock()
 
-	failTarget.updateFailStatus(desc, status, server)
+	failTarget.updateFailStatus(desc, status)
 }
 
 func (m *Mgr) sendToolchain(handler dcSDK.RemoteWorkerHandler, req *types.RemoteTaskExecuteRequest) error {
@@ -1540,7 +1571,8 @@ func (m *Mgr) getFailedFileCollectionByHost(server string) ([]*types.FileCollect
 }
 
 // retry send failed tool chain
-func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.RemoteTaskExecuteRequest) {
+func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.RemoteTaskExecuteRequest, wg *sync.WaitGroup) {
+	defer wg.Done() // call Done when this function returns
 	fileCollections, err := m.getFailedFileCollectionByHost(req.Server.Server)
 	if err != nil {
 		blog.Infof("remote: retry send tool chain for work(%s) from pid(%d) to server(%s) failed: %v", m.work.ID(), req.Pid, req.Server.Server, err)
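For reference, below is a small self-contained sketch (not part of the patch) of the retry-loop shape the new retrySendToolChains and retryFailFiles goroutines follow: fan out one goroutine per host, wait for the whole round with a sync.WaitGroup, sleep before the next round, and stop when the context is cancelled. The names retryLoop and retryHost are hypothetical and used only for illustration.

// Sketch only, not part of the patch: per-host retry loop driven by a
// cancellable context, mirroring the pattern introduced above.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// retryLoop fans out one goroutine per host each round, waits for the round
// with a WaitGroup, sleeps for tick, and exits when ctx is cancelled.
func retryLoop(ctx context.Context, hosts []string, tick time.Duration) {
	for {
		select {
		case <-ctx.Done():
			return
		default:
			var wg sync.WaitGroup
			wg.Add(len(hosts))
			for _, h := range hosts {
				go func(host string) {
					defer wg.Done() // mark this host's retry as finished
					retryHost(host)
				}(h)
			}
			wg.Wait()        // wait for every host in this round
			time.Sleep(tick) // pause before the next round
		}
	}
}

// retryHost stands in for the real per-host work (resending failed files or
// the toolchain in the patch above).
func retryHost(host string) {
	fmt.Println("retrying", host)
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	retryLoop(ctx, []string{"worker-1", "worker-2"}, 10*time.Millisecond)
}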