Skip to content

Commit

Permalink
Merge pull request #950 from drmingdrmer/31-refactor
Browse files Browse the repository at this point in the history
Refactor: replication/mod.rs
  • Loading branch information
drmingdrmer authored Nov 24, 2023
2 parents 9d032f0 + 2c0116b commit f5d7e54
Showing 1 changed file with 42 additions and 23 deletions.
65 changes: 42 additions & 23 deletions openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ use crate::core::notify::Notify;
use crate::display_ext::DisplayOption;
use crate::display_ext::DisplayOptionExt;
use crate::error::HigherVote;
use crate::error::Infallible;
use crate::error::RPCError;
use crate::error::RaftError;
use crate::error::ReplicationClosed;
use crate::error::ReplicationError;
use crate::error::Timeout;
Expand Down Expand Up @@ -231,29 +233,15 @@ where
ReplicationError::RPCError(err) => {
tracing::error!(err = display(&err), "RPCError");

if let Some(request_id) = repl_id {
let _ = self.tx_raft_core.send(Notify::Network {
response: Response::Progress {
target: self.target,
request_id,
result: Err(err.to_string()),
session_id: self.session_id,
},
});
} else {
tracing::warn!(
err = display(&err),
"encountered RPCError but request_id is None, no response is sent"
);
}

// If there is an [`Unreachable`] error, we will backoff for a period of time
// Backoff will be reset if there is a successful RPC is sent.
if let RPCError::Unreachable(_unreachable) = err {
if let RPCError::Unreachable(_unreachable) = &err {
if self.backoff.is_none() {
self.backoff = Some(self.network.backoff());
}
}

self.send_progress_error(repl_id, err);
}
};
}
Expand Down Expand Up @@ -355,13 +343,13 @@ where

match append_resp {
AppendEntriesResponse::Success => {
self.update_matching(request_id, leader_time, log_id_range.last_log_id);
self.send_progress_matching(request_id, leader_time, log_id_range.last_log_id);
Ok(None)
}
AppendEntriesResponse::PartialSuccess(matching) => {
Self::debug_assert_partial_success(log_id_range, &matching);

self.update_matching(request_id, leader_time, matching);
self.send_progress_matching(request_id, leader_time, matching);
if matching < log_id_range.last_log_id {
// TODO(9): an RPC has already been made, it should use a newer time
Ok(Some(Data::new_logs(
Expand Down Expand Up @@ -391,14 +379,45 @@ where
debug_assert!(conflict.is_some(), "prev_log_id=None never conflict");

let conflict = conflict.unwrap();
self.update_conflicting(request_id, leader_time, conflict);
self.send_progress_conflicting(request_id, leader_time, conflict);

Ok(None)
}
}
}

fn update_conflicting(&mut self, request_id: Option<u64>, leader_time: InstantOf<C>, conflict: LogId<C::NodeId>) {
/// Send the error result to RaftCore.
/// RaftCore will then submit another replication command.
fn send_progress_error(
&mut self,
request_id: Option<u64>,
err: RPCError<C::NodeId, C::Node, RaftError<C::NodeId, Infallible>>,
) {
if let Some(request_id) = request_id {
let _ = self.tx_raft_core.send(Notify::Network {
response: Response::Progress {
target: self.target,
request_id,
result: Err(err.to_string()),
session_id: self.session_id,
},
});
} else {
tracing::warn!(
err = display(&err),
"encountered RPCError but request_id is None, no response is sent"
);
}
}

/// Send a `conflict` message to RaftCore.
/// RaftCore will then submit another replication command.
fn send_progress_conflicting(
&mut self,
request_id: Option<u64>,
leader_time: InstantOf<C>,
conflict: LogId<C::NodeId>,
) {
tracing::debug!(
target = display(self.target),
request_id = display(request_id.display()),
Expand Down Expand Up @@ -430,7 +449,7 @@ where
/// Update the `matching` log id, which is for tracking follower replication, and report the
/// matched log id to `RaftCore` to commit an entry.
#[tracing::instrument(level = "trace", skip(self))]
fn update_matching(
fn send_progress_matching(
&mut self,
request_id: Option<u64>,
leader_time: InstantOf<C>,
Expand Down Expand Up @@ -724,7 +743,7 @@ where
);

// TODO: update leader lease for every successfully sent chunk.
self.update_matching(request_id, leader_time, snapshot.meta.last_log_id);
self.send_progress_matching(request_id, leader_time, snapshot.meta.last_log_id);

return Ok(None);
}
Expand Down

0 comments on commit f5d7e54

Please sign in to comment.