Skip to content

Commit

Permalink
fix(zcoin): syncing and activation improvements (#2089)
Browse files Browse the repository at this point in the history
This commit does the following:
* Stops sending sync status after main sync state (main improvement).
* Removes first_sync_block from SyncStatus and stop sending first_sync_block  while activation is in progress.
* include first_sync_block in Activation Result only after activation is completed.
  • Loading branch information
borngraced authored Apr 9, 2024
1 parent ffe761e commit 92199d8
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 32 deletions.
5 changes: 5 additions & 0 deletions mm2src/coins/z_coin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,11 @@ impl ZCoin {
.await
}

#[inline]
pub async fn first_sync_block(&self) -> Result<FirstSyncBlock, MmError<BlockchainScanStopped>> {
self.z_fields.sync_state_connector.lock().await.first_sync_block().await
}

#[inline]
fn secp_keypair(&self) -> &KeyPair {
self.utxo_arc
Expand Down
60 changes: 41 additions & 19 deletions mm2src/coins/z_coin/z_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -552,29 +552,31 @@ pub(super) async fn init_light_client<'a>(
blocks_db.rewind_to_height(u32::MIN.into()).await?;
};

let first_sync_block = FirstSyncBlock {
requested: sync_height,
is_pre_sapling: sync_height < sapling_activation_height,
actual: sync_height.max(sapling_activation_height),
};
let sync_handle = SaplingSyncLoopHandle {
coin,
current_block: BlockHeight::from_u32(0),
blocks_db,
wallet_db: wallet_db.clone(),
consensus_params: builder.protocol_info.consensus_params.clone(),
sync_status_notifier,
main_sync_state_finished: false,
on_tx_gen_watcher,
watch_for_tx: None,
scan_blocks_per_iteration: builder.z_coin_params.scan_blocks_per_iteration,
scan_interval_ms: builder.z_coin_params.scan_interval_ms,
first_sync_block: FirstSyncBlock {
requested: sync_height,
is_pre_sapling: sync_height < sapling_activation_height,
actual: sync_height.max(sapling_activation_height),
},
first_sync_block: first_sync_block.clone(),
z_balance_event_sender,
};

let abort_handle = spawn_abortable(light_wallet_db_sync_loop(sync_handle, Box::new(light_rpc_clients)));

Ok((
SaplingSyncConnector::new_mutex_wrapped(sync_watcher, on_tx_gen_notifier, abort_handle),
SaplingSyncConnector::new_mutex_wrapped(sync_watcher, on_tx_gen_notifier, abort_handle, first_sync_block),
wallet_db,
))
}
Expand All @@ -590,6 +592,7 @@ pub(super) async fn init_native_client<'a>(
let coin = builder.ticker.to_string();
let (sync_status_notifier, sync_watcher) = channel(1);
let (on_tx_gen_notifier, on_tx_gen_watcher) = channel(1);

let checkpoint_block = builder.protocol_info.check_point_block.clone();
let sapling_height = builder.protocol_info.consensus_params.sapling_activation_height;
let checkpoint_height = checkpoint_block.clone().map(|b| b.height).unwrap_or(sapling_height) as u64;
Expand All @@ -609,17 +612,18 @@ pub(super) async fn init_native_client<'a>(
wallet_db: wallet_db.clone(),
consensus_params: builder.protocol_info.consensus_params.clone(),
sync_status_notifier,
main_sync_state_finished: false,
on_tx_gen_watcher,
watch_for_tx: None,
scan_blocks_per_iteration: builder.z_coin_params.scan_blocks_per_iteration,
scan_interval_ms: builder.z_coin_params.scan_interval_ms,
first_sync_block,
first_sync_block: first_sync_block.clone(),
z_balance_event_sender,
};
let abort_handle = spawn_abortable(light_wallet_db_sync_loop(sync_handle, Box::new(native_client)));

