Skip to content

Commit

Permalink
Change: move read_vote function from RaftLogStorage to RaftLogReader
Browse files Browse the repository at this point in the history
  • Loading branch information
guojidan committed Mar 25, 2024
1 parent 0ccf70d commit 54acd77
Show file tree
Hide file tree
Showing 13 changed files with 39 additions and 34 deletions.
8 changes: 4 additions & 4 deletions cluster_benchmark/tests/benchmark/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ impl RaftLogReader<TypeConfig> for Arc<LogStore> {

Ok(entries)
}

async fn read_vote(&mut self) -> Result<Option<Vote<NodeId>>, StorageError<NodeId>> {
Ok(self.vote.read().await.clone())
}
}

impl RaftSnapshotBuilder<TypeConfig> for Arc<StateMachineStore> {
Expand Down Expand Up @@ -204,10 +208,6 @@ impl RaftLogStorage<TypeConfig> for Arc<LogStore> {
Ok(())
}

async fn read_vote(&mut self) -> Result<Option<Vote<NodeId>>, StorageError<NodeId>> {
Ok(self.vote.read().await.clone())
}

#[tracing::instrument(level = "debug", skip(self))]
async fn truncate(&mut self, log_id: LogId<NodeId>) -> Result<(), StorageError<NodeId>> {
let mut log = self.log.write().await;
Expand Down
10 changes: 5 additions & 5 deletions examples/memstore/src/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ mod impl_log_store {
let mut inner = self.inner.lock().await;
inner.try_get_log_entries(range).await
}

async fn read_vote(&mut self) -> Result<Option<Vote<C::NodeId>>, StorageError<C::NodeId>> {
let mut inner = self.inner.lock().await;
inner.read_vote().await
}
}

impl<C: RaftTypeConfig> RaftLogStorage<C> for LogStore<C>
Expand Down Expand Up @@ -183,11 +188,6 @@ mod impl_log_store {
inner.save_vote(vote).await
}

async fn read_vote(&mut self) -> Result<Option<Vote<C::NodeId>>, StorageError<C::NodeId>> {
let mut inner = self.inner.lock().await;
inner.read_vote().await
}

async fn append<I>(&mut self, entries: I, callback: LogFlushed<C>) -> Result<(), StorageError<C::NodeId>>
where I: IntoIterator<Item = C::Entry> {
let mut inner = self.inner.lock().await;
Expand Down
8 changes: 4 additions & 4 deletions examples/raft-kv-memstore-singlethreaded/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ impl RaftLogReader<TypeConfig> for Rc<LogStore> {
let response = log.range(range.clone()).map(|(_, val)| val.clone()).collect::<Vec<_>>();
Ok(response)
}

async fn read_vote(&mut self) -> Result<Option<Vote<NodeId>>, StorageError<NodeId>> {
Ok(*self.vote.borrow())
}
}

impl RaftSnapshotBuilder<TypeConfig> for Rc<StateMachineStore> {
Expand Down Expand Up @@ -313,10 +317,6 @@ impl RaftLogStorage<TypeConfig> for Rc<LogStore> {
Ok(())
}

async fn read_vote(&mut self) -> Result<Option<Vote<NodeId>>, StorageError<NodeId>> {
Ok(*self.vote.borrow())
}

#[tracing::instrument(level = "trace", skip(self, entries, callback))]
async fn append<I>(&mut self, entries: I, callback: LogFlushed<TypeConfig>) -> Result<(), StorageError<NodeId>>
where I: IntoIterator<Item = Entry<TypeConfig>> {
Expand Down
8 changes: 4 additions & 4 deletions examples/raft-kv-rocksdb/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,10 @@ impl RaftLogReader<TypeConfig> for LogStore {
.map(|x| x.1)
.collect()
}

async fn read_vote(&mut self) -> Result<Option<Vote<NodeId>>, StorageError<NodeId>> {
self.get_vote_()
}
}

impl RaftLogStorage<TypeConfig> for LogStore {
Expand Down Expand Up @@ -430,10 +434,6 @@ impl RaftLogStorage<TypeConfig> for LogStore {
self.set_vote_(vote)
}

async fn read_vote(&mut self) -> Result<Option<Vote<NodeId>>, StorageError<NodeId>> {
self.get_vote_()
}

#[tracing::instrument(level = "trace", skip_all)]
async fn append<I>(&mut self, entries: I, callback: LogFlushed<TypeConfig>) -> StorageResult<()>
where
Expand Down
5 changes: 3 additions & 2 deletions openraft/src/docs/getting_started/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ Follow the link to method document to see the details.
| Write log: | [`truncate()`] | () | delete logs `[index, +oo)` |
| Write log: | [`purge()`] | () | purge logs `(-oo, index]` |
| Vote: | [`save_vote()`] | () | save vote |
| Vote: | [`read_vote()`] | [`Vote`] | read vote |

| Kind | [`RaftStateMachine`] method | Return value | Description |
|------------|--------------------------------|------------------------------|---------------------------------------|
Expand All @@ -122,10 +121,12 @@ Most of the APIs are quite straightforward, except two indirect APIs:
[`RaftLogReader`] defines the APIs to read logs, and is an also super trait of [`RaftLogStorage`] :
- [`try_get_log_entries()`] get log entries in a range;
- [`read_vote()`] read vote;
```ignore
trait RaftLogReader<C: RaftTypeConfig> {
async fn try_get_log_entries<RB: RangeBounds<u64>>(&mut self, range: RB) -> Result<Vec<C::Entry>, ...>;
async fn read_vote(&mut self) -> Result<Option<Vote<C::NodeId>>, ...>>;
}
```
Expand Down Expand Up @@ -372,6 +373,7 @@ Additionally, two test scripts for setting up a cluster are available:

[`RaftLogReader`]: `crate::storage::RaftLogReader`
[`try_get_log_entries()`]: `crate::storage::RaftLogReader::try_get_log_entries`
[`read_vote()`]: `crate::storage::RaftLogReader::read_vote`


[`RaftLogStorage::SnapshotBuilder`]: `crate::storage::RaftLogStorage::SnapshotBuilder`
Expand All @@ -382,7 +384,6 @@ Additionally, two test scripts for setting up a cluster are available:
[`truncate()`]: `crate::storage::RaftLogStorage::truncate`
[`purge()`]: `crate::storage::RaftLogStorage::purge`
[`save_vote()`]: `crate::storage::RaftLogStorage::save_vote`
[`read_vote()`]: `crate::storage::RaftLogStorage::read_vote`
[`get_log_state()`]: `crate::storage::RaftLogStorage::get_log_state`
[`get_log_reader()`]: `crate::storage::RaftLogStorage::get_log_reader`

Expand Down
4 changes: 4 additions & 0 deletions openraft/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::OptionalSync;
use crate::RaftTypeConfig;
use crate::StorageError;
use crate::StoredMembership;
use crate::Vote;

#[derive(Debug, Clone, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
Expand Down Expand Up @@ -149,6 +150,9 @@ where C: RaftTypeConfig
&mut self,
range: RB,
) -> Result<Vec<C::Entry>, StorageError<C::NodeId>>;

/// Return the last saved vote by [`RaftLogStorage::save_vote`].
async fn read_vote(&mut self) -> Result<Option<Vote<C::NodeId>>, StorageError<C::NodeId>>;
}

/// A trait defining the interface for a Raft state machine snapshot subsystem.
Expand Down
3 changes: 0 additions & 3 deletions openraft/src/storage/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,6 @@ where C: RaftTypeConfig
/// The vote must be persisted on disk before returning.
async fn save_vote(&mut self, vote: &Vote<C::NodeId>) -> Result<(), StorageError<C::NodeId>>;

/// Return the last saved vote by [`Self::save_vote`].
async fn read_vote(&mut self) -> Result<Option<Vote<C::NodeId>>, StorageError<C::NodeId>>;

/// Saves the last committed log id to storage.
///
/// # Optional feature
Expand Down
8 changes: 4 additions & 4 deletions stores/memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,10 @@ impl RaftLogReader<TypeConfig> for Arc<MemLogStore> {

Ok(entries)
}

async fn read_vote(&mut self) -> Result<Option<Vote<MemNodeId>>, StorageError<MemNodeId>> {
Ok(*self.vote.read().await)
}
}

