Skip to content

Commit

Permalink
Merge branch 'main' into oneshot-runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
Miaxos committed Feb 28, 2024
2 parents 3929a52 + 685fe8d commit 090fb36
Show file tree
Hide file tree
Showing 35 changed files with 147 additions and 305 deletions.
2 changes: 1 addition & 1 deletion examples/raft-kv-memstore-generic-snapshot-data/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ This example is similar to the basic raft-kv-memstore example
but focuses on how to handle snapshot with `generic-snapshot-data` enabled.
Other aspects are minimized.

To send a complete snapshot, Refer to implementation of `RaftNetwork::snapshot()` in this example.
To send a complete snapshot, Refer to implementation of `RaftNetwork::full_snapshot()` in this example.

To receive a complete snapshot, Refer to implementation of `api::snapshot()` in this example.

Expand Down
2 changes: 1 addition & 1 deletion examples/raft-kv-memstore-generic-snapshot-data/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub async fn snapshot(app: &mut App, req: String) -> String {
};
let res = app
.raft
.install_complete_snapshot(vote, snapshot)
.install_full_snapshot(vote, snapshot)
.await
.map_err(typ::RaftError::<typ::Infallible>::Fatal);
encode(res)
Expand Down
11 changes: 8 additions & 3 deletions examples/raft-kv-memstore-generic-snapshot-data/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ impl RaftNetworkFactory<TypeConfig> for Router {
}

