Skip to content

Commit

Permalink
implement persistent indexed_db session storage
Browse files Browse the repository at this point in the history
  • Loading branch information
borngraced committed Sep 26, 2024
1 parent 9f87a9e commit 6135665
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,8 @@ async fn get_walletconnect_pubkey(
});
};

let walletconnect_ctx = WalletConnectCtx::from_ctx(ctx).expect("WalletConnectCtx should be initialized by now!");
let walletconnect_ctx =
WalletConnectCtx::try_from_ctx_or_initialize(ctx).expect("WalletConnectCtx should be initialized by now!");

let account = walletconnect_ctx
.cosmos_get_account(param.account_index, "cosmos", chain_id)
Expand Down
57 changes: 50 additions & 7 deletions mm2src/kdf_walletconnect/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ mod storage;
use chain::{build_required_namespaces,
cosmos::{cosmos_get_accounts_impl, CosmosAccount},
SUPPORTED_CHAINS};
use common::executor::SpawnFuture;
use common::{executor::Timer,
log::{error, info}};
use error::WalletConnectCtxError;
Expand All @@ -34,7 +35,7 @@ use relay_rpc::{auth::{ed25519_dalek::SigningKey, AuthToken},
use serde_json::Value;
use session::{propose::send_proposal, Session, SymKeyPair};
use std::{sync::Arc, time::Duration};
use storage::SessionStorageDb;
use storage::{SessionStorageDb, WalletConnectStorageOps};
use wc_common::{decode_and_decrypt_type0, encrypt_and_encode, EnvelopeType};

pub(crate) const SUPPORTED_PROTOCOL: &str = "irn";
Expand Down Expand Up @@ -97,7 +98,7 @@ impl WalletConnectCtx {
})
}

