
Commit

feat: optimize worker selection and recovery mechanism TencentBlueKing#311
flyy1012 committed Nov 18, 2024
1 parent 28c8b16 commit ec99ec2
Showing 3 changed files with 94 additions and 56 deletions.
101 changes: 63 additions & 38 deletions src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
@@ -72,7 +72,7 @@ func NewMgr(pCtx context.Context, work *types.Work) types.RemoteMgr {
conf: work.Config(),
resourceCheckTick: 5 * time.Second,
workerCheckTick: 5 * time.Second,
retryCheckTick: 3 * time.Second,
retryCheckTick: 10 * time.Second,
sendCorkTick: 10 * time.Millisecond,
corkSize: corkSize,
corkMaxSize: corkMaxSize,
@@ -388,7 +388,7 @@ func (fsm *fileSendMap) isFilesSendFailed(descs []dcSDK.FileDesc) bool {
}
for _, ci := range *c {
if ci.Match(desc) {
return ci.SendStatus == types.FileSendFailed
return ci.SendStatus != types.FileSendSucceed
}
}
}
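
The check above is broadened from == types.FileSendFailed to != types.FileSendSucceed, so a file still stuck in an init or sending state also counts as undelivered and eligible for retry. A minimal sketch of that predicate, using illustrative stand-in constants rather than the controller's real types:

package main

import "fmt"

// Illustrative stand-ins for the controller's file send states.
type fileSendStatus int

const (
	fileSendInit fileSendStatus = iota
	fileSending
	fileSendSucceed
	fileSendFailed
)

// needsResend mirrors the new check: only a confirmed success counts as sent;
// every other state is still eligible for retry.
func needsResend(s fileSendStatus) bool {
	return s != fileSendSucceed
}

func main() {
	for _, s := range []fileSendStatus{fileSendInit, fileSending, fileSendSucceed, fileSendFailed} {
		fmt.Printf("status=%d needsResend=%v\n", s, needsResend(s))
	}
}
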
@@ -626,51 +626,73 @@ func (m *Mgr) workerCheck(ctx context.Context) {
}

func (m *Mgr) retrySendToolChains(ctx context.Context) {
ticker := time.NewTicker(m.workerCheckTick)
defer ticker.Stop()

var workerStatus sync.Map
for {
select {
case <-ctx.Done():
blog.Infof("remote: run worker check for work(%s) canceled by context", m.work.ID())
return
default:
var wg sync.WaitGroup
case <-ticker.C:
handler := m.remoteWorker.Handler(0, nil, nil, nil)
hosts := m.work.Resource().GetHosts()
count := 0
wg := make(chan string, len(hosts))
for _, h := range hosts {
blog.Debugf("remote: try to retry send tool chain for work(%s) to server %s", m.work.ID(), h.Server)
fileCollections, err := m.getFailedFileCollectionByHost(h.Server)
if err != nil {
blog.Infof("remote: retry send tool chain for work(%s) to server(%s) failed: %v", m.work.ID(), h.Server, err)
workerNeedRetry := true
if v, ok := workerStatus.Load(h.Server); ok {
workerNeedRetry = v.(bool)
}
if !workerNeedRetry {
continue
}
fileCollections := m.getFailedFileCollectionByHost(h.Server)
if len(fileCollections) == 0 {
continue
}
wg.Add(1)
workerStatus.Store(h.Server, false)
count++
go m.retrySendToolChain(handler, &types.RemoteTaskExecuteRequest{
Pid: 0,
Server: h,
Sandbox: &dcSyscall.Sandbox{Dir: ""},
Stats: &dcSDK.ControllerJobStats{},
}, fileCollections, &wg)
}, fileCollections, wg)
}
wg.Wait() // wait for all goroutines to finish
time.Sleep(m.workerCheckTick)
go func() {
for i := 0; i < count; i++ {
host := <-wg
workerStatus.Store(host, true)
}
}()
}
}
}

func (m *Mgr) retryFailFiles(ctx context.Context) {
ticker := time.NewTicker(m.retryCheckTick)
defer ticker.Stop()

var workerStatus sync.Map
for {
select {
case <-ctx.Done():
blog.Infof("remote: run worker check for work(%s) canceled by context", m.work.ID())
return
default:
var wg sync.WaitGroup
case <-ticker.C:
hosts := m.work.Resource().GetHosts()
//wg.Add(len(hosts))
wg := make(chan string, len(hosts))
count := 0
for _, h := range hosts {
blog.Debugf("remote: try to retry send fail file for work(%s) to server %s", m.work.ID(), h.Server)
workerNeedRetry := true
if v, ok := workerStatus.Load(h.Server); ok {
workerNeedRetry = v.(bool)
}
if !workerNeedRetry {
continue
}
m.failFileSendMutex.Lock()
sendMap := m.failFileSendMap[h.Server]
if sendMap == nil {
@@ -684,11 +706,16 @@ func (m *Mgr) retryFailFiles(ctx context.Context) {
if len(failFiles) == 0 {
continue
}
wg.Add(1)
go m.retrySendFiles(h, failFiles, &wg)
workerStatus.Store(h.Server, false)
count++
go m.retrySendFiles(h, failFiles, wg)
}
wg.Wait() // wait for all goroutines to finish
time.Sleep(m.retryCheckTick)
go func() {
for i := 0; i < count; i++ {
host := <-wg
workerStatus.Store(host, true)
}
}()
}
}
}
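
Both retry loops now share the same shape: a time.Ticker instead of a sleep at the end of each pass, a sync.Map that marks a worker as in-flight while its retry goroutine is still running, and a buffered channel over which each goroutine reports its host back so the worker becomes retryable again. A simplified, runnable sketch of that pattern, assuming illustrative names rather than the controller's actual API:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// retryLoop sketches the ticker plus per-worker in-flight gating used by
// retrySendToolChains and retryFailFiles in this commit.
func retryLoop(ctx context.Context, tick time.Duration, hosts []string,
	retry func(host string, done chan<- string)) {
	ticker := time.NewTicker(tick)
	defer ticker.Stop()

	var inFlight sync.Map // host -> bool; false while a retry goroutine is still running
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			done := make(chan string, len(hosts))
			count := 0
			for _, h := range hosts {
				if v, ok := inFlight.Load(h); ok && !v.(bool) {
					continue // previous retry for this worker has not finished yet
				}
				inFlight.Store(h, false)
				count++
				go retry(h, done)
			}
			// Mark workers retryable again as their goroutines report back,
			// without blocking the ticker loop itself.
			go func() {
				for i := 0; i < count; i++ {
					inFlight.Store(<-done, true)
				}
			}()
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()
	retryLoop(ctx, 100*time.Millisecond, []string{"worker-1", "worker-2"},
		func(host string, done chan<- string) {
			fmt.Println("retrying", host)
			done <- host
		})
}
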
@@ -880,21 +907,21 @@ func (m *Mgr) SendFiles(req *types.RemoteTaskSendFileRequest) ([]string, error)
)
}

func (m *Mgr) retrySendFiles(h *dcProtocol.Host, failFiles []dcSDK.FileDesc, wg *sync.WaitGroup) {
defer wg.Done() // call Done when the function returns

func (m *Mgr) retrySendFiles(h *dcProtocol.Host, failFiles []dcSDK.FileDesc, host chan string) {
blog.Debugf("remote: try to retry send fail file for work(%s) from pid(%d) to server %s with fail files %v", m.work.ID(), 1, h.Server, len(failFiles))
if _, err := m.SendFiles(&types.RemoteTaskSendFileRequest{
_, err := m.SendFiles(&types.RemoteTaskSendFileRequest{
Pid: 1,
Req: failFiles,
Server: h,
Sandbox: &dcSyscall.Sandbox{Dir: ""},
Stats: &dcSDK.ControllerJobStats{},
}); err != nil {
})
if err != nil {
blog.Errorf("remote: try to retry send fail file for work(%s) from pid(%d) to server %s failed: %v", m.work.ID(), 1, h.Server, err)
return
} else {
blog.Debugf("remote: success to retry send fail file for work(%s) from pid(%d) to server %s with fail files %v", m.work.ID(), 1, h.Server, len(failFiles))
}
blog.Debugf("remote: success to retry send fail file for work(%s) from pid(%d) to server %s with fail files %v", m.work.ID(), 1, h.Server, len(failFiles))
host <- h.Server
}

func (m *Mgr) ensureFilesWithPriority(
@@ -997,7 +1024,7 @@ func (m *Mgr) ensureFiles(

_, t, _ := rules.Satisfy(fd.File.FilePath)

blog.Debugf("remote: ensure file %s and match rule %d", fd.File.FilePath, t)
//blog.Debugf("remote: ensure file %s and match rule %d", fd.File.FilePath, t)
if f.AllDistributed {
t = dcSDK.FilterRuleHandleAllDistribution
}
@@ -1040,7 +1067,7 @@ func (m *Mgr) ensureFiles(
}
r = append(r, f.Targetrelativepath)

blog.Debugf("remote: debug ensure into fd.Servers")
//blog.Debugf("remote: debug ensure into fd.Servers")
for _, s := range fd.Servers {
if s == nil {
continue
@@ -1151,7 +1178,7 @@ func (m *Mgr) ensureFiles(
_ = m.appendCorkFiles(server, needSendCorkFiles)

// notify send
//m.sendCorkChan <- true
m.sendCorkChan <- true
}
} else {
// single-file send mode
@@ -1441,9 +1468,7 @@ func (m *Mgr) checkBatchCache(
// checkOrLockSendFile checks the target file's sendStatus: if the file has already been sent, it returns the current status and true; otherwise it marks the file as sending and returns false
func (m *Mgr) checkOrLockSendFile(server string, desc dcSDK.FileDesc) (types.FileSendStatus, bool) {
t1 := time.Now().Local()

m.fileSendMutex.Lock()

t2 := time.Now().Local()
if d1 := t2.Sub(t1); d1 > 50*time.Millisecond {
// blog.Debugf("check cache lock wait too long server(%s): %s", server, d1.String())
@@ -1580,13 +1605,14 @@ func (m *Mgr) sendToolchain(handler dcSDK.RemoteWorkerHandler, req *types.Remote
}

// getFailedFileCollectionByHost returns the failed file collections for the given host
func (m *Mgr) getFailedFileCollectionByHost(server string) ([]*types.FileCollectionInfo, error) {
func (m *Mgr) getFailedFileCollectionByHost(server string) []*types.FileCollectionInfo {
m.fileCollectionSendMutex.RLock()
defer m.fileCollectionSendMutex.RUnlock()

target, ok := m.fileCollectionSendMap[server]
if !ok {
return nil, fmt.Errorf("remote: no found host(%s) in file send cache", server)
blog.Infof("remote: no found host(%s) in file send cache")
return nil
}
fcs := make([]*types.FileCollectionInfo, 0)
for _, re := range *target {
@@ -1595,13 +1621,11 @@ func (m *Mgr) getFailedFileCollectionByHost(server string) ([]*types.FileCollect
fcs = append(fcs, re)
}
}
return fcs, nil
return fcs
}

// retry send failed tool chain
func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.RemoteTaskExecuteRequest, fileCollections []*types.FileCollectionInfo, wg *sync.WaitGroup) {
defer wg.Done() // call Done when the function returns

func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.RemoteTaskExecuteRequest, fileCollections []*types.FileCollectionInfo, host chan string) {
blog.Infof("remote: retry send tool chain for work(%s) from pid(%d) to server(%s)",
m.work.ID(), req.Pid, req.Server.Server)
if err := m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections); err != nil {
@@ -1611,6 +1635,7 @@ func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.R
}
// enable worker
m.resource.EnableWorker(req.Server)
host <- req.Server.Server
blog.Infof("remote: success to retry send tool chain for work(%s) from pid(%d) to server(%s)", m.work.ID(), req.Pid, req.Server.Server)

}
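
getFailedFileCollectionByHost now returns only a slice: an unknown host and an empty failure set both come back empty, which is exactly what the simplified caller in retrySendToolChains needs. A small sketch of that contract, with an illustrative stand-in for types.FileCollectionInfo and made-up server addresses:

package main

import "fmt"

// collection is an illustrative stand-in for types.FileCollectionInfo.
type collection struct {
	name   string
	failed bool
}

// failedByHost sketches the new contract: no error, just the (possibly empty)
// list of collections that still need to be resent to this server.
func failedByHost(cache map[string][]*collection, server string) []*collection {
	target, ok := cache[server]
	if !ok {
		return nil
	}
	out := make([]*collection, 0, len(target))
	for _, c := range target {
		if c.failed {
			out = append(out, c)
		}
	}
	return out
}

func main() {
	cache := map[string][]*collection{
		"192.168.0.1:30811": {{name: "toolchain-a", failed: true}, {name: "toolchain-b", failed: false}},
	}
	fmt.Println(len(failedByHost(cache, "192.168.0.1:30811"))) // 1
	fmt.Println(len(failedByHost(cache, "192.168.0.2:30811"))) // 0: nothing to retry, no error to handle
}
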
47 changes: 29 additions & 18 deletions src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go
@@ -513,12 +513,16 @@ func (wr *resource) getWorkerWithMostFreeSlots(banWorkerList []*dcProtocol.Host)
if worker.disabled || worker.dead {
continue
}
matched := false
for _, host := range banWorkerList {
if worker.host.Equal(host) {
continue
matched = true
break
}
}

if matched {
continue
}
free := worker.totalSlots - worker.occupiedSlots
if free >= max {
max = free
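
In the old selection code the inner continue only skipped one entry of banWorkerList, so a banned worker could still be chosen; the matched flag above skips the worker itself. A minimal sketch of the corrected membership check (helper name and addresses are illustrative):

package main

import "fmt"

// isBanned reports whether the candidate worker appears in the ban list.
// The selection loops now use this shape ("flag and skip the worker")
// instead of the old inner continue, which only skipped one ban entry.
func isBanned(worker string, banList []string) bool {
	for _, banned := range banList {
		if worker == banned {
			return true
		}
	}
	return false
}

func main() {
	ban := []string{"10.0.0.2:30811"}
	for _, w := range []string{"10.0.0.1:30811", "10.0.0.2:30811"} {
		fmt.Println(w, "banned:", isBanned(w, ban))
	}
}
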
@@ -542,12 +546,17 @@ func (wr *resource) getWorkerLargeFileFirst(f string, banWorkerList []*dcProtoco
if worker.disabled || worker.dead {
continue
}
matched := false
for _, host := range banWorkerList {
if worker.host.Equal(host) {
continue
matched = true
break
}
}

if matched {
continue
}
free := worker.totalSlots - worker.occupiedSlots

// when resources are idle, large files take priority
@@ -666,13 +675,14 @@ func (wr *resource) getSlot(msg lockWorkerMessage) {
if wr.occupiedSlots < wr.totalSlots || wr.totalSlots <= 0 {
set := wr.getUsageSet(usage)
if wr.isIdle(set) {
set.occupied++
wr.occupiedSlots++
blog.Infof("remote slot: total slots:%d occupied slots:%d, remote slot available",
wr.totalSlots, wr.occupiedSlots)

msg.result <- wr.occupyWorkerSlots(msg.largeFile, msg.banWorkerList)
satisfied = true
if h := wr.occupyWorkerSlots(msg.largeFile, msg.banWorkerList); h != nil {
set.occupied++
wr.occupiedSlots++
blog.Infof("remote slot: total slots:%d occupied slots:%d, remote slot available",
wr.totalSlots, wr.occupiedSlots)
msg.result <- h
satisfied = true
}
}
}

@@ -688,24 +698,25 @@ func (wr *resource) putSlot(msg lockWorkerMessage) {
func (wr *resource) putSlot(msg lockWorkerMessage) {
wr.freeWorkerSlots(msg.toward)
wr.occupiedSlots--
blog.Debugf("remote slot: free slot for worker %v, %v", wr.occupiedSlots, wr.totalSlots)
usage := msg.jobUsage
set := wr.getUsageSet(usage)
set.occupied--

// check whether other waiting is satisfied now
if wr.waitingList.Len() > 0 {
blog.Debugf("remote slot: free slot for worker %v, %v", wr.occupiedSlots, wr.waitingList.Len())
for e := wr.waitingList.Front(); e != nil; e = e.Next() {
msg := e.Value.(*lockWorkerMessage)
set := wr.getUsageSet(msg.jobUsage)
if wr.isIdle(set) {
set.occupied++
wr.occupiedSlots++

msg.result <- wr.occupyWorkerSlots(msg.largeFile, []*dcProtocol.Host{})

wr.waitingList.Remove(e)

break
if h := wr.occupyWorkerSlots(msg.largeFile, msg.banWorkerList); h != nil {
set.occupied++
wr.occupiedSlots++
msg.result <- h
wr.waitingList.Remove(e)
break
}
}
}
}
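
In both getSlot and putSlot the usage counters are now incremented only after occupyWorkerSlots has actually returned a worker, and the waiting-list path passes the request's own banWorkerList instead of an empty list, so requests that cannot be placed no longer leak occupied-slot counts. A condensed sketch of that reordering, with illustrative types in place of the controller's:

package main

import "fmt"

// tryGrant sketches the reordered grant logic: only when a worker can be
// occupied are the usage counters incremented and the waiter answered.
func tryGrant(occupy func() (string, bool), occupied *int, result chan<- string) bool {
	host, ok := occupy()
	if !ok {
		return false // no usable worker (e.g. all banned or dead): leave counters untouched
	}
	*occupied++
	result <- host
	return true
}

func main() {
	result := make(chan string, 1)
	occupied := 0
	granted := tryGrant(func() (string, bool) { return "", false }, &occupied, result)
	fmt.Println(granted, occupied) // false 0: nothing leaked when no worker is available
	granted = tryGrant(func() (string, bool) { return "10.0.0.1:30811", true }, &occupied, result)
	fmt.Println(granted, occupied, <-result) // true 1 10.0.0.1:30811
}
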
@@ -80,11 +80,13 @@ type worker struct {

func (wr *worker) occupySlot() error {
wr.occupiedSlots++
blog.Debugf("worker: occupy slot for worker %s, %v, %v", wr.host.Server, wr.occupiedSlots, wr.totalSlots)
return nil
}

func (wr *worker) freeSlot() error {
wr.occupiedSlots--
blog.Debugf("worker: free slot for worker %s, %v, %v", wr.host.Server, wr.occupiedSlots, wr.totalSlots)
return nil
}

