Skip to content

Commit

Permalink
feat: worker选取及恢复机制优化 TencentBlueKing#311
Browse files Browse the repository at this point in the history
  • Loading branch information
flyy1012 committed Nov 21, 2024
1 parent 7693da3 commit 4642326
Showing 1 changed file with 33 additions and 2 deletions.
35 changes: 33 additions & 2 deletions src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (fsm *fileSendMap) matchFail(desc dcSDK.FileDesc, query bool) (*types.FileI

c, ok := fsm.cache[desc.FilePath]
if !ok || c == nil || len(*c) == 0 {
return nil, false, fmt.Errorf("file %s not found", desc.FilePath)
return nil, false, fmt.Errorf("file %s not found, file cache is nil", desc.FilePath)
}

for _, ci := range *c {
Expand Down Expand Up @@ -635,6 +635,9 @@ func (m *Mgr) retrySendToolChains(ctx context.Context) {
blog.Infof("remote: run toolchain check for work(%s) canceled by context", m.work.ID())
return
case <-ticker.C:
if m.failFileSendMap == nil {
continue
}
handler := m.remoteWorker.Handler(0, nil, nil, nil)
hosts := m.work.Resource().GetHosts()
count := 0
Expand Down Expand Up @@ -681,6 +684,9 @@ func (m *Mgr) retryFailFiles(ctx context.Context) {
blog.Infof("remote: run failfiles check for work(%s) canceled by context", m.work.ID())
return
case <-ticker.C:
if m.failFileSendMap == nil {
continue
}
hosts := m.work.Resource().GetHosts()
wg := make(chan string, len(hosts))
count := 0
Expand Down Expand Up @@ -1506,7 +1512,7 @@ func (m *Mgr) checkOrLockSendFailFile(server string, desc dcSDK.FileDesc, query

info, match, err := target.matchFail(desc, query)
if err != nil {
return types.FileSendUnknown, false, errors.New("file not found")
return types.FileSendUnknown, false, err
}
if info == nil {
return types.FileSendUnknown, false, errors.New("file is nil")
Expand Down Expand Up @@ -1774,8 +1780,33 @@ func (m *Mgr) ensureOneFileCollection(
Servers = append(Servers, host)

fileDetails := make([]*types.FilesDetails, 0, len(fc.Files))

var failsendMap *fileSendMap
if fc.Retry {
m.failFileSendMutex.Lock()
failsendMap = m.failFileSendMap[host.Server]
if failsendMap == nil {
m.failFileSendMutex.Unlock()
blog.Errorf("remote: send file for work(%s) with no send map", m.work.ID())
return errors.New("remote: send file with no send map")
}
m.failFileSendMutex.Unlock()
}
for _, f := range fc.Files {
f.NoDuplicated = true
if fc.Retry {
for _, c := range failsendMap.cache {
for _, d := range *c {
if d.Match(f) {
f.Retry = fc.Retry
break
}
}
if f.Retry {
break
}
}
}
fileDetails = append(fileDetails, &types.FilesDetails{
Servers: Servers,
File: f,
Expand Down

0 comments on commit 4642326

Please sign in to comment.