Skip to content

Commit

Permalink
feat: multi-backend async support (#66)
Browse files Browse the repository at this point in the history
* feat: multi-backend async support

* chore: lint

* feat: add identity example

* feat: fix size of map

* feat: remove unused code

* feat: add spawn local fn

* use: spawn_local

* chore: add new version

* feat: add no_std async mutex

* feat: fix max buffer  to sender channel

---------

Co-authored-by: b-avb <[email protected]>
  • Loading branch information
S0c5 and b-avb authored Jul 30, 2024
1 parent f4d963a commit a53acea
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 53 deletions.
58 changes: 45 additions & 13 deletions sube/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,42 +11,66 @@ repository = "https://github.com/valibre-org/virto-dk/sube"
async-once-cell = "0.4.4"
blake2 = { version = "0.10.5", default-features = false }
codec = { version = "3.1.2", package = "parity-scale-codec", default-features = false }
frame-metadata = { version = "16.0.0", default-features = false, features = ["serde_full", "decode"] }
frame-metadata = { version = "16.0.0", default-features = false, features = [
"serde_full",
"decode",
] }
hex = { version = "0.4.3", default-features = false, features = ["alloc"] }
jsonrpc = { version = "0.12.1", default-features = false, optional = true }
log = "0.4.17"
scale-info = { version = "2.1.1", default-features = false, optional = true }
scales = { path="../scales", package = "scale-serialization", default-features = false, features = ["codec", "experimental-serializer", "json", "std"] }
scales = { path = "../scales", package = "scale-serialization", default-features = false, features = [
"codec",
"experimental-serializer",
"json",
"std",
] }
serde = { version = "1.0.137", default-features = false }
# TODO: shouldn't be a base dependeny. remove after: https://github.com/virto-network/virto-sdk/issues/53
serde_json = { version = "1.0.80", default-features = false, features = ["alloc", "arbitrary_precision"] }
serde_json = { version = "1.0.80", default-features = false, features = [
"alloc",
"arbitrary_precision",
] }
twox-hash = { version = "1.6.2", default-features = false }
url = "2.5.0"

# http backend
reqwest = { version = "0.12.5", optional = true, features = ["json"]}

# ws backend
futures-channel = { version = "0.3.21", default-features = false, features = ["alloc"], optional = true }
futures-util = { version = "0.3.21", default-features = false, features = ["sink"], optional = true }
async-mutex = { version = "1.4.0", optional = true }
futures-channel = { version = "0.3.21", default-features = false, features = [
"alloc",
], optional = true }
futures-util = { version = "0.3.21", default-features = false, features = [
"sink",
], optional = true }

async-tls = { version = "0.11.0", default-features = false, optional = true }

# bin target
async-std = { version = "1.11.0", optional = true }
paste = { version = "1.0" }
wasm-bindgen = { version = "0.2.91", optional = true }
wasm-bindgen = { version = "0.2.92", optional = true }
once_cell = { version = "1.17.1", optional = true }
heapless = { version = "0.7.16", optional = true }
heapless = "0.8.0"
anyhow = { version = "1.0.40", optional = true }
rand_core = {version = "0.6.3", optional = true }
rand_core = { version = "0.6.3", optional = true }
ewebsock = { git = "https://github.com/S0c5/ewebsock.git", optional = true, branch = "enhacement/aviod-blocking-operations-with-mpsc-futures" }
env_logger = "0.11.3"
no-std-async = "1.1.2"


[dev-dependencies]
async-std = { version = "1.11.0", features = ["attributes", "tokio1"] }
hex-literal = "0.3.4"
libwallet = { path = "../libwallet", default-features=false, features=["substrate", "mnemonic", "sr25519", "util_pin", "rand", "std" ] }
libwallet = { path = "../libwallet", default-features = false, features = [
"substrate",
"mnemonic",
"sr25519",
"util_pin",
"rand",
"std",
] }
rand_core = "0.6.3"

[features]
Expand All @@ -58,11 +82,19 @@ json = ["scales/json"]
std = []
no_std = []


v14 = ["dep:scale-info", "frame-metadata/current"]
ws = ["dep:async-mutex", "dep:async-std", "dep:ewebsock", "dep:futures-channel", "dep:futures-util", "dep:jsonrpc", "async-std/unstable"]
ws = [
"dep:async-std",
"dep:ewebsock",
"dep:futures-channel",
"dep:futures-util",
"dep:jsonrpc",
"async-std/unstable",
]
wss = ["dep:async-tls", "ws", "ewebsock/tls", "async-std/unstable"]
examples = ["dep:rand_core"]
js = ["http-web", "json", "v14", 'async-std/unstable', "ws", "dep:rand_core"]
js = ["http-web", "json", "v14", 'async-std/unstable', "wss", "dep:rand_core"]

[package.metadata.docs.rs]
features = ["http"]
Expand All @@ -71,4 +103,4 @@ features = ["http"]
members = [
"sube-js",
"cli"
]
]
32 changes: 32 additions & 0 deletions sube/examples/query_identity.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use core::future::{Future, IntoFuture};
use serde::{Deserialize, Serialize};
use serde_json::{from_value, Value};
use sube::{sube, Response};

