Skip to content

Commit

Permalink
Refactor: update example/raft-kv-rocksstore to use storage-v2 API
Browse files Browse the repository at this point in the history
- Fix: #976
  • Loading branch information
drmingdrmer committed Dec 21, 2023
1 parent 5403e0e commit 4bc9083
Show file tree
Hide file tree
Showing 6 changed files with 271 additions and 381 deletions.
2 changes: 1 addition & 1 deletion examples/raft-kv-rocksdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ name = "raft-key-value-rocks"
path = "src/bin/main.rs"

[dependencies]
openraft = { path = "../../openraft", features = ["serde"] }
openraft = { path = "../../openraft", features = ["serde", "storage-v2"] }

async-std = { version = "1.12.0", features = ["attributes", "tokio1"] }
async-trait = "0.1.36"
Expand Down
5 changes: 3 additions & 2 deletions examples/raft-kv-rocksdb/src/app.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::collections::BTreeMap;
use std::sync::Arc;

use async_std::sync::RwLock;
use openraft::Config;

use crate::ExampleRaft;
use crate::NodeId;
use crate::Store;

// Representation of an application state. This struct can be shared around to share
// instances of raft, store and more.
Expand All @@ -13,6 +14,6 @@ pub struct App {
pub api_addr: String,
pub rcp_addr: String,
pub raft: ExampleRaft,
pub store: Arc<Store>,
pub key_values: Arc<RwLock<BTreeMap<String, String>>>,
pub config: Arc<Config>,
}
52 changes: 13 additions & 39 deletions examples/raft-kv-rocksdb/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,20 @@ use std::collections::BTreeSet;
use std::sync::Arc;
use std::sync::Mutex;

use openraft::error::CheckIsLeaderError;
use openraft::error::ClientWriteError;
use openraft::error::ForwardToLeader;
use openraft::error::InitializeError;
use openraft::error::NetworkError;
use openraft::error::RPCError;
use openraft::error::RaftError;
use openraft::error::RemoteError;
use openraft::raft::ClientWriteResponse;
use openraft::RaftMetrics;
use openraft::TryAsRef;
use reqwest::Client;
use serde::de::DeserializeOwned;
use serde::Deserialize;
use serde::Serialize;

