From 3b7aa2f544614d3c0297084f690d4e7a0537ce29 Mon Sep 17 00:00:00 2001 From: Ross Savage Date: Mon, 20 Jan 2025 13:43:28 +0100 Subject: [PATCH] Verify and claim swap if not verifiable in swap loop --- lib/core/src/persist/mod.rs | 2 +- lib/core/src/persist/receive.rs | 21 +++++++-- lib/core/src/receive_swap.rs | 78 ++++++++++++++++++++++++++++----- lib/core/src/sdk.rs | 2 + 4 files changed, 86 insertions(+), 17 deletions(-) diff --git a/lib/core/src/persist/mod.rs b/lib/core/src/persist/mod.rs index 1719c1163..0176f0024 100644 --- a/lib/core/src/persist/mod.rs +++ b/lib/core/src/persist/mod.rs @@ -334,7 +334,7 @@ impl Persister { .map(Swap::Send) .collect(); let ongoing_receive_swaps: Vec = self - .list_ongoing_receive_swaps()? + .list_ongoing_receive_swaps(None)? .into_iter() .map(Swap::Receive) .collect(); diff --git a/lib/core/src/persist/receive.rs b/lib/core/src/persist/receive.rs index 468e4d306..1bf8440bb 100644 --- a/lib/core/src/persist/receive.rs +++ b/lib/core/src/persist/receive.rs @@ -156,8 +156,12 @@ impl Persister { rs.created_at, rs.state, rs.pair_fees_json, - rs.version + rs.version, + + -- Used for filtering + sync_state.is_local FROM receive_swaps AS rs + LEFT JOIN sync_state ON rs.id = sync_state.data_id {where_clause_str} ORDER BY rs.created_at " @@ -222,12 +226,21 @@ impl Persister { Ok(ongoing_receive) } - pub(crate) fn list_ongoing_receive_swaps(&self) -> Result> { + pub(crate) fn list_ongoing_receive_swaps( + &self, + is_local: Option, + ) -> Result> { let con = self.get_connection()?; - let where_clause = vec![get_where_clause_state_in(&[ + let mut where_clause = vec![get_where_clause_state_in(&[ PaymentState::Created, PaymentState::Pending, ])]; + if let Some(is_local) = is_local { + where_clause.push(format!( + "(sync_state.is_local = {} OR sync_state.is_local IS NULL)", + is_local as i8 + )); + } self.list_receive_swaps_where(&con, where_clause) } @@ -426,7 +439,7 @@ mod tests { // List ongoing receive swaps storage.insert_or_update_receive_swap(&new_receive_swap(Some(PaymentState::Pending)))?; - let ongoing_swaps = storage.list_ongoing_receive_swaps()?; + let ongoing_swaps = storage.list_ongoing_receive_swaps(None)?; assert_eq!(ongoing_swaps.len(), 4); Ok(()) diff --git a/lib/core/src/receive_swap.rs b/lib/core/src/receive_swap.rs index 4d57f6c34..e2b1cd6aa 100644 --- a/lib/core/src/receive_swap.rs +++ b/lib/core/src/receive_swap.rs @@ -1,15 +1,16 @@ use std::{str::FromStr, sync::Arc}; use anyhow::{anyhow, Result}; +use async_trait::async_trait; use boltz_client::swaps::boltz::RevSwapStates; -use boltz_client::swaps::boltz::{self, SwapUpdateTxDetails}; -use boltz_client::{Serialize, ToHex}; +use boltz_client::{boltz, Serialize, ToHex}; use log::{debug, error, info, warn}; +use lwk_wollet::elements::Txid; use lwk_wollet::hashes::hex::DisplayHex; use tokio::sync::{broadcast, Mutex}; use crate::chain::liquid::LiquidChainService; -use crate::model::PaymentState::*; +use crate::model::{BlockListener, PaymentState::*}; use crate::model::{Config, PaymentTxData, PaymentType, ReceiveSwap}; use crate::prelude::{Swap, Transaction}; use crate::{ensure_sdk, utils}; @@ -32,6 +33,17 @@ pub(crate) struct ReceiveSwapHandler { liquid_chain_service: Arc>, } +#[async_trait] +impl BlockListener for ReceiveSwapHandler { + async fn on_bitcoin_block(&self, _height: u32) {} + + async fn on_liquid_block(&self, height: u32) { + if let Err(e) = self.claim_confirmed_lockups(height).await { + error!("Error claiming confirmed lockups: {e:?}"); + } + } +} + impl ReceiveSwapHandler { pub(crate) fn new( config: Config, @@ -116,7 +128,7 @@ impl ReceiveSwapHandler { // looking for lockup script history to verify lockup was broadcasted if let Err(e) = self - .verify_lockup_tx(&receive_swap, &transaction, false) + .verify_lockup_tx(&receive_swap, &transaction.id, &transaction.hex, false) .await { return Err(anyhow!( @@ -193,7 +205,7 @@ impl ReceiveSwapHandler { // looking for lockup script history to verify lockup was broadcasted and confirmed if let Err(e) = self - .verify_lockup_tx(&receive_swap, &transaction, true) + .verify_lockup_tx(&receive_swap, &transaction.id, &transaction.hex, true) .await { return Err(anyhow!( @@ -369,6 +381,52 @@ impl ReceiveSwapHandler { } } + async fn claim_confirmed_lockups(&self, height: u32) -> Result<()> { + let receive_swaps: Vec = self + .persister + .list_ongoing_receive_swaps(Some(true))? + .into_iter() + .filter(|s| s.lockup_tx_id.is_some() && s.claim_tx_id.is_none()) + .collect(); + info!( + "Rescanning {} Receive Swap(s) lockup txs at height {}", + receive_swaps.len(), + height + ); + for swap in receive_swaps { + if let Err(e) = self.claim_confirmed_lockup(&swap).await { + error!( + "Error rescanning server lockup of incoming Chain Swap {}: {e:?}", + swap.id, + ); + } + } + Ok(()) + } + + async fn claim_confirmed_lockup(&self, receive_swap: &ReceiveSwap) -> Result<()> { + let Some(tx_id) = receive_swap.lockup_tx_id.clone() else { + // Skip the rescan if there is no lockup_tx_id yet + return Ok(()); + }; + let swap_id = &receive_swap.id; + let tx_hex = self + .liquid_chain_service + .lock() + .await + .get_transaction_hex(&Txid::from_str(&tx_id)?) + .await? + .ok_or(anyhow!("Lockup tx not found for Receive swap {swap_id}"))? + .serialize() + .to_lower_hex_string(); + self.verify_lockup_tx(receive_swap, &tx_id, &tx_hex, true) + .await?; + info!("Receive Swap {swap_id} lockup tx is confirmed"); + self.claim(swap_id) + .await + .map_err(|e| anyhow!("Could not claim Receive Swap {swap_id}: {e:?}")) + } + fn validate_state_transition( from_state: PaymentState, to_state: PaymentState, @@ -415,7 +473,8 @@ impl ReceiveSwapHandler { async fn verify_lockup_tx( &self, receive_swap: &ReceiveSwap, - swap_update_tx: &SwapUpdateTxDetails, + tx_id: &str, + tx_hex: &str, verify_confirmation: bool, ) -> Result<()> { // Looking for lockup script history to verify lockup was broadcasted @@ -429,12 +488,7 @@ impl ReceiveSwapHandler { self.liquid_chain_service .lock() .await - .verify_tx( - &address, - &swap_update_tx.id, - &swap_update_tx.hex, - verify_confirmation, - ) + .verify_tx(&address, tx_id, tx_hex, verify_confirmation) .await?; Ok(()) } diff --git a/lib/core/src/sdk.rs b/lib/core/src/sdk.rs index 34ada20df..09710c8ed 100644 --- a/lib/core/src/sdk.rs +++ b/lib/core/src/sdk.rs @@ -397,10 +397,12 @@ impl LiquidSdk { // Update swap handlers if is_new_liquid_block { cloned.chain_swap_handler.on_liquid_block(current_liquid_block).await; + cloned.receive_swap_handler.on_liquid_block(current_liquid_block).await; cloned.send_swap_handler.on_liquid_block(current_liquid_block).await; } if is_new_bitcoin_block { cloned.chain_swap_handler.on_bitcoin_block(current_bitcoin_block).await; + cloned.receive_swap_handler.on_bitcoin_block(current_liquid_block).await; cloned.send_swap_handler.on_bitcoin_block(current_bitcoin_block).await; } }