Skip to content

Commit

Permalink
Add VID to webserver (#1954)
Browse files Browse the repository at this point in the history
* add vid to webserver

* minor fixes

* fix certificates/votes/tasks

* increase next view timeout for tests

* remove commented code
  • Loading branch information
rob-maron authored Oct 30, 2023
1 parent 0130965 commit 8ceb257
Show file tree
Hide file tree
Showing 12 changed files with 493 additions and 43 deletions.
152 changes: 148 additions & 4 deletions crates/hotshot/src/traits/networking/web_server_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,15 @@ struct Inner<M: NetworkMsg, KEY: SignatureKey, TYPES: NodeType> {
/// Task map for quorum votes.
vote_task_map:
Arc<RwLock<HashMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
/// Task map for vid votes
vid_vote_task_map:
Arc<RwLock<HashMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
/// Task map for VID certs
vid_cert_task_map:
Arc<RwLock<HashMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
/// Task map for VID disperse data
vid_disperse_task_map:
Arc<RwLock<HashMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
/// Task map for DACs.
dac_task_map:
Arc<RwLock<HashMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
Expand Down Expand Up @@ -169,7 +178,7 @@ impl<M: NetworkMsg, KEY: SignatureKey, TYPES: NodeType> Inner<M, KEY, TYPES> {
MessagePurpose::DAC => config::get_da_certificate_route(view_number),
MessagePurpose::VidDisperse => config::get_vid_disperse_route(view_number), // like `Proposal`
MessagePurpose::VidVote => config::get_vid_vote_route(view_number, vote_index), // like `Vote`
MessagePurpose::VidCert => config::get_vid_cert_route(view_number), // like `DAC`
MessagePurpose::VidCert => config::get_vid_certificate_route(view_number), // like `DAC`
};

if message_purpose == MessagePurpose::Data {
Expand Down Expand Up @@ -351,8 +360,10 @@ impl<M: NetworkMsg, KEY: SignatureKey, TYPES: NodeType> Inner<M, KEY, TYPES> {
// TODO ED Should add extra error checking here to make sure we are intending to cancel a task
ConsensusIntentEvent::CancelPollForVotes(event_view)
| ConsensusIntentEvent::CancelPollForProposal(event_view)
| ConsensusIntentEvent::CancelPollForVIDVotes(event_view)
| ConsensusIntentEvent::CancelPollForVIDCertificate(event_view)
| ConsensusIntentEvent::CancelPollForDAC(event_view)
| ConsensusIntentEvent::CancelPollForViewSyncCertificate(event_view)
| ConsensusIntentEvent::CancelPollForVIDDisperse(event_view)
| ConsensusIntentEvent::CancelPollForViewSyncVotes(event_view) => {
if view_number == event_view {
debug!("Shutting down polling task for view {}", event_view);
Expand All @@ -371,7 +382,9 @@ impl<M: NetworkMsg, KEY: SignatureKey, TYPES: NodeType> Inner<M, KEY, TYPES> {
}
}

_ => unimplemented!(),
_ => {
unimplemented!()
}
}
}
// Nothing on receiving channel
Expand Down Expand Up @@ -528,6 +541,9 @@ impl<
tx_index: Arc::default(),
proposal_task_map: Arc::default(),
vote_task_map: Arc::default(),
vid_vote_task_map: Arc::default(),
vid_cert_task_map: Arc::default(),
vid_disperse_task_map: Arc::default(),
dac_task_map: Arc::default(),
view_sync_cert_task_map: Arc::default(),
view_sync_vote_task_map: Arc::default(),
Expand Down Expand Up @@ -562,7 +578,7 @@ impl<
MessagePurpose::DAC => config::post_da_certificate_route(*view_number),
MessagePurpose::VidVote => config::post_vid_vote_route(*view_number),
MessagePurpose::VidDisperse => config::post_vid_disperse_route(*view_number),
MessagePurpose::VidCert => config::post_vid_cert_route(*view_number),
MessagePurpose::VidCert => config::post_vid_certificate_route(*view_number),
};

let network_msg: SendMsg<M> = SendMsg {
Expand Down Expand Up @@ -822,6 +838,46 @@ impl<
.await;
}
}
ConsensusIntentEvent::PollForVIDDisperse(view_number) => {
// Check if we already have a task for this (we shouldn't)

// Going to do a write lock since mostly likely we will need it - can change to upgradable read in the future
let mut task_map = self.inner.vid_disperse_task_map.write().await;
if let Entry::Vacant(e) = task_map.entry(view_number) {
// create new task
let (sender, receiver) = unbounded();
e.insert(sender);

async_spawn({
let inner_clone = self.inner.clone();
async move {
if let Err(e) = inner_clone
.poll_web_server(receiver, MessagePurpose::VidDisperse, view_number)
.await
{
error!(
"Background receive VID disperse polling encountered an error: {:?}",
e
);
}
}
});
} else {
error!("Somehow task already existed!");
}

// GC proposal collection if we are two views in the future
if let Some((_, sender)) = task_map.remove_entry(&view_number.wrapping_sub(2)) {
// Send task cancel message to old task

// If task already exited we expect an error
let _res = sender
.send(ConsensusIntentEvent::CancelPollForVIDDisperse(
view_number.wrapping_sub(2),
))
.await;
}
}
ConsensusIntentEvent::PollForCurrentProposal => {
// create new task
let (_, receiver) = unbounded();
Expand Down Expand Up @@ -878,6 +934,44 @@ impl<
.await;
}
}
ConsensusIntentEvent::PollForVIDVotes(view_number) => {
let mut task_map = self.inner.vid_vote_task_map.write().await;
if let Entry::Vacant(e) = task_map.entry(view_number) {
// create new task
let (sender, receiver) = unbounded();
e.insert(sender);
async_spawn({
let inner_clone = self.inner.clone();
async move {
if let Err(e) = inner_clone
.poll_web_server(receiver, MessagePurpose::VidVote, view_number)
.await
{
error!(
"Background receive proposal polling encountered an error: {:?}",
e
);
}
}
});
} else {
error!("Somehow task already existed!");
}

// GC proposal collection if we are two views in the future
// TODO ED This won't work for vote collection, last task is more than 2 view ago depending on size of network, will need to rely on cancel task from consensus
if let Some((_, sender)) = task_map.remove_entry(&(view_number.wrapping_sub(2))) {
// Send task cancel message to old task

// If task already exited we expect an error
let _res = sender
.send(ConsensusIntentEvent::CancelPollForVIDVotes(
view_number.wrapping_sub(2),
))
.await;
}
}

