Skip to content

Commit

Permalink
trampoline: Add keepalive trampoline_pay version
Browse files Browse the repository at this point in the history
This commit adds `trampoline_pay_keepalive` to the greenlight grpc node
client. This version does exactly the same as `trampoline_pay` but
streams log messages from the node as long as `trampoline_pay` is
inflight, in order to keep the connection busy.

Signed-off-by: Peter Neuroth <[email protected]>
  • Loading branch information
nepet committed Jul 29, 2024
1 parent ebb1c5f commit 1b14fe6
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 6 deletions.
4 changes: 2 additions & 2 deletions libs/gl-client-py/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl Node {
}

fn trampoline_pay(
&self,
&mut self,
bolt11: String,
trampoline_node_id: Vec<u8>,
amount_msat: Option<u64>,
Expand All @@ -76,7 +76,7 @@ impl Node {
maxdelay: maxdelay.unwrap_or_default(),
description: description.unwrap_or_default(),
};
let res = exec(async { self.client.clone().trampoline_pay(req).await })
let res = exec(async { self.client.trampoline_pay_keepalive(req).await })
.map_err(error_calling_remote_method)?
.into_inner();
convert(Ok(res))
Expand Down
11 changes: 11 additions & 0 deletions libs/gl-client-py/tests/plugins/trmp_htlc_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from binascii import unhexlify
from pyln.proto.primitives import varint_decode
from io import BytesIO
import time

INVOICE_TYPE = 33001
AMOUNT_TYPE = 33003
Expand All @@ -16,6 +17,7 @@
plugin.check_invoice = None
plugin.check_amount = None
plugin.payment_key = None
plugin.waitfor = None


@plugin.hook("htlc_accepted")
Expand All @@ -30,6 +32,9 @@ def on_htlc_accepted(htlc, onion, plugin, **kwargs):
invoice_value = payment_metadata.read(invoice_length)
assert invoice_type == INVOICE_TYPE

if plugin.waitfor is not None:
time.sleep(plugin.waitfor)

if plugin.check_invoice is not None:
plugin.log(
f"check invoice {invoice_value.decode('utf-8')} matches {plugin.check_invoice}"
Expand Down Expand Up @@ -87,4 +92,10 @@ def unsetchecks(plugin):
plugin.check_amount = None


@plugin.method("waitfor")
def waitfor(plugin, duration_sec):
"""Waits for duration_sec before continuing with completion"""
plugin.waitfor = duration_sec


plugin.run()
90 changes: 86 additions & 4 deletions libs/gl-client/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,21 @@ use crate::pb::cln::node_client as cln_client;
use crate::pb::node_client::NodeClient;
use crate::pb::scheduler::{scheduler_client::SchedulerClient, ScheduleRequest};
use crate::tls::TlsConfig;
use crate::utils;
use crate::{pb, utils};
use anyhow::{anyhow, Result};
use futures::StreamExt;
use log::{debug, info, trace};
use std::ops::{Deref, DerefMut};
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tonic::transport::{Channel, Uri};
use tower::ServiceBuilder;

/// A client to the remotely running node on the greenlight
/// infrastructure. It is configured to authenticate itself with the
/// device mTLS keypair and will sign outgoing requests with the same
/// mTLS keypair.
pub type Client = NodeClient<service::AuthService>;
pub type GLClient = NodeClient<service::AuthService>;

pub type GClient = GenericClient<service::AuthService>;

Expand All @@ -23,6 +27,84 @@ pub trait GrpcClient {
fn new_with_inner(inner: service::AuthService) -> Self;
}

pub trait TrampolineClient {
fn trampoline_pay(&self) -> ();
}

/// A wrapper for a `pb::greenlight` grpc node client that adds some custom
/// mehtods. It dereferences to the original tonic client stub.
#[derive(Clone)]
pub struct Client {
inner: GLClient,
}

impl Client {
/// Is the same as `trampoline_pay` but keeps the grpc connection busy by
/// streaming the node logs as long as the trampoline_pay is inflight to
/// prevent an early timeout that would result in a transport error.
pub async fn trampoline_pay_keepalive(
&mut self,
request: impl tonic::IntoRequest<pb::greenlight::TrampolinePayRequest>,
) -> Result<tonic::Response<pb::greenlight::TrampolinePayResponse>, tonic::Status> {
let (stop_tx, mut stop_rx) = oneshot::channel::<()>();
let mut inner = self.inner.clone();

let keep_alive_handle: JoinHandle<Result<(), tonic::Status>> = tokio::spawn(async move {
let mut log_stream = inner
.stream_log(pb::greenlight::StreamLogRequest {})
.await?
.into_inner();

loop {
tokio::select! {
_msg = &mut log_stream.next() => {
continue;
}
_ = &mut stop_rx => {
break;
}
};
}

Ok(())
});

let res = self.inner.trampoline_pay(request).await;
if let Err(_) = stop_tx.send(()) {
debug!("could not send stop request to keep_alive_handle");
};
if let Err(err) = keep_alive_handle.await {
debug!("keep_alive_handle returned an error {}", err.to_string());
};
res
}

pub fn into_inner(self) -> GLClient {
self.inner
}
}

impl Deref for Client {
type Target = GLClient;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl DerefMut for Client {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}

impl GrpcClient for Client {
fn new_with_inner(inner: service::AuthService) -> Self {
Self {
inner: GLClient::new(inner),
}
}
}
/// A builder to configure a [`Client`] that can either connect to a
/// node directly, assuming you have the `grpc_uri` that the node is
/// listening on, or it can talk to the
Expand All @@ -36,9 +118,9 @@ pub struct Node {
rune: String,
}

impl GrpcClient for Client {
impl GrpcClient for GLClient {
fn new_with_inner(inner: service::AuthService) -> Self {
Client::new(inner)
GLClient::new(inner)
}
}

Expand Down

0 comments on commit 1b14fe6

Please sign in to comment.