diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 87dc5a8ae..21362beba 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -407,6 +407,7 @@ jobs: - "memstore" - "raft-kv-memstore" - "raft-kv-memstore-generic-snapshot-data" + - "raft-kv-memstore-opendal-snapshot-data" - "raft-kv-memstore-singlethreaded" - "raft-kv-rocksdb" diff --git a/Cargo.toml b/Cargo.toml index 45155ade1..9dbd6fb86 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,5 +61,6 @@ exclude = [ "examples/raft-kv-memstore", "examples/raft-kv-memstore-singlethreaded", "examples/raft-kv-memstore-generic-snapshot-data", + "examples/raft-kv-memstore-opendal-snapshot-data", "examples/raft-kv-rocksdb", ] diff --git a/examples/raft-kv-memstore-opendal-snapshot-data/.gitignore b/examples/raft-kv-memstore-opendal-snapshot-data/.gitignore new file mode 100644 index 000000000..cb4025390 --- /dev/null +++ b/examples/raft-kv-memstore-opendal-snapshot-data/.gitignore @@ -0,0 +1,5 @@ +target +vendor +.idea + +/*.log diff --git a/examples/raft-kv-memstore-opendal-snapshot-data/Cargo.toml b/examples/raft-kv-memstore-opendal-snapshot-data/Cargo.toml new file mode 100644 index 000000000..8683d818e --- /dev/null +++ b/examples/raft-kv-memstore-opendal-snapshot-data/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "raft-kv-memstore-opendal-snapshot-data" +version = "0.1.0" +readme = "README.md" + +edition = "2021" +authors = [ + "drdr xp ", + "Pedro Paulo de Amorim ", + "Xuanwo " +] +categories = ["algorithms", "asynchronous", "data-structures"] +description = "An example distributed key-value store built upon `openraft`." +homepage = "https://github.com/datafuselabs/openraft" +keywords = ["raft", "consensus"] +license = "MIT OR Apache-2.0" +repository = "https://github.com/datafuselabs/openraft" + +[dependencies] +openraft = { path = "../../openraft", features = ["serde", "storage-v2", "generic-snapshot-data"] } + +serde = { version = "1.0.114", features = ["derive"] } +serde_json = "1.0.57" +tokio = { version = "1.0", default-features = false, features = ["sync"] } +tracing = "0.1.29" +tracing-subscriber = { version = "0.3.0", features = ["env-filter"] } +opendal = "0.45.0" + +[dev-dependencies] +maplit = "1.0.2" + +[features] + +[package.metadata.docs.rs] +all-features = true diff --git a/examples/raft-kv-memstore-opendal-snapshot-data/README.md b/examples/raft-kv-memstore-opendal-snapshot-data/README.md new file mode 100644 index 000000000..41fb2ce5a --- /dev/null +++ b/examples/raft-kv-memstore-opendal-snapshot-data/README.md @@ -0,0 +1,19 @@ +# Example Openraft kv-store with snapshot stored in remote storage + +With `generic-snapshot-data` feature flag enabled, Openraft allows application to use any data type for snapshot data, +instead of a single-file like data format with `AsyncSeek + AsyncRead + AsyncWrite + Unpin` bounds. + +This example shows how to save and retrieve snapshot data from remote storage, allowing users to follow a similar pattern for implementing business logic such as snapshot backups. + +This example is similar to the basic raft-kv-memstore example +but focuses on how to store and fetch snapshot data from remote storage. +Other aspects are minimized. + +To send a complete snapshot, Refer to implementation of `RaftNetwork::snapshot()` in this example. + +To receive a complete snapshot, Refer to implementation of `api::snapshot()` in this example. + + +## Run it + +Run it with `cargo test -- --nocaputre`. \ No newline at end of file diff --git a/examples/raft-kv-memstore-opendal-snapshot-data/src/api.rs b/examples/raft-kv-memstore-opendal-snapshot-data/src/api.rs new file mode 100644 index 000000000..f3e610f6a --- /dev/null +++ b/examples/raft-kv-memstore-opendal-snapshot-data/src/api.rs @@ -0,0 +1,104 @@ +//! This mod implements a network API for raft node. + +use std::collections::BTreeMap; +use std::collections::BTreeSet; + +use openraft::error::CheckIsLeaderError; +use openraft::error::Infallible; +use openraft::error::RaftError; +use openraft::BasicNode; +use openraft::RaftMetrics; + +use crate::app::App; +use crate::decode; +use crate::encode; +use crate::typ; +use crate::NodeId; + +pub async fn write(app: &mut App, req: String) -> String { + let res = app.raft.client_write(decode(&req)).await; + encode(res) +} + +pub async fn read(app: &mut App, req: String) -> String { + let key: String = decode(&req); + + let ret = app.raft.ensure_linearizable().await; + + let res = match ret { + Ok(_) => { + let state_machine = app.state_machine.state_machine.lock().unwrap(); + let value = state_machine.data.get(&key).cloned(); + + let res: Result>> = + Ok(value.unwrap_or_default()); + res + } + Err(e) => Err(e), + }; + encode(res) +} + +// Raft API + +pub async fn vote(app: &mut App, req: String) -> String { + let res = app.raft.vote(decode(&req)).await; + encode(res) +} + +pub async fn append(app: &mut App, req: String) -> String { + let res = app.raft.append_entries(decode(&req)).await; + encode(res) +} + +/// Receive a snapshot and install it. +pub async fn snapshot(app: &mut App, req: String) -> String { + let (vote, snapshot_meta, snapshot_data): (typ::Vote, typ::SnapshotMeta, typ::SnapshotData) = decode(&req); + let snapshot = typ::Snapshot { + meta: snapshot_meta, + snapshot: Box::new(snapshot_data), + }; + let res = app + .raft + .install_complete_snapshot(vote, snapshot) + .await + .map_err(typ::RaftError::::Fatal); + encode(res) +} + +// Management API + +/// Add a node as **Learner**. +/// +/// A Learner receives log replication from the leader but does not vote. +/// This should be done before adding a node as a member into the cluster +/// (by calling `change-membership`) +pub async fn add_learner(app: &mut App, req: String) -> String { + let node_id: NodeId = decode(&req); + let node = BasicNode { addr: "".to_string() }; + let res = app.raft.add_learner(node_id, node, true).await; + encode(res) +} + +/// Changes specified learners to members, or remove members. +pub async fn change_membership(app: &mut App, req: String) -> String { + let node_ids: BTreeSet = decode(&req); + let res = app.raft.change_membership(node_ids, false).await; + encode(res) +} + +/// Initialize a single-node cluster. +pub async fn init(app: &mut App) -> String { + let mut nodes = BTreeMap::new(); + nodes.insert(app.id, BasicNode { addr: "".to_string() }); + let res = app.raft.initialize(nodes).await; + encode(res) +} + +/// Get the latest metrics of the cluster +pub async fn metrics(app: &mut App) -> String { + let metrics = app.raft.metrics().borrow().clone(); + + let res: Result, Infallible> = Ok(metrics); + encode(res) +} diff --git a/examples/raft-kv-memstore-opendal-snapshot-data/src/app.rs b/examples/raft-kv-memstore-opendal-snapshot-data/src/app.rs new file mode 100644 index 000000000..5d5a05fe6 --- /dev/null +++ b/examples/raft-kv-memstore-opendal-snapshot-data/src/app.rs @@ -0,0 +1,73 @@ +use std::sync::Arc; + +use tokio::sync::mpsc; +use tokio::sync::oneshot; + +use crate::api; +use crate::router::Router; +use crate::typ; +use crate::NodeId; +use crate::StateMachineStore; + +pub type Path = String; +pub type Payload = String; +pub type ResponseTx = oneshot::Sender; +pub type RequestTx = mpsc::UnboundedSender<(Path, Payload, ResponseTx)>; + +/// Representation of an application state. +pub struct App { + pub id: NodeId, + pub raft: typ::Raft, + + /// Receive application requests, Raft protocol request or management requests. + pub rx: mpsc::UnboundedReceiver<(Path, Payload, ResponseTx)>, + pub router: Router, + + pub state_machine: Arc, +} + +impl App { + pub fn new(id: NodeId, raft: typ::Raft, router: Router, state_machine: Arc) -> Self { + let (tx, rx) = mpsc::unbounded_channel(); + + { + let mut targets = router.targets.lock().unwrap(); + targets.insert(id, tx); + } + + Self { + id, + raft, + rx, + router, + state_machine, + } + } + + pub async fn run(mut self) -> Option<()> { + loop { + let (path, payload, response_tx) = self.rx.recv().await?; + + let res = match path.as_str() { + // Application API + "/app/write" => api::write(&mut self, payload).await, + "/app/read" => api::read(&mut self, payload).await, + + // Raft API + "/raft/append" => api::append(&mut self, payload).await, + "/raft/snapshot" => api::snapshot(&mut self, payload).await, + "/raft/vote" => api::vote(&mut self, payload).await, + + // Management API + "/mng/add-learner" => api::add_learner(&mut self, payload).await, + "/mng/change-membership" => api::change_membership(&mut self, payload).await, + "/mng/init" => api::init(&mut self).await, + "/mng/metrics" => api::metrics(&mut self).await, + + _ => panic!("unknown path: {}", path), + }; + + response_tx.send(res).unwrap(); + } + } +} diff --git a/examples/raft-kv-memstore-opendal-snapshot-data/src/lib.rs b/examples/raft-kv-memstore-opendal-snapshot-data/src/lib.rs new file mode 100644 index 000000000..069273af2 --- /dev/null +++ b/examples/raft-kv-memstore-opendal-snapshot-data/src/lib.rs @@ -0,0 +1,106 @@ +#![allow(clippy::uninlined_format_args)] +#![deny(unused_qualifications)] + +use std::sync::Arc; + +use opendal::Operator; +use openraft::BasicNode; +use openraft::Config; +use openraft::TokioRuntime; + +use crate::app::App; +use crate::router::Router; +use crate::store::Request; +use crate::store::Response; + +pub mod router; + +pub mod api; +pub mod app; +pub mod network; +pub mod store; + +pub type NodeId = u64; + +openraft::declare_raft_types!( + /// Declare the type configuration for example K/V store. + pub TypeConfig: + D = Request, + R = Response, + NodeId = NodeId, + Node = BasicNode, + Entry = openraft::Entry, + // In this example, snapshot is a path pointing to a file stored in shared storage. + SnapshotData = String, + AsyncRuntime = TokioRuntime +); + +pub type LogStore = crate::store::LogStore; +pub type StateMachineStore = crate::store::StateMachineStore; + +pub mod typ { + use openraft::BasicNode; + + use crate::NodeId; + use crate::TypeConfig; + + pub type Raft = openraft::Raft; + + pub type Vote = openraft::Vote; + pub type SnapshotMeta = openraft::SnapshotMeta; + pub type SnapshotData = ::SnapshotData; + pub type Snapshot = openraft::Snapshot; + + pub type Infallible = openraft::error::Infallible; + pub type Fatal = openraft::error::Fatal; + pub type RaftError = openraft::error::RaftError; + pub type RPCError = openraft::error::RPCError>; + pub type StreamingError = openraft::error::StreamingError; + + pub type RaftMetrics = openraft::RaftMetrics; + + pub type ClientWriteError = openraft::error::ClientWriteError; + pub type CheckIsLeaderError = openraft::error::CheckIsLeaderError; + pub type ForwardToLeader = openraft::error::ForwardToLeader; + pub type InitializeError = openraft::error::InitializeError; + + pub type ClientWriteResponse = openraft::raft::ClientWriteResponse; +} + +pub fn encode(t: T) -> String { + serde_json::to_string(&t).unwrap() +} + +pub fn decode(s: &str) -> T { + serde_json::from_str(s).unwrap() +} + +pub async fn new_raft(node_id: NodeId, router: Router, op: Operator) -> (typ::Raft, App) { + // Create a configuration for the raft instance. + let config = Config { + heartbeat_interval: 500, + election_timeout_min: 1500, + election_timeout_max: 3000, + // Once snapshot is built, delete the logs at once. + // So that all further replication will be based on the snapshot. + max_in_snapshot_log_to_keep: 0, + ..Default::default() + }; + + let config = Arc::new(config.validate().unwrap()); + + // Create a instance of where the Raft logs will be stored. + let log_store = Arc::new(LogStore::default()); + + // Create a instance of where the state machine data will be stored. + let state_machine_store = Arc::new(StateMachineStore::new(op.clone())); + + // Create a local raft instance. + let raft = openraft::Raft::new(node_id, config, router.clone(), log_store, state_machine_store.clone()) + .await + .unwrap(); + + let app = App::new(node_id, raft.clone(), router, state_machine_store); + + (raft, app) +} diff --git a/examples/raft-kv-memstore-opendal-snapshot-data/src/network.rs b/examples/raft-kv-memstore-opendal-snapshot-data/src/network.rs new file mode 100644 index 000000000..7d1d59a02 --- /dev/null +++ b/examples/raft-kv-memstore-opendal-snapshot-data/src/network.rs @@ -0,0 +1,76 @@ +use std::future::Future; + +use openraft::error::RemoteError; +use openraft::error::ReplicationClosed; +use openraft::network::RPCOption; +use openraft::raft::AppendEntriesRequest; +use openraft::raft::AppendEntriesResponse; +use openraft::raft::SnapshotResponse; +use openraft::raft::VoteRequest; +use openraft::raft::VoteResponse; +use openraft::OptionalSend; +use openraft::RaftNetwork; +use openraft::RaftNetworkFactory; +use openraft::Snapshot; +use openraft::Vote; + +use crate::router::Router; +use crate::typ; +use crate::BasicNode; +use crate::NodeId; +use crate::TypeConfig; + +pub struct Connection { + router: Router, + target: NodeId, +} + +impl RaftNetworkFactory for Router { + type Network = Connection; + + async fn new_client(&mut self, target: NodeId, _node: &BasicNode) -> Self::Network { + Connection { + router: self.clone(), + target, + } + } +} + +impl RaftNetwork for Connection { + async fn send_append_entries( + &mut self, + req: AppendEntriesRequest, + ) -> Result, typ::RPCError> { + let resp = self + .router + .send(self.target, "/raft/append", req) + .await + .map_err(|e| RemoteError::new(self.target, e))?; + Ok(resp) + } + + /// A real application should replace this method with customized implementation. + async fn snapshot( + &mut self, + vote: Vote, + snapshot: Snapshot, + _cancel: impl Future + OptionalSend, + _option: RPCOption, + ) -> Result, typ::StreamingError> { + let resp = self + .router + .send::<_, _, typ::Infallible>(self.target, "/raft/snapshot", (vote, snapshot.meta, snapshot.snapshot)) + .await + .map_err(|e| RemoteError::new(self.target, e.into_fatal().unwrap()))?; + Ok(resp) + } + + async fn send_vote(&mut self, req: VoteRequest) -> Result, typ::RPCError> { + let resp = self + .router + .send(self.target, "/raft/vote", req) + .await + .map_err(|e| RemoteError::new(self.target, e))?; + Ok(resp) + } +} diff --git a/examples/raft-kv-memstore-opendal-snapshot-data/src/router.rs b/examples/raft-kv-memstore-opendal-snapshot-data/src/router.rs new file mode 100644 index 000000000..c9bf0c54a --- /dev/null +++ b/examples/raft-kv-memstore-opendal-snapshot-data/src/router.rs @@ -0,0 +1,44 @@ +use std::collections::BTreeMap; +use std::sync::Arc; +use std::sync::Mutex; + +use tokio::sync::oneshot; + +use crate::app::RequestTx; +use crate::decode; +use crate::encode; +use crate::typ::RaftError; +use crate::NodeId; + +/// Simulate a network router. +#[derive(Debug, Clone)] +#[derive(Default)] +pub struct Router { + pub targets: Arc>>, +} + +impl Router { + /// Send request `Req` to target node `to`, and wait for response `Result>`. + pub async fn send(&self, to: NodeId, path: &str, req: Req) -> Result> + where + Req: serde::Serialize, + Result>: serde::de::DeserializeOwned, + { + let (resp_tx, resp_rx) = oneshot::channel(); + + let encoded_req = encode(req); + tracing::debug!("send to: {}, {}, {}", to, path, encoded_req); + + { + let mut targets = self.targets.lock().unwrap(); + let tx = targets.get_mut(&to).unwrap(); + + tx.send((path.to_string(), encoded_req, resp_tx)).unwrap(); + } + + let resp_str = resp_rx.await.unwrap(); + tracing::debug!("resp from: {}, {}, {}", to, path, resp_str); + + decode::>>(&resp_str) + } +} diff --git a/examples/raft-kv-memstore-opendal-snapshot-data/src/store.rs b/examples/raft-kv-memstore-opendal-snapshot-data/src/store.rs new file mode 100644 index 000000000..e52b45fb2 --- /dev/null +++ b/examples/raft-kv-memstore-opendal-snapshot-data/src/store.rs @@ -0,0 +1,366 @@ +use std::collections::BTreeMap; +use std::fmt::Debug; +use std::ops::RangeBounds; +use std::sync::Arc; +use std::sync::Mutex; + +use opendal::Operator; +use openraft::storage::LogFlushed; +use openraft::storage::LogState; +use openraft::storage::RaftLogStorage; +use openraft::storage::RaftStateMachine; +use openraft::storage::Snapshot; +use openraft::BasicNode; +use openraft::Entry; +use openraft::EntryPayload; +use openraft::LogId; +use openraft::RaftLogReader; +use openraft::RaftSnapshotBuilder; +use openraft::RaftTypeConfig; +use openraft::SnapshotMeta; +use openraft::StorageError; +use openraft::StoredMembership; +use openraft::Vote; +use serde::Deserialize; +use serde::Serialize; + +use crate::decode; +use crate::encode; +use crate::typ; +use crate::NodeId; +use crate::TypeConfig; + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum Request { + Set { key: String, value: String }, +} + +impl Request { + pub fn set(key: impl ToString, value: impl ToString) -> Self { + Self::Set { + key: key.to_string(), + value: value.to_string(), + } + } +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct Response { + pub value: Option, +} + +#[derive(Debug)] +pub struct StoredSnapshot { + pub meta: SnapshotMeta, + + /// The data of the state machine at the time of this snapshot. + pub data: Box, +} + +/// Data contained in the Raft state machine. Note that we are using `serde` to serialize the +/// `data`, which has a implementation to be serialized. Note that for this test we set both the key +/// and value as String, but you could set any type of value that has the serialization impl. +#[derive(Serialize, Deserialize, Debug, Default, Clone)] +pub struct StateMachineData { + pub last_applied: Option>, + + pub last_membership: StoredMembership, + + /// Application data. + pub data: BTreeMap, +} + +/// Defines a state machine for the Raft cluster. This state machine represents a copy of the +/// data for this node. Additionally, it is responsible for storing the last snapshot of the data. +#[derive(Debug)] +pub struct StateMachineStore { + /// The Raft state machine. + pub state_machine: Mutex, + + snapshot_idx: Mutex, + storage: Operator, + + /// The last received snapshot. + current_snapshot: Mutex>, +} + +impl StateMachineStore { + pub fn new(storage: Operator) -> Self { + Self { + state_machine: Mutex::new(StateMachineData::default()), + snapshot_idx: Mutex::new(0), + storage, + current_snapshot: Mutex::new(None), + } + } +} + +#[derive(Debug, Default)] +pub struct LogStore { + last_purged_log_id: Mutex>>, + + /// The Raft log. + log: Mutex>>, + + committed: Mutex>>, + + /// The current granted vote. + vote: Mutex>>, +} + +impl RaftLogReader for Arc { + async fn try_get_log_entries + Clone + Debug>( + &mut self, + range: RB, + ) -> Result>, StorageError> { + let log = self.log.lock().unwrap(); + let response = log.range(range.clone()).map(|(_, val)| val.clone()).collect::>(); + Ok(response) + } +} + +impl RaftSnapshotBuilder for Arc { + #[tracing::instrument(level = "trace", skip(self))] + async fn build_snapshot(&mut self) -> Result, StorageError> { + let data; + let last_applied_log; + let last_membership; + + { + // Serialize the data of the state machine. + let state_machine = self.state_machine.lock().unwrap().clone(); + + last_applied_log = state_machine.last_applied; + last_membership = state_machine.last_membership.clone(); + data = state_machine; + } + + let snapshot_idx = { + let mut l = self.snapshot_idx.lock().unwrap(); + *l += 1; + *l + }; + + let snapshot_id = if let Some(last) = last_applied_log { + format!("{}-{}-{}", last.leader_id, last.index, snapshot_idx) + } else { + format!("--{}", snapshot_idx) + }; + + // Save the snapshot to the storage. + // + // In this example, we use `snapshot_id` as the snapshot store path. + // Users can design their own logic for this like using uuid. + self.storage.write(&snapshot_id, encode(&data)).await.unwrap(); + + let meta = SnapshotMeta { + last_log_id: last_applied_log, + last_membership, + snapshot_id: snapshot_id.clone(), + }; + + let snapshot = StoredSnapshot { + meta: meta.clone(), + data: Box::new(snapshot_id.clone()), + }; + + { + let mut current_snapshot = self.current_snapshot.lock().unwrap(); + *current_snapshot = Some(snapshot); + } + + Ok(Snapshot { + meta, + snapshot: Box::new(snapshot_id), + }) + } +} + +impl RaftStateMachine for Arc { + type SnapshotBuilder = Self; + + async fn applied_state( + &mut self, + ) -> Result<(Option>, StoredMembership), StorageError> { + let state_machine = self.state_machine.lock().unwrap(); + Ok((state_machine.last_applied, state_machine.last_membership.clone())) + } + + #[tracing::instrument(level = "trace", skip(self, entries))] + async fn apply(&mut self, entries: I) -> Result, StorageError> + where I: IntoIterator> { + let mut res = Vec::new(); //No `with_capacity`; do not know `len` of iterator + + let mut sm = self.state_machine.lock().unwrap(); + + for entry in entries { + tracing::debug!(%entry.log_id, "replicate to sm"); + + sm.last_applied = Some(entry.log_id); + + match entry.payload { + EntryPayload::Blank => res.push(Response { value: None }), + EntryPayload::Normal(ref req) => match req { + Request::Set { key, value, .. } => { + sm.data.insert(key.clone(), value.clone()); + res.push(Response { + value: Some(value.clone()), + }) + } + }, + EntryPayload::Membership(ref mem) => { + sm.last_membership = StoredMembership::new(Some(entry.log_id), mem.clone()); + res.push(Response { value: None }) + } + }; + } + Ok(res) + } + + #[tracing::instrument(level = "trace", skip(self))] + async fn begin_receiving_snapshot( + &mut self, + ) -> Result::SnapshotData>, StorageError> { + Ok(Box::default()) + } + + #[tracing::instrument(level = "trace", skip(self, snapshot))] + async fn install_snapshot( + &mut self, + meta: &SnapshotMeta, + snapshot: Box<::SnapshotData>, + ) -> Result<(), StorageError> { + tracing::info!("install snapshot"); + + let new_snapshot = StoredSnapshot { + meta: meta.clone(), + data: snapshot, + }; + + // Update the state machine. + { + let bs = self.storage.read(&new_snapshot.data).await.unwrap(); + let updated_state_machine: StateMachineData = decode(&String::from_utf8_lossy(&bs)); + let mut state_machine = self.state_machine.lock().unwrap(); + *state_machine = updated_state_machine; + } + + // Update current snapshot. + let mut current_snapshot = self.current_snapshot.lock().unwrap(); + *current_snapshot = Some(new_snapshot); + Ok(()) + } + + #[tracing::instrument(level = "trace", skip(self))] + async fn get_current_snapshot(&mut self) -> Result>, StorageError> { + match &*self.current_snapshot.lock().unwrap() { + Some(snapshot) => { + let data = snapshot.data.clone(); + Ok(Some(Snapshot { + meta: snapshot.meta.clone(), + snapshot: data, + })) + } + None => Ok(None), + } + } + + async fn get_snapshot_builder(&mut self) -> Self::SnapshotBuilder { + self.clone() + } +} + +impl RaftLogStorage for Arc { + type LogReader = Self; + + async fn get_log_state(&mut self) -> Result, StorageError> { + let log = self.log.lock().unwrap(); + let last = log.iter().next_back().map(|(_, ent)| ent.log_id); + + let last_purged = *self.last_purged_log_id.lock().unwrap(); + + let last = match last { + None => last_purged, + Some(x) => Some(x), + }; + + Ok(LogState { + last_purged_log_id: last_purged, + last_log_id: last, + }) + } + + async fn save_committed(&mut self, committed: Option>) -> Result<(), StorageError> { + let mut c = self.committed.lock().unwrap(); + *c = committed; + Ok(()) + } + + async fn read_committed(&mut self) -> Result>, StorageError> { + let committed = self.committed.lock().unwrap(); + Ok(*committed) + } + + #[tracing::instrument(level = "trace", skip(self))] + async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { + let mut v = self.vote.lock().unwrap(); + *v = Some(*vote); + Ok(()) + } + + async fn read_vote(&mut self) -> Result>, StorageError> { + Ok(*self.vote.lock().unwrap()) + } + + #[tracing::instrument(level = "trace", skip(self, entries, callback))] + async fn append(&mut self, entries: I, callback: LogFlushed) -> Result<(), StorageError> + where I: IntoIterator> { + // Simple implementation that calls the flush-before-return `append_to_log`. + let mut log = self.log.lock().unwrap(); + for entry in entries { + log.insert(entry.log_id.index, entry); + } + callback.log_io_completed(Ok(())); + + Ok(()) + } + + #[tracing::instrument(level = "debug", skip(self))] + async fn truncate(&mut self, log_id: LogId) -> Result<(), StorageError> { + tracing::debug!("delete_log: [{:?}, +oo)", log_id); + + let mut log = self.log.lock().unwrap(); + let keys = log.range(log_id.index..).map(|(k, _v)| *k).collect::>(); + for key in keys { + log.remove(&key); + } + + Ok(()) + } + + #[tracing::instrument(level = "debug", skip(self))] + async fn purge(&mut self, log_id: LogId) -> Result<(), StorageError> { + tracing::debug!("delete_log: (-oo, {:?}]", log_id); + + { + let mut ld = self.last_purged_log_id.lock().unwrap(); + assert!(*ld <= Some(log_id)); + *ld = Some(log_id); + } + + { + let mut log = self.log.lock().unwrap(); + + let keys = log.range(..=log_id.index).map(|(k, _v)| *k).collect::>(); + for key in keys { + log.remove(&key); + } + } + + Ok(()) + } + + async fn get_log_reader(&mut self) -> Self::LogReader { + self.clone() + } +} diff --git a/examples/raft-kv-memstore-opendal-snapshot-data/test-cluster.sh b/examples/raft-kv-memstore-opendal-snapshot-data/test-cluster.sh new file mode 100755 index 000000000..9b582da4a --- /dev/null +++ b/examples/raft-kv-memstore-opendal-snapshot-data/test-cluster.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +echo "No shell test script for this example" \ No newline at end of file diff --git a/examples/raft-kv-memstore-opendal-snapshot-data/tests/cluster/main.rs b/examples/raft-kv-memstore-opendal-snapshot-data/tests/cluster/main.rs new file mode 100644 index 000000000..5148911f9 --- /dev/null +++ b/examples/raft-kv-memstore-opendal-snapshot-data/tests/cluster/main.rs @@ -0,0 +1,3 @@ +#![allow(clippy::uninlined_format_args)] + +mod test_cluster; diff --git a/examples/raft-kv-memstore-opendal-snapshot-data/tests/cluster/test_cluster.rs b/examples/raft-kv-memstore-opendal-snapshot-data/tests/cluster/test_cluster.rs new file mode 100644 index 000000000..c9d576ab6 --- /dev/null +++ b/examples/raft-kv-memstore-opendal-snapshot-data/tests/cluster/test_cluster.rs @@ -0,0 +1,142 @@ +use std::backtrace::Backtrace; +use std::collections::BTreeMap; +use std::collections::HashMap; +use std::panic::PanicInfo; +use std::time::Duration; + +use openraft::BasicNode; +use raft_kv_memstore_opendal_snapshot_data::new_raft; +use raft_kv_memstore_opendal_snapshot_data::router::Router; +use raft_kv_memstore_opendal_snapshot_data::store::Request; +use raft_kv_memstore_opendal_snapshot_data::typ; +use tokio::task; +use tokio::task::LocalSet; +use tracing_subscriber::EnvFilter; + +pub fn log_panic(panic: &PanicInfo) { + let backtrace = format!("{:?}", Backtrace::force_capture()); + + eprintln!("{}", panic); + + if let Some(location) = panic.location() { + tracing::error!( + message = %panic, + backtrace = %backtrace, + panic.file = location.file(), + panic.line = location.line(), + panic.column = location.column(), + ); + eprintln!("{}:{}:{}", location.file(), location.line(), location.column()); + } else { + tracing::error!(message = %panic, backtrace = %backtrace); + } + + eprintln!("{}", backtrace); +} + +/// This test shows how to transfer a snapshot from one node to another: +/// +/// - Setup a single node cluster, write some logs, take a snapshot; +/// - Add a learner node-2 to receive snapshot replication, via the complete-snapshot API: +/// - The sending end sends snapshot with `RaftNetwork::snapshot()`; +/// - The receiving end deliver the received snapshot to `Raft` with +/// `Raft::install_complete_snapshot()`. +#[tokio::test] +async fn test_cluster() { + std::panic::set_hook(Box::new(|panic| { + log_panic(panic); + })); + + tracing_subscriber::fmt() + .with_target(true) + .with_thread_ids(true) + .with_level(true) + .with_ansi(false) + .with_env_filter(EnvFilter::from_default_env()) + .init(); + + // This test only use memory service for simplicity. + // Feel free to test against fs or s3. + let op = opendal::Operator::via_map(opendal::Scheme::Memory, HashMap::default()).unwrap(); + + let router = Router::default(); + + let local = LocalSet::new(); + + let (raft1, app1) = new_raft(1, router.clone(), op.clone()).await; + let (raft2, app2) = new_raft(2, router.clone(), op.clone()).await; + + let rafts = [raft1, raft2]; + + local + .run_until(async move { + task::spawn_local(app1.run()); + task::spawn_local(app2.run()); + + run_test(&rafts, router).await; + }) + .await; +} + +async fn run_test(rafts: &[typ::Raft], router: Router) { + let _ = router; + + // Wait for server to start up. + tokio::time::sleep(Duration::from_millis(200)).await; + + let raft1 = &rafts[0]; + let raft2 = &rafts[1]; + + println!("=== init single node cluster"); + { + let mut nodes = BTreeMap::new(); + nodes.insert(1, BasicNode { addr: "".to_string() }); + raft1.initialize(nodes).await.unwrap(); + } + + println!("=== write 2 logs"); + { + let resp = raft1.client_write(Request::set("foo1", "bar1")).await.unwrap(); + println!("write resp: {:#?}", resp); + let resp = raft1.client_write(Request::set("foo2", "bar2")).await.unwrap(); + println!("write resp: {:#?}", resp); + } + + println!("=== let node-1 take a snapshot"); + { + raft1.trigger().snapshot().await.unwrap(); + + // Wait for a while to let the snapshot get done. + tokio::time::sleep(Duration::from_millis(500)).await; + } + + println!("=== metrics after building snapshot"); + { + let metrics = raft1.metrics().borrow().clone(); + println!("node 1 metrics: {:#?}", metrics); + assert_eq!(Some(3), metrics.snapshot.map(|x| x.index)); + assert_eq!(Some(3), metrics.purged.map(|x| x.index)); + } + + println!("=== add-learner node-2"); + { + let node = BasicNode { addr: "".to_string() }; + let resp = raft1.add_learner(2, node, true).await.unwrap(); + println!("add-learner node-2 resp: {:#?}", resp); + } + + // Wait for a while to let the node 2 to receive snapshot replication. + tokio::time::sleep(Duration::from_millis(500)).await; + + println!("=== metrics of node 2 that received snapshot"); + { + let metrics = raft2.metrics().borrow().clone(); + println!("node 2 metrics: {:#?}", metrics); + assert_eq!(Some(3), metrics.snapshot.map(|x| x.index)); + assert_eq!(Some(3), metrics.purged.map(|x| x.index)); + } + + // In this example, the snapshot is just a copy of the state machine. + let snapshot = raft2.get_snapshot().await.unwrap(); + println!("node 2 received snapshot: {:#?}", snapshot); +}