Skip to content

Commit

Permalink
RATIS-2024. Refactor appendEntries code. (apache#1040)
Browse files Browse the repository at this point in the history
  • Loading branch information
szetszwo authored Feb 6, 2024
1 parent 775b286 commit 1d99921
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@
import org.apache.ratis.util.Timestamp;
import org.apache.ratis.util.function.CheckedSupplier;

import static org.apache.ratis.server.impl.ServerImplUtils.effectiveCommitIndex;
import static org.apache.ratis.server.impl.ServerProtoUtils.toAppendEntriesReplyProto;
import static org.apache.ratis.server.util.ServerStringUtils.toAppendEntriesReplyString;
import static org.apache.ratis.server.util.ServerStringUtils.toAppendEntriesRequestString;

class RaftServerImpl implements RaftServer.Division,
RaftServerProtocol, RaftServerAsynchronousProtocol,
RaftClientProtocol, RaftClientAsynchronousProtocol {
Expand Down Expand Up @@ -1483,18 +1488,24 @@ public CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(
ReferenceCountedObject<AppendEntriesRequestProto> requestRef) throws IOException {
final AppendEntriesRequestProto r = requestRef.retain();
final RaftRpcRequestProto request = r.getServerRequest();
final List<LogEntryProto> entries = r.getEntriesList();
final TermIndex previous = r.hasPreviousLog()? TermIndex.valueOf(r.getPreviousLog()) : null;
final RaftPeerId requestorId = RaftPeerId.valueOf(request.getRequestorId());

try {
preAppendEntriesAsync(requestorId, ProtoUtils.toRaftGroupId(request.getRaftGroupId()), r.getLeaderTerm(),
previous, r.getLeaderCommit(), r.getInitializing(), entries);
return appendEntriesAsync(requestorId, r.getLeaderTerm(), previous, r.getLeaderCommit(),
request.getCallId(), r.getInitializing(), r.getCommitInfosList(), entries, requestRef);
final RaftPeerId leaderId = RaftPeerId.valueOf(request.getRequestorId());
final RaftGroupId leaderGroupId = ProtoUtils.toRaftGroupId(request.getRaftGroupId());

CodeInjectionForTesting.execute(APPEND_ENTRIES, getId(), leaderId, previous, r);

assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
if (!startComplete.get()) {
throw new ServerNotReadyException(getMemberId() + ": The server role is not yet initialized.");
}
assertGroup(leaderId, leaderGroupId);
validateEntries(r.getLeaderTerm(), previous, r.getEntriesList());

return appendEntriesAsync(leaderId, request.getCallId(), previous, requestRef);
} catch(Exception t) {
LOG.error("{}: Failed appendEntriesAsync {}", getMemberId(), r, t);
throw t;
LOG.error("{}: Failed appendEntries* {}", getMemberId(), toAppendEntriesRequestString(r), t);
throw IOUtils.asIOException(t);
} finally {
requestRef.release();
}
Expand Down Expand Up @@ -1540,24 +1551,6 @@ Optional<FollowerState> updateLastRpcTime(FollowerState.UpdateType updateType) {
}
}

private void preAppendEntriesAsync(RaftPeerId leaderId, RaftGroupId leaderGroupId, long leaderTerm,
TermIndex previous, long leaderCommit, boolean initializing, List<LogEntryProto> entries) throws IOException {
CodeInjectionForTesting.execute(APPEND_ENTRIES, getId(),
leaderId, leaderTerm, previous, leaderCommit, initializing, entries);

assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
if (!startComplete.get()) {
throw new ServerNotReadyException(getMemberId() + ": The server role is not yet initialized.");
}
assertGroup(leaderId, leaderGroupId);

try {
validateEntries(leaderTerm, previous, entries);
} catch (IllegalArgumentException e) {
throw new IOException(e);
}
}

private long updateCommitInfoCache() {
return commitInfoCache.update(getId(), state.getLog().getLastCommittedIndex());
}
Expand All @@ -1566,37 +1559,28 @@ ExecutorService getServerExecutor() {
return serverExecutor;
}

@SuppressWarnings("checkstyle:parameternumber")
private CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(
RaftPeerId leaderId, long leaderTerm, TermIndex previous, long leaderCommit, long callId, boolean initializing,
List<CommitInfoProto> commitInfos, List<LogEntryProto> entries,
ReferenceCountedObject<?> requestRef) throws IOException {
private CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(RaftPeerId leaderId, long callId,
TermIndex previous, ReferenceCountedObject<AppendEntriesRequestProto> requestRef) throws IOException {
final AppendEntriesRequestProto proto = requestRef.get();
final List<LogEntryProto> entries = proto.getEntriesList();
final boolean isHeartbeat = entries.isEmpty();
logAppendEntries(isHeartbeat,
() -> getMemberId() + ": receive appendEntries(" + leaderId + ", " + leaderTerm + ", "
+ previous + ", " + leaderCommit + ", " + initializing
+ ", commits:" + ProtoUtils.toString(commitInfos)
+ ", cId:" + callId
+ ", entries: " + LogProtoUtils.toLogEntriesString(entries));
logAppendEntries(isHeartbeat, () -> getMemberId() + ": appendEntries* "
+ toAppendEntriesRequestString(proto));

final long leaderTerm = proto.getLeaderTerm();
final long currentTerm;
final long followerCommit = state.getLog().getLastCommittedIndex();
final Optional<FollowerState> followerState;
final Timekeeper.Context timer = raftServerMetrics.getFollowerAppendEntryTimer(isHeartbeat).time();
synchronized (this) {
// Check life cycle state again to avoid the PAUSING/PAUSED state.
assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
currentTerm = state.getCurrentTerm();
final boolean recognized = state.recognizeLeader(Op.APPEND_ENTRIES, leaderId, leaderTerm);
if (!recognized) {
final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto(
return CompletableFuture.completedFuture(toAppendEntriesReplyProto(
leaderId, getMemberId(), currentTerm, followerCommit, state.getNextIndex(),
AppendResult.NOT_LEADER, callId, RaftLog.INVALID_LOG_INDEX, isHeartbeat);
if (LOG.isDebugEnabled()) {
LOG.debug("{}: Not recognize {} (term={}) as leader, state: {} reply: {}",
getMemberId(), leaderId, leaderTerm, state, ServerStringUtils.toAppendEntriesReplyString(reply));
}
return CompletableFuture.completedFuture(reply);
AppendResult.NOT_LEADER, callId, RaftLog.INVALID_LOG_INDEX, isHeartbeat));
}
try {
changeToFollowerAndPersistMetadata(leaderTerm, true, "appendEntries");
Expand All @@ -1605,7 +1589,7 @@ leaderId, getMemberId(), currentTerm, followerCommit, state.getNextIndex(),
}
state.setLeader(leaderId, "appendEntries");

if (!initializing && lifeCycle.compareAndTransition(State.STARTING, State.RUNNING)) {
if (!proto.getInitializing() && lifeCycle.compareAndTransition(State.STARTING, State.RUNNING)) {
role.startFollowerState(this, Op.APPEND_ENTRIES);
}
followerState = updateLastRpcTime(FollowerState.UpdateType.APPEND_START);
Expand All @@ -1617,12 +1601,14 @@ leaderId, getMemberId(), currentTerm, followerCommit, state.getNextIndex(),
// 3. There is a gap between the local log and the entries
// In any of these scenarios, we should return an INCONSISTENCY reply
// back to leader so that the leader can update this follower's next index.

AppendEntriesReplyProto inconsistencyReply = checkInconsistentAppendEntries(
leaderId, currentTerm, followerCommit, previous, callId, isHeartbeat, entries);
if (inconsistencyReply != null) {
final long inconsistencyReplyNextIndex = checkInconsistentAppendEntries(previous, entries);
if (inconsistencyReplyNextIndex > RaftLog.INVALID_LOG_INDEX) {
final AppendEntriesReplyProto reply = toAppendEntriesReplyProto(
leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextIndex,
AppendResult.INCONSISTENCY, callId, RaftLog.INVALID_LOG_INDEX, isHeartbeat);
LOG.info("{}: appendEntries* reply {}", getMemberId(), toAppendEntriesReplyString(reply));
followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE));
return CompletableFuture.completedFuture(inconsistencyReply);
return CompletableFuture.completedFuture(reply);
}

state.updateConfiguration(entries);
Expand All @@ -1631,7 +1617,7 @@ leaderId, getMemberId(), currentTerm, followerCommit, state.getNextIndex(),

final List<CompletableFuture<Long>> futures = entries.isEmpty() ? Collections.emptyList()
: state.getLog().append(requestRef.delegate(entries));
commitInfos.forEach(commitInfoCache::update);
proto.getCommitInfosList().forEach(commitInfoCache::update);

CodeInjectionForTesting.execute(LOG_SYNC, getId(), null);
if (!isHeartbeat) {
Expand All @@ -1641,49 +1627,27 @@ leaderId, getMemberId(), currentTerm, followerCommit, state.getNextIndex(),
stateMachine.event().notifySnapshotInstalled(InstallSnapshotResult.SUCCESS, installedIndex, getPeer());
}
}
return JavaUtils.allOf(futures).whenCompleteAsync(
(r, t) -> followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE)),
serverExecutor
).thenApply(v -> {
final AppendEntriesReplyProto reply;
synchronized(this) {
final long commitIndex = ServerImplUtils.effectiveCommitIndex(leaderCommit, previous, entries.size());
state.updateCommitIndex(commitIndex, currentTerm, false);

final long commitIndex = effectiveCommitIndex(proto.getLeaderCommit(), previous, entries.size());
final long matchIndex = isHeartbeat? RaftLog.INVALID_LOG_INDEX: entries.get(entries.size() - 1).getIndex();
return JavaUtils.allOf(futures).whenCompleteAsync((r, t) -> {
followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE));
timer.stop();
}, getServerExecutor()).thenApply(v -> {
final boolean updated = state.updateCommitIndex(commitIndex, currentTerm, false);
if (updated) {
updateCommitInfoCache();
final long n;
final long matchIndex;
if (!isHeartbeat) {
LogEntryProto requestLastEntry = entries.get(entries.size() - 1);
n = requestLastEntry.getIndex() + 1;
matchIndex = requestLastEntry.getIndex();
} else {
n = state.getLog().getNextIndex();
matchIndex = RaftLog.INVALID_LOG_INDEX;
}
reply = ServerProtoUtils.toAppendEntriesReplyProto(leaderId, getMemberId(), currentTerm,
state.getLog().getLastCommittedIndex(), n, AppendResult.SUCCESS, callId, matchIndex, isHeartbeat);
}
logAppendEntries(isHeartbeat, () -> getMemberId() + ": succeeded to handle AppendEntries. Reply: "
+ ServerStringUtils.toAppendEntriesReplyString(reply));
timer.stop(); // TODO: future never completes exceptionally?
final long nextIndex = isHeartbeat? state.getNextIndex(): matchIndex + 1;
final AppendEntriesReplyProto reply = toAppendEntriesReplyProto(leaderId, getMemberId(),
currentTerm, updated? commitIndex : state.getLog().getLastCommittedIndex(),
nextIndex, AppendResult.SUCCESS, callId, matchIndex, isHeartbeat);
logAppendEntries(isHeartbeat, () -> getMemberId()
+ ": appendEntries* reply " + toAppendEntriesReplyString(reply));
return reply;
});
}

private AppendEntriesReplyProto checkInconsistentAppendEntries(RaftPeerId leaderId, long currentTerm,
long followerCommit, TermIndex previous, long callId, boolean isHeartbeat, List<LogEntryProto> entries) {
final long replyNextIndex = checkInconsistentAppendEntries(previous, entries);
if (replyNextIndex == -1) {
return null;
}

final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto(
leaderId, getMemberId(), currentTerm, followerCommit, replyNextIndex,
AppendResult.INCONSISTENCY, callId, RaftLog.INVALID_LOG_INDEX, isHeartbeat);
LOG.info("{}: inconsistency entries. Reply:{}", getMemberId(), ServerStringUtils.toAppendEntriesReplyString(reply));
return reply;
}

private long checkInconsistentAppendEntries(TermIndex previous, List<LogEntryProto> entries) {
// Check if a snapshot installation through state machine is in progress.
final long installSnapshot = snapshotInstallationHandler.getInProgressInstallSnapshotIndex();
Expand Down Expand Up @@ -1714,7 +1678,7 @@ private long checkInconsistentAppendEntries(TermIndex previous, List<LogEntryPro
return replyNextIndex;
}

return -1;
return RaftLog.INVALID_LOG_INDEX;
}

@Override
Expand Down Expand Up @@ -1762,30 +1726,25 @@ public StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequ
final RaftRpcRequestProto r = request.getServerRequest();
final RaftPeerId leaderId = RaftPeerId.valueOf(r.getRequestorId());
final RaftGroupId leaderGroupId = ProtoUtils.toRaftGroupId(r.getRaftGroupId());
final TermIndex leaderLastEntry = TermIndex.valueOf(request.getLeaderLastEntry());

CodeInjectionForTesting.execute(START_LEADER_ELECTION, getId(), leaderId, request);

LOG.debug("{}: receive startLeaderElection from:{}, leaderLastEntry:{},",
getMemberId(), leaderId, request.getLeaderLastEntry());
if (!request.hasLeaderLastEntry()) {
// It should have a leaderLastEntry since there is a placeHolder entry.
LOG.warn("{}: leaderLastEntry is missing in {}", getMemberId(), request);
return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
}

final TermIndex leaderLastEntry = TermIndex.valueOf(request.getLeaderLastEntry());
LOG.debug("{}: receive startLeaderElection from {} with lastEntry {}", getMemberId(), leaderId, leaderLastEntry);

assertLifeCycleState(LifeCycle.States.RUNNING);
assertGroup(leaderId, leaderGroupId);

synchronized (this) {
// leaderLastEntry should not be null because LeaderStateImpl#start append a placeHolder entry
// so leader at each term should has at least one entry
if (leaderLastEntry == null) {
LOG.warn("{}: receive null leaderLastEntry which is unexpected", getMemberId());
return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
}

// Check life cycle state again to avoid the PAUSING/PAUSED state.
assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
final boolean recognized = state.recognizeLeader(leaderId, leaderLastEntry.getTerm());
final boolean recognized = state.recognizeLeader("startLeaderElection", leaderId, leaderLastEntry.getTerm());
if (!recognized) {
LOG.warn("{}: Not recognize {} (term={}) as leader, state: {}",
getMemberId(), leaderId, leaderLastEntry.getTerm(), state);
return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,24 +321,21 @@ void appendLog(TransactionContext operation) throws StateMachineException {
Objects.requireNonNull(operation.getLogEntry());
}

/**
* Check if accept the leader selfId and term from the incoming AppendEntries rpc.
* If accept, update the current state.
* @return true if the check passes
*/
boolean recognizeLeader(RaftPeerId peerLeaderId, long leaderTerm) {
/** @return true iff the given peer id is recognized as the leader. */
boolean recognizeLeader(Object op, RaftPeerId peerId, long peerTerm) {
final long current = currentTerm.get();
if (leaderTerm < current) {
if (peerTerm < current) {
LOG.warn("{}: Failed to recognize {} as leader for {} since peerTerm = {} < currentTerm = {}",
getMemberId(), peerId, op, peerTerm, current);
return false;
}
final RaftPeerId curLeaderId = getLeaderId();
if (leaderTerm > current || curLeaderId == null) {
// If the request indicates a term that is greater than the current term
// or no leader has been set for the current term, make sure to update
// leader and term later
return true;
if (peerTerm == current && curLeaderId != null && !curLeaderId.equals(peerId)) {
LOG.warn("{}: Failed to recognize {} as leader for {} since current leader is {} (peerTerm = currentTerm = {})",
getMemberId(), peerId, op, curLeaderId, current);
return false;
}
return curLeaderId.equals(peerLeaderId);
return true;
}

static int compareLog(TermIndex lastEntry, TermIndex candidateLastEntry) {
Expand Down
Loading

0 comments on commit 1d99921

Please sign in to comment.