Skip to content

Commit

Permalink
Change: use RaftTypeConfig to replace NID
Browse files Browse the repository at this point in the history
- `StorageError<NID>` to `StorageError<C>`
- `StorageIOError<NID>` to `StorageIOError<C>`
- `ErrorSubject<NID>` to `ErrorSubject<C>`
- `DefensiveError<NID>` to `DefensiveError<C>`
- `Violation<NID>` to `Violation<C>`
- `SnapshotSignature<NID>` to `SnapshotSignature<C>`
- `ChangeMembers<NID, N>` to `ChangeMembers<C>`
- `LogIdList<NID>` to `LogIdList<C>`

Upgrade tip:

Replace `StorageError<NID>` with `StorageError<C>`, such as:
```diff
-  async fn apply<I>(&mut self, entries: I) -> Result<Vec<Response>, StorageError<NodeId>>
+  async fn apply<I>(&mut self, entries: I) -> Result<Vec<Response>, StorageError<TypeConfig>>
```

Types to replace:

- `StorageError<NID>` to `StorageError<C>`
- `StorageIOError<NID>` to `StorageIOError<C>`
- `ErrorSubject<NID>` to `ErrorSubject<C>`
- `DefensiveError<NID>` to `DefensiveError<C>`
- `Violation<NID>` to `Violation<C>`
- `SnapshotSignature<NID>` to `SnapshotSignature<C>`
- `ChangeMembers<NID, N>` to `ChangeMembers<C>`
- `LogIdList<NID>` to `LogIdList<C>`
  • Loading branch information
