Skip to content

Commit

Permalink
fix(mem-leak): running_swap never shrinks (#2301)
Browse files Browse the repository at this point in the history
  • Loading branch information
mariocynicys authored Jan 22, 2025
1 parent 3075a10 commit da29651
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 60 deletions.
12 changes: 4 additions & 8 deletions mm2src/mm2_main/src/lp_native_dex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ use crate::lp_message_service::{init_message_service, InitMessageServiceError};
use crate::lp_network::{lp_network_ports, p2p_event_process_loop, subscribe_to_topic, NetIdError};
use crate::lp_ordermatch::{broadcast_maker_orders_keep_alive_loop, clean_memory_loop, init_ordermatch_context,
lp_ordermatch_loop, orders_kick_start, BalanceUpdateOrdermatchHandler, OrdermatchInitError};
use crate::lp_swap::{running_swaps_num, swap_kick_starts};
use crate::lp_swap;
use crate::lp_swap::swap_kick_starts;
use crate::lp_wallet::{initialize_wallet_passphrase, WalletInitError};
use crate::rpc::spawn_rpc;

Expand Down Expand Up @@ -535,14 +536,9 @@ pub async fn lp_init(ctx: MmArc, version: String, datetime: String) -> MmInitRes
};
Timer::sleep(0.2).await
}
// Clearing up the running swaps removes any circular references that might prevent the context from being dropped.
lp_swap::clear_running_swaps(&ctx);

// wait for swaps to stop
loop {
if running_swaps_num(&ctx) == 0 {
break;
};
Timer::sleep(0.2).await
}
Ok(())
}

Expand Down
73 changes: 33 additions & 40 deletions mm2src/mm2_main/src/lp_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ use std::convert::TryFrom;
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::{Arc, Mutex, Weak};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use uuid::Uuid;

Expand Down Expand Up @@ -518,7 +518,7 @@ struct LockedAmountInfo {
}

