Skip to content

Commit

Permalink
control plane: consistently propagate cluster map
Browse files (browse the repository at this point in the history)
* h.call and friends

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Jul 8, 2023
1 parent 6dda52d commit b68f5a3
Show file tree
Hide file tree
Showing 15 changed files with 98 additions and 100 deletions.
2 changes: 1 addition & 1 deletion ais/htcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,7 @@ func (cii *clusterInfo) smapEqual(other *clusterInfo) (ok bool) {

func (c *getMaxCii) do(si *meta.Snode, wg cos.WG, smap *smapX) {
var cii *clusterInfo
body, _, err := c.h.Health(si, c.timeout, c.query)
body, _, err := c.h.reqHealth(si, c.timeout, c.query, smap)
if err != nil {
goto ret
}
Expand Down
30 changes: 17 additions & 13 deletions ais/htrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ func (h *htrun) _call(si *meta.Snode, bargs *bcastArgs, results *bcastResults) {
cargs.req.BodyR, _ = bargs.req.BodyR.(cos.ReadOpenCloser).Open()
}
cargs.cresv = bargs.cresv
res := h.call(cargs)
res := h.call(cargs, bargs.smap)
if bargs.async {
freeCR(res) // discard right away
} else {
Expand All @@ -589,7 +589,7 @@ func (h *htrun) _call(si *meta.Snode, bargs *bcastArgs, results *bcastResults) {
freeCargs(cargs)
}

func (h *htrun) call(args *callArgs) (res *callResult) {
func (h *htrun) call(args *callArgs, smap *smapX) (res *callResult) {
var (
req *http.Request
resp *http.Response
Expand Down Expand Up @@ -653,7 +653,8 @@ func (h *htrun) call(args *callArgs) (res *callResult) {

req.Header.Set(apc.HdrCallerID, h.si.ID())
req.Header.Set(apc.HdrCallerName, h.si.Name())
if smap := h.owner.smap.get(); smap != nil && smap.vstr != "" {
debug.Assert(smap != nil)
if smap.vstr != "" {
req.Header.Set(apc.HdrCallerSmapVersion, smap.vstr)
}
req.Header.Set(cos.HdrUserAgent, ua)
Expand Down Expand Up @@ -749,6 +750,7 @@ func (h *htrun) _nfy(n cluster.Notif, err error, upon string) {
args.timeout = cmn.Timeout.MaxKeepalive()
args.selected = nodes
args.nodeCount = len(nodes)
args.smap = smap
args.async = true
_ = h.bcastSelected(args)
freeBcArgs(args)
Expand Down Expand Up @@ -866,7 +868,7 @@ func (h *htrun) bcastAsyncIC(msg *aisMsg) {
cargs.req = args.req
cargs.timeout = args.timeout
}
res := h.call(cargs)
res := h.call(cargs, smap)
freeCargs(cargs)
freeCR(res) // discard right away
wg.Done()
Expand Down Expand Up @@ -1213,11 +1215,8 @@ func (h *htrun) isValidObjname(w http.ResponseWriter, r *http.Request, name stri
return true
}

//
// health client
//

func (h *htrun) Health(si *meta.Snode, timeout time.Duration, query url.Values) (b []byte, status int, err error) {
func (h *htrun) reqHealth(si *meta.Snode, timeout time.Duration, query url.Values, smap *smapX) (b []byte, status int, err error) {
var (
path = apc.URLPathHealth.S
url = si.URL(cmn.NetIntraControl)
Expand All @@ -1228,14 +1227,15 @@ func (h *htrun) Health(si *meta.Snode, timeout time.Duration, query url.Values)
cargs.req = cmn.HreqArgs{Method: http.MethodGet, Base: url, Path: path, Query: query}
cargs.timeout = timeout
}
res := h.call(cargs)
res := h.call(cargs, smap)
b, status, err = res.bytes, res.status, res.err
freeCargs(cargs)
freeCR(res)
return
}

// - utilizes internal API: Health(clusterInfo) to discover a _better_ Smap, if exists
// - utilizes reqHealth (above) to discover a _better_ Smap, if one exists
// - via getMaxCii.do()
// - checkAll: query all nodes
// - consider adding max-ver BMD bit here as well (TODO)
func (h *htrun) bcastHealth(smap *smapX, checkAll bool) (*clusterInfo, int /*num confirmations*/) {
Expand Down Expand Up @@ -1716,7 +1716,11 @@ func (h *htrun) regTo(url string, psi *meta.Snode, tout time.Duration, q url.Val
cargs.req = cmn.HreqArgs{Method: http.MethodPost, Base: url, Path: path, Query: q, Body: cos.MustMarshal(cm)}
cargs.timeout = tout
}
res := h.call(cargs)
smap := cm.Smap
if smap == nil {
smap = h.owner.smap.get()
}
res := h.call(cargs, smap)
freeCargs(cargs)
return res
}
Expand Down Expand Up @@ -1776,7 +1780,7 @@ func (h *htrun) pollClusterStarted(config *cmn.Config, psi *meta.Snode) (maxCii
nlog.Warningf("%s: started as a non-primary and got ELECTED during startup", h.si)
return
}
if _, _, err := h.Health(smap.Primary, healthTimeout, query /*ask primary*/); err == nil {
if _, _, err := h.reqHealth(smap.Primary, healthTimeout, query /*ask primary*/, smap); err == nil {
// log
s := fmt.Sprintf("%s via primary health: cluster startup ok, %s", h.si, smap.StringEx())
if self := smap.GetNode(h.si.ID()); self == nil {
Expand Down Expand Up @@ -1824,7 +1828,7 @@ func (h *htrun) unregisterSelf(ignoreErr bool) (err error) {
cargs.req = cmn.HreqArgs{Method: http.MethodDelete, Path: apc.URLPathCluDaemon.Join(h.si.ID())}
cargs.timeout = apc.DefaultTimeout
}
res := h.call(cargs)
res := h.call(cargs, smap)
status, err = res.status, res.err
if err != nil {
f := nlog.Errorf
Expand Down
6 changes: 3 additions & 3 deletions ais/ic.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ func (ic *ic) bcastListenIC(nl nl.Listener) {
ic.p.bcastAsyncIC(msg)
}

func (ic *ic) sendOwnershipTbl(si *meta.Snode) error {
func (ic *ic) sendOwnershipTbl(si *meta.Snode, smap *smapX) error {
if ic.p.notifs.size() == 0 {
if cmn.FastV(4, cos.SmoduleAIS) {
nlog.Infof("%s: notifs empty, not sending to %s", ic.p, si)
Expand All @@ -405,7 +405,7 @@ func (ic *ic) sendOwnershipTbl(si *meta.Snode) error {
cargs.req = cmn.HreqArgs{Method: http.MethodPost, Path: apc.URLPathIC.S, Body: cos.MustMarshal(msg)}
cargs.timeout = cmn.Timeout.CplaneOperation()
}
res := ic.p.call(cargs)
res := ic.p.call(cargs, smap)
freeCargs(cargs)
return res.err
}
Expand Down Expand Up @@ -435,7 +435,7 @@ func (ic *ic) syncICBundle() error {
cargs.timeout = cmn.Timeout.CplaneOperation()
cargs.cresv = cresIC{} // -> icBundle
}
res := ic.p.call(cargs)
res := ic.p.call(cargs, smap)
freeCargs(cargs)
if res.err != nil {
return res.err
Expand Down
30 changes: 16 additions & 14 deletions ais/kalive.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (pkr *palive) updateSmap() (stopped bool) {
}
// do keepalive
wg.Add(1)
go pkr.ping(si, wg)
go pkr.ping(si, wg, smap)
}
}
wg.Wait()
Expand All @@ -256,12 +256,12 @@ func (pkr *palive) updateSmap() (stopped bool) {
return
}

func (pkr *palive) ping(si *meta.Snode, wg cos.WG) {
func (pkr *palive) ping(si *meta.Snode, wg cos.WG, smap *smapX) {
defer wg.Done()
if len(pkr.stoppedCh) > 0 {
return
}
ok, stopped := pkr._pingRetry(si)
ok, stopped := pkr._pingRetry(si, smap)
if stopped {
pkr.stoppedCh <- struct{}{}
}
Expand All @@ -270,11 +270,11 @@ func (pkr *palive) ping(si *meta.Snode, wg cos.WG) {
}
}

func (pkr *palive) _pingRetry(to *meta.Snode) (ok, stopped bool) {
func (pkr *palive) _pingRetry(to *meta.Snode, smap *smapX) (ok, stopped bool) {
var (
timeout = time.Duration(pkr.timeoutStats(to.ID()).timeout)
t = mono.NanoTime()
_, status, err = pkr.p.Health(to, timeout, nil)
_, status, err = pkr.p.reqHealth(to, timeout, nil, smap)
)
delta := mono.Since(t)
pkr.updateTimeoutFor(to.ID(), delta)
Expand All @@ -283,8 +283,12 @@ func (pkr *palive) _pingRetry(to *meta.Snode) (ok, stopped bool) {
if err == nil {
return true, false
}
nlog.Warningf("%s fails to respond, err: %v(%d) - retrying...", to.StringEx(), err, status)
ok, stopped = pkr.retry(to)

nlog.Warningf("%s fails to respond: %v(%d) - retrying...", to.StringEx(), err, status)
ticker := time.NewTicker(cmn.KeepaliveRetryDuration())
ok, stopped = pkr.retry(to, ticker)
ticker.Stop()

return ok, stopped
}

Expand Down Expand Up @@ -350,28 +354,26 @@ func (pkr *palive) _final(ctx *smapModifier, clone *smapX) {
_ = pkr.p.metasyncer.sync(revsPair{clone, msg})
}

func (pkr *palive) retry(si *meta.Snode) (ok, stopped bool) {
func (pkr *palive) retry(si *meta.Snode, ticker *time.Ticker) (ok, stopped bool) {
var (
timeout = time.Duration(pkr.timeoutStats(si.ID()).timeout)
ticker = time.NewTicker(cmn.KeepaliveRetryDuration())
i int
)
defer ticker.Stop()
for {
if !pkr.isTimeToPing(si.ID()) {
return true, false
}
select {
case <-ticker.C:
t := mono.NanoTime()
_, status, err := pkr.p.Health(si, timeout, nil)
timeout = pkr.updateTimeoutFor(si.ID(), mono.Since(t))
now := mono.NanoTime()
smap := pkr.p.owner.smap.get()
_, status, err := pkr.p.reqHealth(si, timeout, nil, smap)
timeout = pkr.updateTimeoutFor(si.ID(), mono.Since(now))
if err == nil {
return true, false
}
i++
if i == kaNumRetries {
smap := pkr.p.owner.smap.get()
sname := si.StringEx()
nlog.Warningf("Failed to keepalive %s after %d attempts - removing %s from the %s",
sname, i, sname, smap)
Expand Down
5 changes: 3 additions & 2 deletions ais/metasync.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,14 +449,14 @@ func (y *metasyncer) syncDone(si *meta.Snode, pairs []revsPair) {
}
}

func (y *metasyncer) handleRefused(method, urlPath string, body io.Reader, refused meta.NodeMap, pairs []revsPair,
smap *smapX) (ok bool) {
func (y *metasyncer) handleRefused(method, urlPath string, body io.Reader, refused meta.NodeMap, pairs []revsPair, smap *smapX) (ok bool) {
args := allocBcArgs()
args.req = cmn.HreqArgs{Method: method, Path: urlPath, BodyR: body}
args.network = cmn.NetIntraControl
args.timeout = cmn.Timeout.MaxKeepalive()
args.nodes = []meta.NodeMap{refused}
args.nodeCount = len(refused)
args.smap = smap
results := y.p.bcastNodes(args)
freeBcArgs(args)
for _, res := range results {
Expand Down Expand Up @@ -563,6 +563,7 @@ func (y *metasyncer) handlePending() (failedCnt int) {
args.timeout = cmn.Timeout.MaxKeepalive()
args.nodes = []meta.NodeMap{pending}
args.nodeCount = len(pending)
args.smap = smap
defer body.Free()
results := y.p.bcastNodes(args)
freeBcArgs(args)
Expand Down
18 changes: 10 additions & 8 deletions ais/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -920,7 +920,7 @@ func (p *proxy) syncNewICOwners(smap, newSmap *smapX) {
for _, psi := range newSmap.Pmap {
if p.SID() != psi.ID() && newSmap.IsIC(psi) && !smap.IsIC(psi) {
go func(psi *meta.Snode) {
if err := p.ic.sendOwnershipTbl(psi); err != nil {
if err := p.ic.sendOwnershipTbl(psi, newSmap); err != nil {
nlog.Errorf("%s: failed to send ownership table to %s, err:%v", p, psi, err)
}
}(psi)
Expand Down Expand Up @@ -2029,7 +2029,8 @@ func (p *proxy) listBuckets(w http.ResponseWriter, r *http.Request, qbck *cmn.Qu
}

// via random target
si, err := p.owner.smap.get().GetRandTarget()
smap := p.owner.smap.get()
si, err := smap.GetRandTarget()
if err != nil {
p.writeErr(w, r, err)
return
Expand All @@ -2047,7 +2048,7 @@ func (p *proxy) listBuckets(w http.ResponseWriter, r *http.Request, qbck *cmn.Qu
}
cargs.timeout = apc.DefaultTimeout
}
res := p.call(cargs)
res := p.call(cargs, smap)
freeCargs(cargs)

if res.err != nil {
Expand Down Expand Up @@ -2223,7 +2224,7 @@ func (p *proxy) lsObjsR(bck *meta.Bck, lsmsg *apc.LsoMsg, smap *smapX, tsi *meta
cargs.timeout = reqTimeout
cargs.cresv = cresLso{} // -> cmn.LsoResult
}
res := p.call(cargs)
res := p.call(cargs, smap)
freeCargs(cargs)
results = make(sliceResults, 1)
results[0] = res
Expand Down Expand Up @@ -2646,7 +2647,7 @@ func (p *proxy) smapFromURL(baseURL string) (smap *smapX, err error) {
cargs.timeout = apc.DefaultTimeout
cargs.cresv = cresSM{} // -> smapX
}
res := p.call(cargs)
res := p.call(cargs, p.owner.smap.get())
if res.err != nil {
err = res.errorf("failed to get Smap from %s", baseURL)
} else {
Expand Down Expand Up @@ -3131,7 +3132,7 @@ func (p *proxy) getDaemonInfo(osi *meta.Snode) (si *meta.Snode, err error) {
cargs.timeout = cmn.Timeout.CplaneOperation()
cargs.cresv = cresND{} // -> meta.Snode
}
res := p.call(cargs)
res := p.call(cargs, p.owner.smap.get())
if res.err != nil {
err = res.err
} else {
Expand All @@ -3146,8 +3147,9 @@ func (p *proxy) headRemoteBck(bck *cmn.Bck, q url.Values) (header http.Header, s
var (
tsi *meta.Snode
path = apc.URLPathBuckets.Join(bck.Name)
smap = p.owner.smap.get()
)
if tsi, err = p.owner.smap.get().GetRandTarget(); err != nil {
if tsi, err = smap.GetRandTarget(); err != nil {
return
}
if bck.IsCloud() {
Expand All @@ -3166,7 +3168,7 @@ func (p *proxy) headRemoteBck(bck *cmn.Bck, q url.Values) (header http.Header, s
cargs.req = cmn.HreqArgs{Method: http.MethodHead, Path: path, Query: q}
cargs.timeout = apc.DefaultTimeout
}
res := p.call(cargs)
res := p.call(cargs, smap)
if res.status == http.StatusNotFound {
err = cmn.NewErrRemoteBckNotFound(bck)
} else if res.status == http.StatusGone {
Expand Down
Loading

0 comments on commit b68f5a3

Please sign in to comment.