Ok((
SaplingSyncConnector::new_mutex_wrapped(sync_watcher, on_tx_gen_notifier, abort_handle),
SaplingSyncConnector::new_mutex_wrapped(sync_watcher, on_tx_gen_notifier, abort_handle, first_sync_block),
wallet_db,
))
}
Expand Down Expand Up @@ -661,20 +665,18 @@ impl SaplingSyncRespawnGuard {
/// - `TemporaryError(String)`: Represents a temporary error state, with an associated error message
/// providing details about the error.
/// - `Finishing`: Represents the finishing state of an operation.
#[derive(Debug)]
pub enum SyncStatus {
UpdatingBlocksCache {
first_sync_block: FirstSyncBlock,
current_scanned_block: u64,
latest_block: u64,
},
BuildingWalletDb {
first_sync_block: FirstSyncBlock,
current_scanned_block: u64,
latest_block: u64,
},
TemporaryError(String),
Finished {
first_sync_block: FirstSyncBlock,
block_number: u64,
},
}
Expand Down Expand Up @@ -706,6 +708,8 @@ pub struct SaplingSyncLoopHandle {
consensus_params: ZcoinConsensusParams,
/// Notifies about sync status without stopping the loop, e.g. on coin activation
sync_status_notifier: AsyncSender<SyncStatus>,
/// Signal to determine if main sync state is finished.
main_sync_state_finished: bool,
/// If new tx is required to be generated, we stop the sync and respawn it after tx is sent
/// This watcher waits for such notification
on_tx_gen_watcher: AsyncReceiver<OneshotSender<(Self, Box<dyn ZRpcOps>)>>,
Expand All @@ -717,39 +721,49 @@ pub struct SaplingSyncLoopHandle {
}

impl SaplingSyncLoopHandle {
fn first_sync_block(&self) -> FirstSyncBlock { self.first_sync_block.clone() }

#[inline]
fn notify_blocks_cache_status(&mut self, current_scanned_block: u64, latest_block: u64) {
if self.main_sync_state_finished {
return;
}
self.sync_status_notifier
.try_send(SyncStatus::UpdatingBlocksCache {
current_scanned_block,
latest_block,
first_sync_block: self.first_sync_block(),
})
.debug_log_with_msg("No one seems interested in SyncStatus");
}

fn notify_building_wallet_db(&mut self, current_scanned_block: u64, latest_block: u64) {
if self.main_sync_state_finished {
return;
}
self.sync_status_notifier
.try_send(SyncStatus::BuildingWalletDb {
current_scanned_block,
latest_block,
first_sync_block: self.first_sync_block(),
})
.debug_log_with_msg("No one seems interested in SyncStatus");
}

fn notify_on_error(&mut self, error: String) {
if self.main_sync_state_finished {
return;
}
self.sync_status_notifier
.try_send(SyncStatus::TemporaryError(error))
.debug_log_with_msg("No one seems interested in SyncStatus");
}

fn notify_sync_finished(&mut self) {
if self.main_sync_state_finished {
return;
} else {
self.main_sync_state_finished = true
}
self.sync_status_notifier
.try_send(SyncStatus::Finished {
block_number: self.current_block.into(),
first_sync_block: self.first_sync_block(),
})
.debug_log_with_msg("No one seems interested in SyncStatus");
}
Expand Down Expand Up @@ -781,7 +795,7 @@ impl SaplingSyncLoopHandle {
/// For more notes on the process, check https://github.com/zcash/librustzcash/blob/master/zcash_client_backend/src/data_api/chain.rs#L2
async fn scan_validate_and_update_blocks(&mut self) -> Result<(), MmError<ZcoinStorageError>> {
let blocks_db = self.blocks_db.clone();
let wallet_db = self.wallet_db.clone().db;
let wallet_db = self.wallet_db.db.clone();
let mut wallet_ops = wallet_db.get_update_ops().expect("get_update_ops always returns Ok");

if let Err(e) = blocks_db
Expand Down Expand Up @@ -929,23 +943,31 @@ pub(super) struct SaplingSyncConnector {
sync_watcher: SyncWatcher,
on_tx_gen_notifier: NewTxNotifier,
abort_handle: Arc<Mutex<AbortOnDropHandle>>,
first_sync_block: FirstSyncBlock,
}

impl SaplingSyncConnector {
#[allow(unused)]
#[inline]
pub(super) fn new_mutex_wrapped(
simple_sync_watcher: SyncWatcher,
sync_watcher: SyncWatcher,
on_tx_gen_notifier: NewTxNotifier,
abort_handle: AbortOnDropHandle,
first_sync_block: FirstSyncBlock,
) -> AsyncMutex<Self> {
AsyncMutex::new(SaplingSyncConnector {
sync_watcher: simple_sync_watcher,
sync_watcher,
on_tx_gen_notifier,
abort_handle: Arc::new(Mutex::new(abort_handle)),
first_sync_block,
})
}

#[inline]
pub(super) async fn first_sync_block(&self) -> Result<FirstSyncBlock, MmError<BlockchainScanStopped>> {
Ok(self.first_sync_block.clone())
}

#[inline]
pub(super) async fn current_sync_status(&mut self) -> Result<SyncStatus, MmError<BlockchainScanStopped>> {
self.sync_watcher.next().await.or_mm_err(|| BlockchainScanStopped {})
Expand Down
15 changes: 2 additions & 13 deletions mm2src/coins_activation/src/z_coin_activation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub struct ZcoinActivationResult {
pub ticker: String,
pub current_block: u64,
pub wallet_balance: CoinBalanceReport,
pub first_sync_block: Option<FirstSyncBlock>,
pub first_sync_block: FirstSyncBlock,
}

impl CurrentBlock for ZcoinActivationResult {
Expand Down Expand Up @@ -77,12 +77,10 @@ impl GetAddressesBalances for ZcoinActivationResult {
pub enum ZcoinInProgressStatus {
ActivatingCoin,
UpdatingBlocksCache {
first_sync_block: FirstSyncBlock,
current_scanned_block: u64,
latest_block: u64,
},
BuildingWalletDb {
first_sync_block: FirstSyncBlock,
current_scanned_block: u64,
latest_block: u64,
},
Expand Down Expand Up @@ -247,20 +245,16 @@ impl InitStandaloneCoinActivationOps for ZCoin {
loop {
let in_progress_status = match coin.sync_status().await? {
SyncStatus::UpdatingBlocksCache {
first_sync_block,
current_scanned_block,
latest_block,
} => ZcoinInProgressStatus::UpdatingBlocksCache {
first_sync_block,
current_scanned_block,
latest_block,
},
SyncStatus::BuildingWalletDb {
first_sync_block,
current_scanned_block,
latest_block,
} => ZcoinInProgressStatus::BuildingWalletDb {
first_sync_block,
current_scanned_block,
latest_block,
},
Expand All @@ -287,12 +281,7 @@ impl InitStandaloneCoinActivationOps for ZCoin {
.map_to_mm(ZcoinInitError::CouldNotGetBlockCount)?;

let balance = self.my_balance().compat().await?;
let first_sync_block = match self.sync_status().await? {
SyncStatus::Finished { first_sync_block, .. }
| SyncStatus::BuildingWalletDb { first_sync_block, .. }
| SyncStatus::UpdatingBlocksCache { first_sync_block, .. } => Some(first_sync_block),
_ => None,
};
let first_sync_block = self.first_sync_block().await?;

Ok(ZcoinActivationResult {
ticker: self.ticker().into(),
Expand Down

0 comments on commit 92199d8

Please sign in to comment.