diff --git a/examples/raft-kv-memstore/Cargo.toml b/examples/raft-kv-memstore/Cargo.toml index 260808749..4bb1a8d55 100644 --- a/examples/raft-kv-memstore/Cargo.toml +++ b/examples/raft-kv-memstore/Cargo.toml @@ -20,7 +20,7 @@ name = "raft-key-value" path = "src/bin/main.rs" [dependencies] -openraft = { path = "../../openraft", features = ["serde"] } +openraft = { path = "../../openraft", features = ["serde", "storage-v2"] } actix-web = "4.0.0-rc.2" async-trait = "0.1.36" diff --git a/examples/raft-kv-memstore/src/app.rs b/examples/raft-kv-memstore/src/app.rs index 87db9887d..0555f6076 100644 --- a/examples/raft-kv-memstore/src/app.rs +++ b/examples/raft-kv-memstore/src/app.rs @@ -1,8 +1,9 @@ use std::sync::Arc; +use crate::LogStore; use crate::NodeId; use crate::Raft; -use crate::Store; +use crate::StateMachineStore; // Representation of an application state. This struct can be shared around to share // instances of raft, store and more. @@ -10,6 +11,7 @@ pub struct App { pub id: NodeId, pub addr: String, pub raft: Raft, - pub store: Arc, + pub log_store: Arc, + pub state_machine_store: Arc, pub config: Arc, } diff --git a/examples/raft-kv-memstore/src/lib.rs b/examples/raft-kv-memstore/src/lib.rs index 8f4381577..0228caa15 100644 --- a/examples/raft-kv-memstore/src/lib.rs +++ b/examples/raft-kv-memstore/src/lib.rs @@ -8,7 +8,6 @@ use actix_web::middleware; use actix_web::middleware::Logger; use actix_web::web::Data; use actix_web::HttpServer; -use openraft::storage::Adaptor; use openraft::BasicNode; use openraft::Config; use openraft::TokioRuntime; @@ -20,7 +19,6 @@ use crate::network::raft; use crate::network::Network; use crate::store::Request; use crate::store::Response; -use crate::store::Store; pub mod app; pub mod client; @@ -35,8 +33,8 @@ openraft::declare_raft_types!( Entry = openraft::Entry, SnapshotData = Cursor>, AsyncRuntime = TokioRuntime ); -pub type LogStore = Adaptor>; -pub type StateMachineStore = Adaptor>; +pub type LogStore = crate::store::LogStore; +pub type StateMachineStore = crate::store::StateMachineStore; pub type Raft = openraft::Raft; pub mod typ { @@ -67,25 +65,34 @@ pub async fn start_example_raft_node(node_id: NodeId, http_addr: String) -> std: let config = Arc::new(config.validate().unwrap()); + // Create a instance of where the Raft logs will be stored. + let log_store = Arc::new(LogStore::default()); // Create a instance of where the Raft data will be stored. - let store = Arc::new(Store::default()); - - let (log_store, state_machine) = Adaptor::new(store.clone()); + let state_machine_store = Arc::new(StateMachineStore::default()); // Create the network layer that will connect and communicate the raft instances and // will be used in conjunction with the store created above. let network = Network {}; // Create a local raft instance. - let raft = openraft::Raft::new(node_id, config.clone(), network, log_store, state_machine).await.unwrap(); + let raft = openraft::Raft::new( + node_id, + config.clone(), + network, + log_store.clone(), + state_machine_store.clone(), + ) + .await + .unwrap(); // Create an application that will store all the instances created above, this will - // be later used on the actix-web services. + // later be used on the actix-web services. let app_data = Data::new(App { id: node_id, addr: http_addr.clone(), raft, - store, + log_store, + state_machine_store, config, }); diff --git a/examples/raft-kv-memstore/src/network/api.rs b/examples/raft-kv-memstore/src/network/api.rs index ebb55deba..0cf9a0204 100644 --- a/examples/raft-kv-memstore/src/network/api.rs +++ b/examples/raft-kv-memstore/src/network/api.rs @@ -29,7 +29,7 @@ pub async fn write(app: Data, req: Json) -> actix_web::Result, req: Json) -> actix_web::Result { - let state_machine = app.store.state_machine.read().await; + let state_machine = app.state_machine_store.state_machine.read().await; let key = req.0; let value = state_machine.data.get(&key).cloned(); @@ -43,7 +43,7 @@ pub async fn consistent_read(app: Data, req: Json) -> actix_web::Re match ret { Ok(_) => { - let state_machine = app.store.state_machine.read().await; + let state_machine = app.state_machine_store.state_machine.read().await; let key = req.0; let value = state_machine.data.get(&key).cloned(); diff --git a/examples/raft-kv-memstore/src/store/mod.rs b/examples/raft-kv-memstore/src/store/mod.rs index c984eef95..289c3a13c 100644 --- a/examples/raft-kv-memstore/src/store/mod.rs +++ b/examples/raft-kv-memstore/src/store/mod.rs @@ -6,7 +6,10 @@ use std::sync::Arc; use std::sync::Mutex; use openraft::async_trait::async_trait; +use openraft::storage::LogFlushed; use openraft::storage::LogState; +use openraft::storage::RaftLogStorage; +use openraft::storage::RaftStateMachine; use openraft::storage::Snapshot; use openraft::BasicNode; use openraft::Entry; @@ -14,7 +17,6 @@ use openraft::EntryPayload; use openraft::LogId; use openraft::RaftLogReader; use openraft::RaftSnapshotBuilder; -use openraft::RaftStorage; use openraft::RaftTypeConfig; use openraft::SnapshotMeta; use openraft::StorageError; @@ -60,14 +62,11 @@ pub struct StoredSnapshot { pub data: Vec, } -/** - * Here defines a state machine of the raft, this state represents a copy of the data - * between each node. Note that we are using `serde` to serialize the `data`, which has - * a implementation to be serialized. Note that for this test we set both the key and - * value as String, but you could set any type of value that has the serialization impl. - */ +/// Data contained in the Raft state machine. Note that we are using `serde` to serialize the +/// `data`, which has a implementation to be serialized. Note that for this test we set both the key +/// and value as String, but you could set any type of value that has the serialization impl. #[derive(Serialize, Deserialize, Debug, Default, Clone)] -pub struct StateMachine { +pub struct StateMachineData { pub last_applied_log: Option>, pub last_membership: StoredMembership, @@ -76,26 +75,32 @@ pub struct StateMachine { pub data: BTreeMap, } +/// Defines a state machine for the Raft cluster. This state machine represents a copy of the +/// data for this node. Additionally, it is responsible for storing the last snapshot of the data. #[derive(Debug, Default)] -pub struct Store { +pub struct StateMachineStore { + /// The Raft state machine. + pub state_machine: RwLock, + + snapshot_idx: Arc>, + + /// The last received snapshot. + current_snapshot: RwLock>, +} + +#[derive(Debug, Default)] +pub struct LogStore { last_purged_log_id: RwLock>>, /// The Raft log. log: RwLock>>, - /// The Raft state machine. - pub state_machine: RwLock, - /// The current granted vote. vote: RwLock>>, - - snapshot_idx: Arc>, - - current_snapshot: RwLock>, } #[async_trait] -impl RaftLogReader for Arc { +impl RaftLogReader for Arc { async fn try_get_log_entries + Clone + Debug + Send + Sync>( &mut self, range: RB, @@ -107,7 +112,7 @@ impl RaftLogReader for Arc { } #[async_trait] -impl RaftSnapshotBuilder for Arc { +impl RaftSnapshotBuilder for Arc { #[tracing::instrument(level = "trace", skip(self))] async fn build_snapshot(&mut self) -> Result, StorageError> { let data; @@ -159,84 +164,10 @@ impl RaftSnapshotBuilder for Arc { } #[async_trait] -impl RaftStorage for Arc { - type LogReader = Self; +impl RaftStateMachine for Arc { type SnapshotBuilder = Self; - async fn get_log_state(&mut self) -> Result, StorageError> { - let log = self.log.read().await; - let last = log.iter().next_back().map(|(_, ent)| ent.log_id); - - let last_purged = *self.last_purged_log_id.read().await; - - let last = match last { - None => last_purged, - Some(x) => Some(x), - }; - - Ok(LogState { - last_purged_log_id: last_purged, - last_log_id: last, - }) - } - - #[tracing::instrument(level = "trace", skip(self))] - async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { - let mut v = self.vote.write().await; - *v = Some(*vote); - Ok(()) - } - - async fn read_vote(&mut self) -> Result>, StorageError> { - Ok(*self.vote.read().await) - } - - #[tracing::instrument(level = "trace", skip(self, entries))] - async fn append_to_log(&mut self, entries: I) -> Result<(), StorageError> - where I: IntoIterator> + Send { - let mut log = self.log.write().await; - for entry in entries { - log.insert(entry.log_id.index, entry); - } - Ok(()) - } - - #[tracing::instrument(level = "debug", skip(self))] - async fn delete_conflict_logs_since(&mut self, log_id: LogId) -> Result<(), StorageError> { - tracing::debug!("delete_log: [{:?}, +oo)", log_id); - - let mut log = self.log.write().await; - let keys = log.range(log_id.index..).map(|(k, _v)| *k).collect::>(); - for key in keys { - log.remove(&key); - } - - Ok(()) - } - - #[tracing::instrument(level = "debug", skip(self))] - async fn purge_logs_upto(&mut self, log_id: LogId) -> Result<(), StorageError> { - tracing::debug!("delete_log: (-oo, {:?}]", log_id); - - { - let mut ld = self.last_purged_log_id.write().await; - assert!(*ld <= Some(log_id)); - *ld = Some(log_id); - } - - { - let mut log = self.log.write().await; - - let keys = log.range(..=log_id.index).map(|(k, _v)| *k).collect::>(); - for key in keys { - log.remove(&key); - } - } - - Ok(()) - } - - async fn last_applied_state( + async fn applied_state( &mut self, ) -> Result<(Option>, StoredMembership), StorageError> { let state_machine = self.state_machine.read().await; @@ -244,11 +175,9 @@ impl RaftStorage for Arc { } #[tracing::instrument(level = "trace", skip(self, entries))] - async fn apply_to_state_machine( - &mut self, - entries: &[Entry], - ) -> Result, StorageError> { - let mut res = Vec::with_capacity(entries.len()); + async fn apply(&mut self, entries: I) -> Result, StorageError> + where I: IntoIterator> + Send { + let mut res = Vec::new(); //No `with_capacity`; do not know `len` of iterator let mut sm = self.state_machine.write().await; @@ -301,7 +230,7 @@ impl RaftStorage for Arc { // Update the state machine. { - let updated_state_machine: StateMachine = serde_json::from_slice(&new_snapshot.data) + let updated_state_machine: StateMachineData = serde_json::from_slice(&new_snapshot.data) .map_err(|e| StorageIOError::read_snapshot(Some(new_snapshot.meta.signature()), &e))?; let mut state_machine = self.state_machine.write().await; *state_machine = updated_state_machine; @@ -327,11 +256,92 @@ impl RaftStorage for Arc { } } - async fn get_log_reader(&mut self) -> Self::LogReader { + async fn get_snapshot_builder(&mut self) -> Self::SnapshotBuilder { self.clone() } +} - async fn get_snapshot_builder(&mut self) -> Self::SnapshotBuilder { +#[async_trait] +impl RaftLogStorage for Arc { + type LogReader = Self; + + async fn get_log_state(&mut self) -> Result, StorageError> { + let log = self.log.read().await; + let last = log.iter().next_back().map(|(_, ent)| ent.log_id); + + let last_purged = *self.last_purged_log_id.read().await; + + let last = match last { + None => last_purged, + Some(x) => Some(x), + }; + + Ok(LogState { + last_purged_log_id: last_purged, + last_log_id: last, + }) + } + + #[tracing::instrument(level = "trace", skip(self))] + async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { + let mut v = self.vote.write().await; + *v = Some(*vote); + Ok(()) + } + + async fn read_vote(&mut self) -> Result>, StorageError> { + Ok(*self.vote.read().await) + } + + #[tracing::instrument(level = "trace", skip(self, entries, callback))] + async fn append(&mut self, entries: I, callback: LogFlushed) -> Result<(), StorageError> + where I: IntoIterator> + Send { + // Simple implementation that calls the flush-before-return `append_to_log`. + let mut log = self.log.write().await; + for entry in entries { + log.insert(entry.log_id.index, entry); + } + callback.log_io_completed(Ok(())); + + Ok(()) + } + + #[tracing::instrument(level = "debug", skip(self))] + async fn truncate(&mut self, log_id: LogId) -> Result<(), StorageError> { + tracing::debug!("delete_log: [{:?}, +oo)", log_id); + + let mut log = self.log.write().await; + let keys = log.range(log_id.index..).map(|(k, _v)| *k).collect::>(); + for key in keys { + log.remove(&key); + } + + Ok(()) + } + + #[tracing::instrument(level = "debug", skip(self))] + async fn purge(&mut self, log_id: LogId) -> Result<(), StorageError> { + tracing::debug!("delete_log: (-oo, {:?}]", log_id); + + { + let mut ld = self.last_purged_log_id.write().await; + assert!(*ld <= Some(log_id)); + *ld = Some(log_id); + } + + { + let mut log = self.log.write().await; + + let keys = log.range(..=log_id.index).map(|(k, _v)| *k).collect::>(); + for key in keys { + log.remove(&key); + } + } + + Ok(()) + } + + async fn get_log_reader(&mut self) -> Self::LogReader { self.clone() } } diff --git a/openraft/src/storage/v2.rs b/openraft/src/storage/v2.rs index 254411ef3..dff25fc37 100644 --- a/openraft/src/storage/v2.rs +++ b/openraft/src/storage/v2.rs @@ -107,7 +107,7 @@ where C: RaftTypeConfig /// It should returns immediately after saving the input log entries in memory, and calls the /// `callback` when the entries are persisted on disk, i.e., avoid blocking. /// - /// This method is still async because preparing preparing the IO is usually async. + /// This method is still async because preparing the IO is usually async. /// /// ### To ensure correctness: ///