Skip to content

Commit

Permalink
Extract signer loop into separate run_forever_inner()
Browse files Browse the repository at this point in the history
  • Loading branch information
ok300 authored and cdecker committed Apr 17, 2024
1 parent 31c55c4 commit 1d44455
Showing 1 changed file with 61 additions and 54 deletions.
115 changes: 61 additions & 54 deletions libs/gl-client/src/signer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use lightning_signer::bitcoin::Network;
use lightning_signer::node::NodeServices;
use lightning_signer::policy::filter::FilterRule;
use lightning_signer::util::crypto_utils;
use log::{debug, info, trace, warn};
use log::{debug, info, trace, warn, error};
use runeauth::{Condition, MapChecker, Restriction, Rune, RuneError};
use std::collections::HashMap;
use std::convert::{TryFrom, TryInto};
Expand Down Expand Up @@ -649,7 +649,10 @@ impl Signer {
}

/// Create and, if necessary, upgrade the scheduler
async fn init_scheduler(&self, scheduler_uri: String) -> Result<SchedulerClient<tonic::transport::channel::Channel>> {
async fn init_scheduler(
&self,
scheduler_uri: String,
) -> Result<SchedulerClient<tonic::transport::channel::Channel>> {
debug!("Connecting to scheduler at {scheduler_uri}");

let channel = Endpoint::from_shared(scheduler_uri)?
Expand Down Expand Up @@ -695,64 +698,68 @@ impl Signer {
Ok(scheduler)
}

/// The core signer loop. Connects to the signer and keeps the connection alive.
///
/// Used as inner loop for `run_forever_with_uri`.
async fn run_forever_inner(
&self,
mut scheduler: SchedulerClient<tonic::transport::channel::Channel>,
) -> Result<(), anyhow::Error> {
loop {
debug!("Calling scheduler.get_node_info");
let node_info_res = scheduler
.get_node_info(NodeInfoRequest {
node_id: self.id.clone(),

// This `wait` parameter means that the scheduler will
// not automatically schedule the node. Rather we are
// telling the scheduler we want to be told as soon as
// the node is being scheduled so we can re-attach to
// that.
wait: true,
})
.await;

let node_info = match node_info_res.map(|v| v.into_inner()) {
Ok(v) => {
debug!("Got node_info from scheduler: {:?}", v);
v
}
Err(e) => {
trace!("Got an error from the scheduler: {e}. Sleeping before retrying");
sleep(Duration::from_millis(1000)).await;
continue;
}
};

if node_info.grpc_uri.is_empty() {
trace!("Got an empty GRPC URI, node is not scheduled, sleeping and retrying");
sleep(Duration::from_millis(1000)).await;
continue;
}

if let Err(e) = self
.run_once(Uri::from_maybe_shared(node_info.grpc_uri)?)
.await
{
warn!("Error running against node: {e}");
}
}
}

pub async fn run_forever_with_uri(
&self,
mut shutdown: mpsc::Receiver<()>,
scheduler_uri: String,
) -> Result<(), anyhow::Error> {
let mut scheduler = self.init_scheduler(scheduler_uri).await?;

loop {
debug!("Calling scheduler.get_node_info");
let get_node = scheduler.get_node_info(NodeInfoRequest {
node_id: self.id.clone(),

// This `wait` parameter means that the scheduler will
// not automatically schedule the node. Rather we are
// telling the scheduler we want to be told as soon as
// the node is being scheduled so we can re-attach to
// that.
wait: true,
});
tokio::select! {
info = get_node => {
let node_info = match info
.map(|v| v.into_inner())
{
Ok(v) => {
debug!("Got node_info from scheduler: {:?}", v);
v
}
Err(e) => {
trace!(
"Got an error from the scheduler: {}. Sleeping before retrying",
e
);
sleep(Duration::from_millis(1000)).await;
continue;
}
};

if node_info.grpc_uri.is_empty() {
trace!("Got an empty GRPC URI, node is not scheduled, sleeping and retrying");
sleep(Duration::from_millis(1000)).await;
continue;
}

if let Err(e) = self
.run_once(Uri::from_maybe_shared(node_info.grpc_uri)?)
.await {
warn!("Error running against node: {}", e);
}


},
_ = shutdown.recv() => {
debug!("Received the signal to exit the signer loop");
break;
let scheduler = self.init_scheduler(scheduler_uri).await?;
tokio::select! {
run_forever_inner_res = self.run_forever_inner(scheduler) => {
error!("Inner signer loop exited unexpectedly: {run_forever_inner_res:?}");
},
};
}
_ = shutdown.recv() => debug!("Received the signal to exit the signer loop")
};

info!("Exiting the signer loop");
Ok(())
}
Expand Down

0 comments on commit 1d44455

Please sign in to comment.