Skip to content

Commit

Permalink
feat: worker选取及恢复机制优化 TencentBlueKing#311
Browse files Browse the repository at this point in the history
  • Loading branch information
flyy1012 committed Nov 20, 2024
1 parent e25a544 commit 885fb00
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 21 deletions.
14 changes: 9 additions & 5 deletions src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ func NewMgr(pCtx context.Context, work *types.Work) types.RemoteMgr {
}

const (
syncHostTimeTimes = 3
toolChainRetryTimes = 10
syncHostTimeTimes = 3
)

// Mgr describe the remote manager
Expand Down Expand Up @@ -633,7 +632,7 @@ func (m *Mgr) retrySendToolChains(ctx context.Context) {
for {
select {
case <-ctx.Done():
blog.Infof("remote: run worker check for work(%s) canceled by context", m.work.ID())
blog.Infof("remote: run toolchain check for work(%s) canceled by context", m.work.ID())
return
case <-ticker.C:
handler := m.remoteWorker.Handler(0, nil, nil, nil)
Expand Down Expand Up @@ -679,7 +678,7 @@ func (m *Mgr) retryFailFiles(ctx context.Context) {
for {
select {
case <-ctx.Done():
blog.Infof("remote: run worker check for work(%s) canceled by context", m.work.ID())
blog.Infof("remote: run failfiles check for work(%s) canceled by context", m.work.ID())
return
case <-ticker.C:
hosts := m.work.Resource().GetHosts()
Expand Down Expand Up @@ -748,8 +747,9 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas
defer dcSDK.StatsTimeNow(&req.Stats.RemoteWorkLeaveTime)
m.work.Basic().UpdateJobStats(req.Stats)

hosts := m.work.Resource().GetHosts()
for _, c := range req.Req.Commands {
for _, s := range m.work.Resource().GetHosts() {
for _, s := range hosts {
m.failFileSendMutex.Lock()
f := m.failFileSendMap[s.Server]
m.failFileSendMutex.Unlock()
Expand All @@ -770,6 +770,10 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas
}
}
}
if len(req.BanWorkerList) == len(hosts) {
return nil, errors.New("no available worker, all worker are banned")
}

blog.Debugf("remote: try to execute remote task for work(%s) from pid(%d) with ban worker list %d, %v", m.work.ID(), req.Pid, len(req.BanWorkerList), req.BanWorkerList)
// 如果有超过100MB的大文件,则在选择host时,作为选择条件
fpath, _ := getMaxSizeFile(req, m.largeFileSize)
Expand Down
67 changes: 55 additions & 12 deletions src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,9 +506,10 @@ func (wr *resource) addWorker(host *dcProtocol.Host) {
return
}

func (wr *resource) getWorkerWithMostFreeSlots(banWorkerList []*dcProtocol.Host) *worker {
func (wr *resource) getWorkerWithMostFreeSlots(banWorkerList []*dcProtocol.Host) (*worker, bool) {
var w *worker
max := 0
hasAvailableWorker := false
for _, worker := range wr.worker {
if worker.disabled || worker.dead {
continue
Expand All @@ -523,6 +524,8 @@ func (wr *resource) getWorkerWithMostFreeSlots(banWorkerList []*dcProtocol.Host)
if matched {
continue
}

hasAvailableWorker = true
free := worker.totalSlots - worker.occupiedSlots
if free >= max {
max = free
Expand All @@ -534,14 +537,15 @@ func (wr *resource) getWorkerWithMostFreeSlots(banWorkerList []*dcProtocol.Host)
// w = wr.worker[0]
// }

return w
return w, hasAvailableWorker
}

// 大文件优先
func (wr *resource) getWorkerLargeFileFirst(f string, banWorkerList []*dcProtocol.Host) *worker {
func (wr *resource) getWorkerLargeFileFirst(f string, banWorkerList []*dcProtocol.Host) (*worker, bool) {
var w *worker
max := 0
inlargequeue := false
hasAvailableWorker := false
for _, worker := range wr.worker {
if worker.disabled || worker.dead {
continue
Expand All @@ -557,6 +561,8 @@ func (wr *resource) getWorkerLargeFileFirst(f string, banWorkerList []*dcProtoco
if matched {
continue
}

hasAvailableWorker = true
free := worker.totalSlots - worker.occupiedSlots

// 在资源空闲时,大文件优先
Expand All @@ -582,33 +588,34 @@ func (wr *resource) getWorkerLargeFileFirst(f string, banWorkerList []*dcProtoco

if w == nil {
// w = wr.worker[0]
return w
return w, hasAvailableWorker
}

if f != "" && !w.hasFile(f) {
w.largefiles = append(w.largefiles, f)
}

return w
return w, hasAvailableWorker
}

func (wr *resource) occupyWorkerSlots(f string, banWorkerList []*dcProtocol.Host) *dcProtocol.Host {
func (wr *resource) occupyWorkerSlots(f string, banWorkerList []*dcProtocol.Host) (*dcProtocol.Host, bool) {
wr.workerLock.Lock()
defer wr.workerLock.Unlock()

var w *worker
var hasWorkerAvailable bool
if f == "" {
w = wr.getWorkerWithMostFreeSlots(banWorkerList)
w, hasWorkerAvailable = wr.getWorkerWithMostFreeSlots(banWorkerList)
} else {
w = wr.getWorkerLargeFileFirst(f, banWorkerList)
w, hasWorkerAvailable = wr.getWorkerLargeFileFirst(f, banWorkerList)
}

if w == nil {
return nil
return nil, hasWorkerAvailable
}

_ = w.occupySlot()
return w.host
return w.host, hasWorkerAvailable
}

func (wr *resource) freeWorkerSlots(host *dcProtocol.Host) {
Expand All @@ -626,6 +633,9 @@ func (wr *resource) freeWorkerSlots(host *dcProtocol.Host) {
}

func (wr *resource) handleLock(ctx context.Context) {
ticker := time.NewTicker(time.Second * 20)

defer ticker.Stop()
wr.ctx = ctx

for {
Expand All @@ -638,6 +648,10 @@ func (wr *resource) handleLock(ctx context.Context) {
wr.getSlot(msg)
case <-wr.emptyChan:
wr.onSlotEmpty()
case <-ticker.C:
if wr.waitingList.Len() > 0 {
go wr.occupyWaitList()
}
}
}
}
Expand Down Expand Up @@ -675,7 +689,7 @@ func (wr *resource) getSlot(msg lockWorkerMessage) {
if wr.occupiedSlots < wr.totalSlots || wr.totalSlots <= 0 {
set := wr.getUsageSet(usage)
if wr.isIdle(set) {
if h := wr.occupyWorkerSlots(msg.largeFile, msg.banWorkerList); h != nil {
if h, _ := wr.occupyWorkerSlots(msg.largeFile, msg.banWorkerList); h != nil {
set.occupied++
wr.occupiedSlots++
blog.Infof("remote slot: total slots:%d occupied slots:%d, remote slot available",
Expand Down Expand Up @@ -710,12 +724,18 @@ func (wr *resource) putSlot(msg lockWorkerMessage) {
msg := e.Value.(*lockWorkerMessage)
set := wr.getUsageSet(msg.jobUsage)
if wr.isIdle(set) {
if h := wr.occupyWorkerSlots(msg.largeFile, msg.banWorkerList); h != nil {
if h, hasAvailableWorker := wr.occupyWorkerSlots(msg.largeFile, msg.banWorkerList); h != nil {
set.occupied++
wr.occupiedSlots++
msg.result <- h
wr.waitingList.Remove(e)
break
} else if !hasAvailableWorker {
msg.result <- nil
wr.waitingList.Remove(e)
blog.Debugf("remote slot: occupy waiting list, but no slot available for ban worker list %v, just turn it local", msg.banWorkerList)
} else {
blog.Debugf("remote slot: occupy waiting list, but no slot available %v", msg.banWorkerList)
}
}
}
Expand All @@ -732,3 +752,26 @@ func (wr *resource) onSlotEmpty() {
wr.waitingList.Remove(e)
}
}

func (wr *resource) occupyWaitList() {
for e := wr.waitingList.Front(); e != nil; e = e.Next() {
msg := e.Value.(*lockWorkerMessage)
set := wr.getUsageSet(msg.jobUsage)
if wr.isIdle(set) {
h, hasAvailableWorker := wr.occupyWorkerSlots(msg.largeFile, msg.banWorkerList)
if h != nil {
set.occupied++
wr.occupiedSlots++
msg.result <- h
wr.waitingList.Remove(e)
blog.Debugf("remote slot: occupy waiting list")
} else if !hasAvailableWorker { // no slot available for ban worker list, turn it local
blog.Infof("remote slot: occupy waiting list, but no slot available for ban worker list %v, just turn it local", msg.banWorkerList)
msg.result <- nil
wr.waitingList.Remove(e)
} else {
blog.Debugf("remote slot: occupy waiting list, but no slot available %v", msg.banWorkerList)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -424,9 +424,9 @@ func (r *CommonRemoteHandler) ExecuteSendFile(
// 加本地资源锁
locallocked := false
if mgr != nil {
if mgr.LockSlots(dcSDK.JobUsageLocalExe, 1) {
if mgr.LockSlots(dcSDK.JobUsageDefault, 1) {
locallocked = true
blog.Debugf("remotehandle: succeed to get one local lock")
blog.Debugf("remotehandle: succeed to get one default lock")
}
}

Expand Down Expand Up @@ -466,8 +466,8 @@ func (r *CommonRemoteHandler) ExecuteSendFile(
t1 = t2

if locallocked {
mgr.UnlockSlots(dcSDK.JobUsageLocalExe, 1)
blog.Debugf("remotehandle: succeed to release one local lock")
mgr.UnlockSlots(dcSDK.JobUsageDefault, 1)
blog.Debugf("remotehandle: succeed to release one default lock")
}

if err != nil {
Expand Down

0 comments on commit 885fb00

Please sign in to comment.