From 1d99921ef4bf676029cf231faf2ca79eff59f44b Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Tue, 6 Feb 2024 08:22:52 -0800 Subject: [PATCH] RATIS-2024. Refactor appendEntries code. (#1040) --- .../ratis/server/impl/RaftServerImpl.java | 169 +++++++----------- .../apache/ratis/server/impl/ServerState.java | 23 ++- .../impl/SnapshotInstallationHandler.java | 29 ++- .../ratis/server/util/ServerStringUtils.java | 8 +- 4 files changed, 94 insertions(+), 135 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index c47db14d66..7390093c33 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -108,6 +108,11 @@ import org.apache.ratis.util.Timestamp; import org.apache.ratis.util.function.CheckedSupplier; +import static org.apache.ratis.server.impl.ServerImplUtils.effectiveCommitIndex; +import static org.apache.ratis.server.impl.ServerProtoUtils.toAppendEntriesReplyProto; +import static org.apache.ratis.server.util.ServerStringUtils.toAppendEntriesReplyString; +import static org.apache.ratis.server.util.ServerStringUtils.toAppendEntriesRequestString; + class RaftServerImpl implements RaftServer.Division, RaftServerProtocol, RaftServerAsynchronousProtocol, RaftClientProtocol, RaftClientAsynchronousProtocol { @@ -1483,18 +1488,24 @@ public CompletableFuture appendEntriesAsync( ReferenceCountedObject requestRef) throws IOException { final AppendEntriesRequestProto r = requestRef.retain(); final RaftRpcRequestProto request = r.getServerRequest(); - final List entries = r.getEntriesList(); final TermIndex previous = r.hasPreviousLog()? TermIndex.valueOf(r.getPreviousLog()) : null; - final RaftPeerId requestorId = RaftPeerId.valueOf(request.getRequestorId()); - try { - preAppendEntriesAsync(requestorId, ProtoUtils.toRaftGroupId(request.getRaftGroupId()), r.getLeaderTerm(), - previous, r.getLeaderCommit(), r.getInitializing(), entries); - return appendEntriesAsync(requestorId, r.getLeaderTerm(), previous, r.getLeaderCommit(), - request.getCallId(), r.getInitializing(), r.getCommitInfosList(), entries, requestRef); + final RaftPeerId leaderId = RaftPeerId.valueOf(request.getRequestorId()); + final RaftGroupId leaderGroupId = ProtoUtils.toRaftGroupId(request.getRaftGroupId()); + + CodeInjectionForTesting.execute(APPEND_ENTRIES, getId(), leaderId, previous, r); + + assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING); + if (!startComplete.get()) { + throw new ServerNotReadyException(getMemberId() + ": The server role is not yet initialized."); + } + assertGroup(leaderId, leaderGroupId); + validateEntries(r.getLeaderTerm(), previous, r.getEntriesList()); + + return appendEntriesAsync(leaderId, request.getCallId(), previous, requestRef); } catch(Exception t) { - LOG.error("{}: Failed appendEntriesAsync {}", getMemberId(), r, t); - throw t; + LOG.error("{}: Failed appendEntries* {}", getMemberId(), toAppendEntriesRequestString(r), t); + throw IOUtils.asIOException(t); } finally { requestRef.release(); } @@ -1540,24 +1551,6 @@ Optional updateLastRpcTime(FollowerState.UpdateType updateType) { } } - private void preAppendEntriesAsync(RaftPeerId leaderId, RaftGroupId leaderGroupId, long leaderTerm, - TermIndex previous, long leaderCommit, boolean initializing, List entries) throws IOException { - CodeInjectionForTesting.execute(APPEND_ENTRIES, getId(), - leaderId, leaderTerm, previous, leaderCommit, initializing, entries); - - assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING); - if (!startComplete.get()) { - throw new ServerNotReadyException(getMemberId() + ": The server role is not yet initialized."); - } - assertGroup(leaderId, leaderGroupId); - - try { - validateEntries(leaderTerm, previous, entries); - } catch (IllegalArgumentException e) { - throw new IOException(e); - } - } - private long updateCommitInfoCache() { return commitInfoCache.update(getId(), state.getLog().getLastCommittedIndex()); } @@ -1566,19 +1559,15 @@ ExecutorService getServerExecutor() { return serverExecutor; } - @SuppressWarnings("checkstyle:parameternumber") - private CompletableFuture appendEntriesAsync( - RaftPeerId leaderId, long leaderTerm, TermIndex previous, long leaderCommit, long callId, boolean initializing, - List commitInfos, List entries, - ReferenceCountedObject requestRef) throws IOException { + private CompletableFuture appendEntriesAsync(RaftPeerId leaderId, long callId, + TermIndex previous, ReferenceCountedObject requestRef) throws IOException { + final AppendEntriesRequestProto proto = requestRef.get(); + final List entries = proto.getEntriesList(); final boolean isHeartbeat = entries.isEmpty(); - logAppendEntries(isHeartbeat, - () -> getMemberId() + ": receive appendEntries(" + leaderId + ", " + leaderTerm + ", " - + previous + ", " + leaderCommit + ", " + initializing - + ", commits:" + ProtoUtils.toString(commitInfos) - + ", cId:" + callId - + ", entries: " + LogProtoUtils.toLogEntriesString(entries)); + logAppendEntries(isHeartbeat, () -> getMemberId() + ": appendEntries* " + + toAppendEntriesRequestString(proto)); + final long leaderTerm = proto.getLeaderTerm(); final long currentTerm; final long followerCommit = state.getLog().getLastCommittedIndex(); final Optional followerState; @@ -1586,17 +1575,12 @@ private CompletableFuture appendEntriesAsync( synchronized (this) { // Check life cycle state again to avoid the PAUSING/PAUSED state. assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING); - final boolean recognized = state.recognizeLeader(leaderId, leaderTerm); currentTerm = state.getCurrentTerm(); + final boolean recognized = state.recognizeLeader(Op.APPEND_ENTRIES, leaderId, leaderTerm); if (!recognized) { - final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto( + return CompletableFuture.completedFuture(toAppendEntriesReplyProto( leaderId, getMemberId(), currentTerm, followerCommit, state.getNextIndex(), - AppendResult.NOT_LEADER, callId, RaftLog.INVALID_LOG_INDEX, isHeartbeat); - if (LOG.isDebugEnabled()) { - LOG.debug("{}: Not recognize {} (term={}) as leader, state: {} reply: {}", - getMemberId(), leaderId, leaderTerm, state, ServerStringUtils.toAppendEntriesReplyString(reply)); - } - return CompletableFuture.completedFuture(reply); + AppendResult.NOT_LEADER, callId, RaftLog.INVALID_LOG_INDEX, isHeartbeat)); } try { changeToFollowerAndPersistMetadata(leaderTerm, true, "appendEntries"); @@ -1605,7 +1589,7 @@ leaderId, getMemberId(), currentTerm, followerCommit, state.getNextIndex(), } state.setLeader(leaderId, "appendEntries"); - if (!initializing && lifeCycle.compareAndTransition(State.STARTING, State.RUNNING)) { + if (!proto.getInitializing() && lifeCycle.compareAndTransition(State.STARTING, State.RUNNING)) { role.startFollowerState(this, Op.APPEND_ENTRIES); } followerState = updateLastRpcTime(FollowerState.UpdateType.APPEND_START); @@ -1617,12 +1601,14 @@ leaderId, getMemberId(), currentTerm, followerCommit, state.getNextIndex(), // 3. There is a gap between the local log and the entries // In any of these scenarios, we should return an INCONSISTENCY reply // back to leader so that the leader can update this follower's next index. - - AppendEntriesReplyProto inconsistencyReply = checkInconsistentAppendEntries( - leaderId, currentTerm, followerCommit, previous, callId, isHeartbeat, entries); - if (inconsistencyReply != null) { + final long inconsistencyReplyNextIndex = checkInconsistentAppendEntries(previous, entries); + if (inconsistencyReplyNextIndex > RaftLog.INVALID_LOG_INDEX) { + final AppendEntriesReplyProto reply = toAppendEntriesReplyProto( + leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextIndex, + AppendResult.INCONSISTENCY, callId, RaftLog.INVALID_LOG_INDEX, isHeartbeat); + LOG.info("{}: appendEntries* reply {}", getMemberId(), toAppendEntriesReplyString(reply)); followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE)); - return CompletableFuture.completedFuture(inconsistencyReply); + return CompletableFuture.completedFuture(reply); } state.updateConfiguration(entries); @@ -1631,7 +1617,7 @@ leaderId, getMemberId(), currentTerm, followerCommit, state.getNextIndex(), final List> futures = entries.isEmpty() ? Collections.emptyList() : state.getLog().append(requestRef.delegate(entries)); - commitInfos.forEach(commitInfoCache::update); + proto.getCommitInfosList().forEach(commitInfoCache::update); CodeInjectionForTesting.execute(LOG_SYNC, getId(), null); if (!isHeartbeat) { @@ -1641,49 +1627,27 @@ leaderId, getMemberId(), currentTerm, followerCommit, state.getNextIndex(), stateMachine.event().notifySnapshotInstalled(InstallSnapshotResult.SUCCESS, installedIndex, getPeer()); } } - return JavaUtils.allOf(futures).whenCompleteAsync( - (r, t) -> followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE)), - serverExecutor - ).thenApply(v -> { - final AppendEntriesReplyProto reply; - synchronized(this) { - final long commitIndex = ServerImplUtils.effectiveCommitIndex(leaderCommit, previous, entries.size()); - state.updateCommitIndex(commitIndex, currentTerm, false); + + final long commitIndex = effectiveCommitIndex(proto.getLeaderCommit(), previous, entries.size()); + final long matchIndex = isHeartbeat? RaftLog.INVALID_LOG_INDEX: entries.get(entries.size() - 1).getIndex(); + return JavaUtils.allOf(futures).whenCompleteAsync((r, t) -> { + followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE)); + timer.stop(); + }, getServerExecutor()).thenApply(v -> { + final boolean updated = state.updateCommitIndex(commitIndex, currentTerm, false); + if (updated) { updateCommitInfoCache(); - final long n; - final long matchIndex; - if (!isHeartbeat) { - LogEntryProto requestLastEntry = entries.get(entries.size() - 1); - n = requestLastEntry.getIndex() + 1; - matchIndex = requestLastEntry.getIndex(); - } else { - n = state.getLog().getNextIndex(); - matchIndex = RaftLog.INVALID_LOG_INDEX; - } - reply = ServerProtoUtils.toAppendEntriesReplyProto(leaderId, getMemberId(), currentTerm, - state.getLog().getLastCommittedIndex(), n, AppendResult.SUCCESS, callId, matchIndex, isHeartbeat); } - logAppendEntries(isHeartbeat, () -> getMemberId() + ": succeeded to handle AppendEntries. Reply: " - + ServerStringUtils.toAppendEntriesReplyString(reply)); - timer.stop(); // TODO: future never completes exceptionally? + final long nextIndex = isHeartbeat? state.getNextIndex(): matchIndex + 1; + final AppendEntriesReplyProto reply = toAppendEntriesReplyProto(leaderId, getMemberId(), + currentTerm, updated? commitIndex : state.getLog().getLastCommittedIndex(), + nextIndex, AppendResult.SUCCESS, callId, matchIndex, isHeartbeat); + logAppendEntries(isHeartbeat, () -> getMemberId() + + ": appendEntries* reply " + toAppendEntriesReplyString(reply)); return reply; }); } - private AppendEntriesReplyProto checkInconsistentAppendEntries(RaftPeerId leaderId, long currentTerm, - long followerCommit, TermIndex previous, long callId, boolean isHeartbeat, List entries) { - final long replyNextIndex = checkInconsistentAppendEntries(previous, entries); - if (replyNextIndex == -1) { - return null; - } - - final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto( - leaderId, getMemberId(), currentTerm, followerCommit, replyNextIndex, - AppendResult.INCONSISTENCY, callId, RaftLog.INVALID_LOG_INDEX, isHeartbeat); - LOG.info("{}: inconsistency entries. Reply:{}", getMemberId(), ServerStringUtils.toAppendEntriesReplyString(reply)); - return reply; - } - private long checkInconsistentAppendEntries(TermIndex previous, List entries) { // Check if a snapshot installation through state machine is in progress. final long installSnapshot = snapshotInstallationHandler.getInProgressInstallSnapshotIndex(); @@ -1714,7 +1678,7 @@ private long checkInconsistentAppendEntries(TermIndex previous, List current || curLeaderId == null) { - // If the request indicates a term that is greater than the current term - // or no leader has been set for the current term, make sure to update - // leader and term later - return true; + if (peerTerm == current && curLeaderId != null && !curLeaderId.equals(peerId)) { + LOG.warn("{}: Failed to recognize {} as leader for {} since current leader is {} (peerTerm = currentTerm = {})", + getMemberId(), peerId, op, curLeaderId, current); + return false; } - return curLeaderId.equals(peerLeaderId); + return true; } static int compareLog(TermIndex lastEntry, TermIndex candidateLastEntry) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java index 9794314b83..7aae944a43 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java @@ -32,6 +32,7 @@ import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.protocol.RaftServerProtocol; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.raftlog.LogProtoUtils; import org.apache.ratis.server.util.ServerStringUtils; @@ -49,6 +50,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import static org.apache.ratis.server.impl.ServerProtoUtils.toInstallSnapshotReplyProto; +import static org.apache.ratis.server.impl.ServerProtoUtils.toServerRpcProto; import static org.apache.ratis.server.raftlog.RaftLog.INVALID_LOG_INDEX; class SnapshotInstallationHandler { @@ -142,7 +145,7 @@ private InstallSnapshotReplyProto installSnapshotImpl(InstallSnapshotRequestProt } // There is a mismatch between configurations on leader and follower. - final InstallSnapshotReplyProto failedReply = ServerProtoUtils.toInstallSnapshotReplyProto( + final InstallSnapshotReplyProto failedReply = toInstallSnapshotReplyProto( leaderId, getMemberId(), state.getCurrentTerm(), InstallSnapshotResult.CONF_MISMATCH); LOG.error("{}: Configuration Mismatch ({}): Leader {} has it set to {} but follower {} has it set to {}", getMemberId(), RaftServerConfigKeys.Log.Appender.INSTALL_SNAPSHOT_ENABLED_KEY, @@ -158,13 +161,11 @@ private InstallSnapshotReplyProto checkAndInstallSnapshot(InstallSnapshotRequest final TermIndex lastIncluded = TermIndex.valueOf(snapshotChunkRequest.getTermIndex()); final long lastIncludedIndex = lastIncluded.getIndex(); synchronized (server) { - final boolean recognized = state.recognizeLeader(leaderId, leaderTerm); + final boolean recognized = state.recognizeLeader(RaftServerProtocol.Op.INSTALL_SNAPSHOT, leaderId, leaderTerm); currentTerm = state.getCurrentTerm(); if (!recognized) { - final InstallSnapshotReplyProto reply = ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(), + return toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.NOT_LEADER); - LOG.warn("{}: Failed to recognize leader for installSnapshot chunk.", getMemberId()); - return reply; } server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot"); state.setLeader(leaderId, "installSnapshot"); @@ -193,7 +194,7 @@ private InstallSnapshotReplyProto checkAndInstallSnapshot(InstallSnapshotRequest if (snapshotChunkRequest.getDone()) { LOG.info("{}: successfully install the entire snapshot-{}", getMemberId(), lastIncludedIndex); } - return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(), + return toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.SUCCESS); } @@ -205,13 +206,11 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot( request.getNotification().getFirstAvailableTermIndex()); final long firstAvailableLogIndex = firstAvailableLogTermIndex.getIndex(); synchronized (server) { - final boolean recognized = state.recognizeLeader(leaderId, leaderTerm); + final boolean recognized = state.recognizeLeader("notifyInstallSnapshot", leaderId, leaderTerm); currentTerm = state.getCurrentTerm(); if (!recognized) { - final InstallSnapshotReplyProto reply = ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(), + return toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, InstallSnapshotResult.NOT_LEADER); - LOG.warn("{}: Failed to recognize leader for installSnapshot notification.", getMemberId()); - return reply; } server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot"); state.setLeader(leaderId, "installSnapshot"); @@ -229,7 +228,7 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot( inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, INVALID_LOG_INDEX); LOG.info("{}: InstallSnapshot notification result: {}, current snapshot index: {}", getMemberId(), InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex); - return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, + return toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex); } @@ -307,7 +306,7 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot( inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX); server.getStateMachine().event().notifySnapshotInstalled( InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, INVALID_LOG_INDEX, server.getPeer()); - return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(), + return toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE); } @@ -325,7 +324,7 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot( server.getStateMachine().event().notifySnapshotInstalled( InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledIndex, server.getPeer()); installedIndex.set(latestInstalledIndex); - return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(), + return toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledSnapshotTermIndex.getIndex()); } @@ -334,7 +333,7 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot( LOG.debug("{}: InstallSnapshot notification result: {}", getMemberId(), InstallSnapshotResult.IN_PROGRESS); } - return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(), + return toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, InstallSnapshotResult.IN_PROGRESS); } } @@ -342,7 +341,7 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot( private RoleInfoProto getRoleInfoProto(RaftPeer leader) { final RoleInfo role = server.getRole(); final Optional fs = role.getFollowerState(); - final ServerRpcProto leaderInfo = ServerProtoUtils.toServerRpcProto(leader, + final ServerRpcProto leaderInfo = toServerRpcProto(leader, fs.map(FollowerState::getLastRpcTime).map(Timestamp::elapsedTimeMs).orElse(0L)); final FollowerInfoProto.Builder followerInfo = FollowerInfoProto.newBuilder() .setLeaderInfo(leaderInfo) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/util/ServerStringUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/util/ServerStringUtils.java index 25223c0f4d..284664d012 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/util/ServerStringUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/util/ServerStringUtils.java @@ -21,16 +21,19 @@ import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto; import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto; import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto; +import org.apache.ratis.proto.RaftProtos.LogEntryProto; import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.raftlog.LogProtoUtils; import org.apache.ratis.util.ProtoUtils; +import java.util.List; + /** * This class provides convenient utilities for converting Protocol Buffers messages to strings. * The output strings are for information purpose only. * They are concise and compact compared to the Protocol Buffers implementations of {@link Object#toString()}. - * + *

* The output messages or the output formats may be changed without notice. * Callers of this class should not try to parse the output strings for any purposes. * Instead, they should use the public APIs provided by Protocol Buffers. @@ -42,12 +45,13 @@ public static String toAppendEntriesRequestString(AppendEntriesRequestProto requ if (request == null) { return null; } + final List entries = request.getEntriesList(); return ProtoUtils.toString(request.getServerRequest()) + "-t" + request.getLeaderTerm() + ",previous=" + TermIndex.valueOf(request.getPreviousLog()) + ",leaderCommit=" + request.getLeaderCommit() + ",initializing? " + request.getInitializing() - + ",entries: " + LogProtoUtils.toLogEntriesShortString(request.getEntriesList()); + + "," + (entries.isEmpty()? "HEARTBEAT" : "entries: " + LogProtoUtils.toLogEntriesShortString(entries)); } public static String toAppendEntriesReplyString(AppendEntriesReplyProto reply) {