diff --git a/src/backend/booster/bk_dist/common/sdk/worker.go b/src/backend/booster/bk_dist/common/sdk/worker.go index 6c0c73c5..15ef9fb8 100644 --- a/src/backend/booster/bk_dist/common/sdk/worker.go +++ b/src/backend/booster/bk_dist/common/sdk/worker.go @@ -96,6 +96,7 @@ type FileDesc struct { NoDuplicated bool `json:"no_duplicated"` AllDistributed bool `json:"all_distributed"` Priority FileDescPriority `json:"priority"` + Retry bool `json:"retry"` } // UniqueKey define the file unique key diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go index 4432321f..cf4535a8 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go @@ -68,6 +68,7 @@ func NewMgr(pCtx context.Context, work *types.Work) types.RemoteMgr { ), checkSendFileTick: 100 * time.Millisecond, fileSendMap: make(map[string]*fileSendMap), + failFileSendMap: make(map[string]*fileSendMap), fileCollectionSendMap: make(map[string]*[]*types.FileCollectionInfo), fileMessageBank: newFileMessageBank(), conf: work.Config(), @@ -106,6 +107,9 @@ type Mgr struct { fileSendMutex sync.RWMutex fileSendMap map[string]*fileSendMap + failFileSendMutex sync.RWMutex + failFileSendMap map[string]*fileSendMap + fileCollectionSendMutex sync.RWMutex fileCollectionSendMap map[string]*[]*types.FileCollectionInfo @@ -135,11 +139,10 @@ type Mgr struct { type fileSendMap struct { sync.RWMutex - cache map[string]*[]*types.FileInfo - failedFiles map[string]*[]*types.FileInfo + cache map[string]*[]*types.FileInfo } -func (fsm *fileSendMap) matchOrInsert(desc dcSDK.FileDesc, query bool) (*types.FileInfo, bool) { +func (fsm *fileSendMap) matchOrInsert(desc dcSDK.FileDesc) (*types.FileInfo, bool) { fsm.Lock() defer fsm.Unlock() @@ -162,28 +165,85 @@ func (fsm *fileSendMap) matchOrInsert(desc dcSDK.FileDesc, query bool) (*types.F if !ok || c == nil || len(*c) == 0 { infoList := []*types.FileInfo{info} fsm.cache[desc.FilePath] = &infoList - fsm.updateFailStatus(info) return info, false } for _, ci := range *c { if ci.Match(desc) { - //if worker is send failed before, try to send it again - if ci.SendStatus == types.FileSendFailed && !query { - blog.Debugf("file: retry send file %s", desc.FilePath) - ci.SendStatus = types.FileSending - fsm.updateFailStatus(ci) - return ci, false - } return ci, true } } *c = append(*c, info) - fsm.updateFailStatus(info) return info, false } +// 仅匹配失败文件,不执行插入 +func (fsm *fileSendMap) matchFail(desc dcSDK.FileDesc, query bool) (*types.FileInfo, bool, error) { + fsm.Lock() + defer fsm.Unlock() + + if fsm.cache == nil { + return nil, false, errors.New("file cache not found") + } + + c, ok := fsm.cache[desc.FilePath] + if !ok || c == nil || len(*c) == 0 { + return nil, false, fmt.Errorf("file %s not found", desc.FilePath) + } + + for _, ci := range *c { + if ci.Match(desc) { + if ci.SendStatus == types.FileSendFailed && !query { + ci.SendStatus = types.FileSending + return ci, false, nil + } + return ci, true, nil + } + } + return nil, false, fmt.Errorf("file %s not found", desc.FilePath) +} + +// 仅匹配失败文件,不执行插入 +func (fsm *fileSendMap) matchFails(descs []*dcSDK.FileDesc) []matchResult { + fsm.Lock() + defer fsm.Unlock() + + if fsm.cache == nil { + blog.Warnf("file cache not found") + return []matchResult{} + } + + result := make([]matchResult, 0, len(descs)) + for _, desc := range descs { + c, ok := fsm.cache[desc.FilePath] + if !ok || c == nil || len(*c) == 0 { + blog.Warnf("file %s not found", desc.FilePath) + continue + } + matched := false + for _, ci := range *c { + if ci.Match(*desc) { + fileMatched := true + if ci.SendStatus == types.FileSendFailed { + ci.SendStatus = types.FileSending + fileMatched = false + } + result = append(result, matchResult{ + info: ci, + match: fileMatched, + }) + matched = true + break + } + } + if matched { + continue + } + } + return result +} + func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc) []matchResult { fsm.Lock() defer fsm.Unlock() @@ -213,24 +273,15 @@ func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc) []matchResult { info: info, match: false, }) - fsm.updateFailStatus(info) continue } matched := false for _, ci := range *c { if ci.Match(*desc) { - fileMatched := true - //if file is send failed before, try to send it again - if ci.SendStatus == types.FileSendFailed { - blog.Debugf("file: retry send file %s", desc.FilePath) - fileMatched = false - ci.SendStatus = types.FileSending - fsm.updateFailStatus(ci) - } result = append(result, matchResult{ info: ci, - match: fileMatched, + match: true, }) matched = true break @@ -245,40 +296,53 @@ func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc) []matchResult { info: info, match: false, }) - fsm.updateFailStatus(info) } return result } -func (fsm *fileSendMap) updateFailStatus(info *types.FileInfo) { - //blog.Debugf("file: update failed files in status %s with:%v", info.SendStatus, fsm.failedFiles) - if info.SendStatus == types.FileSendFailed { //if file send failed - if fsm.failedFiles == nil { - fsm.failedFiles = make(map[string]*[]*types.FileInfo) - } - fc, ok := fsm.failedFiles[info.FullPath] - if !ok || fc == nil { - infoList := []*types.FileInfo{info} - fsm.failedFiles[info.FullPath] = &infoList - blog.Debugf("file: update failed files with:%v", *fsm.failedFiles[info.FullPath]) +func (fsm *fileSendMap) updateFailStatus(desc dcSDK.FileDesc, status types.FileSendStatus, server string) { + if status == types.FileSendSucceed && !desc.Retry { + return + } + fsm.Lock() + defer fsm.Unlock() + + info := &types.FileInfo{ + FullPath: desc.FilePath, + Size: desc.FileSize, + LastModifyTime: desc.Lastmodifytime, + Md5: desc.Md5, + TargetRelativePath: desc.Targetrelativepath, + FileMode: desc.Filemode, + LinkTarget: desc.LinkTarget, + SendStatus: status, + } + if fsm.cache == nil { + fsm.cache = make(map[string]*[]*types.FileInfo) + } + fc, ok := fsm.cache[info.FullPath] + if !ok || fc == nil || len(*fc) == 0 { + infoList := []*types.FileInfo{info} + fsm.cache[info.FullPath] = &infoList + blog.Debugf("file: update failed files with add:%v", info) + return + } + for _, ci := range *fc { + if ci.Match(desc) { + blog.Debugf("file: update failed files with refresh before:%v", ci) + ci.SendStatus = status + blog.Debugf("file: update failed files with refresh:%v", ci) return } - blog.Debugf("file: update failed files with:%v", *fsm.failedFiles[info.FullPath]) - *fc = append(*fc, info) - } else { //if file send succeed or sending - if fsm.failedFiles == nil { - return //not update - } - if _, ok := fsm.failedFiles[info.FullPath]; !ok { - return //not update - } - delete(fsm.failedFiles, info.FullPath) - blog.Debugf("file: update failed files with:%v", fsm.failedFiles) } + *fc = append(*fc, info) } func (fsm *fileSendMap) updateStatus(desc dcSDK.FileDesc, status types.FileSendStatus) { + if status != types.FileSendSucceed && desc.Retry { + return + } fsm.Lock() defer fsm.Unlock() @@ -301,50 +365,47 @@ func (fsm *fileSendMap) updateStatus(desc dcSDK.FileDesc, status types.FileSendS if !ok || c == nil || len(*c) == 0 { infoList := []*types.FileInfo{info} fsm.cache[desc.FilePath] = &infoList - fsm.updateFailStatus(info) return } for _, ci := range *c { if ci.Match(desc) { ci.SendStatus = status - //update failed files - fsm.updateFailStatus(info) return } } *c = append(*c, info) - fsm.updateFailStatus(info) } func (fsm *fileSendMap) isFilesSendFailed(descs []dcSDK.FileDesc) bool { fsm.RLock() defer fsm.RUnlock() - if fsm.failedFiles == nil { + if fsm.cache == nil { return false } for _, desc := range descs { - c, ok := fsm.failedFiles[desc.FilePath] + c, ok := fsm.cache[desc.FilePath] if !ok || c == nil || len(*c) == 0 { continue } for _, ci := range *c { if ci.Match(desc) { - return true + return ci.SendStatus == types.FileSendFailed } } } return false } + func (fsm *fileSendMap) getFailFiles() []dcSDK.FileDesc { fsm.RLock() defer fsm.RUnlock() failFiles := make([]dcSDK.FileDesc, 0) - for _, v := range fsm.failedFiles { + for _, v := range fsm.cache { for _, ci := range *v { failFiles = append(failFiles, dcSDK.FileDesc{ FilePath: ci.FullPath, @@ -356,6 +417,7 @@ func (fsm *fileSendMap) getFailFiles() []dcSDK.FileDesc { Filemode: ci.FileMode, LinkTarget: ci.LinkTarget, NoDuplicated: true, + Retry: true, }) } } @@ -611,9 +673,9 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas for _, c := range req.Req.Commands { for _, s := range m.work.Resource().GetHosts() { - m.fileSendMutex.Lock() - f := m.fileSendMap[s.Server] - m.fileSendMutex.Unlock() + m.failFileSendMutex.Lock() + f := m.failFileSendMap[s.Server] + m.failFileSendMutex.Unlock() if f == nil { continue } @@ -751,23 +813,20 @@ func (m *Mgr) SendFiles(req *types.RemoteTaskSendFileRequest) ([]string, error) } func (m *Mgr) retrySendFiles(h *dcProtocol.Host) { - m.fileSendMutex.Lock() - sendMap := m.fileSendMap[h.Server] + m.failFileSendMutex.Lock() + sendMap := m.failFileSendMap[h.Server] if sendMap == nil { - m.fileSendMutex.Unlock() + m.failFileSendMutex.Unlock() blog.Infof("remote: send file for work(%s) with no send map", m.work.ID()) return } - m.fileSendMutex.Unlock() + m.failFileSendMutex.Unlock() + failFiles := sendMap.getFailFiles() if len(failFiles) == 0 { return } - var testStr []string - for _, r := range failFiles { - testStr = append(testStr, r.FilePath) - } - blog.Debugf("remote: try to retry send fail file for work(%s) from pid(%d) to server %s with fail files %v", m.work.ID(), 1, h.Server, strings.Join(testStr, ",")) + blog.Debugf("remote: try to retry send fail file for work(%s) from pid(%d) to server %s with fail files %v", m.work.ID(), 1, h.Server, len(failFiles)) if _, err := m.SendFiles(&types.RemoteTaskSendFileRequest{ Pid: 1, Req: failFiles, @@ -778,27 +837,9 @@ func (m *Mgr) retrySendFiles(h *dcProtocol.Host) { blog.Errorf("remote: try to retry send fail file for work(%s) from pid(%d) to server %s failed: %v", m.work.ID(), 1, h.Server, err) return } - blog.Debugf("remote: success to retry send fail file for work(%s) from pid(%d) to server %s with fail files %v", m.work.ID(), 1, h.Server, len(sendMap.getFailFiles())) + blog.Debugf("remote: success to retry send fail file for work(%s) from pid(%d) to server %s with fail files %v", m.work.ID(), 1, h.Server, len(failFiles)) } -// check if files send to remote worker failed and no need to send again -/*func (m *Mgr) isFilesAlreadySendFailed(server string, commands []dcSDK.BKCommand) bool { - m.fileSendMutex.Lock() - target, ok := m.fileSendMap[server] - if !ok { - m.fileSendMutex.Unlock() - return false - } - m.fileSendMutex.Unlock() - - for _, c := range commands { - if target.hasReachedFailCount(c.Inputfiles) { - return true - } - } - return false -}*/ - func (m *Mgr) ensureFilesWithPriority( handler dcSDK.RemoteWorkerHandler, pid int, @@ -1029,7 +1070,8 @@ func (m *Mgr) ensureFiles( if v.info.SendStatus == types.FileSendSucceed { wg <- nil continue - } else if v.info.SendStatus == types.FileSendFailed { + } else if v.info.SendStatus == types.FileSendFailed || + v.info.SendStatus == types.FileSendUnknown { wg <- types.ErrSendFileFailed continue } @@ -1101,11 +1143,19 @@ func (m *Mgr) ensureSingleFile( } req.Files = req.Files[:1] desc := req.Files[0] - blog.Debugf("remote: try to ensure single file(%s) for work(%s) to server(%s)", - desc.FilePath, m.work.ID(), host.Server) - - status, ok := m.checkOrLockSendFile(host.Server, desc, false) - + blog.Debugf("remote: try to ensure single file(%s) for work(%s) to server(%s) with retry %v", + desc.FilePath, m.work.ID(), host.Server, desc.Retry) + var status types.FileSendStatus + var ok bool + if desc.Retry { + status, ok, err = m.checkOrLockSendFailFile(host.Server, desc, false) + if err != nil { // 没找到文件不处理,直接返回不影响其他失败文件发送 + blog.Warnf("remote: checkOrLockSendFailFile(%s) failed: %v", host.Server, err) + return err + } + } else { + status, ok = m.checkOrLockSendFile(host.Server, desc) + } // 已经有人发送了文件, 等待文件就绪 if ok { blog.Debugf("remote: try to ensure single file(%s) for work(%s) to server(%s), "+ @@ -1117,14 +1167,18 @@ func (m *Mgr) ensureSingleFile( select { case <-tick.C: // 不是发送文件的goroutine,不需要修改状态,仅查询状态 - status, _ = m.checkOrLockSendFile(host.Server, desc, true) + if desc.Retry { + status, _, _ = m.checkOrLockSendFailFile(host.Server, desc, true) + } else { + status, _ = m.checkOrLockSendFile(host.Server, desc) + } } } switch status { case types.FileSendFailed: blog.Errorf("remote: failed to ensure single file(%s) for work(%s) to server(%s), "+ - "file already sent and failed", desc.FilePath, m.work.ID(), host.Server) + "file already sent and failed with retry %v", desc.FilePath, m.work.ID(), host.Server, desc.Retry) return types.ErrSendFileFailed case types.FileSendSucceed: blog.Debugf("remote: success to ensure single file(%s) for work(%s) to server(%s)", @@ -1153,8 +1207,8 @@ func (m *Mgr) ensureSingleFile( // } // } - blog.Debugf("remote: try to ensure single file(%s) for work(%s) to server(%s), going to send this file", - desc.FilePath, m.work.ID(), host.Server) + blog.Debugf("remote: try to ensure single file(%s) for work(%s) to server(%s), going to send this file with retry %v", + desc.FilePath, m.work.ID(), host.Server, desc.Retry) req.Messages = m.fileMessageBank.get(desc) // 同步发送文件 @@ -1189,8 +1243,8 @@ func (m *Mgr) ensureSingleFile( desc.FilePath, m.work.ID(), host.Server, retCode) } - blog.Debugf("remote: success to execute send file(%s) for work(%s) to server(%s)", - desc.FilePath, m.work.ID(), host.Server) + blog.Debugf("remote: success to execute send file(%s) for work(%s) to server(%s) with retry %v", + desc.FilePath, m.work.ID(), host.Server, desc.Retry) return nil } @@ -1214,7 +1268,11 @@ func (m *Mgr) ensureSingleCorkFile(c *corkFile, r matchResult) (err error) { select { case <-tick.C: // 不是发送文件的goroutine,不能修改状态 - status, _ = m.checkOrLockSendFile(host.Server, *desc, true) + if desc.Retry { + status, _, _ = m.checkOrLockSendFailFile(host.Server, *desc, true) + } else { + status, _ = m.checkOrLockSendFile(host.Server, *desc) + } } } @@ -1325,8 +1383,9 @@ func (m *Mgr) checkBatchCache( } // checkOrLockFile 检查目标file的sendStatus, 如果已经被发送, 则返回当前状态和true; 如果没有被发送过, 则将其置于sending, 并返回false -func (m *Mgr) checkOrLockSendFile(server string, desc dcSDK.FileDesc, query bool) (types.FileSendStatus, bool) { +func (m *Mgr) checkOrLockSendFile(server string, desc dcSDK.FileDesc) (types.FileSendStatus, bool) { t1 := time.Now().Local() + m.fileSendMutex.Lock() t2 := time.Now().Local() @@ -1347,10 +1406,29 @@ func (m *Mgr) checkOrLockSendFile(server string, desc dcSDK.FileDesc, query bool } m.fileSendMutex.Unlock() - info, match := target.matchOrInsert(desc, query) + info, match := target.matchOrInsert(desc) return info.SendStatus, match } +func (m *Mgr) checkOrLockSendFailFile(server string, desc dcSDK.FileDesc, query bool) (types.FileSendStatus, bool, error) { + m.failFileSendMutex.Lock() + target, ok := m.failFileSendMap[server] + if !ok { + target = &fileSendMap{} + m.failFileSendMap[server] = target + } + m.failFileSendMutex.Unlock() + + info, match, err := target.matchFail(desc, query) + if err != nil { + return types.FileSendUnknown, false, errors.New("file not found") + } + if info == nil { + return types.FileSendUnknown, false, errors.New("file is nil") + } + return info.SendStatus, match, nil +} + type matchResult struct { info *types.FileInfo match bool @@ -1358,15 +1436,27 @@ type matchResult struct { // checkOrLockCorkFiles 批量检查目标file的sendStatus, 如果已经被发送, 则返回当前状态和true; 如果没有被发送过, 则将其置于sending, 并返回false func (m *Mgr) checkOrLockCorkFiles(server string, descs []*dcSDK.FileDesc) []matchResult { - m.fileSendMutex.Lock() - target, ok := m.fileSendMap[server] - if !ok { - target = &fileSendMap{} - m.fileSendMap[server] = target - } - m.fileSendMutex.Unlock() + if len(descs) == 0 || !descs[0].Retry { // 普通的文件 + m.fileSendMutex.Lock() + target, ok := m.fileSendMap[server] + if !ok { + target = &fileSendMap{} + m.fileSendMap[server] = target + } + m.fileSendMutex.Unlock() + + return target.matchOrInserts(descs) + } else { // 失败的文件 + m.fileSendMutex.Lock() + target, ok := m.failFileSendMap[server] + if !ok { + target = &fileSendMap{} + m.failFileSendMap[server] = target + } + m.fileSendMutex.Unlock() - return target.matchOrInserts(descs) + return target.matchFails(descs) + } } func (m *Mgr) updateSendFile(server string, desc dcSDK.FileDesc, status types.FileSendStatus) { @@ -1377,9 +1467,17 @@ func (m *Mgr) updateSendFile(server string, desc dcSDK.FileDesc, status types.Fi m.fileSendMap[server] = target } m.fileSendMutex.Unlock() - target.updateStatus(desc, status) - return + + m.failFileSendMutex.Lock() + failTarget, ok := m.failFileSendMap[server] + if !ok { + failTarget = &fileSendMap{} + m.failFileSendMap[server] = failTarget + } + m.failFileSendMutex.Unlock() + + failTarget.updateFailStatus(desc, status, server) } func (m *Mgr) sendToolchain(handler dcSDK.RemoteWorkerHandler, req *types.RemoteTaskExecuteRequest) error { @@ -1425,40 +1523,32 @@ func (m *Mgr) sendToolchain(handler dcSDK.RemoteWorkerHandler, req *types.Remote return nil } -// getFailedFileCollectionByHost 返回文件集合,如果文件集没有全部完成,返回false,否则返回true -func (m *Mgr) getFailedFileCollectionByHost(server string) ([]*types.FileCollectionInfo, bool, error) { +// getFailedFileCollectionByHost 返回失败文件集合 +func (m *Mgr) getFailedFileCollectionByHost(server string) ([]*types.FileCollectionInfo, error) { m.fileCollectionSendMutex.RLock() defer m.fileCollectionSendMutex.RUnlock() target, ok := m.fileCollectionSendMap[server] if !ok { - return nil, true, fmt.Errorf("remote: no found host(%s) in file send cache", server) + return nil, fmt.Errorf("remote: no found host(%s) in file send cache", server) } fcs := make([]*types.FileCollectionInfo, 0) for _, re := range *target { - //如果有fc未到终结状态,则直接返回 - if !re.SendStatus.IsFinished() { - blog.Infof("remote: found file collection(%s) in file send cache, but not finished, status:%s", re.UniqID, re.SendStatus) - return nil, false, nil - } + re.Retry = true if re.SendStatus == types.FileSendFailed { fcs = append(fcs, re) } } - return fcs, true, nil + return fcs, nil } // retry send failed tool chain func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.RemoteTaskExecuteRequest) { - fileCollections, isNotTeminated, err := m.getFailedFileCollectionByHost(req.Server.Server) + fileCollections, err := m.getFailedFileCollectionByHost(req.Server.Server) if err != nil { blog.Infof("remote: retry send tool chain for work(%s) from pid(%d) to server(%s) failed: %v", m.work.ID(), req.Pid, req.Server.Server, err) return } - if !isNotTeminated { - blog.Infof("remote: retry send tool chain for work(%s) from pid(%d) all file collection in host(%s) is not finished", m.work.ID(), req.Pid, req.Server.Server) - return - } if len(fileCollections) == 0 { //blog.Infof("remote: retry send tool chain for work(%s) from pid(%d) to server(%s) failed: no filecollection found", m.work.ID(), req.Pid, req.Server.Server) return @@ -1473,7 +1563,7 @@ func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.R } // enable worker m.resource.EnableWorker(req.Server) - blog.Infof("remote: retry send tool chain for work(%s) from pid(%d) to server(%s) succeed", m.work.ID(), req.Pid, req.Server.Server) + blog.Infof("remote: success to retry send tool chain for work(%s) from pid(%d) to server(%s)", m.work.ID(), req.Pid, req.Server.Server) } @@ -1608,6 +1698,9 @@ func (m *Mgr) ensureOneFileCollection( fileDetails := make([]*types.FilesDetails, 0, len(fc.Files)) for _, f := range fc.Files { f.NoDuplicated = true + /*if fc.Retry { + f.Retry = true + }*/ fileDetails = append(fileDetails, &types.FilesDetails{ Servers: Servers, File: f, @@ -1649,7 +1742,7 @@ func (m *Mgr) checkOrLockFileCollection(server string, fc *types.FileCollectionI for _, f := range *target { if f.UniqID == fc.UniqID { // set status to sending if fc send failed - if f.SendStatus == types.FileSendFailed { + if f.SendStatus == types.FileSendFailed && fc.Retry { f.SendStatus = types.FileSending return f.SendStatus, false } diff --git a/src/backend/booster/bk_dist/controller/pkg/types/manager.go b/src/backend/booster/bk_dist/controller/pkg/types/manager.go index 8f14c607..cc06b055 100644 --- a/src/backend/booster/bk_dist/controller/pkg/types/manager.go +++ b/src/backend/booster/bk_dist/controller/pkg/types/manager.go @@ -485,6 +485,7 @@ type FileCollectionInfo struct { SendStatus FileSendStatus `json:"send_status"` Files []dcSDK.FileDesc `json:"files"` Timestamp int64 `json:"timestamp"` + Retry bool `json:"retry"` } // FileInfo record file info