Skip to content

Commit

Permalink
[refactor] #3982: Clear live queries after smart contract end
Browse files Browse the repository at this point in the history
Signed-off-by: Daniil Polyakov <[email protected]>
  • Loading branch information
Arjentix committed Nov 16, 2023
1 parent b085fef commit 0ff9046
Show file tree
Hide file tree
Showing 22 changed files with 572 additions and 420 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ parity-scale-codec = { workspace = true, default-features = false, features = ["
tokio = { workspace = true, features = ["rt"] }
tokio-tungstenite = { workspace = true, features = ["native-tls"] }
futures-util = "0.3.28"
getset = { workspace = true }

[dev-dependencies]
iroha_wasm_builder = { workspace = true }
Expand Down
42 changes: 33 additions & 9 deletions client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use iroha_data_model::{
isi::Instruction,
predicate::PredicateBox,
prelude::*,
query::{Pagination, Query, Sorting},
query::{cursor::ForwardCursor, Pagination, Query, Sorting},
transaction::TransactionPayload,
BatchedResponse, ValidationFail,
};
Expand Down Expand Up @@ -346,20 +346,20 @@ impl_query_output! {
#[display(fmt = "{}@{torii_url}", "key_pair.public_key()")]
pub struct Client {
/// Url for accessing iroha node
torii_url: Url,
pub torii_url: Url,
/// Accounts keypair
key_pair: KeyPair,
pub key_pair: KeyPair,
/// Transaction time to live in milliseconds
transaction_ttl: Option<Duration>,
pub transaction_ttl: Option<Duration>,
/// Transaction status timeout
transaction_status_timeout: Duration,
pub transaction_status_timeout: Duration,
/// Current account
account_id: AccountId,
pub account_id: AccountId,
/// Http headers which will be appended to each request
headers: HashMap<String, String>,
pub headers: HashMap<String, String>,
/// If `true` add nonce, which makes different hashes for
/// transactions which occur repeatedly and/or simultaneously
add_transaction_nonce: bool,
pub add_transaction_nonce: bool,
}

/// Query request
Expand Down Expand Up @@ -388,6 +388,7 @@ impl QueryRequest {
),
}
}

fn assemble(self) -> DefaultRequestBuilder {
let builder = DefaultRequestBuilder::new(
HttpMethod::POST,
Expand Down Expand Up @@ -837,7 +838,7 @@ impl Client {
///
/// # Errors
/// Fails if sending request fails
pub fn request_with_filter_and_pagination_and_sorting<R: Query + Debug>(
pub(crate) fn request_with_filter_and_pagination_and_sorting<R: Query + Debug>(
&self,
request: R,
pagination: Pagination,
Expand Down Expand Up @@ -873,6 +874,29 @@ impl Client {
self.build_query(request).execute()
}

/// Query API entry point using cursor.
///
/// # Errors
/// Fails if sending request fails
pub fn request_with_cursor<O>(&self, cursor: ForwardCursor) -> QueryResult<O::Target>
where
O: QueryOutput,
<O as TryFrom<Value>>::Error: Into<eyre::Error>,
{
let request = QueryRequest {
torii_url: self.torii_url.clone(),
headers: self.headers.clone(),
request: iroha_data_model::query::QueryRequest::Cursor(cursor),
};
let response = request.clone().assemble().build()?.send()?;

let mut resp_handler = QueryResponseHandler::<O>::new(request);
let value = resp_handler.handle(&response)?;
let output = O::new(value, resp_handler);

Ok(output)
}

/// Query API entry point.
/// Creates a [`QueryRequestBuilder`] which can be used to configure requests queries from `Iroha` peers.
///
Expand Down
49 changes: 47 additions & 2 deletions client/tests/integration/queries/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::str::FromStr as _;

use eyre::{bail, Result};
use iroha_client::client::{self, ClientQueryError};
use iroha_data_model::{
query::{error::QueryExecutionFail, FetchSize, MAX_FETCH_SIZE},
ValidationFail,
prelude::*,
query::{cursor::ForwardCursor, error::QueryExecutionFail, MAX_FETCH_SIZE},
};
use test_network::*;

Expand All @@ -27,3 +30,45 @@ fn too_big_fetch_size_is_not_allowed() {
))
));
}

#[test]
fn live_query_is_dropped_after_smart_contract_end() -> Result<()> {
let (_rt, _peer, client) = <PeerBuilder>::new().with_port(11_140).start_with_runtime();
wait_for_genesis_committed(&[client.clone()], 0);

let wasm = iroha_wasm_builder::Builder::new(
"tests/integration/smartcontracts/query_assets_and_save_cursor",
)
.show_output()
.build()?
.optimize()?
.into_bytes()?;

let transaction = client.build_transaction(
WasmSmartContract::from_compiled(wasm),
UnlimitedMetadata::default(),
)?;
client.submit_transaction_blocking(&transaction)?;

let metadata_value = client.request(FindAccountKeyValueByIdAndKey::new(
client.account_id.clone(),
Name::from_str("cursor").unwrap(),
))?;
let Value::String(cursor) = metadata_value.0 else {
bail!("Expected `Value::String`, got {:?}", metadata_value.0);
};
let asset_cursor = serde_json::from_str::<ForwardCursor>(&cursor)?;

let err = client
.request_with_cursor::<Vec<Asset>>(asset_cursor)
.expect_err("Request with cursor from smart contract should fail");

assert!(matches!(
err,
ClientQueryError::Validation(ValidationFail::QueryFailed(
QueryExecutionFail::UnknownCursor
))
));

Ok(())
}
2 changes: 2 additions & 0 deletions client/tests/integration/smartcontracts/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ members = [
"executor_with_admin",
"executor_with_custom_token",
"executor_with_migration_fail",
"query_assets_and_save_cursor",
]

[profile.dev]
Expand All @@ -27,6 +28,7 @@ opt-level = "z" # Optimize for size vs speed with "s"/"z"(removes vectorizat
codegen-units = 1 # Further reduces binary size but increases compilation time

[workspace.dependencies]
iroha_smart_contract = { version = "=2.0.0-pre-rc.20", path = "../../../../smart_contract", features = ["debug"]}
iroha_trigger = { version = "=2.0.0-pre-rc.20", path = "../../../../smart_contract/trigger", features = ["debug"]}
iroha_executor = { version = "=2.0.0-pre-rc.20", path = "../../../../smart_contract/executor" }
iroha_schema = { version = "=2.0.0-pre-rc.20", path = "../../../../schema" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ impl Executor {
}
}

// TODO (#4049): Fix unused `visit_register_domain()`
fn visit_register_domain(executor: &mut Executor, authority: &AccountId, _isi: Register<Domain>) {
if executor.block_height() == 0 {
pass!(executor)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[package]
name = "query_assets_and_save_cursor"

edition.workspace = true
version.workspace = true
authors.workspace = true

license.workspace = true

[lib]
crate-type = ['cdylib']

[dependencies]
iroha_smart_contract.workspace = true

panic-halt.workspace = true
lol_alloc.workspace = true
serde_json = { version = "1.0.108", default-features = false }
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
//! Smart contract which executes [`FindAllAssets`] and saves cursor to the owner's metadata.
#![no_std]

#[cfg(not(test))]
extern crate panic_halt;

extern crate alloc;

use alloc::string::ToString as _;
use core::num::NonZeroU32;

use iroha_smart_contract::{parse, prelude::*};
use lol_alloc::{FreeListAllocator, LockedAllocator};

#[global_allocator]
static ALLOC: LockedAllocator<FreeListAllocator> = LockedAllocator::new(FreeListAllocator::new());

/// Execute [`FindAllAssets`] and save cursor to the owner's metadata.
#[iroha_smart_contract::main]
fn main(owner: AccountId) {
let asset_cursor = FindAllAssets
.fetch_size(FetchSize::new(Some(NonZeroU32::try_from(1).dbg_unwrap())))
.execute()
.dbg_unwrap();

let (_batch, cursor) = asset_cursor.into_raw_parts();

SetKeyValueExpr::new(
owner,
parse!("cursor" as Name),
serde_json::to_value(cursor)
.dbg_expect("Failed to convert cursor to JSON")
.to_string(),
)
.execute()
.dbg_expect("Failed to save cursor to the owner's metadata");
}
1 change: 1 addition & 0 deletions client/tests/integration/triggers/by_call_trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ fn trigger_in_genesis_using_base64() -> Result<()> {
info!("Building trigger");
let wasm =
iroha_wasm_builder::Builder::new("tests/integration/smartcontracts/mint_rose_trigger")
.show_output()
.build()?
.optimize()?
.into_bytes()?;
Expand Down
1 change: 1 addition & 0 deletions client/tests/integration/triggers/time_trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ fn mint_nft_for_every_user_every_1_sec() -> Result<()> {
let wasm = iroha_wasm_builder::Builder::new(
"tests/integration/smartcontracts/create_nft_for_every_user_trigger",
)
.show_output()
.build()?
.optimize()?
.into_bytes()?;
Expand Down
1 change: 1 addition & 0 deletions client/tests/integration/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ fn upgrade_executor(client: &Client, executor: impl AsRef<Path>) -> Result<()> {
info!("Building executor");

let wasm = iroha_wasm_builder::Builder::new(executor.as_ref())
.show_output()
.build()?
.optimize()?
.into_bytes()?;
Expand Down
Binary file modified configs/peer/executor.wasm
Binary file not shown.
28 changes: 20 additions & 8 deletions core/src/query/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use iroha_data_model::{
asset::AssetValue,
query::{
cursor::ForwardCursor, error::QueryExecutionFail, pagination::Pagination, sorting::Sorting,
FetchSize, DEFAULT_FETCH_SIZE, MAX_FETCH_SIZE,
FetchSize, QueryId, DEFAULT_FETCH_SIZE, MAX_FETCH_SIZE,
},
BatchedResponse, BatchedResponseV1, HasMetadata, IdentifiableBox, ValidationFail, Value,
};
Expand Down Expand Up @@ -67,7 +67,7 @@ type LiveQuery = Batched<Vec<Value>>;
/// Clients can handle their queries using [`LiveQueryStoreHandle`]
#[derive(Debug)]
pub struct LiveQueryStore {
queries: HashMap<String, (LiveQuery, Instant)>,
queries: HashMap<QueryId, (LiveQuery, Instant)>,
query_idle_time: Duration,
}

Expand Down Expand Up @@ -138,7 +138,7 @@ impl LiveQueryStore {
LiveQueryStoreHandle { message_sender }
}

fn insert(&mut self, query_id: String, live_query: LiveQuery) {
fn insert(&mut self, query_id: QueryId, live_query: LiveQuery) {
self.queries.insert(query_id, (live_query, Instant::now()));
}

Expand All @@ -148,8 +148,8 @@ impl LiveQueryStore {
}

enum Message {
Insert(String, Batched<Vec<Value>>),
Remove(String, oneshot::Sender<Option<Batched<Vec<Value>>>>),
Insert(QueryId, Batched<Vec<Value>>),
Remove(QueryId, oneshot::Sender<Option<Batched<Vec<Value>>>>),
}

/// Handle to interact with [`LiveQueryStore`].
Expand Down Expand Up @@ -207,13 +207,25 @@ impl LiveQueryStoreHandle {
self.construct_query_response(query_id, cursor.cursor.map(NonZeroU64::get), live_query)
}

fn insert(&self, query_id: String, live_query: LiveQuery) -> Result<()> {
/// Remove query from the storage if there is any.
///
/// Returns `true` if query was removed, `false` otherwise.
///
/// # Errors
///
/// - Returns [`Error::ConnectionClosed`] if [`QueryService`] is dropped,
/// - Otherwise throws up query output handling errors.
pub fn drop_query(&self, query_id: QueryId) -> Result<bool> {
self.remove(query_id).map(|query_opt| query_opt.is_some())
}

fn insert(&self, query_id: QueryId, live_query: LiveQuery) -> Result<()> {
self.message_sender
.blocking_send(Message::Insert(query_id, live_query))
.map_err(|_| Error::ConnectionClosed)
}

fn remove(&self, query_id: String) -> Result<Option<LiveQuery>> {
fn remove(&self, query_id: QueryId) -> Result<Option<LiveQuery>> {
let (sender, receiver) = oneshot::channel();

self.message_sender
Expand All @@ -225,7 +237,7 @@ impl LiveQueryStoreHandle {

fn construct_query_response(
&self,
query_id: String,
query_id: QueryId,
curr_cursor: Option<u64>,
mut live_query: Batched<Vec<Value>>,
) -> Result<BatchedResponse<Value>> {
Expand Down
Loading

0 comments on commit 0ff9046

Please sign in to comment.