From b68f5a3931ba982bbd1d75acd4f46096f0ab7e49 Mon Sep 17 00:00:00 2001
From: Alex Aizman
Date: Sat, 8 Jul 2023 09:51:05 -0400
Subject: [PATCH] control plane: consistently propagate cluster map

* h.call and friends

Signed-off-by: Alex Aizman
---
 ais/htcommon.go             |  2 +-
 ais/htrun.go                | 30 +++++++++++++-------------
 ais/ic.go                   |  6 +++---
 ais/kalive.go               | 30 +++++++++++++-------------
 ais/metasync.go             |  5 +++--
 ais/proxy.go                | 18 +++++++++--------
 ais/prxclu.go               | 24 ++++++++++++-----------
 ais/prxetl.go               |  7 ++++---
 ais/prxnotif.go             |  4 +++-
 ais/prxs3.go                |  2 +-
 ais/tgtcp.go                | 17 +++++++---------
 ais/tgtimpl.go              | 39 +++++++++++++------------------
 ais/tgtobj.go               |  2 +-
 ais/vote.go                 | 11 ++++++-----
 cluster/mock/target_mock.go |  1 -
 15 files changed, 98 insertions(+), 100 deletions(-)

diff --git a/ais/htcommon.go b/ais/htcommon.go
index a1c4e406d4..c9170c5c40 100644
--- a/ais/htcommon.go
+++ b/ais/htcommon.go
@@ -708,7 +708,7 @@ func (cii *clusterInfo) smapEqual(other *clusterInfo) (ok bool) {
 
 func (c *getMaxCii) do(si *meta.Snode, wg cos.WG, smap *smapX) {
 	var cii *clusterInfo
-	body, _, err := c.h.Health(si, c.timeout, c.query)
+	body, _, err := c.h.reqHealth(si, c.timeout, c.query, smap)
 	if err != nil {
 		goto ret
 	}
diff --git a/ais/htrun.go b/ais/htrun.go
index 8712b31d48..31f78ab58d 100644
--- a/ais/htrun.go
+++ b/ais/htrun.go
@@ -578,7 +578,7 @@ func (h *htrun) _call(si *meta.Snode, bargs *bcastArgs, results *bcastResults) {
 		cargs.req.BodyR, _ = bargs.req.BodyR.(cos.ReadOpenCloser).Open()
 	}
 	cargs.cresv = bargs.cresv
-	res := h.call(cargs)
+	res := h.call(cargs, bargs.smap)
 	if bargs.async {
 		freeCR(res) // discard right away
 	} else {
@@ -589,7 +589,7 @@ func (h *htrun) _call(si *meta.Snode, bargs *bcastArgs, results *bcastResults) {
 	freeCargs(cargs)
 }
 
-func (h *htrun) call(args *callArgs) (res *callResult) {
+func (h *htrun) call(args *callArgs, smap *smapX) (res *callResult) {
 	var (
 		req  *http.Request
 		resp *http.Response
@@ -653,7 +653,8 @@ func (h *htrun) call(args *callArgs) (res *callResult) {
 
 	req.Header.Set(apc.HdrCallerID, h.si.ID())
 	req.Header.Set(apc.HdrCallerName, h.si.Name())
-	if smap := h.owner.smap.get(); smap != nil && smap.vstr != "" {
+	debug.Assert(smap != nil)
+	if smap.vstr != "" {
 		req.Header.Set(apc.HdrCallerSmapVersion, smap.vstr)
 	}
 	req.Header.Set(cos.HdrUserAgent, ua)
@@ -749,6 +750,7 @@ func (h *htrun) _nfy(n cluster.Notif, err error, upon string) {
 	args.timeout = cmn.Timeout.MaxKeepalive()
 	args.selected = nodes
 	args.nodeCount = len(nodes)
+	args.smap = smap
 	args.async = true
 	_ = h.bcastSelected(args)
 	freeBcArgs(args)
@@ -866,7 +868,7 @@ func (h *htrun) bcastAsyncIC(msg *aisMsg) {
 			cargs.req = args.req
 			cargs.timeout = args.timeout
 		}
-		res := h.call(cargs)
+		res := h.call(cargs, smap)
 		freeCargs(cargs)
 		freeCR(res) // discard right away
 		wg.Done()
@@ -1213,11 +1215,8 @@ func (h *htrun) isValidObjname(w http.ResponseWriter, r *http.Request, name stri
 	return true
 }
 
-//
 // health client
-//
-
-func (h *htrun) Health(si *meta.Snode, timeout time.Duration, query url.Values) (b []byte, status int, err error) {
+func (h *htrun) reqHealth(si *meta.Snode, timeout time.Duration, query url.Values, smap *smapX) (b []byte, status int, err error) {
 	var (
 		path = apc.URLPathHealth.S
 		url  = si.URL(cmn.NetIntraControl)
@@ -1228,14 +1227,15 @@ func (h *htrun) Health(si *meta.Snode, timeout time.Duration, query url.Values)
 		cargs.req = cmn.HreqArgs{Method: http.MethodGet, Base: url, Path: path, Query: query}
 		cargs.timeout = timeout
 	}
-	res := h.call(cargs)
+	res := h.call(cargs, smap)
 	b, status, err = res.bytes, res.status, res.err
 	freeCargs(cargs)
 	freeCR(res)
 	return
 }
 
-// - utilizes internal API: Health(clusterInfo) to discover a _better_ Smap, if exists
+// - utilizes reqHealth (above) to discover a _better_ Smap, if exists
+// - via getMaxCii.do()
 // - checkAll: query all nodes
 // - consider adding max-ver BMD bit here as well (TODO)
 func (h *htrun) bcastHealth(smap *smapX, checkAll bool) (*clusterInfo, int /*num confirmations*/) {
@@ -1716,7 +1716,11 @@ func (h *htrun) regTo(url string, psi *meta.Snode, tout time.Duration, q url.Val
 		cargs.req = cmn.HreqArgs{Method: http.MethodPost, Base: url, Path: path, Query: q, Body: cos.MustMarshal(cm)}
 		cargs.timeout = tout
 	}
-	res := h.call(cargs)
+	smap := cm.Smap
+	if smap == nil {
+		smap = h.owner.smap.get()
+	}
+	res := h.call(cargs, smap)
 	freeCargs(cargs)
 	return res
 }
@@ -1776,7 +1780,7 @@ func (h *htrun) pollClusterStarted(config *cmn.Config, psi *meta.Snode) (maxCii
 			nlog.Warningf("%s: started as a non-primary and got ELECTED during startup", h.si)
 			return
 		}
-		if _, _, err := h.Health(smap.Primary, healthTimeout, query /*ask primary*/); err == nil {
+		if _, _, err := h.reqHealth(smap.Primary, healthTimeout, query /*ask primary*/, smap); err == nil {
 			// log
 			s := fmt.Sprintf("%s via primary health: cluster startup ok, %s", h.si, smap.StringEx())
 			if self := smap.GetNode(h.si.ID()); self == nil {
@@ -1824,7 +1828,7 @@ func (h *htrun) unregisterSelf(ignoreErr bool) (err error) {
 		cargs.req = cmn.HreqArgs{Method: http.MethodDelete, Path: apc.URLPathCluDaemon.Join(h.si.ID())}
 		cargs.timeout = apc.DefaultTimeout
 	}
-	res := h.call(cargs)
+	res := h.call(cargs, smap)
 	status, err = res.status, res.err
 	if err != nil {
 		f := nlog.Errorf
diff --git a/ais/ic.go b/ais/ic.go
index 21f732d796..4656ed0561 100644
--- a/ais/ic.go
+++ b/ais/ic.go
@@ -391,7 +391,7 @@ func (ic *ic) bcastListenIC(nl nl.Listener) {
 	ic.p.bcastAsyncIC(msg)
 }
 
-func (ic *ic) sendOwnershipTbl(si *meta.Snode) error {
+func (ic *ic) sendOwnershipTbl(si *meta.Snode, smap *smapX) error {
 	if ic.p.notifs.size() == 0 {
 		if cmn.FastV(4, cos.SmoduleAIS) {
 			nlog.Infof("%s: notifs empty, not sending to %s", ic.p, si)
@@ -405,7 +405,7 @@ func (ic *ic) sendOwnershipTbl(si *meta.Snode) error {
 		cargs.req = cmn.HreqArgs{Method: http.MethodPost, Path: apc.URLPathIC.S, Body: cos.MustMarshal(msg)}
 		cargs.timeout = cmn.Timeout.CplaneOperation()
 	}
-	res := ic.p.call(cargs)
+	res := ic.p.call(cargs, smap)
 	freeCargs(cargs)
 	return res.err
 }
@@ -435,7 +435,7 @@ func (ic *ic) syncICBundle() error {
 		cargs.timeout = cmn.Timeout.CplaneOperation()
 		cargs.cresv = cresIC{} // -> icBundle
 	}
-	res := ic.p.call(cargs)
+	res := ic.p.call(cargs, smap)
 	freeCargs(cargs)
 	if res.err != nil {
 		return res.err
diff --git a/ais/kalive.go b/ais/kalive.go
index 57828306ca..a4727aad82 100644
--- a/ais/kalive.go
+++ b/ais/kalive.go
@@ -233,7 +233,7 @@ func (pkr *palive) updateSmap() (stopped bool) {
 			}
 			// do keepalive
 			wg.Add(1)
-			go pkr.ping(si, wg)
+			go pkr.ping(si, wg, smap)
 		}
 	}
 	wg.Wait()
@@ -256,12 +256,12 @@ func (pkr *palive) updateSmap() (stopped bool) {
 	return
 }
 
-func (pkr *palive) ping(si *meta.Snode, wg cos.WG) {
+func (pkr *palive) ping(si *meta.Snode, wg cos.WG, smap *smapX) {
 	defer wg.Done()
 	if len(pkr.stoppedCh) > 0 {
 		return
 	}
-	ok, stopped := pkr._pingRetry(si)
+	ok, stopped := pkr._pingRetry(si, smap)
 	if stopped {
 		pkr.stoppedCh <- struct{}{}
 	}
@@ -270,11 +270,11 @@ func (pkr *palive) ping(si *meta.Snode, wg cos.WG) {
 	}
 }
 
-func (pkr *palive) _pingRetry(to *meta.Snode) (ok, stopped bool) {
+func (pkr *palive) _pingRetry(to *meta.Snode, smap *smapX) (ok, stopped bool) {
 	var (
 		timeout        = time.Duration(pkr.timeoutStats(to.ID()).timeout)
 		t              = mono.NanoTime()
-		_, status, err = pkr.p.Health(to, timeout, nil)
+		_, status, err = pkr.p.reqHealth(to, timeout, nil, smap)
 	)
 	delta := mono.Since(t)
 	pkr.updateTimeoutFor(to.ID(), delta)
@@ -283,8 +283,12 @@
 	if err == nil {
 		return true, false
 	}
-	nlog.Warningf("%s fails to respond, err: %v(%d) - retrying...", to.StringEx(), err, status)
-	ok, stopped = pkr.retry(to)
+
+	nlog.Warningf("%s fails to respond: %v(%d) - retrying...", to.StringEx(), err, status)
+	ticker := time.NewTicker(cmn.KeepaliveRetryDuration())
+	ok, stopped = pkr.retry(to, ticker)
+	ticker.Stop()
+
 	return ok, stopped
 }
 
@@ -350,28 +354,26 @@ func (pkr *palive) _final(ctx *smapModifier, clone *smapX) {
 	_ = pkr.p.metasyncer.sync(revsPair{clone, msg})
 }
 
-func (pkr *palive) retry(si *meta.Snode) (ok, stopped bool) {
+func (pkr *palive) retry(si *meta.Snode, ticker *time.Ticker) (ok, stopped bool) {
 	var (
 		timeout = time.Duration(pkr.timeoutStats(si.ID()).timeout)
-		ticker  = time.NewTicker(cmn.KeepaliveRetryDuration())
 		i       int
 	)
-	defer ticker.Stop()
 	for {
 		if !pkr.isTimeToPing(si.ID()) {
 			return true, false
 		}
 		select {
 		case <-ticker.C:
-			t := mono.NanoTime()
-			_, status, err := pkr.p.Health(si, timeout, nil)
-			timeout = pkr.updateTimeoutFor(si.ID(), mono.Since(t))
+			now := mono.NanoTime()
+			smap := pkr.p.owner.smap.get()
+			_, status, err := pkr.p.reqHealth(si, timeout, nil, smap)
+			timeout = pkr.updateTimeoutFor(si.ID(), mono.Since(now))
 			if err == nil {
 				return true, false
 			}
 			i++
 			if i == kaNumRetries {
-				smap := pkr.p.owner.smap.get()
 				sname := si.StringEx()
 				nlog.Warningf("Failed to keepalive %s after %d attempts - removing %s from the %s",
 					sname, i, sname, smap)
diff --git a/ais/metasync.go b/ais/metasync.go
index af9b6c4678..14587258f2 100644
--- a/ais/metasync.go
+++ b/ais/metasync.go
@@ -449,14 +449,14 @@ func (y *metasyncer) syncDone(si *meta.Snode, pairs []revsPair) {
 	}
 }
 
-func (y *metasyncer) handleRefused(method, urlPath string, body io.Reader, refused meta.NodeMap, pairs []revsPair,
-	smap *smapX) (ok bool) {
+func (y *metasyncer) handleRefused(method, urlPath string, body io.Reader, refused meta.NodeMap, pairs []revsPair, smap *smapX) (ok bool) {
 	args := allocBcArgs()
 	args.req = cmn.HreqArgs{Method: method, Path: urlPath, BodyR: body}
 	args.network = cmn.NetIntraControl
 	args.timeout = cmn.Timeout.MaxKeepalive()
 	args.nodes = []meta.NodeMap{refused}
 	args.nodeCount = len(refused)
+	args.smap = smap
 	results := y.p.bcastNodes(args)
 	freeBcArgs(args)
 	for _, res := range results {
@@ -563,6 +563,7 @@ func (y *metasyncer) handlePending() (failedCnt int) {
 	args.timeout = cmn.Timeout.MaxKeepalive()
 	args.nodes = []meta.NodeMap{pending}
 	args.nodeCount = len(pending)
+	args.smap = smap
 	defer body.Free()
 	results := y.p.bcastNodes(args)
 	freeBcArgs(args)
diff --git a/ais/proxy.go b/ais/proxy.go
index 1addb3307c..574f4834ed 100644
--- a/ais/proxy.go
+++ b/ais/proxy.go
@@ -920,7 +920,7 @@ func (p *proxy) syncNewICOwners(smap, newSmap *smapX) {
 	for _, psi := range newSmap.Pmap {
 		if p.SID() != psi.ID() && newSmap.IsIC(psi) && !smap.IsIC(psi) {
 			go func(psi *meta.Snode) {
-				if err := p.ic.sendOwnershipTbl(psi); err != nil {
+				if err := p.ic.sendOwnershipTbl(psi, newSmap); err != nil {
 					nlog.Errorf("%s: failed to send ownership table to %s, err:%v", p, psi, err)
 				}
 			}(psi)
@@ -2029,7 +2029,8 @@ func (p *proxy) listBuckets(w http.ResponseWriter, r *http.Request, qbck *cmn.Qu
 	}
 
 	// via random target
-	si, err := p.owner.smap.get().GetRandTarget()
+	smap := p.owner.smap.get()
+	si, err := smap.GetRandTarget()
 	if err != nil {
 		p.writeErr(w, r, err)
 		return
@@ -2047,7 +2048,7 @@ func (p *proxy) listBuckets(w http.ResponseWriter, r *http.Request, qbck *cmn.Qu
 		}
 		cargs.timeout = apc.DefaultTimeout
 	}
-	res := p.call(cargs)
+	res := p.call(cargs, smap)
 	freeCargs(cargs)
 
 	if res.err != nil {
@@ -2223,7 +2224,7 @@ func (p *proxy) lsObjsR(bck *meta.Bck, lsmsg *apc.LsoMsg, smap *smapX, tsi *meta
 		cargs.timeout = reqTimeout
 		cargs.cresv = cresLso{} // -> cmn.LsoResult
 	}
-	res := p.call(cargs)
+	res := p.call(cargs, smap)
 	freeCargs(cargs)
 	results = make(sliceResults, 1)
 	results[0] = res
@@ -2646,7 +2647,7 @@ func (p *proxy) smapFromURL(baseURL string) (smap *smapX, err error) {
 		cargs.timeout = apc.DefaultTimeout
 		cargs.cresv = cresSM{} // -> smapX
 	}
-	res := p.call(cargs)
+	res := p.call(cargs, p.owner.smap.get())
 	if res.err != nil {
 		err = res.errorf("failed to get Smap from %s", baseURL)
 	} else {
@@ -3131,7 +3132,7 @@ func (p *proxy) getDaemonInfo(osi *meta.Snode) (si *meta.Snode, err error) {
 		cargs.timeout = cmn.Timeout.CplaneOperation()
 		cargs.cresv = cresND{} // -> meta.Snode
 	}
-	res := p.call(cargs)
+	res := p.call(cargs, p.owner.smap.get())
 	if res.err != nil {
 		err = res.err
 	} else {
@@ -3146,8 +3147,9 @@ func (p *proxy) headRemoteBck(bck *cmn.Bck, q url.Values) (header http.Header, s
 	var (
 		tsi  *meta.Snode
 		path = apc.URLPathBuckets.Join(bck.Name)
+		smap = p.owner.smap.get()
 	)
-	if tsi, err = p.owner.smap.get().GetRandTarget(); err != nil {
+	if tsi, err = smap.GetRandTarget(); err != nil {
 		return
 	}
 	if bck.IsCloud() {
@@ -3166,7 +3168,7 @@ func (p *proxy) headRemoteBck(bck *cmn.Bck, q url.Values) (header http.Header, s
 		cargs.req = cmn.HreqArgs{Method: http.MethodHead, Path: path, Query: q}
 		cargs.timeout = apc.DefaultTimeout
 	}
-	res := p.call(cargs)
+	res := p.call(cargs, smap)
 	if res.status == http.StatusNotFound {
 		err = cmn.NewErrRemoteBckNotFound(bck)
 	} else if res.status == http.StatusGone {
diff --git a/ais/prxclu.go b/ais/prxclu.go
index 1997c726d4..a7c12476e3 100644
--- a/ais/prxclu.go
+++ b/ais/prxclu.go
@@ -244,7 +244,7 @@ func (p *proxy) getRemAises(refresh bool) (*cluster.Remotes, error) {
 	}
 	var (
 		v   *cluster.Remotes
-		res = p.call(cargs)
+		res = p.call(cargs, smap)
 		err = res.toErr()
 	)
 	if err == nil {
@@ -458,7 +458,7 @@ func (p *proxy) httpclupost(w http.ResponseWriter, r *http.Request) {
 	// handshake | check dup
 	if apiOp == apc.AdminJoin {
 		// call the node with cluster-metadata included
-		if errCode, err := p.adminJoinHandshake(nsi, apiOp); err != nil {
+		if errCode, err := p.adminJoinHandshake(smap, nsi, apiOp); err != nil {
 			p.writeErr(w, r, err, errCode)
 			return
 		}
@@ -537,7 +537,7 @@ func (p *proxy) httpclupost(w http.ResponseWriter, r *http.Request) {
 
 // when joining manually: update the node with cluster meta that does not include Smap
 // (the later gets finalized and metasync-ed upon success)
-func (p *proxy) adminJoinHandshake(nsi *meta.Snode, apiOp string) (int, error) {
+func (p *proxy) adminJoinHandshake(smap *smapX, nsi *meta.Snode, apiOp string) (int, error) {
 	cm, err := p.cluMeta(cmetaFillOpt{skipSmap: true})
 	if err != nil {
 		return http.StatusInternalServerError, err
@@ -550,7 +550,7 @@ func (p *proxy) adminJoinHandshake(nsi *meta.Snode, apiOp string) (int, error) {
 		cargs.req = cmn.HreqArgs{Method: http.MethodPost, Path: apc.URLPathDaeAdminJoin.S, Body: cos.MustMarshal(cm)}
 		cargs.timeout = cmn.Timeout.CplaneOperation()
 	}
-	res := p.call(cargs)
+	res := p.call(cargs, smap)
 	err = res.err
 	status := res.status
 	if err != nil {
@@ -722,6 +722,7 @@ func (p *proxy) cleanupMark(ctx *smapModifier) {
 		}
 		msg     = apc.ActMsg{Action: apc.ActCleanupMarkers, Value: &val}
 		cargs   = allocCargs()
+		smap    = p.owner.smap.get()
 		timeout = cmn.Timeout.CplaneOperation()
 		sleep   = timeout >> 1
 	)
@@ -732,7 +733,7 @@ func (p *proxy) cleanupMark(ctx *smapModifier) {
 	}
 	time.Sleep(sleep)
 	for i := 0; i < 4; i++ { // retry
-		res := p.call(cargs)
+		res := p.call(cargs, smap)
 		err := res.err
 		freeCR(res)
 		if err == nil {
@@ -740,6 +741,7 @@ func (p *proxy) cleanupMark(ctx *smapModifier) {
 		}
 		if cos.IsRetriableConnErr(err) {
 			time.Sleep(sleep)
+			smap = p.owner.smap.get()
 			nlog.Warningf("%s: %v (cleanmark #%d)", p, err, i+1)
 			continue
 		}
@@ -1151,7 +1153,7 @@ func (p *proxy) resilverOne(w http.ResponseWriter, r *http.Request, msg *apc.Act
 		cargs.si = si
 		cargs.req = cmn.HreqArgs{Method: http.MethodPut, Path: apc.URLPathXactions.S, Body: body}
 	}
-	res := p.call(cargs)
+	res := p.call(cargs, smap)
 	freeCargs(cargs)
 	if res.err != nil {
 		p.writeErr(w, r, res.toErr())
@@ -1184,7 +1186,7 @@ func (p *proxy) sendOwnTbl(w http.ResponseWriter, r *http.Request, msg *apc.ActM
 	}
 	if smap.IsIC(p.si) && !p.si.Equals(dst) {
 		// node has older version than dst node handle locally
-		if err := p.ic.sendOwnershipTbl(dst); err != nil {
+		if err := p.ic.sendOwnershipTbl(dst, smap); err != nil {
 			p.writeErr(w, r, err)
 		}
 		return
@@ -1203,7 +1205,7 @@ func (p *proxy) sendOwnTbl(w http.ResponseWriter, r *http.Request, msg *apc.ActM
 			continue
 		}
 		cargs.si = psi
-		res := p.call(cargs)
+		res := p.call(cargs, smap)
 		if res.err != nil {
 			err = res.toErr()
 		}
@@ -1415,12 +1417,12 @@ func (p *proxy) stopMaintenance(w http.ResponseWriter, r *http.Request, msg *apc
 		return
 	}
 	timeout := cmn.GCO.Get().Timeout.CplaneOperation.D()
-	if _, status, err := p.Health(si, timeout, nil); err != nil {
+	if _, status, err := p.reqHealth(si, timeout, nil, smap); err != nil {
 		sleep, retries := timeout/2, 5
 		time.Sleep(sleep)
 		for i := 0; i < retries; i++ {
 			time.Sleep(sleep)
-			_, status, err = p.Health(si, timeout, nil)
+			_, status, err = p.reqHealth(si, timeout, nil, smap)
 			if err == nil {
 				break
 			}
@@ -1833,7 +1835,7 @@ func (p *proxy) rmNodeFinal(msg *apc.ActMsg, si *meta.Snode, ctx *smapModifier) 
 	}
 
 	nlog.Infof("%s: %s %s", p, msg.Action, sname)
-	res := p.call(cargs)
+	res := p.call(cargs, smap)
 	err = res.unwrap()
 	freeCargs(cargs)
 	freeCR(res)
diff --git a/ais/prxetl.go b/ais/prxetl.go
index 7e3e88b233..c677468ff0 100644
--- a/ais/prxetl.go
+++ b/ais/prxetl.go
@@ -323,8 +323,9 @@ func (p *proxy) logsETL(w http.ResponseWriter, r *http.Request, etlName string, 
 	if len(apiItems) > 0 {
 		// specific target
 		var (
-			tid = apiItems[0]
-			si  = p.owner.smap.get().GetTarget(tid)
+			tid  = apiItems[0]
+			smap = p.owner.smap.get()
+			si   = smap.GetTarget(tid)
 		)
 		if si == nil {
 			p.writeErrf(w, r, "unknown target %q", tid)
@@ -338,7 +339,7 @@ func (p *proxy) logsETL(w http.ResponseWriter, r *http.Request, etlName string, 
 			cargs.timeout = apc.DefaultTimeout
 			cargs.cresv = cresEL{} // -> etl.Logs
 		}
-		results[0] = p.call(cargs)
+		results[0] = p.call(cargs, smap)
 		freeCargs(cargs)
 	} else {
 		// all targets
diff --git a/ais/prxnotif.go b/ais/prxnotif.go
index aefe1456cf..2164399d79 100644
--- a/ais/prxnotif.go
+++ b/ais/prxnotif.go
@@ -321,6 +321,7 @@ func (n *notifs) done(nl nl.Listener) {
 		args.timeout = cmn.Timeout.MaxKeepalive()
 		args.nodes = []meta.NodeMap{nl.Notifiers()}
 		args.nodeCount = len(args.nodes[0])
+		args.smap = smap
 		args.async = true
 		_ = n.p.bcastNodes(args) // args.async: result is already discarded/freed
 		freeBcArgs(args)
@@ -384,7 +385,7 @@ func (n *notifs) housekeep() time.Duration {
 	return hk.PruneActiveIval
 }
 
-// conditional: ask targets iff they delayed updating
+// conditional: query targets iff they delayed updating
 func (n *notifs) bcastGetStats(nl nl.Listener, dur time.Duration) {
 	var (
 		config = cmn.GCO.Get()
@@ -403,6 +404,7 @@ func (n *notifs) bcastGetStats(nl nl.Listener, dur time.Duration) {
 	args.req = nl.QueryArgs() // nodes to fetch stats from
 	args.nodes = []meta.NodeMap{nodesTardy}
 	args.nodeCount = len(args.nodes[0])
+	args.smap = n.p.owner.smap.get()
 	debug.Assert(args.nodeCount > 0) // Ensure that there is at least one node to fetch.
 
 	results := n.p.bcastNodes(args)
diff --git a/ais/prxs3.go b/ais/prxs3.go
index b7241738f0..a9ca74f7a5 100644
--- a/ais/prxs3.go
+++ b/ais/prxs3.go
@@ -499,7 +499,7 @@ func (p *proxy) listMultipart(w http.ResponseWriter, r *http.Request, bck *meta.
 	)
 	cargs.si = si
 	cargs.req = cmn.HreqArgs{Method: http.MethodGet, Base: url, Path: r.URL.Path, Query: q}
-	res := p.call(cargs)
+	res := p.call(cargs, smap)
 	b, err := res.bytes, res.err
 	freeCargs(cargs)
 	freeCR(res)
diff --git a/ais/tgtcp.go b/ais/tgtcp.go
index 5e21b04ccd..c15c74ffe9 100644
--- a/ais/tgtcp.go
+++ b/ais/tgtcp.go
@@ -776,10 +776,11 @@ func (t *target) getPrimaryBMD(renamed string) (bmd *bucketMD, err error) {
 		cargs.timeout = timeout
 		cargs.cresv = cresBM{}
 	}
-	res := t.call(cargs)
+	res := t.call(cargs, smap)
 	if res.err != nil {
 		time.Sleep(timeout / 2)
-		res = t.call(cargs)
+		smap = t.owner.smap.get()
+		res = t.call(cargs, smap)
 		if res.err != nil {
 			err = res.errorf("%s: failed to GET(%q)", t.si, what)
 		}
@@ -1083,13 +1084,9 @@ func (t *target) enable() error {
 	return nil
 }
 
-//
-// HeadObj* where target acts as a client
-//
-
-// HeadObjT2T checks with a given target to see if it has the object.
-// (compare with api.HeadObject)
-func (t *target) HeadObjT2T(lom *cluster.LOM, tsi *meta.Snode) (ok bool) {
+// checks with a given target to see if it has the object.
+// target acts as a client - compare with api.HeadObject
+func (t *target) headt2t(lom *cluster.LOM, tsi *meta.Snode, smap *smapX) (ok bool) {
 	q := lom.Bck().AddToQuery(nil)
 	q.Set(apc.QparamSilent, "true")
 	q.Set(apc.QparamFltPresence, strconv.Itoa(apc.FltPresent))
@@ -1108,7 +1105,7 @@ func (t *target) headt2t(lom *cluster.LOM, tsi *meta.Snode, smap *smapX) (ok boo
 		}
 		cargs.timeout = cmn.Timeout.CplaneOperation()
 	}
-	res := t.call(cargs)
+	res := t.call(cargs, smap)
 	ok = res.err == nil
 	freeCargs(cargs)
 	freeCR(res)
diff --git a/ais/tgtimpl.go b/ais/tgtimpl.go
index 98f2a7f89f..4f8542fd26 100644
--- a/ais/tgtimpl.go
+++ b/ais/tgtimpl.go
@@ -36,6 +36,10 @@ func (*target) GetAllRunning(xactKind string, separateIdle bool) (running, idle 
 	return xreg.GetAllRunning(xactKind, separateIdle)
 }
 
+func (t *target) Health(si *meta.Snode, timeout time.Duration, query url.Values) ([]byte, int, error) {
+	return t.reqHealth(si, timeout, query, t.owner.smap.get())
+}
+
 func (t *target) Backend(bck *meta.Bck) cluster.BackendProvider {
 	if bck.IsRemoteAIS() {
 		return t.backend[apc.AIS]
@@ -103,6 +107,10 @@ func (t *target) EvictObject(lom *cluster.LOM) (errCode int, err error) {
 	return
 }
 
+func (t *target) HeadObjT2T(lom *cluster.LOM, si *meta.Snode) bool {
+	return t.headt2t(lom, si, t.owner.smap.get())
+}
+
 // CopyObject:
 // - either creates a full replica of the source object (the `lom` argument)
 // - or transforms the object
@@ -214,7 +222,8 @@ func (t *target) Promote(params cluster.PromoteParams) (errCode int, err error) 
 }
 
 func (t *target) _promote(params *cluster.PromoteParams, lom *cluster.LOM) (errCode int, err error) {
-	tsi, local, erh := lom.HrwTarget(t.owner.smap.Get())
+	smap := t.owner.smap.get()
+	tsi, local, erh := lom.HrwTarget(&smap.Smap)
 	if erh != nil {
 		return 0, erh
 	}
@@ -222,7 +231,7 @@ func (t *target) _promote(params *cluster.PromoteParams, lom *cluster.LOM) (errC
 	if local {
 		size, errCode, err = t._promLocal(params, lom)
 	} else {
-		size, err = t._promRemote(params, lom, tsi)
+		size, err = t._promRemote(params, lom, tsi, smap)
 		if err == nil && size >= 0 && params.Xact != nil {
 			params.Xact.OutObjsAdd(1, size)
 		}
@@ -319,11 +328,11 @@ func (t *target) _promLocal(params *cluster.PromoteParams, lom *cluster.LOM) (fi
 
 // TODO: use DM streams
 // TODO: Xact.InObjsAdd on the receive side
-func (t *target) _promRemote(params *cluster.PromoteParams, lom *cluster.LOM, tsi *meta.Snode) (int64, error) {
+func (t *target) _promRemote(params *cluster.PromoteParams, lom *cluster.LOM, tsi *meta.Snode, smap *smapX) (int64, error) {
 	lom.FQN = params.SrcFQN
 
 	// when not overwriting check w/ remote target first (and separately)
-	if !params.OverwriteDst && t.HeadObjT2T(lom, tsi) {
+	if !params.OverwriteDst && t.headt2t(lom, tsi, smap) {
 		return -1, nil
 	}
 
@@ -348,25 +357,3 @@ func (t *target) DisableMpath(mpath, reason string) (err error) {
 	_, err = t.fsprg.disableMpath(mpath, true /*dont-resilver*/) // NOTE: not resilvering upon FSCH calling
 	return
 }
-
-func (t *target) RebalanceNamespace(si *meta.Snode) (b []byte, status int, err error) {
-	// pull the data
-	query := url.Values{}
-	query.Set(apc.QparamRebData, "true")
-	cargs := allocCargs()
-	{
-		cargs.si = si
-		cargs.req = cmn.HreqArgs{
-			Method: http.MethodGet,
-			Base:   si.URL(cmn.NetIntraData),
-			Path:   apc.URLPathRebalance.S,
-			Query:  query,
-		}
-		cargs.timeout = apc.DefaultTimeout
-	}
-	res := t.call(cargs)
-	b, status, err = res.bytes, res.status, res.err
-	freeCargs(cargs)
-	freeCR(res)
-	return
-}
diff --git a/ais/tgtobj.go b/ais/tgtobj.go
index ed6d545696..acc5b9ed60 100644
--- a/ais/tgtobj.go
+++ b/ais/tgtobj.go
@@ -734,7 +734,7 @@ func (goi *getOI) restoreFromAny(skipLomRestore bool) (doubleCheck bool, errCode
 		doubleCheck = true
 	}
 	if running && tsi.ID() != goi.t.SID() {
-		if goi.t.HeadObjT2T(goi.lom, tsi) {
+		if goi.t.headt2t(goi.lom, tsi, smap) {
 			gfnNode = tsi
 			goto gfn
 		}
diff --git a/ais/vote.go b/ais/vote.go
index 92e5038d4b..cd823ab139 100644
--- a/ais/vote.go
+++ b/ais/vote.go
@@ -195,6 +195,7 @@ func (p *proxy) startElection(vr *VoteRecord) {
 
 func (p *proxy) elect(vr *VoteRecord, xele *xs.Election) {
 	var (
+		smap       *smapX
 		err        error
 		curPrimary = vr.Smap.Primary
 		config     = cmn.GCO.Get()
@@ -205,12 +206,12 @@ func (p *proxy) elect(vr *VoteRecord, xele *xs.Election) {
 		if i > 0 {
 			runtime.Gosched()
 		}
-		smap := p.owner.smap.get()
+		smap = p.owner.smap.get()
 		if smap.version() > vr.Smap.version() {
 			nlog.Warningf("%s: %s updated from %s, moving back to idle", p, smap, vr.Smap)
 			return
 		}
-		_, _, err = p.Health(curPrimary, timeout, nil /*ask primary*/)
+		_, _, err = p.reqHealth(curPrimary, timeout, nil /*ask primary*/, smap)
 		if err == nil {
 			break
 		}
@@ -219,7 +220,7 @@ func (p *proxy) elect(vr *VoteRecord, xele *xs.Election) {
 	if err == nil { // move back to idle
 		query := url.Values{apc.QparamAskPrimary: []string{"true"}}
 
-		_, _, err = p.Health(curPrimary, timeout, query /*ask primary*/)
+		_, _, err = p.reqHealth(curPrimary, timeout, query /*ask primary*/, smap)
 		if err == nil {
 			nlog.Infof("%s: current primary %s is up, moving back to idle", p, curPrimary)
 		} else {
@@ -560,7 +561,7 @@ func (h *htrun) sendElectionRequest(vr *VoteInitiation, nextPrimaryProxy *meta.S
 		}
 		cargs.timeout = apc.DefaultTimeout
 	}
-	res := h.call(cargs)
+	res := h.call(cargs, vr.Smap)
 	err = res.err
 	freeCR(res)
 	defer freeCargs(cargs)
@@ -571,7 +572,7 @@ func (h *htrun) sendElectionRequest(vr *VoteInitiation, nextPrimaryProxy *meta.S
 	sleep := cmn.Timeout.CplaneOperation() / 2
 	for i := 0; i < maxRetryElectReq; i++ {
 		time.Sleep(sleep)
-		res = h.call(cargs)
+		res = h.call(cargs, vr.Smap)
 		err = res.err
 		freeCR(res)
 		if err == nil {
diff --git a/cluster/mock/target_mock.go b/cluster/mock/target_mock.go
index 6a6abdcb04..f8f2072780 100644
--- a/cluster/mock/target_mock.go
+++ b/cluster/mock/target_mock.go
@@ -52,7 +52,6 @@ func (*TargetMock) DeleteObject(*cluster.LOM, bool) (int, error)
 func (*TargetMock) Promote(cluster.PromoteParams) (int, error)          { return 0, nil }
 func (*TargetMock) Backend(*meta.Bck) cluster.BackendProvider           { return nil }
 func (*TargetMock) HeadObjT2T(*cluster.LOM, *meta.Snode) bool           { return false }
-func (*TargetMock) RebalanceNamespace(*meta.Snode) ([]byte, int, error) { return nil, 0, nil }
 func (*TargetMock) BMDVersionFixup(*http.Request, ...cmn.Bck)           {}
 func (*TargetMock) FSHC(error, string)                                  {}
 func (*TargetMock) OOS(*fs.CapStatus) fs.CapStatus                      { return fs.CapStatus{} }
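
What follows is a minimal illustrative sketch, not part of the patch. The types, the owner struct, and the header name are made-up stand-ins for smapX, h.owner.smap, and apc.HdrCallerSmapVersion; the point is the calling convention the diff introduces - h.call and friends now receive the cluster-map snapshot explicitly, so the version stamped on the wire matches the map the caller actually used for node selection, instead of being re-read from the owner at the bottom of the call chain:

    // Sketch only: simplified stand-ins for smapX / h.owner.smap / h.call.
    package main

    import (
    	"fmt"
    	"net/http"
    	"sync/atomic"
    )

    // Smap is a hypothetical stand-in for smapX: a versioned cluster map.
    type Smap struct {
    	Version int64
    	Targets map[string]string // node ID -> intra-control URL
    }

    // smapOwner mimics h.owner.smap: an atomically swappable current map.
    type smapOwner struct{ p atomic.Pointer[Smap] }

    func (o *smapOwner) get() *Smap { return o.p.Load() }

    // call mimics htrun.call after this commit: the caller supplies the
    // snapshot, and the version header is taken from it - not re-read from
    // the owner, which may have advanced to a newer Smap in the meantime.
    func call(req *http.Request, smap *Smap) {
    	// header name below is made up for the sketch
    	req.Header.Set("Caller-Smap-Version", fmt.Sprint(smap.Version))
    }

    func main() {
    	owner := &smapOwner{}
    	owner.p.Store(&Smap{Version: 7, Targets: map[string]string{"t1": "http://t1:8081"}})

    	smap := owner.get() // take one snapshot ...
    	req, _ := http.NewRequest(http.MethodGet, smap.Targets["t1"]+"/v1/health", http.NoBody)
    	call(req, smap) // ... and reuse it for both node selection and version stamping
    	fmt.Println(req.Header.Get("Caller-Smap-Version")) // 7
    }

Threading one snapshot through the whole operation (as the diff does by adding the smap parameter to call, reqHealth, ping/retry, sendOwnershipTbl, and the bcast args) presumably keeps destination choice and the reported Smap version consistent even when a metasync lands mid-operation; the few p.owner.smap.get() call sites left in the diff are the places where taking a fresh snapshot is the intent (e.g., the keepalive retry loop).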