Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: update example/raft-kv-rocksstore to use storage-v2 API #979

Merged
merged 1 commit into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/raft-kv-rocksdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ name = "raft-key-value-rocks"
path = "src/bin/main.rs"

[dependencies]
openraft = { path = "../../openraft", features = ["serde"] }
openraft = { path = "../../openraft", features = ["serde", "storage-v2"] }

async-std = { version = "1.12.0", features = ["attributes", "tokio1"] }
async-trait = "0.1.36"
Expand Down
5 changes: 3 additions & 2 deletions examples/raft-kv-rocksdb/src/app.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::collections::BTreeMap;
use std::sync::Arc;

use async_std::sync::RwLock;
use openraft::Config;

use crate::ExampleRaft;
use crate::NodeId;
use crate::Store;

// Representation of an application state. This struct can be shared around to share
// instances of raft, store and more.
Expand All @@ -13,6 +14,6 @@ pub struct App {
pub api_addr: String,
pub rcp_addr: String,
pub raft: ExampleRaft,
pub store: Arc<Store>,
pub key_values: Arc<RwLock<BTreeMap<String, String>>>,
pub config: Arc<Config>,
}
52 changes: 13 additions & 39 deletions examples/raft-kv-rocksdb/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,20 @@ use std::collections::BTreeSet;
use std::sync::Arc;
use std::sync::Mutex;

use openraft::error::CheckIsLeaderError;
use openraft::error::ClientWriteError;
use openraft::error::ForwardToLeader;
use openraft::error::InitializeError;
use openraft::error::NetworkError;
use openraft::error::RPCError;
use openraft::error::RaftError;
use openraft::error::RemoteError;
use openraft::raft::ClientWriteResponse;
use openraft::RaftMetrics;
use openraft::TryAsRef;
use reqwest::Client;
use serde::de::DeserializeOwned;
use serde::Deserialize;
use serde::Serialize;

