From 4642326a286236fa2676d9ae418565f9ed254231 Mon Sep 17 00:00:00 2001 From: yanafu Date: Thu, 21 Nov 2024 16:51:36 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20worker=E9=80=89=E5=8F=96=E5=8F=8A?= =?UTF-8?q?=E6=81=A2=E5=A4=8D=E6=9C=BA=E5=88=B6=E4=BC=98=E5=8C=96=20#311?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/pkg/manager/remote/mgr.go | 35 +++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go index 0eda7806..3cea011c 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go @@ -184,7 +184,7 @@ func (fsm *fileSendMap) matchFail(desc dcSDK.FileDesc, query bool) (*types.FileI c, ok := fsm.cache[desc.FilePath] if !ok || c == nil || len(*c) == 0 { - return nil, false, fmt.Errorf("file %s not found", desc.FilePath) + return nil, false, fmt.Errorf("file %s not found, file cache is nil", desc.FilePath) } for _, ci := range *c { @@ -635,6 +635,9 @@ func (m *Mgr) retrySendToolChains(ctx context.Context) { blog.Infof("remote: run toolchain check for work(%s) canceled by context", m.work.ID()) return case <-ticker.C: + if m.failFileSendMap == nil { + continue + } handler := m.remoteWorker.Handler(0, nil, nil, nil) hosts := m.work.Resource().GetHosts() count := 0 @@ -681,6 +684,9 @@ func (m *Mgr) retryFailFiles(ctx context.Context) { blog.Infof("remote: run failfiles check for work(%s) canceled by context", m.work.ID()) return case <-ticker.C: + if m.failFileSendMap == nil { + continue + } hosts := m.work.Resource().GetHosts() wg := make(chan string, len(hosts)) count := 0 @@ -1506,7 +1512,7 @@ func (m *Mgr) checkOrLockSendFailFile(server string, desc dcSDK.FileDesc, query info, match, err := target.matchFail(desc, query) if err != nil { - return types.FileSendUnknown, false, errors.New("file not found") + return types.FileSendUnknown, false, err } if info == nil { return types.FileSendUnknown, false, errors.New("file is nil") @@ -1774,8 +1780,33 @@ func (m *Mgr) ensureOneFileCollection( Servers = append(Servers, host) fileDetails := make([]*types.FilesDetails, 0, len(fc.Files)) + + var failsendMap *fileSendMap + if fc.Retry { + m.failFileSendMutex.Lock() + failsendMap = m.failFileSendMap[host.Server] + if failsendMap == nil { + m.failFileSendMutex.Unlock() + blog.Errorf("remote: send file for work(%s) with no send map", m.work.ID()) + return errors.New("remote: send file with no send map") + } + m.failFileSendMutex.Unlock() + } for _, f := range fc.Files { f.NoDuplicated = true + if fc.Retry { + for _, c := range failsendMap.cache { + for _, d := range *c { + if d.Match(f) { + f.Retry = fc.Retry + break + } + } + if f.Retry { + break + } + } + } fileDetails = append(fileDetails, &types.FilesDetails{ Servers: Servers, File: f,