From 7075d516540dceb95a905612b13cb32c9979b9bc Mon Sep 17 00:00:00 2001 From: Jeb Bearer Date: Mon, 15 Jul 2024 10:49:20 -0400 Subject: [PATCH] Allow nodes to fetch network config from peers --- builder/src/bin/permissioned-builder.rs | 1 + sequencer/src/api.rs | 58 ++++++++++++++++++++++++- sequencer/src/catchup.rs | 45 ++++++++++++++++++- sequencer/src/lib.rs | 18 ++++++-- sequencer/src/main.rs | 1 + sequencer/src/options.rs | 27 ++++++++++++ 6 files changed, 144 insertions(+), 6 deletions(-) diff --git a/builder/src/bin/permissioned-builder.rs b/builder/src/bin/permissioned-builder.rs index a83411ed38..b76d02b245 100644 --- a/builder/src/bin/permissioned-builder.rs +++ b/builder/src/bin/permissioned-builder.rs @@ -255,6 +255,7 @@ async fn main() -> anyhow::Result<()> { private_staking_key: private_staking_key.clone(), private_state_key, state_peers: opt.state_peers, + config_peers: None, catchup_backoff: Default::default(), }; diff --git a/sequencer/src/api.rs b/sequencer/src/api.rs index 436ff4e528..7c9f9c724b 100644 --- a/sequencer/src/api.rs +++ b/sequencer/src/api.rs @@ -1051,6 +1051,7 @@ mod test { metrics::NoMetrics, node_implementation::{ConsensusTime, NodeType}, }, + ValidatorConfig, }; use jf_merkle_tree::prelude::{MerkleProof, Sha3Node}; use portpicker::pick_unused_port; @@ -1063,7 +1064,8 @@ mod test { use vbs::version::Version; use self::{ - data_source::testing::TestableSequencerDataSource, sql::DataSource as SqlDataSource, + data_source::{testing::TestableSequencerDataSource, PublicHotShotConfig}, + sql::DataSource as SqlDataSource, }; use super::*; use crate::{ @@ -1664,4 +1666,58 @@ mod test { .unwrap(); assert_eq!(chain, new_chain); } + + #[async_std::test] + async fn test_fetch_config() { + setup_logging(); + setup_backtrace(); + + let port = pick_unused_port().expect("No ports free"); + let url: surf_disco::Url = format!("http://localhost:{port}").parse().unwrap(); + let client: Client = Client::new(url.clone()); + + let options = Options::with_port(port).config(Default::default()); + let anvil = Anvil::new().spawn(); + let l1 = anvil.endpoint().parse().unwrap(); + let network_config = TestConfigBuilder::default().l1_url(l1).build(); + let config = TestNetworkConfigBuilder::default() + .api_config(options) + .network_config(network_config) + .build(); + let network = TestNetwork::new(config).await; + client.connect(None).await; + + // Fetch a network config from the API server. The first peer URL is bogus, to test the + // failure/retry case. + let peers = StatePeers::::from_urls( + vec!["https://notarealnode.network".parse().unwrap(), url], + Default::default(), + ); + + // Fetch the config from node 1, a different node than the one running the service. + let validator = ValidatorConfig::generated_from_seed_indexed([0; 32], 1, 1, false); + let mut config = peers.fetch_config(validator.clone()).await; + + // Check the node-specific information in the recovered config is correct. + assert_eq!( + config.config.my_own_validator_config.public_key, + validator.public_key + ); + assert_eq!( + config.config.my_own_validator_config.private_key, + validator.private_key + ); + + // Check the public information is also correct (with respect to the node that actually + // served the config, for public keys). + config.config.my_own_validator_config = + ValidatorConfig::generated_from_seed_indexed([0; 32], 0, 1, true); + pretty_assertions::assert_eq!( + serde_json::to_value(PublicHotShotConfig::from(config.config)).unwrap(), + serde_json::to_value(PublicHotShotConfig::from( + network.cfg.hotshot_config().clone() + )) + .unwrap() + ); + } } diff --git a/sequencer/src/catchup.rs b/sequencer/src/catchup.rs index fbc2b2c7a3..9c08bafd37 100644 --- a/sequencer/src/catchup.rs +++ b/sequencer/src/catchup.rs @@ -8,7 +8,11 @@ use espresso_types::{ v0::traits::{PersistenceOptions, StateCatchup}, AccountQueryData, BackoffParams, BlockMerkleTree, ChainConfig, FeeAccount, FeeMerkleCommitment, }; -use hotshot_types::{data::ViewNumber, traits::node_implementation::ConsensusTime as _}; +use futures::future::FutureExt; +use hotshot_orchestrator::config::NetworkConfig; +use hotshot_types::{ + data::ViewNumber, traits::node_implementation::ConsensusTime as _, ValidatorConfig, +}; use jf_merkle_tree::{prelude::MerkleNode, ForgetableMerkleTreeScheme, MerkleTreeScheme}; use serde::de::DeserializeOwned; use surf_disco::Request; @@ -16,7 +20,13 @@ use tide_disco::error::ServerError; use url::Url; use vbs::version::StaticVersionType; -use crate::api::{data_source::CatchupDataSource, BlocksFrontier}; +use crate::{ + api::{ + data_source::{CatchupDataSource, PublicNetworkConfig}, + BlocksFrontier, + }, + PubKey, +}; // This newtype is probably not worth having. It's only used to be able to log // URLs before doing requests. @@ -71,6 +81,37 @@ impl StatePeers { backoff, } } + + pub async fn fetch_config( + &self, + my_own_validator_config: ValidatorConfig, + ) -> NetworkConfig { + self.backoff() + .retry(self, move |provider| { + let my_own_validator_config = my_own_validator_config.clone(); + async move { + for client in &provider.clients { + tracing::info!("fetching config from {}", client.url); + match client + .get::("config/hotshot") + .send() + .await + { + Ok(res) => { + return res.into_network_config(my_own_validator_config) + .context(format!("fetched config from {}, but failed to convert to private config", client.url)); + } + Err(err) => { + tracing::warn!("error fetching config from peer: {err:#}"); + } + } + } + bail!("could not fetch config from any peer"); + } + .boxed() + }) + .await + } } #[async_trait] diff --git a/sequencer/src/lib.rs b/sequencer/src/lib.rs index 03cf4f6529..e83261786c 100644 --- a/sequencer/src/lib.rs +++ b/sequencer/src/lib.rs @@ -102,6 +102,7 @@ pub struct NetworkParams { pub private_staking_key: BLSPrivKey, pub private_state_key: StateSignKey, pub state_peers: Vec, + pub config_peers: Option>, pub catchup_backoff: BackoffParams, /// The address to send to other Libp2p nodes to contact us @@ -168,12 +169,23 @@ pub async fn init_node( .with_context(|| "Failed to derive Libp2p peer ID")?; let mut persistence = persistence_opt.clone().create().await?; - let (mut config, wait_for_orchestrator) = match persistence.load_config().await? { - Some(config) => { + let (mut config, wait_for_orchestrator) = match ( + persistence.load_config().await?, + network_params.config_peers, + ) { + (Some(config), _) => { tracing::info!("loaded network config from storage, rejoining existing network"); (config, false) } - None => { + // If we were told to fetch the config from an already-started peer, do so. + (None, Some(peers)) => { + tracing::info!(?peers, "loading network config from peers"); + let peers = StatePeers::::from_urls(peers, network_params.catchup_backoff); + let config = peers.fetch_config(my_config.clone()).await; + (config, false) + } + // Otherwise, this is a fresh network; load from the orchestrator. + (None, None) => { tracing::info!("loading network config from orchestrator"); tracing::error!( "waiting for other nodes to connect, DO NOT RESTART until fully connected" diff --git a/sequencer/src/main.rs b/sequencer/src/main.rs index a5a878176f..1142e5c587 100644 --- a/sequencer/src/main.rs +++ b/sequencer/src/main.rs @@ -83,6 +83,7 @@ where private_staking_key, private_state_key, state_peers: opt.state_peers, + config_peers: opt.config_peers, catchup_backoff: opt.catchup_backoff, }; diff --git a/sequencer/src/options.rs b/sequencer/src/options.rs index 1eb88bf7bd..d8384bacb7 100644 --- a/sequencer/src/options.rs +++ b/sequencer/src/options.rs @@ -188,6 +188,16 @@ pub struct Options { #[derivative(Debug(format_with = "fmt_urls"))] pub state_peers: Vec, + /// Peer nodes use to fetch missing config + /// + /// Typically, the network-wide config is fetched from the orchestrator on startup and then + /// persisted and loaded from local storage each time the node restarts. However, if the + /// persisted config is missing when the node restarts (for example, the node is being migrated + /// to new persistent storage), it can instead be fetched directly from a peer. + #[clap(long, env = "ESPRESSO_SEQUENCER_CONFIG_PEERS", value_delimiter = ',')] + #[derivative(Debug(format_with = "fmt_opt_urls"))] + pub config_peers: Option>, + /// Exponential backoff for fetching missing state from peers. #[clap(flatten)] pub catchup_backoff: BackoffParams, @@ -230,6 +240,23 @@ fn fmt_urls(v: &[Url], fmt: &mut std::fmt::Formatter) -> Result<(), std::fmt::Er ) } +fn fmt_opt_urls( + v: &Option>, + fmt: &mut std::fmt::Formatter, +) -> Result<(), std::fmt::Error> { + match v { + Some(urls) => { + write!(fmt, "Some(")?; + fmt_urls(urls, fmt)?; + write!(fmt, ")")?; + } + None => { + write!(fmt, "None")?; + } + } + Ok(()) +} + #[derive(Clone, Debug, Snafu)] pub struct ParseDurationError { reason: String,