Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Retrieve Key Log IDs via RaftLogReader::get_key_log_ids() #1264

Merged
merged 1 commit into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions openraft/src/engine/leader_log_ids.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use std::fmt;
use std::ops::RangeInclusive;

use crate::type_config::alias::LogIdOf;
use crate::RaftTypeConfig;

/// The first and the last log id belonging to a Leader.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct LeaderLogIds<C: RaftTypeConfig> {
log_id_range: Option<RangeInclusive<LogIdOf<C>>>,
}

impl<C> fmt::Display for LeaderLogIds<C>
where C: RaftTypeConfig
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self.log_id_range {
None => write!(f, "None"),
Some(rng) => write!(f, "({}, {})", rng.start(), rng.end()),
}
}
}

impl<C> LeaderLogIds<C>
where C: RaftTypeConfig
{
pub(crate) fn new(log_id_range: Option<RangeInclusive<LogIdOf<C>>>) -> Self {
Self { log_id_range }
}

/// Used only in tests
#[allow(dead_code)]
pub(crate) fn new_single(log_id: LogIdOf<C>) -> Self {
Self {
log_id_range: Some(log_id.clone()..=log_id),
}
}

/// Used only in tests
#[allow(dead_code)]
pub(crate) fn new_start_end(first: LogIdOf<C>, last: LogIdOf<C>) -> Self {
Self {
log_id_range: Some(first..=last),
}
}

pub(crate) fn first(&self) -> Option<&LogIdOf<C>> {
self.log_id_range.as_ref().map(|x| x.start())
}

pub(crate) fn last(&self) -> Option<&LogIdOf<C>> {
self.log_id_range.as_ref().map(|x| x.end())
}
}
43 changes: 23 additions & 20 deletions openraft/src/engine/log_id_list.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
use std::ops::RangeInclusive;

use crate::engine::leader_log_ids::LeaderLogIds;
use crate::log_id::RaftLogId;
use crate::storage::RaftLogReaderExt;
use crate::type_config::alias::LogIdOf;
use crate::LogId;
use crate::LogIdOptionExt;
use crate::RaftLogReader;
use crate::RaftTypeConfig;
use crate::StorageError;

Expand Down Expand Up @@ -43,24 +48,17 @@ where C: RaftTypeConfig
/// A-------B-------C : find(A,B); find(B,C) // both find `B`, need to de-dup
/// A-------C-------C : find(A,C)
/// ```
pub(crate) async fn load_log_ids<LRX>(
last_purged_log_id: Option<LogId<C::NodeId>>,
last_log_id: Option<LogId<C::NodeId>>,
sto: &mut LRX,
) -> Result<LogIdList<C>, StorageError<C>>
pub(crate) async fn get_key_log_ids<LR>(
range: RangeInclusive<LogId<C::NodeId>>,
sto: &mut LR,
) -> Result<Vec<LogIdOf<C>>, StorageError<C>>
where
LRX: RaftLogReaderExt<C>,
LR: RaftLogReader<C> + ?Sized,
{
let mut res = vec![];
let first = range.start().clone();
let last = range.end().clone();

let last = match last_log_id {
None => return Ok(LogIdList::new(res)),
Some(x) => x,
};
let first = match last_purged_log_id {
None => sto.get_log_id(0).await?,
Some(x) => x,
};
let mut res: Vec<LogIdOf<C>> = vec![];

// Recursion stack
let mut stack = vec![(first, last.clone())];
Expand Down Expand Up @@ -114,13 +112,16 @@ where C: RaftTypeConfig
res.push(last);
}

Ok(LogIdList::new(res))
Ok(res)
}
}

