Skip to content

Commit

Permalink
Fix sync deadlock by avoiding async PotVerifier API with `tokio::task::spawn_blocking` entirely
Browse files Browse the repository at this point in the history
  • Loading branch information
nazar-pc committed Oct 14, 2023
1 parent 5080f3a commit 6bbfb33
Show file tree
Hide file tree
Showing 8 changed files with 249 additions and 406 deletions.
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 8 additions & 12 deletions crates/sc-consensus-subspace/src/slot_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,16 +324,12 @@ where
};

// Ensure proof of time is valid according to parent block
if !self
.pot_verifier
.is_output_valid(
pot_input,
Slot::from(u64::from(slot) - u64::from(parent_slot)),
proof_of_time,
parent_pot_parameters.next_parameters_change(),
)
.await
{
if !self.pot_verifier.is_output_valid(
pot_input,
Slot::from(u64::from(slot) - u64::from(parent_slot)),
proof_of_time,
parent_pot_parameters.next_parameters_change(),
) {
warn!(
target: "subspace",
"Proof of time is invalid, skipping block authoring at slot {slot:?}"
Expand Down Expand Up @@ -363,11 +359,11 @@ where

for slot in *parent_future_slot + 1..=*future_slot {
let slot = Slot::from(slot);
let maybe_slot_checkpoints_fut = self.pot_verifier.get_checkpoints(
let maybe_slot_checkpoints = self.pot_verifier.get_checkpoints(
checkpoints_pot_input.slot_iterations,
checkpoints_pot_input.seed,
);
let Some(slot_checkpoints) = maybe_slot_checkpoints_fut.await else {
let Some(slot_checkpoints) = maybe_slot_checkpoints else {
warn!("Proving failed during block authoring");
return None;
};
Expand Down
27 changes: 7 additions & 20 deletions crates/sc-consensus-subspace/src/verifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
use crate::Error;
use futures::lock::Mutex;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use log::{debug, info, trace, warn};
use rand::prelude::*;
use sc_client_api::backend::AuxStore;
Expand Down Expand Up @@ -309,14 +307,15 @@ where

// All checkpoints must be valid, at least according to the seed included in
// justifications
let verification_results = FuturesUnordered::new();
for checkpoints in &checkpoints {
if full_pot_verification {
verification_results.push(self.pot_verifier.verify_checkpoints(
seed,
slot_iterations,
checkpoints,
));
// Try to find invalid checkpoints
if !self
.pot_verifier
.verify_checkpoints(seed, slot_iterations, checkpoints)
{
return Err(VerificationError::InvalidProofOfTime);
}
} else {
// We inject verified checkpoints in order to avoid full proving when votes
// included in the block will inevitably be verified during block execution
Expand All @@ -340,18 +339,6 @@ where
slot_iterations = pot_input.slot_iterations;
seed = pot_input.seed;
}
// Try to find invalid checkpoints
if verification_results
// TODO: Ideally we'd use `find` here instead, but it does not yet exist:
// https://github.com/rust-lang/futures-rs/issues/2705
.filter(|&success| async move { !success })
.boxed()
.next()
.await
.is_some()
{
return Err(VerificationError::InvalidProofOfTime);
}
}

// Verify that block is signed properly
Expand Down
2 changes: 0 additions & 2 deletions crates/sc-proof-of-time/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ include = [
]

[dependencies]
async-lock = "2.8.0"
atomic = "0.5.3"
core_affinity = "0.8.1"
derive_more = "0.99.17"
Expand All @@ -33,5 +32,4 @@ subspace-core-primitives = { version = "0.1.0", path = "../subspace-core-primiti
subspace-proof-of-time = { version = "0.1.0", path = "../subspace-proof-of-time" }
parking_lot = "0.12.1"
rayon = "1.8.0"
tokio = { version = "1.32.0", features = ["macros", "time"] }
tracing = "0.1.37"
35 changes: 18 additions & 17 deletions crates/sc-proof-of-time/src/source/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use std::hash::{Hash, Hasher};
use std::num::{NonZeroU32, NonZeroUsize};
use std::sync::{atomic, Arc};
use subspace_core_primitives::{PotCheckpoints, PotSeed, SlotNumber};
use tokio::runtime::Handle;
use tracing::{debug, error, trace, warn};

/// How many slots can proof be before it is too far
Expand Down Expand Up @@ -280,11 +279,11 @@ where
}
}

if self
.pot_verifier
.verify_checkpoints(proof.seed, proof.slot_iterations, &proof.checkpoints)
.await
{
if self.pot_verifier.verify_checkpoints(
proof.seed,
proof.slot_iterations,
&proof.checkpoints,
) {
debug!(%sender, slot = %proof.slot, "Full verification succeeded");

self.engine
Expand Down Expand Up @@ -385,27 +384,26 @@ where
}

// Avoid blocking gossip for too long
let handle = Handle::current();
rayon::spawn({
let engine = Arc::clone(&self.engine);
let pot_verifier = self.pot_verifier.clone();
let from_gossip_sender = self.from_gossip_sender.clone();
let topic = self.topic;

move || {
handle.block_on(Self::handle_potentially_matching_proofs(
Self::handle_potentially_matching_proofs(
next_slot_input,
potentially_matching_proofs,
engine,
&pot_verifier,
from_gossip_sender,
topic,
));
);
}
});
}

async fn handle_potentially_matching_proofs(
fn handle_potentially_matching_proofs(
next_slot_input: PotNextSlotInput,
potentially_matching_proofs: Vec<(GossipProof, Vec<PeerId>)>,
engine: Arc<Mutex<GossipEngine<Block>>>,
Expand All @@ -425,10 +423,11 @@ where

// Verify all proofs
for (proof, _senders) in &potentially_matching_proofs {
if pot_verifier
.verify_checkpoints(proof.seed, proof.slot_iterations, &proof.checkpoints)
.await
{
if pot_verifier.verify_checkpoints(
proof.seed,
proof.slot_iterations,
&proof.checkpoints,
) {
correct_proof.replace(*proof);
break;
}
Expand Down Expand Up @@ -468,15 +467,17 @@ where
}
sent = true;

if let Err(error) = from_gossip_sender.send((sender, proof)).await {
engine.lock().gossip_message(topic, proof.encode(), false);

if let Err(error) =
futures::executor::block_on(from_gossip_sender.send((sender, proof)))
{
warn!(
%error,
slot = %proof.slot,
"Failed to send future proof",
);
}

engine.lock().gossip_message(topic, proof.encode(), false);
}
} else {
let engine = engine.lock();
Expand Down
Loading

0 comments on commit 6bbfb33

Please sign in to comment.