diff --git a/mm2src/coins/lp_coins.rs b/mm2src/coins/lp_coins.rs index 01302a38ba..a9badd82b6 100644 --- a/mm2src/coins/lp_coins.rs +++ b/mm2src/coins/lp_coins.rs @@ -2177,7 +2177,6 @@ pub struct WithdrawRequest { #[cfg(target_arch = "wasm32")] #[serde(default)] broadcast: bool, - client_id: Option, } #[derive(Debug, Deserialize)] @@ -5666,7 +5665,7 @@ pub mod for_tests { use mm2_core::mm_ctx::MmArc; use mm2_err_handle::prelude::MmResult; use mm2_number::BigDecimal; - use rpc_task::RpcTaskStatus; + use rpc_task::{RpcInitReq, RpcTaskStatus}; use std::str::FromStr; /// Helper to call init_withdraw and wait for completion @@ -5678,15 +5677,18 @@ pub mod for_tests { from_derivation_path: Option<&str>, fee: Option, ) -> MmResult { - let withdraw_req = WithdrawRequest { - amount: BigDecimal::from_str(amount).unwrap(), - from: from_derivation_path.map(|from_derivation_path| WithdrawFrom::DerivationPath { - derivation_path: from_derivation_path.to_owned(), - }), - to: to.to_owned(), - coin: ticker.to_owned(), - fee, - ..Default::default() + let withdraw_req = RpcInitReq { + client_id: 0, + inner: WithdrawRequest { + amount: BigDecimal::from_str(amount).unwrap(), + from: from_derivation_path.map(|from_derivation_path| WithdrawFrom::DerivationPath { + derivation_path: from_derivation_path.to_owned(), + }), + to: to.to_owned(), + coin: ticker.to_owned(), + fee, + ..Default::default() + }, }; let init = init_withdraw(ctx.clone(), withdraw_req).await.unwrap(); let timeout = wait_until_ms(150000); diff --git a/mm2src/coins/rpc_command/get_new_address.rs b/mm2src/coins/rpc_command/get_new_address.rs index aa3636f918..a4a1a0c5ce 100644 --- a/mm2src/coins/rpc_command/get_new_address.rs +++ b/mm2src/coins/rpc_command/get_new_address.rs @@ -16,8 +16,8 @@ use mm2_core::mm_ctx::MmArc; use mm2_err_handle::prelude::*; use rpc_task::rpc_common::{CancelRpcTaskError, CancelRpcTaskRequest, InitRpcTaskResponse, RpcTaskStatusError, RpcTaskStatusRequest, RpcTaskUserActionError}; -use rpc_task::{RpcTask, RpcTaskError, RpcTaskHandleShared, RpcTaskManager, RpcTaskManagerShared, RpcTaskStatus, - RpcTaskTypes}; +use rpc_task::{RpcInitReq, RpcTask, RpcTaskError, RpcTaskHandleShared, RpcTaskManager, RpcTaskManagerShared, + RpcTaskStatus, RpcTaskTypes}; use std::time::Duration; pub type GetNewAddressUserAction = HwRpcTaskUserAction; @@ -204,7 +204,6 @@ pub struct GetNewAddressRequest { coin: String, #[serde(flatten)] params: GetNewAddressParams, - client_id: Option, } #[derive(Clone, Deserialize)] @@ -292,8 +291,6 @@ impl RpcTaskTypes for InitGetNewAddressTask { impl RpcTask for InitGetNewAddressTask { fn initial_status(&self) -> Self::InProgressStatus { GetNewAddressInProgressStatus::Preparing } - fn client_id(&self) -> Option { self.req.client_id } - // Do nothing if the task has been cancelled. async fn cancel(self) {} @@ -382,13 +379,15 @@ pub async fn get_new_address( /// TODO remove once GUI integrates `task::get_new_address::init`. pub async fn init_get_new_address( ctx: MmArc, - req: GetNewAddressRequest, + req: RpcInitReq, ) -> MmResult { + let (client_id, req) = (req.client_id, req.inner); let coin = lp_coinfind_or_err(&ctx, &req.coin).await?; let coins_ctx = CoinsContext::from_ctx(&ctx).map_to_mm(GetNewAddressRpcError::Internal)?; let spawner = coin.spawner(); let task = InitGetNewAddressTask { ctx, coin, req }; - let task_id = GetNewAddressTaskManager::spawn_rpc_task(&coins_ctx.get_new_address_manager, &spawner, task)?; + let task_id = + GetNewAddressTaskManager::spawn_rpc_task(&coins_ctx.get_new_address_manager, &spawner, task, client_id)?; Ok(InitRpcTaskResponse { task_id }) } diff --git a/mm2src/coins/rpc_command/init_account_balance.rs b/mm2src/coins/rpc_command/init_account_balance.rs index e7d7cea183..287c40dcb8 100644 --- a/mm2src/coins/rpc_command/init_account_balance.rs +++ b/mm2src/coins/rpc_command/init_account_balance.rs @@ -7,7 +7,8 @@ use mm2_core::mm_ctx::MmArc; use mm2_err_handle::prelude::*; use rpc_task::rpc_common::{CancelRpcTaskError, CancelRpcTaskRequest, InitRpcTaskResponse, RpcTaskStatusError, RpcTaskStatusRequest}; -use rpc_task::{RpcTask, RpcTaskHandleShared, RpcTaskManager, RpcTaskManagerShared, RpcTaskStatus, RpcTaskTypes}; +use rpc_task::{RpcInitReq, RpcTask, RpcTaskHandleShared, RpcTaskManager, RpcTaskManagerShared, RpcTaskStatus, + RpcTaskTypes}; pub type AccountBalanceUserAction = SerdeInfallible; pub type AccountBalanceAwaitingStatus = SerdeInfallible; @@ -31,7 +32,6 @@ pub struct InitAccountBalanceRequest { coin: String, #[serde(flatten)] params: InitAccountBalanceParams, - client_id: Option, } #[derive(Clone, Deserialize)] @@ -66,8 +66,6 @@ impl RpcTaskTypes for InitAccountBalanceTask { impl RpcTask for InitAccountBalanceTask { fn initial_status(&self) -> Self::InProgressStatus { AccountBalanceInProgressStatus::RequestingAccountBalance } - fn client_id(&self) -> Option { self.req.client_id } - // Do nothing if the task has been cancelled. async fn cancel(self) {} @@ -92,13 +90,15 @@ impl RpcTask for InitAccountBalanceTask { pub async fn init_account_balance( ctx: MmArc, - req: InitAccountBalanceRequest, + req: RpcInitReq, ) -> MmResult { + let (client_id, req) = (req.client_id, req.inner); let coin = lp_coinfind_or_err(&ctx, &req.coin).await?; let spawner = coin.spawner(); let coins_ctx = CoinsContext::from_ctx(&ctx).map_to_mm(HDAccountBalanceRpcError::Internal)?; let task = InitAccountBalanceTask { coin, req }; - let task_id = AccountBalanceTaskManager::spawn_rpc_task(&coins_ctx.account_balance_task_manager, &spawner, task)?; + let task_id = + AccountBalanceTaskManager::spawn_rpc_task(&coins_ctx.account_balance_task_manager, &spawner, task, client_id)?; Ok(InitRpcTaskResponse { task_id }) } diff --git a/mm2src/coins/rpc_command/init_create_account.rs b/mm2src/coins/rpc_command/init_create_account.rs index 695b3e5170..ae139f09a5 100644 --- a/mm2src/coins/rpc_command/init_create_account.rs +++ b/mm2src/coins/rpc_command/init_create_account.rs @@ -14,8 +14,8 @@ use mm2_err_handle::prelude::*; use parking_lot::Mutex as PaMutex; use rpc_task::rpc_common::{CancelRpcTaskError, CancelRpcTaskRequest, InitRpcTaskResponse, RpcTaskStatusError, RpcTaskStatusRequest, RpcTaskUserActionError}; -use rpc_task::{RpcTask, RpcTaskError, RpcTaskHandleShared, RpcTaskManager, RpcTaskManagerShared, RpcTaskStatus, - RpcTaskTypes}; +use rpc_task::{RpcInitReq, RpcTask, RpcTaskError, RpcTaskHandleShared, RpcTaskManager, RpcTaskManagerShared, + RpcTaskStatus, RpcTaskTypes}; use std::sync::Arc; use std::time::Duration; @@ -164,7 +164,6 @@ pub struct CreateNewAccountRequest { coin: String, #[serde(flatten)] params: CreateNewAccountParams, - client_id: Option, } #[derive(Clone, Deserialize)] @@ -239,8 +238,6 @@ impl RpcTaskTypes for InitCreateAccountTask { impl RpcTask for InitCreateAccountTask { fn initial_status(&self) -> Self::InProgressStatus { CreateAccountInProgressStatus::Preparing } - fn client_id(&self) -> Option { self.req.client_id } - async fn cancel(self) { if let Some(account_id) = self.task_state.create_account_id() { // We created the account already, so need to revert the changes. @@ -332,8 +329,9 @@ impl RpcTask for InitCreateAccountTask { pub async fn init_create_new_account( ctx: MmArc, - req: CreateNewAccountRequest, + req: RpcInitReq, ) -> MmResult { + let (client_id, req) = (req.client_id, req.inner); let coin = lp_coinfind_or_err(&ctx, &req.coin).await?; let coins_ctx = CoinsContext::from_ctx(&ctx).map_to_mm(CreateAccountRpcError::Internal)?; let spawner = coin.spawner(); @@ -343,7 +341,8 @@ pub async fn init_create_new_account( req, task_state: CreateAccountState::default(), }; - let task_id = CreateAccountTaskManager::spawn_rpc_task(&coins_ctx.create_account_manager, &spawner, task)?; + let task_id = + CreateAccountTaskManager::spawn_rpc_task(&coins_ctx.create_account_manager, &spawner, task, client_id)?; Ok(InitRpcTaskResponse { task_id }) } diff --git a/mm2src/coins/rpc_command/init_scan_for_new_addresses.rs b/mm2src/coins/rpc_command/init_scan_for_new_addresses.rs index ebbb4d4b86..5195248b7d 100644 --- a/mm2src/coins/rpc_command/init_scan_for_new_addresses.rs +++ b/mm2src/coins/rpc_command/init_scan_for_new_addresses.rs @@ -8,7 +8,8 @@ use mm2_core::mm_ctx::MmArc; use mm2_err_handle::prelude::*; use rpc_task::rpc_common::{CancelRpcTaskError, CancelRpcTaskRequest, InitRpcTaskResponse, RpcTaskStatusError, RpcTaskStatusRequest}; -use rpc_task::{RpcTask, RpcTaskHandleShared, RpcTaskManager, RpcTaskManagerShared, RpcTaskStatus, RpcTaskTypes}; +use rpc_task::{RpcInitReq, RpcTask, RpcTaskHandleShared, RpcTaskManager, RpcTaskManagerShared, RpcTaskStatus, + RpcTaskTypes}; pub type ScanAddressesUserAction = SerdeInfallible; pub type ScanAddressesAwaitingStatus = SerdeInfallible; @@ -43,7 +44,6 @@ pub struct ScanAddressesRequest { coin: String, #[serde(flatten)] params: ScanAddressesParams, - client_id: Option, } #[derive(Clone, Deserialize)] @@ -88,8 +88,6 @@ impl RpcTask for InitScanAddressesTask { #[inline] fn initial_status(&self) -> Self::InProgressStatus { ScanAddressesInProgressStatus::InProgress } - fn client_id(&self) -> Option { self.req.client_id } - // Do nothing if the task has been cancelled. async fn cancel(self) {} @@ -111,13 +109,15 @@ impl RpcTask for InitScanAddressesTask { pub async fn init_scan_for_new_addresses( ctx: MmArc, - req: ScanAddressesRequest, + req: RpcInitReq, ) -> MmResult { + let (client_id, req) = (req.client_id, req.inner); let coin = lp_coinfind_or_err(&ctx, &req.coin).await?; let spawner = coin.spawner(); let coins_ctx = CoinsContext::from_ctx(&ctx).map_to_mm(HDAccountBalanceRpcError::Internal)?; let task = InitScanAddressesTask { req, coin }; - let task_id = ScanAddressesTaskManager::spawn_rpc_task(&coins_ctx.scan_addresses_manager, &spawner, task)?; + let task_id = + ScanAddressesTaskManager::spawn_rpc_task(&coins_ctx.scan_addresses_manager, &spawner, task, client_id)?; Ok(InitRpcTaskResponse { task_id }) } diff --git a/mm2src/coins/rpc_command/init_withdraw.rs b/mm2src/coins/rpc_command/init_withdraw.rs index 09728d4785..e82ccd4d63 100644 --- a/mm2src/coins/rpc_command/init_withdraw.rs +++ b/mm2src/coins/rpc_command/init_withdraw.rs @@ -7,7 +7,8 @@ use mm2_core::mm_ctx::MmArc; use mm2_err_handle::prelude::*; use rpc_task::rpc_common::{CancelRpcTaskError, CancelRpcTaskRequest, InitRpcTaskResponse, RpcTaskStatusError, RpcTaskStatusRequest, RpcTaskUserActionError}; -use rpc_task::{RpcTask, RpcTaskHandleShared, RpcTaskManager, RpcTaskManagerShared, RpcTaskStatusAlias, RpcTaskTypes}; +use rpc_task::{RpcInitReq, RpcTask, RpcTaskHandleShared, RpcTaskManager, RpcTaskManagerShared, RpcTaskStatusAlias, + RpcTaskTypes}; pub type WithdrawAwaitingStatus = HwRpcTaskAwaitingStatus; pub type WithdrawUserAction = HwRpcTaskUserAction; @@ -32,7 +33,11 @@ pub trait CoinWithdrawInit { ) -> WithdrawInitResult; } -pub async fn init_withdraw(ctx: MmArc, request: WithdrawRequest) -> WithdrawInitResult { +pub async fn init_withdraw( + ctx: MmArc, + request: RpcInitReq, +) -> WithdrawInitResult { + let (client_id, request) = (request.client_id, request.inner); let coin = lp_coinfind_or_err(&ctx, &request.coin).await?; let spawner = coin.spawner(); let task = WithdrawTask { @@ -41,7 +46,7 @@ pub async fn init_withdraw(ctx: MmArc, request: WithdrawRequest) -> WithdrawInit request, }; let coins_ctx = CoinsContext::from_ctx(&ctx).map_to_mm(WithdrawError::InternalError)?; - let task_id = WithdrawTaskManager::spawn_rpc_task(&coins_ctx.withdraw_task_manager, &spawner, task)?; + let task_id = WithdrawTaskManager::spawn_rpc_task(&coins_ctx.withdraw_task_manager, &spawner, task, client_id)?; Ok(InitWithdrawResponse { task_id }) } @@ -123,8 +128,6 @@ impl RpcTaskTypes for WithdrawTask { impl RpcTask for WithdrawTask { fn initial_status(&self) -> Self::InProgressStatus { WithdrawInProgressStatus::Preparing } - fn client_id(&self) -> Option { self.request.client_id } - // Do nothing if the task has been cancelled. async fn cancel(self) {} diff --git a/mm2src/coins_activation/src/init_token.rs b/mm2src/coins_activation/src/init_token.rs index 38e433285f..58d9ed9180 100644 --- a/mm2src/coins_activation/src/init_token.rs +++ b/mm2src/coins_activation/src/init_token.rs @@ -15,8 +15,8 @@ use mm2_err_handle::mm_error::{MmError, MmResult, NotEqual, NotMmError}; use mm2_err_handle::prelude::*; use rpc_task::rpc_common::{CancelRpcTaskError, CancelRpcTaskRequest, InitRpcTaskResponse, RpcTaskStatusError, RpcTaskStatusRequest, RpcTaskUserActionError, RpcTaskUserActionRequest}; -use rpc_task::{RpcTask, RpcTaskError, RpcTaskHandleShared, RpcTaskManager, RpcTaskManagerShared, RpcTaskStatus, - RpcTaskTypes, TaskId}; +use rpc_task::{RpcInitReq, RpcTask, RpcTaskError, RpcTaskHandleShared, RpcTaskManager, RpcTaskManagerShared, + RpcTaskStatus, RpcTaskTypes, TaskId}; use ser_error_derive::SerializeErrorType; use serde_derive::{Deserialize, Serialize}; use std::time::Duration; @@ -38,7 +38,6 @@ pub type CancelInitTokenError = CancelRpcTaskError; pub struct InitTokenReq { ticker: String, activation_params: T, - client_id: Option, } /// Trait for the initializing a token using the task manager. @@ -83,7 +82,7 @@ pub trait InitTokenActivationOps: Into + TokenOf + Clone + Send + Sy /// Implementation of the init token RPC command. pub async fn init_token( ctx: MmArc, - request: InitTokenReq, + request: RpcInitReq>, ) -> MmResult where Token: InitTokenActivationOps + Send + Sync + 'static, @@ -91,6 +90,7 @@ where InitTokenError: From, (Token::ActivationError, InitTokenError): NotEqual, { + let (client_id, request) = (request.client_id, request.inner); if let Ok(Some(_)) = lp_coinfind(&ctx, &request.ticker).await { return MmError::err(InitTokenError::TokenIsAlreadyActivated { ticker: request.ticker }); } @@ -117,7 +117,7 @@ where }; let task_manager = Token::rpc_task_manager(&coins_act_ctx); - let task_id = RpcTaskManager::spawn_rpc_task(task_manager, &spawner, task) + let task_id = RpcTaskManager::spawn_rpc_task(task_manager, &spawner, task, client_id) .mm_err(|e| InitTokenError::Internal(e.to_string()))?; Ok(InitTokenResponse { task_id }) @@ -196,8 +196,6 @@ where ::initial_status() } - fn client_id(&self) -> Option { self.request.client_id } - /// Try to disable the coin in case if we managed to register it already. async fn cancel(self) { if let Ok(c_ctx) = CoinsContext::from_ctx(&self.ctx) { diff --git a/mm2src/coins_activation/src/l2/init_l2.rs b/mm2src/coins_activation/src/l2/init_l2.rs index ab59fc2a6c..14aee68afe 100644 --- a/mm2src/coins_activation/src/l2/init_l2.rs +++ b/mm2src/coins_activation/src/l2/init_l2.rs @@ -10,7 +10,8 @@ use common::SuccessResponse; use mm2_core::mm_ctx::MmArc; use mm2_err_handle::prelude::*; use rpc_task::rpc_common::{CancelRpcTaskRequest, InitRpcTaskResponse, RpcTaskStatusRequest, RpcTaskUserActionRequest}; -use rpc_task::{RpcTask, RpcTaskHandleShared, RpcTaskManager, RpcTaskManagerShared, RpcTaskStatus, RpcTaskTypes}; +use rpc_task::{RpcInitReq, RpcTask, RpcTaskHandleShared, RpcTaskManager, RpcTaskManagerShared, RpcTaskStatus, + RpcTaskTypes}; use serde_derive::Deserialize; use serde_json::Value as Json; @@ -24,7 +25,6 @@ pub type InitL2TaskHandleShared = RpcTaskHandleShared>; pub struct InitL2Req { ticker: String, activation_params: T, - client_id: Option, } pub trait L2ProtocolParams { @@ -68,13 +68,14 @@ pub trait InitL2ActivationOps: Into + Send + Sync + 'static { pub async fn init_l2( ctx: MmArc, - req: InitL2Req, + req: RpcInitReq>, ) -> Result> where L2: InitL2ActivationOps, InitL2Error: From, (L2::ActivationError, InitL2Error): NotEqual, { + let (client_id, req) = (req.client_id, req.inner); let ticker = req.ticker.clone(); if let Ok(Some(_)) = lp_coinfind(&ctx, &ticker).await { return MmError::err(InitL2Error::L2IsAlreadyActivated(ticker)); @@ -99,11 +100,9 @@ where let coins_act_ctx = CoinsActivationContext::from_ctx(&ctx).map_to_mm(InitL2Error::Internal)?; let spawner = ctx.spawner(); - let client_id = req.client_id; let task = InitL2Task:: { ctx, ticker, - client_id, platform_coin, validated_params, protocol_conf, @@ -111,7 +110,7 @@ where }; let task_manager = L2::rpc_task_manager(&coins_act_ctx); - let task_id = RpcTaskManager::spawn_rpc_task(task_manager, &spawner, task) + let task_id = RpcTaskManager::spawn_rpc_task(task_manager, &spawner, task, client_id) .mm_err(|e| InitL2Error::Internal(e.to_string()))?; Ok(InitL2Response { task_id }) @@ -164,7 +163,6 @@ pub async fn cancel_init_l2( pub struct InitL2Task { ctx: MmArc, ticker: String, - client_id: Option, platform_coin: L2::PlatformCoin, validated_params: L2::ValidatedParams, protocol_conf: L2::ProtocolInfo, @@ -188,8 +186,6 @@ where ::initial_status() } - fn client_id(&self) -> Option { self.client_id } - /// Try to disable the coin in case if we managed to register it already. async fn cancel(self) { if let Ok(ctx) = CoinsContext::from_ctx(&self.ctx) { diff --git a/mm2src/coins_activation/src/platform_coin_with_tokens.rs b/mm2src/coins_activation/src/platform_coin_with_tokens.rs index 0c0c1f6106..ed72183116 100644 --- a/mm2src/coins_activation/src/platform_coin_with_tokens.rs +++ b/mm2src/coins_activation/src/platform_coin_with_tokens.rs @@ -16,8 +16,8 @@ use mm2_err_handle::prelude::*; use mm2_number::BigDecimal; use rpc_task::rpc_common::{CancelRpcTaskError, CancelRpcTaskRequest, InitRpcTaskResponse, RpcTaskStatusError, RpcTaskStatusRequest, RpcTaskUserActionError, RpcTaskUserActionRequest}; -use rpc_task::{RpcTask, RpcTaskError, RpcTaskHandleShared, RpcTaskManager, RpcTaskManagerShared, RpcTaskStatus, - RpcTaskTypes, TaskId}; +use rpc_task::{RpcInitReq, RpcTask, RpcTaskError, RpcTaskHandleShared, RpcTaskManager, RpcTaskManagerShared, + RpcTaskStatus, RpcTaskTypes, TaskId}; use ser_error_derive::SerializeErrorType; use serde_derive::{Deserialize, Serialize}; use serde_json::Value as Json; @@ -223,7 +223,6 @@ pub struct EnablePlatformCoinWithTokensReq { ticker: String, #[serde(flatten)] request: T, - client_id: Option, } #[derive(Debug, Display, Serialize, SerializeErrorType, Clone)] @@ -510,8 +509,6 @@ where ::initial_status() } - fn client_id(&self) -> Option { self.request.client_id } - /// Try to disable the coin in case if we managed to register it already. async fn cancel(self) {} @@ -550,7 +547,7 @@ impl InitPlatformCoinWithTokensInitialStatus for InitPlatformCoinWithTokensInPro /// Implementation of the init platform coin with tokens RPC command. pub async fn init_platform_coin_with_tokens( ctx: MmArc, - request: EnablePlatformCoinWithTokensReq, + request: RpcInitReq>, ) -> MmResult where Platform: PlatformCoinWithTokensActivationOps + Send + Sync + 'static + Clone, @@ -558,6 +555,7 @@ where EnablePlatformCoinWithTokensError: From, (Platform::ActivationError, EnablePlatformCoinWithTokensError): NotEqual, { + let (client_id, request) = (request.client_id, request.inner); if let Ok(Some(_)) = lp_coinfind(&ctx, &request.ticker).await { return MmError::err(EnablePlatformCoinWithTokensError::PlatformIsAlreadyActivated( request.ticker, @@ -570,7 +568,7 @@ where let task = InitPlatformCoinWithTokensTask:: { ctx, request }; let task_manager = Platform::rpc_task_manager(&coins_act_ctx); - let task_id = RpcTaskManager::spawn_rpc_task(task_manager, &spawner, task) + let task_id = RpcTaskManager::spawn_rpc_task(task_manager, &spawner, task, client_id) .mm_err(|e| EnablePlatformCoinWithTokensError::Internal(e.to_string()))?; Ok(EnablePlatformCoinWithTokensResponse { task_id }) @@ -654,7 +652,7 @@ pub mod for_tests { use common::{executor::Timer, now_ms, wait_until_ms}; use mm2_core::mm_ctx::MmArc; use mm2_err_handle::prelude::MmResult; - use rpc_task::RpcTaskStatus; + use rpc_task::{RpcInitReq, RpcTaskStatus}; use super::{init_platform_coin_with_tokens, init_platform_coin_with_tokens_status, EnablePlatformCoinWithTokensError, EnablePlatformCoinWithTokensReq, @@ -672,6 +670,10 @@ pub mod for_tests { EnablePlatformCoinWithTokensError: From, (Platform::ActivationError, EnablePlatformCoinWithTokensError): NotEqual, { + let request = RpcInitReq { + client_id: 0, + inner: request, + }; let init_result = init_platform_coin_with_tokens::(ctx.clone(), request) .await .unwrap(); diff --git a/mm2src/coins_activation/src/standalone_coin/init_standalone_coin.rs b/mm2src/coins_activation/src/standalone_coin/init_standalone_coin.rs index 3288832add..c4e7fa8c92 100644 --- a/mm2src/coins_activation/src/standalone_coin/init_standalone_coin.rs +++ b/mm2src/coins_activation/src/standalone_coin/init_standalone_coin.rs @@ -13,7 +13,8 @@ use mm2_err_handle::prelude::*; use mm2_metrics::MetricsArc; use mm2_number::BigDecimal; use rpc_task::rpc_common::{CancelRpcTaskRequest, InitRpcTaskResponse, RpcTaskStatusRequest, RpcTaskUserActionRequest}; -use rpc_task::{RpcTask, RpcTaskHandleShared, RpcTaskManager, RpcTaskManagerShared, RpcTaskStatus, RpcTaskTypes}; +use rpc_task::{RpcInitReq, RpcTask, RpcTaskHandleShared, RpcTaskManager, RpcTaskManagerShared, RpcTaskStatus, + RpcTaskTypes}; use serde_derive::Deserialize; use serde_json::Value as Json; use std::collections::HashMap; @@ -28,7 +29,6 @@ pub type InitStandaloneCoinTaskHandleShared = RpcTaskHandleShared { ticker: String, activation_params: T, - client_id: Option, } #[async_trait] @@ -79,7 +79,7 @@ pub trait InitStandaloneCoinActivationOps: Into + Send + Sync + 'sta pub async fn init_standalone_coin( ctx: MmArc, - request: InitStandaloneCoinReq, + request: RpcInitReq>, ) -> MmResult where Standalone: InitStandaloneCoinActivationOps + Send + Sync + 'static, @@ -87,6 +87,7 @@ where InitStandaloneCoinError: From, (Standalone::ActivationError, InitStandaloneCoinError): NotEqual, { + let (client_id, request) = (request.client_id, request.inner); if let Ok(Some(_)) = lp_coinfind(&ctx, &request.ticker).await { return MmError::err(InitStandaloneCoinError::CoinIsAlreadyActivated { ticker: request.ticker }); } @@ -103,7 +104,7 @@ where }; let task_manager = Standalone::rpc_task_manager(&coins_act_ctx); - let task_id = RpcTaskManager::spawn_rpc_task(task_manager, &spawner, task) + let task_id = RpcTaskManager::spawn_rpc_task(task_manager, &spawner, task, client_id) .mm_err(|e| InitStandaloneCoinError::Internal(e.to_string()))?; Ok(InitStandaloneCoinResponse { task_id }) @@ -184,8 +185,6 @@ where ::initial_status() } - fn client_id(&self) -> Option { self.request.client_id } - /// Try to disable the coin in case if we managed to register it already. async fn cancel(self) { if let Ok(c_ctx) = CoinsContext::from_ctx(&self.ctx) { diff --git a/mm2src/coins_activation/src/utxo_activation/mod.rs b/mm2src/coins_activation/src/utxo_activation/mod.rs index 52244ed2dc..42764e5c93 100644 --- a/mm2src/coins_activation/src/utxo_activation/mod.rs +++ b/mm2src/coins_activation/src/utxo_activation/mod.rs @@ -16,7 +16,7 @@ pub mod for_tests { use common::{executor::Timer, now_ms, wait_until_ms}; use mm2_core::mm_ctx::MmArc; use mm2_err_handle::prelude::{MmResult, NotEqual}; - use rpc_task::RpcTaskStatus; + use rpc_task::{RpcInitReq, RpcTaskStatus}; use crate::{init_standalone_coin, init_standalone_coin_status, standalone_coin::{InitStandaloneCoinActivationOps, InitStandaloneCoinError, @@ -34,6 +34,10 @@ pub mod for_tests { InitStandaloneCoinError: From, (Standalone::ActivationError, InitStandaloneCoinError): NotEqual, { + let request = RpcInitReq { + client_id: 0, + inner: request, + }; let init_result = init_standalone_coin::(ctx.clone(), request).await.unwrap(); let timeout = wait_until_ms(150000); loop { diff --git a/mm2src/mm2_main/src/lp_init/init_hw.rs b/mm2src/mm2_main/src/lp_init/init_hw.rs index 0315b17aa3..6148a44f53 100644 --- a/mm2src/mm2_main/src/lp_init/init_hw.rs +++ b/mm2src/mm2_main/src/lp_init/init_hw.rs @@ -12,8 +12,8 @@ use mm2_core::mm_ctx::MmArc; use mm2_err_handle::prelude::*; use rpc_task::rpc_common::{CancelRpcTaskError, CancelRpcTaskRequest, InitRpcTaskResponse, RpcTaskStatusError, RpcTaskStatusRequest, RpcTaskUserActionError}; -use rpc_task::{RpcTask, RpcTaskError, RpcTaskHandleShared, RpcTaskManager, RpcTaskManagerShared, RpcTaskStatus, - RpcTaskTypes}; +use rpc_task::{RpcInitReq, RpcTask, RpcTaskError, RpcTaskHandleShared, RpcTaskManager, RpcTaskManagerShared, + RpcTaskStatus, RpcTaskTypes}; use std::sync::Arc; use std::time::Duration; @@ -101,7 +101,6 @@ pub enum InitHwInProgressStatus { #[derive(Deserialize, Clone)] pub struct InitHwRequest { device_pubkey: Option, - client_id: Option, } #[derive(Clone, Serialize, Debug, Deserialize)] @@ -130,8 +129,6 @@ impl RpcTaskTypes for InitHwTask { impl RpcTask for InitHwTask { fn initial_status(&self) -> Self::InProgressStatus { InitHwInProgressStatus::Initializing } - fn client_id(&self) -> Option { self.req.client_id } - async fn cancel(self) { if let Ok(crypto_ctx) = CryptoCtx::from_ctx(&self.ctx) { crypto_ctx.reset_hw_ctx() @@ -168,7 +165,8 @@ impl RpcTask for InitHwTask { } } -pub async fn init_trezor(ctx: MmArc, req: InitHwRequest) -> MmResult { +pub async fn init_trezor(ctx: MmArc, req: RpcInitReq) -> MmResult { + let (client_id, req) = (req.client_id, req.inner); let init_ctx = MmInitContext::from_ctx(&ctx).map_to_mm(InitHwError::Internal)?; let spawner = ctx.spawner(); let task = InitHwTask { @@ -176,7 +174,7 @@ pub async fn init_trezor(ctx: MmArc, req: InitHwRequest) -> MmResult, } #[derive(Clone, Serialize)] @@ -116,8 +115,6 @@ impl RpcTaskTypes for InitMetamaskTask { impl RpcTask for InitMetamaskTask { fn initial_status(&self) -> Self::InProgressStatus { InitMetamaskInProgressStatus::Initializing } - fn client_id(&self) -> Option { self.req.client_id } - async fn cancel(self) { if let Ok(crypto_ctx) = CryptoCtx::from_ctx(&self.ctx) { crypto_ctx.reset_metamask_ctx(); @@ -136,12 +133,13 @@ impl RpcTask for InitMetamaskTask { pub async fn connect_metamask( ctx: MmArc, - req: InitMetamaskRequest, + req: RpcInitReq, ) -> MmResult { + let (client_id, req) = (req.client_id, req.inner); let init_ctx = MmInitContext::from_ctx(&ctx).map_to_mm(InitMetamaskError::Internal)?; let spawner = ctx.spawner(); let task = InitMetamaskTask { ctx, req }; - let task_id = RpcTaskManager::spawn_rpc_task(&init_ctx.init_metamask_manager, &spawner, task)?; + let task_id = RpcTaskManager::spawn_rpc_task(&init_ctx.init_metamask_manager, &spawner, task, client_id)?; Ok(InitRpcTaskResponse { task_id }) } diff --git a/mm2src/mm2_main/tests/mm2_tests/mm2_tests_inner.rs b/mm2src/mm2_main/tests/mm2_tests/mm2_tests_inner.rs index 57a4709758..a6e312ff3d 100644 --- a/mm2src/mm2_main/tests/mm2_tests/mm2_tests_inner.rs +++ b/mm2src/mm2_main/tests/mm2_tests/mm2_tests_inner.rs @@ -5991,7 +5991,7 @@ mod trezor_tests { withdraw_status, MarketMakerIt, Mm2TestConf, ETH_SEPOLIA_NODES, ETH_SEPOLIA_SWAP_CONTRACT}; use mm2_test_helpers::structs::{InitTaskResult, RpcV2Response, TransactionDetails, WithdrawStatus}; - use rpc_task::{rpc_common::RpcTaskStatusRequest, RpcTaskStatus}; + use rpc_task::{rpc_common::RpcTaskStatusRequest, RpcInitReq, RpcTaskStatus}; use serde_json::{self as json, json, Value as Json}; use std::io::{stdin, stdout, BufRead, Write}; @@ -6008,7 +6008,7 @@ mod trezor_tests { let ctx = mm_ctx_with_custom_db_with_conf(Some(conf)); CryptoCtx::init_with_iguana_passphrase(ctx.clone(), "123456").unwrap(); // for now we need passphrase seed for init - let req: InitHwRequest = serde_json::from_value(json!({ "device_pubkey": null })).unwrap(); + let req: RpcInitReq = serde_json::from_value(json!({ "device_pubkey": null })).unwrap(); let res = match init_trezor(ctx.clone(), req).await { Ok(res) => res, _ => { diff --git a/mm2src/rpc_task/src/lib.rs b/mm2src/rpc_task/src/lib.rs index f5861f37cc..2e8f703d87 100644 --- a/mm2src/rpc_task/src/lib.rs +++ b/mm2src/rpc_task/src/lib.rs @@ -16,7 +16,7 @@ mod task; pub use handle::{RpcTaskHandle, RpcTaskHandleShared}; pub use manager::{RpcTaskManager, RpcTaskManagerShared}; -pub use task::{RpcTask, RpcTaskTypes}; +pub use task::{RpcInitReq, RpcTask, RpcTaskTypes}; pub type RpcTaskResult = Result>; pub type TaskId = u64; diff --git a/mm2src/rpc_task/src/manager.rs b/mm2src/rpc_task/src/manager.rs index 12f1a4ed23..7e8e38fc24 100644 --- a/mm2src/rpc_task/src/manager.rs +++ b/mm2src/rpc_task/src/manager.rs @@ -39,7 +39,12 @@ pub struct RpcTaskManager { impl RpcTaskManager { /// Create new instance of `RpcTaskHandle` attached to the only one `RpcTask`. /// This function registers corresponding RPC task in the `RpcTaskManager` and returns the task id. - pub fn spawn_rpc_task(this: &RpcTaskManagerShared, spawner: &F, mut task: Task) -> RpcTaskResult + pub fn spawn_rpc_task( + this: &RpcTaskManagerShared, + spawner: &F, + mut task: Task, + client_id: u64, + ) -> RpcTaskResult where F: SpawnFuture, { @@ -47,7 +52,7 @@ impl RpcTaskManager { let mut task_manager = this .lock() .map_to_mm(|e| RpcTaskError::Internal(format!("RpcTaskManager is not available: {}", e)))?; - task_manager.register_task(&task)? + task_manager.register_task(&task, client_id)? }; let task_handle = Arc::new(RpcTaskHandle { task_manager: RpcTaskManagerShared::downgrade(this), @@ -117,7 +122,7 @@ impl RpcTaskManager { fn get_client_id(&self, task_id: TaskId) -> Option { self.tasks.get(&task_id).and_then(|task| match task { - TaskStatusExt::InProgress { client_id, .. } | TaskStatusExt::Awaiting { client_id, .. } => *client_id, + TaskStatusExt::InProgress { client_id, .. } | TaskStatusExt::Awaiting { client_id, .. } => Some(*client_id), _ => None, }) } @@ -153,7 +158,7 @@ impl RpcTaskManager { } } - pub(crate) fn register_task(&mut self, task: &Task) -> RpcTaskResult<(TaskId, TaskAbortHandler)> { + pub(crate) fn register_task(&mut self, task: &Task, client_id: u64) -> RpcTaskResult<(TaskId, TaskAbortHandler)> { let task_id = next_rpc_task_id(); let (abort_handle, abort_handler) = oneshot::channel(); match self.tasks.entry(task_id) { @@ -162,7 +167,7 @@ impl RpcTaskManager { entry.insert(TaskStatusExt::InProgress { status: task.initial_status(), abort_handle, - client_id: task.client_id(), + client_id, }); Ok((task_id, abort_handler)) }, @@ -347,7 +352,7 @@ enum TaskStatusExt { status: Task::InProgressStatus, abort_handle: TaskAbortHandle, /// The ID of the client requesting the task. To stream out the updates & results for them. - client_id: Option, + client_id: u64, }, Awaiting { status: Task::AwaitingStatus, @@ -355,7 +360,7 @@ enum TaskStatusExt { next_in_progress_status: Task::InProgressStatus, abort_handle: TaskAbortHandle, /// The ID of the client requesting the task. To stream out the updates & results for them. - client_id: Option, + client_id: u64, }, /// `Cancelling` status is set on [`RpcTaskManager::cancel_task`]. /// This status is used to save the task state before it's actually canceled on [`RpcTaskHandle::on_canceled`], diff --git a/mm2src/rpc_task/src/task.rs b/mm2src/rpc_task/src/task.rs index c66fb4cef0..06f50e87da 100644 --- a/mm2src/rpc_task/src/task.rs +++ b/mm2src/rpc_task/src/task.rs @@ -15,13 +15,21 @@ pub trait RpcTaskTypes { pub trait RpcTask: RpcTaskTypes + Sized + Send + 'static { fn initial_status(&self) -> Self::InProgressStatus; - /// Returns the ID of the client that initiated/requesting the task. - /// - /// This is related to event streaming and is used to identify the client to whom the updates shall be sent. - fn client_id(&self) -> Option; - /// The method is invoked when the task has been cancelled. async fn cancel(self); async fn run(&mut self, task_handle: RpcTaskHandleShared) -> Result>; } + +#[derive(Deserialize)] +/// The general request for initializing an RPC Task. +/// +/// `client_id` is used to identify the client to which the task should stream out update events +/// to and is common in each request. Other data is request-specific. +pub struct RpcInitReq { + // If the client ID isn't included, assume it's 0. + #[serde(default)] + pub client_id: u64, + #[serde(flatten)] + pub inner: T, +}