Skip to content

Commit

Permalink
Move committable indices to state, to faciliate tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
achamayou committed Oct 30, 2023
1 parent af21407 commit 216263e
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 48 deletions.
7 changes: 6 additions & 1 deletion src/consensus/aft/impl/state.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "consensus/aft/raft_types.h"
#include "kv/kv_types.h"

#include <deque>
#include <map>
#include <set>

Expand Down Expand Up @@ -160,6 +161,9 @@ namespace aft
ViewHistory view_history;
kv::Version new_view_idx = 0;

// Indices that are eligible for global commit, from a Node's perspective
std::deque<Index> committable_indices;

// Replicas start in leadership state Follower. Apart from a single forced
// transition from Follower to Leader on the initial node at startup,
// the state machine is made up of the following transitions:
Expand All @@ -184,5 +188,6 @@ namespace aft
commit_idx,
new_view_idx,
leadership_state,
membership_state);
membership_state,
committable_indices);
}
45 changes: 14 additions & 31 deletions src/consensus/aft/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include "service/tables/signatures.h"

#include <algorithm>
#include <deque>
#include <list>
#include <random>
#include <unordered_map>
Expand Down Expand Up @@ -169,9 +168,6 @@ namespace aft
static constexpr int batch_window_size = 100;
int batch_window_sum = 0;

// Indices that are eligible for global commit, from a Node's perspective
std::deque<Index> committable_indices;

// When this is set, only public domain is deserialised when receiving
// append entries
bool public_only = false;
Expand Down Expand Up @@ -303,8 +299,9 @@ namespace aft

Index last_committable_index() const
{
return committable_indices.empty() ? state->commit_idx :
committable_indices.back();
return state->committable_indices.empty() ?
state->commit_idx :
state->committable_indices.back();
}

