Skip to content

Commit

Permalink
Fix: throw unreachable error if connection fails
Browse files Browse the repository at this point in the history
  • Loading branch information
sainad2222 committed Dec 12, 2024
1 parent 725bfaa commit 70273de
Showing 1 changed file with 34 additions and 12 deletions.
46 changes: 34 additions & 12 deletions examples/raft-kv-memstore-grpc/src/network/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use bincode::deserialize;
use bincode::serialize;
use openraft::error::NetworkError;
use openraft::error::Unreachable;
use openraft::network::v2::RaftNetworkV2;
use openraft::network::RPCOption;
use openraft::raft::AppendEntriesRequest;
Expand All @@ -24,8 +25,6 @@ use crate::TypeConfig;
/// Provides the networking layer for Raft nodes to communicate with each other.
pub struct Network {}

type RaftServiceClient = InternalServiceClient<Channel>;

impl Network {}

/// Implementation of the RaftNetworkFactory trait for creating new network connections.
Expand All @@ -35,21 +34,20 @@ impl RaftNetworkFactory<TypeConfig> for Network {

#[tracing::instrument(level = "debug", skip_all)]
async fn new_client(&mut self, _: NodeId, node: &Node) -> Self::Network {
let channel = Channel::builder(format!("http://{}", node.rpc_addr).parse().unwrap()).connect().await.unwrap();
NetworkConnection::new(InternalServiceClient::new(channel))
NetworkConnection::new(node.clone())
}
}

/// Represents an active network connection to a remote Raft node.
/// Handles serialization and deserialization of Raft messages over gRPC.
pub struct NetworkConnection {
client: RaftServiceClient,
target_node: Node,
}

impl NetworkConnection {
/// Creates a new NetworkConnection with the provided gRPC client.
pub fn new(client: RaftServiceClient) -> Self {
NetworkConnection { client }
pub fn new(target_node: Node) -> Self {
NetworkConnection { target_node }
}
}

Expand All @@ -61,10 +59,18 @@ impl RaftNetworkV2<TypeConfig> for NetworkConnection {
req: AppendEntriesRequest<TypeConfig>,
_option: RPCOption,
) -> Result<AppendEntriesResponse<TypeConfig>, RPCError> {
let server_addr = self.target_node.rpc_addr.clone();
let channel = match Channel::builder(format!("http://{}", server_addr).parse().unwrap()).connect().await {
Ok(channel) => channel,
Err(e) => {
return Err(openraft::error::RPCError::Unreachable(Unreachable::new(&e)));
}
};
let mut client = InternalServiceClient::new(channel);

let value = serialize(&req).map_err(|e| RPCError::Network(NetworkError::new(&e)))?;
let request = RaftRequestBytes { value };
let response =
self.client.append_entries(request).await.map_err(|e| RPCError::Network(NetworkError::new(&e)))?;
let response = client.append_entries(request).await.map_err(|e| RPCError::Network(NetworkError::new(&e)))?;
let message = response.into_inner();
let result = deserialize(&message.value).map_err(|e| RPCError::Network(NetworkError::new(&e)))?;
Ok(result)
Expand All @@ -77,6 +83,14 @@ impl RaftNetworkV2<TypeConfig> for NetworkConnection {
_cancel: impl std::future::Future<Output = openraft::error::ReplicationClosed> + openraft::OptionalSend + 'static,
_option: RPCOption,
) -> Result<openraft::raft::SnapshotResponse<TypeConfig>, crate::typ::StreamingError> {
let server_addr = self.target_node.rpc_addr.clone();
let channel = match Channel::builder(format!("http://{}", server_addr).parse().unwrap()).connect().await {
Ok(channel) => channel,
Err(e) => {
return Err(openraft::error::RPCError::Unreachable(Unreachable::new(&e)).into());
}
};
let mut client = InternalServiceClient::new(channel);
// Serialize the vote and snapshot metadata
let rpc_meta =
serialize(&(vote, snapshot.meta.clone())).map_err(|e| RPCError::Network(NetworkError::new(&e)))?;
Expand Down Expand Up @@ -106,8 +120,7 @@ impl RaftNetworkV2<TypeConfig> for NetworkConnection {
let requests_stream = futures::stream::iter(requests);

// Send the streaming snapshot request
let response =
self.client.snapshot(requests_stream).await.map_err(|e| RPCError::Network(NetworkError::new(&e)))?;
let response = client.snapshot(requests_stream).await.map_err(|e| RPCError::Network(NetworkError::new(&e)))?;
let message = response.into_inner();

// Deserialize the response
Expand All @@ -120,14 +133,23 @@ impl RaftNetworkV2<TypeConfig> for NetworkConnection {
req: VoteRequest<TypeConfig>,
_option: RPCOption,
) -> Result<VoteResponse<TypeConfig>, RPCError> {
let server_addr = self.target_node.rpc_addr.clone();
let channel = match Channel::builder(format!("http://{}", server_addr).parse().unwrap()).connect().await {
Ok(channel) => channel,
Err(e) => {
return Err(openraft::error::RPCError::Unreachable(Unreachable::new(&e)));
}
};
let mut client = InternalServiceClient::new(channel);

// Convert the openraft VoteRequest to protobuf VoteRequest
let proto_vote_req: PbVoteRequest = req.into();

// Create a tonic Request with the protobuf VoteRequest
let request = tonic::Request::new(proto_vote_req);

// Send the vote request
let response = self.client.vote(request).await.map_err(|e| RPCError::Network(NetworkError::new(&e)))?;
let response = client.vote(request).await.map_err(|e| RPCError::Network(NetworkError::new(&e)))?;

// Convert the response back to openraft VoteResponse
let proto_vote_resp: PbVoteResponse = response.into_inner();
Expand Down

0 comments on commit 70273de

Please sign in to comment.