Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change: move read_vote function from RaftLogStorage to RaftLogReader #1080

Merged
merged 1 commit into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cluster_benchmark/tests/benchmark/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ impl RaftLogReader<TypeConfig> for Arc<LogStore> {

Ok(entries)
}

async fn read_vote(&mut self) -> Result<Option<Vote<NodeId>>, StorageError<NodeId>> {
Ok(self.vote.read().await.clone())
}
}

impl RaftSnapshotBuilder<TypeConfig> for Arc<StateMachineStore> {
Expand Down Expand Up @@ -204,10 +208,6 @@ impl RaftLogStorage<TypeConfig> for Arc<LogStore> {
Ok(())
}

async fn read_vote(&mut self) -> Result<Option<Vote<NodeId>>, StorageError<NodeId>> {
Ok(self.vote.read().await.clone())
}

#[tracing::instrument(level = "debug", skip(self))]
async fn truncate(&mut self, log_id: LogId<NodeId>) -> Result<(), StorageError<NodeId>> {
let mut log = self.log.write().await;
Expand Down
10 changes: 5 additions & 5 deletions examples/memstore/src/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ mod impl_log_store {
let mut inner = self.inner.lock().await;
inner.try_get_log_entries(range).await
}

async fn read_vote(&mut self) -> Result<Option<Vote<C::NodeId>>, StorageError<C::NodeId>> {
let mut inner = self.inner.lock().await;
inner.read_vote().await
}
}

impl<C: RaftTypeConfig> RaftLogStorage<C> for LogStore<C>
Expand Down Expand Up @@ -183,11 +188,6 @@ mod impl_log_store {
inner.save_vote(vote).await
}

async fn read_vote(&mut self) -> Result<Option<Vote<C::NodeId>>, StorageError<C::NodeId>> {
let mut inner = self.inner.lock().await;
inner.read_vote().await
}

async fn append<I>(&mut self, entries: I, callback: LogFlushed<C>) -> Result<(), StorageError<C::NodeId>>
where I: IntoIterator<Item = C::Entry> {
let mut inner = self.inner.lock().await;
Expand Down
8 changes: 4 additions & 4 deletions examples/raft-kv-memstore-singlethreaded/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ impl RaftLogReader<TypeConfig> for Rc<LogStore> {
let response = log.range(range.clone()).map(|(_, val)| val.clone()).collect::<Vec<_>>();
Ok(response)
}

async fn read_vote(&mut self) -> Result<Option<Vote<NodeId>>, StorageError<NodeId>> {
Ok(*self.vote.borrow())
}
}

impl RaftSnapshotBuilder<TypeConfig> for Rc<StateMachineStore> {
Expand Down Expand Up @@ -313,10 +317,6 @@ impl RaftLogStorage<TypeConfig> for Rc<LogStore> {
Ok(())
}

async fn read_vote(&mut self) -> Result<Option<Vote<NodeId>>, StorageError<NodeId>> {
Ok(*self.vote.borrow())
}

#[tracing::instrument(level = "trace", skip(self, entries, callback))]
async fn append<I>(&mut self, entries: I, callback: LogFlushed<TypeConfig>) -> Result<(), StorageError<NodeId>>
where I: IntoIterator<Item = Entry<TypeConfig>> {
Expand Down
8 changes: 4 additions & 4 deletions examples/raft-kv-rocksdb/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,10 @@ impl RaftLogReader<TypeConfig> for LogStore {
.map(|x| x.1)
.collect()
}

async fn read_vote(&mut self) -> Result<Option<Vote<NodeId>>, StorageError<NodeId>> {
self.get_vote_()
}
}

impl RaftLogStorage<TypeConfig> for LogStore {
Expand Down Expand Up @@ -430,10 +434,6 @@ impl RaftLogStorage<TypeConfig> for LogStore {
self.set_vote_(vote)
}

async fn read_vote(&mut self) -> Result<Option<Vote<NodeId>>, StorageError<NodeId>> {
self.get_vote_()
}

#[tracing::instrument(level = "trace", skip_all)]
async fn append<I>(&mut self, entries: I, callback: LogFlushed<TypeConfig>) -> StorageResult<()>
where
Expand Down
5 changes: 3 additions & 2 deletions openraft/src/docs/getting_started/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ Follow the link to method document to see the details.
| Write log: | [`truncate()`] | () | delete logs `[index, +oo)` |
| Write log: | [`purge()`] | () | purge logs `(-oo, index]` |
| Vote: | [`save_vote()`] | () | save vote |
| Vote: | [`read_vote()`] | [`Vote`] | read vote |

| Kind | [`RaftStateMachine`] method | Return value | Description |
|------------|--------------------------------|------------------------------|---------------------------------------|
Expand All @@ -122,10 +121,12 @@ Most of the APIs are quite straightforward, except two indirect APIs:

[`RaftLogReader`] defines the APIs to read logs, and is an also super trait of [`RaftLogStorage`] :
- [`try_get_log_entries()`] get log entries in a range;
- [`read_vote()`] read vote;

```ignore
trait RaftLogReader<C: RaftTypeConfig> {
async fn try_get_log_entries<RB: RangeBounds<u64>>(&mut self, range: RB) -> Result<Vec<C::Entry>, ...>;
async fn read_vote(&mut self) -> Result<Option<Vote<C::NodeId>>, ...>>;
}
```

Expand Down Expand Up @@ -372,6 +373,7 @@ Additionally, two test scripts for setting up a cluster are available:

[`RaftLogReader`]: `crate::storage::RaftLogReader`
[`try_get_log_entries()`]: `crate::storage::RaftLogReader::try_get_log_entries`
[`read_vote()`]: `crate::storage::RaftLogReader::read_vote`


[`RaftLogStorage::SnapshotBuilder`]: `crate::storage::RaftLogStorage::SnapshotBuilder`
Expand All @@ -382,7 +384,6 @@ Additionally, two test scripts for setting up a cluster are available:
[`truncate()`]: `crate::storage::RaftLogStorage::truncate`
[`purge()`]: `crate::storage::RaftLogStorage::purge`
[`save_vote()`]: `crate::storage::RaftLogStorage::save_vote`
[`read_vote()`]: `crate::storage::RaftLogStorage::read_vote`
[`get_log_state()`]: `crate::storage::RaftLogStorage::get_log_state`
[`get_log_reader()`]: `crate::storage::RaftLogStorage::get_log_reader`

Expand Down
4 changes: 4 additions & 0 deletions openraft/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::OptionalSync;
use crate::RaftTypeConfig;
use crate::StorageError;
use crate::StoredMembership;
use crate::Vote;

#[derive(Debug, Clone, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
Expand Down Expand Up @@ -149,6 +150,9 @@ where C: RaftTypeConfig
&mut self,
range: RB,
) -> Result<Vec<C::Entry>, StorageError<C::NodeId>>;

/// Return the last saved vote by [`RaftLogStorage::save_vote`].
async fn read_vote(&mut self) -> Result<Option<Vote<C::NodeId>>, StorageError<C::NodeId>>;
}

/// A trait defining the interface for a Raft state machine snapshot subsystem.
Expand Down
3 changes: 0 additions & 3 deletions openraft/src/storage/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,6 @@ where C: RaftTypeConfig
/// The vote must be persisted on disk before returning.
async fn save_vote(&mut self, vote: &Vote<C::NodeId>) -> Result<(), StorageError<C::NodeId>>;

/// Return the last saved vote by [`Self::save_vote`].
async fn read_vote(&mut self) -> Result<Option<Vote<C::NodeId>>, StorageError<C::NodeId>>;

/// Saves the last committed log id to storage.
///
/// # Optional feature
Expand Down
8 changes: 4 additions & 4 deletions stores/memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,10 @@ impl RaftLogReader<TypeConfig> for Arc<MemLogStore> {

Ok(entries)
}

async fn read_vote(&mut self) -> Result<Option<Vote<MemNodeId>>, StorageError<MemNodeId>> {
Ok(*self.vote.read().await)
}
}