impl RaftSnapshotBuilder<TypeConfig> for Arc<MemStateMachine> {
Expand Down Expand Up @@ -352,10 +356,6 @@ impl RaftLogStorage<TypeConfig> for Arc<MemLogStore> {
Ok(())
}

async fn read_vote(&mut self) -> Result<Option<Vote<MemNodeId>>, StorageError<MemNodeId>> {
Ok(*self.vote.read().await)
}

async fn save_committed(&mut self, committed: Option<LogId<MemNodeId>>) -> Result<(), StorageError<MemNodeId>> {
tracing::debug!(?committed, "save_committed");
let mut c = self.committed.write().await;
Expand Down
8 changes: 4 additions & 4 deletions stores/rocksstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,10 @@ impl RaftLogReader<TypeConfig> for RocksLogStore {
}
Ok(res)
}

async fn read_vote(&mut self) -> Result<Option<Vote<RocksNodeId>>, StorageError<RocksNodeId>> {
self.get_meta::<meta::Vote>()
}
}

impl RaftSnapshotBuilder<TypeConfig> for RocksStateMachine {
Expand Down Expand Up @@ -346,10 +350,6 @@ impl RaftLogStorage<TypeConfig> for RocksLogStore {
Ok(())
}

async fn read_vote(&mut self) -> Result<Option<Vote<RocksNodeId>>, StorageError<RocksNodeId>> {
self.get_meta::<meta::Vote>()
}

async fn get_log_reader(&mut self) -> Self::LogReader {
self.clone()
}
Expand Down
8 changes: 4 additions & 4 deletions stores/sledstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,10 @@ impl RaftLogReader<TypeConfig> for Arc<SledStore> {
.collect();
logs
}

async fn read_vote(&mut self) -> Result<Option<Vote<ExampleNodeId>>, StorageError<ExampleNodeId>> {
self.get_vote_()
}
}

