From caedaa28385d4dfdd74f99f1d7e05bbb240189e9 Mon Sep 17 00:00:00 2001 From: Michael Sutton Date: Fri, 21 Jul 2023 00:59:22 +0300 Subject: [PATCH] IBD: fix some syncer-syncee miscommunications which are more apparent with 10 bps (#221) * comments * extend resolution range to 8 max * do not sync missing bodies in the past of `syncer_header_selected_tip` * typos * no need to lock the pruning store throughout locator building * the two conditions can be done in one * rollback previous change (will be fixed more correctly by the coming switch HSC-> VSC) * change selected chain store from *headers* selected chain to *virtual* selected chain (wip: test fix; renaming of various variables) * fix selected-chain test by adding a way to (test-)build utxo valid blocks with specific parents * make pruning point getter non-Option * rename `headers_selected_chain` -> `virtual_selected_parent` * add temp logic for upgrading from prev DB version * get tip if high is none through the selected chain store itself * add virtual chain assertion to relevant tests * added selected_chain_store_iterator which is more idiomatic * wrap with TestBlockBuilder to avoid direct access through virtual processor * keep the pruning point read guard throughout building the locator * break if parent is missing * extend comment --- components/consensusmanager/src/session.rs | 6 +- consensus/core/src/api/mod.rs | 4 +- consensus/core/src/lib.rs | 1 + consensus/src/consensus/mod.rs | 27 +++-- consensus/src/consensus/test_consensus.rs | 55 ++++++--- consensus/src/model/stores/selected_chain.rs | 15 ++- .../pipeline/header_processor/processor.rs | 12 -- .../src/pipeline/virtual_processor/mod.rs | 1 + .../pipeline/virtual_processor/processor.rs | 113 +++++++++++++++--- .../virtual_processor/test_block_builder.rs | 66 ++++++++++ consensus/src/processes/sync/mod.rs | 16 ++- kaspad/src/main.rs | 7 -- protocol/flows/src/flow_context.rs | 2 +- protocol/flows/src/v5/blockrelay/flow.rs | 6 +- protocol/flows/src/v5/ibd/flow.rs | 38 +++--- protocol/flows/src/v5/ibd/negotiate.rs | 13 +- .../src/v5/request_ibd_chain_block_locator.rs | 3 +- rpc/service/src/service.rs | 2 +- testing/integration/src/integration_tests.rs | 37 ++++-- 19 files changed, 300 insertions(+), 124 deletions(-) create mode 100644 consensus/src/pipeline/virtual_processor/test_block_builder.rs diff --git a/components/consensusmanager/src/session.rs b/components/consensusmanager/src/session.rs index 78b0cd87ed..31f22c452d 100644 --- a/components/consensusmanager/src/session.rs +++ b/components/consensusmanager/src/session.rs @@ -256,12 +256,12 @@ impl ConsensusSessionOwned { self.clone().spawn_blocking(|c| c.get_pruning_point_proof()).await } - pub async fn async_create_headers_selected_chain_block_locator( + pub async fn async_create_virtual_selected_chain_block_locator( &self, low: Option, high: Option, ) -> ConsensusResult> { - self.clone().spawn_blocking(move |c| c.create_headers_selected_chain_block_locator(low, high)).await + self.clone().spawn_blocking(move |c| c.create_virtual_selected_chain_block_locator(low, high)).await } pub async fn async_create_block_locator_from_pruning_point(&self, high: Hash, limit: usize) -> ConsensusResult> { @@ -331,7 +331,7 @@ impl ConsensusSessionOwned { self.clone().spawn_blocking(move |c| c.get_missing_block_body_hashes(high)).await } - pub async fn async_pruning_point(&self) -> Option { + pub async fn async_pruning_point(&self) -> Hash { self.clone().spawn_blocking(|c| c.pruning_point()).await } diff --git a/consensus/core/src/api/mod.rs b/consensus/core/src/api/mod.rs index 738fc0df3f..6bc21677b5 100644 --- a/consensus/core/src/api/mod.rs +++ b/consensus/core/src/api/mod.rs @@ -169,7 +169,7 @@ pub trait ConsensusApi: Send + Sync { unimplemented!() } - fn create_headers_selected_chain_block_locator(&self, low: Option, high: Option) -> ConsensusResult> { + fn create_virtual_selected_chain_block_locator(&self, low: Option, high: Option) -> ConsensusResult> { unimplemented!() } @@ -238,7 +238,7 @@ pub trait ConsensusApi: Send + Sync { unimplemented!() } - fn pruning_point(&self) -> Option { + fn pruning_point(&self) -> Hash { unimplemented!() } diff --git a/consensus/core/src/lib.rs b/consensus/core/src/lib.rs index ab98470191..af72907811 100644 --- a/consensus/core/src/lib.rs +++ b/consensus/core/src/lib.rs @@ -74,6 +74,7 @@ impl HashMapCustomHasher for BlockHashSet { } } +#[derive(Default, Debug)] pub struct ChainPath { pub added: Vec, pub removed: Vec, diff --git a/consensus/src/consensus/mod.rs b/consensus/src/consensus/mod.rs index 0e42ce9737..aea4a958a6 100644 --- a/consensus/src/consensus/mod.rs +++ b/consensus/src/consensus/mod.rs @@ -557,7 +557,7 @@ impl ConsensusApi for Consensus { self.services.pruning_proof_manager.get_pruning_point_proof() } - fn create_headers_selected_chain_block_locator(&self, low: Option, high: Option) -> ConsensusResult> { + fn create_virtual_selected_chain_block_locator(&self, low: Option, high: Option) -> ConsensusResult> { if let Some(low) = low { self.validate_block_exists(low)?; } @@ -566,7 +566,7 @@ impl ConsensusApi for Consensus { self.validate_block_exists(high)?; } - Ok(self.services.sync_manager.create_headers_selected_chain_block_locator(low, high)?) + Ok(self.services.sync_manager.create_virtual_selected_chain_block_locator(low, high)?) } fn pruning_point_headers(&self) -> Vec> { @@ -652,8 +652,8 @@ impl ConsensusApi for Consensus { Ok(self.services.sync_manager.get_missing_block_body_hashes(high)?) } - fn pruning_point(&self) -> Option { - self.pruning_point_store.read().pruning_point().unwrap_option() + fn pruning_point(&self) -> Hash { + self.pruning_point_store.read().pruning_point().unwrap() } fn get_daa_window(&self, hash: Hash) -> ConsensusResult> { @@ -673,27 +673,30 @@ impl ConsensusApi for Consensus { self.validate_block_exists(hash)?; // In order to guarantee the chain height is at least k, we check that the pruning point is not genesis. - if self.pruning_point().unwrap() == self.config.genesis.hash { + if self.pruning_point() == self.config.genesis.hash { return Err(ConsensusError::UnexpectedPruningPoint); } let mut hashes = Vec::with_capacity(self.config.params.ghostdag_k as usize); let mut current = hash; - // TODO: This will crash if we don't have the data for k blocks in the past of - // current. The syncee should validate it got all of the associated data. for _ in 0..=self.config.params.ghostdag_k { hashes.push(current); - current = self.ghostdag_primary_store.get_selected_parent(current).unwrap(); + // TODO: ideally the syncee should validate it got all of the associated data up + // to k blocks back and then we would be able to safely unwrap here. For now we + // just break the loop, since if the data was truly missing we wouldn't accept + // the staging consensus in the first place + let Some(parent) = self.ghostdag_primary_store.get_selected_parent(current).unwrap_option() else { break; }; + current = parent; } Ok(hashes) } fn create_block_locator_from_pruning_point(&self, high: Hash, limit: usize) -> ConsensusResult> { self.validate_block_exists(high)?; - - let pp_read_guard = self.pruning_point_store.read(); - let pp = pp_read_guard.pruning_point().unwrap(); - Ok(self.services.sync_manager.create_block_locator_from_pruning_point(high, pp, Some(limit))?) + // Keep the pruning point read guard throughout building the locator + let pruning_point_read = self.pruning_point_store.read(); + let pruning_point = pruning_point_read.pruning_point().unwrap(); + Ok(self.services.sync_manager.create_block_locator_from_pruning_point(high, pruning_point, Some(limit))?) } fn estimate_network_hashes_per_second(&self, start_hash: Option, window_size: usize) -> ConsensusResult { diff --git a/consensus/src/consensus/test_consensus.rs b/consensus/src/consensus/test_consensus.rs index 7312599fcd..abfa45ad97 100644 --- a/consensus/src/consensus/test_consensus.rs +++ b/consensus/src/consensus/test_consensus.rs @@ -1,4 +1,6 @@ use async_channel::Sender; +use kaspa_consensus_core::coinbase::MinerData; +use kaspa_consensus_core::tx::ScriptPublicKey; use kaspa_consensus_core::{ api::ConsensusApi, block::MutableBlock, blockstatus::BlockStatus, header::Header, merkle::calc_hash_merkle_root, subnets::SUBNETWORK_ID_COINBASE, tx::Transaction, @@ -13,6 +15,7 @@ use parking_lot::RwLock; use std::future::Future; use std::{sync::Arc, thread::JoinHandle}; +use crate::pipeline::virtual_processor::test_block_builder::TestBlockBuilder; use crate::processes::window::WindowManager; use crate::{ config::Config, @@ -34,8 +37,9 @@ use super::services::{DbDagTraversalManager, DbGhostdagManager, DbWindowManager} use super::Consensus; pub struct TestConsensus { - consensus: Arc, params: Params, + consensus: Arc, + block_builder: TestBlockBuilder, db_lifetime: DbLifetime, } @@ -44,11 +48,10 @@ impl TestConsensus { pub fn with_db(db: Arc, config: &Config, notification_sender: Sender) -> Self { let notification_root = Arc::new(ConsensusNotificationRoot::new(notification_sender)); let counters = Arc::new(ProcessingCounters::default()); - Self { - consensus: Arc::new(Consensus::new(db, Arc::new(config.clone()), Default::default(), notification_root, counters)), - params: config.params.clone(), - db_lifetime: Default::default(), - } + let consensus = Arc::new(Consensus::new(db, Arc::new(config.clone()), Default::default(), notification_root, counters)); + let block_builder = TestBlockBuilder::new(consensus.virtual_processor.clone()); + + Self { params: config.params.clone(), consensus, block_builder, db_lifetime: Default::default() } } /// Creates a test consensus instance based on `config` with a temp DB and the provided `notification_sender` @@ -56,11 +59,10 @@ impl TestConsensus { let (db_lifetime, db) = create_temp_db(); let notification_root = Arc::new(ConsensusNotificationRoot::new(notification_sender)); let counters = Arc::new(ProcessingCounters::default()); - Self { - consensus: Arc::new(Consensus::new(db, Arc::new(config.clone()), Default::default(), notification_root, counters)), - params: config.params.clone(), - db_lifetime, - } + let consensus = Arc::new(Consensus::new(db, Arc::new(config.clone()), Default::default(), notification_root, counters)); + let block_builder = TestBlockBuilder::new(consensus.virtual_processor.clone()); + + Self { consensus, block_builder, params: config.params.clone(), db_lifetime } } /// Creates a test consensus instance based on `config` with a temp DB and no notifier @@ -69,11 +71,10 @@ impl TestConsensus { let (dummy_notification_sender, _) = async_channel::unbounded(); let notification_root = Arc::new(ConsensusNotificationRoot::new(dummy_notification_sender)); let counters = Arc::new(ProcessingCounters::default()); - Self { - consensus: Arc::new(Consensus::new(db, Arc::new(config.clone()), Default::default(), notification_root, counters)), - params: config.params.clone(), - db_lifetime, - } + let consensus = Arc::new(Consensus::new(db, Arc::new(config.clone()), Default::default(), notification_root, counters)); + let block_builder = TestBlockBuilder::new(consensus.virtual_processor.clone()); + + Self { consensus, block_builder, params: config.params.clone(), db_lifetime } } /// Clone the inner consensus Arc. For general usage of the underlying consensus simply deref @@ -107,6 +108,28 @@ impl TestConsensus { self.validate_and_insert_block(self.build_block_with_parents(hash, parents).to_immutable()) } + pub fn add_utxo_valid_block_with_parents( + &self, + hash: Hash, + parents: Vec, + txs: Vec, + ) -> impl Future> { + let miner_data = MinerData::new(ScriptPublicKey::from_vec(0, vec![]), vec![]); + self.validate_and_insert_block(self.build_utxo_valid_block_with_parents(hash, parents, miner_data, txs).to_immutable()) + } + + pub fn build_utxo_valid_block_with_parents( + &self, + hash: Hash, + parents: Vec, + miner_data: MinerData, + txs: Vec, + ) -> MutableBlock { + let mut template = self.block_builder.build_block_template_with_parents(parents, miner_data, txs).unwrap(); + template.block.header.hash = hash; + template.block + } + pub fn build_block_with_parents_and_transactions( &self, hash: Hash, diff --git a/consensus/src/model/stores/selected_chain.rs b/consensus/src/model/stores/selected_chain.rs index d2acd27bf5..0a2f86f591 100644 --- a/consensus/src/model/stores/selected_chain.rs +++ b/consensus/src/model/stores/selected_chain.rs @@ -17,12 +17,13 @@ use super::U64Key; pub trait SelectedChainStoreReader { fn get_by_hash(&self, hash: Hash) -> StoreResult; fn get_by_index(&self, index: u64) -> StoreResult; + fn get_tip(&self) -> StoreResult<(u64, Hash)>; } /// Write API for `SelectedChainStore`. The set function is deliberately `mut` /// since chain index is not append-only and thus needs to be guarded. pub trait SelectedChainStore: SelectedChainStoreReader { - fn apply_changes(&mut self, batch: &mut WriteBatch, changes: ChainPath) -> StoreResult<()>; + fn apply_changes(&mut self, batch: &mut WriteBatch, changes: &ChainPath) -> StoreResult<()>; fn prune_below_pruning_point(&mut self, writer: impl DbWriter, pruning_point: Hash) -> StoreResult<()>; fn init_with_pruning_point(&mut self, batch: &mut WriteBatch, block: Hash) -> StoreResult<()>; } @@ -68,22 +69,28 @@ impl SelectedChainStoreReader for DbSelectedChainStore { fn get_by_index(&self, index: u64) -> StoreResult { self.access_hash_by_index.read(index.into()) } + + fn get_tip(&self) -> StoreResult<(u64, Hash)> { + let idx = self.access_highest_index.read()?; + let hash = self.access_hash_by_index.read(idx.into())?; + Ok((idx, hash)) + } } impl SelectedChainStore for DbSelectedChainStore { - fn apply_changes(&mut self, batch: &mut WriteBatch, changes: ChainPath) -> StoreResult<()> { + fn apply_changes(&mut self, batch: &mut WriteBatch, changes: &ChainPath) -> StoreResult<()> { let added_len = changes.added.len() as u64; let current_highest_index = self.access_highest_index.read().unwrap(); let split_index = current_highest_index - changes.removed.len() as u64; let new_highest_index = added_len + split_index; - for to_remove in changes.removed { + for to_remove in changes.removed.iter().copied() { let index = self.access_index_by_hash.read(to_remove).unwrap(); self.access_index_by_hash.delete(BatchDbWriter::new(batch), to_remove).unwrap(); self.access_hash_by_index.delete(BatchDbWriter::new(batch), index.into()).unwrap(); } - for (i, to_add) in changes.added.into_iter().enumerate() { + for (i, to_add) in changes.added.iter().copied().enumerate() { self.access_index_by_hash.write(BatchDbWriter::new(batch), to_add, i as u64 + split_index + 1).unwrap(); self.access_hash_by_index.write(BatchDbWriter::new(batch), (i as u64 + split_index + 1).into(), to_add).unwrap(); } diff --git a/consensus/src/pipeline/header_processor/processor.rs b/consensus/src/pipeline/header_processor/processor.rs index 8413bfcdb2..f51d05a5e2 100644 --- a/consensus/src/pipeline/header_processor/processor.rs +++ b/consensus/src/pipeline/header_processor/processor.rs @@ -19,7 +19,6 @@ use crate::{ pruning::{DbPruningStore, PruningPointInfo, PruningStoreReader}, reachability::{DbReachabilityStore, StagingReachabilityStore}, relations::{DbRelationsStore, RelationsStoreReader}, - selected_chain::{DbSelectedChainStore, SelectedChainStore}, statuses::{DbStatusesStore, StatusesStore, StatusesStoreBatchExtensions, StatusesStoreReader}, DB, }, @@ -136,7 +135,6 @@ pub struct HeaderProcessor { pub(super) daa_excluded_store: Arc, pub(super) headers_store: Arc, pub(super) headers_selected_tip_store: Arc>, - pub(super) selected_chain_store: Arc>, pub(super) depth_store: Arc, // Managers and services @@ -187,7 +185,6 @@ impl HeaderProcessor { headers_store: storage.headers_store.clone(), depth_store: storage.depth_store.clone(), headers_selected_tip_store: storage.headers_selected_tip_store.clone(), - selected_chain_store: storage.selected_chain_store.clone(), block_window_cache_for_difficulty: storage.block_window_cache_for_difficulty.clone(), block_window_cache_for_past_median_time: storage.block_window_cache_for_past_median_time.clone(), @@ -392,18 +389,13 @@ impl HeaderProcessor { // Non-append only stores need to use write locks. // Note we need to keep the lock write guards until the batch is written. let mut hst_write = self.headers_selected_tip_store.write(); - let mut sc_write = self.selected_chain_store.write(); let prev_hst = hst_write.get().unwrap(); - // We can't calculate chain path for blocks that do not have the pruning point in their chain, so we just skip them. if SortableBlock::new(ctx.hash, header.blue_work) > prev_hst && reachability::is_chain_ancestor_of(&staging, pp, ctx.hash).unwrap() { // Hint reachability about the new tip. reachability::hint_virtual_selected_parent(&mut staging, ctx.hash).unwrap(); hst_write.set_batch(&mut batch, SortableBlock::new(ctx.hash, header.blue_work)).unwrap(); - let mut chain_path = self.dag_traversal_manager.calculate_chain_path(prev_hst.hash, ghostdag_data[0].selected_parent); - chain_path.added.push(ctx.hash); - sc_write.apply_changes(&mut batch, chain_path).unwrap(); } // @@ -437,7 +429,6 @@ impl HeaderProcessor { drop(reachability_relations_write); drop(relations_write); drop(hst_write); - drop(sc_write); } fn commit_trusted_header(&self, ctx: HeaderProcessingContext, _header: &Header) { @@ -463,13 +454,10 @@ impl HeaderProcessor { pub fn process_genesis(&self) { // Init headers selected tip and selected chain stores let mut batch = WriteBatch::default(); - let mut sc_write = self.selected_chain_store.write(); - sc_write.init_with_pruning_point(&mut batch, self.genesis.hash).unwrap(); let mut hst_write = self.headers_selected_tip_store.write(); hst_write.set_batch(&mut batch, SortableBlock::new(self.genesis.hash, 0.into())).unwrap(); self.db.write(batch).unwrap(); drop(hst_write); - drop(sc_write); // Write the genesis header let mut genesis_header: Header = (&self.genesis).into(); diff --git a/consensus/src/pipeline/virtual_processor/mod.rs b/consensus/src/pipeline/virtual_processor/mod.rs index bfe86041f7..a35dec6856 100644 --- a/consensus/src/pipeline/virtual_processor/mod.rs +++ b/consensus/src/pipeline/virtual_processor/mod.rs @@ -2,5 +2,6 @@ pub mod errors; mod processor; mod utxo_validation; pub use processor::*; +pub mod test_block_builder; #[cfg(test)] mod tests; diff --git a/consensus/src/pipeline/virtual_processor/processor.rs b/consensus/src/pipeline/virtual_processor/processor.rs index e36d7a4e99..ff22bcdc49 100644 --- a/consensus/src/pipeline/virtual_processor/processor.rs +++ b/consensus/src/pipeline/virtual_processor/processor.rs @@ -25,6 +25,7 @@ use crate::{ pruning_utxoset::PruningUtxosetStores, reachability::DbReachabilityStore, relations::{DbRelationsStore, RelationsStoreReader}, + selected_chain::{DbSelectedChainStore, SelectedChainStore, SelectedChainStoreReader}, statuses::{DbStatusesStore, StatusesStore, StatusesStoreBatchExtensions, StatusesStoreReader}, tips::{DbTipsStore, TipsStoreReader}, utxo_diffs::{DbUtxoDiffsStore, UtxoDiffsStoreReader}, @@ -58,7 +59,7 @@ use kaspa_consensus_core::{ utxo_diff::UtxoDiff, utxo_view::{UtxoView, UtxoViewComposition}, }, - BlockHashSet, + BlockHashSet, ChainPath, }; use kaspa_consensus_notify::{ notification::{ @@ -118,6 +119,7 @@ pub struct VirtualStateProcessor { pub(super) past_pruning_points_store: Arc, pub(super) body_tips_store: Arc>, pub(super) depth_store: Arc, + pub(super) selected_chain_store: Arc>, // Utxo-related stores pub(super) utxo_diffs_store: Arc, @@ -184,6 +186,7 @@ impl VirtualStateProcessor { past_pruning_points_store: storage.past_pruning_points_store.clone(), body_tips_store: storage.body_tips_store.clone(), depth_store: storage.depth_store.clone(), + selected_chain_store: storage.selected_chain_store.clone(), utxo_diffs_store: storage.utxo_diffs_store.clone(), utxo_multisets_store: storage.utxo_multisets_store.clone(), acceptance_data_store: storage.acceptance_data_store.clone(), @@ -208,6 +211,22 @@ impl VirtualStateProcessor { } pub fn worker(self: &Arc) { + // TEMP: upgrade from prev DB version where the chain was the headers selected chain + if let Some(virtual_state) = self.virtual_stores.read().state.get().unwrap_option() { + let sink = virtual_state.ghostdag_data.selected_parent; + let mut selected_chain_write = self.selected_chain_store.write(); + if let Some((_, tip)) = selected_chain_write.get_tip().unwrap_option() { + // This means we are upgrading from the previous version + if sink != tip { + let chain_path = self.dag_traversal_manager.calculate_chain_path(tip, sink); + info!("Upgrading the DB from HSC storage to VSC storage: {:?}", chain_path); + let mut batch = WriteBatch::default(); + selected_chain_write.apply_changes(&mut batch, &chain_path).unwrap(); + self.db.write(batch).unwrap(); + } + } + } + 'outer: while let Ok(msg) = self.receiver.recv() { if msg.is_exit_message() { break; @@ -254,6 +273,7 @@ impl VirtualStateProcessor { assert_eq!(virtual_ghostdag_data.selected_parent, new_sink); let sink_multiset = self.utxo_multisets_store.get(new_sink).unwrap(); + let chain_path = self.dag_traversal_manager.calculate_chain_path(prev_sink, new_sink); let new_virtual_state = self .calculate_and_commit_virtual_state( virtual_read, @@ -261,6 +281,7 @@ impl VirtualStateProcessor { virtual_ghostdag_data, sink_multiset, &mut accumulated_diff, + &chain_path, ) .expect("all possible rule errors are unexpected here"); @@ -283,7 +304,6 @@ impl VirtualStateProcessor { self.notification_root .notify(Notification::VirtualDaaScoreChanged(VirtualDaaScoreChangedNotification::new(new_virtual_state.daa_score))) .expect("expecting an open unbounded channel"); - let chain_path = self.dag_traversal_manager.calculate_chain_path(prev_sink, new_sink); // TODO: Fetch acceptance data only if there's a subscriber for the below notification. let added_chain_blocks_acceptance_data = chain_path.added.iter().copied().map(|added| self.acceptance_data_store.get(added).unwrap()).collect_vec(); @@ -296,7 +316,7 @@ impl VirtualStateProcessor { .expect("expecting an open unbounded channel"); } - fn virtual_finality_point(&self, virtual_ghostdag_data: &GhostdagData, pruning_point: Hash) -> Hash { + pub(super) fn virtual_finality_point(&self, virtual_ghostdag_data: &GhostdagData, pruning_point: Hash) -> Hash { let finality_point = self.depth_manager.calc_finality_point(virtual_ghostdag_data, pruning_point); if self.reachability_service.is_chain_ancestor_of(pruning_point, finality_point) { finality_point @@ -414,8 +434,28 @@ impl VirtualStateProcessor { virtual_ghostdag_data: GhostdagData, selected_parent_multiset: MuHash, accumulated_diff: &mut UtxoDiff, + chain_path: &ChainPath, + ) -> Result, RuleError> { + let new_virtual_state = self.calculate_virtual_state( + &virtual_read, + virtual_parents, + virtual_ghostdag_data, + selected_parent_multiset, + accumulated_diff, + )?; + self.commit_virtual_state(virtual_read, new_virtual_state.clone(), accumulated_diff, chain_path); + Ok(new_virtual_state) + } + + pub(super) fn calculate_virtual_state( + &self, + virtual_stores: &VirtualStores, + virtual_parents: Vec, + virtual_ghostdag_data: GhostdagData, + selected_parent_multiset: MuHash, + accumulated_diff: &mut UtxoDiff, ) -> Result, RuleError> { - let selected_parent_utxo_view = (&virtual_read.utxo_set).compose(&*accumulated_diff); + let selected_parent_utxo_view = (&virtual_stores.utxo_set).compose(&*accumulated_diff); let mut ctx = UtxoProcessingContext::new((&virtual_ghostdag_data).into(), selected_parent_multiset); // Calc virtual DAA score, difficulty bits and past median time @@ -430,7 +470,7 @@ impl VirtualStateProcessor { accumulated_diff.with_diff_in_place(&ctx.mergeset_diff).unwrap(); // Build the new virtual state - let new_virtual_state = Arc::new(VirtualState::new( + Ok(Arc::new(VirtualState::new( virtual_parents, virtual_daa_window.daa_score, virtual_bits, @@ -441,24 +481,35 @@ impl VirtualStateProcessor { ctx.mergeset_rewards, virtual_daa_window.mergeset_non_daa, virtual_ghostdag_data, - )); + ))) + } + fn commit_virtual_state( + &self, + virtual_read: RwLockUpgradableReadGuard<'_, VirtualStores>, + new_virtual_state: Arc, + accumulated_diff: &UtxoDiff, + chain_path: &ChainPath, + ) { let mut batch = WriteBatch::default(); let mut virtual_write = RwLockUpgradableReadGuard::upgrade(virtual_read); + let mut selected_chain_write = self.selected_chain_store.write(); // Apply the accumulated diff to the virtual UTXO set virtual_write.utxo_set.write_diff_batch(&mut batch, accumulated_diff).unwrap(); // Update virtual state - virtual_write.state.set_batch(&mut batch, new_virtual_state.clone()).unwrap(); + virtual_write.state.set_batch(&mut batch, new_virtual_state).unwrap(); + + // Update the virtual selected chain + selected_chain_write.apply_changes(&mut batch, chain_path).unwrap(); // Flush the batch changes self.db.write(batch).unwrap(); // Calling the drops explicitly after the batch is written in order to avoid possible errors. drop(virtual_write); - - Ok(new_virtual_state) + drop(selected_chain_write); } /// Returns the max number of tips to consider as virtual parents in a single virtual resolve operation @@ -475,7 +526,7 @@ impl VirtualStateProcessor { /// The function returns with `diff` being the diff of the new sink from previous virtual. /// In addition to the found sink the function also returns a queue of additional virtual /// parent candidates ordered in descending blue work order. - fn sink_search_algorithm( + pub(super) fn sink_search_algorithm( &self, stores: &VirtualStores, diff: &mut UtxoDiff, @@ -539,7 +590,7 @@ impl VirtualStateProcessor { /// 1. `selected_parent` is a UTXO-valid block /// 2. `candidates` are an antichain ordered in descending blue work order /// 3. `candidates` do not contain `selected_parent` and `selected_parent.blue work > max(candidates.blue_work)` - fn pick_virtual_parents( + pub(super) fn pick_virtual_parents( &self, selected_parent: Hash, mut candidates: VecDeque, @@ -682,25 +733,48 @@ impl VirtualStateProcessor { Ok(()) } - pub fn build_block_template(&self, miner_data: MinerData, mut txs: Vec) -> Result { + pub fn build_block_template(&self, miner_data: MinerData, txs: Vec) -> Result { // TODO: tests let virtual_read = self.virtual_stores.read(); let virtual_state = virtual_read.state.get().unwrap(); let virtual_utxo_view = &virtual_read.utxo_set; + // Validate the transactions in virtual's utxo context + self.validate_block_template_transactions(&txs, &virtual_state, virtual_utxo_view)?; + + // At this point we can safely drop the read lock + drop(virtual_read); + + // Build the template + self.build_block_template_from_virtual_state(virtual_state, miner_data, txs) + } + + pub fn validate_block_template_transactions( + &self, + txs: &[Transaction], + virtual_state: &VirtualState, + utxo_view: &impl UtxoView, + ) -> Result<(), RuleError> { // Search for invalid transactions. This can happen since the mining manager calling this function is not atomically in sync with virtual state let mut invalid_transactions = Vec::new(); for tx in txs.iter() { - if let Err(e) = self.validate_block_template_transaction(tx, &virtual_state, virtual_utxo_view) { + if let Err(e) = self.validate_block_template_transaction(tx, virtual_state, utxo_view) { invalid_transactions.push((tx.id(), e)) } } if !invalid_transactions.is_empty() { - return Err(RuleError::InvalidTransactionsInNewBlock(invalid_transactions)); + Err(RuleError::InvalidTransactionsInNewBlock(invalid_transactions)) + } else { + Ok(()) } - // At this point we can safely drop the read lock - drop(virtual_read); + } + pub(crate) fn build_block_template_from_virtual_state( + &self, + virtual_state: Arc, + miner_data: MinerData, + mut txs: Vec, + ) -> Result { let pruning_info = self.pruning_point_store.read().get().unwrap(); let header_pruning_point = self.pruning_point_manager.expected_header_pruning_point(virtual_state.ghostdag_data.to_compact(), pruning_info); @@ -775,6 +849,12 @@ impl VirtualStateProcessor { .state .set(Arc::new(VirtualState::from_genesis(&self.genesis, self.ghostdag_manager.ghostdag(&[self.genesis.hash])))) .unwrap(); + // Init the virtual selected chain store + let mut batch = WriteBatch::default(); + let mut selected_chain_write = self.selected_chain_store.write(); + selected_chain_write.init_with_pruning_point(&mut batch, self.genesis.hash).unwrap(); + self.db.write(batch).unwrap(); + drop(selected_chain_write); } // TODO: rename to reflect finalizing pruning point utxoset state and importing *to* virtual utxoset @@ -847,6 +927,7 @@ impl VirtualStateProcessor { virtual_ghostdag_data, imported_utxo_multiset.clone(), &mut UtxoDiff::default(), + &ChainPath::default(), )?; Ok(()) diff --git a/consensus/src/pipeline/virtual_processor/test_block_builder.rs b/consensus/src/pipeline/virtual_processor/test_block_builder.rs new file mode 100644 index 0000000000..872bf15b40 --- /dev/null +++ b/consensus/src/pipeline/virtual_processor/test_block_builder.rs @@ -0,0 +1,66 @@ +use std::{ops::Deref, sync::Arc}; + +use crate::model::stores::{ + pruning::PruningStoreReader, utxo_multisets::UtxoMultisetsStoreReader, virtual_state::VirtualStateStoreReader, +}; +use kaspa_consensus_core::{ + block::BlockTemplate, blockhash::ORIGIN, coinbase::MinerData, errors::block::RuleError, tx::Transaction, + utxo::utxo_view::UtxoViewComposition, +}; +use kaspa_hashes::Hash; + +use super::VirtualStateProcessor; + +/// Wrapper for virtual processor with util methods for building a block with any parent context +pub struct TestBlockBuilder { + processor: Arc, +} + +impl Deref for TestBlockBuilder { + type Target = VirtualStateProcessor; + + fn deref(&self) -> &Self::Target { + &self.processor + } +} + +impl TestBlockBuilder { + pub fn new(processor: Arc) -> Self { + Self { processor } + } + + /// Test-only helper method for building a block template with specific parents + pub(crate) fn build_block_template_with_parents( + &self, + parents: Vec, + miner_data: MinerData, + txs: Vec, + ) -> Result { + // + // In the context of this method "pov virtual" is the virtual block which has `parents` as tips and not the actual virtual + // + let pruning_point = self.pruning_point_store.read().pruning_point().unwrap(); + let virtual_read = self.virtual_stores.read(); + let virtual_state = virtual_read.state.get().unwrap(); + let finality_point = ORIGIN; // No real finality point since we are not actually building virtual here + let sink = virtual_state.ghostdag_data.selected_parent; + let mut accumulated_diff = virtual_state.utxo_diff.clone().to_reversed(); + // Search for the sink block from the PoV of this virtual + let (pov_sink, virtual_parent_candidates) = + self.sink_search_algorithm(&virtual_read, &mut accumulated_diff, sink, parents, finality_point, pruning_point); + let (pov_virtual_parents, pov_virtual_ghostdag_data) = + self.pick_virtual_parents(pov_sink, virtual_parent_candidates, pruning_point); + let pov_sink_multiset = self.utxo_multisets_store.get(pov_sink).unwrap(); + let pov_virtual_state = self.calculate_virtual_state( + &virtual_read, + pov_virtual_parents, + pov_virtual_ghostdag_data, + pov_sink_multiset, + &mut accumulated_diff, + )?; + let pov_virtual_utxo_view = (&virtual_read.utxo_set).compose(accumulated_diff); + self.validate_block_template_transactions(&txs, &pov_virtual_state, &pov_virtual_utxo_view)?; + drop(virtual_read); + self.build_block_template_from_virtual_state(pov_virtual_state, miner_data, txs) + } +} diff --git a/consensus/src/processes/sync/mod.rs b/consensus/src/processes/sync/mod.rs index 8ce2dace04..9650360ec4 100644 --- a/consensus/src/processes/sync/mod.rs +++ b/consensus/src/processes/sync/mod.rs @@ -118,12 +118,12 @@ impl< .expect("because of the pruning rules such block has to exist") } - pub fn create_headers_selected_chain_block_locator(&self, low: Option, high: Option) -> SyncManagerResult> { + /// Returns a logarithmic amount of blocks sampled from the virtual selected chain between `low` and `high`. + /// Expects both blocks to be on the virtual selected chain, otherwise an error is returned + pub fn create_virtual_selected_chain_block_locator(&self, low: Option, high: Option) -> SyncManagerResult> { let low = low.unwrap_or_else(|| self.pruning_point_store.read().get().unwrap().pruning_point); - let high = high.unwrap_or_else(|| self.header_selected_tip_store.read().get().unwrap().hash); - let sc_read = self.selected_chain_store.read(); - + let high = high.unwrap_or_else(|| sc_read.get_tip().unwrap().1); if low == high { return Ok(vec![low]); } @@ -214,10 +214,8 @@ impl< let mut locator = Vec::new(); loop { locator.push(current); - if let Some(limit) = limit { - if locator.len() == limit { - break; - } + if limit == Some(locator.len()) { + break; } let current_gd = self.ghostdag_store.get_compact_data(current).unwrap(); @@ -235,7 +233,7 @@ impl< current_gd.blue_score - step }; - // Walk down currentHash's selected parent chain to the appropriate ancestor + // Walk down current's selected parent chain to the appropriate ancestor current = self.traversal_manager.lowest_chain_block_above_or_equal_to_blue_score(current, next_bs); // Double the distance between included hashes diff --git a/kaspad/src/main.rs b/kaspad/src/main.rs index 233a861da9..d98cb28484 100644 --- a/kaspad/src/main.rs +++ b/kaspad/src/main.rs @@ -27,14 +27,7 @@ use std::path::PathBuf; use std::process::exit; use std::sync::Arc; -// ~~~ -// TODO - discuss handling use args::{Args, Defaults}; -// use clap::Parser; -// ~~~ - -// TODO: testnet 11 tasks: -// coinbase rewards use kaspa_consensus::config::ConfigBuilder; use kaspa_utxoindex::UtxoIndex; diff --git a/protocol/flows/src/flow_context.rs b/protocol/flows/src/flow_context.rs index b7d9ea3608..5c77eb671c 100644 --- a/protocol/flows/src/flow_context.rs +++ b/protocol/flows/src/flow_context.rs @@ -183,7 +183,7 @@ impl FlowContext { ) -> Self { let hub = Hub::new(); - let orphan_resolution_range = BASELINE_ORPHAN_RESOLUTION_RANGE + (config.bps() as f64).log(3.0).min(2.0) as u32; + let orphan_resolution_range = BASELINE_ORPHAN_RESOLUTION_RANGE + (config.bps() as f64).log2().min(3.0) as u32; // The maximum amount of orphans allowed in the orphans pool. This number is an // approximation of how many orphans there can possibly be on average. diff --git a/protocol/flows/src/v5/blockrelay/flow.rs b/protocol/flows/src/v5/blockrelay/flow.rs index 5cc7fc6670..143c2920f2 100644 --- a/protocol/flows/src/v5/blockrelay/flow.rs +++ b/protocol/flows/src/v5/blockrelay/flow.rs @@ -205,7 +205,7 @@ impl HandleRelayInvsFlow { // Add the block to the orphan pool if it's within orphan resolution range. // If the block is indirect it means one of its descendants was already is resolution range, so - // we can save the query. + // we can avoid the query. if is_indirect_inv || self.check_orphan_resolution_range(consensus, block.hash()).await? { let hash = block.hash(); self.ctx.add_orphan(block).await; @@ -224,8 +224,8 @@ impl HandleRelayInvsFlow { /// Finds out whether the given block hash should be retrieved via the unorphaning /// mechanism or via IBD. This method sends a BlockLocator request to the peer with - /// a limit of ORPHAN_RESOLUTION_RANGE. In the response, if we know none of the hashes, - /// we should retrieve the given blockHash via IBD. Otherwise, via unorphaning. + /// a limit of `ctx.orphan_resolution_range`. In the response, if we know none of the hashes, + /// we should retrieve the given block `hash` via IBD. Otherwise, via unorphaning. async fn check_orphan_resolution_range(&mut self, consensus: &ConsensusProxy, hash: Hash) -> Result { self.router .enqueue(make_message!( diff --git a/protocol/flows/src/v5/ibd/flow.rs b/protocol/flows/src/v5/ibd/flow.rs index 42f5d3bd30..cd8cbb0a3e 100644 --- a/protocol/flows/src/v5/ibd/flow.rs +++ b/protocol/flows/src/v5/ibd/flow.rs @@ -9,7 +9,6 @@ use futures::future::try_join_all; use kaspa_consensus_core::{ api::BlockValidationFuture, block::Block, - blockhash::BlockHashExtensions, header::Header, pruning::{PruningPointProof, PruningPointsList}, BlockHashSet, @@ -101,7 +100,7 @@ impl IbdFlow { IbdType::Sync(highest_known_syncer_chain_hash) => { self.sync_headers( &session, - negotiation_output.syncer_header_selected_tip, + negotiation_output.syncer_virtual_selected_parent, highest_known_syncer_chain_hash, &relay_block, ) @@ -110,7 +109,7 @@ impl IbdFlow { IbdType::DownloadHeadersProof => { drop(session); // Avoid holding the previous consensus throughout the staging IBD let staging = self.ctx.consensus_manager.new_staging_consensus(); - match self.ibd_with_headers_proof(&staging, negotiation_output.syncer_header_selected_tip, &relay_block).await { + match self.ibd_with_headers_proof(&staging, negotiation_output.syncer_virtual_selected_parent, &relay_block).await { Ok(()) => { spawn_blocking(|| staging.commit()).await.unwrap(); self.ctx.on_pruning_point_utxoset_override(); @@ -125,11 +124,11 @@ impl IbdFlow { } } - // Sync missing bodies in the past of syncer selected tip - self.sync_missing_block_bodies(&session, negotiation_output.syncer_header_selected_tip).await?; + // Sync missing bodies in the past of syncer sink (virtual selected parent) + self.sync_missing_block_bodies(&session, negotiation_output.syncer_virtual_selected_parent).await?; // Relay block might be in the anticone of syncer selected tip, thus - // check its chain for missing bodies as well. + // check its past for missing bodies as well. self.sync_missing_block_bodies(&session, relay_block.hash()).await } @@ -139,12 +138,8 @@ impl IbdFlow { relay_header: &Header, highest_known_syncer_chain_hash: Option, ) -> Result { - let Some(pruning_point) = consensus.async_pruning_point().await else { - // TODO: fix when applying staging consensus - return Ok(IbdType::DownloadHeadersProof); - }; - if let Some(highest_known_syncer_chain_hash) = highest_known_syncer_chain_hash { + let pruning_point = consensus.async_pruning_point().await; if consensus.async_is_chain_ancestor_of(pruning_point, highest_known_syncer_chain_hash).await? { // The node is only missing a segment in the future of its current pruning point, and the chains // agree as well, so we perform a simple sync IBD and only download the missing data @@ -155,12 +150,7 @@ impl IbdFlow { // this info should possibly be used to reject the IBD despite having more blue work etc. } - let hst_hash = consensus.async_get_headers_selected_tip().await; - // TODO: remove when applying staging consensus - if hst_hash.is_origin() { - return Ok(IbdType::DownloadHeadersProof); - } - let hst_header = consensus.async_get_header(hst_hash).await.unwrap(); + let hst_header = consensus.async_get_header(consensus.async_get_headers_selected_tip().await).await.unwrap(); if relay_header.blue_score >= hst_header.blue_score + self.ctx.config.pruning_depth && relay_header.blue_work > hst_header.blue_work { @@ -174,7 +164,7 @@ impl IbdFlow { async fn ibd_with_headers_proof( &mut self, staging: &StagingConsensus, - syncer_header_selected_tip: Hash, + syncer_virtual_selected_parent: Hash, relay_block: &Block, ) -> Result<(), ProtocolError> { info!("Starting IBD with headers proof with peer {}", self.router); @@ -182,7 +172,7 @@ impl IbdFlow { let session = staging.session().await; let pruning_point = self.sync_and_validate_pruning_proof(&session).await?; - self.sync_headers(&session, syncer_header_selected_tip, pruning_point, relay_block).await?; + self.sync_headers(&session, syncer_virtual_selected_parent, pruning_point, relay_block).await?; self.validate_staging_timestamps(&self.ctx.consensus().session().await, &session).await?; self.sync_pruning_point_utxoset(&session, pruning_point).await?; Ok(()) @@ -303,7 +293,7 @@ impl IbdFlow { async fn sync_headers( &mut self, consensus: &ConsensusProxy, - syncer_header_selected_tip: Hash, + syncer_virtual_selected_parent: Hash, highest_known_syncer_chain_hash: Hash, relay_block: &Block, ) -> Result<(), ProtocolError> { @@ -315,7 +305,7 @@ impl IbdFlow { Payload::RequestHeaders, RequestHeadersMessage { low_hash: Some(highest_known_syncer_chain_hash.into()), - high_hash: Some(syncer_header_selected_tip.into()) + high_hash: Some(syncer_virtual_selected_parent.into()) } )) .await?; @@ -343,7 +333,7 @@ impl IbdFlow { progress_reporter.report_completion(prev_chunk_len); } - self.sync_missing_relay_past_headers(consensus, syncer_header_selected_tip, relay_block.hash()).await?; + self.sync_missing_relay_past_headers(consensus, syncer_virtual_selected_parent, relay_block.hash()).await?; Ok(()) } @@ -351,7 +341,7 @@ impl IbdFlow { async fn sync_missing_relay_past_headers( &mut self, consensus: &ConsensusProxy, - syncer_header_selected_tip: Hash, + syncer_virtual_selected_parent: Hash, relay_block_hash: Hash, ) -> Result<(), ProtocolError> { // Finished downloading syncer selected tip blocks, @@ -366,7 +356,7 @@ impl IbdFlow { .enqueue(make_message!( Payload::RequestAnticone, RequestAnticoneMessage { - block_hash: Some(syncer_header_selected_tip.into()), + block_hash: Some(syncer_virtual_selected_parent.into()), context_hash: Some(relay_block_hash.into()) } )) diff --git a/protocol/flows/src/v5/ibd/negotiate.rs b/protocol/flows/src/v5/ibd/negotiate.rs index c02c45f9e6..b001d3a061 100644 --- a/protocol/flows/src/v5/ibd/negotiate.rs +++ b/protocol/flows/src/v5/ibd/negotiate.rs @@ -12,7 +12,10 @@ use kaspa_p2p_lib::{ }; pub struct ChainNegotiationOutput { - pub syncer_header_selected_tip: Hash, + // Note: previous version peers (especially golang nodes) might return the headers selected tip here. Nonetheless + // we name here following the currently implemented logic by which the syncer returns the virtual selected parent + // chain on block locator queries + pub syncer_virtual_selected_parent: Hash, pub highest_known_syncer_chain_hash: Option, } @@ -42,7 +45,7 @@ impl IbdFlow { locator_hashes.last().unwrap() ); - let mut syncer_header_selected_tip = locator_hashes[0]; // Syncer header selected tip hash + let mut syncer_virtual_selected_parent = locator_hashes[0]; // Syncer sink (virtual selected parent) let highest_known_syncer_chain_hash: Option; let mut negotiation_restart_counter = 0; let mut negotiation_zoom_counts = 0; @@ -152,13 +155,13 @@ impl IbdFlow { ); initial_locator_len = locator_hashes.len(); - // Reset syncer's header selected tip - syncer_header_selected_tip = locator_hashes[0]; + // Reset syncer's virtual selected parent + syncer_virtual_selected_parent = locator_hashes[0]; } } debug!("Found highest known syncer chain block {:?} from peer {}", highest_known_syncer_chain_hash, self.router); - Ok(ChainNegotiationOutput { syncer_header_selected_tip, highest_known_syncer_chain_hash }) + Ok(ChainNegotiationOutput { syncer_virtual_selected_parent, highest_known_syncer_chain_hash }) } async fn get_syncer_chain_block_locator( diff --git a/protocol/flows/src/v5/request_ibd_chain_block_locator.rs b/protocol/flows/src/v5/request_ibd_chain_block_locator.rs index af0e637cee..8383eb8f74 100644 --- a/protocol/flows/src/v5/request_ibd_chain_block_locator.rs +++ b/protocol/flows/src/v5/request_ibd_chain_block_locator.rs @@ -38,11 +38,12 @@ impl RequestIbdChainBlockLocatorFlow { let (low, high) = msg.try_into()?; let locator = - match (self.ctx.consensus().session().await).async_create_headers_selected_chain_block_locator(low, high).await { + match (self.ctx.consensus().session().await).async_create_virtual_selected_chain_block_locator(low, high).await { Ok(locator) => Ok(locator), Err(e) => { let orig = e.clone(); if let ConsensusError::SyncManagerError(SyncManagerError::BlockNotInSelectedParentChain(_)) = e { + // This signals a reset to the locator zoom-in process. The syncee is expected to restart the search Ok(vec![]) } else { Err(orig) diff --git a/rpc/service/src/service.rs b/rpc/service/src/service.rs index c6ffd18b06..e79021730c 100644 --- a/rpc/service/src/service.rs +++ b/rpc/service/src/service.rs @@ -500,7 +500,7 @@ impl RpcApi for RpcCoreService { self.consensus_converter.get_difficulty_ratio(session.async_get_virtual_bits().await), session.async_get_virtual_past_median_time().await, session.async_get_virtual_parents().await.iter().copied().collect::>(), - session.async_pruning_point().await.unwrap_or_default(), + session.async_pruning_point().await, session.async_get_virtual_daa_score().await, )) } diff --git a/testing/integration/src/integration_tests.rs b/testing/integration/src/integration_tests.rs index 225bf5279b..61bc67ab72 100644 --- a/testing/integration/src/integration_tests.rs +++ b/testing/integration/src/integration_tests.rs @@ -1032,6 +1032,9 @@ async fn json_test(file_path: &str, concurrency: bool) { // Assert that at least one body tip was resolved with valid UTXO assert!(tc.body_tips().iter().copied().any(|h| tc.block_status(h) == BlockStatus::StatusUTXOValid)); + // Assert that the indexed selected chain store matches the virtual chain obtained + // through the reachability iterator + assert_selected_chain_store_matches_virtual_chain(&tc); let virtual_utxos: HashSet = HashSet::from_iter(tc.get_virtual_utxos(None, usize::MAX, false).into_iter().map(|(outpoint, _)| outpoint)); let utxoindex_utxos = utxoindex.read().get_all_outpoints().unwrap(); @@ -1605,6 +1608,8 @@ async fn difficulty_test() { #[tokio::test] async fn selected_chain_test() { + kaspa_core::log::try_init_logger("info"); + let config = ConfigBuilder::new(MAINNET_PARAMS) .skip_proof_of_work() .edit_consensus_params(|p| { @@ -1614,12 +1619,12 @@ async fn selected_chain_test() { let consensus = TestConsensus::new(&config); let wait_handles = consensus.init(); - consensus.add_block_with_parents(1.into(), vec![config.genesis.hash]).await.unwrap(); + consensus.add_utxo_valid_block_with_parents(1.into(), vec![config.genesis.hash], vec![]).await.unwrap(); for i in 2..7 { let hash = i.into(); - consensus.add_block_with_parents(hash, vec![(i - 1).into()]).await.unwrap(); + consensus.add_utxo_valid_block_with_parents(hash, vec![(i - 1).into()], vec![]).await.unwrap(); } - consensus.add_block_with_parents(7.into(), vec![1.into()]).await.unwrap(); // Adding a non chain block shouldn't affect the selected chain store. + consensus.add_utxo_valid_block_with_parents(7.into(), vec![1.into()], vec![]).await.unwrap(); // Adding a non chain block shouldn't affect the selected chain store. assert_eq!(consensus.selected_chain_store.read().get_by_index(0).unwrap(), config.genesis.hash); for i in 1..7 { @@ -1627,10 +1632,10 @@ async fn selected_chain_test() { } assert!(consensus.selected_chain_store.read().get_by_index(7).is_err()); - consensus.add_block_with_parents(8.into(), vec![config.genesis.hash]).await.unwrap(); + consensus.add_utxo_valid_block_with_parents(8.into(), vec![config.genesis.hash], vec![]).await.unwrap(); for i in 9..15 { let hash = i.into(); - consensus.add_block_with_parents(hash, vec![(i - 1).into()]).await.unwrap(); + consensus.add_utxo_valid_block_with_parents(hash, vec![(i - 1).into()], vec![]).await.unwrap(); } assert_eq!(consensus.selected_chain_store.read().get_by_index(0).unwrap(), config.genesis.hash); @@ -1641,18 +1646,34 @@ async fn selected_chain_test() { // We now check a situation where there's a shorter selected chain (3 blocks) with more blue work for i in 15..23 { - consensus.add_block_with_parents(i.into(), vec![config.genesis.hash]).await.unwrap(); + consensus.add_utxo_valid_block_with_parents(i.into(), vec![config.genesis.hash], vec![]).await.unwrap(); } - consensus.add_block_with_parents(23.into(), (15..23).map(|i| i.into()).collect_vec()).await.unwrap(); + consensus.add_utxo_valid_block_with_parents(23.into(), (15..23).map(|i| i.into()).collect_vec(), vec![]).await.unwrap(); assert_eq!(consensus.selected_chain_store.read().get_by_index(0).unwrap(), config.genesis.hash); - assert_eq!(consensus.selected_chain_store.read().get_by_index(1).unwrap(), 22.into()); // We expect 23's selected parent to be 22 because of GHOSTDAG tie breaer rules. + assert_eq!(consensus.selected_chain_store.read().get_by_index(1).unwrap(), 22.into()); // We expect 23's selected parent to be 22 because of GHOSTDAG tie-breaking rules. assert_eq!(consensus.selected_chain_store.read().get_by_index(2).unwrap(), 23.into()); assert!(consensus.selected_chain_store.read().get_by_index(3).is_err()); + assert_selected_chain_store_matches_virtual_chain(&consensus); consensus.shutdown(wait_handles); } +fn assert_selected_chain_store_matches_virtual_chain(consensus: &TestConsensus) { + let pruning_point = consensus.pruning_point(); + let iter1 = selected_chain_store_iterator(consensus, pruning_point); + let iter2 = consensus.reachability_service().backward_chain_iterator(consensus.get_sink(), pruning_point, false); + itertools::assert_equal(iter1, iter2); +} + +fn selected_chain_store_iterator(consensus: &TestConsensus, pruning_point: Hash) -> impl Iterator + '_ { + let selected_chain_read = consensus.selected_chain_store.read(); + let (idx, current) = selected_chain_read.get_tip().unwrap(); + std::iter::once(current) + .chain((0..idx).rev().map(move |i| selected_chain_read.get_by_index(i).unwrap())) + .take_while(move |&h| h != pruning_point) +} + #[tokio::test] async fn staging_consensus_test() { let config = ConfigBuilder::new(MAINNET_PARAMS).build();