From 1b14fe65c0833afb28f06afb86a5e0cf96a80a23 Mon Sep 17 00:00:00 2001 From: Peter Neuroth Date: Mon, 29 Jul 2024 17:41:15 +0200 Subject: [PATCH] trampoline: Add keepalive trampoline_pay version This commit adds `trampoline_pay_keepalive` to the greenlight grpc node client. This version does exactly the same as `trampoline_pay` but streams log messages from the node as long as `trampoline_pay` is inflight, in order to keep the connection busy. Signed-off-by: Peter Neuroth --- libs/gl-client-py/src/node.rs | 4 +- .../tests/plugins/trmp_htlc_hook.py | 11 +++ libs/gl-client/src/node/mod.rs | 90 ++++++++++++++++++- 3 files changed, 99 insertions(+), 6 deletions(-) diff --git a/libs/gl-client-py/src/node.rs b/libs/gl-client-py/src/node.rs index 0ce1237c9..12cea9cda 100644 --- a/libs/gl-client-py/src/node.rs +++ b/libs/gl-client-py/src/node.rs @@ -58,7 +58,7 @@ impl Node { } fn trampoline_pay( - &self, + &mut self, bolt11: String, trampoline_node_id: Vec, amount_msat: Option, @@ -76,7 +76,7 @@ impl Node { maxdelay: maxdelay.unwrap_or_default(), description: description.unwrap_or_default(), }; - let res = exec(async { self.client.clone().trampoline_pay(req).await }) + let res = exec(async { self.client.trampoline_pay_keepalive(req).await }) .map_err(error_calling_remote_method)? .into_inner(); convert(Ok(res)) diff --git a/libs/gl-client-py/tests/plugins/trmp_htlc_hook.py b/libs/gl-client-py/tests/plugins/trmp_htlc_hook.py index 066cda377..a20dfbc6b 100755 --- a/libs/gl-client-py/tests/plugins/trmp_htlc_hook.py +++ b/libs/gl-client-py/tests/plugins/trmp_htlc_hook.py @@ -4,6 +4,7 @@ from binascii import unhexlify from pyln.proto.primitives import varint_decode from io import BytesIO +import time INVOICE_TYPE = 33001 AMOUNT_TYPE = 33003 @@ -16,6 +17,7 @@ plugin.check_invoice = None plugin.check_amount = None plugin.payment_key = None +plugin.waitfor = None @plugin.hook("htlc_accepted") @@ -30,6 +32,9 @@ def on_htlc_accepted(htlc, onion, plugin, **kwargs): invoice_value = payment_metadata.read(invoice_length) assert invoice_type == INVOICE_TYPE + if plugin.waitfor is not None: + time.sleep(plugin.waitfor) + if plugin.check_invoice is not None: plugin.log( f"check invoice {invoice_value.decode('utf-8')} matches {plugin.check_invoice}" @@ -87,4 +92,10 @@ def unsetchecks(plugin): plugin.check_amount = None +@plugin.method("waitfor") +def waitfor(plugin, duration_sec): + """Waits for duration_sec before continuing with completion""" + plugin.waitfor = duration_sec + + plugin.run() diff --git a/libs/gl-client/src/node/mod.rs b/libs/gl-client/src/node/mod.rs index 458afc688..cc960a40d 100644 --- a/libs/gl-client/src/node/mod.rs +++ b/libs/gl-client/src/node/mod.rs @@ -3,9 +3,13 @@ use crate::pb::cln::node_client as cln_client; use crate::pb::node_client::NodeClient; use crate::pb::scheduler::{scheduler_client::SchedulerClient, ScheduleRequest}; use crate::tls::TlsConfig; -use crate::utils; +use crate::{pb, utils}; use anyhow::{anyhow, Result}; +use futures::StreamExt; use log::{debug, info, trace}; +use std::ops::{Deref, DerefMut}; +use tokio::sync::oneshot; +use tokio::task::JoinHandle; use tonic::transport::{Channel, Uri}; use tower::ServiceBuilder; @@ -13,7 +17,7 @@ use tower::ServiceBuilder; /// infrastructure. It is configured to authenticate itself with the /// device mTLS keypair and will sign outgoing requests with the same /// mTLS keypair. -pub type Client = NodeClient; +pub type GLClient = NodeClient; pub type GClient = GenericClient; @@ -23,6 +27,84 @@ pub trait GrpcClient { fn new_with_inner(inner: service::AuthService) -> Self; } +pub trait TrampolineClient { + fn trampoline_pay(&self) -> (); +} + +/// A wrapper for a `pb::greenlight` grpc node client that adds some custom +/// mehtods. It dereferences to the original tonic client stub. +#[derive(Clone)] +pub struct Client { + inner: GLClient, +} + +impl Client { + /// Is the same as `trampoline_pay` but keeps the grpc connection busy by + /// streaming the node logs as long as the trampoline_pay is inflight to + /// prevent an early timeout that would result in a transport error. + pub async fn trampoline_pay_keepalive( + &mut self, + request: impl tonic::IntoRequest, + ) -> Result, tonic::Status> { + let (stop_tx, mut stop_rx) = oneshot::channel::<()>(); + let mut inner = self.inner.clone(); + + let keep_alive_handle: JoinHandle> = tokio::spawn(async move { + let mut log_stream = inner + .stream_log(pb::greenlight::StreamLogRequest {}) + .await? + .into_inner(); + + loop { + tokio::select! { + _msg = &mut log_stream.next() => { + continue; + } + _ = &mut stop_rx => { + break; + } + }; + } + + Ok(()) + }); + + let res = self.inner.trampoline_pay(request).await; + if let Err(_) = stop_tx.send(()) { + debug!("could not send stop request to keep_alive_handle"); + }; + if let Err(err) = keep_alive_handle.await { + debug!("keep_alive_handle returned an error {}", err.to_string()); + }; + res + } + + pub fn into_inner(self) -> GLClient { + self.inner + } +} + +impl Deref for Client { + type Target = GLClient; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl DerefMut for Client { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +impl GrpcClient for Client { + fn new_with_inner(inner: service::AuthService) -> Self { + Self { + inner: GLClient::new(inner), + } + } +} /// A builder to configure a [`Client`] that can either connect to a /// node directly, assuming you have the `grpc_uri` that the node is /// listening on, or it can talk to the @@ -36,9 +118,9 @@ pub struct Node { rune: String, } -impl GrpcClient for Client { +impl GrpcClient for GLClient { fn new_with_inner(inner: service::AuthService) -> Self { - Client::new(inner) + GLClient::new(inner) } }