Skip to content

Commit

Permalink
core: when primary goes down it notifies
Browse files Browse the repository at this point in the history
* on a best-effort basis
* triggers reelection immediately, saves approx. 25s (keep-alive)
* except when cluster shuts down (admin)
* with minor refactoring

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Jul 8, 2023
1 parent 4f7ae74 commit 1120e29
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 40 deletions.
13 changes: 5 additions & 8 deletions ais/htcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ type (
detail string
}
errNoUnregister struct {
detail string
action string
}
apiRequest struct {
bck *meta.Bck // out: initialized bucket
Expand Down Expand Up @@ -284,14 +284,11 @@ func (e *errNodeNotFound) Error() string {
// errNoUnregister //
/////////////////////

func (e *errNoUnregister) Error() string { return e.detail }
func (e *errNoUnregister) Error() string { return e.action }

func isErrNoUnregister(err error) bool {
if _, ok := err.(*errNoUnregister); ok {
return true
}
enu := &errNoUnregister{}
return errors.As(err, &enu)
func isErrNoUnregister(err error) (ok bool) {
_, ok = err.(*errNoUnregister)
return
}

//////////////////
Expand Down
2 changes: 1 addition & 1 deletion ais/htrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -1833,7 +1833,7 @@ func (h *htrun) unregisterSelf(ignoreErr bool) (err error) {
if err != nil {
f := nlog.Errorf
if ignoreErr {
f = nlog.Warningf
f = nlog.Infof
}
f("%s: failed to unreg self, err: %v(%d)", h.si, err, status)
}
Expand Down
4 changes: 2 additions & 2 deletions ais/kalive.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (tkr *talive) do() (stopped bool) {
return
}
if stopped = tkr.keepalive.do(smap, tkr.t.si); stopped {
tkr.t.onPrimaryFail(nil /*proxy*/)
tkr.t.onPrimaryDown(nil /*proxy*/, "")
}
return
}
Expand Down Expand Up @@ -195,7 +195,7 @@ func (pkr *palive) do() (stopped bool) {
return
}
if stopped = pkr.keepalive.do(smap, pkr.p.si); stopped {
pkr.p.onPrimaryFail(pkr.p /*self*/)
pkr.p.onPrimaryDown(pkr.p /*self*/, "")
}
return
}
Expand Down
23 changes: 21 additions & 2 deletions ais/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -3302,9 +3302,15 @@ func (p *proxy) Stop(err error) {
s = "Stopping " + p.String()
smap = p.owner.smap.get()
isPrimary = smap.isPrimary(p.si)
e, isEnu = err.(*errNoUnregister)
)
if isPrimary {
s += "(primary)"
if !isEnu || e.action != apc.ActShutdownCluster {
if npsi, err := cluster.HrwProxy(&smap.Smap, p.SID()); err == nil {
p.notifyCandidate(npsi, smap)
}
}
}
if err == nil {
nlog.Infoln(s)
Expand All @@ -3313,6 +3319,19 @@ func (p *proxy) Stop(err error) {
}
xreg.AbortAll(errors.New("p-stop"))

rmFromSmap := !isPrimary && smap.isValid() && !isErrNoUnregister(err)
p.htrun.stop(rmFromSmap)
p.htrun.stop(!isPrimary && smap.isValid() && !isEnu /*rmFromSmap*/)
}

// on a best-effort basis, ignoring errors and bodyclose
func (p *proxy) notifyCandidate(npsi *meta.Snode, smap *smapX) {
cargs := allocCargs()
cargs.si = npsi
cargs.req = cmn.HreqArgs{Method: http.MethodPut, Base: npsi.URL(cmn.NetIntraControl), Path: apc.URLPathVotePriStop.S}
req, err := cargs.req.Req()
if err != nil {
return
}
req.Header.Set(apc.HdrCallerID, p.SID())
req.Header.Set(apc.HdrCallerSmapVersion, smap.vstr)
p.client.control.Do(req) //nolint:bodyclose // exiting
}
41 changes: 33 additions & 8 deletions ais/vote.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,29 @@ func (p *proxy) voteHandler(w http.ResponseWriter, r *http.Request) {
if err != nil {
return
}
item := apiItems[0]
if !p.NodeStarted() {
w.WriteHeader(http.StatusServiceUnavailable)
return
}
switch {
case r.Method == http.MethodGet && apiItems[0] == apc.Proxy:
// MethodGet
if r.Method == http.MethodGet {
if item != apc.Proxy {
p.writeErrURL(w, r)
return
}
p.httpgetvote(w, r)
case r.Method == http.MethodPut && apiItems[0] == apc.Voteres:
return
}
// MethodPut
switch item {
case apc.Voteres:
p.httpsetprimary(w, r)
case r.Method == http.MethodPut && apiItems[0] == apc.VoteInit:
case apc.VoteInit:
p.httpelect(w, r)
case apc.PriStop:
callerID := r.Header.Get(apc.HdrCallerID)
p.onPrimaryDown(p, callerID)
default:
p.writeErrURL(w, r)
}
Expand Down Expand Up @@ -383,24 +395,35 @@ func (t *target) voteHandler(w http.ResponseWriter, r *http.Request) {
// voting: common methods
//

func (h *htrun) onPrimaryFail(self *proxy) {
func (h *htrun) onPrimaryDown(self *proxy, callerID string) {
smap := h.owner.smap.get()
if smap.validate() != nil {
return
}
clone := smap.clone()
nlog.Infof("%s: primary %s has FAILED", h.si, clone.Primary.StringEx())
s := "via keepalive"
if callerID != "" {
s = "via direct call"
if callerID != clone.Primary.ID() {
nlog.Errorf("%s (%s): non-primary caller reporting primary down (%s, %s, %s)",
h, s, callerID, clone.Primary.StringEx(), smap)
return
}
}
nlog.Infof("%s (%s): primary %s is no longer online and must be reelected", h, s, clone.Primary.StringEx())

for {
if daemon.stopping.Load() {
return
}
// use HRW ordering
nextPrimaryProxy, err := cluster.HrwProxy(&clone.Smap, clone.Primary.ID())
if err != nil {
if !daemon.stopping.Load() {
nlog.Errorf("%s: failed to execute HRW selection, err: %v", h, err)
nlog.Errorf("%s failed to execute HRW selection: %v", h, err)
}
return
}
nlog.Infof("%s: trying %s as the new primary candidate", h, meta.Pname(nextPrimaryProxy.ID()))

// If this proxy is the next primary proxy candidate, it starts the election directly.
if nextPrimaryProxy.ID() == h.si.ID() {
Expand All @@ -418,6 +441,8 @@ func (h *htrun) onPrimaryFail(self *proxy) {
return
}

nlog.Infof("%s: trying %s as the new primary candidate", h, meta.Pname(nextPrimaryProxy.ID()))

// ask the candidate to start election
vr := &VoteInitiation{
Candidate: nextPrimaryProxy.ID(),
Expand Down
2 changes: 2 additions & 0 deletions api/apc/urlpaths.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ const (

Voteres = "result"
VoteInit = "init"
PriStop = "primary-stopping"

// (see the corresponding action messages above)
Keepalive = "keepalive"
Expand Down Expand Up @@ -130,6 +131,7 @@ var (
URLPathVoteInit = urlpath(Version, Vote, Init)
URLPathVoteProxy = urlpath(Version, Vote, Proxy)
URLPathVoteVoteres = urlpath(Version, Vote, Voteres)
URLPathVotePriStop = urlpath(Version, Vote, PriStop)

URLPathdSort = urlpath(Version, Sort)
URLPathdSortInit = urlpath(Version, Sort, Init)
Expand Down
4 changes: 3 additions & 1 deletion cmn/debug/debug_on.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ func _panic(a ...any) {
break
}
f := filepath.Base(file)

if l := len(f); l > 3 {
f = f[:l-3]
}
if buffer.Len() > len(msg) {
buffer.WriteString(" <- ")
}
Expand Down
22 changes: 5 additions & 17 deletions cmn/err.go
Original file line number Diff line number Diff line change
Expand Up @@ -909,34 +909,22 @@ func (e *ErrHTTP) _trace() {

func IsStatusServiceUnavailable(err error) (yes bool) {
herr, ok := err.(*ErrHTTP)
if !ok {
return false
}
return herr.Status == http.StatusServiceUnavailable
return ok && herr.Status == http.StatusServiceUnavailable
}

func IsStatusNotFound(err error) (yes bool) {
herr, ok := err.(*ErrHTTP)
if !ok {
return false
}
return herr.Status == http.StatusNotFound
return ok && herr.Status == http.StatusNotFound
}

func IsStatusBadGateway(err error) (yes bool) {
herr, ok := err.(*ErrHTTP)
if !ok {
return false
}
return herr.Status == http.StatusBadGateway
return ok && herr.Status == http.StatusBadGateway
}

func IsStatusGone(err error) (yes bool) {
hErr, ok := err.(*ErrHTTP)
if !ok {
return false
}
return hErr.Status == http.StatusGone
herr, ok := err.(*ErrHTTP)
return ok && herr.Status == http.StatusGone
}

func Str2HTTPErr(msg string) *ErrHTTP {
Expand Down
2 changes: 1 addition & 1 deletion cmn/nlog/debug_on.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

func assert(cond bool, a ...any) {
if !cond {
msg := "DEBUG PANIC: "
msg := "nlog assertion failed: "
if len(a) > 0 {
msg += fmt.Sprint(a...)
}
Expand Down

0 comments on commit 1120e29

Please sign in to comment.