drmingdrmer committed Jul 5, 2024
1 parent c6004cd commit 9dbcd4c
Show file tree
Hide file tree
Showing 46 changed files with 444 additions and 439 deletions.
35 changes: 20 additions & 15 deletions cluster_benchmark/tests/benchmark/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use openraft::SnapshotMeta;
use openraft::StorageError;
use openraft::StorageIOError;
use openraft::StoredMembership;
use openraft::TokioRuntime;
use openraft::Vote;
use serde::Deserialize;
use serde::Serialize;
Expand Down Expand Up @@ -106,7 +105,7 @@ impl RaftLogReader<TypeConfig> for Arc<LogStore> {
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + OptionalSend>(
&mut self,
range: RB,
) -> Result<Vec<Entry<TypeConfig>>, StorageError<NodeId>> {
) -> Result<Vec<Entry<TypeConfig>>, StorageError<TypeConfig>> {
let mut entries = vec![];
{
let log = self.log.read().await;
Expand All @@ -118,14 +117,14 @@ impl RaftLogReader<TypeConfig> for Arc<LogStore> {
Ok(entries)
}

async fn read_vote(&mut self) -> Result<Option<Vote<NodeId>>, StorageError<NodeId>> {
async fn read_vote(&mut self) -> Result<Option<Vote<NodeId>>, StorageError<TypeConfig>> {
Ok(self.vote.read().await.clone())
}
}

impl RaftSnapshotBuilder<TypeConfig> for Arc<StateMachineStore> {
#[tracing::instrument(level = "trace", skip(self))]
async fn build_snapshot(&mut self) -> Result<Snapshot<TypeConfig>, StorageError<NodeId>> {
async fn build_snapshot(&mut self) -> Result<Snapshot<TypeConfig>, StorageError<TypeConfig>> {
let data;
let last_applied_log;
let last_membership;
Expand Down Expand Up @@ -175,7 +174,7 @@ impl RaftSnapshotBuilder<TypeConfig> for Arc<StateMachineStore> {
}

impl RaftLogStorage<TypeConfig> for Arc<LogStore> {
async fn get_log_state(&mut self) -> Result<LogState<TypeConfig>, StorageError<NodeId>> {
async fn get_log_state(&mut self) -> Result<LogState<TypeConfig>, StorageError<TypeConfig>> {
let log = self.log.read().await;
let last_serialized = log.iter().rev().next().map(|(_, ent)| ent);

Expand All @@ -198,22 +197,22 @@ impl RaftLogStorage<TypeConfig> for Arc<LogStore> {
}

#[tracing::instrument(level = "trace", skip(self))]
async fn save_vote(&mut self, vote: &Vote<NodeId>) -> Result<(), StorageError<NodeId>> {
async fn save_vote(&mut self, vote: &Vote<NodeId>) -> Result<(), StorageError<TypeConfig>> {
let mut v = self.vote.write().await;
*v = Some(*vote);
Ok(())
}

#[tracing::instrument(level = "debug", skip(self))]
async fn truncate(&mut self, log_id: LogId<NodeId>) -> Result<(), StorageError<NodeId>> {
async fn truncate(&mut self, log_id: LogId<NodeId>) -> Result<(), StorageError<TypeConfig>> {
let mut log = self.log.write().await;
log.split_off(&log_id.index);

Ok(())
}

#[tracing::instrument(level = "debug", skip_all)]
async fn purge(&mut self, log_id: LogId<NodeId>) -> Result<(), StorageError<NodeId>> {
async fn purge(&mut self, log_id: LogId<NodeId>) -> Result<(), StorageError<TypeConfig>> {
{
let mut p = self.last_purged_log_id.write().await;
*p = Some(log_id);
Expand All @@ -226,8 +225,14 @@ impl RaftLogStorage<TypeConfig> for Arc<LogStore> {
}

#[tracing::instrument(level = "trace", skip_all)]
async fn append<I>(&mut self, entries: I, callback: LogFlushed<TypeConfig>) -> Result<(), StorageError<NodeId>>
where I: IntoIterator<Item = Entry<TypeConfig>> + Send {
async fn append<I>(
&mut self,
entries: I,
callback: LogFlushed<TypeConfig>,
) -> Result<(), StorageError<TypeConfig>>
where
I: IntoIterator<Item = Entry<TypeConfig>> + Send,
{
{
let mut log = self.log.write().await;
log.extend(entries.into_iter().map(|entry| (entry.get_log_id().index, entry)));
Expand All @@ -246,12 +251,12 @@ impl RaftLogStorage<TypeConfig> for Arc<LogStore> {
impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
async fn applied_state(
&mut self,
) -> Result<(Option<LogId<NodeId>>, StoredMembership<TypeConfig>), StorageError<NodeId>> {
) -> Result<(Option<LogId<NodeId>>, StoredMembership<TypeConfig>), StorageError<TypeConfig>> {
let sm = self.sm.read().await;
Ok((sm.last_applied_log, sm.last_membership.clone()))
}

async fn apply<I>(&mut self, entries: I) -> Result<Vec<ClientResponse>, StorageError<NodeId>>
async fn apply<I>(&mut self, entries: I) -> Result<Vec<ClientResponse>, StorageError<TypeConfig>>
where I: IntoIterator<Item = Entry<TypeConfig>> + Send {
let mut sm = self.sm.write().await;

Expand All @@ -274,7 +279,7 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
}

#[tracing::instrument(level = "trace", skip(self))]
async fn begin_receiving_snapshot(&mut self) -> Result<Box<SnapshotDataOf<TypeConfig>>, StorageError<NodeId>> {
async fn begin_receiving_snapshot(&mut self) -> Result<Box<SnapshotDataOf<TypeConfig>>, StorageError<TypeConfig>> {
Ok(Box::new(Cursor::new(Vec::new())))
}

Expand All @@ -283,7 +288,7 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
&mut self,
meta: &SnapshotMeta<TypeConfig>,
snapshot: Box<SnapshotDataOf<TypeConfig>>,
) -> Result<(), StorageError<NodeId>> {
) -> Result<(), StorageError<TypeConfig>> {
let new_snapshot = StoredSnapshot {
meta: meta.clone(),
data: snapshot.into_inner(),
Expand All @@ -304,7 +309,7 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
}

#[tracing::instrument(level = "trace", skip(self))]
async fn get_current_snapshot(&mut self) -> Result<Option<Snapshot<TypeConfig>>, StorageError<NodeId>> {
async fn get_current_snapshot(&mut self) -> Result<Option<Snapshot<TypeConfig>>, StorageError<TypeConfig>> {
match &*self.current_snapshot.read().await {
Some(snapshot) => {
let data = snapshot.data.clone();
Expand Down
5 changes: 2 additions & 3 deletions cluster_benchmark/tests/benchmark/store/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,21 @@ use openraft::testing::Suite;
use openraft::StorageError;

use crate::store::LogStore;
use crate::store::NodeId;
use crate::store::StateMachineStore;
use crate::store::TypeConfig;

struct Builder {}

impl StoreBuilder<TypeConfig, Arc<LogStore>, Arc<StateMachineStore>> for Builder {
async fn build(&self) -> Result<((), Arc<LogStore>, Arc<StateMachineStore>), StorageError<NodeId>> {
async fn build(&self) -> Result<((), Arc<LogStore>, Arc<StateMachineStore>), StorageError<TypeConfig>> {
let log_store = LogStore::new_async().await;
let sm = Arc::new(StateMachineStore::new());
Ok(((), log_store, sm))
}
}

#[test]
pub fn test_store() -> Result<(), StorageError<NodeId>> {
pub fn test_store() -> Result<(), StorageError<TypeConfig>> {
Suite::test_all(Builder {})?;
Ok(())
}
36 changes: 18 additions & 18 deletions examples/memstore/src/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,15 @@ impl<C: RaftTypeConfig> LogStoreInner<C> {
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug>(
&mut self,
range: RB,
) -> Result<Vec<C::Entry>, StorageError<C::NodeId>>
) -> Result<Vec<C::Entry>, StorageError<C>>
where
C::Entry: Clone,
{
let response = self.log.range(range.clone()).map(|(_, val)| val.clone()).collect::<Vec<_>>();
Ok(response)
}

async fn get_log_state(&mut self) -> Result<LogState<C>, StorageError<C::NodeId>> {
async fn get_log_state(&mut self) -> Result<LogState<C>, StorageError<C>> {
let last = self.log.iter().next_back().map(|(_, ent)| *ent.get_log_id());

let last_purged = self.last_purged_log_id;
Expand All @@ -75,25 +75,25 @@ impl<C: RaftTypeConfig> LogStoreInner<C> {
})
}

async fn save_committed(&mut self, committed: Option<LogId<C::NodeId>>) -> Result<(), StorageError<C::NodeId>> {
async fn save_committed(&mut self, committed: Option<LogId<C::NodeId>>) -> Result<(), StorageError<C>> {
self.committed = committed;
Ok(())
}

async fn read_committed(&mut self) -> Result<Option<LogId<C::NodeId>>, StorageError<C::NodeId>> {
async fn read_committed(&mut self) -> Result<Option<LogId<C::NodeId>>, StorageError<C>> {
Ok(self.committed)
}

async fn save_vote(&mut self, vote: &Vote<C::NodeId>) -> Result<(), StorageError<C::NodeId>> {
async fn save_vote(&mut self, vote: &Vote<C::NodeId>) -> Result<(), StorageError<C>> {
self.vote = Some(*vote);
Ok(())
}

async fn read_vote(&mut self) -> Result<Option<Vote<C::NodeId>>, StorageError<C::NodeId>> {
async fn read_vote(&mut self) -> Result<Option<Vote<C::NodeId>>, StorageError<C>> {
Ok(self.vote)
}

async fn append<I>(&mut self, entries: I, callback: LogFlushed<C>) -> Result<(), StorageError<C::NodeId>>
async fn append<I>(&mut self, entries: I, callback: LogFlushed<C>) -> Result<(), StorageError<C>>
where I: IntoIterator<Item = C::Entry> {
// Simple implementation that calls the flush-before-return `append_to_log`.
for entry in entries {
Expand All @@ -104,7 +104,7 @@ impl<C: RaftTypeConfig> LogStoreInner<C> {
Ok(())
}

async fn truncate(&mut self, log_id: LogId<C::NodeId>) -> Result<(), StorageError<C::NodeId>> {
async fn truncate(&mut self, log_id: LogId<C::NodeId>) -> Result<(), StorageError<C>> {
let keys = self.log.range(log_id.index..).map(|(k, _v)| *k).collect::<Vec<_>>();
for key in keys {
self.log.remove(&key);
Expand All @@ -113,7 +113,7 @@ impl<C: RaftTypeConfig> LogStoreInner<C> {
Ok(())
}

async fn purge(&mut self, log_id: LogId<C::NodeId>) -> Result<(), StorageError<C::NodeId>> {
async fn purge(&mut self, log_id: LogId<C::NodeId>) -> Result<(), StorageError<C>> {
{
let ld = &mut self.last_purged_log_id;
assert!(*ld <= Some(log_id));
Expand Down Expand Up @@ -152,12 +152,12 @@ mod impl_log_store {
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug>(
&mut self,
range: RB,
) -> Result<Vec<C::Entry>, StorageError<C::NodeId>> {
) -> Result<Vec<C::Entry>, StorageError<C>> {
let mut inner = self.inner.lock().await;
inner.try_get_log_entries(range).await
}

async fn read_vote(&mut self) -> Result<Option<Vote<C::NodeId>>, StorageError<C::NodeId>> {
async fn read_vote(&mut self) -> Result<Option<Vote<C::NodeId>>, StorageError<C>> {
let mut inner = self.inner.lock().await;
inner.read_vote().await
}
Expand All @@ -168,38 +168,38 @@ mod impl_log_store {
{
type LogReader = Self;

async fn get_log_state(&mut self) -> Result<LogState<C>, StorageError<C::NodeId>> {
async fn get_log_state(&mut self) -> Result<LogState<C>, StorageError<C>> {
let mut inner = self.inner.lock().await;
inner.get_log_state().await
}

async fn save_committed(&mut self, committed: Option<LogId<C::NodeId>>) -> Result<(), StorageError<C::NodeId>> {
async fn save_committed(&mut self, committed: Option<LogId<C::NodeId>>) -> Result<(), StorageError<C>> {
let mut inner = self.inner.lock().await;
inner.save_committed(committed).await
}

async fn read_committed(&mut self) -> Result<Option<LogId<C::NodeId>>, StorageError<C::NodeId>> {
async fn read_committed(&mut self) -> Result<Option<LogId<C::NodeId>>, StorageError<C>> {
let mut inner = self.inner.lock().await;
inner.read_committed().await
}

async fn save_vote(&mut self, vote: &Vote<C::NodeId>) -> Result<(), StorageError<C::NodeId>> {
async fn save_vote(&mut self, vote: &Vote<C::NodeId>) -> Result<(), StorageError<C>> {
let mut inner = self.inner.lock().await;
inner.save_vote(vote).await
}

async fn append<I>(&mut self, entries: I, callback: LogFlushed<C>) -> Result<(), StorageError<C::NodeId>>
async fn append<I>(&mut self, entries: I, callback: LogFlushed<C>) -> Result<(), StorageError<C>>
where I: IntoIterator<Item = C::Entry> {
let mut inner = self.inner.lock().await;
inner.append(entries, callback).await
}

async fn truncate(&mut self, log_id: LogId<C::NodeId>) -> Result<(), StorageError<C::NodeId>> {
async fn truncate(&mut self, log_id: LogId<C::NodeId>) -> Result<(), StorageError<C>> {
let mut inner = self.inner.lock().await;
inner.truncate(log_id).await
}

async fn purge(&mut self, log_id: LogId<C::NodeId>) -> Result<(), StorageError<C::NodeId>> {
async fn purge(&mut self, log_id: LogId<C::NodeId>) -> Result<(), StorageError<C>> {
let mut inner = self.inner.lock().await;
inner.purge(log_id).await
}
Expand Down
12 changes: 6 additions & 6 deletions examples/raft-kv-memstore-network-v2/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ pub struct StateMachineStore {

impl RaftSnapshotBuilder<TypeConfig> for Arc<StateMachineStore> {
#[tracing::instrument(level = "trace", skip(self))]
async fn build_snapshot(&mut self) -> Result<Snapshot<TypeConfig>, StorageError<NodeId>> {
async fn build_snapshot(&mut self) -> Result<Snapshot<TypeConfig>, StorageError<TypeConfig>> {
let data;
let last_applied_log;
let last_membership;
Expand Down Expand Up @@ -131,13 +131,13 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {

async fn applied_state(
&mut self,
) -> Result<(Option<LogId<NodeId>>, StoredMembership<TypeConfig>), StorageError<NodeId>> {
) -> Result<(Option<LogId<NodeId>>, StoredMembership<TypeConfig>), StorageError<TypeConfig>> {
let state_machine = self.state_machine.lock().unwrap();
Ok((state_machine.last_applied, state_machine.last_membership.clone()))
}

#[tracing::instrument(level = "trace", skip(self, entries))]
async fn apply<I>(&mut self, entries: I) -> Result<Vec<Response>, StorageError<NodeId>>
async fn apply<I>(&mut self, entries: I) -> Result<Vec<Response>, StorageError<TypeConfig>>
where I: IntoIterator<Item = Entry<TypeConfig>> {
let mut res = Vec::new(); //No `with_capacity`; do not know `len` of iterator

Expand Down Expand Up @@ -168,7 +168,7 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
}

#[tracing::instrument(level = "trace", skip(self))]
async fn begin_receiving_snapshot(&mut self) -> Result<Box<SnapshotDataOf<TypeConfig>>, StorageError<NodeId>> {
async fn begin_receiving_snapshot(&mut self) -> Result<Box<SnapshotDataOf<TypeConfig>>, StorageError<TypeConfig>> {
Ok(Box::default())
}

Expand All @@ -177,7 +177,7 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
&mut self,
meta: &SnapshotMeta<TypeConfig>,
snapshot: Box<SnapshotDataOf<TypeConfig>>,
) -> Result<(), StorageError<NodeId>> {
) -> Result<(), StorageError<TypeConfig>> {
tracing::info!("install snapshot");

let new_snapshot = StoredSnapshot {
Expand All @@ -199,7 +199,7 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
}

#[tracing::instrument(level = "trace", skip(self))]
async fn get_current_snapshot(&mut self) -> Result<Option<Snapshot<TypeConfig>>, StorageError<NodeId>> {
async fn get_current_snapshot(&mut self) -> Result<Option<Snapshot<TypeConfig>>, StorageError<TypeConfig>> {
match &*self.current_snapshot.lock().unwrap() {
Some(snapshot) => {
let data = snapshot.data.clone();
Expand Down
12 changes: 6 additions & 6 deletions examples/raft-kv-memstore-opendal-snapshot-data/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl StateMachineStore {

impl RaftSnapshotBuilder<TypeConfig> for Arc<StateMachineStore> {
#[tracing::instrument(level = "trace", skip(self))]
async fn build_snapshot(&mut self) -> Result<Snapshot<TypeConfig>, StorageError<NodeId>> {
async fn build_snapshot(&mut self) -> Result<Snapshot<TypeConfig>, StorageError<TypeConfig>> {
let data;
let last_applied_log;
let last_membership;
Expand Down Expand Up @@ -152,13 +152,13 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {

async fn applied_state(
&mut self,
) -> Result<(Option<LogId<NodeId>>, StoredMembership<TypeConfig>), StorageError<NodeId>> {
) -> Result<(Option<LogId<NodeId>>, StoredMembership<TypeConfig>), StorageError<TypeConfig>> {
let state_machine = self.state_machine.lock().unwrap();
Ok((state_machine.last_applied, state_machine.last_membership.clone()))
}

#[tracing::instrument(level = "trace", skip(self, entries))]
async fn apply<I>(&mut self, entries: I) -> Result<Vec<Response>, StorageError<NodeId>>
async fn apply<I>(&mut self, entries: I) -> Result<Vec<Response>, StorageError<TypeConfig>>
where I: IntoIterator<Item = Entry<TypeConfig>> {
let mut res = Vec::new(); //No `with_capacity`; do not know `len` of iterator

Expand Down Expand Up @@ -189,7 +189,7 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
}

#[tracing::instrument(level = "trace", skip(self))]
async fn begin_receiving_snapshot(&mut self) -> Result<Box<SnapshotDataOf<TypeConfig>>, StorageError<NodeId>> {
async fn begin_receiving_snapshot(&mut self) -> Result<Box<SnapshotDataOf<TypeConfig>>, StorageError<TypeConfig>> {
Ok(Box::default())
}

Expand All @@ -198,7 +198,7 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
&mut self,
meta: &SnapshotMeta<TypeConfig>,
snapshot: Box<SnapshotDataOf<TypeConfig>>,
) -> Result<(), StorageError<NodeId>> {
) -> Result<(), StorageError<TypeConfig>> {
tracing::info!("install snapshot");

let new_snapshot = StoredSnapshot {
Expand All @@ -221,7 +221,7 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
}

#[tracing::instrument(level = "trace", skip(self))]
async fn get_current_snapshot(&mut self) -> Result<Option<Snapshot<TypeConfig>>, StorageError<NodeId>> {
async fn get_current_snapshot(&mut self) -> Result<Option<Snapshot<TypeConfig>>, StorageError<TypeConfig>> {
match &*self.current_snapshot.lock().unwrap() {
Some(snapshot) => {
let data = snapshot.data.clone();
Expand Down
Loading

0 comments on commit 9dbcd4c

Please sign in to comment.