From 941c47a9aabfd12d555e2f3ffafd9d35dc3307f8 Mon Sep 17 00:00:00 2001 From: Christian Decker Date: Fri, 17 Nov 2023 13:04:00 +0100 Subject: [PATCH] plugin: Send a heartbeat as first message The heartbeat serves two purposes: 1) start sending the entire state to the signer, so it can cache it locally, and 2) trigger a pruning round on the signer state. --- libs/gl-client/src/signer/resolve.rs | 1 + libs/gl-plugin/src/node/mod.rs | 41 +++++++++++++++++++++++++++- libs/gl-signerproxy/src/hsmproxy.rs | 5 +++- 3 files changed, 45 insertions(+), 2 deletions(-) diff --git a/libs/gl-client/src/signer/resolve.rs b/libs/gl-client/src/signer/resolve.rs index 5ee0c129c..4ae4623be 100644 --- a/libs/gl-client/src/signer/resolve.rs +++ b/libs/gl-client/src/signer/resolve.rs @@ -20,6 +20,7 @@ impl Resolver { // we do an early pass: let accept = match req { // Commands that simply have no context to check against + Message::GetHeartbeat(_) => true, Message::Ecdh(_) => true, Message::Ping(_) => true, Message::Pong(_) => true, diff --git a/libs/gl-plugin/src/node/mod.rs b/libs/gl-plugin/src/node/mod.rs index c3845f293..6ce8d7b9b 100644 --- a/libs/gl-plugin/src/node/mod.rs +++ b/libs/gl-plugin/src/node/mod.rs @@ -282,9 +282,48 @@ impl Node for PluginNodeServer { let mut stream = self.stage.mystream().await; let signer_state = self.signer_state.clone(); let ctx = self.ctx.clone(); - + tokio::spawn(async move { trace!("hsmd hsm_id={} request processor started", hsm_id); + + { + // We start by immediately injecting a + // vls_protocol::Message::GetHeartbeat. This serves two + // purposes: already send the initial snapshot of the + // signer state to the signer as early as possible, and + // triggering a pruning on the signer, if enabled. In + // incremental mode this ensures that any subsequent, + // presumably time-critical messages, do not have to carry + // the large state with them. + + let state = signer_state.lock().await.clone(); + let state: Vec = state.into(); + let state: Vec = state + .into_iter() + .map(|s| pb::SignerStateEntry { + key: s.key, + version: s.version, + value: s.value, + }) + .collect(); + + let msg = vls_protocol::msgs::GetHeartbeat {}; + use vls_protocol::msgs::SerBolt; + let req = crate::pb::HsmRequest { + // Notice that the request_counter starts at 1000, to + // avoid collisions. + request_id: 0, + signer_state: state, + raw: msg.as_vec(), + requests: vec![], // No pending requests yet, nothing to authorize. + context: None, + }; + + if let Err(e) = tx.send(Ok(req)).await { + log::warn!("Failed to send heartbeat message to signer: {}", e); + } + } + loop { let mut req = match stream.next().await { Err(e) => { diff --git a/libs/gl-signerproxy/src/hsmproxy.rs b/libs/gl-signerproxy/src/hsmproxy.rs index 5e42f7a66..3413964d0 100644 --- a/libs/gl-signerproxy/src/hsmproxy.rs +++ b/libs/gl-signerproxy/src/hsmproxy.rs @@ -148,7 +148,10 @@ async fn grpc_connect() -> Result { pub async fn run() -> Result<(), Error> { let args: Vec = std::env::args().collect(); - let request_counter = Arc::new(atomic::AtomicUsize::new(0)); + + // Start the counter at 1000 so we can inject some message before + // real requests if we want to. + let request_counter = Arc::new(atomic::AtomicUsize::new(1000)); if args.len() == 2 && args[1] == "--version" { println!("{}", version()); return Ok(());