feat: optimize worker selection and recovery mechanism TencentBlueKing#311
flyy1012 committed Nov 5, 2024
1 parent 293515c commit b9bd1d1
Showing 1 changed file with 60 additions and 28 deletions.
88 changes: 60 additions & 28 deletions src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
@@ -72,8 +72,7 @@ func NewMgr(pCtx context.Context, work *types.Work) types.RemoteMgr {
conf: work.Config(),
resourceCheckTick: 5 * time.Second,
workerCheckTick: 5 * time.Second,
toolChainRetryTick: 6 * time.Second,
fileRetryCheckTick: 7 * time.Second,
retryCheckTick: 3 * time.Second,
sendCorkTick: 10 * time.Millisecond,
corkSize: corkSize,
corkMaxSize: corkMaxSize,
@@ -117,13 +116,12 @@ type Mgr struct {

conf *config.ServerConfig

resourceCheckTick time.Duration
workerCheckTick time.Duration
fileRetryCheckTick time.Duration
toolChainRetryTick time.Duration
lastUsed uint64 // only accurate to second now
lastApplied uint64 // only accurate to second now
remotejobs int64 // save job number which using remote worker
resourceCheckTick time.Duration
workerCheckTick time.Duration
retryCheckTick time.Duration
lastUsed uint64 // only accurate to second now
lastApplied uint64 // only accurate to second now
remotejobs int64 // save job number which using remote worker

sendCorkTick time.Duration
sendCorkChan chan bool
@@ -299,7 +297,7 @@ func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc) []matchResult {
return result
}

func (fsm *fileSendMap) updateFailStatus(desc dcSDK.FileDesc, status types.FileSendStatus, server string) {
func (fsm *fileSendMap) updateFailStatus(desc dcSDK.FileDesc, status types.FileSendStatus) {
if status == types.FileSendSucceed && !desc.Retry {
return
}
@@ -404,7 +402,13 @@ func (fsm *fileSendMap) getFailFiles() []dcSDK.FileDesc {

failFiles := make([]dcSDK.FileDesc, 0)
for _, v := range fsm.cache {
if v == nil || len(*v) == 0 {
continue
}
for _, ci := range *v {
if ci.SendStatus != types.FileSendFailed {
continue
}
failFiles = append(failFiles, dcSDK.FileDesc{
FilePath: ci.FullPath,
Compresstype: dcProtocol.CompressLZ4,
@@ -449,6 +453,8 @@ func (m *Mgr) Init() {
}
if m.work.ID() != "" {
go m.workerCheck(ctx)
go m.retryFailFiles(ctx)
go m.retrySendToolChains(ctx)
}

if m.conf.SendCork {
@@ -592,12 +598,8 @@ func (m *Mgr) resourceCheck(ctx context.Context) {
func (m *Mgr) workerCheck(ctx context.Context) {
blog.Infof("remote: run worker check tick for work: %s", m.work.ID())
ticker := time.NewTicker(m.workerCheckTick)
fileRetryTicker := time.NewTicker(m.fileRetryCheckTick)
toolChainRetryTicker := time.NewTicker(m.toolChainRetryTick)

defer ticker.Stop()
defer fileRetryTicker.Stop()
defer toolChainRetryTicker.Stop()

for {
select {
@@ -619,24 +621,51 @@ func (m *Mgr) workerCheck(ctx context.Context) {
}(w)

}
// retry send fail file for all workers
case <-fileRetryTicker.C:
for _, h := range m.work.Resource().GetHosts() {
blog.Debugf("remote: try to retry send fail file for work(%s) to server %s", m.work.ID(), h.Server)
go m.retrySendFiles(h)
}
//retry failed tool chain for all workers, recover disable worker
case <-toolChainRetryTicker.C:
blog.Debugf("remote: try to retry send tool chain for work(%s)", m.work.ID())
}
}
}

func (m *Mgr) retrySendToolChains(ctx context.Context) {
for {
select {
case <-ctx.Done():
blog.Infof("remote: run worker check for work(%s) canceled by context", m.work.ID())
return
default:
var wg sync.WaitGroup
handler := m.remoteWorker.Handler(0, nil, nil, nil)
for _, h := range m.work.Resource().GetHosts() {
hosts := m.work.Resource().GetHosts()
wg.Add(len(hosts))
for _, h := range hosts {
blog.Debugf("remote: try to retry send tool chain for work(%s) to server %s", m.work.ID(), h.Server)
go m.retrySendToolChain(handler, &types.RemoteTaskExecuteRequest{
Pid: 0,
Server: h,
Sandbox: &dcSyscall.Sandbox{Dir: ""},
Stats: &dcSDK.ControllerJobStats{},
})
}, &wg)
}
wg.Wait() // wait for all goroutines to finish
time.Sleep(m.workerCheckTick)
}
}
}

func (m *Mgr) retryFailFiles(ctx context.Context) {
for {
select {
case <-ctx.Done():
blog.Infof("remote: retry fail files for work(%s) canceled by context", m.work.ID())
return
default:
var wg sync.WaitGroup
hosts := m.work.Resource().GetHosts()
wg.Add(len(hosts))
for _, h := range hosts {
blog.Debugf("remote: try to retry send fail file for work(%s) to server %s", m.work.ID(), h.Server)
go m.retrySendFiles(h, &wg)
}
wg.Wait() // wait for all goroutines to finish
time.Sleep(m.retryCheckTick)
}
}
}
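
Both new goroutines follow the same shape: fan out one retry per host, wait for the whole round on a sync.WaitGroup, then sleep before the next round, exiting when the work's context is canceled. Below is a minimal, self-contained sketch of that pattern; the host list and retry callback are hypothetical stand-ins for m.work.Resource().GetHosts() and m.retrySendFiles / m.retrySendToolChain, and tick plays the role of retryCheckTick / workerCheckTick.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// retryLoop runs one retry round per tick: it spawns one goroutine per host,
// waits for all of them via the WaitGroup, sleeps, and repeats until the
// context is canceled.
func retryLoop(ctx context.Context, hosts func() []string,
	retry func(host string, wg *sync.WaitGroup), tick time.Duration) {
	for {
		select {
		case <-ctx.Done():
			return // stop retrying once the work is canceled
		default:
			var wg sync.WaitGroup
			hs := hosts()
			wg.Add(len(hs))
			for _, h := range hs {
				go retry(h, &wg) // each retry must call wg.Done exactly once
			}
			wg.Wait() // wait for every per-host retry to finish
			time.Sleep(tick)
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
	defer cancel()

	retryLoop(ctx,
		func() []string { return []string{"worker-a", "worker-b"} },
		func(host string, wg *sync.WaitGroup) {
			defer wg.Done()
			fmt.Println("retrying", host)
		},
		100*time.Millisecond)
}

Because each round calls wg.Wait() before sleeping, retries for a given host are never stacked on top of each other; the next round only starts after the previous one has fully finished.
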
@@ -810,7 +839,9 @@ func (m *Mgr) SendFiles(req *types.RemoteTaskSendFileRequest) ([]string, error)
)
}

func (m *Mgr) retrySendFiles(h *dcProtocol.Host) {
func (m *Mgr) retrySendFiles(h *dcProtocol.Host, wg *sync.WaitGroup) {
defer wg.Done() // call Done when the function returns

m.failFileSendMutex.Lock()
sendMap := m.failFileSendMap[h.Server]
if sendMap == nil {
@@ -1474,7 +1505,7 @@ func (m *Mgr) updateSendFile(server string, desc dcSDK.FileDesc, status types.Fi
}
m.failFileSendMutex.Unlock()

failTarget.updateFailStatus(desc, status, server)
failTarget.updateFailStatus(desc, status)
}

func (m *Mgr) sendToolchain(handler dcSDK.RemoteWorkerHandler, req *types.RemoteTaskExecuteRequest) error {
@@ -1540,7 +1571,8 @@ func (m *Mgr) getFailedFileCollectionByHost(server string) ([]*types.FileCollect
}

// retry send failed tool chain
func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.RemoteTaskExecuteRequest) {
func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.RemoteTaskExecuteRequest, wg *sync.WaitGroup) {
defer wg.Done() // call Done when the function returns
fileCollections, err := m.getFailedFileCollectionByHost(req.Server.Server)
if err != nil {
blog.Infof("remote: retry send tool chain for work(%s) from pid(%d) to server(%s) failed: %v", m.work.ID(), req.Pid, req.Server.Server, err)

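Since retrySendFiles and retrySendToolChain now receive the round's *sync.WaitGroup, each must call Done exactly once on every exit path; both do so with a defer at the top of the function. A small, hypothetical sketch of that contract (names are illustrative, not from the repository):

package main

import (
	"fmt"
	"sync"
)

// retryOne follows the same contract as retrySendFiles/retrySendToolChain:
// the caller has already accounted for it via wg.Add, and the deferred Done
// fires on every return path, including early exits.
func retryOne(host string, wg *sync.WaitGroup) {
	defer wg.Done() // released even if we bail out early below

	if host == "" {
		return // early exit still releases the WaitGroup
	}
	fmt.Println("resending to", host)
}

func main() {
	var wg sync.WaitGroup
	hosts := []string{"worker-a", "", "worker-b"}
	wg.Add(len(hosts)) // Add must match the number of goroutines spawned
	for _, h := range hosts {
		go retryOne(h, &wg)
	}
	wg.Wait()
}

If the Add count did not match the number of spawned goroutines, wg.Wait would either return too early or block forever, which is why the diff keeps wg.Add(len(hosts)) immediately next to the spawn loop.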