ConsensusIntentEvent::PollForDAC(view_number) => {
let mut task_map = self.inner.dac_task_map.write().await;
if let Entry::Vacant(e) = task_map.entry(view_number) {
Expand Down Expand Up @@ -914,6 +1008,43 @@ impl<
.await;
}
}

ConsensusIntentEvent::PollForVIDCertificate(view_number) => {
let mut task_map = self.inner.vid_cert_task_map.write().await;
if let Entry::Vacant(e) = task_map.entry(view_number) {
// create new task
let (sender, receiver) = unbounded();
e.insert(sender);
async_spawn({
let inner_clone = self.inner.clone();
async move {
if let Err(e) = inner_clone
.poll_web_server(receiver, MessagePurpose::VidCert, view_number)
.await
{
error!(
"Background receive proposal polling encountered an error: {:?}",
e
);
}
}
});
} else {
error!("Somehow task already existed!");
}

// GC proposal collection if we are two views in the future
if let Some((_, sender)) = task_map.remove_entry(&(view_number.wrapping_sub(2))) {
// Send task cancel message to old task

// If task already exited we expect an error
let _res = sender
.send(ConsensusIntentEvent::CancelPollForVIDCertificate(
view_number.wrapping_sub(2),
))
.await;
}
}
ConsensusIntentEvent::CancelPollForVotes(view_number) => {
let mut task_map = self.inner.vote_task_map.write().await;

Expand All @@ -927,6 +1058,19 @@ impl<
}
}

ConsensusIntentEvent::CancelPollForVIDVotes(view_number) => {
let mut task_map = self.inner.vid_vote_task_map.write().await;

if let Some((_, sender)) = task_map.remove_entry(&(view_number)) {
// Send task cancel message to old task

// If task already exited we expect an error
let _res = sender
.send(ConsensusIntentEvent::CancelPollForVIDVotes(view_number))
.await;
}
}

