Skip to content

Commit

Permalink
set default cliend_id for rpc tasks
Browse files Browse the repository at this point in the history
this sets a default client id to zero for rpc managed tasks, so for the default case of single user they don't have to provide an ID to get progress updates.
this also introduces a new composition struct `RpcInitReq` for init rpcs which that holds the clinet_id instead of having to declare it in each RPC request. also the client_id is not removed from the RpcTask and supplied to the task manager directly so we don't have to store the clinet_id in each task as well.
  • Loading branch information
mariocynicys committed Oct 6, 2024
1 parent 41b35e1 commit 91d98ed
Show file tree
Hide file tree
Showing 17 changed files with 113 additions and 102 deletions.
24 changes: 13 additions & 11 deletions mm2src/coins/lp_coins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2177,7 +2177,6 @@ pub struct WithdrawRequest {
#[cfg(target_arch = "wasm32")]
#[serde(default)]
broadcast: bool,
client_id: Option<u64>,
}

#[derive(Debug, Deserialize)]
Expand Down Expand Up @@ -5666,7 +5665,7 @@ pub mod for_tests {
use mm2_core::mm_ctx::MmArc;
use mm2_err_handle::prelude::MmResult;
use mm2_number::BigDecimal;
use rpc_task::RpcTaskStatus;
use rpc_task::{RpcInitReq, RpcTaskStatus};
use std::str::FromStr;

/// Helper to call init_withdraw and wait for completion
Expand All @@ -5678,15 +5677,18 @@ pub mod for_tests {
from_derivation_path: Option<&str>,
fee: Option<WithdrawFee>,
) -> MmResult<TransactionDetails, WithdrawError> {
let withdraw_req = WithdrawRequest {
amount: BigDecimal::from_str(amount).unwrap(),
from: from_derivation_path.map(|from_derivation_path| WithdrawFrom::DerivationPath {
derivation_path: from_derivation_path.to_owned(),
}),
to: to.to_owned(),
coin: ticker.to_owned(),
fee,
..Default::default()
let withdraw_req = RpcInitReq {
client_id: 0,
inner: WithdrawRequest {
amount: BigDecimal::from_str(amount).unwrap(),
from: from_derivation_path.map(|from_derivation_path| WithdrawFrom::DerivationPath {
derivation_path: from_derivation_path.to_owned(),
}),
to: to.to_owned(),
coin: ticker.to_owned(),
fee,
..Default::default()
},
};
let init = init_withdraw(ctx.clone(), withdraw_req).await.unwrap();
let timeout = wait_until_ms(150000);
Expand Down
13 changes: 6 additions & 7 deletions mm2src/coins/rpc_command/get_new_address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use mm2_core::mm_ctx::MmArc;
use mm2_err_handle::prelude::*;
use rpc_task::rpc_common::{CancelRpcTaskError, CancelRpcTaskRequest, InitRpcTaskResponse, RpcTaskStatusError,
RpcTaskStatusRequest, RpcTaskUserActionError};
use rpc_task::{RpcTask, RpcTaskError, RpcTaskHandleShared, RpcTaskManager, RpcTaskManagerShared, RpcTaskStatus,
RpcTaskTypes};
use rpc_task::{RpcInitReq, RpcTask, RpcTaskError, RpcTaskHandleShared, RpcTaskManager, RpcTaskManagerShared,
RpcTaskStatus, RpcTaskTypes};
use std::time::Duration;

pub type GetNewAddressUserAction = HwRpcTaskUserAction;
Expand Down Expand Up @@ -204,7 +204,6 @@ pub struct GetNewAddressRequest {
coin: String,
#[serde(flatten)]
params: GetNewAddressParams,
client_id: Option<u64>,
}

#[derive(Clone, Deserialize)]
Expand Down Expand Up @@ -292,8 +291,6 @@ impl RpcTaskTypes for InitGetNewAddressTask {
impl RpcTask for InitGetNewAddressTask {
fn initial_status(&self) -> Self::InProgressStatus { GetNewAddressInProgressStatus::Preparing }

fn client_id(&self) -> Option<u64> { self.req.client_id }

// Do nothing if the task has been cancelled.
async fn cancel(self) {}

Expand Down Expand Up @@ -382,13 +379,15 @@ pub async fn get_new_address(
/// TODO remove once GUI integrates `task::get_new_address::init`.
pub async fn init_get_new_address(
ctx: MmArc,
req: GetNewAddressRequest,
req: RpcInitReq<GetNewAddressRequest>,
) -> MmResult<InitRpcTaskResponse, GetNewAddressRpcError> {
let (client_id, req) = (req.client_id, req.inner);
let coin = lp_coinfind_or_err(&ctx, &req.coin).await?;
let coins_ctx = CoinsContext::from_ctx(&ctx).map_to_mm(GetNewAddressRpcError::Internal)?;
let spawner = coin.spawner();
let task = InitGetNewAddressTask { ctx, coin, req };
let task_id = GetNewAddressTaskManager::spawn_rpc_task(&coins_ctx.get_new_address_manager, &spawner, task)?;
let task_id =
GetNewAddressTaskManager::spawn_rpc_task(&coins_ctx.get_new_address_manager, &spawner, task, client_id)?;
Ok(InitRpcTaskResponse { task_id })
}

Expand Down
12 changes: 6 additions & 6 deletions mm2src/coins/rpc_command/init_account_balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use mm2_core::mm_ctx::MmArc;
use mm2_err_handle::prelude::*;
use rpc_task::rpc_common::{CancelRpcTaskError, CancelRpcTaskRequest, InitRpcTaskResponse, RpcTaskStatusError,
RpcTaskStatusRequest};
use rpc_task::{RpcTask, RpcTaskHandleShared, RpcTaskManager, RpcTaskManagerShared, RpcTaskStatus, RpcTaskTypes};
use rpc_task::{RpcInitReq, RpcTask, RpcTaskHandleShared, RpcTaskManager, RpcTaskManagerShared, RpcTaskStatus,
RpcTaskTypes};

pub type AccountBalanceUserAction = SerdeInfallible;
pub type AccountBalanceAwaitingStatus = SerdeInfallible;
Expand All @@ -31,7 +32,6 @@ pub struct InitAccountBalanceRequest {
coin: String,
#[serde(flatten)]
params: InitAccountBalanceParams,
client_id: Option<u64>,
}

#[derive(Clone, Deserialize)]
Expand Down Expand Up @@ -66,8 +66,6 @@ impl RpcTaskTypes for InitAccountBalanceTask {
impl RpcTask for InitAccountBalanceTask {
fn initial_status(&self) -> Self::InProgressStatus { AccountBalanceInProgressStatus::RequestingAccountBalance }

fn client_id(&self) -> Option<u64> { self.req.client_id }

// Do nothing if the task has been cancelled.
async fn cancel(self) {}

Expand All @@ -92,13 +90,15 @@ impl RpcTask for InitAccountBalanceTask {

pub async fn init_account_balance(
ctx: MmArc,
req: InitAccountBalanceRequest,
req: RpcInitReq<InitAccountBalanceRequest>,
) -> MmResult<InitRpcTaskResponse, HDAccountBalanceRpcError> {
let (client_id, req) = (req.client_id, req.inner);
let coin = lp_coinfind_or_err(&ctx, &req.coin).await?;
let spawner = coin.spawner();
let coins_ctx = CoinsContext::from_ctx(&ctx).map_to_mm(HDAccountBalanceRpcError::Internal)?;
let task = InitAccountBalanceTask { coin, req };
let task_id = AccountBalanceTaskManager::spawn_rpc_task(&coins_ctx.account_balance_task_manager, &spawner, task)?;
let task_id =
AccountBalanceTaskManager::spawn_rpc_task(&coins_ctx.account_balance_task_manager, &spawner, task, client_id)?;
Ok(InitRpcTaskResponse { task_id })
}

Expand Down
13 changes: 6 additions & 7 deletions mm2src/coins/rpc_command/init_create_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use mm2_err_handle::prelude::*;
use parking_lot::Mutex as PaMutex;
use rpc_task::rpc_common::{CancelRpcTaskError, CancelRpcTaskRequest, InitRpcTaskResponse, RpcTaskStatusError,
RpcTaskStatusRequest, RpcTaskUserActionError};
use rpc_task::{RpcTask, RpcTaskError, RpcTaskHandleShared, RpcTaskManager, RpcTaskManagerShared, RpcTaskStatus,
RpcTaskTypes};
use rpc_task::{RpcInitReq, RpcTask, RpcTaskError, RpcTaskHandleShared, RpcTaskManager, RpcTaskManagerShared,
RpcTaskStatus, RpcTaskTypes};
use std::sync::Arc;
use std::time::Duration;

Expand Down Expand Up @@ -164,7 +164,6 @@ pub struct CreateNewAccountRequest {
coin: String,
#[serde(flatten)]
params: CreateNewAccountParams,
client_id: Option<u64>,
}

#[derive(Clone, Deserialize)]
Expand Down Expand Up @@ -239,8 +238,6 @@ impl RpcTaskTypes for InitCreateAccountTask {
impl RpcTask for InitCreateAccountTask {
fn initial_status(&self) -> Self::InProgressStatus { CreateAccountInProgressStatus::Preparing }

fn client_id(&self) -> Option<u64> { self.req.client_id }

async fn cancel(self) {
if let Some(account_id) = self.task_state.create_account_id() {
// We created the account already, so need to revert the changes.
Expand Down Expand Up @@ -332,8 +329,9 @@ impl RpcTask for InitCreateAccountTask {

pub async fn init_create_new_account(
ctx: MmArc,
req: CreateNewAccountRequest,
req: RpcInitReq<CreateNewAccountRequest>,
) -> MmResult<InitRpcTaskResponse, CreateAccountRpcError> {
let (client_id, req) = (req.client_id, req.inner);
let coin = lp_coinfind_or_err(&ctx, &req.coin).await?;
let coins_ctx = CoinsContext::from_ctx(&ctx).map_to_mm(CreateAccountRpcError::Internal)?;
let spawner = coin.spawner();
Expand All @@ -343,7 +341,8 @@ pub async fn init_create_new_account(
req,
task_state: CreateAccountState::default(),
};
let task_id = CreateAccountTaskManager::spawn_rpc_task(&coins_ctx.create_account_manager, &spawner, task)?;
let task_id =
CreateAccountTaskManager::spawn_rpc_task(&coins_ctx.create_account_manager, &spawner, task, client_id)?;
Ok(InitRpcTaskResponse { task_id })
}

Expand Down
12 changes: 6 additions & 6 deletions mm2src/coins/rpc_command/init_scan_for_new_addresses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use mm2_core::mm_ctx::MmArc;
use mm2_err_handle::prelude::*;
use rpc_task::rpc_common::{CancelRpcTaskError, CancelRpcTaskRequest, InitRpcTaskResponse, RpcTaskStatusError,
RpcTaskStatusRequest};
use rpc_task::{RpcTask, RpcTaskHandleShared, RpcTaskManager, RpcTaskManagerShared, RpcTaskStatus, RpcTaskTypes};
use rpc_task::{RpcInitReq, RpcTask, RpcTaskHandleShared, RpcTaskManager, RpcTaskManagerShared, RpcTaskStatus,
RpcTaskTypes};

pub type ScanAddressesUserAction = SerdeInfallible;
pub type ScanAddressesAwaitingStatus = SerdeInfallible;
Expand Down Expand Up @@ -43,7 +44,6 @@ pub struct ScanAddressesRequest {
coin: String,
#[serde(flatten)]
params: ScanAddressesParams,
client_id: Option<u64>,
}

#[derive(Clone, Deserialize)]
Expand Down Expand Up @@ -88,8 +88,6 @@ impl RpcTask for InitScanAddressesTask {
#[inline]
fn initial_status(&self) -> Self::InProgressStatus { ScanAddressesInProgressStatus::InProgress }

fn client_id(&self) -> Option<u64> { self.req.client_id }

// Do nothing if the task has been cancelled.
async fn cancel(self) {}

Expand All @@ -111,13 +109,15 @@ impl RpcTask for InitScanAddressesTask {

pub async fn init_scan_for_new_addresses(
ctx: MmArc,
req: ScanAddressesRequest,
req: RpcInitReq<ScanAddressesRequest>,
) -> MmResult<InitRpcTaskResponse, HDAccountBalanceRpcError> {
let (client_id, req) = (req.client_id, req.inner);
let coin = lp_coinfind_or_err(&ctx, &req.coin).await?;
let spawner = coin.spawner();
let coins_ctx = CoinsContext::from_ctx(&ctx).map_to_mm(HDAccountBalanceRpcError::Internal)?;
let task = InitScanAddressesTask { req, coin };
let task_id = ScanAddressesTaskManager::spawn_rpc_task(&coins_ctx.scan_addresses_manager, &spawner, task)?;
let task_id =
ScanAddressesTaskManager::spawn_rpc_task(&coins_ctx.scan_addresses_manager, &spawner, task, client_id)?;
Ok(InitRpcTaskResponse { task_id })
}

Expand Down
13 changes: 8 additions & 5 deletions mm2src/coins/rpc_command/init_withdraw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use mm2_core::mm_ctx::MmArc;
use mm2_err_handle::prelude::*;
use rpc_task::rpc_common::{CancelRpcTaskError, CancelRpcTaskRequest, InitRpcTaskResponse, RpcTaskStatusError,
RpcTaskStatusRequest, RpcTaskUserActionError};
use rpc_task::{RpcTask, RpcTaskHandleShared, RpcTaskManager, RpcTaskManagerShared, RpcTaskStatusAlias, RpcTaskTypes};
use rpc_task::{RpcInitReq, RpcTask, RpcTaskHandleShared, RpcTaskManager, RpcTaskManagerShared, RpcTaskStatusAlias,
RpcTaskTypes};

pub type WithdrawAwaitingStatus = HwRpcTaskAwaitingStatus;
pub type WithdrawUserAction = HwRpcTaskUserAction;
Expand All @@ -32,7 +33,11 @@ pub trait CoinWithdrawInit {
) -> WithdrawInitResult<TransactionDetails>;
}

pub async fn init_withdraw(ctx: MmArc, request: WithdrawRequest) -> WithdrawInitResult<InitWithdrawResponse> {
pub async fn init_withdraw(
ctx: MmArc,
request: RpcInitReq<WithdrawRequest>,
) -> WithdrawInitResult<InitWithdrawResponse> {
let (client_id, request) = (request.client_id, request.inner);
let coin = lp_coinfind_or_err(&ctx, &request.coin).await?;
let spawner = coin.spawner();
let task = WithdrawTask {
Expand All @@ -41,7 +46,7 @@ pub async fn init_withdraw(ctx: MmArc, request: WithdrawRequest) -> WithdrawInit
request,
};
let coins_ctx = CoinsContext::from_ctx(&ctx).map_to_mm(WithdrawError::InternalError)?;
let task_id = WithdrawTaskManager::spawn_rpc_task(&coins_ctx.withdraw_task_manager, &spawner, task)?;
let task_id = WithdrawTaskManager::spawn_rpc_task(&coins_ctx.withdraw_task_manager, &spawner, task, client_id)?;
Ok(InitWithdrawResponse { task_id })
}

Expand Down Expand Up @@ -123,8 +128,6 @@ impl RpcTaskTypes for WithdrawTask {
impl RpcTask for WithdrawTask {
fn initial_status(&self) -> Self::InProgressStatus { WithdrawInProgressStatus::Preparing }

fn client_id(&self) -> Option<u64> { self.request.client_id }

// Do nothing if the task has been cancelled.
async fn cancel(self) {}

Expand Down
12 changes: 5 additions & 7 deletions mm2src/coins_activation/src/init_token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use mm2_err_handle::mm_error::{MmError, MmResult, NotEqual, NotMmError};
use mm2_err_handle::prelude::*;
use rpc_task::rpc_common::{CancelRpcTaskError, CancelRpcTaskRequest, InitRpcTaskResponse, RpcTaskStatusError,
RpcTaskStatusRequest, RpcTaskUserActionError, RpcTaskUserActionRequest};
use rpc_task::{RpcTask, RpcTaskError, RpcTaskHandleShared, RpcTaskManager, RpcTaskManagerShared, RpcTaskStatus,
RpcTaskTypes, TaskId};
use rpc_task::{RpcInitReq, RpcTask, RpcTaskError, RpcTaskHandleShared, RpcTaskManager, RpcTaskManagerShared,
RpcTaskStatus, RpcTaskTypes, TaskId};
use ser_error_derive::SerializeErrorType;
use serde_derive::{Deserialize, Serialize};
use std::time::Duration;
Expand All @@ -38,7 +38,6 @@ pub type CancelInitTokenError = CancelRpcTaskError;
pub struct InitTokenReq<T> {
ticker: String,
activation_params: T,
client_id: Option<u64>,
}

/// Trait for the initializing a token using the task manager.
Expand Down Expand Up @@ -83,14 +82,15 @@ pub trait InitTokenActivationOps: Into<MmCoinEnum> + TokenOf + Clone + Send + Sy
/// Implementation of the init token RPC command.
pub async fn init_token<Token>(
ctx: MmArc,
request: InitTokenReq<Token::ActivationRequest>,
request: RpcInitReq<InitTokenReq<Token::ActivationRequest>>,
) -> MmResult<InitTokenResponse, InitTokenError>
where
Token: InitTokenActivationOps + Send + Sync + 'static,
Token::InProgressStatus: InitTokenInitialStatus,
InitTokenError: From<Token::ActivationError>,
(Token::ActivationError, InitTokenError): NotEqual,
{
let (client_id, request) = (request.client_id, request.inner);
if let Ok(Some(_)) = lp_coinfind(&ctx, &request.ticker).await {
return MmError::err(InitTokenError::TokenIsAlreadyActivated { ticker: request.ticker });
}
Expand All @@ -117,7 +117,7 @@ where
};
let task_manager = Token::rpc_task_manager(&coins_act_ctx);

let task_id = RpcTaskManager::spawn_rpc_task(task_manager, &spawner, task)
let task_id = RpcTaskManager::spawn_rpc_task(task_manager, &spawner, task, client_id)
.mm_err(|e| InitTokenError::Internal(e.to_string()))?;

Ok(InitTokenResponse { task_id })
Expand Down Expand Up @@ -196,8 +196,6 @@ where
<Token::InProgressStatus as InitTokenInitialStatus>::initial_status()
}

fn client_id(&self) -> Option<u64> { self.request.client_id }

/// Try to disable the coin in case if we managed to register it already.
async fn cancel(self) {
if let Ok(c_ctx) = CoinsContext::from_ctx(&self.ctx) {
Expand Down
14 changes: 5 additions & 9 deletions mm2src/coins_activation/src/l2/init_l2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use common::SuccessResponse;
use mm2_core::mm_ctx::MmArc;
use mm2_err_handle::prelude::*;
use rpc_task::rpc_common::{CancelRpcTaskRequest, InitRpcTaskResponse, RpcTaskStatusRequest, RpcTaskUserActionRequest};
use rpc_task::{RpcTask, RpcTaskHandleShared, RpcTaskManager, RpcTaskManagerShared, RpcTaskStatus, RpcTaskTypes};
use rpc_task::{RpcInitReq, RpcTask, RpcTaskHandleShared, RpcTaskManager, RpcTaskManagerShared, RpcTaskStatus,
RpcTaskTypes};
use serde_derive::Deserialize;
use serde_json::Value as Json;

Expand All @@ -24,7 +25,6 @@ pub type InitL2TaskHandleShared<L2> = RpcTaskHandleShared<InitL2Task<L2>>;
pub struct InitL2Req<T> {
ticker: String,
activation_params: T,
client_id: Option<u64>,
}

pub trait L2ProtocolParams {
Expand Down Expand Up @@ -68,13 +68,14 @@ pub trait InitL2ActivationOps: Into<MmCoinEnum> + Send + Sync + 'static {

pub async fn init_l2<L2>(
ctx: MmArc,
req: InitL2Req<L2::ActivationParams>,
req: RpcInitReq<InitL2Req<L2::ActivationParams>>,
) -> Result<InitL2Response, MmError<InitL2Error>>
where
L2: InitL2ActivationOps,
InitL2Error: From<L2::ActivationError>,
(L2::ActivationError, InitL2Error): NotEqual,
{
let (client_id, req) = (req.client_id, req.inner);
let ticker = req.ticker.clone();
if let Ok(Some(_)) = lp_coinfind(&ctx, &ticker).await {
return MmError::err(InitL2Error::L2IsAlreadyActivated(ticker));
Expand All @@ -99,19 +100,17 @@ where

let coins_act_ctx = CoinsActivationContext::from_ctx(&ctx).map_to_mm(InitL2Error::Internal)?;
let spawner = ctx.spawner();
let client_id = req.client_id;
let task = InitL2Task::<L2> {
ctx,
ticker,
client_id,
platform_coin,
validated_params,
protocol_conf,
coin_conf,
};
let task_manager = L2::rpc_task_manager(&coins_act_ctx);

let task_id = RpcTaskManager::spawn_rpc_task(task_manager, &spawner, task)
let task_id = RpcTaskManager::spawn_rpc_task(task_manager, &spawner, task, client_id)
.mm_err(|e| InitL2Error::Internal(e.to_string()))?;

Ok(InitL2Response { task_id })
Expand Down Expand Up @@ -164,7 +163,6 @@ pub async fn cancel_init_l2<L2: InitL2ActivationOps>(
pub struct InitL2Task<L2: InitL2ActivationOps> {
ctx: MmArc,
ticker: String,
client_id: Option<u64>,
platform_coin: L2::PlatformCoin,
validated_params: L2::ValidatedParams,
protocol_conf: L2::ProtocolInfo,
Expand All @@ -188,8 +186,6 @@ where
<L2::InProgressStatus as InitL2InitialStatus>::initial_status()
}

fn client_id(&self) -> Option<u64> { self.client_id }

/// Try to disable the coin in case if we managed to register it already.
async fn cancel(self) {
if let Ok(ctx) = CoinsContext::from_ctx(&self.ctx) {
Expand Down
Loading

0 comments on commit 91d98ed

Please sign in to comment.