use crate::typ;
use crate::Node;
use crate::NodeId;
use crate::Request;
use crate::TypeConfig;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Empty {}
Expand Down Expand Up @@ -52,30 +46,21 @@ impl ExampleClient {
/// will be applied to state machine.
///
/// The result of applying the request will be returned.
pub async fn write(
&self,
req: &Request,
) -> Result<
ClientWriteResponse<TypeConfig>,
RPCError<NodeId, Node, RaftError<NodeId, ClientWriteError<NodeId, Node>>>,
> {
pub async fn write(&self, req: &Request) -> Result<typ::ClientWriteResponse, typ::RPCError<typ::ClientWriteError>> {
self.send_rpc_to_leader("api/write", Some(req)).await
}

/// Read value by key, in an inconsistent mode.
///
/// This method may return stale value because it does not force to read on a legal leader.
pub async fn read(&self, req: &String) -> Result<String, RPCError<NodeId, Node, RaftError<NodeId>>> {
pub async fn read(&self, req: &String) -> Result<String, typ::RPCError> {
self.do_send_rpc_to_leader("api/read", Some(req)).await
}

/// Consistent Read value by key, in an inconsistent mode.
///
/// This method MUST return consistent value or CheckIsLeaderError.
pub async fn consistent_read(
&self,
req: &String,
) -> Result<String, RPCError<NodeId, Node, RaftError<NodeId, CheckIsLeaderError<NodeId, Node>>>> {
pub async fn consistent_read(&self, req: &String) -> Result<String, typ::RPCError<typ::CheckIsLeaderError>> {
self.do_send_rpc_to_leader("api/consistent_read", Some(req)).await
}

Expand All @@ -87,7 +72,7 @@ impl ExampleClient {
/// With a initialized cluster, new node can be added with [`write`].
/// Then setup replication with [`add_learner`].
/// Then make the new node a member with [`change_membership`].
pub async fn init(&self) -> Result<(), RPCError<NodeId, Node, RaftError<NodeId, InitializeError<NodeId, Node>>>> {
pub async fn init(&self) -> Result<(), typ::RPCError<typ::InitializeError>> {
self.do_send_rpc_to_leader("cluster/init", Some(&Empty {})).await
}

Expand All @@ -97,10 +82,7 @@ impl ExampleClient {
pub async fn add_learner(
&self,
req: (NodeId, String, String),
) -> Result<
ClientWriteResponse<TypeConfig>,
RPCError<NodeId, Node, RaftError<NodeId, ClientWriteError<NodeId, Node>>>,
> {
) -> Result<typ::ClientWriteResponse, typ::RPCError<typ::ClientWriteError>> {
self.send_rpc_to_leader("cluster/add-learner", Some(&req)).await
}

Expand All @@ -111,10 +93,7 @@ impl ExampleClient {
pub async fn change_membership(
&self,
req: &BTreeSet<NodeId>,
) -> Result<
ClientWriteResponse<TypeConfig>,
RPCError<NodeId, Node, RaftError<NodeId, ClientWriteError<NodeId, Node>>>,
> {
) -> Result<typ::ClientWriteResponse, typ::RPCError<typ::ClientWriteError>> {
self.send_rpc_to_leader("cluster/change-membership", Some(req)).await
}

Expand All @@ -123,7 +102,7 @@ impl ExampleClient {
/// Metrics contains various information about the cluster, such as current leader,
/// membership config, replication status etc.
/// See [`RaftMetrics`].
pub async fn metrics(&self) -> Result<RaftMetrics<NodeId, Node>, RPCError<NodeId, Node, RaftError<NodeId>>> {
pub async fn metrics(&self) -> Result<RaftMetrics<NodeId, Node>, typ::RPCError> {
self.do_send_rpc_to_leader("cluster/metrics", None::<&()>).await
}

Expand Down Expand Up @@ -179,32 +158,27 @@ impl ExampleClient {
///
/// If the target node is not a leader, a `ForwardToLeader` error will be
/// returned and this client will retry at most 3 times to contact the updated leader.
async fn send_rpc_to_leader<Req, Resp, Err>(
&self,
uri: &str,
req: Option<&Req>,
) -> Result<Resp, RPCError<NodeId, Node, RaftError<NodeId, Err>>>
async fn send_rpc_to_leader<Req, Resp, Err>(&self, uri: &str, req: Option<&Req>) -> Result<Resp, typ::RPCError<Err>>
where
Req: Serialize + 'static,
Resp: Serialize + DeserializeOwned,
Err: std::error::Error + Serialize + DeserializeOwned + TryAsRef<ForwardToLeader<NodeId, Node>> + Clone,
Err: std::error::Error + Serialize + DeserializeOwned + TryAsRef<typ::ForwardToLeader> + Clone,
{
// Retry at most 3 times to find a valid leader.
let mut n_retry = 3;

loop {
let res: Result<Resp, RPCError<NodeId, Node, RaftError<NodeId, Err>>> =
self.do_send_rpc_to_leader(uri, req).await;
let res: Result<Resp, typ::RPCError<Err>> = self.do_send_rpc_to_leader(uri, req).await;

let rpc_err = match res {
Ok(x) => return Ok(x),
Err(rpc_err) => rpc_err,
};

if let RPCError::RemoteError(remote_err) = &rpc_err {
let raft_err: &RaftError<NodeId, _> = &remote_err.source;
let raft_err: &typ::RaftError<_> = &remote_err.source;

if let Some(ForwardToLeader {
if let Some(typ::ForwardToLeader {
leader_id: Some(leader_id),
leader_node: Some(leader_node),
..
Expand Down
47 changes: 35 additions & 12 deletions examples/raft-kv-rocksdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,16 @@ use std::sync::Arc;

use async_std::net::TcpListener;
use async_std::task;
use openraft::storage::Adaptor;
use openraft::Config;
use openraft::TokioRuntime;

use crate::app::App;
use crate::network::api;
use crate::network::management;
use crate::network::Network;
use crate::store::new_storage;
use crate::store::Request;
use crate::store::Response;
use crate::store::Store;

pub mod app;
pub mod client;
Expand All @@ -39,14 +38,39 @@ impl Display for Node {
}
}

pub type SnapshotData = Cursor<Vec<u8>>;

openraft::declare_raft_types!(
/// Declare the type configuration for example K/V store.
pub TypeConfig: D = Request, R = Response, NodeId = NodeId, Node = Node,
Entry = openraft::Entry<TypeConfig>, SnapshotData = Cursor<Vec<u8>>, AsyncRuntime = TokioRuntime
pub TypeConfig:
D = Request,
R = Response,
NodeId = NodeId,
Node = Node,
Entry = openraft::Entry<TypeConfig>,
SnapshotData = SnapshotData,
AsyncRuntime = TokioRuntime
);

pub type LogStore = Adaptor<TypeConfig, Arc<Store>>;
pub type StateMachineStore = Adaptor<TypeConfig, Arc<Store>>;
pub mod typ {
use openraft::error::Infallible;

use crate::Node;
use crate::NodeId;
use crate::TypeConfig;

pub type Entry = openraft::Entry<TypeConfig>;

pub type RaftError<E = Infallible> = openraft::error::RaftError<NodeId, E>;
pub type RPCError<E = Infallible> = openraft::error::RPCError<NodeId, Node, RaftError<E>>;

pub type ClientWriteError = openraft::error::ClientWriteError<NodeId, Node>;
pub type CheckIsLeaderError = openraft::error::CheckIsLeaderError<NodeId, Node>;
pub type ForwardToLeader = openraft::error::ForwardToLeader<NodeId, Node>;
pub type InitializeError = openraft::error::InitializeError<NodeId, Node>;

pub type ClientWriteResponse = openraft::raft::ClientWriteResponse<TypeConfig>;
}

pub type ExampleRaft = openraft::Raft<TypeConfig>;

type Server = tide::Server<Arc<App>>;
Expand All @@ -69,24 +93,23 @@ where

let config = Arc::new(config.validate().unwrap());

// Create a instance of where the Raft data will be stored.
let store = Store::new(&dir).await;
let (log_store, state_machine_store) = new_storage(&dir).await;

let (log_store, state_machine) = Adaptor::new(store.clone());
let kvs = state_machine_store.data.kvs.clone();

// Create the network layer that will connect and communicate the raft instances and
// will be used in conjunction with the store created above.
let network = Network {};

// Create a local raft instance.
let raft = openraft::Raft::new(node_id, config.clone(), network, log_store, state_machine).await.unwrap();
let raft = openraft::Raft::new(node_id, config.clone(), network, log_store, state_machine_store).await.unwrap();

let app = Arc::new(App {
id: node_id,
api_addr: http_addr.clone(),
rcp_addr: rcp_addr.clone(),
raft,
store,
key_values: kvs,
config,
});

Expand Down
12 changes: 6 additions & 6 deletions examples/raft-kv-rocksdb/src/network/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ async fn write(mut req: Request<Arc<App>>) -> tide::Result {

async fn read(mut req: Request<Arc<App>>) -> tide::Result {
let key: String = req.body_json().await?;
let state_machine = req.state().store.state_machine.read().await;
let value = state_machine.get(&key)?;
let kvs = req.state().key_values.read().await;
let value = kvs.get(&key);

let res: Result<String, Infallible> = Ok(value.unwrap_or_default());
let res: Result<String, Infallible> = Ok(value.cloned().unwrap_or_default());
Ok(Response::builder(StatusCode::Ok).body(Body::from_json(&res)?).build())
}

Expand All @@ -48,11 +48,11 @@ async fn consistent_read(mut req: Request<Arc<App>>) -> tide::Result {
match ret {
Ok(_) => {
let key: String = req.body_json().await?;
let state_machine = req.state().store.state_machine.read().await;
let kvs = req.state().key_values.read().await;

let value = state_machine.get(&key)?;
let value = kvs.get(&key);

let res: Result<String, CheckIsLeaderError<NodeId, Node>> = Ok(value.unwrap_or_default());
let res: Result<String, CheckIsLeaderError<NodeId, Node>> = Ok(value.cloned().unwrap_or_default());
Ok(Response::builder(StatusCode::Ok).body(Body::from_json(&res)?).build())
}
e => Ok(Response::builder(StatusCode::Ok).body(Body::from_json(&e)?).build()),
Expand Down
Loading
Loading