Skip to content

Commit

Permalink
feat: worker选取及恢复机制优化 TencentBlueKing#311
Browse files Browse the repository at this point in the history
  • Loading branch information
flyy1012 committed Nov 27, 2024
1 parent 3e00c6d commit 3619c6f
Showing 1 changed file with 141 additions and 92 deletions.
233 changes: 141 additions & 92 deletions src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,15 +205,30 @@ func (fsm *fileSendMap) matchFails(descs []*dcSDK.FileDesc) []matchResult {
defer fsm.Unlock()

if fsm.cache == nil {
blog.Warnf("file cache not found")
return []matchResult{}
fsm.cache = make(map[string]*[]*types.FileInfo)
blog.Warnf("file: fail cache not found")
}

result := make([]matchResult, 0, len(descs))
for _, desc := range descs {
info := &types.FileInfo{
FullPath: desc.FilePath,
Size: desc.FileSize,
LastModifyTime: desc.Lastmodifytime,
Md5: desc.Md5,
TargetRelativePath: desc.Targetrelativepath,
FileMode: desc.Filemode,
LinkTarget: desc.LinkTarget,
SendStatus: types.FileSendFailed,
}
c, ok := fsm.cache[desc.FilePath]
if !ok || c == nil || len(*c) == 0 {
blog.Warnf("file %s not found", desc.FilePath)
//失败文件未找到,直接返回失败,不插入
result = append(result, matchResult{
info: info,
match: true,
})
blog.Warnf("file: fail file %s not found", desc.FilePath)
continue
}
matched := false
Expand All @@ -235,6 +250,12 @@ func (fsm *fileSendMap) matchFails(descs []*dcSDK.FileDesc) []matchResult {
if matched {
continue
}
//失败文件未找到,直接返回失败,不插入
blog.Warnf("fail: fail file %s not found", desc.FilePath)
result = append(result, matchResult{
info: info,
match: true,
})
}
return result
}
Expand Down Expand Up @@ -994,6 +1015,50 @@ func (m *Mgr) ensureFilesWithPriority(
return result, nil
}

// checkAndSendCorkFiles checks the send status of a batch of cork files bound
// for server, queues the ones that have not been sent yet, and notifies the
// cork sender.
//
// fs is the batch of cork files; retry is forwarded to checkOrLockCorkFiles.
// For every entry returned by checkOrLockCorkFiles exactly one value is
// delivered on wg: nil when the file was already sent successfully,
// types.ErrSendFileFailed when it already failed, otherwise the result of
// ensureSingleCorkFile, delivered from a separate goroutine.
func (m *Mgr) checkAndSendCorkFiles(fs []*corkFile, server string, wg chan error, retry bool) {
	totalFileNum := len(fs)
	descs := make([]*dcSDK.FileDesc, 0, totalFileNum)
	for _, v := range fs {
		descs = append(descs, v.file)
	}

	// results must keep the same order as fs; results[i] is paired with fs[i]
	// below, including the resultchan that belongs to fs[i].
	results := m.checkOrLockCorkFiles(server, descs, retry)
	// The format string previously carried an extra "count:%d" verb without a
	// matching argument, which shifted every following argument by one.
	blog.Debugf("remote: got %d results for %d cork files for work(%s) to server(%s)",
		len(results), len(descs), m.work.ID(), server)

	needSendCorkFiles := make([]*corkFile, 0, totalFileNum)
	for i, v := range results {
		c := fs[i]
		if v.match {
			// Files that already reached a final state do not need a tracking goroutine.
			switch v.info.SendStatus {
			case types.FileSendSucceed:
				wg <- nil
				continue
			case types.FileSendFailed:
				wg <- types.ErrSendFileFailed
				continue
			}
		} else {
			// Not in the cache, which means the file has never been sent before.
			c.resultchan = make(chan corkFileResult, 1)
			needSendCorkFiles = append(needSendCorkFiles, c)
		}

		blog.Debugf("remote: start to ensure single cork file %s:%s for work(%s) to server(%s)", v.info.FullPath, c.file.FilePath, m.work.ID(), server)
		// Start a goroutine to track the file whose transfer has not finished yet.
		go func(err chan<- error, c *corkFile, r matchResult) {
			err <- m.ensureSingleCorkFile(c, r)
		}(wg, c, v)
	}

	// TODO: check whether the server already has the file cached; if so there is
	// no need to send it (call checkBatchCache).

	blog.Debugf("total %d cork files, need send %d files to server(%s)", totalFileNum, len(needSendCorkFiles), server)
	// Append to the cork files queue; the returned value was already discarded
	// by the original code and is kept that way here.
	_ = m.appendCorkFiles(server, needSendCorkFiles)

	// Notify the sender that new cork files are queued.
	m.sendCorkChan <- true
}

// ensureFiles 确保提供的文件被传输到目标worker的目标目录上
// 同时结合settings.FilterRules来防止相同的文件被重复传输
// 返回一个列表, 表示文件在远程的目标目录
Expand Down Expand Up @@ -1150,45 +1215,19 @@ func (m *Mgr) ensureFiles(

for server, fs := range corkFiles {
totalFileNum := len(*fs)
descs := make([]*dcSDK.FileDesc, 0, totalFileNum)
for _, v := range *fs {
descs = append(descs, v.file)
}
results := m.checkOrLockCorkFiles(server, descs)
blog.Debugf("remote: got %d results for %d cork files count:%d for work(%s) from pid(%d) to server",
len(results), len(descs), count, m.work.ID(), pid)
needSendCorkFiles := make([]*corkFile, 0, totalFileNum)
for i, v := range results {
if v.match {
// 已发送完成的不启动协程了
if v.info.SendStatus == types.FileSendSucceed {
wg <- nil
continue
} else if v.info.SendStatus == types.FileSendFailed {
wg <- types.ErrSendFileFailed
continue
}
fsRetry := make([]*corkFile, 0, totalFileNum)
fsNew := make([]*corkFile, 0, totalFileNum)
for _, f := range *fs {
if f.file.Retry {
fsRetry = append(fsRetry, f)
} else {
// 不在缓存,意味着之前没有发送过
(*fs)[i].resultchan = make(chan corkFileResult, 1)
needSendCorkFiles = append(needSendCorkFiles, (*fs)[i])
fsNew = append(fsNew, f)
}

// 启动协程跟踪未发送完成的文件
c := (*fs)[i]
go func(err chan<- error, c *corkFile, r matchResult) {
err <- m.ensureSingleCorkFile(c, r)
}(wg, c, v)
}

// TODO : 检查是否在server端有缓存了,如果有,则无需发送,调用 checkBatchCache

blog.Debugf("total %d cork files, need send %d files", totalFileNum, len(needSendCorkFiles))
// append to cork files queue
_ = m.appendCorkFiles(server, needSendCorkFiles)

// notify send
m.sendCorkChan <- true
//批量发送重试文件
m.checkAndSendCorkFiles(fsRetry, server, wg, true)
//批量发送新文件
m.checkAndSendCorkFiles(fsNew, server, wg, false)
}
} else {
// 单个文件发送模式
Expand Down Expand Up @@ -1512,9 +1551,11 @@ func (m *Mgr) checkOrLockSendFailFile(server string, desc dcSDK.FileDesc, query

info, match, err := target.matchFail(desc, query)
if err != nil {
blog.Errorf("remote: check or lock send fail file failed: %v", err)
return types.FileSendUnknown, false, err
}
if info == nil {
blog.Errorf("remote: check or lock send fail file failed: file is nil")
return types.FileSendUnknown, false, errors.New("file is nil")
}
return info.SendStatus, match, nil
Expand All @@ -1526,41 +1567,29 @@ type matchResult struct {
}

// checkOrLockCorkFiles 批量检查目标file的sendStatus, 如果已经被发送, 则返回当前状态和true; 如果没有被发送过, 则将其置于sending, 并返回false
func (m *Mgr) checkOrLockCorkFiles(server string, descs []*dcSDK.FileDesc) []matchResult {
var result []matchResult
var retryDescs, newDescs []*dcSDK.FileDesc
for _, desc := range descs {
if desc.Retry {
retryDescs = append(retryDescs, desc)
} else {
newDescs = append(newDescs, desc)
}
}
//批量检查重试文件
if len(retryDescs) > 0 {
m.fileSendMutex.Lock()
target, ok := m.failFileSendMap[server]
if !ok {
target = &fileSendMap{}
m.failFileSendMap[server] = target
}
m.fileSendMutex.Unlock()

result = append(result, target.matchFails(descs)...)
}
func (m *Mgr) checkOrLockCorkFiles(server string, descs []*dcSDK.FileDesc, retry bool) []matchResult {
//批量检查首次发送文件
if len(newDescs) > 0 {
if !retry {
//blog.Debugf("remote: execute remote task to server(%s) for descs(%d)", server, len(newDescs))
m.fileSendMutex.Lock()
target, ok := m.fileSendMap[server]
if !ok {
target = &fileSendMap{}
m.fileSendMap[server] = target
}
m.fileSendMutex.Unlock()

result = append(result, target.matchOrInserts(descs)...)
return target.matchOrInserts(descs)
} else { //批量检查重试文件
//blog.Debugf("remote: execute remote task to server(%s) for retry descs(%d)", server, len(retryDescs))
m.failFileSendMutex.Lock()
target, ok := m.failFileSendMap[server]
if !ok {
target = &fileSendMap{}
m.failFileSendMap[server] = target
}
m.failFileSendMutex.Unlock()
return target.matchFails(descs)
}
return result
}

func (m *Mgr) updateSendFile(server string, desc dcSDK.FileDesc, status types.FileSendStatus) {
Expand Down Expand Up @@ -1638,6 +1667,7 @@ func (m *Mgr) getFailedFileCollectionByHost(server string) []*types.FileCollecti
return nil
}
fcs := make([]*types.FileCollectionInfo, 0)

for _, re := range *target {
re.Retry = true
if re.SendStatus == types.FileSendFailed {
Expand Down Expand Up @@ -1705,6 +1735,42 @@ func (m *Mgr) sendFileCollectionOnce(
return nil
}

// getRetryFileDetails builds one FilesDetails entry (targeting Servers) for
// every file in fc, using the failed-file send map recorded for server.
//
// Every file gets NoDuplicated set to true. A file that matches an entry in
// the failed-file cache additionally gets its Retry flag set to fc.Retry.
// An error is returned when no failed-file send map exists for server.
func (m *Mgr) getRetryFileDetails(server string, fc *types.FileCollectionInfo, Servers []*dcProtocol.Host) ([]*types.FilesDetails, error) {
	fileDetails := make([]*types.FilesDetails, 0, len(fc.Files))

	// Only the map lookup needs the manager-level lock.
	m.failFileSendMutex.Lock()
	failsendMap := m.failFileSendMap[server]
	m.failFileSendMutex.Unlock()

	if failsendMap == nil {
		err := fmt.Errorf("remote: send file for work(%s) with no send map", m.work.ID())
		// Log through an explicit verb so a '%' in the message is not
		// interpreted as a format directive.
		blog.Errorf("%v", err)
		return fileDetails, err
	}

	failsendMap.Lock()
	defer failsendMap.Unlock()

	for _, f := range fc.Files {
		f.NoDuplicated = true
		for _, c := range failsendMap.cache {
			// Skip nil cache entries instead of dereferencing them.
			if c == nil {
				continue
			}
			for _, d := range *c {
				if d.Match(f) {
					f.Retry = fc.Retry
					break
				}
			}
			// Stop scanning the remaining cache entries once the file is
			// marked for retry.
			if f.Retry {
				break
			}
		}
		fileDetails = append(fileDetails, &types.FilesDetails{
			Servers: Servers,
			File:    f,
		})
	}
	return fileDetails, nil
}

// ensureOneFileCollection 保证给到的第一个文件集合被正确分发到目标机器上
func (m *Mgr) ensureOneFileCollection(
handler dcSDK.RemoteWorkerHandler,
Expand Down Expand Up @@ -1794,38 +1860,21 @@ func (m *Mgr) ensureOneFileCollection(

fileDetails := make([]*types.FilesDetails, 0, len(fc.Files))

var failsendMap *fileSendMap
if fc.Retry {
m.failFileSendMutex.Lock()
failsendMap = m.failFileSendMap[host.Server]
if failsendMap == nil {
m.failFileSendMutex.Unlock()
err = fmt.Errorf("remote: send file for work(%s) with no send map", m.work.ID())
blog.Errorf(err.Error())
fileDetails, err = m.getRetryFileDetails(host.Server, fc, Servers)
if err != nil {
return err
}
m.failFileSendMutex.Unlock()
}
for _, f := range fc.Files {
f.NoDuplicated = true
if fc.Retry {
for _, c := range failsendMap.cache {
for _, d := range *c {
if d.Match(f) {
f.Retry = fc.Retry
break
}
}
if f.Retry {
break
}
}
} else {
for _, f := range fc.Files {
f.NoDuplicated = true
fileDetails = append(fileDetails, &types.FilesDetails{
Servers: Servers,
File: f,
})
}
fileDetails = append(fileDetails, &types.FilesDetails{
Servers: Servers,
File: f,
})
}

_, err = m.ensureFilesWithPriority(handler, pid, sandbox, fileDetails)
defer func() {
status := types.FileSendSucceed
Expand Down

0 comments on commit 3619c6f

Please sign in to comment.