Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LAN Cluster #24

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
368 changes: 338 additions & 30 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ resolver = "2"

members = [
"app/roster",
"lib/lan_protocol",
]

[workspace.package]
Expand All @@ -17,10 +18,15 @@ config = "0.14"
derive_builder = "0.20"
dotenv = "0.15"
monoio = "0.2.2"
openraft = { git = "https://github.com/miaxos/openraft.git", branch = "add-monoio-runtime", features = ["monoio"] }
insta = { version = "1", features = ["yaml"] }
serde = "1"
tracing = "0.1"
tracing-subscriber = "0.3"
url = "2.5.0"
ulid = "1.1.2"

roster-lan-protocol = { version = "0.*.*", path = "./lib/lan_protocol" }

[profile.dev]
panic = "abort"
Expand Down
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ between our two instances, as for Redis & Dragonfly.

The full list of implemented commands can be checked [here](./docs/cmd_list.md).

The full differences between redis & roster can be checked
[here](./docs/differences.md).

## Architecture

### Performance
Expand Down Expand Up @@ -101,6 +104,10 @@ between a fixed number of thread.
<img src="./docs/storage.svg" width="60%" />
</p>

## Contributing

Please, feel free to contribute in any way you want for now.

## References

- [RESP3](https://github.com/redis/redis-specifications/blob/master/protocol/RESP3.md)
Expand Down
4 changes: 4 additions & 0 deletions app/roster/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,22 @@ futures-locks = "0.7"
indexmap = "2"
local-sync = "0.1"
monoio = { workspace = true, features = ["bytes", "sync", "iouring"] }
openraft = { workspace = true, features = ["serde", "monoio"] }
rustc-hash = "1.1.0"
scc = "2"
sharded-thread = "1"
serde.workspace = true
thiserror = "1"
url = { workspace = true, features = ["serde"] }
rand = "0.8"
zstd = "0.13"

# Logging
tracing = { workspace = true, features = ["attributes"] }
tracing-subscriber = { workspace = true, features = ["registry", "env-filter", "json"] }

roster-lan-protocol.workspace = true

[target.'cfg(windows)'.dependencies]
monoio = { workspace = true, features = ["bytes", "legacy"] }

Expand Down
38 changes: 38 additions & 0 deletions app/roster/config.local.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,41 @@ bind_addr = "0.0.0.0:3456"
# When this limit is reached, the server will stop accepting connections until
# an active connection terminates.
max_connection = 200

# ------------------------------------------------------------------------------

# The lan cluster describe a cluster which should only be available "locally"
# with a quite fast path between each members
[lan_cluster]
# The bind address we are going to listen to for the LAN cluster communication.
bind_addr = "127.0.0.1:20456"
# To identify the actual node in the tracing
node_id = "11111111-1111-1111-1111-111111111111"

# The autopilot allow for automatic management of roster servers.
# It can
# - Clean up old servers
#
# We follow the settings from `consul`.
#
# The configuration entered here is only available at the cluster creation. Once
# the cluster is created, you won't be able to update it (unless it's something
# that you need and you create an issue in the repository).
[lan_cluster.autopilot]
# To remove dead servers everytime a new node is added to the cluster or
# periodically.
clean_up_dead_servers = true
# The maximum amount of time a server can go without contact from leader before
# being considered unhealthy and removed when the cleanup process kick in.
last_contact_threshold = "5s"
# The minimal number of server needed to establish a quorum.
min_quorum = 0

# Retry join describe the address needed to be able to connect to the server
# Right now there are no authorization and encryption, but later we should use
# mTLS to secure connection between members.
[[lan_cluster.retry_join]]
api_addr = "127.0.0.1:27001"
# ca_cert_file = "/path/to/ca1"
# client_cert_file = "/path/to/client/cert1"
# client_key_file = "/path/to/client/key1"
22 changes: 22 additions & 0 deletions app/roster/src/application/server/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,4 +388,26 @@ mod tests {
assert!(Frame::check(&mut cur).is_ok());
}
}

#[test]
fn test_null_frame() {
let test_case: Vec<&[u8]> = vec![b"$-1\r\n"];

for t in test_case {
let b = BytesMut::from(t);
let mut cur = Cursor::new(&b);
assert!(Frame::check(&mut cur).is_ok());
}
}

#[test]
fn test_incomplete_frame() {
let test_case: Vec<&[u8]> = vec![b"*2\r\n$3\r\nGET\r\n$5\r\nhel"];

for t in test_case {
let b = BytesMut::from(t);
let mut cur = Cursor::new(&b);
assert!(Frame::check(&mut cur).is_err());
}
}
}
32 changes: 32 additions & 0 deletions app/roster/src/domain/cluster/lan_cluster/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
mod node;
mod snapshot;
mod state_machine;
mod store;
use std::io::Cursor;

