Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Verify and claim swap if not verifiable in swap loop #681

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/core/src/persist/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ impl Persister {
.map(Swap::Send)
.collect();
let ongoing_receive_swaps: Vec<Swap> = self
.list_ongoing_receive_swaps()?
.list_ongoing_receive_swaps(None)?
.into_iter()
.map(Swap::Receive)
.collect();
Expand Down
21 changes: 17 additions & 4 deletions lib/core/src/persist/receive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,12 @@ impl Persister {
rs.created_at,
rs.state,
rs.pair_fees_json,
rs.version
rs.version,

-- Used for filtering
sync_state.is_local
FROM receive_swaps AS rs
LEFT JOIN sync_state ON rs.id = sync_state.data_id
{where_clause_str}
ORDER BY rs.created_at
"
Expand Down Expand Up @@ -222,12 +226,21 @@ impl Persister {
Ok(ongoing_receive)
}

pub(crate) fn list_ongoing_receive_swaps(&self) -> Result<Vec<ReceiveSwap>> {
pub(crate) fn list_ongoing_receive_swaps(
&self,
is_local: Option<bool>,
) -> Result<Vec<ReceiveSwap>> {
let con = self.get_connection()?;
let where_clause = vec![get_where_clause_state_in(&[
let mut where_clause = vec![get_where_clause_state_in(&[
PaymentState::Created,
PaymentState::Pending,
])];
if let Some(is_local) = is_local {
where_clause.push(format!(
"(sync_state.is_local = {} OR sync_state.is_local IS NULL)",
is_local as i8
));
}

self.list_receive_swaps_where(&con, where_clause)
}
Expand Down Expand Up @@ -426,7 +439,7 @@ mod tests {

// List ongoing receive swaps
storage.insert_or_update_receive_swap(&new_receive_swap(Some(PaymentState::Pending)))?;
let ongoing_swaps = storage.list_ongoing_receive_swaps()?;
let ongoing_swaps = storage.list_ongoing_receive_swaps(None)?;
assert_eq!(ongoing_swaps.len(), 4);

Ok(())
Expand Down
78 changes: 66 additions & 12 deletions lib/core/src/receive_swap.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use std::{str::FromStr, sync::Arc};

use anyhow::{anyhow, Result};
use async_trait::async_trait;
use boltz_client::swaps::boltz::RevSwapStates;
use boltz_client::swaps::boltz::{self, SwapUpdateTxDetails};
use boltz_client::{Serialize, ToHex};
use boltz_client::{boltz, Serialize, ToHex};
use log::{debug, error, info, warn};
use lwk_wollet::elements::Txid;
use lwk_wollet::hashes::hex::DisplayHex;
use tokio::sync::{broadcast, Mutex};

use crate::chain::liquid::LiquidChainService;
use crate::model::PaymentState::*;
use crate::model::{BlockListener, PaymentState::*};
use crate::model::{Config, PaymentTxData, PaymentType, ReceiveSwap};
use crate::prelude::{Swap, Transaction};
use crate::{ensure_sdk, utils};
Expand All @@ -32,6 +33,17 @@ pub(crate) struct ReceiveSwapHandler {
liquid_chain_service: Arc<Mutex<dyn LiquidChainService>>,
}

#[async_trait]
impl BlockListener for ReceiveSwapHandler {
async fn on_bitcoin_block(&self, _height: u32) {}

async fn on_liquid_block(&self, height: u32) {
if let Err(e) = self.claim_confirmed_lockups(height).await {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we want to claim also unconfirmed that are in the zero-conf range?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The claim should have happened for TransactionMempool status already. There the tx is verified without the confirmation check.

Shall we add this logic here also to only verify confirmation on non zero-conf swaps?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking perhaps the code that should claim on TransactionMempool status might also not find the transaction in the mempool and skip the claim. But this is ok I think to focus on confirmed ones as it is a fallback after all.

error!("Error claiming confirmed lockups: {e:?}");
}
}
}

impl ReceiveSwapHandler {
pub(crate) fn new(
config: Config,
Expand Down Expand Up @@ -116,7 +128,7 @@ impl ReceiveSwapHandler {

// looking for lockup script history to verify lockup was broadcasted
if let Err(e) = self
.verify_lockup_tx(&receive_swap, &transaction, false)
.verify_lockup_tx(&receive_swap, &transaction.id, &transaction.hex, false)
.await
{
return Err(anyhow!(
Expand Down Expand Up @@ -193,7 +205,7 @@ impl ReceiveSwapHandler {

// looking for lockup script history to verify lockup was broadcasted and confirmed
if let Err(e) = self
.verify_lockup_tx(&receive_swap, &transaction, true)
.verify_lockup_tx(&receive_swap, &transaction.id, &transaction.hex, true)
.await
{
return Err(anyhow!(
Expand Down Expand Up @@ -369,6 +381,52 @@ impl ReceiveSwapHandler {
}
}

async fn claim_confirmed_lockups(&self, height: u32) -> Result<()> {
let receive_swaps: Vec<ReceiveSwap> = self
.persister
.list_ongoing_receive_swaps(Some(true))?
.into_iter()
.filter(|s| s.lockup_tx_id.is_some() && s.claim_tx_id.is_none())
.collect();
info!(
"Rescanning {} Receive Swap(s) lockup txs at height {}",
receive_swaps.len(),
height
);
for swap in receive_swaps {
if let Err(e) = self.claim_confirmed_lockup(&swap).await {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to skip these with claim_tx_id set here? Otherwise we will probably worst case log here an error.

error!(
"Error rescanning server lockup of incoming Chain Swap {}: {e:?}",
swap.id,
);
}
}
Ok(())
}

async fn claim_confirmed_lockup(&self, receive_swap: &ReceiveSwap) -> Result<()> {
let Some(tx_id) = receive_swap.lockup_tx_id.clone() else {
// Skip the rescan if there is no lockup_tx_id yet
return Ok(());
};
let swap_id = &receive_swap.id;
let tx_hex = self
.liquid_chain_service
.lock()
.await
.get_transaction_hex(&Txid::from_str(&tx_id)?)
.await?
.ok_or(anyhow!("Lockup tx not found for Receive swap {swap_id}"))?
.serialize()
.to_lower_hex_string();
self.verify_lockup_tx(receive_swap, &tx_id, &tx_hex, true)
.await?;
Comment on lines +413 to +423
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I'm missing something, but this seems a bit inefficient. Are we fetching the tx hex just for the sake of being able to reuse the existing verify_tx method? If so, maybe we should keep in mind to optimize this later

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if there's any way to verify the tx without passing the whole hex string.
We need it to ensure ensure that the double hash of the encoded tx is equal to the provided tx id.

info!("Receive Swap {swap_id} lockup tx is confirmed");
self.claim(swap_id)
.await
.map_err(|e| anyhow!("Could not claim Receive Swap {swap_id}: {e:?}"))
}

fn validate_state_transition(
from_state: PaymentState,
to_state: PaymentState,
Expand Down Expand Up @@ -415,7 +473,8 @@ impl ReceiveSwapHandler {
async fn verify_lockup_tx(
&self,
receive_swap: &ReceiveSwap,
swap_update_tx: &SwapUpdateTxDetails,
tx_id: &str,
tx_hex: &str,
verify_confirmation: bool,
) -> Result<()> {
Comment on lines 473 to 479
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we also verify the incoming amount here?

// Looking for lockup script history to verify lockup was broadcasted
Expand All @@ -429,12 +488,7 @@ impl ReceiveSwapHandler {
self.liquid_chain_service
.lock()
.await
.verify_tx(
&address,
&swap_update_tx.id,
&swap_update_tx.hex,
verify_confirmation,
)
.verify_tx(&address, tx_id, tx_hex, verify_confirmation)
.await?;
Ok(())
}
Expand Down
2 changes: 2 additions & 0 deletions lib/core/src/sdk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,10 +397,12 @@ impl LiquidSdk {
// Update swap handlers
if is_new_liquid_block {
cloned.chain_swap_handler.on_liquid_block(current_liquid_block).await;
cloned.receive_swap_handler.on_liquid_block(current_liquid_block).await;
cloned.send_swap_handler.on_liquid_block(current_liquid_block).await;
}
if is_new_bitcoin_block {
cloned.chain_swap_handler.on_bitcoin_block(current_bitcoin_block).await;
cloned.receive_swap_handler.on_bitcoin_block(current_liquid_block).await;
cloned.send_swap_handler.on_bitcoin_block(current_bitcoin_block).await;
}
}
Expand Down
Loading