ConsensusIntentEvent::PollForViewSyncCertificate(view_number) => {
let mut task_map = self.inner.view_sync_cert_task_map.write().await;
if let Entry::Vacant(e) = task_map.entry(view_number) {
Expand Down
7 changes: 2 additions & 5 deletions crates/task-impls/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1251,11 +1251,9 @@ where
let view = cert.view_number;
self.vid_certs.insert(view, cert);

// TODO Make sure we aren't voting for an arbitrarily old round for no reason
if self.vote_if_able().await {
self.current_proposal = None;
}
// RM TODO: VOTING
}

HotShotEvent::ViewChange(new_view) => {
debug!("View Change event for view {}", *new_view);

Expand Down Expand Up @@ -1541,7 +1539,6 @@ pub fn consensus_event_filter<TYPES: NodeType, I: NodeImplementation<TYPES>>(
| HotShotEvent::QuorumVoteRecv(_)
| HotShotEvent::QCFormed(_)
| HotShotEvent::DACRecv(_)
| HotShotEvent::VidCertRecv(_)
| HotShotEvent::ViewChange(_)
| HotShotEvent::SendDABlockData(_)
| HotShotEvent::Timeout(_)
Expand Down
6 changes: 1 addition & 5 deletions crates/task-impls/src/da.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,11 +292,7 @@ where
}
}
HotShotEvent::DAVoteRecv(vote) => {
// warn!(
// "DA vote recv, Main Task {:?}, key: {:?}",
// vote.current_view,
// self.committee_exchange.public_key()
// );
debug!("DA vote recv, Main Task {:?}", vote.current_view,);
// Check if we are the leader and the vote is from the sender.
let view = vote.current_view;
if !self.committee_exchange.is_leader(view) {
Expand Down
1 change: 1 addition & 0 deletions crates/task-impls/src/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ where

// TODO (Keyao) Determine and update where to publish VidDisperseSend.
// <https://github.com/EspressoSystems/HotShot/issues/1817>
debug!("publishing VID disperse for view {}", *view + 1);
self.event_stream
.publish(HotShotEvent::VidDisperseSend(
Proposal {
Expand Down
58 changes: 49 additions & 9 deletions crates/task-impls/src/vid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use hotshot_task::{
task::{FilterEvent, HandleEvent, HotShotTaskCompleted, HotShotTaskTypes, TS},
task_impls::{HSTWithEvent, TaskBuilder},
};
use hotshot_types::vote::VoteType;
use hotshot_types::traits::network::CommunicationChannel;
use hotshot_types::traits::network::ConsensusIntentEvent;
use hotshot_types::{
certificate::VIDCertificate, traits::election::SignedCertificate, vote::VIDVoteAccumulator,
};
Expand Down Expand Up @@ -141,13 +142,16 @@ where
{
match event {
HotShotEvent::VidVoteRecv(vote) => {
// TODO copy-pasted from DAVoteRecv https://github.com/EspressoSystems/HotShot/issues/1690
debug!("VID vote recv, collection task {:?}", vote.current_view);
// panic!("Vote handle received VID vote for view {}", *vote.current_view);

debug!("VID vote recv, collection task {:?}", vote.get_view());
// panic!("Vote handle received DA vote for view {}", *vote.current_view);
// For the case where we receive votes after we've made a certificate
if state.accumulator.is_right() {
debug!("VID accumulator finished view: {:?}", state.cur_view);
return (None, state);
}

let accumulator = state.accumulator.left().unwrap();

match state
.vid_exchange
.accumulate_vote(accumulator, &vote, &vote.block_commitment)
Expand All @@ -167,13 +171,19 @@ where
.await;

state.accumulator = Right(vid_cert.clone());
state
.vid_exchange
.network()
.inject_consensus_info(ConsensusIntentEvent::CancelPollForVIDVotes(
*vid_cert.view_number,
))
.await;

// Return completed at this point
return (Some(HotShotTaskCompleted::ShutDown), state);
}
}
}
HotShotEvent::Shutdown => return (Some(HotShotTaskCompleted::ShutDown), state),
_ => {
error!("unexpected event {:?}", event);
}
Expand Down Expand Up @@ -206,12 +216,10 @@ where
) -> Option<HotShotTaskCompleted> {
match event {
HotShotEvent::VidVoteRecv(vote) => {
// TODO copy-pasted from DAVoteRecv https://github.com/EspressoSystems/HotShot/issues/1690

// warn!(
// "VID vote recv, Main Task {:?}, key: {:?}",
// vote.current_view,
// self.vid_exchange.public_key()
// self.committee_exchange.public_key()
// );
// Check if we are the leader and the vote is from the sender.
let view = vote.current_view;
Expand Down Expand Up @@ -361,6 +369,9 @@ where
}
}
}
HotShotEvent::VidCertRecv(_) => {
// RM TODO
}
HotShotEvent::ViewChange(view) => {
if *self.cur_view >= *view {
return None;
Expand All @@ -371,6 +382,35 @@ where
}
self.cur_view = view;

// Start polling for VID disperse for the new view
self.vid_exchange
.network()
.inject_consensus_info(ConsensusIntentEvent::PollForVIDDisperse(
*self.cur_view + 1,
))
.await;

self.vid_exchange
.network()
.inject_consensus_info(ConsensusIntentEvent::PollForVIDCertificate(
*self.cur_view + 1,
))
.await;

// If we are not the next leader, we should exit
if !self.vid_exchange.is_leader(self.cur_view + 1) {
// panic!("We are not the DA leader for view {}", *self.cur_view + 1);
return None;
}

// Start polling for VID votes for the "next view"
self.vid_exchange
.network()
.inject_consensus_info(ConsensusIntentEvent::PollForVIDVotes(
*self.cur_view + 1,
))
.await;

return None;
}

Expand Down
Loading

0 comments on commit 8ceb257

Please sign in to comment.