Skip to content

Commit

Permalink
Allow nodes to fetch network config from peers
Browse files Browse the repository at this point in the history
  • Loading branch information
jbearer committed Jul 15, 2024
1 parent 94eaec3 commit 7075d51
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 6 deletions.
1 change: 1 addition & 0 deletions builder/src/bin/permissioned-builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ async fn main() -> anyhow::Result<()> {
private_staking_key: private_staking_key.clone(),
private_state_key,
state_peers: opt.state_peers,
config_peers: None,
catchup_backoff: Default::default(),
};

Expand Down
58 changes: 57 additions & 1 deletion sequencer/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1051,6 +1051,7 @@ mod test {
metrics::NoMetrics,
node_implementation::{ConsensusTime, NodeType},
},
ValidatorConfig,
};
use jf_merkle_tree::prelude::{MerkleProof, Sha3Node};
use portpicker::pick_unused_port;
Expand All @@ -1063,7 +1064,8 @@ mod test {
use vbs::version::Version;

use self::{
data_source::testing::TestableSequencerDataSource, sql::DataSource as SqlDataSource,
data_source::{testing::TestableSequencerDataSource, PublicHotShotConfig},
sql::DataSource as SqlDataSource,
};
use super::*;
use crate::{
Expand Down Expand Up @@ -1664,4 +1666,58 @@ mod test {
.unwrap();
assert_eq!(chain, new_chain);
}

#[async_std::test]
async fn test_fetch_config() {
setup_logging();
setup_backtrace();

let port = pick_unused_port().expect("No ports free");
let url: surf_disco::Url = format!("http://localhost:{port}").parse().unwrap();
let client: Client<ServerError, SequencerVersion> = Client::new(url.clone());

let options = Options::with_port(port).config(Default::default());
let anvil = Anvil::new().spawn();
let l1 = anvil.endpoint().parse().unwrap();
let network_config = TestConfigBuilder::default().l1_url(l1).build();
let config = TestNetworkConfigBuilder::default()
.api_config(options)
.network_config(network_config)
.build();
let network = TestNetwork::new(config).await;
client.connect(None).await;

// Fetch a network config from the API server. The first peer URL is bogus, to test the
// failure/retry case.
let peers = StatePeers::<SequencerVersion>::from_urls(
vec!["https://notarealnode.network".parse().unwrap(), url],
Default::default(),
);

// Fetch the config from node 1, a different node than the one running the service.
let validator = ValidatorConfig::generated_from_seed_indexed([0; 32], 1, 1, false);
let mut config = peers.fetch_config(validator.clone()).await;

// Check the node-specific information in the recovered config is correct.
assert_eq!(
config.config.my_own_validator_config.public_key,
validator.public_key
);
assert_eq!(
config.config.my_own_validator_config.private_key,
validator.private_key
);

// Check the public information is also correct (with respect to the node that actually
// served the config, for public keys).
config.config.my_own_validator_config =
ValidatorConfig::generated_from_seed_indexed([0; 32], 0, 1, true);
pretty_assertions::assert_eq!(
serde_json::to_value(PublicHotShotConfig::from(config.config)).unwrap(),
serde_json::to_value(PublicHotShotConfig::from(
network.cfg.hotshot_config().clone()
))
.unwrap()
);
}
}
45 changes: 43 additions & 2 deletions sequencer/src/catchup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,25 @@ use espresso_types::{
v0::traits::{PersistenceOptions, StateCatchup},
AccountQueryData, BackoffParams, BlockMerkleTree, ChainConfig, FeeAccount, FeeMerkleCommitment,
};
use hotshot_types::{data::ViewNumber, traits::node_implementation::ConsensusTime as _};
use futures::future::FutureExt;
use hotshot_orchestrator::config::NetworkConfig;
use hotshot_types::{
data::ViewNumber, traits::node_implementation::ConsensusTime as _, ValidatorConfig,
};
use jf_merkle_tree::{prelude::MerkleNode, ForgetableMerkleTreeScheme, MerkleTreeScheme};
use serde::de::DeserializeOwned;
use surf_disco::Request;
use tide_disco::error::ServerError;
use url::Url;
use vbs::version::StaticVersionType;

use crate::api::{data_source::CatchupDataSource, BlocksFrontier};
use crate::{
api::{
data_source::{CatchupDataSource, PublicNetworkConfig},
BlocksFrontier,
},
PubKey,
};

// This newtype is probably not worth having. It's only used to be able to log
// URLs before doing requests.
Expand Down Expand Up @@ -71,6 +81,37 @@ impl<Ver: StaticVersionType> StatePeers<Ver> {
backoff,
}
}

