Skip to content

Commit

Permalink
Refactor: Introduce CommittedVote Type to Enhance Vote Handling
Browse files Browse the repository at this point in the history
This commit introduces a new type, `CommittedVote`, which represents a
committed vote. This enhancement is aimed at ensuring that only
committed votes are considered valid for the leader's status.

By using the `CommittedVote` type, we can eliminate several runtime
assertion checks previously required to verify the commitment status of
votes.
  • Loading branch information
drmingdrmer committed Jul 5, 2024
1 parent 9dbcd4c commit d62f2fd
Show file tree
Hide file tree
Showing 11 changed files with 93 additions and 66 deletions.
3 changes: 2 additions & 1 deletion openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::fmt::Debug;
use std::fmt::Display;
use std::fmt::Formatter;
use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -1507,7 +1508,7 @@ where
// - Otherwise, it is sent by a Candidate, we check against the current in progress voting state.
let my_vote = if sender_vote.is_committed() {
let l = self.engine.leader.as_ref();
l.map(|x| x.vote)
l.map(|x| *x.vote.deref())
} else {
// If it finished voting, Candidate's vote is None.
let candidate = self.engine.candidate_ref();
Expand Down
14 changes: 6 additions & 8 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -754,14 +754,12 @@ where C: RaftTypeConfig
Some(x) => x,
};

// This leader is not accepted by a quorum yet.
// Not a valid leader.
//
// Note that leading state is separated from local RaftState(which is used by the `Acceptor` part),
// and do not consider the vote in the local RaftState.
if !leader.vote.is_committed() {
return Err(self.state.forward_to_leader());
}
debug_assert!(
*leader.vote_ref() >= *self.state.vote_ref(),
"leader.vote({}) >= state.vote({})",
leader.vote_ref(),
self.state.vote_ref()
);

Ok(LeaderHandler {
config: &mut self.config,
Expand Down
21 changes: 1 addition & 20 deletions openraft/src/engine/handler/leader_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,29 +67,10 @@ where C: RaftTypeConfig
}
}

// TODO: In future implementations with asynchronous IO,
// ensure logs are not written until the vote is committed
// to maintain consistency.
// ---
// Currently, IO requests to `RaftLogStorage` are executed
// within the `RaftCore` task. This means an `AppendLog` request
// won't be submitted to `RaftLogStorage` until `save_vote()` completes,
// which ensures consistency.
// ---
// However, when `RaftLogStorage` is moved to a separate task,
// `RaftCore` will communicate with `RaftLogStorage` via a channel.
// This change could result in `AppendLog` requests being submitted
// before the previous `save_vote()` request is finished.
// ---
// This scenario creates a risk where a log entry becomes visible and
// is replicated by `ReplicationCore` to other nodes before the vote
// is flushed to disk. If the vote isn't flushed and the server restarts,
// the vote could revert to a previous state. This could allow a new leader
// to be elected with a smaller vote (term), breaking consistency.
self.output.push_command(Command::AppendInputEntries {
// A leader should always use the leader's vote.
// It is allowed to be different from local vote.
vote: self.leader.vote,
vote: *self.leader.vote,
entries,
});

