From d62f2fd8fd6b3fab2f31685d1f42ec9f12bd3574 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Fri, 5 Jul 2024 17:45:23 +0800 Subject: [PATCH] Refactor: Introduce `CommittedVote` Type to Enhance Vote Handling This commit introduces a new type, `CommittedVote`, which represents a committed vote. This enhancement is aimed at ensuring that only committed votes are considered valid for the leader's status. By using the `CommittedVote` type, we can eliminate several runtime assertion checks previously required to verify the commitment status of votes. --- openraft/src/core/raft_core.rs | 3 +- openraft/src/engine/engine_impl.rs | 14 +++--- .../src/engine/handler/leader_handler/mod.rs | 21 +-------- .../src/engine/handler/vote_handler/mod.rs | 2 +- openraft/src/proposer/candidate.rs | 5 +- openraft/src/proposer/leader.rs | 39 ++++++++------- openraft/src/raft_state/mod.rs | 2 +- .../src/replication/replication_session_id.rs | 5 +- openraft/src/vote/committed.rs | 47 +++++++++++++++++++ openraft/src/vote/mod.rs | 2 + openraft/src/vote/vote.rs | 19 ++++---- 11 files changed, 93 insertions(+), 66 deletions(-) create mode 100644 openraft/src/vote/committed.rs diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 46d1d45a9..7638a40b9 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -4,6 +4,7 @@ use std::fmt::Debug; use std::fmt::Display; use std::fmt::Formatter; use std::marker::PhantomData; +use std::ops::Deref; use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; @@ -1507,7 +1508,7 @@ where // - Otherwise, it is sent by a Candidate, we check against the current in progress voting state. let my_vote = if sender_vote.is_committed() { let l = self.engine.leader.as_ref(); - l.map(|x| x.vote) + l.map(|x| *x.vote.deref()) } else { // If it finished voting, Candidate's vote is None. let candidate = self.engine.candidate_ref(); diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index f8507ca1e..a799ac232 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -754,14 +754,12 @@ where C: RaftTypeConfig Some(x) => x, }; - // This leader is not accepted by a quorum yet. - // Not a valid leader. - // - // Note that leading state is separated from local RaftState(which is used by the `Acceptor` part), - // and do not consider the vote in the local RaftState. - if !leader.vote.is_committed() { - return Err(self.state.forward_to_leader()); - } + debug_assert!( + *leader.vote_ref() >= *self.state.vote_ref(), + "leader.vote({}) >= state.vote({})", + leader.vote_ref(), + self.state.vote_ref() + ); Ok(LeaderHandler { config: &mut self.config, diff --git a/openraft/src/engine/handler/leader_handler/mod.rs b/openraft/src/engine/handler/leader_handler/mod.rs index 36f05b568..c739cd021 100644 --- a/openraft/src/engine/handler/leader_handler/mod.rs +++ b/openraft/src/engine/handler/leader_handler/mod.rs @@ -67,29 +67,10 @@ where C: RaftTypeConfig } } - // TODO: In future implementations with asynchronous IO, - // ensure logs are not written until the vote is committed - // to maintain consistency. - // --- - // Currently, IO requests to `RaftLogStorage` are executed - // within the `RaftCore` task. This means an `AppendLog` request - // won't be submitted to `RaftLogStorage` until `save_vote()` completes, - // which ensures consistency. - // --- - // However, when `RaftLogStorage` is moved to a separate task, - // `RaftCore` will communicate with `RaftLogStorage` via a channel. - // This change could result in `AppendLog` requests being submitted - // before the previous `save_vote()` request is finished. - // --- - // This scenario creates a risk where a log entry becomes visible and - // is replicated by `ReplicationCore` to other nodes before the vote - // is flushed to disk. If the vote isn't flushed and the server restarts, - // the vote could revert to a previous state. This could allow a new leader - // to be elected with a smaller vote (term), breaking consistency. self.output.push_command(Command::AppendInputEntries { // A leader should always use the leader's vote. // It is allowed to be different from local vote. - vote: self.leader.vote, + vote: *self.leader.vote, entries, }); diff --git a/openraft/src/engine/handler/vote_handler/mod.rs b/openraft/src/engine/handler/vote_handler/mod.rs index fed7aee27..1fb756911 100644 --- a/openraft/src/engine/handler/vote_handler/mod.rs +++ b/openraft/src/engine/handler/vote_handler/mod.rs @@ -186,7 +186,7 @@ where C: RaftTypeConfig // TODO: this is not gonna happen, // because `self.leader`(previous `internal_server_state`) // does not include Candidate any more. - l.vote = *self.state.vote_ref(); + l.vote = self.state.vote_ref().into_committed(); self.server_state_handler().update_server_state_if_changed(); return; } diff --git a/openraft/src/proposer/candidate.rs b/openraft/src/proposer/candidate.rs index 05ca3aa6f..2d4a63045 100644 --- a/openraft/src/proposer/candidate.rs +++ b/openraft/src/proposer/candidate.rs @@ -105,10 +105,9 @@ where pub(crate) fn into_leader(self) -> Leader { // Mark the vote as committed, i.e., being granted and saved by a quorum. let vote = { - let mut vote = *self.vote_ref(); + let vote = *self.vote_ref(); debug_assert!(!vote.is_committed()); - vote.commit(); - vote + vote.into_committed() }; let last_leader_log_ids = self.last_log_id().copied().into_iter().collect::>(); diff --git a/openraft/src/proposer/leader.rs b/openraft/src/proposer/leader.rs index 1a79d67c1..57ce136ec 100644 --- a/openraft/src/proposer/leader.rs +++ b/openraft/src/proposer/leader.rs @@ -7,6 +7,7 @@ use crate::progress::VecProgress; use crate::quorum::QuorumSet; use crate::type_config::alias::InstantOf; use crate::type_config::alias::LogIdOf; +use crate::vote::CommittedVote; use crate::Instant; use crate::LogId; use crate::LogIdOptionExt; @@ -36,7 +37,7 @@ where C: RaftTypeConfig /// The vote this leader works in. /// /// `self.voting` may be in progress requesting vote for a higher vote. - pub(crate) vote: Vote, + pub(crate) vote: CommittedVote, last_log_id: Option>, @@ -66,20 +67,19 @@ where /// /// `last_leader_log_id` is the first and last log id proposed by the last leader. pub(crate) fn new( - vote: Vote, + vote: CommittedVote, quorum_set: QS, learner_ids: impl IntoIterator, last_leader_log_id: &[LogIdOf], ) -> Self { - debug_assert!(vote.is_committed()); debug_assert!( - Some(vote.committed_leader_id().unwrap()) >= last_leader_log_id.last().map(|x| *x.committed_leader_id()), + Some(vote.committed_leader_id()) >= last_leader_log_id.last().map(|x| *x.committed_leader_id()), "vote {} must GE last_leader_log_id.last() {}", vote, last_leader_log_id.display() ); debug_assert!( - Some(vote.committed_leader_id().unwrap()) >= last_leader_log_id.first().map(|x| *x.committed_leader_id()), + Some(vote.committed_leader_id()) >= last_leader_log_id.first().map(|x| *x.committed_leader_id()), "vote {} must GE last_leader_log_id.first() {}", vote, last_leader_log_id.display() @@ -87,7 +87,7 @@ where let learner_ids = learner_ids.into_iter().collect::>(); - let vote_leader_id = vote.committed_leader_id().unwrap(); + let vote_leader_id = vote.committed_leader_id(); let first = last_leader_log_id.first(); let noop_log_id = if first.map(|x| *x.committed_leader_id()) == Some(vote_leader_id) { @@ -99,7 +99,7 @@ where } else { // Set to a log id that will be proposed. Some(LogId::new( - vote.committed_leader_id().unwrap(), + vote.committed_leader_id(), last_leader_log_id.last().next_index(), )) }; @@ -146,8 +146,7 @@ where &mut self, entries: impl IntoIterator, ) { - debug_assert!(self.vote.is_committed()); - let committed_leader_id = self.vote.committed_leader_id().unwrap(); + let committed_leader_id = self.vote.committed_leader_id(); let first = LogId::new(committed_leader_id, self.last_log_id().next_index()); let mut last = first; @@ -218,7 +217,7 @@ mod tests { fn test_leader_new_with_proposed_log_id() { tracing::info!("--- vote greater than last log id, create new noop_log_id"); { - let vote = Vote::new_committed(2, 2); + let vote = Vote::new(2, 2).into_committed(); let leader = Leader::::new(vote, vec![1, 2, 3], vec![], &[log_id(1, 2, 1), log_id(1, 2, 3)]); assert_eq!(leader.noop_log_id(), Some(&log_id(2, 2, 4))); @@ -227,7 +226,7 @@ mod tests { tracing::info!("--- vote equals last log id, reuse noop_log_id"); { - let vote = Vote::new_committed(1, 2); + let vote = Vote::new(1, 2).into_committed(); let leader = Leader::::new(vote, vec![1, 2, 3], vec![], &[log_id(1, 2, 1), log_id(1, 2, 3)]); assert_eq!(leader.noop_log_id(), Some(&log_id(1, 2, 1))); @@ -236,7 +235,7 @@ mod tests { tracing::info!("--- vote equals last log id, reuse noop_log_id, last_leader_log_id.len()==1"); { - let vote = Vote::new_committed(1, 2); + let vote = Vote::new(1, 2).into_committed(); let leader = Leader::::new(vote, vec![1, 2, 3], vec![], &[log_id(1, 2, 3)]); assert_eq!(leader.noop_log_id(), Some(&log_id(1, 2, 3))); @@ -245,7 +244,7 @@ mod tests { tracing::info!("--- no last log ids, create new noop_log_id, last_leader_log_id.len()==0"); { - let vote = Vote::new_committed(1, 2); + let vote = Vote::new(1, 2).into_committed(); let leader = Leader::::new(vote, vec![1, 2, 3], vec![], &[]); assert_eq!(leader.noop_log_id(), Some(&log_id(1, 2, 0))); @@ -255,7 +254,7 @@ mod tests { #[test] fn test_leader_established() { - let vote = Vote::new_committed(2, 2); + let vote = Vote::new(2, 2).into_committed(); let mut leader = Leader::::new(vote, vec![1, 2, 3], vec![], &[log_id(1, 2, 3)]); let mut entries = vec![Entry::::new_blank(log_id(5, 5, 2))]; @@ -271,7 +270,7 @@ mod tests { #[test] fn test_1_entry_none_last_log_id() { - let vote = Vote::new_committed(0, 0); + let vote = Vote::new(0, 0).into_committed(); let mut leading = Leader::::new(vote, vec![1, 2, 3], vec![], &[]); let mut entries: Vec> = vec![blank_ent(1, 1, 1)]; @@ -283,7 +282,7 @@ mod tests { #[test] fn test_no_entries_provided() { - let vote = Vote::new_committed(2, 2); + let vote = Vote::new(2, 2).into_committed(); let mut leading = Leader::::new(vote, vec![1, 2, 3], vec![], &[log_id(1, 1, 8)]); let mut entries: Vec> = vec![]; @@ -293,7 +292,7 @@ mod tests { #[test] fn test_multiple_entries() { - let vote = Vote::new_committed(2, 2); + let vote = Vote::new(2, 2).into_committed(); let mut leading = Leader::::new(vote, vec![1, 2, 3], [], &[log_id(1, 1, 8)]); let mut entries: Vec> = vec![blank_ent(1, 1, 1), blank_ent(1, 1, 1), blank_ent(1, 1, 1)]; @@ -307,7 +306,7 @@ mod tests { #[test] fn test_leading_last_quorum_acked_time_leader_is_voter() { - let mut leading = Leader::>::new(Vote::new_committed(2, 1), vec![1, 2, 3], [4], &[]); + let mut leading = Leader::>::new(Vote::new(2, 1).into_committed(), vec![1, 2, 3], [4], &[]); let now1 = UTConfig::<()>::now(); @@ -318,7 +317,7 @@ mod tests { #[test] fn test_leading_last_quorum_acked_time_leader_is_learner() { - let mut leading = Leader::>::new(Vote::new_committed(2, 4), vec![1, 2, 3], [4], &[]); + let mut leading = Leader::>::new(Vote::new(2, 4).into_committed(), vec![1, 2, 3], [4], &[]); let t2 = UTConfig::<()>::now(); let _ = leading.clock_progress.increase_to(&2, Some(t2)); @@ -333,7 +332,7 @@ mod tests { #[test] fn test_leading_last_quorum_acked_time_leader_is_not_member() { - let mut leading = Leader::>::new(Vote::new_committed(2, 5), vec![1, 2, 3], [4], &[]); + let mut leading = Leader::>::new(Vote::new(2, 5).into_committed(), vec![1, 2, 3], [4], &[]); let t2 = UTConfig::<()>::now(); let _ = leading.clock_progress.increase_to(&2, Some(t2)); diff --git a/openraft/src/raft_state/mod.rs b/openraft/src/raft_state/mod.rs index 189ab693a..bc97ef8f4 100644 --- a/openraft/src/raft_state/mod.rs +++ b/openraft/src/raft_state/mod.rs @@ -383,7 +383,7 @@ where C: RaftTypeConfig let last_leader_log_ids = self.log_ids.by_last_leader(); Leader::new( - *self.vote_ref(), + self.vote_ref().into_committed(), em.to_quorum_set(), em.learner_ids(), last_leader_log_ids, diff --git a/openraft/src/replication/replication_session_id.rs b/openraft/src/replication/replication_session_id.rs index f1ac1378c..9e2d044b4 100644 --- a/openraft/src/replication/replication_session_id.rs +++ b/openraft/src/replication/replication_session_id.rs @@ -2,6 +2,7 @@ use std::fmt::Display; use std::fmt::Formatter; use crate::display_ext::DisplayOptionExt; +use crate::vote::CommittedVote; use crate::LogId; use crate::RaftTypeConfig; use crate::Vote; @@ -33,7 +34,7 @@ pub(crate) struct ReplicationSessionId where C: RaftTypeConfig { /// The Leader or Candidate this replication belongs to. - pub(crate) vote: Vote, + pub(crate) vote: CommittedVote, /// The log id of the membership log this replication works for. pub(crate) membership_log_id: Option>, @@ -50,7 +51,7 @@ where C: RaftTypeConfig impl ReplicationSessionId where C: RaftTypeConfig { - pub(crate) fn new(vote: Vote, membership_log_id: Option>) -> Self { + pub(crate) fn new(vote: CommittedVote, membership_log_id: Option>) -> Self { Self { vote, membership_log_id, diff --git a/openraft/src/vote/committed.rs b/openraft/src/vote/committed.rs new file mode 100644 index 000000000..ac1902f37 --- /dev/null +++ b/openraft/src/vote/committed.rs @@ -0,0 +1,47 @@ +use std::fmt; +use std::ops::Deref; + +use crate::type_config::alias::NodeIdOf; +use crate::CommittedLeaderId; +use crate::RaftTypeConfig; +use crate::Vote; + +/// Represents a committed Vote that has been accepted by a quorum. +/// +/// The inner `Vote`'s attribute `committed` is always set to `true` +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd)] +pub(crate) struct CommittedVote +where C: RaftTypeConfig +{ + vote: Vote>, +} + +impl CommittedVote +where C: RaftTypeConfig +{ + pub(crate) fn new(mut vote: Vote>) -> Self { + vote.committed = true; + Self { vote } + } + pub(crate) fn committed_leader_id(&self) -> CommittedLeaderId { + self.leader_id().to_committed() + } +} + +impl Deref for CommittedVote +where C: RaftTypeConfig +{ + type Target = Vote>; + + fn deref(&self) -> &Self::Target { + &self.vote + } +} + +impl fmt::Display for CommittedVote +where C: RaftTypeConfig +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.vote.fmt(f) + } +} diff --git a/openraft/src/vote/mod.rs b/openraft/src/vote/mod.rs index 2146647e4..36ab1eecc 100644 --- a/openraft/src/vote/mod.rs +++ b/openraft/src/vote/mod.rs @@ -1,7 +1,9 @@ +pub(crate) mod committed; mod leader_id; #[allow(clippy::module_inception)] mod vote; +pub(crate) use committed::CommittedVote; pub use leader_id::CommittedLeaderId; pub use leader_id::LeaderId; pub use vote::Vote; diff --git a/openraft/src/vote/vote.rs b/openraft/src/vote/vote.rs index cee8a0c19..139ccacef 100644 --- a/openraft/src/vote/vote.rs +++ b/openraft/src/vote/vote.rs @@ -1,9 +1,11 @@ use std::cmp::Ordering; use std::fmt::Formatter; +use crate::vote::committed::CommittedVote; use crate::vote::leader_id::CommittedLeaderId; use crate::LeaderId; use crate::NodeId; +use crate::RaftTypeConfig; /// `Vote` represent the privilege of a node. #[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] @@ -68,10 +70,17 @@ impl Vote { } } + #[deprecated(note = "use `into_committed()` instead", since = "0.10.0")] pub fn commit(&mut self) { self.committed = true } + /// Convert this vote into a `CommittedVote` + pub(crate) fn into_committed(self) -> CommittedVote + where C: RaftTypeConfig { + CommittedVote::new(self) + } + pub fn is_committed(&self) -> bool { self.committed } @@ -83,16 +92,6 @@ impl Vote { &self.leader_id } - /// Return a [`CommittedLeaderId`], which is granted by a quorum. - pub(crate) fn committed_leader_id(&self) -> Option> { - // Special case (term==0): when initializing the first log does not need vote to be committed. - if self.is_committed() || self.leader_id().term == 0 { - Some(self.leader_id().to_committed()) - } else { - None - } - } - pub(crate) fn is_same_leader(&self, leader_id: &CommittedLeaderId) -> bool { self.leader_id().is_same_as_committed(leader_id) }