/// Fetch the network config from one of this provider's peers.
///
/// Each peer in `self.clients` is asked in order for its public network config at the
/// `config/hotshot` endpoint. The first successful response is combined with
/// `my_own_validator_config` (via `into_network_config`) to produce this node's config.
///
/// Peers whose request fails are logged and skipped. If a peer responds but the response
/// cannot be converted, the current attempt ends with that error without trying the
/// remaining peers. Failed attempts are handed back to `self.backoff().retry(..)`
/// (presumably retried with backoff until one succeeds, since this method returns a plain
/// `NetworkConfig` rather than a `Result` — confirm against `BackoffParams::retry`).
pub async fn fetch_config(
    &self,
    my_own_validator_config: ValidatorConfig<PubKey>,
) -> NetworkConfig<PubKey> {
    self.backoff()
        .retry(self, move |provider| {
            // Each attempt needs its own copy, since the conversion consumes the config.
            let my_own_validator_config = my_own_validator_config.clone();
            async move {
                for client in &provider.clients {
                    tracing::info!("fetching config from {}", client.url);
                    match client
                        .get::<PublicNetworkConfig>("config/hotshot")
                        .send()
                        .await
                    {
                        Ok(res) => {
                            // Build the error message lazily: it is only needed when the
                            // conversion actually fails.
                            return res
                                .into_network_config(my_own_validator_config)
                                .with_context(|| {
                                    format!(
                                        "fetched config from {}, but failed to convert to private config",
                                        client.url
                                    )
                                });
                        }
                        Err(err) => {
                            // Name the peer so operators can tell which one is failing.
                            tracing::warn!(
                                "error fetching config from peer {}: {err:#}",
                                client.url
                            );
                        }
                    }
                }
                bail!("could not fetch config from any peer");
            }
            .boxed()
        })
        .await
}
}

#[async_trait]
Expand Down
18 changes: 15 additions & 3 deletions sequencer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ pub struct NetworkParams {
pub private_staking_key: BLSPrivKey,
pub private_state_key: StateSignKey,
pub state_peers: Vec<Url>,
pub config_peers: Option<Vec<Url>>,
pub catchup_backoff: BackoffParams,

/// The address to send to other Libp2p nodes to contact us
Expand Down Expand Up @@ -168,12 +169,23 @@ pub async fn init_node<P: PersistenceOptions, Ver: StaticVersionType + 'static>(
.with_context(|| "Failed to derive Libp2p peer ID")?;

let mut persistence = persistence_opt.clone().create().await?;
let (mut config, wait_for_orchestrator) = match persistence.load_config().await? {
Some(config) => {
let (mut config, wait_for_orchestrator) = match (
persistence.load_config().await?,
network_params.config_peers,
) {
(Some(config), _) => {
tracing::info!("loaded network config from storage, rejoining existing network");
(config, false)
}
None => {
// If we were told to fetch the config from an already-started peer, do so.
(None, Some(peers)) => {
tracing::info!(?peers, "loading network config from peers");
let peers = StatePeers::<Ver>::from_urls(peers, network_params.catchup_backoff);
let config = peers.fetch_config(my_config.clone()).await;
(config, false)
}
// Otherwise, this is a fresh network; load from the orchestrator.
(None, None) => {
tracing::info!("loading network config from orchestrator");
tracing::error!(
"waiting for other nodes to connect, DO NOT RESTART until fully connected"
Expand Down
1 change: 1 addition & 0 deletions sequencer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ where
private_staking_key,
private_state_key,
state_peers: opt.state_peers,
config_peers: opt.config_peers,
catchup_backoff: opt.catchup_backoff,
};

Expand Down
27 changes: 27 additions & 0 deletions sequencer/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,16 @@ pub struct Options {
#[derivative(Debug(format_with = "fmt_urls"))]
pub state_peers: Vec<Url>,

/// Peer nodes used to fetch missing config
///
/// Typically, the network-wide config is fetched from the orchestrator on startup and then
/// persisted and loaded from local storage each time the node restarts. However, if the
/// persisted config is missing when the node restarts (for example, the node is being migrated
/// to new persistent storage), it can instead be fetched directly from a peer.
#[clap(long, env = "ESPRESSO_SEQUENCER_CONFIG_PEERS", value_delimiter = ',')]
#[derivative(Debug(format_with = "fmt_opt_urls"))]
pub config_peers: Option<Vec<Url>>,

/// Exponential backoff for fetching missing state from peers.
#[clap(flatten)]
pub catchup_backoff: BackoffParams,
Expand Down Expand Up @@ -230,6 +240,23 @@ fn fmt_urls(v: &[Url], fmt: &mut std::fmt::Formatter) -> Result<(), std::fmt::Er
)
}

/// `Debug` formatter for an optional list of URLs.
///
/// Writes `None` when no list is present; otherwise writes the list as rendered by
/// `fmt_urls`, wrapped in `Some(` ... `)`.
fn fmt_opt_urls(
    v: &Option<Vec<Url>>,
    fmt: &mut std::fmt::Formatter,
) -> Result<(), std::fmt::Error> {
    if let Some(urls) = v {
        write!(fmt, "Some(")?;
        fmt_urls(urls, fmt)?;
        write!(fmt, ")")
    } else {
        write!(fmt, "None")
    }
}

#[derive(Clone, Debug, Snafu)]
pub struct ParseDurationError {
reason: String,
Expand Down

0 comments on commit 7075d51

Please sign in to comment.