Expand Down
2 changes: 1 addition & 1 deletion openraft/src/engine/handler/vote_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ where C: RaftTypeConfig
// TODO: this is not gonna happen,
// because `self.leader`(previous `internal_server_state`)
// does not include Candidate any more.
l.vote = *self.state.vote_ref();
l.vote = self.state.vote_ref().into_committed();
self.server_state_handler().update_server_state_if_changed();
return;
}
Expand Down
5 changes: 2 additions & 3 deletions openraft/src/proposer/candidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,9 @@ where
pub(crate) fn into_leader(self) -> Leader<C, QS> {
// Mark the vote as committed, i.e., being granted and saved by a quorum.
let vote = {
let mut vote = *self.vote_ref();
let vote = *self.vote_ref();
debug_assert!(!vote.is_committed());
vote.commit();
vote
vote.into_committed()
};

let last_leader_log_ids = self.last_log_id().copied().into_iter().collect::<Vec<_>>();
Expand Down
39 changes: 19 additions & 20 deletions openraft/src/proposer/leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::progress::VecProgress;
use crate::quorum::QuorumSet;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::LogIdOf;
use crate::vote::CommittedVote;
use crate::Instant;
use crate::LogId;
use crate::LogIdOptionExt;
Expand Down Expand Up @@ -36,7 +37,7 @@ where C: RaftTypeConfig
/// The vote this leader works in.
///
/// `self.voting` may be in progress requesting vote for a higher vote.
pub(crate) vote: Vote<C::NodeId>,
pub(crate) vote: CommittedVote<C>,

last_log_id: Option<LogIdOf<C>>,

Expand Down Expand Up @@ -66,28 +67,27 @@ where
///
/// `last_leader_log_id` is the first and last log id proposed by the last leader.
pub(crate) fn new(
vote: Vote<C::NodeId>,
vote: CommittedVote<C>,
quorum_set: QS,
learner_ids: impl IntoIterator<Item = C::NodeId>,
last_leader_log_id: &[LogIdOf<C>],
) -> Self {
debug_assert!(vote.is_committed());
debug_assert!(
Some(vote.committed_leader_id().unwrap()) >= last_leader_log_id.last().map(|x| *x.committed_leader_id()),
Some(vote.committed_leader_id()) >= last_leader_log_id.last().map(|x| *x.committed_leader_id()),
"vote {} must GE last_leader_log_id.last() {}",
vote,
last_leader_log_id.display()
);
debug_assert!(
Some(vote.committed_leader_id().unwrap()) >= last_leader_log_id.first().map(|x| *x.committed_leader_id()),
Some(vote.committed_leader_id()) >= last_leader_log_id.first().map(|x| *x.committed_leader_id()),
"vote {} must GE last_leader_log_id.first() {}",
vote,
last_leader_log_id.display()
);

let learner_ids = learner_ids.into_iter().collect::<Vec<_>>();

let vote_leader_id = vote.committed_leader_id().unwrap();
let vote_leader_id = vote.committed_leader_id();
let first = last_leader_log_id.first();

let noop_log_id = if first.map(|x| *x.committed_leader_id()) == Some(vote_leader_id) {
Expand All @@ -99,7 +99,7 @@ where
} else {
// Set to a log id that will be proposed.
Some(LogId::new(
vote.committed_leader_id().unwrap(),
vote.committed_leader_id(),
last_leader_log_id.last().next_index(),
))
};
Expand Down Expand Up @@ -146,8 +146,7 @@ where
&mut self,
entries: impl IntoIterator<Item = &'a mut LID>,
) {
debug_assert!(self.vote.is_committed());
let committed_leader_id = self.vote.committed_leader_id().unwrap();
let committed_leader_id = self.vote.committed_leader_id();

let first = LogId::new(committed_leader_id, self.last_log_id().next_index());
let mut last = first;
Expand Down Expand Up @@ -218,7 +217,7 @@ mod tests {
fn test_leader_new_with_proposed_log_id() {
tracing::info!("--- vote greater than last log id, create new noop_log_id");
{
let vote = Vote::new_committed(2, 2);
let vote = Vote::new(2, 2).into_committed();
let leader = Leader::<UTConfig, _>::new(vote, vec![1, 2, 3], vec![], &[log_id(1, 2, 1), log_id(1, 2, 3)]);

assert_eq!(leader.noop_log_id(), Some(&log_id(2, 2, 4)));
Expand All @@ -227,7 +226,7 @@ mod tests {

tracing::info!("--- vote equals last log id, reuse noop_log_id");
{
let vote = Vote::new_committed(1, 2);
let vote = Vote::new(1, 2).into_committed();
let leader = Leader::<UTConfig, _>::new(vote, vec![1, 2, 3], vec![], &[log_id(1, 2, 1), log_id(1, 2, 3)]);

assert_eq!(leader.noop_log_id(), Some(&log_id(1, 2, 1)));
Expand All @@ -236,7 +235,7 @@ mod tests {

tracing::info!("--- vote equals last log id, reuse noop_log_id, last_leader_log_id.len()==1");
{
let vote = Vote::new_committed(1, 2);
let vote = Vote::new(1, 2).into_committed();
let leader = Leader::<UTConfig, _>::new(vote, vec![1, 2, 3], vec![], &[log_id(1, 2, 3)]);

assert_eq!(leader.noop_log_id(), Some(&log_id(1, 2, 3)));
Expand All @@ -245,7 +244,7 @@ mod tests {

tracing::info!("--- no last log ids, create new noop_log_id, last_leader_log_id.len()==0");
{
let vote = Vote::new_committed(1, 2);
let vote = Vote::new(1, 2).into_committed();
let leader = Leader::<UTConfig, _>::new(vote, vec![1, 2, 3], vec![], &[]);

assert_eq!(leader.noop_log_id(), Some(&log_id(1, 2, 0)));
Expand All @@ -255,7 +254,7 @@ mod tests {

#[test]
fn test_leader_established() {
let vote = Vote::new_committed(2, 2);
let vote = Vote::new(2, 2).into_committed();
let mut leader = Leader::<UTConfig, _>::new(vote, vec![1, 2, 3], vec![], &[log_id(1, 2, 3)]);

let mut entries = vec![Entry::<UTConfig>::new_blank(log_id(5, 5, 2))];
Expand All @@ -271,7 +270,7 @@ mod tests {

#[test]
fn test_1_entry_none_last_log_id() {
let vote = Vote::new_committed(0, 0);
let vote = Vote::new(0, 0).into_committed();
let mut leading = Leader::<UTConfig, _>::new(vote, vec![1, 2, 3], vec![], &[]);

let mut entries: Vec<Entry<UTConfig>> = vec![blank_ent(1, 1, 1)];
Expand All @@ -283,7 +282,7 @@ mod tests {

#[test]
fn test_no_entries_provided() {
let vote = Vote::new_committed(2, 2);
let vote = Vote::new(2, 2).into_committed();
let mut leading = Leader::<UTConfig, _>::new(vote, vec![1, 2, 3], vec![], &[log_id(1, 1, 8)]);

let mut entries: Vec<Entry<UTConfig>> = vec![];
Expand All @@ -293,7 +292,7 @@ mod tests {

#[test]
fn test_multiple_entries() {
let vote = Vote::new_committed(2, 2);
let vote = Vote::new(2, 2).into_committed();
let mut leading = Leader::<UTConfig, _>::new(vote, vec![1, 2, 3], [], &[log_id(1, 1, 8)]);

let mut entries: Vec<Entry<UTConfig>> = vec![blank_ent(1, 1, 1), blank_ent(1, 1, 1), blank_ent(1, 1, 1)];
Expand All @@ -307,7 +306,7 @@ mod tests {

#[test]
fn test_leading_last_quorum_acked_time_leader_is_voter() {
let mut leading = Leader::<UTConfig, Vec<u64>>::new(Vote::new_committed(2, 1), vec![1, 2, 3], [4], &[]);
let mut leading = Leader::<UTConfig, Vec<u64>>::new(Vote::new(2, 1).into_committed(), vec![1, 2, 3], [4], &[]);

let now1 = UTConfig::<()>::now();

Expand All @@ -318,7 +317,7 @@ mod tests {

#[test]
fn test_leading_last_quorum_acked_time_leader_is_learner() {
let mut leading = Leader::<UTConfig, Vec<u64>>::new(Vote::new_committed(2, 4), vec![1, 2, 3], [4], &[]);
let mut leading = Leader::<UTConfig, Vec<u64>>::new(Vote::new(2, 4).into_committed(), vec![1, 2, 3], [4], &[]);

let t2 = UTConfig::<()>::now();
let _ = leading.clock_progress.increase_to(&2, Some(t2));
Expand All @@ -333,7 +332,7 @@ mod tests {

#[test]
fn test_leading_last_quorum_acked_time_leader_is_not_member() {
let mut leading = Leader::<UTConfig, Vec<u64>>::new(Vote::new_committed(2, 5), vec![1, 2, 3], [4], &[]);
let mut leading = Leader::<UTConfig, Vec<u64>>::new(Vote::new(2, 5).into_committed(), vec![1, 2, 3], [4], &[]);

let t2 = UTConfig::<()>::now();
let _ = leading.clock_progress.increase_to(&2, Some(t2));
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/raft_state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ where C: RaftTypeConfig
let last_leader_log_ids = self.log_ids.by_last_leader();

Leader::new(
*self.vote_ref(),
self.vote_ref().into_committed(),
em.to_quorum_set(),
em.learner_ids(),
last_leader_log_ids,
Expand Down
5 changes: 3 additions & 2 deletions openraft/src/replication/replication_session_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::fmt::Display;
use std::fmt::Formatter;

use crate::display_ext::DisplayOptionExt;
use crate::vote::CommittedVote;
use crate::LogId;
use crate::RaftTypeConfig;
use crate::Vote;
Expand Down Expand Up @@ -33,7 +34,7 @@ pub(crate) struct ReplicationSessionId<C>
where C: RaftTypeConfig
{
/// The Leader or Candidate this replication belongs to.
pub(crate) vote: Vote<C::NodeId>,
pub(crate) vote: CommittedVote<C>,

/// The log id of the membership log this replication works for.
pub(crate) membership_log_id: Option<LogId<C::NodeId>>,
Expand All @@ -50,7 +51,7 @@ where C: RaftTypeConfig
impl<C> ReplicationSessionId<C>
where C: RaftTypeConfig
{
pub(crate) fn new(vote: Vote<C::NodeId>, membership_log_id: Option<LogId<C::NodeId>>) -> Self {
pub(crate) fn new(vote: CommittedVote<C>, membership_log_id: Option<LogId<C::NodeId>>) -> Self {
Self {
vote,
membership_log_id,
Expand Down
47 changes: 47 additions & 0 deletions openraft/src/vote/committed.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use std::fmt;
use std::ops::Deref;

use crate::type_config::alias::NodeIdOf;
use crate::CommittedLeaderId;
use crate::RaftTypeConfig;
use crate::Vote;

/// Represents a committed Vote that has been accepted by a quorum.
///
/// The inner `Vote`'s attribute `committed` is always set to `true`:
/// the `vote` field is private, the constructor `new()` in this file
/// overwrites the flag with `true`, and the `Deref` impl only hands out shared
/// references to the inner vote.
///
/// Holding a `CommittedVote` therefore stands in for a runtime
/// `is_committed()` check on a plain `Vote`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd)]
pub(crate) struct CommittedVote<C>
where C: RaftTypeConfig
{
// The wrapped vote; its `committed` flag is forced to `true` by `new()`.
vote: Vote<NodeIdOf<C>>,
}

impl<C> CommittedVote<C>
where C: RaftTypeConfig
{
    /// Wraps `vote` and marks it as committed.
    ///
    /// The `committed` flag is overwritten unconditionally, so the result is
    /// committed regardless of the flag's value on the vote passed in.
    pub(crate) fn new(vote: Vote<NodeIdOf<C>>) -> Self {
        let mut wrapped = Self { vote };
        wrapped.vote.committed = true;
        wrapped
    }

    /// Returns the leader id of the wrapped vote, converted to its committed form.
    ///
    /// The value is returned directly rather than wrapped in an `Option`.
    pub(crate) fn committed_leader_id(&self) -> CommittedLeaderId<C::NodeId> {
        let leader_id = self.vote.leader_id();
        leader_id.to_committed()
    }
}

impl<C> Deref for CommittedVote<C>
where C: RaftTypeConfig
{
    type Target = Vote<NodeIdOf<C>>;

    /// Gives shared, read-only access to the wrapped vote.
    fn deref(&self) -> &Self::Target {
        let Self { vote } = self;
        vote
    }
}

impl<C> fmt::Display for CommittedVote<C>
where C: RaftTypeConfig
{
    /// Formats exactly like the wrapped `Vote`, forwarding the same formatter.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Fully-qualified call: no dependence on which `fmt` traits are in scope.
        fmt::Display::fmt(&self.vote, f)
    }
}
2 changes: 2 additions & 0 deletions openraft/src/vote/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
pub(crate) mod committed;
mod leader_id;
#[allow(clippy::module_inception)]
mod vote;

pub(crate) use committed::CommittedVote;
pub use leader_id::CommittedLeaderId;
pub use leader_id::LeaderId;
pub use vote::Vote;
19 changes: 9 additions & 10 deletions openraft/src/vote/vote.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::cmp::Ordering;
use std::fmt::Formatter;

use crate::vote::committed::CommittedVote;
use crate::vote::leader_id::CommittedLeaderId;
use crate::LeaderId;
use crate::NodeId;
use crate::RaftTypeConfig;

/// `Vote` represents the privilege of a node.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
Expand Down Expand Up @@ -68,10 +70,17 @@ impl<NID: NodeId> Vote<NID> {
}
}

#[deprecated(note = "use `into_committed()` instead", since = "0.10.0")]
pub fn commit(&mut self) {
self.committed = true
}

/// Convert this vote into a `CommittedVote`
pub(crate) fn into_committed<C>(self) -> CommittedVote<C>
where C: RaftTypeConfig<NodeId = NID> {
CommittedVote::new(self)
}

pub fn is_committed(&self) -> bool {
self.committed
}
Expand All @@ -83,16 +92,6 @@ impl<NID: NodeId> Vote<NID> {
&self.leader_id
}

/// Return a [`CommittedLeaderId`], which is granted by a quorum.
pub(crate) fn committed_leader_id(&self) -> Option<CommittedLeaderId<NID>> {
// Special case (term==0): when initializing the first log does not need vote to be committed.
if self.is_committed() || self.leader_id().term == 0 {
Some(self.leader_id().to_committed())
} else {
None
}
}

pub(crate) fn is_same_leader(&self, leader_id: &CommittedLeaderId<NID>) -> bool {
self.leader_id().is_same_as_committed(leader_id)
}
Expand Down

0 comments on commit d62f2fd

Please sign in to comment.