Skip to content

Commit

Permalink
changes from code review
Browse files Browse the repository at this point in the history
  • Loading branch information
AloeareV committed Jan 20, 2025
1 parent b0a289f commit a7753d8
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 71 deletions.
2 changes: 1 addition & 1 deletion integration-tests/tests/wallet_to_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ mod wallet_basic {
)
.await
.unwrap();


dbg!(unfinalised_transactions.clone());

Expand Down
6 changes: 1 addition & 5 deletions zaino-state/src/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,11 +393,7 @@ impl ZcashIndexer for FetchServiceSubscriber {
) -> Result<GetSubtrees, Self::Error> {
Ok(self
.fetcher
.get_subtrees_by_index(
pool,
start_index.0,
limit.map(|limit_index| limit_index.0),
)
.get_subtrees_by_index(pool, start_index.0, limit.map(|limit_index| limit_index.0))
.await?
.into())
}
Expand Down
130 changes: 68 additions & 62 deletions zaino-state/src/local_cache/finalised_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,22 +149,29 @@ impl FinalisedState {
};

finalised_state.sync_db_from_reorg().await?;

finalised_state.write_task_handle =
Some(finalised_state.spawn_writer(block_receiver).await?);

finalised_state.read_task_handle = Some(finalised_state.spawn_reader(request_rx).await?);
finalised_state.spawn_writer(block_receiver).await?;
finalised_state.spawn_reader(request_rx).await?;

finalised_state.status.store(StatusType::Ready.into());

Ok(finalised_state)
}

async fn spawn_writer(
&self,
&mut self,
mut block_receiver: tokio::sync::mpsc::Receiver<(Height, Hash, CompactBlock)>,
) -> Result<tokio::task::JoinHandle<()>, FinalisedStateError> {
let finalised_state = self.clone();
) -> Result<(), FinalisedStateError> {
let finalised_state = Self {
fetcher: self.fetcher.clone(),
database: Arc::clone(&self.database),
heights_to_hashes: self.heights_to_hashes,
hashes_to_blocks: self.hashes_to_blocks,
request_sender: self.request_sender.clone(),
read_task_handle: None,
write_task_handle: None,
status: self.status.clone(),
config: self.config.clone(),
};

let writer_handle = tokio::spawn(async move {
while let Some((height, mut hash, mut compact_block)) = block_receiver.recv().await {
Expand Down Expand Up @@ -262,14 +269,25 @@ impl FinalisedState {
}
});

Ok(writer_handle)
self.write_task_handle = Some(writer_handle);
Ok(())
}

async fn spawn_reader(
&self,
&mut self,
mut request_receiver: tokio::sync::mpsc::Receiver<DbRequest>,
) -> Result<tokio::task::JoinHandle<()>, FinalisedStateError> {
let finalised_state = self.clone();
) -> Result<(), FinalisedStateError> {
let finalised_state = Self {
fetcher: self.fetcher.clone(),
database: Arc::clone(&self.database),
heights_to_hashes: self.heights_to_hashes,
hashes_to_blocks: self.hashes_to_blocks,
request_sender: self.request_sender.clone(),
read_task_handle: None,
write_task_handle: None,
status: self.status.clone(),
config: self.config.clone(),
};

let reader_handle = tokio::spawn(async move {
while let Some(DbRequest {
Expand All @@ -291,7 +309,8 @@ impl FinalisedState {
}
});

Ok(reader_handle)
self.read_task_handle = Some(reader_handle);
Ok(())
}

/// Syncs database with the server,
Expand Down Expand Up @@ -396,45 +415,48 @@ impl FinalisedState {
// Wait for server to sync to with p2p network and sync new blocks.
if !self.config.network.is_regtest() && !self.config.no_sync {
self.status.store(StatusType::Syncing.into());
loop {
let blockchain_info = self.fetcher.get_blockchain_info().await?;
let server_height = blockchain_info.blocks.0;
for block_height in (sync_height + 1)..(server_height - 99) {
if self.get_hash(block_height).is_ok() {
self.delete_block(Height(block_height))?;
}
loop {
match fetch_block_from_node(
&self.fetcher,
HashOrHeight::Height(Height(block_height)),
)
.await
{
Ok((hash, block)) => {
self.insert_block((Height(block_height), hash, block))?;
break;
}
Err(e) => {
self.status.store(StatusType::RecoverableError.into());
eprintln!("{e}");
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
loop {
let blockchain_info = self.fetcher.get_blockchain_info().await?;
let server_height = blockchain_info.blocks.0;
for block_height in (sync_height + 1)..(server_height - 99) {
if self.get_hash(block_height).is_ok() {
self.delete_block(Height(block_height))?;
}
loop {
match fetch_block_from_node(
&self.fetcher,
HashOrHeight::Height(Height(block_height)),
)
.await
{
Ok((hash, block)) => {
self.insert_block((Height(block_height), hash, block))?;
break;
}
Err(e) => {
self.status.store(StatusType::RecoverableError.into());
eprintln!("{e}");
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
}
}
sync_height = server_height - 99;
if (blockchain_info.blocks.0 as i64 - blockchain_info.estimated_height.0 as i64).abs() <= 10 {
break;
} else {
println!(" - Validator syncing with network. ZainoDB chain height: {}, Validator chain height: {}, Estimated Network chain height: {}",
}
sync_height = server_height - 99;
if (blockchain_info.blocks.0 as i64 - blockchain_info.estimated_height.0 as i64)
.abs()
<= 10
{
break;
} else {
println!(" - Validator syncing with network. ZainoDB chain height: {}, Validator chain height: {}, Estimated Network chain height: {}",
&sync_height,
&blockchain_info.blocks.0,
&blockchain_info.estimated_height.0
&blockchain_info.blocks.0,
&blockchain_info.estimated_height.0
);
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
continue;
}
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
continue;
}
}
}

Ok(())
Expand Down Expand Up @@ -588,22 +610,6 @@ impl Drop for FinalisedState {
}
}

impl Clone for FinalisedState {
fn clone(&self) -> Self {
Self {
fetcher: self.fetcher.clone(),
database: Arc::clone(&self.database),
heights_to_hashes: self.heights_to_hashes,
hashes_to_blocks: self.hashes_to_blocks,
request_sender: self.request_sender.clone(),
read_task_handle: None,
write_task_handle: None,
status: self.status.clone(),
config: self.config.clone(),
}
}
}

/// A subscriber to a [`NonFinalisedState`].
#[derive(Debug, Clone)]
pub struct FinalisedStateSubscriber {
Expand Down
9 changes: 6 additions & 3 deletions zaino-state/src/local_cache/non_finalised_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,15 @@ impl NonFinalisedState {
non_finalised_state.status.store(StatusType::Syncing.into());
loop {
let blockchain_info = fetcher.get_blockchain_info().await?;
if (blockchain_info.blocks.0 as i64 - blockchain_info.estimated_height.0 as i64).abs() <= 10 {
if (blockchain_info.blocks.0 as i64 - blockchain_info.estimated_height.0 as i64)
.abs()
<= 10
{
break;
} else {
println!(" - Validator syncing with network. Validator chain height: {}, Estimated Network chain height: {}",
&blockchain_info.blocks.0,
&blockchain_info.estimated_height.0
&blockchain_info.blocks.0,
&blockchain_info.estimated_height.0
);
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
continue;
Expand Down

0 comments on commit a7753d8

Please sign in to comment.