Skip to content

Commit

Permalink
Merge pull request TencentBlueKing#324 from tbs60/dev_yanafu
Browse files Browse the repository at this point in the history
bug: worker选取及恢复机制优化
  • Loading branch information
tming authored Nov 27, 2024
2 parents 9a92a3b + 3619c6f commit 18dd8b7
Showing 1 changed file with 137 additions and 74 deletions.
211 changes: 137 additions & 74 deletions src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,15 +205,30 @@ func (fsm *fileSendMap) matchFails(descs []*dcSDK.FileDesc) []matchResult {
defer fsm.Unlock()

if fsm.cache == nil {
blog.Warnf("file cache not found")
return []matchResult{}
fsm.cache = make(map[string]*[]*types.FileInfo)
blog.Warnf("file: fail cache not found")
}

result := make([]matchResult, 0, len(descs))
for _, desc := range descs {
info := &types.FileInfo{
FullPath: desc.FilePath,
Size: desc.FileSize,
LastModifyTime: desc.Lastmodifytime,
Md5: desc.Md5,
TargetRelativePath: desc.Targetrelativepath,
FileMode: desc.Filemode,
LinkTarget: desc.LinkTarget,
SendStatus: types.FileSendFailed,
}
c, ok := fsm.cache[desc.FilePath]
if !ok || c == nil || len(*c) == 0 {
blog.Warnf("file %s not found", desc.FilePath)
//失败文件未找到,直接返回失败,不插入
result = append(result, matchResult{
info: info,
match: true,
})
blog.Warnf("file: fail file %s not found", desc.FilePath)
continue
}
matched := false
Expand All @@ -235,6 +250,12 @@ func (fsm *fileSendMap) matchFails(descs []*dcSDK.FileDesc) []matchResult {
if matched {
continue
}
//失败文件未找到,直接返回失败,不插入
blog.Warnf("fail: fail file %s not found", desc.FilePath)
result = append(result, matchResult{
info: info,
match: true,
})
}
return result
}
Expand Down Expand Up @@ -994,6 +1015,50 @@ func (m *Mgr) ensureFilesWithPriority(
return result, nil
}

func (m *Mgr) checkAndSendCorkFiles(fs []*corkFile, server string, wg chan error, retry bool) {
totalFileNum := len(fs)
descs := make([]*dcSDK.FileDesc, 0, totalFileNum)
for _, v := range fs {
descs = append(descs, v.file)
}
//results文件需要保证发送顺序和fs相同,否则无法使用fs对应的resultchan
results := m.checkOrLockCorkFiles(server, descs, retry)
blog.Debugf("remote: got %d results for %d cork files count:%d for work(%s) to server(%s)",
len(results), len(descs), m.work.ID(), server)
needSendCorkFiles := make([]*corkFile, 0, totalFileNum)
for i, v := range results {
if v.match {
// 已发送完成的不启动协程了
if v.info.SendStatus == types.FileSendSucceed {
wg <- nil
continue
} else if v.info.SendStatus == types.FileSendFailed {
wg <- types.ErrSendFileFailed
continue
}
} else {
// 不在缓存,意味着之前没有发送过
(fs)[i].resultchan = make(chan corkFileResult, 1)
needSendCorkFiles = append(needSendCorkFiles, (fs)[i])
}
blog.Debugf("remote: start to ensure single cork file %s:%s for work(%s) to server(%s)", results[i].info.FullPath, (fs)[i].file.FilePath, m.work.ID(), server)
// 启动协程跟踪未发送完成的文件
c := (fs)[i]
go func(err chan<- error, c *corkFile, r matchResult, i int) {
err <- m.ensureSingleCorkFile(c, r)
}(wg, c, v, i)
}

// TODO : 检查是否在server端有缓存了,如果有,则无需发送,调用 checkBatchCache

blog.Debugf("total %d cork files, need send %d files to server(%s)", totalFileNum, len(needSendCorkFiles), server)
// append to cork files queue
_ = m.appendCorkFiles(server, needSendCorkFiles)

// notify send
m.sendCorkChan <- true
}

// ensureFiles 确保提供的文件被传输到目标worker的目标目录上
// 同时结合settings.FilterRules来防止相同的文件被重复传输
// 返回一个列表, 表示文件在远程的目标目录
Expand Down Expand Up @@ -1150,45 +1215,19 @@ func (m *Mgr) ensureFiles(

for server, fs := range corkFiles {
totalFileNum := len(*fs)
descs := make([]*dcSDK.FileDesc, 0, totalFileNum)
for _, v := range *fs {
descs = append(descs, v.file)
}
results := m.checkOrLockCorkFiles(server, descs)
blog.Debugf("remote: got %d results for %d cork files count:%d for work(%s) from pid(%d) to server",
len(results), len(descs), count, m.work.ID(), pid)
needSendCorkFiles := make([]*corkFile, 0, totalFileNum)
for i, v := range results {
if v.match {
// 已发送完成的不启动协程了
if v.info.SendStatus == types.FileSendSucceed {
wg <- nil
continue
} else if v.info.SendStatus == types.FileSendFailed {
wg <- types.ErrSendFileFailed
continue
}
fsRetry := make([]*corkFile, 0, totalFileNum)
fsNew := make([]*corkFile, 0, totalFileNum)
for _, f := range *fs {
if f.file.Retry {
fsRetry = append(fsRetry, f)
} else {
// 不在缓存,意味着之前没有发送过
(*fs)[i].resultchan = make(chan corkFileResult, 1)
needSendCorkFiles = append(needSendCorkFiles, (*fs)[i])
fsNew = append(fsNew, f)
}

// 启动协程跟踪未发送完成的文件
c := (*fs)[i]
go func(err chan<- error, c *corkFile, r matchResult) {
err <- m.ensureSingleCorkFile(c, r)
}(wg, c, v)
}

// TODO : 检查是否在server端有缓存了,如果有,则无需发送,调用 checkBatchCache

blog.Debugf("total %d cork files, need send %d files", totalFileNum, len(needSendCorkFiles))
// append to cork files queue
_ = m.appendCorkFiles(server, needSendCorkFiles)

// notify send
m.sendCorkChan <- true
//批量发送重试文件
m.checkAndSendCorkFiles(fsRetry, server, wg, true)
//批量发送新文件
m.checkAndSendCorkFiles(fsNew, server, wg, false)
}
} else {
// 单个文件发送模式
Expand Down Expand Up @@ -1512,9 +1551,11 @@ func (m *Mgr) checkOrLockSendFailFile(server string, desc dcSDK.FileDesc, query

info, match, err := target.matchFail(desc, query)
if err != nil {
blog.Errorf("remote: check or lock send fail file failed: %v", err)
return types.FileSendUnknown, false, err
}
if info == nil {
blog.Errorf("remote: check or lock send fail file failed: file is nil")
return types.FileSendUnknown, false, errors.New("file is nil")
}
return info.SendStatus, match, nil
Expand All @@ -1526,26 +1567,27 @@ type matchResult struct {
}

// checkOrLockCorkFiles 批量检查目标file的sendStatus, 如果已经被发送, 则返回当前状态和true; 如果没有被发送过, 则将其置于sending, 并返回false
func (m *Mgr) checkOrLockCorkFiles(server string, descs []*dcSDK.FileDesc) []matchResult {
if len(descs) == 0 || !descs[0].Retry { // 第一次发送的文件
func (m *Mgr) checkOrLockCorkFiles(server string, descs []*dcSDK.FileDesc, retry bool) []matchResult {
//批量检查首次发送文件
if !retry {
//blog.Debugf("remote: execute remote task to server(%s) for descs(%d)", server, len(newDescs))
m.fileSendMutex.Lock()
target, ok := m.fileSendMap[server]
if !ok {
target = &fileSendMap{}
m.fileSendMap[server] = target
}
m.fileSendMutex.Unlock()

return target.matchOrInserts(descs)
} else { // 失败重试的文件
m.fileSendMutex.Lock()
} else { //批量检查重试文件
//blog.Debugf("remote: execute remote task to server(%s) for retry descs(%d)", server, len(retryDescs))
m.failFileSendMutex.Lock()
target, ok := m.failFileSendMap[server]
if !ok {
target = &fileSendMap{}
m.failFileSendMap[server] = target
}
m.fileSendMutex.Unlock()

m.failFileSendMutex.Unlock()
return target.matchFails(descs)
}
}
Expand Down Expand Up @@ -1621,10 +1663,11 @@ func (m *Mgr) getFailedFileCollectionByHost(server string) []*types.FileCollecti

target, ok := m.fileCollectionSendMap[server]
if !ok {
blog.Infof("remote: no found host(%s) in file send cache")
blog.Infof("remote: no found host(%s) in file send cache", server)
return nil
}
fcs := make([]*types.FileCollectionInfo, 0)

for _, re := range *target {
re.Retry = true
if re.SendStatus == types.FileSendFailed {
Expand Down Expand Up @@ -1692,6 +1735,42 @@ func (m *Mgr) sendFileCollectionOnce(
return nil
}

func (m *Mgr) getRetryFileDetails(server string, fc *types.FileCollectionInfo, Servers []*dcProtocol.Host) ([]*types.FilesDetails, error) {
fileDetails := make([]*types.FilesDetails, 0, len(fc.Files))
m.failFileSendMutex.Lock()
failsendMap := m.failFileSendMap[server]
if failsendMap == nil {
m.failFileSendMutex.Unlock()
err := fmt.Errorf("remote: send file for work(%s) with no send map", m.work.ID())
blog.Errorf(err.Error())
return fileDetails, err
}
m.failFileSendMutex.Unlock()

failsendMap.Lock()
defer failsendMap.Unlock()

for _, f := range fc.Files {
f.NoDuplicated = true
for _, c := range failsendMap.cache {
for _, d := range *c {
if d.Match(f) {
f.Retry = fc.Retry
break
}
}
if f.Retry {
break
}
}
fileDetails = append(fileDetails, &types.FilesDetails{
Servers: Servers,
File: f,
})
}
return fileDetails, nil
}

// ensureOneFileCollection 保证给到的第一个文件集合被正确分发到目标机器上
func (m *Mgr) ensureOneFileCollection(
handler dcSDK.RemoteWorkerHandler,
Expand Down Expand Up @@ -1781,37 +1860,21 @@ func (m *Mgr) ensureOneFileCollection(

fileDetails := make([]*types.FilesDetails, 0, len(fc.Files))

var failsendMap *fileSendMap
if fc.Retry {
m.failFileSendMutex.Lock()
failsendMap = m.failFileSendMap[host.Server]
if failsendMap == nil {
m.failFileSendMutex.Unlock()
blog.Errorf("remote: send file for work(%s) with no send map", m.work.ID())
return errors.New("remote: send file with no send map")
fileDetails, err = m.getRetryFileDetails(host.Server, fc, Servers)
if err != nil {
return err
}
m.failFileSendMutex.Unlock()
}
for _, f := range fc.Files {
f.NoDuplicated = true
if fc.Retry {
for _, c := range failsendMap.cache {
for _, d := range *c {
if d.Match(f) {
f.Retry = fc.Retry
break
}
}
if f.Retry {
break
}
}
} else {
for _, f := range fc.Files {
f.NoDuplicated = true
fileDetails = append(fileDetails, &types.FilesDetails{
Servers: Servers,
File: f,
})
}
fileDetails = append(fileDetails, &types.FilesDetails{
Servers: Servers,
File: f,
})
}

_, err = m.ensureFilesWithPriority(handler, pid, sandbox, fileDetails)
defer func() {
status := types.FileSendSucceed
Expand Down

0 comments on commit 18dd8b7

Please sign in to comment.