#[async_std::main]
async fn main() -> sube::Result<()> {
env_logger::init();

let result = sube!("ws://localhost:11004/identity/superOf/0x6d6f646c6b762f636d7479738501000000000000000000000000000000000000").await?;

if let Response::Value(value) = result {
let data = serde_json::to_value(&value).expect("to be serializable");
println!(
"Account info: {}",
serde_json::to_string_pretty(&data).expect("it must return an str")
);
}

let query = format!("ws://localhost:11004/identity/identityOf/0xbe6ed76ac48d5c7f1c5d2cab8a1d1e7a451dcc24b624b088ef554fd47ba21139");

let r = sube!(&query).await?;

if let Response::Value(ref v) = r {
let json_value = serde_json::to_value(v).expect("to be serializable");
println!("json: {:?}", json_value);
let x = serde_json::to_string_pretty(&json_value).expect("it must return an str");
println!("Account info: {:?}", x);
}

Ok(())
}
99 changes: 72 additions & 27 deletions sube/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,9 @@ impl<'a> SubeBuilder<'a, (), ()> {
let url = chain_string_to_url(url.ok_or(Error::BadInput)?)?;
let path = url.path();

log::info!("building the backend for {}", url);
log::trace!("building the backend for {}", url);

let backend = BACKEND
.get_or_try_init(get_backend_by_url(url.clone()))
.await?;

let meta = META
.get_or_try_init(async {
match metadata {
Some(m) => Ok(m),
None => backend.metadata().await.map_err(|_| Error::BadMetadata),
}
})
.await?;
let (backend, meta) = get_multi_backend_by_url(url.clone(), metadata).await?;

Ok(match path {
"_meta" => Response::Meta(meta),
Expand Down Expand Up @@ -125,18 +114,7 @@ where
let path = url.path();
let body = body.ok_or(Error::BadInput)?;

let backend = BACKEND
.get_or_try_init(get_backend_by_url(url.clone()))
.await?;

let meta = META
.get_or_try_init(async {
match metadata {
Some(m) => Ok(m),
None => backend.metadata().await.map_err(|_| Error::BadMetadata),
}
})
.await?;
let (backend, meta) = get_multi_backend_by_url(url.clone(), metadata).await?;

Ok(match path {
"_meta" => Response::Meta(meta),
Expand All @@ -150,8 +128,75 @@ where
}
}

static BACKEND: async_once_cell::OnceCell<AnyBackend> = async_once_cell::OnceCell::new();
static META: async_once_cell::OnceCell<Metadata> = async_once_cell::OnceCell::new();
use heapless::FnvIndexMap as Map;
use no_std_async::Mutex;

static INSTANCE_BACKEND: async_once_cell::OnceCell<
Mutex<Map<String, Mutex<&'static AnyBackend>, 16>>,
> = async_once_cell::OnceCell::new();

static INSTANCE_METADATA: async_once_cell::OnceCell<
Mutex<Map<String, Mutex<&'static Metadata>, 16>>,
> = async_once_cell::OnceCell::new();

async fn get_metadata(backend: &AnyBackend, metadata: Option<Metadata>) -> SubeResult<Metadata> {
match metadata {
Some(m) => Ok(m),
None => backend.metadata().await.map_err(|_| Error::BadMetadata),
}
}

async fn get_multi_backend_by_url<'a>(
url: Url,
metadata: Option<Metadata>,
) -> SubeResult<(&'a AnyBackend, &'a Metadata)> {
let mut instance_backend = INSTANCE_BACKEND
.get_or_init(async { Mutex::new(Map::new()) })
.await
.lock()
.await;

let mut instance_metadata = INSTANCE_METADATA
.get_or_init(async { Mutex::new(Map::new()) })
.await
.lock()
.await;

let base_path = format!(
"{}://{}:{}",
url.scheme(),
url.host_str().expect("url to have a host"),
url.port().unwrap_or(80)
);

let cached_b = instance_backend.get(&base_path);
let cached_m = instance_metadata.get(&base_path);

match (cached_b, cached_m) {
(Some(b), Some(m)) => {
let b = *b.lock().await;
let m = *m.lock().await;
Ok((b, m))
}
_ => {
let backend = Box::new(get_backend_by_url(url.clone()).await?);
let backend = Box::leak::<'static>(backend);

instance_backend
.insert(base_path.clone(), Mutex::new(backend))
.map_err(|_| Error::CantInitBackend)?;

let metadata = Box::new(get_metadata(backend, metadata).await?);
let metadata = Box::leak::<'static>(metadata);

instance_metadata
.insert(base_path.clone(), Mutex::new(metadata))
.map_err(|_| Error::BadMetadata)?;

Ok((backend, metadata))
}
}
}

pub type BoxFuture<'a, T> = core::pin::Pin<Box<dyn Future<Output = T> + 'a>>;

Expand Down
8 changes: 6 additions & 2 deletions sube/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ mod prelude {
}

/// Surf based backend
#[cfg(any(feature = "http", feature = "http-web", feature = "js"))]
#[cfg(any(feature = "http", feature = "http-web"))]
pub mod http;
/// Tungstenite based backend
#[cfg(feature = "ws")]
Expand All @@ -58,7 +58,7 @@ mod hasher;
mod meta_ext;
mod signer;

#[cfg(any(feature = "http", feature = "http-web", feature = "ws", feature = "js"))]
#[cfg(any(feature = "http", feature = "http-web", feature = "ws"))]
pub mod rpc;
pub mod util;

Expand Down Expand Up @@ -488,6 +488,10 @@ pub enum Error {
AccountNotFound,
ConstantNotFound(String),
Platform(String),
CantInitBackend,
CantDecodeReponseForMeta,
CantDecodeRawQueryResponse,
CantFindMethodInPallet,
}

impl fmt::Display for Error {
Expand Down
2 changes: 1 addition & 1 deletion sube/src/meta_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ impl StorageKey {
.storage
.as_ref()
.and_then(|s| s.entries.iter().find(|e| e.name == item))
.ok_or(crate::Error::StorageKeyNotFound)?;
.ok_or(crate::Error::CantFindMethodInPallet)?;
log::trace!("map_keys={}", map_keys.iter().map(|x| x.as_ref()).collect::<Vec<&str>>().join(", "));
entry.ty.key(registry, &meta.name, &entry.name, map_keys)
}
Expand Down
11 changes: 6 additions & 5 deletions sube/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl<R: Rpc> Backend for RpcClient<R> {
)
.await
.map_err(|err| {
log::info!("error {:?}", err);
log::error!("error state_queryStorageAt {:?}", err);
crate::Error::StorageKeyNotFound
})
}
Expand All @@ -75,7 +75,7 @@ impl<R: Rpc> Backend for RpcClient<R> {
)
.await
.map_err(|err| {
log::info!("error {:?}", err);
log::error!("error paged {:?}", err);
crate::Error::StorageKeyNotFound
})?;
log::info!("rpc call {:?}", r);
Expand All @@ -91,12 +91,12 @@ impl<R: Rpc> Backend for RpcClient<R> {
.rpc("state_getStorage", &[&format!("\"{}\"", &key)])
.await
.map_err(|e| {
log::debug!("RPC failure: {}", e);
log::error!("RPC failure: {}", e);
// NOTE it could fail for more reasons
crate::Error::StorageKeyNotFound
})?;

let response = hex::decode(&res[2..]).map_err(|_err| crate::Error::StorageKeyNotFound)?;
let response = hex::decode(&res[2..]).map_err(|_err| crate::Error::CantDecodeRawQueryResponse)?;

Ok(response)
}
Expand All @@ -109,6 +109,7 @@ impl<R: Rpc> Backend for RpcClient<R> {
.rpc::<serde_json::Value>("author_submitExtrinsic", &[&format!("\"{}\"", &extrinsic)])
.await
.map_err(|e| crate::Error::Node(e.to_string()))?;

Ok(())
}

Expand All @@ -118,7 +119,7 @@ impl<R: Rpc> Backend for RpcClient<R> {
.rpc("state_getMetadata", &[])
.await
.map_err(|e| crate::Error::Node(e.to_string()))?;
let response = hex::decode(&res[2..]).map_err(|_err| crate::Error::StorageKeyNotFound)?;
let response = hex::decode(&res[2..]).map_err(|_err| crate::Error::CantDecodeReponseForMeta)?;
let meta = from_bytes(&mut response.as_slice()).map_err(|_| crate::Error::BadMetadata)?;
log::trace!("Metadata {:#?}", meta);
Ok(meta)
Expand Down
18 changes: 13 additions & 5 deletions sube/src/ws.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use alloc::{collections::BTreeMap, sync::Arc};

use async_mutex::Mutex;
use no_std_async::Mutex;
use ewebsock::{WsEvent, WsMessage as Message, WsReceiver as Rx, WsSender as Tx};
use futures_channel::{mpsc, oneshot};
use futures_util::StreamExt as _;
Expand All @@ -22,6 +22,8 @@ use crate::{
Error,
};

const MAX_BUFFER: usize = usize::MAX >> 3;

type Id = u32;

pub struct Backend {
Expand Down Expand Up @@ -60,13 +62,19 @@ impl Rpc for Backend {
.lock()
.await
.try_send(Message::Text(msg))
.map_err(|_| standard_error(StandardError::InternalError, None))?;
.map_err(|err| {
log::error!("Error tx lock message: {:?}", err);
standard_error(StandardError::InternalError, None)
})?;

log::info!("sent CMD");
// wait for the matching response to arrive
let res = recv
.await
.map_err(|_| standard_error(StandardError::InternalError, None))?
.map_err(|err| {
log::error!("Error receiving message: {:?}", err);
standard_error(StandardError::InternalError, None)
})?
.result()?;

Ok(res)
Expand All @@ -84,8 +92,8 @@ impl Backend {

let (tx, rx) =
ewebsock::connect(url, ewebsock::Options::default()).map_err(Error::Platform)?;

let (sender, recv) = mpsc::channel::<Message>(0);
let (sender, recv) = mpsc::channel::<Message>(MAX_BUFFER);

let backend = Backend {
tx: Mutex::new(sender),
Expand Down

0 comments on commit a53acea

Please sign in to comment.