impl RaftSnapshotBuilder<TypeConfig> for Arc<SledStore> {
Expand Down Expand Up @@ -503,10 +507,6 @@ impl RaftLogStorage<TypeConfig> for Arc<SledStore> {
self.set_vote_(vote).await
}

async fn read_vote(&mut self) -> Result<Option<Vote<ExampleNodeId>>, StorageError<ExampleNodeId>> {
self.get_vote_()
}

async fn get_log_reader(&mut self) -> Self::LogReader {
self.clone()
}
Expand Down
1 change: 1 addition & 0 deletions tests/tests/fixtures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use openraft::MessageSummary;
use openraft::RPCTypes;
use openraft::Raft;
use openraft::RaftLogId;
use openraft::RaftLogReader;
use openraft::RaftMetrics;
use openraft::RaftState;
use openraft::RaftTypeConfig;
Expand Down
1 change: 1 addition & 0 deletions tests/tests/life_cycle/t50_single_follower_restart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::time::Duration;
use maplit::btreeset;
use openraft::storage::RaftLogStorage;
use openraft::Config;
use openraft::RaftLogReader;
use openraft::ServerState;
use openraft::Vote;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use openraft::Entry;
use openraft::EntryPayload;
use openraft::LogId;
use openraft::Membership;
use openraft::RaftLogReader;
use openraft::RaftSnapshotBuilder;
use openraft::ServerState;
use openraft::SnapshotPolicy;
Expand Down

0 comments on commit 54acd77

Please sign in to comment.