impl RaftSnapshotBuilder<TypeConfig> for Arc<MemStateMachine> {
Expand Down Expand Up @@ -352,10 +356,6 @@ impl RaftLogStorage<TypeConfig> for Arc<MemLogStore> {
Ok(())
}

async fn read_vote(&mut self) -> Result<Option<Vote<MemNodeId>>, StorageError<MemNodeId>> {
Ok(*self.vote.read().await)
}

async fn save_committed(&mut self, committed: Option<LogId<MemNodeId>>) -> Result<(), StorageError<MemNodeId>> {
tracing::debug!(?committed, "save_committed");
let mut c = self.committed.write().await;
Expand Down
8 changes: 4 additions & 4 deletions stores/rocksstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,10 @@ impl RaftLogReader<TypeConfig> for RocksLogStore {
}
Ok(res)
}

async fn read_vote(&mut self) -> Result<Option<Vote<RocksNodeId>>, StorageError<RocksNodeId>> {
self.get_meta::<meta::Vote>()
}
}

impl RaftSnapshotBuilder<TypeConfig> for RocksStateMachine {
Expand Down Expand Up @@ -346,10 +350,6 @@ impl RaftLogStorage<TypeConfig> for RocksLogStore {
Ok(())
}

async fn read_vote(&mut self) -> Result<Option<Vote<RocksNodeId>>, StorageError<RocksNodeId>> {
self.get_meta::<meta::Vote>()
}

async fn get_log_reader(&mut self) -> Self::LogReader {
self.clone()
}
Expand Down
8 changes: 4 additions & 4 deletions stores/sledstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,10 @@ impl RaftLogReader<TypeConfig> for Arc<SledStore> {
.collect();
logs
}

async fn read_vote(&mut self) -> Result<Option<Vote<ExampleNodeId>>, StorageError<ExampleNodeId>> {
self.get_vote_()
}
}

impl RaftSnapshotBuilder<TypeConfig> for Arc<SledStore> {
Expand Down Expand Up @@ -503,10 +507,6 @@ impl RaftLogStorage<TypeConfig> for Arc<SledStore> {
self.set_vote_(vote).await
}

async fn read_vote(&mut self) -> Result<Option<Vote<ExampleNodeId>>, StorageError<ExampleNodeId>> {
self.get_vote_()
}

async fn get_log_reader(&mut self) -> Self::LogReader {
self.clone()
}
Expand Down
1 change: 1 addition & 0 deletions tests/tests/fixtures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use openraft::MessageSummary;
use openraft::RPCTypes;
use openraft::Raft;
use openraft::RaftLogId;
use openraft::RaftLogReader;
use openraft::RaftMetrics;
use openraft::RaftState;
use openraft::RaftTypeConfig;
Expand Down
1 change: 1 addition & 0 deletions tests/tests/life_cycle/t50_single_follower_restart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::time::Duration;
use maplit::btreeset;
use openraft::storage::RaftLogStorage;
use openraft::Config;
use openraft::RaftLogReader;
use openraft::ServerState;
use openraft::Vote;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use openraft::Entry;
use openraft::EntryPayload;
use openraft::LogId;
use openraft::Membership;
use openraft::RaftLogReader;
use openraft::RaftSnapshotBuilder;
use openraft::ServerState;
use openraft::SnapshotPolicy;
Expand Down
Loading