// Returns the highest committable index which is not greater than the
Expand All @@ -313,11 +310,11 @@ namespace aft
Index idx) const
{
const auto it = std::upper_bound(
committable_indices.rbegin(),
committable_indices.rend(),
state->committable_indices.rbegin(),
state->committable_indices.rend(),
idx,
[](const auto& l, const auto& r) { return l >= r; });
if (it == committable_indices.rend())
if (it == state->committable_indices.rend())
{
return std::nullopt;
}
Expand All @@ -327,10 +324,10 @@ namespace aft

void compact_committable_indices(Index idx)
{
while (!committable_indices.empty() &&
(committable_indices.front() <= idx))
while (!state->committable_indices.empty() &&
(state->committable_indices.front() <= idx))
{
committable_indices.pop_front();
state->committable_indices.pop_front();
}
}

Expand Down Expand Up @@ -461,7 +458,6 @@ namespace aft
j["state"] = *state;
j["configurations"] = configurations;
j["new_configuration"] = Configuration{idx, conf, idx};
j["committable_indices"] = committable_indices;
RAFT_TRACE_JSON_OUT(j);
#endif

Expand Down Expand Up @@ -601,7 +597,6 @@ namespace aft
j["view"] = term;
j["seqno"] = index;
j["globally_committable"] = globally_committable;
j["committable_indices"] = committable_indices;
RAFT_TRACE_JSON_OUT(j);
#endif

Expand All @@ -622,7 +617,7 @@ namespace aft
{
become_retired(index, kv::RetirementPhase::Signed);
}
committable_indices.push_back(index);
state->committable_indices.push_back(index);
start_ticking_if_necessary();

// Reset should_sign here - whenever we see a committable entry we
Expand Down Expand Up @@ -971,7 +966,6 @@ namespace aft
j["to_node_id"] = to;
j["match_idx"] = node.match_idx;
j["sent_idx"] = node.sent_idx;
j["committable_indices"] = committable_indices;
RAFT_TRACE_JSON_OUT(j);
#endif

Expand Down Expand Up @@ -1010,7 +1004,6 @@ namespace aft
j["packet"] = r;
j["state"] = *state;
j["from_node_id"] = from;
j["committable_indices"] = committable_indices;
RAFT_TRACE_JSON_OUT(j);
#endif

Expand Down Expand Up @@ -1218,7 +1211,6 @@ namespace aft
j["function"] = "execute_append_entries_sync";
j["state"] = *state;
j["from_node_id"] = from;
j["committable_indices"] = committable_indices;
RAFT_TRACE_JSON_OUT(j);
#endif

Expand Down Expand Up @@ -1270,7 +1262,7 @@ namespace aft
{
become_retired(i, kv::RetirementPhase::Signed);
}
committable_indices.push_back(i);
state->committable_indices.push_back(i);

if (ds->get_term())
{
Expand Down Expand Up @@ -1375,7 +1367,6 @@ namespace aft
j["packet"] = response;
j["state"] = *state;
j["to_node_id"] = to;
j["committable_indices"] = committable_indices;
RAFT_TRACE_JSON_OUT(j);
#endif

Expand Down Expand Up @@ -1417,7 +1408,6 @@ namespace aft
j["from_node_id"] = from;
j["match_idx"] = node->second.match_idx;
j["sent_idx"] = node->second.sent_idx;
j["committable_indices"] = committable_indices;
RAFT_TRACE_JSON_OUT(j);
#endif

Expand Down Expand Up @@ -1534,7 +1524,6 @@ namespace aft
j["packet"] = rv;
j["state"] = *state;
j["to_node_id"] = to;
j["committable_indices"] = committable_indices;
RAFT_TRACE_JSON_OUT(j);
#endif

Expand All @@ -1558,7 +1547,6 @@ namespace aft
j["packet"] = r;
j["state"] = *state;
j["from_node_id"] = from;
j["committable_indices"] = committable_indices;
RAFT_TRACE_JSON_OUT(j);
#endif

Expand Down Expand Up @@ -1669,7 +1657,6 @@ namespace aft
j["packet"] = r;
j["state"] = *state;
j["from_node_id"] = from;
j["committable_indices"] = committable_indices;
RAFT_TRACE_JSON_OUT(j);
#endif

Expand Down Expand Up @@ -1745,7 +1732,6 @@ namespace aft
j["packet"] = r;
j["state"] = *state;
j["from_node_id"] = from;
j["committable_indices"] = committable_indices;
RAFT_TRACE_JSON_OUT(j);
#endif
if (can_endorse_primary() && ticking && r.term == state->current_view)
Expand Down Expand Up @@ -1809,7 +1795,6 @@ namespace aft
j["function"] = "become_candidate";
j["state"] = *state;
j["configurations"] = configurations;
j["committable_indices"] = committable_indices;
RAFT_TRACE_JSON_OUT(j);
#endif

Expand Down Expand Up @@ -1863,7 +1848,6 @@ namespace aft
j["function"] = "become_leader";
j["state"] = *state;
j["configurations"] = configurations;
j["committable_indices"] = committable_indices;
RAFT_TRACE_JSON_OUT(j);
#endif

Expand Down Expand Up @@ -1920,7 +1904,6 @@ namespace aft
j["function"] = "become_follower";
j["state"] = *state;
j["configurations"] = configurations;
j["committable_indices"] = committable_indices;
RAFT_TRACE_JSON_OUT(j);
#endif
}
Expand Down Expand Up @@ -2228,7 +2211,6 @@ namespace aft
j["function"] = "commit";
j["state"] = *state;
j["configurations"] = configurations;
j["committable_indices"] = committable_indices;
RAFT_TRACE_JSON_OUT(j);
#endif

Expand Down Expand Up @@ -2313,9 +2295,10 @@ namespace aft

state->view_history.rollback(idx);

while (!committable_indices.empty() && (committable_indices.back() > idx))
while (!state->committable_indices.empty() &&
(state->committable_indices.back() > idx))
{
committable_indices.pop_back();
state->committable_indices.pop_back();
}

if (
Expand Down
32 changes: 16 additions & 16 deletions tla/consensus/Traceccfraft.tla
Original file line number Diff line number Diff line change
Expand Up @@ -156,21 +156,21 @@ IsTimeout ==
/\ IsEvent("become_candidate")
/\ logline.msg.state.leadership_state = "Candidate"
/\ Timeout(logline.msg.state.node_id)
/\ committableIndices[logline.msg.state.node_id] = Range(logline.msg.committable_indices)
/\ committableIndices[logline.msg.state.node_id] = Range(logline.msg.state.committable_indices)

IsBecomeLeader ==
/\ IsEvent("become_leader")
/\ logline.msg.state.leadership_state = "Leader"
/\ BecomeLeader(logline.msg.state.node_id)
/\ committableIndices'[logline.msg.state.node_id] = Range(logline.msg.committable_indices)
/\ committableIndices'[logline.msg.state.node_id] = Range(logline.msg.state.committable_indices)

IsClientRequest ==
/\ IsEvent("replicate")
/\ ~logline.msg.globally_committable
/\ ClientRequest(logline.msg.state.node_id)
\* TODO Consider creating a mapping from clientRequests to actual values in the system trace.
\* TODO Alternatively, extract the written values from the system trace and redefine clientRequests at startup.
/\ committableIndices[logline.msg.state.node_id] = Range(logline.msg.committable_indices)
/\ committableIndices[logline.msg.state.node_id] = Range(logline.msg.state.committable_indices)

IsSendAppendEntries ==
/\ IsEvent("send_append_entries")
Expand All @@ -187,7 +187,7 @@ IsSendAppendEntries ==
/\ Network!OneMoreMessage(msg)
/\ logline.msg.sent_idx + 1 = nextIndex[i][j]
/\ logline.msg.match_idx = matchIndex[i][j]
/\ committableIndices[logline.msg.state.node_id] = Range(logline.msg.committable_indices)
/\ committableIndices[logline.msg.state.node_id] = Range(logline.msg.state.committable_indices)

IsRcvAppendEntriesRequest ==
/\ IsEvent("recv_append_entries")
Expand All @@ -205,14 +205,14 @@ IsSendAppendEntriesResponse ==
\* Skip saer because ccfraft!HandleAppendEntriesRequest atomcially handles the request and sends the response.
\* Find a similar pattern in Traceccfraft!IsRcvRequestVoteRequest below.
/\ IsEvent("send_append_entries_response")
/\ committableIndices[logline.msg.state.node_id] = Range(logline.msg.committable_indices)
/\ committableIndices[logline.msg.state.node_id] = Range(logline.msg.state.committable_indices)
/\ UNCHANGED vars

IsAddConfiguration ==
/\ IsEvent("add_configuration")
/\ state[logline.msg.state.node_id] = Follower
/\ UNCHANGED vars
/\ committableIndices[logline.msg.state.node_id] = Range(logline.msg.committable_indices)
/\ committableIndices[logline.msg.state.node_id] = Range(logline.msg.state.committable_indices)

IsSignCommittableMessages ==
/\ IsEvent("replicate")
Expand All @@ -224,7 +224,7 @@ IsSignCommittableMessages ==
\* which is not the case if the logs ends after this "replicate" line. If it does not end,
\* the subsequent send_append_entries will assert the effect of SignCommittableMessages anyway.
\* Also see IsExecuteAppendEntries below.
/\ committableIndices[logline.msg.state.node_id] = Range(logline.msg.committable_indices)
/\ committableIndices[logline.msg.state.node_id] = Range(logline.msg.state.committable_indices)

IsAdvanceCommitIndex ==
\* This is enabled *after* a SignCommittableMessages because ACI looks for a
Expand All @@ -234,7 +234,7 @@ IsAdvanceCommitIndex ==
/\ LET i == logline.msg.state.node_id
IN /\ AdvanceCommitIndex(i)
/\ commitIndex'[i] = logline.msg.state.commit_idx
/\ committableIndices'[i] = Range(logline.msg.committable_indices)
/\ committableIndices'[i] = Range(logline.msg.state.committable_indices)
\/ /\ IsEvent("commit")
/\ logline.msg.state.leadership_state = "Follower"
/\ UNCHANGED vars
Expand All @@ -245,7 +245,7 @@ IsChangeConfiguration ==
/\ LET i == logline.msg.state.node_id
newConfiguration == DOMAIN logline.msg.new_configuration.nodes
IN ChangeConfigurationInt(i, newConfiguration)
/\ committableIndices[logline.msg.state.node_id] = Range(logline.msg.committable_indices)
/\ committableIndices[logline.msg.state.node_id] = Range(logline.msg.state.committable_indices)

IsRcvAppendEntriesResponse ==
/\ IsEvent("recv_append_entries_response")
Expand All @@ -259,7 +259,7 @@ IsRcvAppendEntriesResponse ==
\/ UpdateTerm(i, j, m) \cdot HandleAppendEntriesResponse(i, j, m)
\/ UpdateTerm(i, j, m) \cdot DropResponseWhenNotInState(i, j, m)
\/ DropResponseWhenNotInState(i, j, m)
/\ committableIndices[logline.msg.state.node_id] = Range(logline.msg.committable_indices)
/\ committableIndices[logline.msg.state.node_id] = Range(logline.msg.state.committable_indices)

IsSendRequestVote ==
/\ IsEvent("send_request_vote")
Expand All @@ -274,7 +274,7 @@ IsSendRequestVote ==
/\ m.lastCommittableTerm = logline.msg.packet.term_of_last_committable_idx
\* There is now one more message of this type.
/\ Network!OneMoreMessage(m)
/\ committableIndices[logline.msg.state.node_id] = Range(logline.msg.committable_indices)
/\ committableIndices[logline.msg.state.node_id] = Range(logline.msg.state.committable_indices)

IsRcvRequestVoteRequest ==
\/ /\ IsEvent("recv_request_vote")
Expand All @@ -292,7 +292,7 @@ IsRcvRequestVoteRequest ==
\* a (ccfraft!UpdateTerm \cdot ccfraft!HandleRequestVoteRequest) step.
\* (see https://github.com/microsoft/CCF/issues/5057#issuecomment-1487279316)
\/ UpdateTerm(i, j, m) \cdot HandleRequestVoteRequest(i, j, m)
/\ committableIndices[logline.msg.state.node_id] = Range(logline.msg.committable_indices)
/\ committableIndices[logline.msg.state.node_id] = Range(logline.msg.state.committable_indices)

IsExecuteAppendEntries ==
\* Skip append because ccfraft!HandleRequestVoteRequest atomcially handles the request, sends the response,
Expand All @@ -318,20 +318,20 @@ IsRcvRequestVoteResponse ==
\/ UpdateTerm(i, j, m) \cdot HandleRequestVoteResponse(i, j, m)
\/ UpdateTerm(i, j, m) \cdot DropResponseWhenNotInState(i, j, m)
\/ DropResponseWhenNotInState(i, j, m)
/\ committableIndices[logline.msg.state.node_id] = Range(logline.msg.committable_indices)
/\ committableIndices[logline.msg.state.node_id] = Range(logline.msg.state.committable_indices)

IsBecomeFollower ==
/\ IsEvent("become_follower")
/\ state[logline.msg.state.node_id] \in {Follower}
/\ configurations[logline.msg.state.node_id] = ToConfigurations(logline.msg.configurations)
/\ UNCHANGED vars \* UNCHANGED implies that it doesn't matter if we prime the previous variables.
/\ committableIndices[logline.msg.state.node_id] = Range(logline.msg.committable_indices)
/\ committableIndices[logline.msg.state.node_id] = Range(logline.msg.state.committable_indices)

IsCheckQuorum ==
/\ IsEvent("become_follower")
/\ state[logline.msg.state.node_id] = Leader
/\ CheckQuorum(logline.msg.state.node_id)
/\ committableIndices[logline.msg.state.node_id] = Range(logline.msg.committable_indices)
/\ committableIndices[logline.msg.state.node_id] = Range(logline.msg.state.committable_indices)

IsRcvProposeVoteRequest ==
/\ IsEvent("recv_propose_request_vote")
Expand All @@ -344,7 +344,7 @@ IsRcvProposeVoteRequest ==
/\ m.term = logline.msg.packet.term
\* There is now one more message of this type.
/\ Network!OneMoreMessage(m)
/\ committableIndices[logline.msg.state.node_id] = Range(logline.msg.committable_indices)
/\ committableIndices[logline.msg.state.node_id] = Range(logline.msg.state.committable_indices)

TraceNext ==
\/ IsTimeout
Expand Down

0 comments on commit 216263e

Please sign in to comment.