impl RaftNetwork<TypeConfig> for Connection {
async fn send_append_entries(
async fn append_entries(
&mut self,
req: AppendEntriesRequest<TypeConfig>,
_option: RPCOption,
) -> Result<AppendEntriesResponse<NodeId>, typ::RPCError> {
let resp = self
.router
Expand All @@ -50,7 +51,7 @@ impl RaftNetwork<TypeConfig> for Connection {
}

/// A real application should replace this method with customized implementation.
async fn snapshot(
async fn full_snapshot(
&mut self,
vote: Vote<NodeId>,
snapshot: Snapshot<TypeConfig>,
Expand All @@ -65,7 +66,11 @@ impl RaftNetwork<TypeConfig> for Connection {
Ok(resp)
}

async fn send_vote(&mut self, req: VoteRequest<NodeId>) -> Result<VoteResponse<NodeId>, typ::RPCError> {
async fn vote(
&mut self,
req: VoteRequest<NodeId>,
_option: RPCOption,
) -> Result<VoteResponse<NodeId>, typ::RPCError> {
let resp = self
.router
.send(self.target, "/raft/vote", req)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ pub fn log_panic(panic: &PanicInfo) {
///
/// - Setup a single node cluster, write some logs, take a snapshot;
/// - Add a learner node-2 to receive snapshot replication, via the complete-snapshot API:
/// - The sending end sends snapshot with `RaftNetwork::snapshot()`;
/// - The sending end sends snapshot with `RaftNetwork::full_snapshot()`;
/// - The receiving end deliver the received snapshot to `Raft` with
/// `Raft::install_complete_snapshot()`.
/// `Raft::install_full_snapshot()`.
#[tokio::test]
async fn test_cluster() {
std::panic::set_hook(Box::new(|panic| {
Expand Down
1 change: 1 addition & 0 deletions examples/raft-kv-memstore-opendal-snapshot-data/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ license = "MIT OR Apache-2.0"
repository = "https://github.com/datafuselabs/openraft"

[dependencies]
memstore = { path = "../memstore", features = [] }
openraft = { path = "../../openraft", features = ["serde", "storage-v2", "generic-snapshot-data"] }

serde = { version = "1.0.114", features = ["derive"] }
Expand Down
2 changes: 1 addition & 1 deletion examples/raft-kv-memstore-opendal-snapshot-data/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ This example is similar to the basic raft-kv-memstore example
but focuses on how to store and fetch snapshot data from remote storage.
Other aspects are minimized.

To send a complete snapshot, Refer to implementation of `RaftNetwork::snapshot()` in this example.
To send a complete snapshot, Refer to implementation of `RaftNetwork::full_snapshot()` in this example.

To receive a complete snapshot, Refer to implementation of `api::snapshot()` in this example.

Expand Down
2 changes: 1 addition & 1 deletion examples/raft-kv-memstore-opendal-snapshot-data/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub async fn snapshot(app: &mut App, req: String) -> String {
};
let res = app
.raft
.install_complete_snapshot(vote, snapshot)
.install_full_snapshot(vote, snapshot)
.await
.map_err(typ::RaftError::<typ::Infallible>::Fatal);
encode(res)
Expand Down
2 changes: 1 addition & 1 deletion examples/raft-kv-memstore-opendal-snapshot-data/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ pub async fn new_raft(node_id: NodeId, router: Router, op: Operator) -> (typ::Ra
let config = Arc::new(config.validate().unwrap());

// Create a instance of where the Raft logs will be stored.
let log_store = Arc::new(LogStore::default());
let log_store = LogStore::default();

// Create a instance of where the state machine data will be stored.
let state_machine_store = Arc::new(StateMachineStore::new(op.clone()));
Expand Down
11 changes: 8 additions & 3 deletions examples/raft-kv-memstore-opendal-snapshot-data/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ impl RaftNetworkFactory<TypeConfig> for Router {
}

impl RaftNetwork<TypeConfig> for Connection {
async fn send_append_entries(
async fn append_entries(
&mut self,
req: AppendEntriesRequest<TypeConfig>,
_option: RPCOption,
) -> Result<AppendEntriesResponse<NodeId>, typ::RPCError> {
let resp = self
.router
Expand All @@ -50,7 +51,7 @@ impl RaftNetwork<TypeConfig> for Connection {
}

/// A real application should replace this method with customized implementation.
async fn snapshot(
async fn full_snapshot(
&mut self,
vote: Vote<NodeId>,
snapshot: Snapshot<TypeConfig>,
Expand All @@ -65,7 +66,11 @@ impl RaftNetwork<TypeConfig> for Connection {
Ok(resp)
}

async fn send_vote(&mut self, req: VoteRequest<NodeId>) -> Result<VoteResponse<NodeId>, typ::RPCError> {
async fn vote(
&mut self,
req: VoteRequest<NodeId>,
_option: RPCOption,
) -> Result<VoteResponse<NodeId>, typ::RPCError> {
let resp = self
.router
.send(self.target, "/raft/vote", req)
Expand Down
133 changes: 2 additions & 131 deletions examples/raft-kv-memstore-opendal-snapshot-data/src/store.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,20 @@
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::ops::RangeBounds;
use std::sync::Arc;
use std::sync::Mutex;

use opendal::Operator;
use openraft::storage::LogFlushed;
use openraft::storage::LogState;
use openraft::storage::RaftLogStorage;
use openraft::storage::RaftStateMachine;
use openraft::storage::Snapshot;
use openraft::BasicNode;
use openraft::Entry;
use openraft::EntryPayload;
use openraft::LogId;
use openraft::RaftLogReader;
use openraft::RaftSnapshotBuilder;
use openraft::RaftTypeConfig;
use openraft::SnapshotMeta;
use openraft::StorageError;
use openraft::StoredMembership;
use openraft::Vote;
use serde::Deserialize;
use serde::Serialize;

Expand All @@ -30,6 +24,8 @@ use crate::typ;
use crate::NodeId;
use crate::TypeConfig;

pub type LogStore = memstore::LogStore<TypeConfig>;

#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Request {
Set { key: String, value: String },
Expand Down Expand Up @@ -95,30 +91,6 @@ impl StateMachineStore {
}
}

#[derive(Debug, Default)]
pub struct LogStore {
last_purged_log_id: Mutex<Option<LogId<NodeId>>>,

/// The Raft log.
log: Mutex<BTreeMap<u64, Entry<TypeConfig>>>,

committed: Mutex<Option<LogId<NodeId>>>,

/// The current granted vote.
vote: Mutex<Option<Vote<NodeId>>>,
}

impl RaftLogReader<TypeConfig> for Arc<LogStore> {
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug>(
&mut self,
range: RB,
) -> Result<Vec<Entry<TypeConfig>>, StorageError<NodeId>> {
let log = self.log.lock().unwrap();
let response = log.range(range.clone()).map(|(_, val)| val.clone()).collect::<Vec<_>>();
Ok(response)
}
}

impl RaftSnapshotBuilder<TypeConfig> for Arc<StateMachineStore> {
#[tracing::instrument(level = "trace", skip(self))]
async fn build_snapshot(&mut self) -> Result<Snapshot<TypeConfig>, StorageError<NodeId>> {
Expand Down Expand Up @@ -269,104 +241,3 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
self.clone()
}
}

impl RaftLogStorage<TypeConfig> for Arc<LogStore> {
type LogReader = Self;

async fn get_log_state(&mut self) -> Result<LogState<TypeConfig>, StorageError<NodeId>> {
let log = self.log.lock().unwrap();
let last = log.iter().next_back().map(|(_, ent)| ent.log_id);

let last_purged = *self.last_purged_log_id.lock().unwrap();

let last = match last {
None => last_purged,
Some(x) => Some(x),
};

Ok(LogState {
last_purged_log_id: last_purged,
last_log_id: last,
})
}

async fn save_committed(&mut self, committed: Option<LogId<NodeId>>) -> Result<(), StorageError<NodeId>> {
let mut c = self.committed.lock().unwrap();
*c = committed;
Ok(())
}

async fn read_committed(&mut self) -> Result<Option<LogId<NodeId>>, StorageError<NodeId>> {
let committed = self.committed.lock().unwrap();
Ok(*committed)
}

#[tracing::instrument(level = "trace", skip(self))]
async fn save_vote(&mut self, vote: &Vote<NodeId>) -> Result<(), StorageError<NodeId>> {
let mut v = self.vote.lock().unwrap();
*v = Some(*vote);
Ok(())
}

async fn read_vote(&mut self) -> Result<Option<Vote<NodeId>>, StorageError<NodeId>> {
Ok(*self.vote.lock().unwrap())
}

#[tracing::instrument(level = "trace", skip(self, entries, callback))]
async fn append<I>(
&mut self,
entries: I,
callback: LogFlushed<<TypeConfig as RaftTypeConfig>::AsyncRuntime, NodeId>,
) -> Result<(), StorageError<NodeId>>
where
I: IntoIterator<Item = Entry<TypeConfig>>,
{
// Simple implementation that calls the flush-before-return `append_to_log`.
let mut log = self.log.lock().unwrap();
for entry in entries {
log.insert(entry.log_id.index, entry);
}
callback.log_io_completed(Ok(()));

Ok(())
}

#[tracing::instrument(level = "debug", skip(self))]
async fn truncate(&mut self, log_id: LogId<NodeId>) -> Result<(), StorageError<NodeId>> {
tracing::debug!("delete_log: [{:?}, +oo)", log_id);

let mut log = self.log.lock().unwrap();
let keys = log.range(log_id.index..).map(|(k, _v)| *k).collect::<Vec<_>>();
for key in keys {
log.remove(&key);
}

Ok(())
}

#[tracing::instrument(level = "debug", skip(self))]
async fn purge(&mut self, log_id: LogId<NodeId>) -> Result<(), StorageError<NodeId>> {
tracing::debug!("delete_log: (-oo, {:?}]", log_id);

{
let mut ld = self.last_purged_log_id.lock().unwrap();
assert!(*ld <= Some(log_id));
*ld = Some(log_id);
}

{
let mut log = self.log.lock().unwrap();

let keys = log.range(..=log_id.index).map(|(k, _v)| *k).collect::<Vec<_>>();
for key in keys {
log.remove(&key);
}
}

Ok(())
}

async fn get_log_reader(&mut self) -> Self::LogReader {
self.clone()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ pub fn log_panic(panic: &PanicInfo) {
///
/// - Setup a single node cluster, write some logs, take a snapshot;
/// - Add a learner node-2 to receive snapshot replication, via the complete-snapshot API:
/// - The sending end sends snapshot with `RaftNetwork::snapshot()`;
/// - The sending end sends snapshot with `RaftNetwork::full_snapshot()`;
/// - The receiving end deliver the received snapshot to `Raft` with
/// `Raft::install_complete_snapshot()`.
/// `Raft::install_full_snapshot()`.
#[tokio::test]
async fn test_cluster() {
std::panic::set_hook(Box::new(|panic| {
Expand Down
13 changes: 10 additions & 3 deletions examples/raft-kv-memstore-singlethreaded/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use openraft::error::InstallSnapshotError;
use openraft::error::RemoteError;
use openraft::network::RPCOption;
use openraft::raft::AppendEntriesRequest;
use openraft::raft::AppendEntriesResponse;
use openraft::raft::InstallSnapshotRequest;
Expand Down Expand Up @@ -32,9 +33,10 @@ impl RaftNetworkFactory<TypeConfig> for Router {
}

impl RaftNetwork<TypeConfig> for Connection {
async fn send_append_entries(
async fn append_entries(
&mut self,
req: AppendEntriesRequest<TypeConfig>,
_option: RPCOption,
) -> Result<AppendEntriesResponse<NodeId>, typ::RPCError> {
let resp = self
.router
Expand All @@ -44,9 +46,10 @@ impl RaftNetwork<TypeConfig> for Connection {
Ok(resp)
}

async fn send_install_snapshot(
async fn install_snapshot(
&mut self,
req: InstallSnapshotRequest<TypeConfig>,
_option: RPCOption,
) -> Result<InstallSnapshotResponse<NodeId>, typ::RPCError<InstallSnapshotError>> {
let resp = self
.router
Expand All @@ -56,7 +59,11 @@ impl RaftNetwork<TypeConfig> for Connection {
Ok(resp)
}

async fn send_vote(&mut self, req: VoteRequest<NodeId>) -> Result<VoteResponse<NodeId>, typ::RPCError> {
async fn vote(
&mut self,
req: VoteRequest<NodeId>,
_option: RPCOption,
) -> Result<VoteResponse<NodeId>, typ::RPCError> {
let resp = self
.router
.send(self.target, "/raft/vote", req)
Expand Down
13 changes: 10 additions & 3 deletions examples/raft-kv-memstore/src/network/raft_network_impl.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use openraft::error::InstallSnapshotError;
use openraft::error::NetworkError;
use openraft::error::RemoteError;
use openraft::network::RPCOption;
use openraft::network::RaftNetwork;
use openraft::network::RaftNetworkFactory;
use openraft::raft::AppendEntriesRequest;
Expand Down Expand Up @@ -77,21 +78,27 @@ pub struct NetworkConnection {
}

impl RaftNetwork<TypeConfig> for NetworkConnection {
async fn send_append_entries(
async fn append_entries(
&mut self,
req: AppendEntriesRequest<TypeConfig>,
_option: RPCOption,
) -> Result<AppendEntriesResponse<NodeId>, typ::RPCError> {
self.owner.send_rpc(self.target, &self.target_node, "raft-append", req).await
}

async fn send_install_snapshot(
async fn install_snapshot(
&mut self,
req: InstallSnapshotRequest<TypeConfig>,
_option: RPCOption,
) -> Result<InstallSnapshotResponse<NodeId>, typ::RPCError<InstallSnapshotError>> {
self.owner.send_rpc(self.target, &self.target_node, "raft-snapshot", req).await
}

async fn send_vote(&mut self, req: VoteRequest<NodeId>) -> Result<VoteResponse<NodeId>, typ::RPCError> {
async fn vote(
&mut self,
req: VoteRequest<NodeId>,
_option: RPCOption,
) -> Result<VoteResponse<NodeId>, typ::RPCError> {
self.owner.send_rpc(self.target, &self.target_node, "raft-vote", req).await
}
}
Loading

0 comments on commit 090fb36

Please sign in to comment.