impl<C> LogIdList<C>
where C: RaftTypeConfig
{
/// Create a new `LogIdList`.
///
/// It stores the last purged log id, and a series of key log ids.
pub fn new(key_log_ids: impl IntoIterator<Item = LogId<C::NodeId>>) -> Self {
Self {
key_log_ids: key_log_ids.into_iter().collect(),
Expand Down Expand Up @@ -310,18 +311,20 @@ where C: RaftTypeConfig
/// Note that the 0-th log does not belong to any leader(but a membership log to initialize a
/// cluster) but this method does not differentiate between them.
#[allow(dead_code)]
pub(crate) fn by_last_leader(&self) -> &[LogId<C::NodeId>] {
pub(crate) fn by_last_leader(&self) -> LeaderLogIds<C> {
let ks = &self.key_log_ids;
let l = ks.len();
if l < 2 {
return ks;
let last = self.last();
return LeaderLogIds::new(last.map(|x| x.clone()..=x.clone()));
}

// There are at most two(adjacent) key log ids with the same leader_id
if ks[l - 1].leader_id() == ks[l - 2].leader_id() {
&ks[l - 2..]
LeaderLogIds::new_start_end(ks[l - 2].clone(), ks[l - 1].clone())
} else {
&ks[l - 1..]
let last = self.last().cloned().unwrap();
LeaderLogIds::new_single(last)
}
}
}
3 changes: 2 additions & 1 deletion openraft/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ mod command_kind;
mod engine_config;
mod engine_impl;
mod engine_output;
mod log_id_list;
mod replication_progress;

pub(crate) mod command;
pub(crate) mod handler;
pub(crate) mod leader_log_ids;
pub(crate) mod log_id_list;
pub(crate) mod time_state;

#[cfg(test)]
Expand Down
17 changes: 12 additions & 5 deletions openraft/src/engine/tests/log_id_list_test.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::engine::leader_log_ids::LeaderLogIds;
use crate::engine::testing::UTConfig;
use crate::engine::LogIdList;
use crate::testing::log_id;
Expand Down Expand Up @@ -357,23 +358,29 @@ fn test_log_id_list_get_log_id() -> anyhow::Result<()> {
fn test_log_id_list_by_last_leader() -> anyhow::Result<()> {
// len == 0
let ids = LogIdList::<UTConfig>::default();
assert_eq!(ids.by_last_leader(), &[]);
assert_eq!(ids.by_last_leader(), LeaderLogIds::new(None));

// len == 1
let ids = LogIdList::<UTConfig>::new([log_id(1, 1, 1)]);
assert_eq!(&[log_id(1, 1, 1)], ids.by_last_leader());
assert_eq!(LeaderLogIds::new_single(log_id(1, 1, 1)), ids.by_last_leader());

// len == 2, the last leader has only one log
let ids = LogIdList::<UTConfig>::new([log_id(1, 1, 1), log_id(3, 1, 3)]);
assert_eq!(&[log_id(3, 1, 3)], ids.by_last_leader());
assert_eq!(LeaderLogIds::new_single(log_id(3, 1, 3)), ids.by_last_leader());

// len == 2, the last leader has two logs
let ids = LogIdList::<UTConfig>::new([log_id(1, 1, 1), log_id(1, 1, 3)]);
assert_eq!(&[log_id(1, 1, 1), log_id(1, 1, 3)], ids.by_last_leader());
assert_eq!(
LeaderLogIds::new_start_end(log_id(1, 1, 1), log_id(1, 1, 3)),
ids.by_last_leader()
);

// len > 2, the last leader has only more than one logs
let ids = LogIdList::<UTConfig>::new([log_id(1, 1, 1), log_id(7, 1, 8), log_id(7, 1, 10)]);
assert_eq!(&[log_id(7, 1, 8), log_id(7, 1, 10)], ids.by_last_leader());
assert_eq!(
LeaderLogIds::new_start_end(log_id(7, 1, 8), log_id(7, 1, 10)),
ids.by_last_leader()
);

Ok(())
}
10 changes: 8 additions & 2 deletions openraft/src/proposer/candidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::fmt;

use crate::display_ext::DisplayInstantExt;
use crate::display_ext::DisplayOptionExt;
use crate::engine::leader_log_ids::LeaderLogIds;
use crate::progress::Progress;
use crate::progress::VecProgress;
use crate::proposer::Leader;
Expand Down Expand Up @@ -110,8 +111,13 @@ where
vote.into_committed()
};

let last_leader_log_ids = self.last_log_id().cloned().into_iter().collect::<Vec<_>>();
// TODO: tricky: the new LeaderId is different from the last log id
// Thus only the last().index is used.
// Thus the first() is ignored.
// But we should not fake the first() there.
let last = self.last_log_id();
let last_leader_log_ids = LeaderLogIds::new(last.map(|last| last.clone()..=last.clone()));

Leader::new(vote, self.quorum_set.clone(), self.learner_ids, &last_leader_log_ids)
Leader::new(vote, self.quorum_set.clone(), self.learner_ids, last_leader_log_ids)
}
}
60 changes: 45 additions & 15 deletions openraft/src/proposer/leader.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::fmt;

use crate::display_ext::DisplayInstantExt;
use crate::display_ext::DisplaySliceExt;
use crate::engine::leader_log_ids::LeaderLogIds;
use crate::progress::entry::ProgressEntry;
use crate::progress::Progress;
use crate::progress::VecProgress;
Expand Down Expand Up @@ -82,19 +82,19 @@ where
vote: CommittedVote<C>,
quorum_set: QS,
learner_ids: impl IntoIterator<Item = C::NodeId>,
last_leader_log_id: &[LogIdOf<C>],
last_leader_log_id: LeaderLogIds<C>,
) -> Self {
debug_assert!(
Some(vote.committed_leader_id()) >= last_leader_log_id.last().map(|x| x.committed_leader_id().clone()),
"vote {} must GE last_leader_log_id.last() {}",
vote,
last_leader_log_id.display()
last_leader_log_id
);
debug_assert!(
Some(vote.committed_leader_id()) >= last_leader_log_id.first().map(|x| x.committed_leader_id().clone()),
"vote {} must GE last_leader_log_id.first() {}",
vote,
last_leader_log_id.display()
last_leader_log_id
);

let learner_ids = learner_ids.into_iter().collect::<Vec<_>>();
Expand Down Expand Up @@ -222,6 +222,7 @@ where

#[cfg(test)]
mod tests {
use crate::engine::leader_log_ids::LeaderLogIds;
use crate::engine::testing::UTConfig;
use crate::entry::RaftEntry;
use crate::progress::Progress;
Expand All @@ -238,7 +239,12 @@ mod tests {
tracing::info!("--- vote greater than last log id, create new noop_log_id");
{
let vote = Vote::new(2, 2).into_committed();
let leader = Leader::<UTConfig, _>::new(vote, vec![1, 2, 3], vec![], &[log_id(1, 2, 1), log_id(1, 2, 3)]);
let leader = Leader::<UTConfig, _>::new(
vote,
vec![1, 2, 3],
vec![],
LeaderLogIds::new_start_end(log_id(1, 2, 1), log_id(1, 2, 3)),
);

assert_eq!(leader.noop_log_id(), Some(&log_id(2, 2, 4)));
assert_eq!(leader.last_log_id(), Some(&log_id(1, 2, 3)));
Expand All @@ -247,7 +253,12 @@ mod tests {
tracing::info!("--- vote equals last log id, reuse noop_log_id");
{
let vote = Vote::new(1, 2).into_committed();
let leader = Leader::<UTConfig, _>::new(vote, vec![1, 2, 3], vec![], &[log_id(1, 2, 1), log_id(1, 2, 3)]);
let leader = Leader::<UTConfig, _>::new(
vote,
vec![1, 2, 3],
vec![],
LeaderLogIds::new_start_end(log_id(1, 2, 1), log_id(1, 2, 3)),
);

assert_eq!(leader.noop_log_id(), Some(&log_id(1, 2, 1)));
assert_eq!(leader.last_log_id(), Some(&log_id(1, 2, 3)));
Expand All @@ -256,7 +267,8 @@ mod tests {
tracing::info!("--- vote equals last log id, reuse noop_log_id, last_leader_log_id.len()==1");
{
let vote = Vote::new(1, 2).into_committed();
let leader = Leader::<UTConfig, _>::new(vote, vec![1, 2, 3], vec![], &[log_id(1, 2, 3)]);
let leader =
Leader::<UTConfig, _>::new(vote, vec![1, 2, 3], vec![], LeaderLogIds::new_single(log_id(1, 2, 3)));

assert_eq!(leader.noop_log_id(), Some(&log_id(1, 2, 3)));
assert_eq!(leader.last_log_id(), Some(&log_id(1, 2, 3)));
Expand All @@ -265,7 +277,7 @@ mod tests {
tracing::info!("--- no last log ids, create new noop_log_id, last_leader_log_id.len()==0");
{
let vote = Vote::new(1, 2).into_committed();
let leader = Leader::<UTConfig, _>::new(vote, vec![1, 2, 3], vec![], &[]);
let leader = Leader::<UTConfig, _>::new(vote, vec![1, 2, 3], vec![], LeaderLogIds::new(None));

assert_eq!(leader.noop_log_id(), Some(&log_id(1, 2, 0)));
assert_eq!(leader.last_log_id(), None);
Expand All @@ -275,7 +287,8 @@ mod tests {
#[test]
fn test_leader_established() {
let vote = Vote::new(2, 2).into_committed();
let mut leader = Leader::<UTConfig, _>::new(vote, vec![1, 2, 3], vec![], &[log_id(1, 2, 3)]);
let mut leader =
Leader::<UTConfig, _>::new(vote, vec![1, 2, 3], vec![], LeaderLogIds::new_single(log_id(1, 2, 3)));

let mut entries = vec![Entry::<UTConfig>::new_blank(log_id(5, 5, 2))];
leader.assign_log_ids(&mut entries);
Expand All @@ -291,7 +304,7 @@ mod tests {
#[test]
fn test_1_entry_none_last_log_id() {
let vote = Vote::new(0, 0).into_committed();
let mut leading = Leader::<UTConfig, _>::new(vote, vec![1, 2, 3], vec![], &[]);
let mut leading = Leader::<UTConfig, _>::new(vote, vec![1, 2, 3], vec![], LeaderLogIds::new(None));

let mut entries: Vec<Entry<UTConfig>> = vec![blank_ent(1, 1, 1)];
leading.assign_log_ids(&mut entries);
Expand All @@ -303,7 +316,8 @@ mod tests {
#[test]
fn test_no_entries_provided() {
let vote = Vote::new(2, 2).into_committed();
let mut leading = Leader::<UTConfig, _>::new(vote, vec![1, 2, 3], vec![], &[log_id(1, 1, 8)]);
let mut leading =
Leader::<UTConfig, _>::new(vote, vec![1, 2, 3], vec![], LeaderLogIds::new_single(log_id(1, 1, 8)));

let mut entries: Vec<Entry<UTConfig>> = vec![];
leading.assign_log_ids(&mut entries);
Expand All @@ -313,7 +327,8 @@ mod tests {
#[test]
fn test_multiple_entries() {
let vote = Vote::new(2, 2).into_committed();
let mut leading = Leader::<UTConfig, _>::new(vote, vec![1, 2, 3], [], &[log_id(1, 1, 8)]);
let mut leading =
Leader::<UTConfig, _>::new(vote, vec![1, 2, 3], [], LeaderLogIds::new_single(log_id(1, 1, 8)));

let mut entries: Vec<Entry<UTConfig>> = vec![blank_ent(1, 1, 1), blank_ent(1, 1, 1), blank_ent(1, 1, 1)];

Expand All @@ -326,7 +341,12 @@ mod tests {

#[test]
fn test_leading_last_quorum_acked_time_leader_is_voter() {
let mut leading = Leader::<UTConfig, Vec<u64>>::new(Vote::new(2, 1).into_committed(), vec![1, 2, 3], [4], &[]);
let mut leading = Leader::<UTConfig, Vec<u64>>::new(
Vote::new(2, 1).into_committed(),
vec![1, 2, 3],
[4],
LeaderLogIds::new(None),
);

let now1 = UTConfig::<()>::now();

Expand All @@ -337,7 +357,12 @@ mod tests {

#[test]
fn test_leading_last_quorum_acked_time_leader_is_learner() {
let mut leading = Leader::<UTConfig, Vec<u64>>::new(Vote::new(2, 4).into_committed(), vec![1, 2, 3], [4], &[]);
let mut leading = Leader::<UTConfig, Vec<u64>>::new(
Vote::new(2, 4).into_committed(),
vec![1, 2, 3],
[4],
LeaderLogIds::new(None),
);

let t2 = UTConfig::<()>::now();
let _ = leading.clock_progress.increase_to(&2, Some(t2));
Expand All @@ -352,7 +377,12 @@ mod tests {

#[test]
fn test_leading_last_quorum_acked_time_leader_is_not_member() {
let mut leading = Leader::<UTConfig, Vec<u64>>::new(Vote::new(2, 5).into_committed(), vec![1, 2, 3], [4], &[]);
let mut leading = Leader::<UTConfig, Vec<u64>>::new(
Vote::new(2, 5).into_committed(),
vec![1, 2, 3],
[4],
LeaderLogIds::new(None),
);

let t2 = UTConfig::<()>::now();
let _ = leading.clock_progress.increase_to(&2, Some(t2));
Expand Down
Loading
Loading