From f4cc0495e941727127acf5a6000d88d40b519a33 Mon Sep 17 00:00:00 2001 From: D-Stacks <78099568+D-Stacks@users.noreply.github.com> Date: Sun, 17 Nov 2024 23:19:52 +0100 Subject: [PATCH] address reveiw points from m. sutton. --- consensus/src/consensus/mod.rs | 16 +---- .../body_validation_in_context.rs | 30 ++++++--- .../src/pipeline/body_processor/processor.rs | 63 ++++++++++--------- .../pipeline/virtual_processor/processor.rs | 26 ++++---- 4 files changed, 70 insertions(+), 65 deletions(-) diff --git a/consensus/src/consensus/mod.rs b/consensus/src/consensus/mod.rs index b3edd55ca..eca78ee2a 100644 --- a/consensus/src/consensus/mod.rs +++ b/consensus/src/consensus/mod.rs @@ -241,23 +241,13 @@ impl Consensus { body_receiver, virtual_sender, block_processors_pool, + params, db.clone(), - storage.statuses_store.clone(), - storage.ghostdag_store.clone(), - storage.headers_store.clone(), - storage.block_transactions_store.clone(), - storage.body_tips_store.clone(), - services.reachability_service.clone(), - services.coinbase_manager.clone(), - services.mass_calculator.clone(), - services.transaction_validator.clone(), - services.window_manager.clone(), - params.max_block_mass, - params.genesis.clone(), + &storage, + &services, pruning_lock.clone(), notification_root.clone(), counters.clone(), - params.storage_mass_activation, )); let virtual_processor = Arc::new(VirtualStateProcessor::new( diff --git a/consensus/src/pipeline/body_processor/body_validation_in_context.rs b/consensus/src/pipeline/body_processor/body_validation_in_context.rs index cdf6dc28c..4b8505b5d 100644 --- a/consensus/src/pipeline/body_processor/body_validation_in_context.rs +++ b/consensus/src/pipeline/body_processor/body_validation_in_context.rs @@ -9,7 +9,7 @@ use kaspa_database::prelude::StoreResultExtensions; use kaspa_hashes::Hash; use kaspa_utils::option::OptionExtensions; use once_cell::unsync::Lazy; -use std::sync::Arc; +use std::{ops::Deref, sync::Arc}; impl BlockBodyProcessor { pub fn validate_body_in_context(self: &Arc, block: &Block) -> BlockProcessResult<()> { @@ -19,17 +19,29 @@ impl BlockBodyProcessor { } fn check_block_transactions_in_context(self: &Arc, block: &Block) -> BlockProcessResult<()> { - // TODO: this is somewhat expensive during ibd, as it incurs cache misses. - let pmt_res = - Lazy::new(|| match self.window_manager.calc_past_median_time(&self.ghostdag_store.get_data(block.hash()).unwrap()) { - Ok((pmt, _)) => Ok(pmt), - Err(e) => Err(e), - }); + // Note: This is somewhat expensive during ibd, as it incurs cache misses. + + // Use lazy evaluation to avoid unnecessary work, as most of the time we expect the txs not to have lock time. + let lazy_ghostdag_data = Lazy::new(|| self.ghostdag_store.get_data(block.hash()).unwrap()); + let lazy_pmt_res = Lazy::new(|| match self.window_manager.calc_past_median_time(lazy_ghostdag_data.deref()) { + Ok((pmt, _)) => Ok(pmt), + Err(e) => Err(e), + }); for tx in block.transactions.iter() { - // quick check to avoid the expensive Lazy eval during ibd (in most cases). + // Quick check to avoid the expensive Lazy eval during ibd (in most cases). + // TODO: refactor this and avoid classifying the tx lock outside of the transaction validator. if tx.lock_time != 0 { - if let Err(e) = self.transaction_validator.utxo_free_tx_validation(tx, block.header.daa_score, (*pmt_res).clone()?) { + // Extract the past median time from the Lazy. + let pmt = (*lazy_pmt_res).clone()?; + + // Commit the past median time to the cache, if not already there. + if !self.block_window_cache_for_past_median_time.contains_key(&block.hash()) { + self.block_window_cache_for_past_median_time + .insert(block.hash(), self.window_manager.calc_past_median_time(lazy_ghostdag_data.deref()).unwrap().1); + }; + + if let Err(e) = self.transaction_validator.utxo_free_tx_validation(tx, block.header.daa_score, pmt) { return Err(RuleError::TxInContextFailed(tx.id(), e)); }; }; diff --git a/consensus/src/pipeline/body_processor/processor.rs b/consensus/src/pipeline/body_processor/processor.rs index 6885c78b5..4f2375562 100644 --- a/consensus/src/pipeline/body_processor/processor.rs +++ b/consensus/src/pipeline/body_processor/processor.rs @@ -1,10 +1,14 @@ use crate::{ - consensus::services::DbWindowManager, + consensus::{ + services::{ConsensusServices, DbWindowManager}, + storage::ConsensusStorage, + }, errors::{BlockProcessResult, RuleError}, model::{ services::reachability::MTReachabilityService, stores::{ block_transactions::DbBlockTransactionsStore, + block_window_cache::BlockWindowCacheStore, ghostdag::DbGhostdagStore, headers::DbHeadersStore, reachability::DbReachabilityStore, @@ -23,7 +27,10 @@ use crossbeam_channel::{Receiver, Sender}; use kaspa_consensus_core::{ block::Block, blockstatus::BlockStatus::{self, StatusHeaderOnly, StatusInvalid}, - config::{genesis::GenesisBlock, params::ForkActivation}, + config::{ + genesis::GenesisBlock, + params::{ForkActivation, Params}, + }, mass::MassCalculator, tx::Transaction, }; @@ -60,6 +67,8 @@ pub struct BlockBodyProcessor { pub(super) headers_store: Arc, pub(super) block_transactions_store: Arc, pub(super) body_tips_store: Arc>, + pub(super) block_window_cache_for_difficulty: Arc, + pub(super) block_window_cache_for_past_median_time: Arc, // Managers and services pub(super) reachability_service: MTReachabilityService, @@ -91,47 +100,43 @@ impl BlockBodyProcessor { sender: Sender, thread_pool: Arc, + params: &Params, db: Arc, - statuses_store: Arc>, - ghostdag_store: Arc, - headers_store: Arc, - block_transactions_store: Arc, - body_tips_store: Arc>, - - reachability_service: MTReachabilityService, - coinbase_manager: CoinbaseManager, - mass_calculator: MassCalculator, - transaction_validator: TransactionValidator, - window_manager: DbWindowManager, - max_block_mass: u64, - genesis: GenesisBlock, + storage: &Arc, + services: &Arc, + pruning_lock: SessionLock, notification_root: Arc, counters: Arc, - storage_mass_activation: ForkActivation, ) -> Self { Self { receiver, sender, thread_pool, db, - statuses_store, - reachability_service, - ghostdag_store, - headers_store, - block_transactions_store, - body_tips_store, - coinbase_manager, - mass_calculator, - transaction_validator, - window_manager, - max_block_mass, - genesis, + + max_block_mass: params.max_block_mass, + genesis: params.genesis.clone(), + + statuses_store: storage.statuses_store.clone(), + ghostdag_store: storage.ghostdag_store.clone(), + headers_store: storage.headers_store.clone(), + block_transactions_store: storage.block_transactions_store.clone(), + body_tips_store: storage.body_tips_store.clone(), + block_window_cache_for_difficulty: storage.block_window_cache_for_difficulty.clone(), + block_window_cache_for_past_median_time: storage.block_window_cache_for_past_median_time.clone(), + + reachability_service: services.reachability_service.clone(), + coinbase_manager: services.coinbase_manager.clone(), + mass_calculator: services.mass_calculator.clone(), + transaction_validator: services.transaction_validator.clone(), + window_manager: services.window_manager.clone(), + pruning_lock, task_manager: BlockTaskDependencyManager::new(), notification_root, counters, - storage_mass_activation, + storage_mass_activation: params.storage_mass_activation, } } diff --git a/consensus/src/pipeline/virtual_processor/processor.rs b/consensus/src/pipeline/virtual_processor/processor.rs index bfdeff3e2..c189cc777 100644 --- a/consensus/src/pipeline/virtual_processor/processor.rs +++ b/consensus/src/pipeline/virtual_processor/processor.rs @@ -299,6 +299,17 @@ impl VirtualStateProcessor { let sink_multiset = self.utxo_multisets_store.get(new_sink).unwrap(); let chain_path = self.dag_traversal_manager.calculate_chain_path(prev_sink, new_sink, None); + let compact_sink_ghostdag_data = if prev_sink != new_sink { + // We need to check with full data here, since we may need to update the window caches + let sink_ghostdag_data = self.ghostdag_store.get_data(new_sink).unwrap(); + // Update window caches - for ibd performance. see method comment for more details. + // This should also be called before `calculate_and_commit_virtual_state` to ascertain cache hits in the latter method. + self.commit_windows(new_sink, &sink_ghostdag_data); + CompactGhostdagData::from(sink_ghostdag_data.as_ref()) + } else { + self.ghostdag_store.get_compact_data(new_sink).unwrap() + }; + let new_virtual_state = self .calculate_and_commit_virtual_state( virtual_read, @@ -311,15 +322,6 @@ impl VirtualStateProcessor { .expect("all possible rule errors are unexpected here"); // Update the pruning processor about the virtual state change - let compact_sink_ghostdag_data = if prev_sink != new_sink { - // we need to check with full data here, since we may need to update the window caches - let sink_ghostdag_data = self.ghostdag_store.get_data(new_sink).unwrap(); - // update window caches - for ibd performance. see method comment for more details. - self.maybe_commit_windows(new_sink, &sink_ghostdag_data); - CompactGhostdagData::from(sink_ghostdag_data.as_ref()) - } else { - self.ghostdag_store.get_compact_data(new_sink).unwrap() - }; // Empty the channel before sending the new message. If pruning processor is busy, this step makes sure // the internal channel does not grow with no need (since we only care about the most recent message) let _consume = self.pruning_receiver.try_iter().count(); @@ -556,13 +558,9 @@ impl VirtualStateProcessor { drop(selected_chain_write); } - fn maybe_commit_windows(&self, new_sink: Hash, sink_ghostdag_data: &GhostdagData) { + fn commit_windows(&self, new_sink: Hash, sink_ghostdag_data: &GhostdagData) { // this is only important for ibd performance, as we incur expensive cache misses otherwise. // this occurs because we cannot rely on header processing to pre-cache in this scenario. - - // TODO: We could optimize this by only committing the windows if virtual processor where to have explicit knowledge of being in ibd. - // above may be possible with access to the `is_ibd_running` AtomicBool, or `is_nearly_synced()` method. - if !self.block_window_cache_for_difficulty.contains_key(&new_sink) { self.block_window_cache_for_difficulty .insert(new_sink, self.window_manager.block_daa_window(sink_ghostdag_data).unwrap().window);