diff --git a/Cargo.lock b/Cargo.lock index aeae6592d1..074ada5c25 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2069,6 +2069,7 @@ dependencies = [ "sc-utils", "sp-api", "sp-blockchain", + "sp-consensus", "sp-core", "sp-domains", "sp-messenger", diff --git a/crates/subspace-malicious-operator/src/bin/subspace-malicious-operator.rs b/crates/subspace-malicious-operator/src/bin/subspace-malicious-operator.rs index 5489b3c062..f3b846b280 100644 --- a/crates/subspace-malicious-operator/src/bin/subspace-malicious-operator.rs +++ b/crates/subspace-malicious-operator/src/bin/subspace-malicious-operator.rs @@ -316,6 +316,7 @@ fn main() -> Result<(), Error> { let consensus_network_service = consensus_chain_node.network_service.clone(); let consensus_task_spawn_essential_handler = consensus_chain_node.task_manager.spawn_essential_handle(); + let consensus_sync_service = consensus_chain_node.sync_service.clone(); consensus_chain_node .task_manager .spawn_essential_handle() @@ -352,6 +353,7 @@ fn main() -> Result<(), Error> { _, DomainBlock, _, + _, >( ChainId::Consensus, consensus_chain_node.client.clone(), @@ -359,7 +361,8 @@ fn main() -> Result<(), Error> { consensus_chain_node.transaction_pool.clone(), consensus_network_service, consensus_msg_receiver, - domain_code_executor + domain_code_executor, + consensus_sync_service, ); consensus_task_spawn_essential_handler diff --git a/crates/subspace-node/src/commands/run.rs b/crates/subspace-node/src/commands/run.rs index 444ff9cc65..3033bb93c3 100644 --- a/crates/subspace-node/src/commands/run.rs +++ b/crates/subspace-node/src/commands/run.rs @@ -234,6 +234,7 @@ pub async fn run(run_options: RunOptions) -> Result<(), Error> { _, DomainBlock, _, + _, >( ChainId::Consensus, consensus_chain_node.client.clone(), @@ -242,6 +243,7 @@ pub async fn run(run_options: RunOptions) -> Result<(), Error> { consensus_chain_node.network_service.clone(), consensus_msg_receiver, domain_code_executor.into(), + consensus_chain_node.sync_service.clone(), ), ), ); diff --git a/domains/client/cross-domain-message-gossip/Cargo.toml b/domains/client/cross-domain-message-gossip/Cargo.toml index 6f9864bb7b..ab63d5c538 100644 --- a/domains/client/cross-domain-message-gossip/Cargo.toml +++ b/domains/client/cross-domain-message-gossip/Cargo.toml @@ -25,6 +25,7 @@ sc-transaction-pool-api = { git = "https://github.com/subspace/polkadot-sdk", re sc-utils = { git = "https://github.com/subspace/polkadot-sdk", rev = "5626154d0781ac9a6ffd5a6207ed237f425ae631" } sp-api = { git = "https://github.com/subspace/polkadot-sdk", rev = "5626154d0781ac9a6ffd5a6207ed237f425ae631" } sp-blockchain = { git = "https://github.com/subspace/polkadot-sdk", rev = "5626154d0781ac9a6ffd5a6207ed237f425ae631" } +sp-consensus = { git = "https://github.com/subspace/polkadot-sdk", rev = "5626154d0781ac9a6ffd5a6207ed237f425ae631" } sp-core = { git = "https://github.com/subspace/polkadot-sdk", rev = "5626154d0781ac9a6ffd5a6207ed237f425ae631" } sp-domains = { version = "0.1.0", path = "../../../crates/sp-domains" } sp-messenger = { version = "0.1.0", default-features = false, path = "../../primitives/messenger" } diff --git a/domains/client/cross-domain-message-gossip/src/gossip_worker.rs b/domains/client/cross-domain-message-gossip/src/gossip_worker.rs index c365c34fe5..98247ffba9 100644 --- a/domains/client/cross-domain-message-gossip/src/gossip_worker.rs +++ b/domains/client/cross-domain-message-gossip/src/gossip_worker.rs @@ -9,6 +9,7 @@ use sc_network_gossip::{ }; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use sp_api::StorageProof; +use sp_consensus::SyncOracle; use sp_core::twox_256; use sp_messenger::messages::{ChainId, ChannelId}; use sp_runtime::traits::{Block as BlockT, Hash as HashT, Header as HeaderT}; @@ -101,11 +102,11 @@ impl GossipWorkerBuilder { network: Network, notification_service: Box, sync: Arc, - ) -> GossipWorker + ) -> GossipWorker where Block: BlockT, Network: sc_network_gossip::Network + Send + Sync + Clone + 'static, - GossipSync: GossipSyncing + 'static, + GossipSync: GossipSyncing + SyncOracle + Send + 'static, { let Self { gossip_msg_stream, @@ -116,7 +117,7 @@ impl GossipWorkerBuilder { let gossip_validator = Arc::new(GossipValidator::new(network.clone())); let gossip_engine = Arc::new(Mutex::new(GossipEngine::new( network, - sync, + sync.clone(), notification_service, PROTOCOL_NAME, gossip_validator.clone(), @@ -128,17 +129,19 @@ impl GossipWorkerBuilder { gossip_validator, gossip_msg_stream, chain_sinks, + sync_oracle: sync, } } } /// Gossip worker to gossip incoming and outgoing messages to other peers. /// Also, streams the decoded extrinsics to destination chain tx pool if available. -pub struct GossipWorker { +pub struct GossipWorker { gossip_engine: Arc>>, gossip_validator: Arc>, gossip_msg_stream: TracingUnboundedReceiver, chain_sinks: BTreeMap, + sync_oracle: Arc, } /// Returns the network configuration for cross chain message gossip. @@ -159,7 +162,7 @@ fn topic() -> Block::Hash { <::Hashing as HashT>::hash(b"cross-chain-messages") } -impl GossipWorker { +impl GossipWorker { /// Starts the Gossip message worker. pub async fn run(mut self) { let incoming_cross_chain_messages = pin!(self @@ -206,6 +209,11 @@ impl GossipWorker { .lock() .gossip_message(topic::(), encoded_msg, false); + // Skip sending the message since the node unable to verify the message before synced + if self.sync_oracle.is_major_syncing() { + return; + } + let Message { chain_id, data } = msg; let sink = match self.chain_sinks.get(&chain_id) { Some(sink) => sink, diff --git a/domains/client/cross-domain-message-gossip/src/message_listener.rs b/domains/client/cross-domain-message-gossip/src/message_listener.rs index 224fefe8bd..db5047366f 100644 --- a/domains/client/cross-domain-message-gossip/src/message_listener.rs +++ b/domains/client/cross-domain-message-gossip/src/message_listener.rs @@ -10,6 +10,7 @@ use sc_network::NetworkPeers; use sc_transaction_pool_api::{TransactionPool, TransactionSource}; use sp_api::{ApiError, ApiExt, ProvideRuntimeApi, StorageProof}; use sp_blockchain::HeaderBackend; +use sp_consensus::SyncOracle; use sp_core::crypto::AccountId32; use sp_core::storage::StorageKey; use sp_core::traits::CodeExecutor; @@ -80,6 +81,7 @@ impl From for Error { } } +#[allow(clippy::too_many_arguments)] pub async fn start_cross_chain_message_listener< Client, TxPool, @@ -88,6 +90,7 @@ pub async fn start_cross_chain_message_listener< CBlock, Block, Executor, + SO, >( chain_id: ChainId, consensus_client: Arc, @@ -96,6 +99,7 @@ pub async fn start_cross_chain_message_listener< network: Arc, mut listener: TxnListener, domain_executor: Arc, + sync_oracle: SO, ) where TxPool: TransactionPool + 'static, Client: ProvideRuntimeApi> + HeaderBackend>, @@ -106,6 +110,7 @@ pub async fn start_cross_chain_message_listener< + RelayerApi, NumberFor, CBlock::Hash>, TxnListener: Stream + Unpin, Executor: CodeExecutor + RuntimeVersionOf, + SO: SyncOracle + Send, { tracing::info!( target: LOG_TARGET, @@ -116,6 +121,11 @@ pub async fn start_cross_chain_message_listener< let mut domain_storage_key_cache = BTreeMap::<(H256, ChainId, ChannelId), StorageKey>::new(); while let Some(msg) = listener.next().await { + // If the client is in major sync, wait until sync is complete + if sync_oracle.is_major_syncing() { + continue; + } + tracing::debug!( target: LOG_TARGET, "Message received for Chain: {:?}", diff --git a/domains/service/src/domain.rs b/domains/service/src/domain.rs index 12f3843080..b5ee2cdcf2 100644 --- a/domains/service/src/domain.rs +++ b/domains/service/src/domain.rs @@ -500,7 +500,7 @@ where client.clone(), // domain will use consensus chain sync oracle instead of domain sync oracle // since domain sync oracle will always return `synced` due to force sync being set. - consensus_network_sync_oracle, + consensus_network_sync_oracle.clone(), gossip_message_sink, ); @@ -512,16 +512,25 @@ where } // Start cross domain message listener for domain - let domain_listener = - cross_domain_message_gossip::start_cross_chain_message_listener::<_, _, _, _, _, Block, _>( - ChainId::Domain(domain_id), - consensus_client.clone(), - client.clone(), - params.transaction_pool.clone(), - consensus_network, - domain_message_receiver, - code_executor.clone(), - ); + let domain_listener = cross_domain_message_gossip::start_cross_chain_message_listener::< + _, + _, + _, + _, + _, + Block, + _, + _, + >( + ChainId::Domain(domain_id), + consensus_client.clone(), + client.clone(), + params.transaction_pool.clone(), + consensus_network, + domain_message_receiver, + code_executor.clone(), + consensus_network_sync_oracle, + ); spawn_essential.spawn_essential_blocking( "domain-message-listener", diff --git a/test/subspace-test-service/src/lib.rs b/test/subspace-test-service/src/lib.rs index b0a82c7890..ff6396a48e 100644 --- a/test/subspace-test-service/src/lib.rs +++ b/test/subspace-test-service/src/lib.rs @@ -524,6 +524,7 @@ impl MockConsensusNode { _, DomainBlock, _, + _, >( ChainId::Consensus, client.clone(), @@ -532,6 +533,7 @@ impl MockConsensusNode { network_service.clone(), consensus_msg_receiver, domain_executor, + sync_service.clone(), ); task_manager