pub use node::LANNode;
use openraft::storage::Adaptor;
use openraft::MonoioRuntime;
use roster_lan_protocol::{
RequestEnveloppe, RequestLAN, ResponseEnveloppe, ResponseLAN,
};
use store::Store;

pub type NodeRaftID = u64;

openraft::declare_raft_types!(
pub TypeConfig: D = RequestLAN, R = ResponseLAN, NodeId = NodeRaftID, Node = LANNode,
Entry = openraft::Entry<TypeConfig>, SnapshotData = Cursor<Vec<u8>>, AsyncRuntime = MonoioRuntime
);

pub type LogStore = Adaptor<TypeConfig, Store>;
/*
pub type StateMachineStore = Adaptor<TypeConfig, Arc<Store>>;
pub type Raft =
openraft::Raft<TypeConfig, Network, LogStore, StateMachineStore>;
*/

/// [LANCluster] is the cluster where we'll distribute every hash keys between
/// roster instances
#[derive(Debug, Clone)]
pub struct LANCluster {}
35 changes: 35 additions & 0 deletions app/roster/src/domain/cluster/lan_cluster/node.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use std::fmt::Display;

/// An implementation of trait [`Node`] that contains minimal node information.
///
/// The most common usage is to store the connecting address of a node.
/// So that an application does not need an additional store to support its
/// [`RaftNetwork`](crate::RaftNetwork) implementation.
///
/// An application is also free not to use this storage and implements its own
/// node-id to address mapping.
#[derive(
Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize,
)]
pub struct LANNode {
/// User defined string that represent the endpoint of the target node.
///
/// It is used by [`RaftNetwork`](crate::RaftNetwork) for connecting to
/// target node.
pub addr: String,
}

impl LANNode {
/// Creates as [`BasicNode`].
pub fn new(addr: impl ToString) -> Self {
Self {
addr: addr.to_string(),
}
}
}

impl Display for LANNode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.addr)
}
}
72 changes: 72 additions & 0 deletions app/roster/src/domain/cluster/lan_cluster/snapshot.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use openraft::{
add_async_trait, BasicNode, RaftSnapshotBuilder, Snapshot, SnapshotMeta,
StorageError, StorageIOError,
};

use super::store::Store;
use super::{NodeRaftID, TypeConfig};

#[derive(Debug)]
pub struct StoredSnapshot {
pub meta: SnapshotMeta<NodeRaftID, BasicNode>,

/// The data of the state machine at the time of this snapshot.
pub data: Vec<u8>,
}

#[add_async_trait]
impl RaftSnapshotBuilder<TypeConfig> for Store {
async fn build_snapshot(
&mut self,
) -> Result<Snapshot<TypeConfig>, StorageError<NodeRaftID>> {
todo!()
/*
let data;
let last_applied_log;
let last_membership;

{
// Serialize the data of the state machine.
let state_machine = self.state_machine.read().await;
data = serde_json::to_vec(&*state_machine)
.map_err(|e| StorageIOError::read_state_machine(&e))?;

last_applied_log = state_machine.last_applied_log;
last_membership = state_machine.last_membership.clone();
}

let snapshot_idx = {
let mut l = self.snapshot_idx.lock().unwrap();
*l += 1;
*l
};

let snapshot_id = if let Some(last) = last_applied_log {
format!("{}-{}-{}", last.leader_id, last.index, snapshot_idx)
} else {
format!("--{}", snapshot_idx)
};

let meta = SnapshotMeta {
last_log_id: last_applied_log,
last_membership,
snapshot_id,
};

let snapshot = StoredSnapshot {
meta: meta.clone(),
data: data.clone(),
};

{
let mut current_snapshot = self.current_snapshot.write().await;
*current_snapshot = Some(snapshot);
}

Ok(Snapshot {
meta,
snapshot: Box::new(Cursor::new(data)),
})
*/
}
}
13 changes: 13 additions & 0 deletions app/roster/src/domain/cluster/lan_cluster/state_machine.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use openraft::{LogId, StoredMembership};
use serde::{Deserialize, Serialize};

use super::{LANNode, NodeRaftID};

/// This state represents a copy of the data between each node. We have to be
/// careful with what is stored here as it'll be shared with every Node of the
/// LocalCluster.
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
pub struct StateMachine {
pub last_applied_log: Option<LogId<NodeRaftID>>,
pub last_membership: StoredMembership<NodeRaftID, LANNode>,
}
Loading
Loading