pub fn from_ctx(ctx: &MmArc) -> MmResult<Arc<WalletConnectCtx>, WalletConnectCtxError> {
pub fn try_from_ctx_or_initialize(ctx: &MmArc) -> MmResult<Arc<WalletConnectCtx>, WalletConnectCtxError> {
from_ctx(&ctx.wallet_connect, move || {
Self::try_init(ctx).map_err(|err| err.to_string())
})
Expand Down Expand Up @@ -323,18 +324,18 @@ impl WalletConnectCtx {
Ok(())
}

pub async fn published_message_event_loop(self: Arc<Self>) {
let self_clone = self.clone();
pub async fn message_handler_event_loop(self: Arc<Self>) {
let selfi = self.clone();
let mut recv = self.inbound_message_handler.lock().await;
while let Some(msg) = recv.next().await {
info!("received message");
if let Err(e) = self_clone.handle_single_message(msg).await {
if let Err(e) = selfi.handle_published_message(msg).await {
info!("Error processing message: {:?}", e);
}
}
}

async fn handle_single_message(&self, msg: PublishedMessage) -> MmResult<(), WalletConnectCtxError> {
async fn handle_published_message(&self, msg: PublishedMessage) -> MmResult<(), WalletConnectCtxError> {
let message = {
let key = self.sym_key(&msg.topic).await?;
decode_and_decrypt_type0(msg.message.as_bytes(), &key).unwrap()
Expand All @@ -353,7 +354,28 @@ impl WalletConnectCtx {
Ok(())
}

pub async fn spawn_connection_live_watcher(self: Arc<Self>) {
async fn load_session_from_storage(&self) -> MmResult<(), WalletConnectCtxError> {
let sessions = self
.storage
.db
.get_all_sessions()
.await
.mm_err(|err| WalletConnectCtxError::StorageError(err.to_string()))?;
if let Some(session) = sessions.first() {
info!("Session found! activating :{}", session.topic);

let mut ctx_session = self.session.lock().await;
*ctx_session = Some(session.clone());

// subcribe to session topics
self.client.subscribe(session.topic.clone()).await?;
self.client.subscribe(session.pairing_topic.clone()).await?;
}

Ok(())
}

async fn connection_live_watcher(self: Arc<Self>) {
let mut recv = self.connection_live_handler.lock().await;
let mut retry_count = 0;

Expand Down Expand Up @@ -394,3 +416,24 @@ impl WalletConnectCtx {
}
}
}

/// This function spwans related WalletConnect related tasks and needed initialization before
/// WalletConnect can be usable in KDF.
pub async fn initialize_walletconnect(ctx: &MmArc) -> MmResult<(), WalletConnectCtxError> {
// Initialized WalletConnectCtx
let wallet_connect = WalletConnectCtx::try_from_ctx_or_initialize(&ctx)?;

// WalletConnectCtx is initialized, now we can connect to relayer client.
wallet_connect.connect_client().await?;

// spawn message handler event loop
ctx.spawner().spawn(wallet_connect.clone().message_handler_event_loop());

// spawn WalletConnect client disconnect task
ctx.spawner().spawn(wallet_connect.clone().connection_live_watcher());

// load session from storage
wallet_connect.load_session_from_storage().await?;

Ok(())
}
19 changes: 16 additions & 3 deletions mm2src/kdf_walletconnect/src/session/propose.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use super::{settle::send_session_settle_request, Session};
use crate::{error::WalletConnectCtxError,
session::{SessionKey, SessionType, THIRTY_DAYS},
storage::WalletConnectStorageOps,
WalletConnectCtx};

use chrono::Utc;
use mm2_err_handle::map_to_mm::MapToMmResult;
use mm2_err_handle::prelude::MmResult;
use mm2_err_handle::prelude::*;
use relay_rpc::{domain::{MessageId, Topic},
rpc::params::{session::ProposeNamespaces,
session_propose::{Proposer, SessionProposeRequest, SessionProposeResponse},
Expand Down Expand Up @@ -68,14 +69,20 @@ pub async fn process_proposal_request(
.map_to_mm(|err| WalletConnectCtxError::InternalError(err.to_string()))?;

{
// save session to storage
ctx.storage
.db
.save_session(&session)
.await
.mm_err(|err| WalletConnectCtxError::StorageError(err.to_string()))?;

let mut old_session = ctx.session.lock().await;
*old_session = Some(session.clone());
let mut subs = ctx.subscriptions.lock().await;
subs.push(session_topic.clone());
}

{
println!("{:?}", session);
send_session_settle_request(ctx, &session).await?;
};

Expand Down Expand Up @@ -125,7 +132,13 @@ pub(crate) async fn process_session_propose_response(
session.controller.public_key = response.responder_public_key;

{
println!("{:?}", session);
// save session to storage
ctx.storage
.db
.save_session(&session)
.await
.mm_err(|err| WalletConnectCtxError::StorageError(err.to_string()))?;

let mut old_session = ctx.session.lock().await;
*old_session = Some(session);
let mut subs = ctx.subscriptions.lock().await;
Expand Down
17 changes: 15 additions & 2 deletions mm2src/kdf_walletconnect/src/storage/indexed_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl WalletConnectStorageOps for IDBSessionStorage {

async fn is_initialized(&self) -> MmResult<bool, Self::Error> { Ok(true) }

async fn save_session(&self, session: Session) -> MmResult<(), Self::Error> {
async fn save_session(&self, session: &Session) -> MmResult<(), Self::Error> {
let lock_db = self.lock_db().await?;
let transaction = lock_db.get_inner().transaction().await?;
let session_table = transaction.table::<Session>().await?;
Expand All @@ -93,6 +93,19 @@ impl WalletConnectStorageOps for IDBSessionStorage {
.map(|s| s.1))
}

async fn get_all_sessions(&self) -> MmResult<Vec<Session>, Self::Error> {
let lock_db = self.lock_db().await?;
let transaction = lock_db.get_inner().transaction().await?;
let session_table = transaction.table::<Session>().await?;

Ok(session_table
.get_all_items()
.await?
.into_iter()
.map(|s| s.1)
.collect::<Vec<_>>())
}

async fn delete_session(&self, topic: &Topic) -> MmResult<(), Self::Error> {
let lock_db = self.lock_db().await?;
let transaction = lock_db.get_inner().transaction().await?;
Expand All @@ -102,5 +115,5 @@ impl WalletConnectStorageOps for IDBSessionStorage {
Ok(())
}

async fn update_session(&self, session: Session) -> MmResult<(), Self::Error> { self.save_session(session).await }
async fn update_session(&self, session: &Session) -> MmResult<(), Self::Error> { self.save_session(&session).await }
}
7 changes: 4 additions & 3 deletions mm2src/kdf_walletconnect/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ pub(crate) trait WalletConnectStorageOps {

async fn init(&self) -> MmResult<(), Self::Error>;
async fn is_initialized(&self) -> MmResult<bool, Self::Error>;
async fn save_session(&self, session: Session) -> MmResult<(), Self::Error>;
async fn save_session(&self, session: &Session) -> MmResult<(), Self::Error>;
async fn get_session(&self, topic: &Topic) -> MmResult<Option<Session>, Self::Error>;
async fn get_all_sessions(&self) -> MmResult<Vec<Session>, Self::Error>;
async fn delete_session(&self, topic: &Topic) -> MmResult<(), Self::Error>;
async fn update_session(&self, session: Session) -> MmResult<(), Self::Error>;
async fn update_session(&self, session: &Session) -> MmResult<(), Self::Error>;
}

pub(crate) struct SessionStorageDb {
Expand Down Expand Up @@ -85,7 +86,7 @@ pub(crate) mod session_storage_tests {
);

// try save session
wc_ctx.storage.db.save_session(session.clone()).await.unwrap();
wc_ctx.storage.db.save_session(&session).await.unwrap();

// try get session
let db_session = wc_ctx.storage.db.get_session(&session.topic).await.unwrap();
Expand Down
7 changes: 5 additions & 2 deletions mm2src/kdf_walletconnect/src/storage/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl WalletConnectStorageOps for SqliteSessionStorage {
.map_to_mm(AsyncConnError::from)
}

async fn save_session(&self, session: Session) -> MmResult<(), Self::Error> {
async fn save_session(&self, session: &Session) -> MmResult<(), Self::Error> {
let lock = self.lock_db().await;
validate_table_name(SESSION_TBALE_NAME).map_err(AsyncConnError::from)?;
let sql = format!(
Expand All @@ -88,6 +88,7 @@ impl WalletConnectStorageOps for SqliteSessionStorage {
SESSION_TBALE_NAME
);

let session = session.clone();
lock.call(move |conn| {
let transaction = conn.transaction()?;

Expand Down Expand Up @@ -127,7 +128,9 @@ impl WalletConnectStorageOps for SqliteSessionStorage {

async fn get_session(&self, topic: &Topic) -> MmResult<Option<Session>, Self::Error> { todo!() }

async fn get_all_sessions(&self) -> MmResult<Vec<Session>, Self::Error> { todo!() }

async fn delete_session(&self, topic: &Topic) -> MmResult<(), Self::Error> { todo!() }

async fn update_session(&self, _session: Session) -> MmResult<(), Self::Error> { todo!() }
async fn update_session(&self, _session: &Session) -> MmResult<(), Self::Error> { todo!() }
}
19 changes: 6 additions & 13 deletions mm2src/mm2_main/src/lp_native_dex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use common::log::{info, warn};
use crypto::{from_hw_error, CryptoCtx, HwError, HwProcessingError, HwRpcError, WithHwRpcError};
use derive_more::Display;
use enum_derives::EnumFromTrait;
use kdf_walletconnect::WalletConnectCtx;
use kdf_walletconnect::initialize_walletconnect;
use mm2_core::mm_ctx::{MmArc, MmCtx};
use mm2_err_handle::common_errors::InternalError;
use mm2_err_handle::prelude::*;
Expand Down Expand Up @@ -473,18 +473,6 @@ pub async fn lp_init_continue(ctx: MmArc) -> MmInitResult<()> {

init_message_service(&ctx).await?;

// connect walletconnect
let wallet_connect =
WalletConnectCtx::from_ctx(&ctx).map_err(|err| MmInitError::WalletInitError(err.to_string()))?;
wallet_connect
.connect_client()
.await
.map_err(|err| MmInitError::WalletInitError(err.to_string()))?;
ctx.spawner()
.spawn(wallet_connect.clone().published_message_event_loop());
ctx.spawner()
.spawn(wallet_connect.clone().spawn_connection_live_watcher());

let balance_update_ordermatch_handler = BalanceUpdateOrdermatchHandler::new(ctx.clone());
register_balance_update_handler(ctx.clone(), Box::new(balance_update_ordermatch_handler)).await;

Expand All @@ -503,6 +491,11 @@ pub async fn lp_init_continue(ctx: MmArc) -> MmInitResult<()> {
#[cfg(target_arch = "wasm32")]
init_wasm_event_streaming(&ctx);

// Initialize WalletConnect
initialize_walletconnect(&ctx)
.await
.mm_err(|err| MmInitError::WalletInitError(err.to_string()))?;

ctx.spawner().spawn(clean_memory_loop(ctx.weak()));

Ok(())
Expand Down
6 changes: 4 additions & 2 deletions mm2src/mm2_main/src/rpc/lp_commands/lp_commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ pub async fn connect_to_peer(
ctx: MmArc,
req: ConnectPairingRequest,
) -> MmResult<ConnectPairingResponse, TrezorConnectionError> {
let walletconnect_ctx = WalletConnectCtx::from_ctx(&ctx).expect("WalletConnectCtx should be initialized by now!");
let walletconnect_ctx =
WalletConnectCtx::try_from_ctx_or_initialize(&ctx).expect("WalletConnectCtx should be initialized by now!");

let topic = walletconnect_ctx
.connect_to_pairing(&req.url, true)
Expand All @@ -159,7 +160,8 @@ pub async fn create_new_pairing(
ctx: MmArc,
_req: CreatePairingRequest,
) -> MmResult<CreatePairingResponse, TrezorConnectionError> {
let walletconnect_ctx = WalletConnectCtx::from_ctx(&ctx).expect("WalletConnectCtx should be initialized by now!");
let walletconnect_ctx =
WalletConnectCtx::try_from_ctx_or_initialize(&ctx).expect("WalletConnectCtx should be initialized by now!");

let url = walletconnect_ctx
.create_pairing(None)
Expand Down

0 comments on commit 6135665

Please sign in to comment.