Skip to content

Commit

Permalink
Merge pull request #1591 from Oscar-Pepper/revisit_update_scan_ranges
Browse files Browse the repository at this point in the history
Revisit update scan ranges
  • Loading branch information
Oscar-Pepper authored Dec 20, 2024
2 parents f9f3e5f + 08e8e0e commit 8745a0b
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 87 deletions.
5 changes: 4 additions & 1 deletion zingo-sync/src/scan/compact_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ use super::{

mod runners;

// TODO: move parameters to config module
const TRIAL_DECRYPT_TASK_SIZE: usize = 500;

pub(crate) fn scan_compact_blocks<P>(
compact_blocks: Vec<CompactBlock>,
parameters: &P,
Expand Down Expand Up @@ -147,7 +150,7 @@ fn trial_decrypt<P>(
where
P: Parameters + Send + 'static,
{
let mut runners = BatchRunners::<(), ()>::for_keys(100, scanning_keys);
let mut runners = BatchRunners::<(), ()>::for_keys(TRIAL_DECRYPT_TASK_SIZE, scanning_keys);
for block in compact_blocks {
runners.add_block(parameters, block.clone()).unwrap();
}
Expand Down
44 changes: 20 additions & 24 deletions zingo-sync/src/scan/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,11 @@ pub(crate) enum ScannerState {
}

impl ScannerState {
pub(crate) fn verify(&mut self) {
if let ScannerState::Verification = *self {
*self = ScannerState::Scan
} else {
panic!(
"ScanState is not Verification variant. Verification should only complete once!"
);
}
fn verified(&mut self) {
*self = ScannerState::Scan
}

fn shutdown(&mut self) {
fn scan_completed(&mut self) {
*self = ScannerState::Shutdown
}
}
Expand Down Expand Up @@ -86,10 +80,6 @@ where
}
}

pub(crate) fn state_mut(&mut self) -> &mut ScannerState {
&mut self.state
}

pub(crate) fn worker_poolsize(&self) -> usize {
self.workers.len()
}
Expand Down Expand Up @@ -158,21 +148,27 @@ where
{
match self.state {
ScannerState::Verification => {
if !wallet
.get_sync_state()
.unwrap()
let sync_state = wallet.get_sync_state().unwrap();
if !sync_state
.scan_ranges()
.iter()
.any(|scan_range| scan_range.priority() == ScanPriority::Verify)
{
// under these conditions the `Verify` scan range is currently being scanned.
// the reason why the logic looks for no `Verify` ranges in the sync state is because it is set to `Ignored`
// during scanning.
// if we were to continue to add new tasks and a re-org had occured the sync state would be unrecoverable.
return;
if sync_state
.scan_ranges()
.iter()
.any(|scan_range| scan_range.priority() == ScanPriority::Ignored)
{
// the last scan ranges with `Verify` priority are currently being scanned.
return;
} else {
// verification complete
self.state.verified();
return;
}
}

// scan the range with `Verify` priority
// scan ranges with `Verify` priority
if let Some(worker) = self.idle_worker() {
let scan_task = sync::state::create_scan_task(wallet)
.unwrap()
Expand All @@ -188,13 +184,13 @@ where
if let Some(scan_task) = sync::state::create_scan_task(wallet).unwrap() {
worker.add_scan_task(scan_task).unwrap();
} else if wallet.get_sync_state().unwrap().scan_complete() {
self.state.shutdown();
self.state.scan_completed();
}
}
}
ScannerState::Shutdown => {
// shutdown mempool
shutdown_mempool.store(true, atomic::Ordering::Relaxed);
shutdown_mempool.store(true, atomic::Ordering::Release);

// shutdown idle workers
while let Some(worker) = self.idle_worker() {
Expand Down
36 changes: 17 additions & 19 deletions zingo-sync/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::error::SyncError;
use crate::keys::transparent::TransparentAddressId;
use crate::primitives::{NullifierMap, OutPointMap};
use crate::scan::error::{ContinuityError, ScanError};
use crate::scan::task::{Scanner, ScannerState};
use crate::scan::task::Scanner;
use crate::scan::transactions::scan_transaction;
use crate::scan::{DecryptedNoteData, ScanResults};
use crate::traits::{
Expand All @@ -33,8 +33,9 @@ pub(crate) mod spend;
pub(crate) mod state;
pub(crate) mod transparent;

// TODO: move parameters to config module
// TODO; replace fixed batches with orchard shard ranges (block ranges containing all note commitments to an orchard shard or fragment of a shard)
const BATCH_SIZE: u32 = 1_000;
const BATCH_SIZE: u32 = 5_000;
const VERIFY_BLOCK_RANGE_SIZE: u32 = 10;
const MAX_VERIFICATION_WINDOW: u32 = 100; // TODO: fail if re-org goes beyond this window

Expand All @@ -52,11 +53,16 @@ where

// create channel for sending fetch requests and launch fetcher task
let (fetch_request_sender, fetch_request_receiver) = mpsc::unbounded_channel();
let fetcher_handle = tokio::spawn(client::fetch::fetch(
fetch_request_receiver,
client.clone(),
consensus_parameters.clone(),
));
let client_clone = client.clone();
let consensus_parameters_clone = consensus_parameters.clone();
let fetcher_handle = tokio::spawn(async move {
client::fetch::fetch(
fetch_request_receiver,
client_clone,
consensus_parameters_clone,
)
.await
});

let wallet_height = state::get_wallet_height(consensus_parameters, wallet).unwrap();
let chain_height = client::get_chain_height(fetch_request_sender.clone())
Expand All @@ -77,11 +83,9 @@ where
let (mempool_transaction_sender, mut mempool_transaction_receiver) = mpsc::channel(10);
let shutdown_mempool = Arc::new(AtomicBool::new(false));
let shutdown_mempool_clone = shutdown_mempool.clone();
let mempool_handle = tokio::spawn(mempool_monitor(
client,
mempool_transaction_sender,
shutdown_mempool_clone,
));
let mempool_handle = tokio::spawn(async move {
mempool_monitor(client, mempool_transaction_sender, shutdown_mempool_clone).await
});

transparent::update_addresses_and_locators(
consensus_parameters,
Expand Down Expand Up @@ -126,7 +130,6 @@ where
&ufvks,
scan_range,
scan_results,
scanner.state_mut(),
)
.await
.unwrap();
Expand Down Expand Up @@ -189,18 +192,13 @@ async fn process_scan_results<P, W>(
ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
scan_range: ScanRange,
scan_results: Result<ScanResults, ScanError>,
scanner_state: &mut ScannerState,
) -> Result<(), SyncError>
where
P: consensus::Parameters,
W: SyncWallet + SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints + SyncShardTrees,
{
match scan_results {
Ok(results) => {
if scan_range.priority() == ScanPriority::Verify {
scanner_state.verify();
}

update_wallet_data(wallet, results).unwrap();
spend::update_transparent_spends(wallet).unwrap();
spend::update_shielded_spends(
Expand Down Expand Up @@ -445,7 +443,7 @@ async fn mempool_monitor(
.await
.unwrap();
loop {
if shutdown_mempool.load(atomic::Ordering::Relaxed) {
if shutdown_mempool.load(atomic::Ordering::Acquire) {
break;
}

Expand Down
Loading

0 comments on commit 8745a0b

Please sign in to comment.