Skip to content

Commit

Permalink
external message handler
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-maron committed Jul 22, 2024
1 parent 5db2dc7 commit a599c5d
Show file tree
Hide file tree
Showing 9 changed files with 185 additions and 84 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ ark-ff = "0.4"
ark-poly = "0.4"
ark-serialize = "0.4"
ark-srs = "0.3.1"
async-compatibility-layer = { version = "1.1", default-features = false, features = [
async-compatibility-layer = { version = "1.2.1", default-features = false, features = [
"logging-utils",
] }
async-once-cell = "0.5"
Expand Down
1 change: 1 addition & 0 deletions builder/src/bin/permissioned-builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ async fn main() -> anyhow::Result<()> {
private_staking_key: private_staking_key.clone(),
private_state_key,
state_peers: opt.state_peers,
public_api_url: None,
config_peers: None,
catchup_backoff: Default::default(),
};
Expand Down
41 changes: 21 additions & 20 deletions sequencer/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use futures::{
};
use hotshot::{
traits::election::static_committee::GeneralStaticCommittee,
types::{Event, SystemContextHandle},
types::{Event, EventType, SystemContextHandle},
Memberships, SystemContext,
};
use hotshot_events_service::events_source::{EventConsumer, EventsStreamer};
Expand All @@ -31,8 +31,9 @@ use url::Url;
use vbs::version::StaticVersionType;

use crate::{
roll_call::RollCall, state_signature::StateSigner, static_stake_table_commitment, Node,
SeqTypes,
external_event_handler::{self, ExternalEventHandler},
state_signature::StateSigner,
static_stake_table_commitment, Node, SeqTypes,
};
/// The consensus handle
pub type Consensus<N, P> = SystemContextHandle<SeqTypes, Node<N, P>>;
Expand All @@ -52,10 +53,6 @@ pub struct SequencerContext<
/// Context for generating state signatures.
state_signer: Arc<StateSigner<Ver>>,

/// Roll call for external messages
#[derivative(Debug = "ignore")]
roll_call: Arc<RollCall<N>>,

/// An orchestrator to wait for before starting consensus.
#[derivative(Debug = "ignore")]
wait_for_orchestrator: Option<Arc<OrchestratorClient>>,
Expand Down Expand Up @@ -86,6 +83,7 @@ impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, Ver: StaticVersionTyp
state_relay_server: Option<Url>,
metrics: &dyn Metrics,
stake_table_capacity: u64,
public_api_url: Option<Url>,
_: Ver,
) -> anyhow::Result<Self> {
let config = &network_config.config;
Expand Down Expand Up @@ -156,16 +154,18 @@ impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, Ver: StaticVersionTyp
state_signer = state_signer.with_relay_server(url);
}

let roll_call = RollCall {
network,
identifier: instance_state.node_id.to_string(),
};
// Create the roll call info we will be using
let roll_call_info = external_event_handler::RollCallInfo { public_api_url };

// Create the external event handler
let external_event_handler = ExternalEventHandler::new(network, roll_call_info)
.with_context(|| "Failed to create external event handler")?;

Ok(Self::new(
handle,
persistence,
state_signer,
roll_call,
external_event_handler,
event_streamer,
instance_state,
network_config,
Expand All @@ -177,7 +177,7 @@ impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, Ver: StaticVersionTyp
handle: Consensus<N, P>,
persistence: Arc<RwLock<P>>,
state_signer: StateSigner<Ver>,
roll_call: RollCall<N>,
external_event_handler: ExternalEventHandler<N>,
event_streamer: Arc<RwLock<EventsStreamer<SeqTypes>>>,
node_state: NodeState,
config: NetworkConfig<PubKey>,
Expand All @@ -187,7 +187,6 @@ impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, Ver: StaticVersionTyp
let mut ctx = Self {
handle: Arc::new(RwLock::new(handle)),
state_signer: Arc::new(state_signer),
roll_call: Arc::new(roll_call),
tasks: Default::default(),
detached: false,
wait_for_orchestrator: None,
Expand All @@ -201,7 +200,7 @@ impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, Ver: StaticVersionTyp
events,
persistence,
ctx.state_signer.clone(),
ctx.roll_call.clone(),
external_event_handler,
Some(event_streamer.clone()),
),
);
Expand Down Expand Up @@ -328,7 +327,7 @@ async fn handle_events<Ver: StaticVersionType, N: ConnectedNetwork<PubKey>>(
mut events: impl Stream<Item = Event<SeqTypes>> + Unpin,
persistence: Arc<RwLock<impl SequencerPersistence>>,
state_signer: Arc<StateSigner<Ver>>,
roll_call: Arc<RollCall<N>>,
external_event_handler: ExternalEventHandler<N>,
events_streamer: Option<Arc<RwLock<EventsStreamer<SeqTypes>>>>,
) {
while let Some(event) = events.next().await {
Expand All @@ -342,10 +341,12 @@ async fn handle_events<Ver: StaticVersionType, N: ConnectedNetwork<PubKey>>(
// Generate state signature.
state_signer.handle_event(&event).await;

// Handle the external message (maybe we name this "external handler" or something)
if let Err(e) = roll_call.handle_event(&event).await {
tracing::warn!(error = ?e, "Failed to handle external message");
};
// Handle external messages
if let EventType::ExternalMessageReceived(external_message_bytes) = &event.event {
if let Err(err) = external_event_handler.handle_event(external_message_bytes) {
tracing::warn!("Failed to handle external message: {:?}", err);
};
}

// Send the event via the event streaming service
if let Some(events_streamer) = events_streamer.as_ref() {
Expand Down
149 changes: 149 additions & 0 deletions sequencer/src/external_event_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
//! Should probably rename this to "external" or something
use std::{collections::BTreeSet, sync::Arc};

use anyhow::{Context, Result};
use async_compatibility_layer::channel::{Receiver, Sender};
use async_std::task::{self, JoinHandle};
use espresso_types::PubKey;
use hotshot::types::BLSPubKey;
use hotshot_types::traits::network::{BroadcastDelay, ConnectedNetwork};
use serde::{Deserialize, Serialize};
use url::Url;

/// An external message that can be sent to or received from a node
#[derive(Serialize, Deserialize, Clone)]
pub enum ExternalMessage {
/// A request for a node to respond with its identifier
/// Contains the public key of the node that is requesting the roll call
RollCallRequest(BLSPubKey),

/// A response to a roll call request
/// Contains the identifier of the node
RollCallResponse(RollCallInfo),
}

/// Information about a node that is used in a roll call response
#[derive(Serialize, Deserialize, Clone)]
pub struct RollCallInfo {
// The public API URL of the node
pub public_api_url: Option<Url>,
}

/// The external event handler state
pub struct ExternalEventHandler<N: ConnectedNetwork<PubKey>> {
// The network to respond over
pub network: Arc<N>,

// The `RollCallInfo` of the node (used in the roll call response)
pub roll_call_info: RollCallInfo,

// The tasks that are running
pub tasks: Vec<JoinHandle<()>>,

// The outbound message queue
pub outbound_message_sender: Sender<OutboundMessage>,
}

// The different types of outbound messages (broadcast or direct)
pub enum OutboundMessage {
Direct(Vec<u8>, PubKey),
Broadcast(Vec<u8>),
}

impl<N: ConnectedNetwork<PubKey>> ExternalEventHandler<N> {
/// Creates a new `ExternalEventHandler` with the given network and roll call info
pub fn new(network: Arc<N>, roll_call_info: RollCallInfo) -> Result<Self> {
// Create the outbound message queue
let (outbound_message_sender, outbound_message_receiver) =
async_compatibility_layer::channel::bounded(50);

// Spawn the outbound message handling loop
let outbound_message_loop = async_std::task::spawn(Self::outbound_message_loop(
outbound_message_receiver,
network.clone(),
));

// We just started, so queue an outbound RollCall message
let roll_call_message = ExternalMessage::RollCallResponse(roll_call_info.clone());
let roll_call_message_bytes = bincode::serialize(&roll_call_message)
.with_context(|| "Failed to serialize roll call message for initial broadcast")?;
outbound_message_sender
.try_send(OutboundMessage::Broadcast(roll_call_message_bytes))
.with_context(|| "External outbound message queue is somehow full")?;

Ok(Self {
network,
roll_call_info,
tasks: vec![outbound_message_loop],
outbound_message_sender,
})
}

/// Handles an event
///
/// # Errors
/// If the message type is unknown or if there is an error serializing or deserializing the message
pub fn handle_event(&self, external_message_bytes: &[u8]) -> Result<()> {
// Deserialize the external message
let external_message = bincode::deserialize(external_message_bytes)
.with_context(|| "Failed to deserialize external message")?;

// Match the type
match external_message {
ExternalMessage::RollCallRequest(pub_key) => {
// If it's a roll call request, send our information
let response = ExternalMessage::RollCallResponse(self.roll_call_info.clone());

// Serialize the response
let response_bytes = bincode::serialize(&response)
.with_context(|| "Failed to serialize roll call response")?;

// Send the response
self.outbound_message_sender
.try_send(OutboundMessage::Direct(response_bytes, pub_key))
.with_context(|| "External outbound message queue is full")?;
}

_ => {
return Err(anyhow::anyhow!("Unknown external message type"));
}
}
Ok(())
}

/// The main loop for sending outbound messages.
/// This is a queue so that we don't block the main event loop when sending messages.
async fn outbound_message_loop(mut receiver: Receiver<OutboundMessage>, network: Arc<N>) {
while let Ok(message) = receiver.recv().await {
// Match the message type
match message {
OutboundMessage::Direct(message, recipient) => {
// Send the message directly to the recipient
if let Err(err) = network.direct_message(message, recipient).await {
tracing::error!("Failed to send message: {:?}", err);
};
}

OutboundMessage::Broadcast(message) => {
// Broadcast the message to the global topic
if let Err(err) = network
.broadcast_message(message, BTreeSet::new(), BroadcastDelay::None)
.await
{
tracing::error!("Failed to broadcast message: {:?}", err);
};
}
}
}
}
}

impl<N: ConnectedNetwork<PubKey>> Drop for ExternalEventHandler<N> {
fn drop(&mut self) {
// Cancel all tasks
for task in self.tasks.drain(..) {
task::block_on(task.cancel());
}
}
}
6 changes: 5 additions & 1 deletion sequencer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ pub mod catchup;
pub mod context;
pub mod genesis;

mod external_event_handler;
pub mod hotshot_commitment;
pub mod options;
pub mod state_signature;
mod roll_call;

mod message_compat_tests;

Expand Down Expand Up @@ -105,6 +105,8 @@ pub struct NetworkParams {
pub state_peers: Vec<Url>,
pub config_peers: Option<Vec<Url>>,
pub catchup_backoff: BackoffParams,
/// The address to advertise as our public API's URL
pub public_api_url: Option<Url>,

/// The address to send to other Libp2p nodes to contact us
pub libp2p_advertise_address: SocketAddr,
Expand Down Expand Up @@ -355,6 +357,7 @@ pub async fn init_node<P: PersistenceOptions, Ver: StaticVersionType + 'static>(
Some(network_params.state_relay_server_url),
metrics,
genesis.stake_table.capacity,
network_params.public_api_url,
bind_version,
)
.await?;
Expand Down Expand Up @@ -691,6 +694,7 @@ pub mod testing {
self.state_relay_url.clone(),
metrics,
stake_table_capacity,
None, // The public API URL
bind_version,
)
.await
Expand Down
1 change: 1 addition & 0 deletions sequencer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ where
libp2p_bootstrap_nodes: opt.libp2p_bootstrap_nodes,
orchestrator_url: opt.orchestrator_url,
state_relay_server_url: opt.state_relay_server_url,
public_api_url: opt.public_api_url,
private_staking_key,
private_state_key,
state_peers: opt.state_peers,
Expand Down
5 changes: 5 additions & 0 deletions sequencer/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ pub struct Options {
)]
pub libp2p_bind_address: String,

/// The URL we advertise to other nodes as being for our public API.
/// Should be supplied in `http://host:port` form.
#[clap(long, env = "ESPRESSO_SEQUENCER_PUBLIC_API_URL")]
pub public_api_url: Option<Url>,

/// The address we advertise to other nodes as being a Libp2p endpoint.
/// Should be supplied in `host:port` form.
#[clap(
Expand Down
Loading

0 comments on commit a599c5d

Please sign in to comment.