diff --git a/src/main/java/io/openmessaging/storage/dledger/DLedgerLeaderElector.java b/src/main/java/io/openmessaging/storage/dledger/DLedgerLeaderElector.java index a126109f..fe6eaf70 100644 --- a/src/main/java/io/openmessaging/storage/dledger/DLedgerLeaderElector.java +++ b/src/main/java/io/openmessaging/storage/dledger/DLedgerLeaderElector.java @@ -29,6 +29,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -111,9 +112,7 @@ public CompletableFuture handleHeartBeat(HeartBeatRequest req return CompletableFuture.completedFuture(new HeartBeatResponse().term(memberState.currTerm()).code(DLedgerResponseCode.UNEXPECTED_MEMBER.getCode())); } - if (request.getTerm() < memberState.currTerm()) { - return CompletableFuture.completedFuture(new HeartBeatResponse().term(memberState.currTerm()).code(DLedgerResponseCode.EXPIRED_TERM.getCode())); - } else if (request.getTerm() == memberState.currTerm()) { + if (request.getTerm() == memberState.currTerm()) { if (request.getLeaderId().equals(memberState.getLeaderId())) { lastLeaderHeartBeatTime = System.currentTimeMillis(); return CompletableFuture.completedFuture(new HeartBeatResponse()); @@ -124,8 +123,15 @@ public CompletableFuture handleHeartBeat(HeartBeatRequest req //hold the lock to get the latest term and leaderId synchronized (memberState) { if (request.getTerm() < memberState.currTerm()) { + if (memberState.isCandidate() && request.isNeedCheckMemberState()) { + logger.warn("[CHECK_MEMBER_STATE] [HandleHeartBeat] remoteId={} need check member state", request.getLeaderId()); + memberState.recoveryToFollower(request.getTerm(), request.getLeaderId()); + return CompletableFuture.completedFuture(new HeartBeatResponse()); + } return CompletableFuture.completedFuture(new HeartBeatResponse().term(memberState.currTerm()).code(DLedgerResponseCode.EXPIRED_TERM.getCode())); - } else if (request.getTerm() == memberState.currTerm()) { + } + + if (request.getTerm() == memberState.currTerm()) { if (memberState.getLeaderId() == null) { changeRoleToFollower(request.getTerm(), request.getLeaderId()); return CompletableFuture.completedFuture(new HeartBeatResponse()); @@ -283,10 +289,12 @@ private void sendHeartbeats(long term, String leaderId) throws Exception { break; } - if (x.getCode() == DLedgerResponseCode.NETWORK_ERROR.getCode()) + if (x.getCode() == DLedgerResponseCode.NETWORK_ERROR.getCode()) { memberState.getPeersLiveTable().put(id, Boolean.FALSE); - else + } else { memberState.getPeersLiveTable().put(id, Boolean.TRUE); + memberState.getPeersTermTable().put(id, x.getTerm()); + } if (memberState.isQuorum(succNum.get()) || memberState.isQuorum(succNum.get() + notReadyNum.get())) { @@ -305,6 +313,7 @@ private void sendHeartbeats(long term, String leaderId) throws Exception { beatLatch.await(heartBeatTimeIntervalMs, TimeUnit.MILLISECONDS); if (memberState.isQuorum(succNum.get())) { lastSuccHeartBeatTime = System.currentTimeMillis(); + checkPeersTermTable(); } else { logger.info("[{}] Parse heartbeat responses in cost={} term={} allNum={} succNum={} notReadyNum={} inconsistLeader={} maxTerm={} peerSize={} lastSuccHeartBeatTime={}", memberState.getSelfId(), DLedgerUtils.elapsed(startHeartbeatTimeMs), term, allNum.get(), succNum.get(), notReadyNum.get(), inconsistLeader.get(), maxTerm.get(), memberState.peerSize(), new Timestamp(lastSuccHeartBeatTime)); @@ -320,6 +329,28 @@ private void sendHeartbeats(long term, String leaderId) throws Exception { } } + private void checkPeersTermTable() throws Exception { + if (memberState.getSelfId().equals(memberState.getLeaderId())) { + long leaderTerm = memberState.getPeersTermTable().getOrDefault(memberState.getLeaderId(), -1L); + for (Map.Entry entryTerm : memberState.getPeersTermTable().entrySet()) { + if (entryTerm.getKey().equals(memberState.getSelfId())) { + continue; + } + + if (entryTerm.getValue() > leaderTerm) { + HeartBeatRequest heartBeatRequest = new HeartBeatRequest(); + heartBeatRequest.setGroup(memberState.getGroup()); + heartBeatRequest.setLocalId(memberState.getSelfId()); + heartBeatRequest.setRemoteId(entryTerm.getKey()); + heartBeatRequest.setLeaderId(memberState.getLeaderId()); + heartBeatRequest.setNeedCheckMemberState(true); + heartBeatRequest.setTerm(leaderTerm); + dLedgerRpcService.heartBeat(heartBeatRequest); + } + } + } + } + private void maintainAsLeader() throws Exception { if (DLedgerUtils.elapsed(lastSendHeartBeatTime) > heartBeatTimeIntervalMs) { long term; diff --git a/src/main/java/io/openmessaging/storage/dledger/MemberState.java b/src/main/java/io/openmessaging/storage/dledger/MemberState.java index 9160cb37..4298096e 100644 --- a/src/main/java/io/openmessaging/storage/dledger/MemberState.java +++ b/src/main/java/io/openmessaging/storage/dledger/MemberState.java @@ -52,6 +52,7 @@ public class MemberState { private long knownMaxTermInGroup = -1; private Map peerMap = new HashMap<>(); private Map peersLiveTable = new ConcurrentHashMap<>(); + private Map peersTermTable = new HashMap<>(); private volatile String transferee; private volatile long termToTakeLeadership = -1; @@ -132,6 +133,7 @@ public synchronized void changeToLeader(long term) { this.role = LEADER; this.leaderId = selfId; peersLiveTable.clear(); + peersTermTable.clear(); } public synchronized void changeToFollower(long term, String leaderId) { @@ -153,6 +155,13 @@ public synchronized void changeToCandidate(long term) { transferee = null; } + public synchronized void recoveryToFollower(long term, String leaderId) { + this.role = FOLLOWER; + this.leaderId = leaderId; + this.currTerm = term; + transferee = null; + } + public String getTransferee() { return transferee; } @@ -226,6 +235,10 @@ public Map getPeersLiveTable() { return peersLiveTable; } + public Map getPeersTermTable() { + return peersTermTable; + } + //just for test public void setCurrTermForTest(long term) { PreConditions.check(term >= currTerm, DLedgerResponseCode.ILLEGAL_MEMBER_STATE); diff --git a/src/main/java/io/openmessaging/storage/dledger/protocol/HeartBeatRequest.java b/src/main/java/io/openmessaging/storage/dledger/protocol/HeartBeatRequest.java index 6617855f..ecdb9e87 100644 --- a/src/main/java/io/openmessaging/storage/dledger/protocol/HeartBeatRequest.java +++ b/src/main/java/io/openmessaging/storage/dledger/protocol/HeartBeatRequest.java @@ -18,4 +18,13 @@ public class HeartBeatRequest extends RequestOrResponse { + private boolean needCheckMemberState = false; + + public boolean isNeedCheckMemberState() { + return needCheckMemberState; + } + + public void setNeedCheckMemberState(boolean needCheckMemberState) { + this.needCheckMemberState = needCheckMemberState; + } }