use crate::typ;
use crate::Node;
use crate::NodeId;
use crate::Request;
use crate::TypeConfig;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Empty {}
Expand Down Expand Up @@ -52,30 +46,21 @@ impl ExampleClient {
/// will be applied to state machine.
///
/// The result of applying the request will be returned.
pub async fn write(
&self,
req: &Request,
) -> Result<
ClientWriteResponse<TypeConfig>,
RPCError<NodeId, Node, RaftError<NodeId, ClientWriteError<NodeId, Node>>>,
> {
pub async fn write(&self, req: &Request) -> Result<typ::ClientWriteResponse, typ::RPCError<typ::ClientWriteError>> {
self.send_rpc_to_leader("api/write", Some(req)).await
}

/// Read value by key, in an inconsistent mode.
///
/// This method may return stale value because it does not force to read on a legal leader.
pub async fn read(&self, req: &String) -> Result<String, RPCError<NodeId, Node, RaftError<NodeId>>> {
pub async fn read(&self, req: &String) -> Result<String, typ::RPCError> {
self.do_send_rpc_to_leader("api/read", Some(req)).await
}

/// Consistent Read value by key, in an inconsistent mode.
///
/// This method MUST return consistent value or CheckIsLeaderError.
pub async fn consistent_read(
&self,
req: &String,
) -> Result<String, RPCError<NodeId, Node, RaftError<NodeId, CheckIsLeaderError<NodeId, Node>>>> {
pub async fn consistent_read(&self, req: &String) -> Result<String, typ::RPCError<typ::CheckIsLeaderError>> {
self.do_send_rpc_to_leader("api/consistent_read", Some(req)).await
}

Expand All @@ -87,7 +72,7 @@ impl ExampleClient {
/// With a initialized cluster, new node can be added with [`write`].
/// Then setup replication with [`add_learner`].
/// Then make the new node a member with [`change_membership`].
pub async fn init(&self) -> Result<(), RPCError<NodeId, Node, RaftError<NodeId, InitializeError<NodeId, Node>>>> {
pub async fn init(&self) -> Result<(), typ::RPCError<typ::InitializeError>> {
self.do_send_rpc_to_leader("cluster/init", Some(&Empty {})).await
}

Expand All @@ -97,10 +82,7 @@ impl ExampleClient {
pub async fn add_learner(
&self,
req: (NodeId, String, String),
) -> Result<
ClientWriteResponse<TypeConfig>,
RPCError<NodeId, Node, RaftError<NodeId, ClientWriteError<NodeId, Node>>>,
> {
) -> Result<typ::ClientWriteResponse, typ::RPCError<typ::ClientWriteError>> {
self.send_rpc_to_leader("cluster/add-learner", Some(&req)).await
}

Expand All @@ -111,10 +93,7 @@ impl ExampleClient {
pub async fn change_membership(
&self,
req: &BTreeSet<NodeId>,
) -> Result<
ClientWriteResponse<TypeConfig>,
RPCError<NodeId, Node, RaftError<NodeId, ClientWriteError<NodeId, Node>>>,
> {
) -> Result<typ::ClientWriteResponse, typ::RPCError<typ::ClientWriteError>> {
self.send_rpc_to_leader("cluster/change-membership", Some(req)).await
}

Expand All @@ -123,7 +102,7 @@ impl ExampleClient {
/// Metrics contains various information about the cluster, such as current leader,
/// membership config, replication status etc.
/// See [`RaftMetrics`].
pub async fn metrics(&self) -> Result<RaftMetrics<NodeId, Node>, RPCError<NodeId, Node, RaftError<NodeId>>> {
pub async fn metrics(&self) -> Result<RaftMetrics<NodeId, Node>, typ::RPCError> {
self.do_send_rpc_to_leader("cluster/metrics", None::<&()>).await
}

Expand Down Expand Up @@ -179,32 +158,27 @@ impl ExampleClient {
///
/// If the target node is not a leader, a `ForwardToLeader` error will be
/// returned and this client will retry at most 3 times to contact the updated leader.
async fn send_rpc_to_leader<Req, Resp, Err>(
&self,
uri: &str,
req: Option<&Req>,
) -> Result<Resp, RPCError<NodeId, Node, RaftError<NodeId, Err>>>
async fn send_rpc_to_leader<Req, Resp, Err>(&self, uri: &str, req: Option<&Req>) -> Result<Resp, typ::RPCError<Err>>
where
Req: Serialize + 'static,
Resp: Serialize + DeserializeOwned,
Err: std::error::Error + Serialize + DeserializeOwned + TryAsRef<ForwardToLeader<NodeId, Node>> + Clone,
Err: std::error::Error + Serialize + DeserializeOwned + TryAsRef<typ::ForwardToLeader> + Clone,
{
// Retry at most 3 times to find a valid leader.
let mut n_retry = 3;

loop {
let res: Result<Resp, RPCError<NodeId, Node, RaftError<NodeId, Err>>> =
self.do_send_rpc_to_leader(uri, req).await;
let res: Result<Resp, typ::RPCError<Err>> = self.do_send_rpc_to_leader(uri, req).await;

let rpc_err = match res {
Ok(x) => return Ok(x),
Err(rpc_err) => rpc_err,
};

if let RPCError::RemoteError(remote_err) = &rpc_err {
let raft_err: &RaftError<NodeId, _> = &remote_err.source;
let raft_err: &typ::RaftError<_> = &remote_err.source;

if let Some(ForwardToLeader {
if let Some(typ::ForwardToLeader {
leader_id: Some(leader_id),
leader_node: Some(leader_node),
..
Expand Down
47 changes: 35 additions & 12 deletions examples/raft-kv-rocksdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,16 @@ use std::sync::Arc;

use async_std::net::TcpListener;
use async_std::task;
use openraft::storage::Adaptor;
use openraft::Config;
use openraft::TokioRuntime;

use crate::app::App;
use crate::network::api;
use crate::network::management;
use crate::network::Network;
use crate::store::new_storage;
use crate::store::Request;
use crate::store::Response;
use crate::store::Store;

pub mod app;
pub mod client;
Expand All @@ -39,14 +38,39 @@ impl Display for Node {
}
}

pub type SnapshotData = Cursor<Vec<u8>>;

openraft::declare_raft_types!(
/// Declare the type configuration for example K/V store.
pub TypeConfig: D = Request, R = Response, NodeId = NodeId, Node = Node,
Entry = openraft::Entry<TypeConfig>, SnapshotData = Cursor<Vec<u8>>, AsyncRuntime = TokioRuntime
pub TypeConfig:
D = Request,
R = Response,
NodeId = NodeId,
Node = Node,
Entry = openraft::Entry<TypeConfig>,
SnapshotData = SnapshotData,
AsyncRuntime = TokioRuntime
);

pub type LogStore = Adaptor<TypeConfig, Arc<Store>>;
pub type StateMachineStore = Adaptor<TypeConfig, Arc<Store>>;
pub mod typ {
use openraft::error::Infallible;

use crate::Node;
use crate::NodeId;
use crate::TypeConfig;

pub type Entry = openraft::Entry<TypeConfig>;

pub type RaftError<E = Infallible> = openraft::error::RaftError<NodeId, E>;
pub type RPCError<E = Infallible> = openraft::error::RPCError<NodeId, Node, RaftError<E>>;

pub type ClientWriteError = openraft::error::ClientWriteError<NodeId, Node>;
pub type CheckIsLeaderError = openraft::error::CheckIsLeaderError<NodeId, Node>;
pub type ForwardToLeader = openraft::error::ForwardToLeader<NodeId, Node>;
pub type InitializeError = openraft::error::InitializeError<NodeId, Node>;

pub type ClientWriteResponse = openraft::raft::ClientWriteResponse<TypeConfig>;
}

pub type ExampleRaft = openraft::Raft<TypeConfig>;

type Server = tide::Server<Arc<App>>;
Expand All @@ -69,24 +93,23 @@ where

let config = Arc::new(config.validate().unwrap());

// Create a instance of where the Raft data will be stored.
let store = Store::new(&dir).await;
let (log_store, state_machine_store) = new_storage(&dir).await;

let (log_store, state_machine) = Adaptor::new(store.clone());
let kvs = state_machine_store.data.kvs.clone();

// Create the network layer that will connect and communicate the raft instances and
// will be used in conjunction with the store created above.
let network = Network {};

// Create a local raft instance.
let raft = openraft::Raft::new(node_id, config.clone(), network, log_store, state_machine).await.unwrap();
let raft = openraft::Raft::new(node_id, config.clone(), network, log_store, state_machine_store).await.unwrap();

let app = Arc::new(App {
id: node_id,
api_addr: http_addr.clone(),
rcp_addr: rcp_addr.clone(),
raft,
store,
key_values: kvs,
config,
});

Expand Down
12 changes: 6 additions & 6 deletions examples/raft-kv-rocksdb/src/network/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ async fn write(mut req: Request<Arc<App>>) -> tide::Result {

async fn read(mut req: Request<Arc<App>>) -> tide::Result {
let key: String = req.body_json().await?;
let state_machine = req.state().store.state_machine.read().await;
let value = state_machine.get(&key)?;
let kvs = req.state().key_values.read().await;
let value = kvs.get(&key);

let res: Result<String, Infallible> = Ok(value.unwrap_or_default());
let res: Result<String, Infallible> = Ok(value.cloned().unwrap_or_default());
Ok(Response::builder(StatusCode::Ok).body(Body::from_json(&res)?).build())
}

Expand All @@ -48,11 +48,11 @@ async fn consistent_read(mut req: Request<Arc<App>>) -> tide::Result {
match ret {
Ok(_) => {
let key: String = req.body_json().await?;
let state_machine = req.state().store.state_machine.read().await;
let kvs = req.state().key_values.read().await;

let value = state_machine.get(&key)?;
let value = kvs.get(&key);

let res: Result<String, CheckIsLeaderError<NodeId, Node>> = Ok(value.unwrap_or_default());
let res: Result<String, CheckIsLeaderError<NodeId, Node>> = Ok(value.cloned().unwrap_or_default());
Ok(Response::builder(StatusCode::Ok).body(Body::from_json(&res)?).build())
}
e => Ok(Response::builder(StatusCode::Ok).body(Body::from_json(&e)?).build()),
Expand Down
Loading

0 comments on commit 4bc9083

Please sign in to comment.