Skip to content

Commit

Permalink
Skip adding XDM to the tx pool during major sync
Browse files Browse the repository at this point in the history
Signed-off-by: linning <[email protected]>
  • Loading branch information
NingLin-P committed Aug 30, 2024
1 parent b8bae12 commit 69f11c3
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 17 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ fn main() -> Result<(), Error> {

let consensus_network_service = consensus_chain_node.network_service.clone();
let consensus_task_spawn_essential_handler = consensus_chain_node.task_manager.spawn_essential_handle();
let consensus_sync_service = consensus_chain_node.sync_service.clone();
consensus_chain_node
.task_manager
.spawn_essential_handle()
Expand Down Expand Up @@ -352,14 +353,16 @@ fn main() -> Result<(), Error> {
_,
DomainBlock,
_,
_,
>(
ChainId::Consensus,
consensus_chain_node.client.clone(),
consensus_chain_node.client.clone(),
consensus_chain_node.transaction_pool.clone(),
consensus_network_service,
consensus_msg_receiver,
domain_code_executor
domain_code_executor,
consensus_sync_service,
);

consensus_task_spawn_essential_handler
Expand Down
2 changes: 2 additions & 0 deletions crates/subspace-node/src/commands/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ pub async fn run(run_options: RunOptions) -> Result<(), Error> {
_,
DomainBlock,
_,
_,
>(
ChainId::Consensus,
consensus_chain_node.client.clone(),
Expand All @@ -242,6 +243,7 @@ pub async fn run(run_options: RunOptions) -> Result<(), Error> {
consensus_chain_node.network_service.clone(),
consensus_msg_receiver,
domain_code_executor.into(),
consensus_chain_node.sync_service.clone(),
),
),
);
Expand Down
1 change: 1 addition & 0 deletions domains/client/cross-domain-message-gossip/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ sc-transaction-pool-api = { git = "https://github.com/subspace/polkadot-sdk", re
sc-utils = { git = "https://github.com/subspace/polkadot-sdk", rev = "5626154d0781ac9a6ffd5a6207ed237f425ae631" }
sp-api = { git = "https://github.com/subspace/polkadot-sdk", rev = "5626154d0781ac9a6ffd5a6207ed237f425ae631" }
sp-blockchain = { git = "https://github.com/subspace/polkadot-sdk", rev = "5626154d0781ac9a6ffd5a6207ed237f425ae631" }
sp-consensus = { git = "https://github.com/subspace/polkadot-sdk", rev = "5626154d0781ac9a6ffd5a6207ed237f425ae631" }
sp-core = { git = "https://github.com/subspace/polkadot-sdk", rev = "5626154d0781ac9a6ffd5a6207ed237f425ae631" }
sp-domains = { version = "0.1.0", path = "../../../crates/sp-domains" }
sp-messenger = { version = "0.1.0", default-features = false, path = "../../primitives/messenger" }
Expand Down
18 changes: 13 additions & 5 deletions domains/client/cross-domain-message-gossip/src/gossip_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use sc_network_gossip::{
};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_api::StorageProof;
use sp_consensus::SyncOracle;
use sp_core::twox_256;
use sp_messenger::messages::{ChainId, ChannelId};
use sp_runtime::traits::{Block as BlockT, Hash as HashT, Header as HeaderT};
Expand Down Expand Up @@ -101,11 +102,11 @@ impl GossipWorkerBuilder {
network: Network,
notification_service: Box<dyn NotificationService>,
sync: Arc<GossipSync>,
) -> GossipWorker<Block, Network>
) -> GossipWorker<Block, Network, GossipSync>
where
Block: BlockT,
Network: sc_network_gossip::Network<Block> + Send + Sync + Clone + 'static,
GossipSync: GossipSyncing<Block> + 'static,
GossipSync: GossipSyncing<Block> + SyncOracle + Send + 'static,
{
let Self {
gossip_msg_stream,
Expand All @@ -116,7 +117,7 @@ impl GossipWorkerBuilder {
let gossip_validator = Arc::new(GossipValidator::new(network.clone()));
let gossip_engine = Arc::new(Mutex::new(GossipEngine::new(
network,
sync,
sync.clone(),
notification_service,
PROTOCOL_NAME,
gossip_validator.clone(),
Expand All @@ -128,17 +129,19 @@ impl GossipWorkerBuilder {
gossip_validator,
gossip_msg_stream,
chain_sinks,
sync_oracle: sync,
}
}
}

/// Gossip worker to gossip incoming and outgoing messages to other peers.
/// Also, streams the decoded extrinsics to destination chain tx pool if available.
pub struct GossipWorker<Block: BlockT, Network> {
pub struct GossipWorker<Block: BlockT, Network, SO> {
gossip_engine: Arc<Mutex<GossipEngine<Block>>>,
gossip_validator: Arc<GossipValidator<Network>>,
gossip_msg_stream: TracingUnboundedReceiver<Message>,
chain_sinks: BTreeMap<ChainId, ChainSink>,
sync_oracle: Arc<SO>,
}

/// Returns the network configuration for cross chain message gossip.
Expand All @@ -159,7 +162,7 @@ fn topic<Block: BlockT>() -> Block::Hash {
<<Block::Header as HeaderT>::Hashing as HashT>::hash(b"cross-chain-messages")
}

impl<Block: BlockT, Network> GossipWorker<Block, Network> {
impl<Block: BlockT, Network, SO: SyncOracle> GossipWorker<Block, Network, SO> {
/// Starts the Gossip message worker.
pub async fn run(mut self) {
let incoming_cross_chain_messages = pin!(self
Expand Down Expand Up @@ -206,6 +209,11 @@ impl<Block: BlockT, Network> GossipWorker<Block, Network> {
.lock()
.gossip_message(topic::<Block>(), encoded_msg, false);

// Skip sending the message since the node unable to verify the message before synced
if self.sync_oracle.is_major_syncing() {
return;
}

let Message { chain_id, data } = msg;
let sink = match self.chain_sinks.get(&chain_id) {
Some(sink) => sink,
Expand Down
10 changes: 10 additions & 0 deletions domains/client/cross-domain-message-gossip/src/message_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use sc_network::NetworkPeers;
use sc_transaction_pool_api::{TransactionPool, TransactionSource};
use sp_api::{ApiError, ApiExt, ProvideRuntimeApi, StorageProof};
use sp_blockchain::HeaderBackend;
use sp_consensus::SyncOracle;
use sp_core::crypto::AccountId32;
use sp_core::storage::StorageKey;
use sp_core::traits::CodeExecutor;
Expand Down Expand Up @@ -80,6 +81,7 @@ impl From<VerificationError> for Error {
}
}

#[allow(clippy::too_many_arguments)]
pub async fn start_cross_chain_message_listener<
Client,
TxPool,
Expand All @@ -88,6 +90,7 @@ pub async fn start_cross_chain_message_listener<
CBlock,
Block,
Executor,
SO,
>(
chain_id: ChainId,
consensus_client: Arc<CClient>,
Expand All @@ -96,6 +99,7 @@ pub async fn start_cross_chain_message_listener<
network: Arc<dyn NetworkPeers + Send + Sync>,
mut listener: TxnListener,
domain_executor: Arc<Executor>,
sync_oracle: SO,
) where
TxPool: TransactionPool + 'static,
Client: ProvideRuntimeApi<BlockOf<TxPool>> + HeaderBackend<BlockOf<TxPool>>,
Expand All @@ -106,6 +110,7 @@ pub async fn start_cross_chain_message_listener<
+ RelayerApi<CBlock, NumberFor<CBlock>, NumberFor<CBlock>, CBlock::Hash>,
TxnListener: Stream<Item = ChainMsg> + Unpin,
Executor: CodeExecutor + RuntimeVersionOf,
SO: SyncOracle + Send,
{
tracing::info!(
target: LOG_TARGET,
Expand All @@ -116,6 +121,11 @@ pub async fn start_cross_chain_message_listener<
let mut domain_storage_key_cache = BTreeMap::<(H256, ChainId, ChannelId), StorageKey>::new();

while let Some(msg) = listener.next().await {
// If the client is in major sync, wait until sync is complete
if sync_oracle.is_major_syncing() {
continue;
}

tracing::debug!(
target: LOG_TARGET,
"Message received for Chain: {:?}",
Expand Down
31 changes: 20 additions & 11 deletions domains/service/src/domain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ where
client.clone(),
// domain will use consensus chain sync oracle instead of domain sync oracle
// since domain sync oracle will always return `synced` due to force sync being set.
consensus_network_sync_oracle,
consensus_network_sync_oracle.clone(),
gossip_message_sink,
);

Expand All @@ -512,16 +512,25 @@ where
}

// Start cross domain message listener for domain
let domain_listener =
cross_domain_message_gossip::start_cross_chain_message_listener::<_, _, _, _, _, Block, _>(
ChainId::Domain(domain_id),
consensus_client.clone(),
client.clone(),
params.transaction_pool.clone(),
consensus_network,
domain_message_receiver,
code_executor.clone(),
);
let domain_listener = cross_domain_message_gossip::start_cross_chain_message_listener::<
_,
_,
_,
_,
_,
Block,
_,
_,
>(
ChainId::Domain(domain_id),
consensus_client.clone(),
client.clone(),
params.transaction_pool.clone(),
consensus_network,
domain_message_receiver,
code_executor.clone(),
consensus_network_sync_oracle,
);

spawn_essential.spawn_essential_blocking(
"domain-message-listener",
Expand Down
2 changes: 2 additions & 0 deletions test/subspace-test-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,7 @@ impl MockConsensusNode {
_,
DomainBlock,
_,
_,
>(
ChainId::Consensus,
client.clone(),
Expand All @@ -532,6 +533,7 @@ impl MockConsensusNode {
network_service.clone(),
consensus_msg_receiver,
domain_executor,
sync_service.clone(),
);

task_manager
Expand Down

0 comments on commit 69f11c3

Please sign in to comment.