diff --git a/harness/src/network.rs b/harness/src/network.rs index 6bc8a0413..b7fa25fe9 100644 --- a/harness/src/network.rs +++ b/harness/src/network.rs @@ -150,6 +150,7 @@ impl Network { /// Read out all messages generated by peers in the `Network`. /// /// Note: messages are not filtered by any configured filters. + #[inline] pub fn read_messages(&mut self) -> Vec { self.peers .iter_mut() @@ -157,6 +158,13 @@ impl Network { .collect() } + #[inline] + pub fn read_peer_messages(&mut self, id: u64) -> Vec { + self.peers + .get_mut(&id) + .map_or(vec![], |node| node.read_messages()) + } + /// Instruct the cluster to `step` through the given messages. /// /// NOTE: the given `msgs` won't be filtered by its filters. diff --git a/harness/tests/integration_cases/mod.rs b/harness/tests/integration_cases/mod.rs index 5e3928f75..f25b2ab15 100644 --- a/harness/tests/integration_cases/mod.rs +++ b/harness/tests/integration_cases/mod.rs @@ -2,6 +2,7 @@ mod test_raft; mod test_raft_flow_control; +mod test_raft_follower_replication; mod test_raft_paper; mod test_raft_snap; mod test_raw_node; diff --git a/harness/tests/integration_cases/test_raft.rs b/harness/tests/integration_cases/test_raft.rs index ed476d7d7..383905291 100644 --- a/harness/tests/integration_cases/test_raft.rs +++ b/harness/tests/integration_cases/test_raft.rs @@ -1196,7 +1196,7 @@ fn test_handle_msg_append() { ); sm.become_follower(2, INVALID_ID); - sm.handle_append_entries(&m); + sm.handle_append_message(m); if sm.raft_log.last_index() != w_index { panic!( "#{}: last_index = {}, want {}", diff --git a/harness/tests/integration_cases/test_raft_follower_replication.rs b/harness/tests/integration_cases/test_raft_follower_replication.rs new file mode 100644 index 000000000..75e1f2149 --- /dev/null +++ b/harness/tests/integration_cases/test_raft_follower_replication.rs @@ -0,0 +1,615 @@ +// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::test_util::*; +use harness::{Interface, Network}; +use raft::eraftpb::*; +use raft::storage::MemStorage; +use raft::*; +use slog::Logger; +use std::collections::HashSet; +use std::iter::FromIterator; + +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +enum FollowerScenario { + // Follower is ready for new raft logs + UpToDate, + // Follower's next_idx = given - 1 and matched = given - 2 + NeedEntries(u64), + // Follower need snapshot + Snapshot, +} + +// Sandbox is a helper struct to represent a determined stable state of a raft cluster. +struct Sandbox { + leader: u64, + // initialized last index + last_index: u64, + followers: Vec<(u64, FollowerScenario)>, + network: Network, +} + +impl Sandbox { + // Create a sandbox for testing + // + // The relationship between followers in different states: + // + // +-----+ + // | + // | + // | Follower::Snapshot + // | + // | + // +-----+ + // | + // | + // | + // | Follower::NeedEntries + // | + // | + // | + // +-----+ Follower::UpToDate + // + // The given `leader` and `followers` should be mutually exclusive. 
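    // For example, a call shaped like the tests below (the concrete values here are purely
    // illustrative):
    //
    //     let sandbox = Sandbox::new(
    //         &l,                                      // logger
    //         1,                                       // leader id
    //         vec![(2, FollowerScenario::UpToDate),
    //              (3, FollowerScenario::NeedEntries(7)),
    //              (4, FollowerScenario::Snapshot)],
    //         vec![(2, vec![1]), (1, vec![2, 3, 4])],  // group id => members
    //         5,                                       // snapshot_index
    //         10,                                      // last_index
    //     );
    //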
+ // The ProgressSet in generated followers are uninitialized + // + pub fn new( + l: &Logger, + leader: u64, + followers: Vec<(u64, FollowerScenario)>, + group_config: Vec<(u64, Vec)>, + snapshot_index: u64, + last_index: u64, + ) -> Self { + if snapshot_index >= last_index { + panic!( + "snapshot_index {} should be less than last_index {}", + snapshot_index, last_index + ); + } + if last_index < 2 { + panic!("last_index {} should be larger than 1", last_index); + } + let peers = followers.iter().map(|(id, _)| *id).collect::>(); + if peers.contains(&leader) { + panic!( + "followers {:?} and leader {} should be mutually exclusive", + &peers, leader + ) + } + let mut peers = peers.into_iter().collect::>(); + peers.push(leader); + let c = new_test_config(leader, 10, 1); + let storage = new_storage(peers.clone(), snapshot_index, last_index - 1); + let mut leader_node = Interface::new(Raft::new(&c, storage, l).unwrap()); + leader_node.set_groups(group_config); + leader_node.become_candidate(); + leader_node.become_leader(); + let entries = leader_node.raft_log.all_entries(); + let mut prs = leader_node.take_prs(); + let mut interfaces = followers + .clone() + .drain(..) + .map(|(id, scenario)| { + let storage = + new_storage_by_scenario(scenario, peers.clone(), snapshot_index, last_index); + let mut c = c.clone(); + c.id = id; + let node = Interface::new(Raft::new(&c, storage, l).unwrap()); + let node_entries = node.raft_log.all_entries(); + if scenario != FollowerScenario::Snapshot { + Self::assert_entries_consistent(entries.clone(), node_entries); + } + let mut pr = prs.get_mut(id).unwrap(); + pr.state = match scenario { + FollowerScenario::NeedEntries(_) | FollowerScenario::UpToDate => { + ProgressState::Replicate + } + FollowerScenario::Snapshot => ProgressState::Probe, + }; + pr.paused = false; + pr.recent_active = true; + pr.matched = node.raft_log.last_index(); + pr.next_idx = node.raft_log.last_index() + 1; + Some(node) + }) + .collect::>>(); + leader_node.set_prs(prs); + interfaces.insert(0, Some(leader_node)); + let network = Network::new(interfaces, l); + Self { + leader, + last_index, + followers, + network, + } + } + + // Only for `UpToDate` and `NeedEntries` + fn assert_entries_consistent(leader: Vec, target: Vec) { + for (e1, e2) in leader.iter().zip(target) { + assert_eq!(e1.index, e2.index); + assert_eq!(e1.term, e2.term); + } + } + + fn assert_final_state(&self) { + self.network.peers.iter().for_each(|(id, n)| { + assert_eq!( + n.raft_log.last_index(), + self.last_index, + "The peer {} last index should be up-to-date", + id + ) + }); + // The ProgressSet should be updated + self.network + .peers + .get(&self.leader) + .unwrap() + .prs() + .iter() + .for_each(|(_, pr)| assert!(pr.matched == self.last_index)) + } + + // Get mutable Interface of the leader + fn leader_mut(&mut self) -> &mut Interface { + let leader = self.leader; + self.network.peers.get_mut(&leader).unwrap() + } + + // Get immutable Interface of the leader + fn leader(&self) -> &Interface { + let leader = self.leader; + self.network.peers.get(&leader).unwrap() + } + + // Get a mutable Interface by given id + fn get_mut(&mut self, id: u64) -> &mut Interface { + self.network.peers.get_mut(&id).unwrap() + } + + // Send a MsgPropose to the leader + fn propose(&mut self, only_dispatch: bool) { + let proposal = new_message(self.leader, self.leader, MessageType::MsgPropose, 1); + if only_dispatch { + self.network.dispatch(vec![proposal]).unwrap(); + } else { + self.network.send(vec![proposal]); + } + self.last_index 
+= 1; + } +} + +fn new_storage(peers: Vec, snapshot_index: u64, last_index: u64) -> MemStorage { + let s = MemStorage::new_with_conf_state((peers.clone(), vec![])); + let snapshot = new_snapshot(snapshot_index, 1, peers); + s.wl().apply_snapshot(snapshot).unwrap(); + if snapshot_index < last_index { + let mut ents = vec![]; + for index in snapshot_index + 1..=last_index { + ents.push(empty_entry(1, index)); + } + s.wl().append(&ents).unwrap(); + } + s +} + +fn new_storage_by_scenario( + scenario: FollowerScenario, + peers: Vec, + snapshot_index: u64, + last_index: u64, +) -> MemStorage { + let s = MemStorage::new_with_conf_state((peers.clone(), vec![])); + match scenario { + FollowerScenario::UpToDate => { + let snapshot = new_snapshot(snapshot_index, 1, peers); + s.wl().apply_snapshot(snapshot).unwrap(); + let mut ents = vec![]; + for index in snapshot_index + 1..last_index { + ents.push(empty_entry(1, index)); + } + ents.push(empty_entry(2, last_index)); + s.wl().append(&ents).unwrap(); + } + FollowerScenario::NeedEntries(index) => { + assert!(index > snapshot_index); + let snapshot = new_snapshot(snapshot_index, 1, peers); + s.wl().apply_snapshot(snapshot).unwrap(); + let mut ents = vec![]; + for i in snapshot_index + 1..index { + ents.push(empty_entry(1, i)); + } + if index == last_index { + ents.push(empty_entry(2, index)); + } + s.wl().append(&ents).unwrap(); + } + FollowerScenario::Snapshot => { + let mut ents = vec![]; + for index in 2..snapshot_index { + ents.push(empty_entry(1, index)) + } + s.wl().append(&ents).unwrap(); + } + }; + s +} + +// test_pick_delegate ensures that the delegate should be able to send entries to the other group +// members in leader's view. +#[test] +fn test_pick_group_delegate() { + let l = default_logger(); + let group_config = vec![(2, vec![1]), (1, vec![2, 3, 4])]; + let tests = vec![ + ( + vec![4], + MessageType::MsgAppend, + vec![ + (2, FollowerScenario::NeedEntries(6)), + (3, FollowerScenario::NeedEntries(7)), + (4, FollowerScenario::NeedEntries(8)), + ], + ), + ( + vec![2, 3, 4], + MessageType::MsgSnapshot, + vec![ + (2, FollowerScenario::Snapshot), + (3, FollowerScenario::Snapshot), + (4, FollowerScenario::Snapshot), + ], + ), + ( + vec![2], + MessageType::MsgAppend, + vec![ + (2, FollowerScenario::UpToDate), + (3, FollowerScenario::Snapshot), + (4, FollowerScenario::NeedEntries(7)), + ], + ), + ]; + for (i, (expected_delegate, expected_msg_type, input)) in tests.into_iter().enumerate() { + let mut sandbox = Sandbox::new(&l, 1, input.clone(), group_config.clone(), 5, 10); + + sandbox.propose(true); + let mut msgs = sandbox.leader_mut().read_messages(); + assert_eq!( + 1, + msgs.len(), + "#{} Should only send one msg: {:?}", + i, + input + ); + + let m = msgs.pop().unwrap(); + assert_eq!( + m.msg_type, expected_msg_type, + "#{} The sent msg type should be {:?} but got {:?}", + i, expected_delegate, m.msg_type, + ); + + let delegate = m.to; + let delegate_set: HashSet = HashSet::from_iter(expected_delegate); + assert!( + delegate_set.contains(&delegate), + "#{} set {:?}, delegate {}", + i, + &delegate_set, + delegate + ); + assert_eq!( + sandbox.leader().groups.get_delegate(2), + delegate, + "#{} The picked delegate should be cached", + i + ); + } +} + +// test_delegate_in_group_containing_leader ensures that the leader send msgs directly to the followers in the same group +#[test] +fn test_delegate_in_group_containing_leader() { + let l = default_logger(); + let group_config = vec![(1, vec![1, 2, 3, 4])]; + let followers = vec![ + (2, 
FollowerScenario::NeedEntries(7)), + (3, FollowerScenario::Snapshot), + (4, FollowerScenario::UpToDate), + ]; + let mut sandbox = Sandbox::new(&l, 1, followers, group_config, 5, 10); + + sandbox.propose(true); + let msgs = sandbox.leader_mut().read_messages(); + assert_eq!(msgs.len(), 3); + msgs.iter() + .for_each(|m| assert!(m.bcast_targets.is_empty())); +} + +#[test] +fn test_broadcast_append_use_delegate() { + let l = default_logger(); + let mut sandbox = Sandbox::new( + &l, + 1, + vec![ + (2, FollowerScenario::NeedEntries(8)), + (3, FollowerScenario::NeedEntries(7)), + (4, FollowerScenario::NeedEntries(6)), + ], + vec![(2, vec![1]), (1, vec![2, 3, 4])], + 5, + 10, + ); + + sandbox.propose(true); + let mut msgs = sandbox.leader_mut().read_messages(); + assert_eq!(1, msgs.len()); + + let m = msgs.pop().unwrap(); + assert_eq!(m.msg_type, MessageType::MsgAppend); + assert!(m.bcast_targets.contains(&3)); + assert!(m.bcast_targets.contains(&4)); + + let delegate = m.to; + assert_eq!(delegate, 2); + + sandbox.network.dispatch(vec![m]).unwrap(); + assert_eq!(2, sandbox.leader().groups.get_delegate(2)); + let mut msgs = sandbox.get_mut(delegate).read_messages(); + assert_eq!(3, msgs.len()); + + let bcast_resp = msgs.remove(0); // Send to leader first + assert_eq!(bcast_resp.msg_type, MessageType::MsgAppendResponse); + let to_send_ids = sandbox + .followers + .iter() + .filter(|(id, _)| *id != delegate) + .map(|(id, _)| *id) + .collect::>(); + let set: HashSet = HashSet::from_iter(to_send_ids); + msgs.iter().for_each(|m| { + assert_eq!( + m.from, 1, + "the delegated message must looks like coming from leader" + ); + assert_eq!(m.delegate, 2, "'delegate' must be set"); + assert_eq!(m.msg_type, MessageType::MsgAppend); + assert!(set.contains(&m.to)); + }); + sandbox.network.send(vec![bcast_resp]); + sandbox.network.send(msgs); + sandbox.assert_final_state(); +} + +// test_no_delegate_in_group_containing_leader ensures that the picked delegate rejects broadcast +// request when its raft logs are not consistent with the leader +#[test] +fn test_delegate_reject_broadcast() { + let l = default_logger(); + let group_config = vec![(2, vec![1]), (1, vec![2, 3, 4])]; + let followers = vec![ + (2, FollowerScenario::NeedEntries(7)), + (3, FollowerScenario::Snapshot), + (4, FollowerScenario::NeedEntries(12)), + ]; + let mut sandbox = Sandbox::new(&l, 1, followers, group_config, 5, 20); + + sandbox.leader_mut().mut_prs().get_mut(4).unwrap().next_idx = 15; // make a conflict next_idx + sandbox.propose(true); + let mut msgs = sandbox.leader_mut().read_messages(); + let m = msgs.pop().unwrap(); + assert_eq!(4, m.to); + sandbox.network.dispatch(vec![m]).unwrap(); + + let mut msgs = sandbox.get_mut(4).read_messages(); + assert_eq!(1, msgs.len()); + let m = msgs.pop().unwrap(); + assert_eq!(MessageType::MsgAppendResponse, m.msg_type); + assert!(m.reject); + assert_eq!(1, m.to); + sandbox.network.dispatch(vec![m]).unwrap(); + assert_eq!( + 4, + sandbox.leader().groups.get_delegate(2), + "The delegate won't be dismissed when rejecting MsgAppend" + ); + + let mut msgs = sandbox.leader_mut().read_messages(); + assert_eq!(1, msgs.len()); + let m = msgs.pop().unwrap(); + assert_eq!(4, m.to); + assert_eq!(2, m.get_bcast_targets().len()); + sandbox.network.send(vec![m]); + sandbox.assert_final_state(); +} + +#[test] +fn test_follower_only_send_reject_to_delegate() { + let l = default_logger(); + let group_config = vec![(2, vec![1]), (1, vec![2, 3])]; + let followers = vec![ + (2, FollowerScenario::NeedEntries(10)), + (3, 
FollowerScenario::NeedEntries(7)), + ]; + let mut sandbox = Sandbox::new(&l, 1, followers, group_config, 5, 20); + + sandbox.propose(true); + let msgs = sandbox.leader_mut().read_messages(); + + // Pick peer 2 as the delegate + assert_eq!(2, sandbox.leader().groups.get_delegate(3)); + sandbox.network.dispatch(msgs).unwrap(); + let mut msgs = sandbox.get_mut(2).read_messages(); + // MsgAppendResponse to 1 and MsgAppend to 3 + // We only care about the latter + assert_eq!(msgs.len(), 2); + let m = msgs.remove(1); + assert_eq!(m.get_msg_type(), MessageType::MsgAppend); + assert_eq!(m.to, 3); + assert_eq!(m.from, 1); + assert_eq!(m.delegate, 2); + sandbox.network.dispatch(vec![m]).unwrap(); + let mut msgs = sandbox.get_mut(3).read_messages(); + assert_eq!(msgs.len(), 1); + let m = msgs.pop().unwrap(); + assert_eq!(m.to, 2); + assert!(m.reject); +} + +#[test] +fn test_paused_delegate() { + let l = default_logger(); + let group_config = vec![(2, vec![1]), (1, vec![2, 3, 4])]; + let followers = vec![ + (2, FollowerScenario::NeedEntries(10)), + (3, FollowerScenario::NeedEntries(7)), + (4, FollowerScenario::Snapshot), + ]; + let mut sandbox = Sandbox::new(&l, 1, followers, group_config, 5, 20); + for id in 1..=4 { + // Reset inflights capacity to 1. + let r = sandbox.network.peers.get_mut(&id).unwrap(); + r.max_inflight = 1; + for (_, pr) in r.mut_prs().iter_mut() { + pr.ins = Inflights::new(1); + } + } + + // Leader will send append only to peer 2. + sandbox.propose(true); + let msgs = sandbox.get_mut(1).read_messages(); + assert_eq!(msgs.len(), 1); + sandbox.network.dispatch(msgs).unwrap(); + + // More proposals wont' cause more messages sent out. + sandbox.propose(true); + let msgs = sandbox.get_mut(1).read_messages(); + assert_eq!(msgs.len(), 0); + + // Step the append response from peer 2, then the leader can send more. + // And all append messages should contain `bcast_targets`. + let append_resp = sandbox.get_mut(2).read_messages()[0].clone(); + sandbox.network.dispatch(vec![append_resp]).unwrap(); + let msgs = sandbox.get_mut(1).read_messages(); + assert!(!msgs[0].get_bcast_targets().is_empty()); +} + +#[test] +fn test_dismiss_delegate_when_not_active() { + let l = default_logger(); + let group_config = vec![(2, vec![1]), (1, vec![2, 3, 4])]; + let followers = vec![ + (2, FollowerScenario::NeedEntries(10)), + (3, FollowerScenario::NeedEntries(7)), + (4, FollowerScenario::NeedEntries(6)), + ]; + let mut sandbox = Sandbox::new(&l, 1, followers, group_config, 5, 20); + + for id in 1..=4 { + // Reset check_quorum to true. + let r = sandbox.network.peers.get_mut(&id).unwrap(); + r.check_quorum = true; + } + + // Leader will send append only to peer 2. + sandbox.propose(true); + let msgs = sandbox.get_mut(1).read_messages(); + assert_eq!(msgs.len(), 1); + sandbox.network.dispatch(msgs).unwrap(); + + // Let the leader check quorum twice. Then delegates should be dismissed. + for _ in 0..2 { + { + let r = sandbox.network.peers.get_mut(&1).unwrap(); + for (id, pr) in r.mut_prs().iter_mut() { + if *id == 3 || *id == 4 { + pr.recent_active = true; + } + } + } + let s = sandbox.network.peers[&1].election_elapsed; + let e = sandbox.network.peers[&1].election_timeout(); + for _ in s..=e { + sandbox.leader_mut().tick(); + } + } + + // After the delegate is dismissed, leader will send append to it and an another new delegate. 
+ sandbox.propose(true); + let mut msgs = sandbox.get_mut(1).read_messages(); + msgs = msgs + .into_iter() + .filter(|m| m.get_msg_type() == MessageType::MsgAppend) + .collect(); + assert_eq!(msgs.len(), 2); + sandbox.network.send(msgs); + sandbox.assert_final_state(); +} + +#[test] +fn test_update_group_by_group_id_in_message() { + let l = default_logger(); + let group_config = vec![(1, vec![1]), (2, vec![2, 3, 4]), (3, vec![5])]; + let followers = vec![ + (2, FollowerScenario::NeedEntries(10)), + (3, FollowerScenario::NeedEntries(9)), + (4, FollowerScenario::NeedEntries(8)), + (5, FollowerScenario::NeedEntries(7)), + ]; + let mut sandbox = Sandbox::new(&l, 1, followers, group_config, 5, 20); + + // Change peer 4 group id from 2 to 3. + sandbox.network.peers.get_mut(&4).unwrap().group_id = 3; + + sandbox.propose(false); + sandbox.assert_final_state(); + + sandbox.propose(false); + sandbox.assert_final_state(); + assert_eq!( + sandbox.leader().groups.dump(), + vec![(1, vec![1]), (2, vec![2, 3]), (3, vec![4, 5])], + ); +} + +#[test] +fn test_delegate_must_be_able_to_send_logs_to_targets() { + let l = default_logger(); + let group_config = vec![(1, vec![1]), (2, vec![2, 3, 4])]; + let followers = vec![ + (2, FollowerScenario::UpToDate), + (3, FollowerScenario::NeedEntries(9)), + (4, FollowerScenario::Snapshot), + ]; + let mut sandbox = Sandbox::new(&l, 1, followers, group_config, 5, 20); + let max_inflight = sandbox.network.peers.get(&2).unwrap().max_inflight; + // Make Inflights 3 full + let node2 = sandbox.get_mut(2); + let pr3 = node2.mut_prs().get_mut(3).unwrap(); + pr3.become_replicate(); + for i in 1..=max_inflight { + pr3.ins.add(i as u64); + } + assert!(pr3.is_paused()); + // Make Progress 4 paused + let pr4 = node2.mut_prs().get_mut(4).unwrap(); + pr4.become_probe(); + pr4.pause(); + assert!(pr4.is_paused()); + sandbox.propose(false); + sandbox.assert_final_state(); +} diff --git a/harness/tests/integration_cases/test_raw_node.rs b/harness/tests/integration_cases/test_raw_node.rs index cbc93e319..8c32ccea7 100644 --- a/harness/tests/integration_cases/test_raw_node.rs +++ b/harness/tests/integration_cases/test_raw_node.rs @@ -154,12 +154,14 @@ fn test_raw_node_read_index_to_old_leader() { // verify r1(follower) forwards these messages again to r3(new leader) assert_eq!(nt.peers[&1].msgs.len(), 2); - - let read_index_msg3 = - new_message_with_entries(1, 3, MessageType::MsgReadIndex, vec![test_entries]); - - assert_eq!(nt.peers[&1].msgs[0], read_index_msg3); - assert_eq!(nt.peers[&1].msgs[1], read_index_msg3); + assert_eq!( + nt.peers[&1].msgs[0], + new_message_with_entries(2, 3, MessageType::MsgReadIndex, vec![test_entries.clone()]) + ); + assert_eq!( + nt.peers[&1].msgs[1], + new_message_with_entries(3, 3, MessageType::MsgReadIndex, vec![test_entries]) + ); } // test_raw_node_propose_and_conf_change ensures that RawNode.propose and diff --git a/proto/proto/eraftpb.proto b/proto/proto/eraftpb.proto index 18467d636..32be9fd0c 100644 --- a/proto/proto/eraftpb.proto +++ b/proto/proto/eraftpb.proto @@ -75,10 +75,17 @@ message Message { repeated Entry entries = 7; uint64 commit = 8; Snapshot snapshot = 9; - uint64 request_snapshot = 13; bool reject = 10; uint64 reject_hint = 11; bytes context = 12; + uint64 request_snapshot = 13; + + // The group id of the `from` + uint64 group_id = 14; + // The targets to send raft logs through the delegate + repeated uint64 bcast_targets = 16; + // Whether the message comes from a delegate actually + uint64 delegate = 17; } message HardState { diff --git 
a/src/config.rs b/src/config.rs index fdee33fd9..9ed0f1a33 100644 --- a/src/config.rs +++ b/src/config.rs @@ -17,7 +17,7 @@ pub use super::read_only::{ReadOnlyOption, ReadState}; use super::{ errors::{Error, Result}, - INVALID_ID, + INVALID_ID, INVALID_INDEX, }; /// Config contains the parameters to start a raft. @@ -91,16 +91,19 @@ pub struct Config { /// Function to custom `quorum` for Raft. The return value will be normalized into range /// [majority, voters_len]. pub quorum_fn: fn(usize) -> usize, + + /// The Group ID of this node in the feature Follower Replication. + pub group_id: u64, } impl Default for Config { fn default() -> Self { const HEARTBEAT_TICK: usize = 2; Self { - id: 0, + id: INVALID_ID, election_tick: HEARTBEAT_TICK * 10, heartbeat_tick: HEARTBEAT_TICK, - applied: 0, + applied: INVALID_INDEX, max_size_per_msg: 0, max_inflight_msgs: 256, check_quorum: false, @@ -111,6 +114,7 @@ impl Default for Config { skip_bcast_commit: false, batch_append: false, quorum_fn: crate::majority, + group_id: INVALID_ID, } } } diff --git a/src/group/mod.rs b/src/group/mod.rs new file mode 100644 index 000000000..fb1852edb --- /dev/null +++ b/src/group/mod.rs @@ -0,0 +1,318 @@ +// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! The module includes the definition of new feature 'raft group' which is used for +//! **Follower Replication** +//! +//! # Follower Replication +//! See https://github.com/tikv/rfcs/pull/33 + +use std::collections::HashMap; + +use crate::progress::progress_set::ProgressSet; +use crate::raft::INVALID_ID; + +/// Maintain all the groups info in Follower Replication +/// +/// # Notice +/// +/// A node only belongs to one group +/// +#[derive(Debug, Clone, Default)] +pub struct Groups { + // node id => (group id, delegate id). + indexes: HashMap, + + // Use to construct `bcast_targets` for delegates quickly. + bcast_targets: HashMap>, + + // Peers without chosen delegates. + unresolved: Vec, + + leader_group_id: u64, +} + +impl Groups { + /// Create new Groups with given configuration. + pub(crate) fn new(config: Vec<(u64, Vec)>) -> Self { + let mut indexes = HashMap::new(); + let mut unresolved = Vec::new(); + for (group_id, members) in config { + for id in members { + indexes.insert(id, (group_id, INVALID_ID)); + unresolved.push(id); + } + } + + Self { + indexes, + unresolved, + ..Default::default() + } + } + + pub(crate) fn set_leader_group_id(&mut self, leader_group: u64) { + self.leader_group_id = leader_group; + } + + /// Get group id by member id. + pub(crate) fn get_group_id(&self, member: u64) -> Option { + self.indexes.get(&member).map(|(gid, _)| *gid) + } + + /// Get a delegate for `to`. The return value could be `to` itself. + pub fn get_delegate(&self, to: u64) -> u64 { + match self.indexes.get(&to) { + Some((_, delegate)) => *delegate, + None => INVALID_ID, + } + } + + // Pick a delegate for the given peer. + // + // The delegate must satisfy conditions below: + // 1. 
The progress state should be `ProgressState::Replicate`; + // 2. The progress has biggest `match`; + // If all the members are requiring snapshots, use given `to`. + fn pick_delegate(&mut self, to: u64, prs: &ProgressSet) { + let group_id = match self.indexes.get(&to) { + Some((_, delegate)) if *delegate != INVALID_ID => return, + Some((gid, _)) if *gid == self.leader_group_id => return, + Some((gid, _)) => *gid, + None => return, + }; + + let (mut chosen, mut matched, mut bcast_targets) = (INVALID_ID, 0, vec![]); + for id in self.candidate_delegates(group_id) { + let pr = prs.get(id).unwrap(); + if matched < pr.matched { + if chosen != INVALID_ID { + bcast_targets.push(chosen); + } + chosen = id; + matched = pr.matched; + } else { + bcast_targets.push(id); + } + } + + // If there is only one member in the group, it remains unresolved. + if chosen != INVALID_ID && !bcast_targets.is_empty() { + let (_, d) = self.indexes.get_mut(&chosen).unwrap(); + *d = chosen; + for id in &bcast_targets { + let (_, d) = self.indexes.get_mut(id).unwrap(); + *d = chosen; + } + self.bcast_targets.insert(chosen, bcast_targets); + } + } + + fn candidate_delegates(&self, group_id: u64) -> impl Iterator + '_ { + self.indexes.iter().filter_map(move |(peer, (gid, _))| { + if group_id == *gid { + return Some(*peer); + } + None + }) + } + + /// Unset the delegate by delegate id. If the peer is not delegate, do nothing. + pub(crate) fn remove_delegate(&mut self, delegate: u64) { + if self.bcast_targets.remove(&delegate).is_some() { + // Remove the delegate from the group system since it's temorary unreachable. + // And the peer will be re-added after the leader receives a message from it. + self.indexes.remove(&delegate); + for (peer, (_, d)) in self.indexes.iter_mut() { + if *d == delegate { + *d = INVALID_ID; + self.unresolved.push(*peer); + } + } + } + } + + pub(crate) fn is_delegated(&self, to: u64) -> bool { + self.indexes + .get(&to) + .map_or(false, |x| x.1 != INVALID_ID && x.1 != to) + } + + pub(crate) fn get_bcast_targets(&self, delegate: u64) -> Option<&Vec> { + debug_assert!(self.unresolved.is_empty()); + self.bcast_targets.get(&delegate) + } + + /// Update given `peer`'s group ID. Return `true` if any peers are unresolved. 
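    ///
    /// For illustration (mirroring `test_update_group_by_group_id_in_message` in the new
    /// integration tests): `update_group_id(4, 3)` detaches peer 4 from its old group,
    /// shrinking its old delegate's `bcast_targets` (or dismissing the delegate if peer 4
    /// was the delegate itself), then attaches peer 4 to group 3's existing delegate if
    /// there is one; otherwise peer 4 stays undelegated and, when group 3 already has other
    /// members, is queued as unresolved so the caller re-runs `resolve_delegates` on `true`.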
+ pub(crate) fn update_group_id(&mut self, peer: u64, group_id: u64) -> bool { + if group_id == INVALID_ID { + self.unmark_peer(peer); + } else if let Some((gid, _)) = self.indexes.get(&peer) { + if *gid == group_id { + return false; + } + self.unmark_peer(peer); + self.mark_peer(peer, group_id); + } else { + self.mark_peer(peer, group_id); + } + !self.unresolved.is_empty() + } + + fn unmark_peer(&mut self, peer: u64) { + if let Some((_, del)) = self.indexes.remove(&peer) { + if peer == del { + self.remove_delegate(del); + return; + } + let mut targets = self.bcast_targets.remove(&del).unwrap(); + let pos = targets.iter().position(|id| *id == peer).unwrap(); + targets.swap_remove(pos); + if !targets.is_empty() { + self.bcast_targets.insert(del, targets); + } + } + } + + fn mark_peer(&mut self, peer: u64, group_id: u64) { + let (found, delegate) = self + .indexes + .iter() + .find(|(_, (gid, _))| *gid == group_id) + .map_or((false, INVALID_ID), |(_, (_, d))| (true, *d)); + + let _x = self.indexes.insert(peer, (group_id, delegate)); + debug_assert!(_x.is_none(), "peer can't exist before mark"); + + if delegate != INVALID_ID { + self.bcast_targets.get_mut(&delegate).unwrap().push(peer); + } else if found { + // We have found a peer in the same group but haven't been delegated, add it to + // `unresolved`. + self.unresolved.push(peer); + } + } + + // Pick delegates for all peers if need. + // TODO: change to `pub(crate)` after we find a simple way to test. + pub fn resolve_delegates(&mut self, prs: &ProgressSet) { + if !self.unresolved.is_empty() { + for peer in std::mem::replace(&mut self.unresolved, vec![]) { + self.pick_delegate(peer, prs); + } + } + } + + // Return the collection of mapping: group id => members + pub fn dump(&self) -> Vec<(u64, Vec)> { + let mut m: HashMap> = HashMap::new(); + for (peer, (group, _)) in &self.indexes { + let v = m.entry(*group).or_default(); + v.push(*peer); + } + for v in m.values_mut() { + v.sort(); + } + let mut v: Vec<_> = m.into_iter().collect(); + v.sort_by(|a1, a2| a1.0.cmp(&a2.0)); + v + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::progress::Progress; + + fn next_delegate_and_bcast_targets(group: &Groups) -> (u64, Vec) { + group + .bcast_targets + .iter() + .next() + .map(|(k, v)| (*k, v.clone())) + .unwrap() + } + + #[test] + fn test_group() { + let mut group = Groups::new(vec![(1, vec![1, 2]), (2, vec![3, 4, 5]), (3, vec![6])]); + assert_eq!(group.unresolved.len(), 6); + group.set_leader_group_id(1); + + let mut prs = ProgressSet::new(crate::default_logger()); + for id in 1..=6 { + let mut pr = Progress::new(100, 100); + pr.matched = 99; + prs.insert_voter(id, pr).unwrap(); + } + + // After the resolving, only group 2 should be delegated. + group.resolve_delegates(&prs); + assert_eq!(group.bcast_targets.len(), 1); + let (delegate, mut targets) = next_delegate_and_bcast_targets(&group); + targets.push(delegate); + targets.sort(); + assert_eq!(targets, [3, 4, 5]); + + // Remove a delegate which doesn't exists. + group.remove_delegate(6); + assert!(group.unresolved.is_empty()); + + // Remove a peer which is not delegate. + let remove = match delegate { + 3 => 4, + 4 => 5, + 5 => 3, + _ => unreachable!(), + }; + group.remove_delegate(remove); + assert!(group.unresolved.is_empty()); + let (_, targets) = next_delegate_and_bcast_targets(&group); + assert_eq!(targets.len(), 2); + + // Remove a delegate. 
+ group.remove_delegate(delegate); + assert_eq!(group.unresolved.len(), 2); + group.resolve_delegates(&prs); + let (_, targets) = next_delegate_and_bcast_targets(&group); + assert_eq!(targets.len(), 1); + + // Add the removed peer back, without group id. + let peer = delegate; + group.update_group_id(peer, INVALID_ID); + assert!(group.unresolved.is_empty()); + + // Add the removed peer back, with group id. + assert!(!group.update_group_id(peer, 2)); + let (_, targets) = next_delegate_and_bcast_targets(&group); + assert_eq!(targets.len(), 2); + + // Get the new delegate. + let (delegate, _) = next_delegate_and_bcast_targets(&group); + assert_ne!(peer, delegate); + + // The peer reports to the group again, without group id. + group.update_group_id(peer, INVALID_ID); + let (_, targets) = next_delegate_and_bcast_targets(&group); + assert_eq!(targets.len(), 1); + + // The delegate changes group to 3. + assert!(group.update_group_id(delegate, 3)); + assert!(group.bcast_targets.is_empty()); + group.resolve_delegates(&prs); + let (_, targets) = next_delegate_and_bcast_targets(&group); + assert!(targets.contains(&delegate) || targets.contains(&6)); + assert!(group.get_delegate(6) == 6 || group.get_delegate(6) == delegate); + } +} diff --git a/src/lib.rs b/src/lib.rs index 87f35d158..746697fd1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -472,6 +472,7 @@ macro_rules! fatal { mod config; mod errors; +mod group; mod log_unstable; mod progress; #[cfg(test)] diff --git a/src/progress/mod.rs b/src/progress/mod.rs index 70c4b9778..3c80ddc65 100644 --- a/src/progress/mod.rs +++ b/src/progress/mod.rs @@ -27,7 +27,7 @@ pub enum ProgressState { Probe, /// Whether it's replicating. Replicate, - /// Whethers it's a snapshot. + /// Whether it's a snapshot. Snapshot, } @@ -98,7 +98,7 @@ impl Progress { } } - fn reset_state(&mut self, state: ProgressState) { + pub(crate) fn reset_state(&mut self, state: ProgressState) { self.paused = false; self.pending_snapshot = 0; self.state = state; diff --git a/src/progress/progress_set.rs b/src/progress/progress_set.rs index 629801db3..33d084c7a 100644 --- a/src/progress/progress_set.rs +++ b/src/progress/progress_set.rs @@ -27,13 +27,14 @@ use crate::{DefaultHashBuilder, HashMap, HashSet}; /// A Raft internal representation of a Configuration. /// /// This is corollary to a ConfState, but optimized for `contains` calls. -#[derive(Clone, Debug, Default, PartialEq, Getters)] +#[derive(Clone, Debug, Default, PartialEq, Getters, Setters)] pub struct Configuration { /// The voter set. #[get = "pub"] voters: HashSet, /// The learner set. #[get = "pub"] + #[set = "pub"] learners: HashSet, } @@ -421,22 +422,34 @@ impl ProgressSet { /// Doing this will set the `recent_active` of each peer to false. /// /// This should only be called by the leader. 
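    ///
    /// The callback `f(id, recent_active)` introduced below is invoked for every voter and
    /// learner other than `perspective_of`, before `recent_active` is cleared; the leader
    /// uses it (via `check_quorum_active` in raft.rs) to dismiss any delegate that has gone
    /// quiet.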
- pub fn quorum_recently_active( + pub fn quorum_recently_active( &mut self, perspective_of: u64, + mut f: F, quorum_fn: fn(usize) -> usize, - ) -> bool { + ) -> bool + where + F: FnMut(u64, bool), + { let mut active = HashSet::default(); - for (&id, pr) in self.voters_mut() { - if id == perspective_of { - active.insert(id); + for id in &self.configuration.voters { + if *id == perspective_of { + active.insert(*id); continue; } + let pr = self.progress.get_mut(id).unwrap(); + f(*id, pr.recent_active); if pr.recent_active { - active.insert(id); + active.insert(*id); } + pr.recent_active = false; } - for pr in self.progress.values_mut() { + for id in &self.configuration.learners { + if *id == perspective_of { + continue; + } + let pr = self.progress.get_mut(id).unwrap(); + f(*id, pr.recent_active); pr.recent_active = false; } self.configuration.has_quorum(&active, quorum_fn) diff --git a/src/raft.rs b/src/raft.rs index 4da5edeb2..b82a968e4 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -21,6 +21,7 @@ use rand::{self, Rng}; use slog::{self, Logger}; use super::errors::{Error, Result, StorageError}; +use super::group::Groups; use super::progress::progress_set::{CandidacyStatus, ProgressSet}; use super::progress::{Progress, ProgressState}; use super::raft_log::RaftLog; @@ -177,6 +178,15 @@ pub struct Raft { quorum_fn: fn(usize) -> usize, + /// Raft groups inner state + pub groups: Groups, + /// The group ID of this node + pub group_id: u64, + // Indicates that self is a delegate or not. + // Become true when receive a `MsgAppend` contains non-empty `bcast_targets` from leader + // Become false when receive a normal `MsgAppend` without `bcast_targets` + is_delegate: bool, + /// The logger for the raft structure. pub(crate) logger: slog::Logger, } @@ -249,6 +259,9 @@ impl Raft { skip_bcast_commit: c.skip_bcast_commit, batch_append: c.batch_append, quorum_fn: c.quorum_fn, + groups: Default::default(), + group_id: c.group_id, + is_delegate: false, logger, }; for p in voters { @@ -347,7 +360,7 @@ impl Raft { /// Returns whether the current raft is in lease. 
pub fn in_lease(&self) -> bool { - self.state == StateRole::Leader && self.check_quorum + self.is_leader() && self.check_quorum } /// For testing leader lease @@ -398,7 +411,10 @@ impl Raft { to = m.to; "msg" => ?m, ); - m.from = self.id; + if m.from == INVALID_ID { + m.from = self.id; + } + m.group_id = self.group_id; if m.get_msg_type() == MessageType::MsgRequestVote || m.get_msg_type() == MessageType::MsgRequestPreVote || m.get_msg_type() == MessageType::MsgRequestVoteResponse @@ -475,38 +491,16 @@ impl Raft { } let (sindex, sterm) = (snapshot.get_metadata().index, snapshot.get_metadata().term); m.set_snapshot(snapshot); - debug!( - self.logger, - "[firstindex: {first_index}, commit: {committed}] sent snapshot[index: {snapshot_index}, term: {snapshot_term}] to {to}", - first_index = self.raft_log.first_index(), - committed = self.raft_log.committed, - snapshot_index = sindex, - snapshot_term = sterm, - to = to; - "progress" => ?pr, - ); - pr.become_snapshot(sindex); - debug!( - self.logger, - "paused sending replication messages to {}", - to; - "progress" => ?pr, - ); + self.pr_become_snapshot(to, pr, sindex, sterm); true } - fn prepare_send_entries( - &mut self, - m: &mut Message, - pr: &mut Progress, - term: u64, - ents: Vec, - ) { + fn prepare_send_entries(&mut self, m: &mut Message, pr: &mut Progress, ents: Vec) { m.set_msg_type(MessageType::MsgAppend); m.index = pr.next_idx - 1; - m.log_term = term; - m.set_entries(ents.into()); + m.log_term = self.raft_log.term(m.index).unwrap(); m.commit = self.raft_log.committed; + m.set_entries(ents.into()); if !m.entries.is_empty() { let last = m.entries.last().unwrap().index; pr.update_state(last); @@ -537,6 +531,17 @@ impl Raft { is_batched } + // Attach group info to `m` before it's sent out. It can be called on the leader or delegates. + fn attach_group_info(&self, m: &mut Message) { + debug_assert!(m.to != INVALID_ID); + if self.is_delegate { + m.from = self.leader_id; + m.delegate = self.id; + } else if let Some(ids) = self.groups.get_bcast_targets(m.to) { + m.set_bcast_targets(ids.clone()); + } + } + /// Sends RPC, with entries to the given peer. pub fn send_append(&mut self, to: u64, pr: &mut Progress) { if pr.is_paused() { @@ -550,6 +555,7 @@ impl Raft { } let mut m = Message::default(); m.to = to; + self.attach_group_info(&mut m); if pr.pending_request_snapshot != INVALID_INDEX { // Check pending request snapshot first to avoid unnecessary loading entries. if !self.prepare_send_snapshot(&mut m, pr, to) { @@ -568,7 +574,7 @@ impl Raft { if self.batch_append && self.try_batching(to, pr, &mut ents) { return; } - self.prepare_send_entries(&mut m, pr, term.unwrap(), ents); + self.prepare_send_entries(&mut m, pr, ents); } } self.send(m); @@ -596,17 +602,20 @@ impl Raft { /// Sends RPC, with entries to all peers that are not up-to-date /// according to the progress recorded in r.prs(). pub fn bcast_append(&mut self) { - let self_id = self.id; - let mut prs = self.take_prs(); - prs.iter_mut() - .filter(|&(id, _)| *id != self_id) - .for_each(|(id, pr)| self.send_append(*id, pr)); + let (self_id, mut prs) = (self.id, self.take_prs()); + self.groups.resolve_delegates(&prs); + for (id, pr) in prs.iter_mut().filter(|(id, _)| **id != self_id) { + let delegate = self.groups.get_delegate(*id); + if delegate == INVALID_ID || delegate == *id { + self.send_append(*id, pr); + } + } self.set_prs(prs); } /// Broadcasts heartbeats to all the followers if it's leader. 
pub fn ping(&mut self) { - if self.state == StateRole::Leader { + if self.is_leader() { self.bcast_heartbeat(); } } @@ -734,7 +743,7 @@ impl Raft { has_ready = true; let _ = self.step(m); } - if self.state == StateRole::Leader && self.lead_transferee.is_some() { + if self.is_leader() && self.lead_transferee.is_some() { self.abort_leader_transfer() } } @@ -833,7 +842,8 @@ impl Raft { self.reset(term); self.leader_id = self.id; self.state = StateRole::Leader; - + self.groups.set_leader_group_id(self.group_id); + self.is_delegate = false; // Followers enter replicate mode when they've been successfully probed // (perhaps after having received a snapshot as a result). The leader is // trivially in this state. Note that r.reset() has initialized this @@ -932,7 +942,16 @@ impl Raft { /// Steps the raft along via a message. This should be called everytime your raft receives a /// message from a peer. + #[allow(clippy::collapsible_if)] pub fn step(&mut self, m: Message) -> Result<()> { + if m.term != 0 && m.group_id != INVALID_ID && self.is_leader() { + if self.groups.update_group_id(m.from, m.group_id) { + let prs = self.take_prs(); + self.groups.resolve_delegates(&prs); + self.set_prs(prs); + } + } + // Handle the message term, which may result in our stepping down to a follower. if m.term == 0 { // local message @@ -1120,7 +1139,7 @@ impl Raft { } fn hup(&mut self, transfer_leader: bool) { - if self.state == StateRole::Leader { + if self.is_leader() { debug!( self.logger, "ignoring MsgHup because already leader"; @@ -1247,18 +1266,7 @@ impl Raft { } // Transfer leadership is in progress. - if let Some(lead_transferee) = self.lead_transferee { - let last_index = self.raft_log.last_index(); - if m.from == lead_transferee && pr.matched == last_index { - info!( - self.logger, - "sent MsgTimeoutNow to {from} after received MsgAppResp", - from = m.from; - ); - self.send_timeout_now(m.from); - } - } - + self.process_leader_transfer(m.from, pr.matched); match pr.state { ProgressState::Probe => pr.become_replicate(), ProgressState::Snapshot => { @@ -1278,6 +1286,32 @@ impl Raft { *maybe_commit = true; } + fn handle_append_response_on_delegate(&mut self, m: &Message) { + let mut prs = self.take_prs(); + let mut send_append = false; + let (mut _h1, mut _h2) = (false, false); + self.handle_append_response(&m, &mut prs, &mut _h1, &mut send_append, &mut _h2); + if send_append { + let from = m.from; + self.send_append(from, prs.get_mut(from).unwrap()); + } + self.set_prs(prs); + } + + fn process_leader_transfer(&mut self, from: u64, match_idx: u64) { + if let Some(lead_transferee) = self.lead_transferee { + let last_index = self.raft_log.last_index(); + if from == lead_transferee && match_idx == last_index { + info!( + self.logger, + "sent MsgTimeoutNow to {from} after received MsgAppResp", + from = from; + ); + self.send_timeout_now(from); + } + } + } + fn handle_heartbeat_response( &mut self, m: &Message, @@ -1422,7 +1456,7 @@ impl Raft { /// Check message's progress to decide which action should be taken. fn check_message_with_progress( &mut self, - m: &mut Message, + m: &Message, send_append: &mut bool, old_paused: &mut bool, maybe_commit: &mut bool, @@ -1452,6 +1486,7 @@ impl Raft { } } MessageType::MsgUnreachable => { + self.groups.remove_delegate(m.from); let pr = prs.get_mut(m.from).unwrap(); // During optimistic replication, if the remote becomes unreachable, // there is huge probability that a MsgAppend is lost. 
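// A sketch (not part of this patch) of what the flow above produces on the wire: the leader
// sends a single MsgAppend to the chosen delegate with `bcast_targets` filled in, and the
// delegate re-sends the entries to each target with `from` rewritten to the leader id and
// its own id in `delegate` (see `attach_group_info`). The helpers below only read fields
// added by the proto change; `INVALID_ID` is 0 in this crate.
use raft::eraftpb::{Message, MessageType};

fn is_delegate_task(m: &Message) -> bool {
    // What a delegate receives from the leader: an append plus the peers to fan out to.
    m.get_msg_type() == MessageType::MsgAppend && !m.get_bcast_targets().is_empty()
}

fn is_forwarded_append(m: &Message, leader_id: u64) -> bool {
    // What a group member receives from its delegate: `from` still names the leader,
    // while a non-zero `delegate` identifies the actual sender.
    m.get_msg_type() == MessageType::MsgAppend && m.from == leader_id && m.delegate != 0
}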
@@ -1601,7 +1636,7 @@ impl Raft { let mut old_paused = false; let mut more_to_send = vec![]; self.check_message_with_progress( - &mut m, + &m, &mut send_append, &mut old_paused, &mut maybe_commit, @@ -1619,7 +1654,7 @@ impl Raft { } } - if send_append { + if send_append && !self.groups.is_delegated(m.from) { let from = m.from; let mut prs = self.take_prs(); self.send_append(from, prs.get_mut(from).unwrap()); @@ -1649,7 +1684,7 @@ impl Raft { MessageType::MsgAppend => { debug_assert_eq!(self.term, m.term); self.become_follower(m.term, m.from); - self.handle_append_entries(&m); + self.handle_append_message(m); } MessageType::MsgHeartbeat => { debug_assert_eq!(self.term, m.term); @@ -1731,7 +1766,7 @@ impl Raft { MessageType::MsgAppend => { self.election_elapsed = 0; self.leader_id = m.from; - self.handle_append_entries(&m); + self.handle_append_message(m); } MessageType::MsgHeartbeat => { self.election_elapsed = 0; @@ -1804,6 +1839,9 @@ impl Raft { }; self.read_states.push(rs); } + MessageType::MsgAppendResponse => { + self.handle_append_response_on_delegate(&m); + } _ => {} } Ok(()) @@ -1811,7 +1849,7 @@ impl Raft { /// Request a snapshot from a leader. pub fn request_snapshot(&mut self, request_index: u64) -> Result<()> { - if self.state == StateRole::Leader { + if self.is_leader() { info!( self.logger, "can not request snapshot on leader; dropping request snapshot"; @@ -1840,37 +1878,64 @@ impl Raft { Err(Error::RequestSnapshotDropped) } - // TODO: revoke pub when there is a better way to test. /// For a given message, append the entries to the log. - pub fn handle_append_entries(&mut self, m: &Message) { + pub fn handle_append_message(&mut self, mut m: Message) { if self.pending_request_snapshot != INVALID_INDEX { self.send_request_snapshot(); return; } + + let mut to_send = Message::default(); + to_send.set_msg_type(MessageType::MsgAppendResponse); + to_send.to = m.from; + if m.index < self.raft_log.committed { debug!( self.logger, "got message with lower index than committed."; ); - let mut to_send = Message::default(); - to_send.set_msg_type(MessageType::MsgAppendResponse); - to_send.to = m.from; to_send.index = self.raft_log.committed; + if msg_from_delegate(&m) { + to_send.to = m.delegate; + } self.send(to_send); return; } - debug_assert!(m.log_term != 0, "{:?} log term can't be 0", m); - - let mut to_send = Message::default(); - to_send.to = m.from; - to_send.set_msg_type(MessageType::MsgAppendResponse); + let old_is_delegate = self.is_delegate; + self.is_delegate = !m.get_bcast_targets().is_empty(); + self.handle_append_entries(&m, &mut to_send); + let accepted = !to_send.reject; + self.send(to_send); + if accepted { + let mut prs = self.take_prs(); + for target in m.take_bcast_targets() { + // Self is delegate, sync raft logs to other members. 
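                // A freshly-promoted delegate has no real progress information for its group
                // members, so each target's progress is reset to Probe and optimistically
                // advanced to the delegate's own last index below; targets whose logs diverge
                // reject the probe and the usual reject/hint handling walks `next_idx` back.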
+ if let Some(pr) = prs.get_mut(target) { + if !old_is_delegate && self.is_delegate { + // Make sure the delegate can send a message to the target + pr.become_probe(); + pr.optimistic_update(self.raft_log.last_index()); + } + self.send_append(target, pr); + } + } + self.set_prs(prs); + } + } + fn handle_append_entries(&mut self, m: &Message, to_send: &mut Message) { + debug_assert!(m.log_term != 0, "{:?} log term can't be 0", m); if let Some((_, last_idx)) = self .raft_log .maybe_append(m.index, m.log_term, m.commit, &m.entries) { to_send.set_index(last_idx); - self.send(to_send); + if msg_from_delegate(m) { + // Follower receives the message from a delegate, send response back to the delegate + let mut to_delegate = to_send.clone(); + to_delegate.to = m.delegate; + self.send(to_delegate); + } } else { debug!( self.logger, @@ -1882,10 +1947,13 @@ impl Raft { "index" => m.index, "logterm" => ?self.raft_log.term(m.index), ); + if msg_from_delegate(m) { + // Follower only sends rejection to the delegate. + to_send.to = m.delegate; + } to_send.index = m.index; to_send.reject = true; to_send.reject_hint = self.raft_log.last_index(); - self.send(to_send); } } @@ -1908,6 +1976,9 @@ impl Raft { debug_assert!(m.term != 0, "{:?} term can't be 0", m); let metadata = m.get_snapshot().get_metadata(); let (sindex, sterm) = (metadata.index, metadata.term); + let mut to_send = Message::default(); + to_send.set_msg_type(MessageType::MsgAppendResponse); + to_send.to = m.from; if self.restore(m.take_snapshot()) { info!( self.logger, @@ -1917,11 +1988,27 @@ impl Raft { snapshot_index = sindex, snapshot_term = sterm; ); - let mut to_send = Message::default(); - to_send.set_msg_type(MessageType::MsgAppendResponse); - to_send.to = m.from; to_send.index = self.raft_log.last_index(); - self.send(to_send); + if msg_from_delegate(&m) { + // The snapshot comes from the peer's delegate. + let mut to_delegate = to_send.clone(); + to_delegate.to = m.delegate; + self.send(to_delegate); + } else if !m.get_bcast_targets().is_empty() { + let mut prs = self.take_prs(); + for member in m.take_bcast_targets() { + if let Some(pr) = prs.get_mut(member) { + // TODO: send snapshot directly. + let mut to_member = Message::default(); + to_member.from = m.from; + to_member.to = member; + if self.prepare_send_snapshot(&mut to_member, pr, member) { + self.send(to_member); + } + } + } + self.set_prs(prs) + } } else { info!( self.logger, @@ -1930,12 +2017,12 @@ impl Raft { snapshot_index = sindex, snapshot_term = sterm; ); - let mut to_send = Message::default(); - to_send.set_msg_type(MessageType::MsgAppendResponse); - to_send.to = m.from; + if msg_from_delegate(&m) { + to_send.to = m.delegate + } to_send.index = self.raft_log.committed; - self.send(to_send); } + self.send(to_send); } fn restore_raft(&mut self, snap: &Snapshot) -> Option { @@ -2022,6 +2109,11 @@ impl Raft { self.pending_conf_index > self.raft_log.applied } + /// Check if self is the leader now. + pub fn is_leader(&self) -> bool { + self.state == StateRole::Leader + } + /// Specifies if the commit should be broadcast. pub fn should_bcast_commit(&self) -> bool { !self.skip_bcast_commit || self.has_pending_conf() @@ -2108,7 +2200,7 @@ impl Raft { self.bcast_append(); } // If the removed node is the lead_transferee, then abort the leadership transferring. 
- if self.state == StateRole::Leader && self.lead_transferee == Some(id) { + if self.is_leader() && self.lead_transferee == Some(id) { self.abort_leader_transfer(); } @@ -2128,6 +2220,15 @@ impl Raft { } } + /// Sets the `Groups`. Only should be used for tests. + pub fn set_groups(&mut self, groups: Vec<(u64, Vec)>) { + let groups = Groups::new(groups); + if let Some(gid) = groups.get_group_id(self.id) { + self.group_id = gid; + } + self.groups = groups; + } + /// Takes the progress set (destructively turns to `None`). pub fn take_prs(&mut self) -> ProgressSet { self.prs.take().unwrap() @@ -2195,7 +2296,19 @@ impl Raft { fn check_quorum_active(&mut self) -> bool { let self_id = self.id; let quorum_fn = self.quorum_fn; - self.mut_prs().quorum_recently_active(self_id, quorum_fn) + let mut prs = self.take_prs(); + let res = prs.quorum_recently_active( + self_id, + |id, active| { + if !active { + // Remove the non-active delegate peer + self.groups.remove_delegate(id); + } + }, + quorum_fn, + ); + self.set_prs(prs); + res } /// Issues a message to timeout immediately. @@ -2219,4 +2332,34 @@ impl Raft { m.request_snapshot = self.pending_request_snapshot; self.send(m); } + + fn pr_become_snapshot( + &self, + id: u64, + progress: &mut Progress, + snapshot_index: u64, + snapshot_term: u64, + ) { + debug!( + self.logger, + "[firstindex: {first_index}, commit: {committed}] sent snapshot[index: {snapshot_index}, term: {snapshot_term}] to {to}", + first_index = self.raft_log.first_index(), + committed = self.raft_log.committed, + snapshot_index = snapshot_index, + snapshot_term = snapshot_term, + to = id; + "progress" => ?progress, + ); + progress.become_snapshot(snapshot_index); + debug!( + self.logger, + "paused sending replication messages to {}", + id; + "progress" => ?progress, + ); + } +} + +fn msg_from_delegate(m: &Message) -> bool { + m.delegate != INVALID_ID } diff --git a/src/raft_log.rs b/src/raft_log.rs index 83973cf1f..86abc7795 100644 --- a/src/raft_log.rs +++ b/src/raft_log.rs @@ -405,7 +405,7 @@ impl RaftLog { } let length = self.last_index() + 1 - first_index; - if low < first_index || high > first_index + length { + if high > first_index + length { fatal!( self.unstable.logger, "slice[{},{}] out of bound[{},{}]", diff --git a/src/raw_node.rs b/src/raw_node.rs index 1dde5220e..8de4dcc9a 100644 --- a/src/raw_node.rs +++ b/src/raw_node.rs @@ -512,6 +512,13 @@ impl RawNode { pub fn set_batch_append(&mut self, batch_append: bool) { self.raft.set_batch_append(batch_append) } + + /// Update raft groups config for Follower Replication in flight + #[inline] + pub fn update_groups_config(&mut self, config: Vec<(u64, Vec)>) { + // The delegate cache will be removed + self.raft.set_groups(config); + } } #[cfg(test)]
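A minimal sketch of how an application might opt in to Follower Replication through the
surface added in this diff (`Config::group_id` plus `RawNode::update_groups_config`). Node
and storage construction are elided, and the group layout values are invented for
illustration only:

    use raft::{storage::MemStorage, Config, RawNode};

    fn config_for_node_1() -> Config {
        Config {
            id: 1,
            // The group this node reports in every outgoing message (attached in `send`).
            group_id: 1,
            ..Default::default()
        }
    }

    fn enable_follower_replication(node: &mut RawNode<MemStorage>) {
        // Replaces the current group layout; cached delegates are dropped and re-picked
        // lazily by the leader on the next broadcast.
        node.update_groups_config(vec![(1, vec![1, 2]), (2, vec![3, 4, 5])]);
    }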