diff --git a/cli/src/lib.rs b/cli/src/lib.rs index 3d86d1cfdda..f005bc77124 100644 --- a/cli/src/lib.rs +++ b/cli/src/lib.rs @@ -226,7 +226,7 @@ impl Iroha { telemetry: Option, ) -> Result { let listen_addr = config.torii.p2p_addr.clone(); - let network = IrohaNetwork::start(listen_addr, config.public_key.clone()) + let network = IrohaNetwork::start(listen_addr, config.sumeragi.key_pair.clone()) .await .wrap_err("Unable to start P2P-network")?; diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index 7d785eec1e9..c9a27efe5e0 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -73,6 +73,8 @@ pub enum CryptographicError { Encrypt(aead::Error), /// Ursa Cryptography error Ursa(CryptoError), + /// Iroha Cryptography error + Ihora(iroha_crypto::error::Error), } impl> From for Error { diff --git a/p2p/src/network.rs b/p2p/src/network.rs index aa8c9ce67a6..751eb779d3d 100644 --- a/p2p/src/network.rs +++ b/p2p/src/network.rs @@ -6,7 +6,7 @@ use std::{ }; use futures::{stream::FuturesUnordered, StreamExt}; -use iroha_crypto::PublicKey; +use iroha_crypto::{KeyPair, PublicKey}; use iroha_data_model::prelude::PeerId; use iroha_logger::prelude::*; use iroha_primitives::addr::SocketAddr; @@ -65,8 +65,8 @@ impl NetworkBaseHandle { /// /// # Errors /// - If binding to address fail - #[log(skip(public_key))] - pub async fn start(listen_addr: SocketAddr, public_key: PublicKey) -> Result { + #[log(skip(key_pair))] + pub async fn start(listen_addr: SocketAddr, key_pair: KeyPair) -> Result { let listener = TcpListener::bind(&listen_addr.to_string()).await?; iroha_logger::info!("Network bound to listener"); let (online_peers_sender, online_peers_receiver) = watch::channel(HashSet::new()); @@ -82,7 +82,7 @@ impl NetworkBaseHandle { listener, peers: HashMap::new(), connecting_peers: HashMap::new(), - public_key, + key_pair, subscribers_to_peers_messages: Vec::new(), subscribe_to_peers_messages_receiver, online_peers_sender, @@ -166,8 +166,8 @@ struct NetworkBase { connecting_peers: HashMap, /// [`TcpListener`] that is accepting [`Peer`]s' connections listener: TcpListener, - /// Our app-level public key - public_key: PublicKey, + /// Our app-level key pair + key_pair: KeyPair, /// Recipients of messages received from other peers in the network. subscribers_to_peers_messages: Vec>, /// Receiver to subscribe for messages received from other peers in the network. @@ -199,7 +199,7 @@ struct NetworkBase { impl NetworkBase { /// [`Self`] task. - #[log(skip(self), fields(listen_addr=%self.listen_addr, public_key=%self.public_key))] + #[log(skip(self), fields(listen_addr=%self.listen_addr, public_key=%self.key_pair.public_key()))] async fn run(mut self) { // TODO: probably should be configuration parameter let mut update_topology_interval = tokio::time::interval(Duration::from_millis(100)); @@ -273,7 +273,8 @@ impl NetworkBase { let conn_id = self.get_conn_id(); let service_message_sender = self.service_message_sender.clone(); connected_from::( - PeerId::new(addr, &self.public_key), + addr.clone(), + self.key_pair.clone(), Connection::new(conn_id, stream), service_message_sender, ); @@ -281,7 +282,7 @@ impl NetworkBase { fn set_current_topology(&mut self, UpdateTopology(topology): UpdateTopology) { iroha_logger::debug!(?topology, "Network receive new topology"); - let self_public_key_hash = blake2b_hash(self.public_key.payload()); + let self_public_key_hash = blake2b_hash(self.key_pair.public_key().payload()); let topology = topology .into_iter() .map(|peer_id| { @@ -335,7 +336,8 @@ impl NetworkBase { let service_message_sender = self.service_message_sender.clone(); connecting::( // NOTE: we intentionally use peer's address and our public key, it's used during handshake - PeerId::new(&peer.address, &self.public_key), + peer.address.clone(), + self.key_pair.clone(), conn_id, service_message_sender, ); @@ -398,11 +400,13 @@ impl NetworkBase { fn peer_terminated(&mut self, Terminated { peer_id, conn_id }: Terminated) { self.connecting_peers.remove(&conn_id); - if let Some(peer) = self.peers.get(&peer_id.public_key) { - if peer.conn_id == conn_id { - iroha_logger::debug!(conn_id, peer=%peer_id, "Peer terminated"); - self.peers.remove(&peer_id.public_key); - Self::remove_online_peer(&self.online_peers_sender, &peer_id); + if let Some(peer_id) = peer_id { + if let Some(peer) = self.peers.get(&peer_id.public_key) { + if peer.conn_id == conn_id { + iroha_logger::debug!(conn_id, peer=%peer_id, "Peer terminated"); + self.peers.remove(&peer_id.public_key); + Self::remove_online_peer(&self.online_peers_sender, &peer_id); + } } } } @@ -417,7 +421,7 @@ impl NetworkBase { Self::remove_online_peer(&self.online_peers_sender, &peer_id); } } - None if peer_id.public_key == self.public_key => { + None if &peer_id.public_key == self.key_pair.public_key() => { #[cfg(debug_assertions)] iroha_logger::trace!("Not sending message to myself") } diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 22d78171984..32e4efd7a6a 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -25,19 +25,23 @@ pub const DEFAULT_AAD: &[u8; 10] = b"Iroha2 AAD"; pub mod handles { //! Module with functions to start peer actor and handle to interact with it. + use iroha_crypto::KeyPair; use iroha_logger::Instrument; + use iroha_primitives::addr::SocketAddr; use super::{run::RunPeerArgs, *}; use crate::unbounded_with_len; /// Start Peer in [`state::Connecting`] state pub fn connecting( - peer_id: PeerId, + peer_addr: SocketAddr, + key_pair: KeyPair, connection_id: ConnectionId, service_message_sender: mpsc::Sender>, ) { let peer = state::Connecting { - peer_id, + peer_addr, + key_pair, connection_id, }; let peer = RunPeerArgs { @@ -49,12 +53,14 @@ pub mod handles { /// Start Peer in [`state::ConnectedFrom`] state pub fn connected_from( - peer_id: PeerId, + peer_addr: SocketAddr, + key_pair: KeyPair, connection: Connection, service_message_sender: mpsc::Sender>, ) { let peer = state::ConnectedFrom { - peer_id, + peer_addr, + key_pair, connection, }; let peer = RunPeerArgs { @@ -105,7 +111,7 @@ mod run { }: RunPeerArgs, ) { let conn_id = peer.connection_id(); - let mut peer_id = peer.peer_id().clone(); + let mut peer_id = None; iroha_logger::trace!("Peer created"); @@ -130,7 +136,7 @@ mod run { }, cryptographer, } = peer; - peer_id = new_peer_id; + let peer_id = peer_id.insert(new_peer_id); let disambiguator = blake2b_hash(&cryptographer.shared_key); @@ -228,28 +234,18 @@ mod run { /// Trait for peer stages that might be used as starting point for peer's [`run`] function. pub(super) trait Entrypoint: Handshake + Send + 'static { fn connection_id(&self) -> ConnectionId; - - fn peer_id(&self) -> &PeerId; } impl Entrypoint for Connecting { fn connection_id(&self) -> ConnectionId { self.connection_id } - - fn peer_id(&self) -> &PeerId { - &self.peer_id - } } impl Entrypoint for ConnectedFrom { fn connection_id(&self) -> ConnectionId { self.connection.id } - - fn peer_id(&self) -> &PeerId { - &self.peer_id - } } /// Cancellation-safe way to read messages from tcp stream @@ -374,28 +370,32 @@ mod run { mod state { //! Module for peer stages. - use iroha_crypto::ursa::keys::PublicKey; + use iroha_crypto::{ursa::keys::PublicKey, KeyPair, Signature}; + use iroha_primitives::addr::SocketAddr; use super::{cryptographer::Cryptographer, *}; /// Peer that is connecting. This is the initial stage of a new /// outgoing peer. pub(super) struct Connecting { - pub peer_id: PeerId, + pub peer_addr: SocketAddr, + pub key_pair: KeyPair, pub connection_id: ConnectionId, } impl Connecting { pub(super) async fn connect_to( Self { - peer_id, + peer_addr, + key_pair, connection_id, }: Self, ) -> Result { - let stream = TcpStream::connect(peer_id.address.to_string()).await?; + let stream = TcpStream::connect(peer_addr.to_string()).await?; let connection = Connection::new(connection_id, stream); Ok(ConnectedTo { - peer_id, + peer_addr, + key_pair, connection, }) } @@ -403,36 +403,40 @@ mod state { /// Peer that is being connected to. pub(super) struct ConnectedTo { - peer_id: PeerId, + peer_addr: SocketAddr, + key_pair: KeyPair, connection: Connection, } impl ConnectedTo { pub(super) async fn send_client_hello( Self { - peer_id, + peer_addr, + key_pair, mut connection, }: Self, ) -> Result, crate::Error> { let key_exchange = K::new(); - let (local_public_key, local_private_key) = key_exchange.keypair(None)?; + let (kx_local_pk, kx_local_sk) = key_exchange.keypair(None)?; let write_half = &mut connection.write; garbage::write(write_half).await?; - write_half.write_all(local_public_key.as_ref()).await?; + write_half.write_all(kx_local_pk.as_ref()).await?; // Read server hello with node's public key let read_half = &mut connection.read; - let remote_public_key = { + let kx_remote_pk = { garbage::read(read_half).await?; // Then we have servers public key let mut key = vec![0_u8; 32]; let _ = read_half.read_exact(&mut key).await?; PublicKey(key) }; - let shared_key = - key_exchange.compute_shared_secret(&local_private_key, &remote_public_key)?; + let shared_key = key_exchange.compute_shared_secret(&kx_local_sk, &kx_remote_pk)?; let cryptographer = Cryptographer::new(shared_key); Ok(SendKey { - peer_id, + peer_addr, + key_pair, + kx_local_pk, + kx_remote_pk, connection, cryptographer, }) @@ -441,22 +445,24 @@ mod state { /// Peer that is being connected from pub(super) struct ConnectedFrom { - pub peer_id: PeerId, + pub peer_addr: SocketAddr, + pub key_pair: KeyPair, pub connection: Connection, } impl ConnectedFrom { pub(super) async fn read_client_hello( Self { - peer_id, + peer_addr, + key_pair, mut connection, .. }: Self, ) -> Result, crate::Error> { let key_exchange = K::new(); - let (local_public_key, local_private_key) = key_exchange.keypair(None)?; + let (kx_local_pk, kx_local_sk) = key_exchange.keypair(None)?; let read_half = &mut connection.read; - let remote_public_key = { + let kx_remote_pk = { garbage::read(read_half).await?; // And then we have clients public key let mut key = vec![0_u8; 32]; @@ -465,12 +471,14 @@ mod state { }; let write_half = &mut connection.write; garbage::write(write_half).await?; - write_half.write_all(local_public_key.as_ref()).await?; - let shared_key = - key_exchange.compute_shared_secret(&local_private_key, &remote_public_key)?; + write_half.write_all(kx_local_pk.as_ref()).await?; + let shared_key = key_exchange.compute_shared_secret(&kx_local_sk, &kx_remote_pk)?; let cryptographer = Cryptographer::new(shared_key); Ok(SendKey { - peer_id, + peer_addr, + key_pair, + kx_local_pk, + kx_remote_pk, connection, cryptographer, }) @@ -479,7 +487,10 @@ mod state { /// Peer that needs to send key. pub(super) struct SendKey { - peer_id: PeerId, + peer_addr: SocketAddr, + key_pair: KeyPair, + kx_local_pk: PublicKey, + kx_remote_pk: PublicKey, connection: Connection, cryptographer: Cryptographer, } @@ -487,16 +498,19 @@ mod state { impl SendKey { pub(super) async fn send_our_public_key( Self { - peer_id, + peer_addr, + key_pair, + kx_local_pk, + kx_remote_pk, mut connection, cryptographer, }: Self, ) -> Result, crate::Error> { let write_half = &mut connection.write; - // We take our public key from our `id` and will replace it with theirs when we read it - // Packing length and message in one network packet for efficiency - let data = peer_id.public_key().encode(); + let payload = create_payload(&kx_local_pk, &kx_remote_pk); + let signature = Signature::new(key_pair, &payload)?; + let data = signature.encode(); let data = &cryptographer.encrypt(data.as_slice())?; @@ -507,8 +521,10 @@ mod state { write_half.write_all(&buf).await?; Ok(GetKey { - peer_id, + peer_addr, connection, + kx_local_pk, + kx_remote_pk, cryptographer, }) } @@ -516,8 +532,10 @@ mod state { /// Peer that needs to get key. pub struct GetKey { - peer_id: PeerId, + peer_addr: SocketAddr, connection: Connection, + kx_local_pk: PublicKey, + kx_remote_pk: PublicKey, cryptographer: Cryptographer, } @@ -525,8 +543,10 @@ mod state { /// Read the peer's public key pub(super) async fn read_their_public_key( Self { - mut peer_id, + peer_addr, mut connection, + kx_local_pk, + kx_remote_pk, cryptographer, }: Self, ) -> Result, crate::Error> { @@ -538,9 +558,19 @@ mod state { let data = cryptographer.decrypt(data.as_slice())?; - let pub_key = DecodeAll::decode_all(&mut data.as_slice())?; + let signature: Signature = DecodeAll::decode_all(&mut data.as_slice())?; + + // Swap order of keys since we are verifying for other peer order remote/local keys is reversed + let payload = create_payload(&kx_remote_pk, &kx_local_pk); + signature.verify(&payload)?; + + let (remote_pub_key, _) = signature.into(); + + let peer_id = PeerId { + address: peer_addr, + public_key: remote_pub_key, + }; - peer_id.public_key = pub_key; Ok(Ready { peer_id, connection, @@ -556,6 +586,14 @@ mod state { pub connection: Connection, pub cryptographer: Cryptographer, } + + fn create_payload(kx_local_pk: &PublicKey, kx_remote_pk: &PublicKey) -> Vec { + let mut payload = + Vec::with_capacity(kx_local_pk.as_ref().len() + kx_remote_pk.as_ref().len()); + payload.extend(kx_local_pk.as_ref()); + payload.extend(kx_remote_pk.as_ref()); + payload + } } mod handshake { @@ -660,7 +698,7 @@ pub mod message { /// Peer faced error or `Terminate` message, send to indicate that it is terminated pub struct Terminated { /// Peer Id - pub peer_id: PeerId, + pub peer_id: Option, /// Connection Id pub conn_id: ConnectionId, } diff --git a/p2p/tests/integration/p2p.rs b/p2p/tests/integration/p2p.rs index fc9a958401e..4cacc3187dc 100644 --- a/p2p/tests/integration/p2p.rs +++ b/p2p/tests/integration/p2p.rs @@ -1,7 +1,6 @@ use std::{ collections::HashSet, fmt::Debug, - str::FromStr, sync::{ atomic::{AtomicU32, Ordering}, Arc, Once, @@ -49,11 +48,9 @@ async fn network_create() { setup_logger(); info!("Starting network tests..."); let address = socket_addr!(127.0.0.1:12_000); - let public_key = iroha_crypto::PublicKey::from_str( - "ed01207233BFC89DCBD68C19FDE6CE6158225298EC1131B6A130D1AEB454C1AB5183C0", - ) - .unwrap(); - let network = NetworkHandle::start(address.clone(), public_key.clone()) + let key_pair = KeyPair::generate().unwrap(); + let public_key = key_pair.public_key().clone(); + let network = NetworkHandle::start(address.clone(), key_pair) .await .unwrap(); tokio::time::sleep(delay).await; @@ -158,23 +155,19 @@ impl TestActor { async fn two_networks() { let delay = Duration::from_millis(300); setup_logger(); - let public_key1 = iroha_crypto::PublicKey::from_str( - "ed01207233BFC89DCBD68C19FDE6CE6158225298EC1131B6A130D1AEB454C1AB5183C0", - ) - .unwrap(); - let public_key2 = iroha_crypto::PublicKey::from_str( - "ed01207233BFC89DCBD68C19FDE6CE6158225298EC1131B6A130D1AEB454C1AB5183C1", - ) - .unwrap(); + let key_pair1 = KeyPair::generate().unwrap(); + let public_key1 = key_pair1.public_key().clone(); + let key_pair2 = KeyPair::generate().unwrap().clone(); + let public_key2 = key_pair2.public_key().clone(); info!("Starting first network..."); let address1 = socket_addr!(127.0.0.1:12_005); - let mut network1 = NetworkHandle::start(address1.clone(), public_key1.clone()) + let mut network1 = NetworkHandle::start(address1.clone(), key_pair1) .await .unwrap(); info!("Starting second network..."); let address2 = socket_addr!(127.0.0.1:12_010); - let network2 = NetworkHandle::start(address2.clone(), public_key2.clone()) + let network2 = NetworkHandle::start(address2.clone(), key_pair2) .await .unwrap(); @@ -251,13 +244,16 @@ async fn multiple_networks() { info!("Starting..."); let mut peers = Vec::new(); + let mut key_pairs = Vec::new(); for i in 0_u16..10_u16 { let address = socket_addr!(127.0.0.1: 12_015 + ( i * 5)); - let keypair = KeyPair::generate().unwrap(); + let key_pair = KeyPair::generate().unwrap(); + let public_key = key_pair.public_key().clone(); peers.push(PeerId { address, - public_key: keypair.public_key().clone(), + public_key, }); + key_pairs.push(key_pair); } let mut networks = Vec::new(); @@ -269,9 +265,11 @@ async fn multiple_networks() { let barrier = Arc::new(Barrier::new(peers.len())); peers .iter() - .map(|peer| { + .zip(key_pairs) + .map(|(peer, key_pair)| { start_network( peer.clone(), + key_pair, peers.clone(), msgs.clone(), Arc::clone(&barrier), @@ -312,6 +310,7 @@ async fn multiple_networks() { async fn start_network( peer: PeerId, + key_pair: KeyPair, peers: Vec, messages: WaitForN, barrier: Arc, @@ -321,11 +320,8 @@ async fn start_network( // This actor will get the messages from other peers and increment the counter let actor = TestActor::start(messages); - let PeerId { - address, - public_key, - } = peer.clone(); - let mut network = NetworkHandle::start(address, public_key).await.unwrap(); + let PeerId { address, .. } = peer.clone(); + let mut network = NetworkHandle::start(address, key_pair).await.unwrap(); network.subscribe_to_peers_messages(actor); let _ = barrier.wait().await;