diff --git a/mm2src/mm2_core/src/mm_ctx.rs b/mm2src/mm2_core/src/mm_ctx.rs index 245b21af16..273882a656 100644 --- a/mm2src/mm2_core/src/mm_ctx.rs +++ b/mm2src/mm2_core/src/mm_ctx.rs @@ -60,7 +60,7 @@ pub struct HealthcheckConfig { pub blocking_ms_for_per_address: u64, /// Lifetime of the message. /// Do not change this unless you know what you are doing. - pub message_expiration: u64, + pub message_expiration_secs: u64, /// Maximum time (milliseconds) to wait for healthcheck response. pub timeout_secs: u64, } @@ -69,7 +69,7 @@ impl Default for HealthcheckConfig { fn default() -> Self { Self { blocking_ms_for_per_address: 750, - message_expiration: 10, + message_expiration_secs: 10, timeout_secs: 10, } } diff --git a/mm2src/mm2_main/src/lp_healthcheck.rs b/mm2src/mm2_main/src/lp_healthcheck.rs index 9c9b543c1a..2971ac17bc 100644 --- a/mm2src/mm2_main/src/lp_healthcheck.rs +++ b/mm2src/mm2_main/src/lp_healthcheck.rs @@ -5,12 +5,13 @@ use common::{log, HttpStatusCode, StatusCode}; use derive_more::Display; use futures::channel::oneshot::{self, Receiver, Sender}; use instant::Duration; -use mm2_core::mm_ctx::MmArc; +use mm2_core::mm_ctx::{HealthcheckConfig, MmArc}; use mm2_err_handle::prelude::MmError; use mm2_libp2p::{decode_message, encode_message, pub_sub_topic, Libp2pPublic, PeerId, TopicPrefix}; use mm2_net::p2p::P2PContext; use ser_error_derive::SerializeErrorType; use serde::{Deserialize, Serialize}; +use std::convert::TryFrom; use std::convert::TryInto; use std::num::TryFromIntError; use std::str::FromStr; @@ -52,14 +53,23 @@ impl HealthcheckMessage { Ok(Self { signature, data }) } - pub(crate) fn is_received_message_valid(&self, my_peer_id: PeerId) -> bool { + pub(crate) fn is_received_message_valid(&self, my_peer_id: PeerId, healthcheck_config: &HealthcheckConfig) -> bool { let now = Utc::now().timestamp(); - if now > self.data.expires_at { + let remaining_expiration_seconds = u64::try_from(self.data.expires_at - now).unwrap_or(0); + + if remaining_expiration_seconds == 0 { log::debug!( "Healthcheck message is expired. Current time in UTC: {now}, healthcheck `expires_at` in UTC: {}", self.data.expires_at ); return false; + } else if remaining_expiration_seconds > healthcheck_config.message_expiration_secs { + log::debug!( + "Healthcheck message have too high expiration time.\nMax allowed expiration seconds: {}\nReceived message expiration seconds: {}", + self.data.expires_at, + remaining_expiration_seconds, + ); + return false; } if self.data.target_peer != my_peer_id { @@ -264,7 +274,7 @@ pub async fn peer_connection_healthcheck_rpc( false, ctx.health_checker .config - .message_expiration + .message_expiration_secs .try_into() .map_err(|e: TryFromIntError| HealthcheckRpcError::Internal { reason: e.to_string() })?, ) @@ -329,7 +339,7 @@ pub(crate) async fn process_p2p_healthcheck_message(ctx: &MmArc, message: mm2_li // so KDF can handle a high amount of healthcheck messages more efficiently. ctx.spawner().spawn(async move { let my_peer_id = P2PContext::fetch_from_mm_arc(&ctx_c).peer_id(); - if !data.is_received_message_valid(my_peer_id) { + if !data.is_received_message_valid(my_peer_id, &ctx_c.health_checker.config) { log::error!("Received an invalid healthcheck message."); log::debug!("Message context: {:?}", data); return; @@ -403,7 +413,7 @@ mod tests { let ctx = ctx(); let target_peer = create_test_peer_id(); let message = HealthcheckMessage::generate_message(&ctx, target_peer, false, 5).unwrap(); - assert!(message.is_received_message_valid(target_peer)); + assert!(message.is_received_message_valid(target_peer, &ctx.health_checker.config)); }); cross_test!(test_corrupted_messages, { @@ -412,29 +422,29 @@ mod tests { let mut message = HealthcheckMessage::generate_message(&ctx, target_peer, false, 5).unwrap(); message.data.expires_at += 1; - assert!(!message.is_received_message_valid(target_peer)); + assert!(!message.is_received_message_valid(target_peer, &ctx.health_checker.config)); let mut message = HealthcheckMessage::generate_message(&ctx, target_peer, false, 5).unwrap(); message.data.is_a_reply = !message.data.is_a_reply; - assert!(!message.is_received_message_valid(target_peer)); + assert!(!message.is_received_message_valid(target_peer, &ctx.health_checker.config)); let mut message = HealthcheckMessage::generate_message(&ctx, target_peer, false, 5).unwrap(); message.data.sender_peer = message.data.target_peer; - assert!(!message.is_received_message_valid(target_peer)); + assert!(!message.is_received_message_valid(target_peer, &ctx.health_checker.config)); let mut message = HealthcheckMessage::generate_message(&ctx, target_peer, false, 5).unwrap(); message.data.target_peer = message.data.sender_peer; - assert!(!message.is_received_message_valid(target_peer)); + assert!(!message.is_received_message_valid(target_peer, &ctx.health_checker.config)); let message = HealthcheckMessage::generate_message(&ctx, target_peer, false, 5).unwrap(); - assert!(!message.is_received_message_valid(message.data.sender_peer)); + assert!(!message.is_received_message_valid(message.data.sender_peer, &ctx.health_checker.config)); }); cross_test!(test_expired_message, { let ctx = ctx(); let target_peer = create_test_peer_id(); let message = HealthcheckMessage::generate_message(&ctx, target_peer, false, -1).unwrap(); - assert!(!message.is_received_message_valid(target_peer)); + assert!(!message.is_received_message_valid(target_peer, &ctx.health_checker.config)); }); cross_test!(test_encode_decode, {