struct SwapsContext {
running_swaps: Mutex<Vec<Weak<dyn AtomicSwap>>>,
running_swaps: Mutex<HashMap<Uuid, Arc<dyn AtomicSwap>>>,
active_swaps_v2_infos: Mutex<HashMap<Uuid, ActiveSwapV2Info>>,
banned_pubkeys: Mutex<HashMap<H256Json, BanReason>>,
swap_msgs: Mutex<HashMap<Uuid, SwapMsgStore>>,
Expand All @@ -534,7 +534,7 @@ impl SwapsContext {
fn from_ctx(ctx: &MmArc) -> Result<Arc<SwapsContext>, String> {
Ok(try_s!(from_ctx(&ctx.swaps_ctx, move || {
Ok(SwapsContext {
running_swaps: Mutex::new(vec![]),
running_swaps: Mutex::new(HashMap::new()),
active_swaps_v2_infos: Mutex::new(HashMap::new()),
banned_pubkeys: Mutex::new(HashMap::new()),
swap_msgs: Mutex::new(HashMap::new()),
Expand Down Expand Up @@ -619,21 +619,21 @@ pub fn get_locked_amount(ctx: &MmArc, coin: &str) -> MmNumber {
let swap_ctx = SwapsContext::from_ctx(ctx).unwrap();
let swap_lock = swap_ctx.running_swaps.lock().unwrap();

let mut locked = swap_lock
.iter()
.filter_map(|swap| swap.upgrade())
.flat_map(|swap| swap.locked_amount())
.fold(MmNumber::from(0), |mut total_amount, locked| {
if locked.coin == coin {
total_amount += locked.amount;
}
if let Some(trade_fee) = locked.trade_fee {
if trade_fee.coin == coin && !trade_fee.paid_from_trading_vol {
total_amount += trade_fee.amount;
let mut locked =
swap_lock
.values()
.flat_map(|swap| swap.locked_amount())
.fold(MmNumber::from(0), |mut total_amount, locked| {
if locked.coin == coin {
total_amount += locked.amount;
}
}
total_amount
});
if let Some(trade_fee) = locked.trade_fee {
if trade_fee.coin == coin && !trade_fee.paid_from_trading_vol {
total_amount += trade_fee.amount;
}
}
total_amount
});
drop(swap_lock);

let locked_amounts = swap_ctx.locked_amounts.lock().unwrap();
Expand All @@ -654,14 +654,12 @@ pub fn get_locked_amount(ctx: &MmArc, coin: &str) -> MmNumber {
locked
}

/// Get number of currently running swaps
pub fn running_swaps_num(ctx: &MmArc) -> u64 {
/// Clear up all the running swaps.
///
/// This doesn't mean that these swaps will be stopped. They can only be stopped from the abortable systems they are running on top of.
pub fn clear_running_swaps(ctx: &MmArc) {
let swap_ctx = SwapsContext::from_ctx(ctx).unwrap();
let swaps = swap_ctx.running_swaps.lock().unwrap();
swaps.iter().fold(0, |total, swap| match swap.upgrade() {
Some(_) => total + 1,
None => total,
})
swap_ctx.running_swaps.lock().unwrap().clear();
}

/// Get total amount of selected coin locked by all currently ongoing swaps except the one with selected uuid
Expand All @@ -670,8 +668,7 @@ fn get_locked_amount_by_other_swaps(ctx: &MmArc, except_uuid: &Uuid, coin: &str)
let swap_lock = swap_ctx.running_swaps.lock().unwrap();

swap_lock
.iter()
.filter_map(|swap| swap.upgrade())
.values()
.filter(|swap| swap.uuid() != except_uuid)
.flat_map(|swap| swap.locked_amount())
.fold(MmNumber::from(0), |mut total_amount, locked| {
Expand All @@ -691,11 +688,9 @@ pub fn active_swaps_using_coins(ctx: &MmArc, coins: &HashSet<String>) -> Result<
let swap_ctx = try_s!(SwapsContext::from_ctx(ctx));
let swaps = try_s!(swap_ctx.running_swaps.lock());
let mut uuids = vec![];
for swap in swaps.iter() {
if let Some(swap) = swap.upgrade() {
if coins.contains(&swap.maker_coin().to_string()) || coins.contains(&swap.taker_coin().to_string()) {
uuids.push(*swap.uuid())
}
for swap in swaps.values() {
if coins.contains(&swap.maker_coin().to_string()) || coins.contains(&swap.taker_coin().to_string()) {
uuids.push(*swap.uuid())
}
}
drop(swaps);
Expand All @@ -711,15 +706,13 @@ pub fn active_swaps_using_coins(ctx: &MmArc, coins: &HashSet<String>) -> Result<

pub fn active_swaps(ctx: &MmArc) -> Result<Vec<(Uuid, u8)>, String> {
let swap_ctx = try_s!(SwapsContext::from_ctx(ctx));
let swaps = swap_ctx.running_swaps.lock().unwrap();
let mut uuids = vec![];
for swap in swaps.iter() {
if let Some(swap) = swap.upgrade() {
uuids.push((*swap.uuid(), LEGACY_SWAP_TYPE))
}
}

drop(swaps);
let mut uuids: Vec<_> = swap_ctx
.running_swaps
.lock()
.unwrap()
.keys()
.map(|uuid| (*uuid, LEGACY_SWAP_TYPE))
.collect();

let swaps_v2 = swap_ctx.active_swaps_v2_infos.lock().unwrap();
uuids.extend(swaps_v2.iter().map(|(uuid, info)| (*uuid, info.swap_type)));
Expand Down
10 changes: 8 additions & 2 deletions mm2src/mm2_main/src/lp_swap/maker_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2120,10 +2120,14 @@ pub async fn run_maker_swap(swap: RunMakerSwapInput, ctx: MmArc) {
};
}
let running_swap = Arc::new(swap);
let weak_ref = Arc::downgrade(&running_swap);
let swap_ctx = SwapsContext::from_ctx(&ctx).unwrap();
swap_ctx.init_msg_store(running_swap.uuid, running_swap.taker);
swap_ctx.running_swaps.lock().unwrap().push(weak_ref);
// Register the swap in the running swaps map.
swap_ctx
.running_swaps
.lock()
.unwrap()
.insert(uuid, running_swap.clone());
let mut swap_fut = Box::pin(
async move {
let mut events;
Expand Down Expand Up @@ -2187,6 +2191,8 @@ pub async fn run_maker_swap(swap: RunMakerSwapInput, ctx: MmArc) {
_swap = swap_fut => (), // swap finished normally
_touch = touch_loop => unreachable!("Touch loop can not stop!"),
};
// Remove the swap from the running swaps map.
swap_ctx.running_swaps.lock().unwrap().remove(&uuid);
}

pub struct MakerSwapPreparedParams {
Expand Down
24 changes: 14 additions & 10 deletions mm2src/mm2_main/src/lp_swap/taker_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,14 +458,17 @@ pub async fn run_taker_swap(swap: RunTakerSwapInput, ctx: MmArc) {
let ctx = swap.ctx.clone();
subscribe_to_topic(&ctx, swap_topic(&swap.uuid));
let mut status = ctx.log.status_handle();
let uuid = swap.uuid.to_string();
let uuid_str = uuid.to_string();
let to_broadcast = !(swap.maker_coin.is_privacy() || swap.taker_coin.is_privacy());
let running_swap = Arc::new(swap);
let weak_ref = Arc::downgrade(&running_swap);
let swap_ctx = SwapsContext::from_ctx(&ctx).unwrap();
swap_ctx.init_msg_store(running_swap.uuid, running_swap.maker);
swap_ctx.running_swaps.lock().unwrap().push(weak_ref);

// Register the swap in the running swaps map.
swap_ctx
.running_swaps
.lock()
.unwrap()
.insert(uuid, running_swap.clone());
let mut swap_fut = Box::pin(
async move {
let mut events;
Expand All @@ -491,10 +494,10 @@ pub async fn run_taker_swap(swap: RunTakerSwapInput, ctx: MmArc) {
}

if event.is_error() {
error!("[swap uuid={uuid}] {event:?}");
error!("[swap uuid={uuid_str}] {event:?}");
}

status.status(&[&"swap", &("uuid", uuid.as_str())], &event.status_str());
status.status(&[&"swap", &("uuid", uuid_str.as_str())], &event.status_str());
running_swap.apply_event(event);
}
match res.0 {
Expand All @@ -503,12 +506,12 @@ pub async fn run_taker_swap(swap: RunTakerSwapInput, ctx: MmArc) {
},
None => {
if let Err(e) = mark_swap_as_finished(ctx.clone(), running_swap.uuid).await {
error!("!mark_swap_finished({}): {}", uuid, e);
error!("!mark_swap_finished({}): {}", uuid_str, e);
}

if to_broadcast {
if let Err(e) = broadcast_my_swap_status(&ctx, running_swap.uuid).await {
error!("!broadcast_my_swap_status({}): {}", uuid, e);
error!("!broadcast_my_swap_status({}): {}", uuid_str, e);
}
}
break;
Expand All @@ -522,6 +525,8 @@ pub async fn run_taker_swap(swap: RunTakerSwapInput, ctx: MmArc) {
_swap = swap_fut => (), // swap finished normally
_touch = touch_loop => unreachable!("Touch loop can not stop!"),
};
// Remove the swap from the running swaps map.
swap_ctx.running_swaps.lock().unwrap().remove(&uuid);
}

#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
Expand Down Expand Up @@ -3308,8 +3313,7 @@ mod taker_swap_tests {
.unwrap();
let swaps_ctx = SwapsContext::from_ctx(&ctx).unwrap();
let arc = Arc::new(swap);
let weak_ref = Arc::downgrade(&arc);
swaps_ctx.running_swaps.lock().unwrap().push(weak_ref);
swaps_ctx.running_swaps.lock().unwrap().insert(arc.uuid, arc);

let actual = get_locked_amount(&ctx, "RICK");
assert_eq!(actual, MmNumber::from(0));
Expand Down

0 comments on commit da29651

Please sign in to comment.