address review points from m. sutton.
D-Stacks committed Nov 17, 2024
1 parent fc4f047 commit f4cc049
Showing 4 changed files with 70 additions and 65 deletions.
16 changes: 3 additions & 13 deletions consensus/src/consensus/mod.rs
@@ -241,23 +241,13 @@ impl Consensus {
             body_receiver,
             virtual_sender,
             block_processors_pool,
+            params,
             db.clone(),
-            storage.statuses_store.clone(),
-            storage.ghostdag_store.clone(),
-            storage.headers_store.clone(),
-            storage.block_transactions_store.clone(),
-            storage.body_tips_store.clone(),
-            services.reachability_service.clone(),
-            services.coinbase_manager.clone(),
-            services.mass_calculator.clone(),
-            services.transaction_validator.clone(),
-            services.window_manager.clone(),
-            params.max_block_mass,
-            params.genesis.clone(),
+            &storage,
+            &services,
             pruning_lock.clone(),
             notification_root.clone(),
             counters.clone(),
-            params.storage_mass_activation,
         ));
 
         let virtual_processor = Arc::new(VirtualStateProcessor::new(
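Aside: the call-site shrinkage above is the classic parameter-bundling refactor; instead of threading a dozen individually cloned stores and services through the constructor, the caller passes references to the pre-assembled bundles and the constructor clones only the handles it needs. A minimal sketch of the pattern, using hypothetical Storage/Processor types rather than the actual kaspa ones:

use std::sync::Arc;

struct Storage {
    statuses: Arc<Vec<u8>>,
    headers: Arc<Vec<u8>>,
}

struct Processor {
    statuses: Arc<Vec<u8>>,
    headers: Arc<Vec<u8>>,
}

impl Processor {
    // One bundle reference instead of one parameter per store; the
    // constructor clones only the cheap Arc handles it needs.
    fn new(storage: &Arc<Storage>) -> Self {
        Self { statuses: storage.statuses.clone(), headers: storage.headers.clone() }
    }
}

fn main() {
    let storage = Arc::new(Storage { statuses: Arc::new(vec![]), headers: Arc::new(vec![]) });
    let _processor = Processor::new(&storage);
}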
30 changes: 21 additions & 9 deletions consensus/src/pipeline/body_processor/body_validation_in_context.rs
@@ -9,7 +9,7 @@ use kaspa_database::prelude::StoreResultExtensions;
 use kaspa_hashes::Hash;
 use kaspa_utils::option::OptionExtensions;
 use once_cell::unsync::Lazy;
-use std::sync::Arc;
+use std::{ops::Deref, sync::Arc};
 
 impl BlockBodyProcessor {
     pub fn validate_body_in_context(self: &Arc<Self>, block: &Block) -> BlockProcessResult<()> {
@@ -19,17 +19,29 @@ impl BlockBodyProcessor {
     fn check_block_transactions_in_context(self: &Arc<Self>, block: &Block) -> BlockProcessResult<()> {
-        // TODO: this is somewhat expensive during ibd, as it incurs cache misses.
-        let pmt_res =
-            Lazy::new(|| match self.window_manager.calc_past_median_time(&self.ghostdag_store.get_data(block.hash()).unwrap()) {
-                Ok((pmt, _)) => Ok(pmt),
-                Err(e) => Err(e),
-            });
+        // Note: This is somewhat expensive during ibd, as it incurs cache misses.
+
+        // Use lazy evaluation to avoid unnecessary work, as most of the time we expect the txs not to have lock time.
+        let lazy_ghostdag_data = Lazy::new(|| self.ghostdag_store.get_data(block.hash()).unwrap());
+        let lazy_pmt_res = Lazy::new(|| match self.window_manager.calc_past_median_time(lazy_ghostdag_data.deref()) {
+            Ok((pmt, _)) => Ok(pmt),
+            Err(e) => Err(e),
+        });
 
         for tx in block.transactions.iter() {
-            // quick check to avoid the expensive Lazy eval during ibd (in most cases).
+            // Quick check to avoid the expensive Lazy eval during ibd (in most cases).
+            // TODO: refactor this and avoid classifying the tx lock outside of the transaction validator.
             if tx.lock_time != 0 {
-                if let Err(e) = self.transaction_validator.utxo_free_tx_validation(tx, block.header.daa_score, (*pmt_res).clone()?) {
+                // Extract the past median time from the Lazy.
+                let pmt = (*lazy_pmt_res).clone()?;
+
+                // Commit the past median time to the cache, if not already there.
+                if !self.block_window_cache_for_past_median_time.contains_key(&block.hash()) {
+                    self.block_window_cache_for_past_median_time
+                        .insert(block.hash(), self.window_manager.calc_past_median_time(lazy_ghostdag_data.deref()).unwrap().1);
+                };
+
+                if let Err(e) = self.transaction_validator.utxo_free_tx_validation(tx, block.header.daa_score, pmt) {
                     return Err(RuleError::TxInContextFailed(tx.id(), e));
                 };
             };
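Aside: the rewrite above leans on once_cell's unsync::Lazy so the ghostdag lookup and past-median-time calculation run only if some transaction actually carries a non-zero lock time. A minimal sketch of that pattern, with a hypothetical expensive_pmt function and plain u64 values standing in for GhostdagData and the window manager:

use once_cell::unsync::Lazy;
use std::ops::Deref;

// Stand-in for the expensive calc_past_median_time call.
fn expensive_pmt(ghostdag: &u64) -> u64 {
    println!("computing past median time (expensive)...");
    ghostdag + 1
}

fn main() {
    // Nothing is computed at construction; each closure runs on first deref only.
    let lazy_ghostdag = Lazy::new(|| 42u64);
    let lazy_pmt = Lazy::new(|| expensive_pmt(lazy_ghostdag.deref()));

    let tx_lock_times = [0u64, 0, 5]; // most txs have lock_time == 0
    for lock_time in tx_lock_times {
        if lock_time != 0 {
            // First (and only) point where the expensive computation is forced.
            let pmt = *lazy_pmt.deref();
            assert_eq!(pmt, 43);
        }
    }
}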
63 changes: 34 additions & 29 deletions consensus/src/pipeline/body_processor/processor.rs
@@ -1,10 +1,14 @@
 use crate::{
-    consensus::services::DbWindowManager,
+    consensus::{
+        services::{ConsensusServices, DbWindowManager},
+        storage::ConsensusStorage,
+    },
     errors::{BlockProcessResult, RuleError},
     model::{
         services::reachability::MTReachabilityService,
         stores::{
             block_transactions::DbBlockTransactionsStore,
+            block_window_cache::BlockWindowCacheStore,
             ghostdag::DbGhostdagStore,
             headers::DbHeadersStore,
             reachability::DbReachabilityStore,
@@ -23,7 +27,10 @@ use crossbeam_channel::{Receiver, Sender};
 use kaspa_consensus_core::{
     block::Block,
     blockstatus::BlockStatus::{self, StatusHeaderOnly, StatusInvalid},
-    config::{genesis::GenesisBlock, params::ForkActivation},
+    config::{
+        genesis::GenesisBlock,
+        params::{ForkActivation, Params},
+    },
     mass::MassCalculator,
     tx::Transaction,
 };
@@ -60,6 +67,8 @@ pub struct BlockBodyProcessor {
     pub(super) headers_store: Arc<DbHeadersStore>,
     pub(super) block_transactions_store: Arc<DbBlockTransactionsStore>,
     pub(super) body_tips_store: Arc<RwLock<DbTipsStore>>,
+    pub(super) block_window_cache_for_difficulty: Arc<BlockWindowCacheStore>,
+    pub(super) block_window_cache_for_past_median_time: Arc<BlockWindowCacheStore>,
 
     // Managers and services
     pub(super) reachability_service: MTReachabilityService<DbReachabilityStore>,
@@ -91,47 +100,43 @@ impl BlockBodyProcessor {
         sender: Sender<VirtualStateProcessingMessage>,
         thread_pool: Arc<ThreadPool>,
 
+        params: &Params,
         db: Arc<DB>,
-        statuses_store: Arc<RwLock<DbStatusesStore>>,
-        ghostdag_store: Arc<DbGhostdagStore>,
-        headers_store: Arc<DbHeadersStore>,
-        block_transactions_store: Arc<DbBlockTransactionsStore>,
-        body_tips_store: Arc<RwLock<DbTipsStore>>,
-
-        reachability_service: MTReachabilityService<DbReachabilityStore>,
-        coinbase_manager: CoinbaseManager,
-        mass_calculator: MassCalculator,
-        transaction_validator: TransactionValidator,
-        window_manager: DbWindowManager,
-        max_block_mass: u64,
-        genesis: GenesisBlock,
+        storage: &Arc<ConsensusStorage>,
+        services: &Arc<ConsensusServices>,
 
         pruning_lock: SessionLock,
         notification_root: Arc<ConsensusNotificationRoot>,
         counters: Arc<ProcessingCounters>,
-        storage_mass_activation: ForkActivation,
     ) -> Self {
         Self {
             receiver,
             sender,
             thread_pool,
             db,
-            statuses_store,
-            reachability_service,
-            ghostdag_store,
-            headers_store,
-            block_transactions_store,
-            body_tips_store,
-            coinbase_manager,
-            mass_calculator,
-            transaction_validator,
-            window_manager,
-            max_block_mass,
-            genesis,
+
+            max_block_mass: params.max_block_mass,
+            genesis: params.genesis.clone(),
+
+            statuses_store: storage.statuses_store.clone(),
+            ghostdag_store: storage.ghostdag_store.clone(),
+            headers_store: storage.headers_store.clone(),
+            block_transactions_store: storage.block_transactions_store.clone(),
+            body_tips_store: storage.body_tips_store.clone(),
+            block_window_cache_for_difficulty: storage.block_window_cache_for_difficulty.clone(),
+            block_window_cache_for_past_median_time: storage.block_window_cache_for_past_median_time.clone(),
+
+            reachability_service: services.reachability_service.clone(),
+            coinbase_manager: services.coinbase_manager.clone(),
+            mass_calculator: services.mass_calculator.clone(),
+            transaction_validator: services.transaction_validator.clone(),
+            window_manager: services.window_manager.clone(),
+
             pruning_lock,
             task_manager: BlockTaskDependencyManager::new(),
             notification_root,
             counters,
-            storage_mass_activation,
+            storage_mass_activation: params.storage_mass_activation,
         }
     }

26 changes: 12 additions & 14 deletions consensus/src/pipeline/virtual_processor/processor.rs
@@ -299,6 +299,17 @@ impl VirtualStateProcessor {
 
         let sink_multiset = self.utxo_multisets_store.get(new_sink).unwrap();
         let chain_path = self.dag_traversal_manager.calculate_chain_path(prev_sink, new_sink, None);
+        let compact_sink_ghostdag_data = if prev_sink != new_sink {
+            // We need to check with full data here, since we may need to update the window caches
+            let sink_ghostdag_data = self.ghostdag_store.get_data(new_sink).unwrap();
+            // Update window caches - for ibd performance. see method comment for more details.
+            // This should also be called before `calculate_and_commit_virtual_state` to ascertain cache hits in the latter method.
+            self.commit_windows(new_sink, &sink_ghostdag_data);
+            CompactGhostdagData::from(sink_ghostdag_data.as_ref())
+        } else {
+            self.ghostdag_store.get_compact_data(new_sink).unwrap()
+        };
+
         let new_virtual_state = self
             .calculate_and_commit_virtual_state(
                 virtual_read,
@@ -311,15 +322,6 @@ impl VirtualStateProcessor {
             .expect("all possible rule errors are unexpected here");
 
         // Update the pruning processor about the virtual state change
-        let compact_sink_ghostdag_data = if prev_sink != new_sink {
-            // we need to check with full data here, since we may need to update the window caches
-            let sink_ghostdag_data = self.ghostdag_store.get_data(new_sink).unwrap();
-            // update window caches - for ibd performance. see method comment for more details.
-            self.maybe_commit_windows(new_sink, &sink_ghostdag_data);
-            CompactGhostdagData::from(sink_ghostdag_data.as_ref())
-        } else {
-            self.ghostdag_store.get_compact_data(new_sink).unwrap()
-        };
         // Empty the channel before sending the new message. If pruning processor is busy, this step makes sure
         // the internal channel does not grow with no need (since we only care about the most recent message)
         let _consume = self.pruning_receiver.try_iter().count();
@@ -556,13 +558,9 @@ impl VirtualStateProcessor {
         drop(selected_chain_write);
     }
 
-    fn maybe_commit_windows(&self, new_sink: Hash, sink_ghostdag_data: &GhostdagData) {
+    fn commit_windows(&self, new_sink: Hash, sink_ghostdag_data: &GhostdagData) {
         // this is only important for ibd performance, as we incur expensive cache misses otherwise.
         // this occurs because we cannot rely on header processing to pre-cache in this scenario.
-
-        // TODO: We could optimize this by only committing the windows if virtual processor where to have explicit knowledge of being in ibd.
-        // above may be possible with access to the `is_ibd_running` AtomicBool, or `is_nearly_synced()` method.
-
         if !self.block_window_cache_for_difficulty.contains_key(&new_sink) {
             self.block_window_cache_for_difficulty
                 .insert(new_sink, self.window_manager.block_daa_window(sink_ghostdag_data).unwrap().window);
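Aside: commit_windows above uses a plain check-then-insert, so a window is computed and written only when the cache misses. A minimal sketch of the idea, with a HashMap and u64 hashes standing in for BlockWindowCacheStore and block hashes (hypothetical stand-ins, not the actual kaspa types):

use std::collections::HashMap;

// Check-then-insert: pay the window commit cost only when the cache does not
// already hold an entry for this sink (e.g. header processing could not
// pre-cache it during ibd).
fn commit_window(cache: &mut HashMap<u64, Vec<u64>>, new_sink: u64, window: Vec<u64>) {
    cache.entry(new_sink).or_insert(window);
}

fn main() {
    let mut difficulty_window_cache: HashMap<u64, Vec<u64>> = HashMap::new();
    commit_window(&mut difficulty_window_cache, 7, vec![1, 2, 3]);
    // A later commit for the same sink leaves the first entry in place.
    commit_window(&mut difficulty_window_cache, 7, vec![9, 9, 9]);
    assert_eq!(difficulty_window_cache[&7], vec![1, 2, 3]);
}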
