Skip to content

Commit

Permalink
[feature] #3900: fetch_size query parameter
Browse files Browse the repository at this point in the history
Signed-off-by: Daniil Polyakov <[email protected]>
  • Loading branch information
Arjentix committed Nov 2, 2023
1 parent 4fccf3a commit 63f80a7
Show file tree
Hide file tree
Showing 23 changed files with 261 additions and 106 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ iroha_genesis = { workspace = true }
iroha_wasm_builder = { workspace = true }


derive_more = { workspace = true }
async-trait = { workspace = true }
color-eyre = { workspace = true }
eyre = { workspace = true }
Expand Down
4 changes: 3 additions & 1 deletion cli/src/torii/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ impl Error {
QueryFailed(query_error)
| InstructionFailed(InstructionExecutionError::Query(query_error)) => match query_error
{
Evaluate(_) | Conversion(_) | UnknownCursor => StatusCode::BAD_REQUEST,
Evaluate(_) | Conversion(_) | UnknownCursor | FetchSizeTooBig => {
StatusCode::BAD_REQUEST
}
Signature(_) => StatusCode::UNAUTHORIZED,
Find(_) => StatusCode::NOT_FOUND,
},
Expand Down
24 changes: 12 additions & 12 deletions cli/src/torii/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
// FIXME: This can't be fixed, because one trait in `warp` is private.
#![allow(opaque_hidden_inferred_bound)]

use std::num::NonZeroUsize;

use eyre::{eyre, WrapErr};
use futures::TryStreamExt;
use iroha_config::{
Expand Down Expand Up @@ -45,11 +43,13 @@ fn client_query_request(
body::versioned::<SignedQuery>()
.and(sorting())
.and(paginate())
.and_then(|signed_query, sorting, pagination| async move {
.and(fetch_size())
.and_then(|signed_query, sorting, pagination, fetch_size| async move {
Result::<_, std::convert::Infallible>::Ok(http::ClientQueryRequest::query(
signed_query,
sorting,
pagination,
fetch_size,
))
})
.or(cursor().and_then(|cursor| async move {
Expand All @@ -73,6 +73,11 @@ fn paginate() -> impl warp::Filter<Extract = (Pagination,), Error = warp::Reject
warp::query()
}

/// Filter for warp which extracts fetch size
fn fetch_size() -> impl warp::Filter<Extract = (FetchSize,), Error = warp::Rejection> + Copy {
warp::query()
}

#[iroha_futures::telemetry_future]
async fn handle_instructions(
queue: Arc<Queue>,
Expand Down Expand Up @@ -101,7 +106,6 @@ async fn handle_instructions(
async fn handle_queries(
live_query_store: LiveQueryStoreHandle,
sumeragi: SumeragiHandle,
fetch_size: NonZeroUsize,

query_request: http::ClientQueryRequest,
) -> Result<Scale<BatchedResponse<Value>>> {
Expand All @@ -110,11 +114,12 @@ async fn handle_queries(
query: signed_query,
sorting,
pagination,
fetch_size,
}) => sumeragi.apply_wsv(|wsv| {
let valid_query = ValidQueryRequest::validate(signed_query, wsv)?;
let query_output = valid_query.execute(wsv)?;
live_query_store
.handle_query_output(query_output, fetch_size, &sorting, pagination)
.handle_query_output(query_output, &sorting, pagination, fetch_size)
.map_err(ValidationFail::from)
}),
QueryRequest::Cursor(cursor) => live_query_store
Expand Down Expand Up @@ -477,15 +482,10 @@ impl Torii {
))
.and(body::versioned()),
)
.or(endpoint4(
.or(endpoint3(
handle_queries,
warp::path(uri::QUERY)
.and(add_state!(
self.query_service,
self.sumeragi,
NonZeroUsize::try_from(self.iroha_cfg.torii.fetch_size)
.expect("u64 should always fit into usize")
))
.and(add_state!(self.query_service, self.sumeragi,))
.and(client_query_request()),
))
.or(endpoint2(
Expand Down
48 changes: 42 additions & 6 deletions client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,15 @@ pub struct ResultSet<T> {
client_cursor: usize,
}

impl<T> ResultSet<T> {
/// Get the length of the batch returned by Iroha.
///
/// This is controlled by `fetch_size` parameter of the query.
pub fn batch_len(&self) -> usize {
self.iter.len()
}
}

impl<T: Clone> Iterator for ResultSet<T>
where
Vec<T>: QueryOutput,
Expand Down Expand Up @@ -376,6 +385,7 @@ impl QueryRequest {
query: Vec::default(),
sorting: Sorting::default(),
pagination: Pagination::default(),
fetch_size: FetchSize::default(),
},
),
}
Expand All @@ -391,6 +401,7 @@ impl QueryRequest {
iroha_data_model::query::QueryRequest::Query(query_with_params) => builder
.params(Vec::from(query_with_params.sorting().clone()))
.params(Vec::from(*query_with_params.pagination()))
.params(Vec::from(*query_with_params.fetch_size()))
.body(query_with_params.query().clone()),
iroha_data_model::query::QueryRequest::Cursor(cursor) => {
builder.params(Vec::from(cursor))
Expand Down Expand Up @@ -800,6 +811,7 @@ impl Client {
filter: PredicateBox,
pagination: Pagination,
sorting: Sorting,
fetch_size: FetchSize,
) -> Result<(DefaultRequestBuilder, QueryResponseHandler<R::Output>)>
where
<R::Output as TryFrom<Value>>::Error: Into<eyre::Error>,
Expand All @@ -811,7 +823,9 @@ impl Client {
torii_url: self.torii_url.clone(),
headers: self.headers.clone(),
request: iroha_data_model::query::QueryRequest::Query(
iroha_data_model::query::QueryWithParameters::new(request, sorting, pagination),
iroha_data_model::query::QueryWithParameters::new(
request, sorting, pagination, fetch_size,
),
),
};

Expand All @@ -829,6 +843,7 @@ impl Client {
&self,
request: R,
pagination: Pagination,
fetch_size: FetchSize,
sorting: Sorting,
filter: PredicateBox,
) -> QueryResult<<R::Output as QueryOutput>::Target>
Expand All @@ -838,7 +853,7 @@ impl Client {
{
iroha_logger::trace!(?request, %pagination, ?sorting, ?filter);
let (req, mut resp_handler) =
self.prepare_query_request::<R>(request, filter, pagination, sorting)?;
self.prepare_query_request::<R>(request, filter, pagination, sorting, fetch_size)?;

let response = req.build()?.send()?;
let value = resp_handler.handle(&response)?;
Expand All @@ -855,6 +870,7 @@ impl Client {
&self,
request: R,
pagination: Pagination,
fetch_size: FetchSize,
sorting: Sorting,
) -> QueryResult<<R::Output as QueryOutput>::Target>
where
Expand All @@ -864,6 +880,7 @@ impl Client {
self.request_with_filter_and_pagination_and_sorting(
request,
pagination,
fetch_size,
sorting,
PredicateBox::default(),
)
Expand All @@ -877,6 +894,7 @@ impl Client {
&self,
request: R,
pagination: Pagination,
fetch_size: FetchSize,
filter: PredicateBox,
) -> QueryResult<<R::Output as QueryOutput>::Target>
where
Expand All @@ -886,6 +904,7 @@ impl Client {
self.request_with_filter_and_pagination_and_sorting(
request,
pagination,
fetch_size,
Sorting::default(),
filter,
)
Expand All @@ -908,6 +927,7 @@ impl Client {
self.request_with_filter_and_pagination_and_sorting(
request,
Pagination::default(),
FetchSize::default(),
sorting,
filter,
)
Expand All @@ -929,7 +949,12 @@ impl Client {
R::Output: QueryOutput,
<R::Output as TryFrom<Value>>::Error: Into<eyre::Error>,
{
self.request_with_filter_and_pagination(request, Pagination::default(), filter)
self.request_with_filter_and_pagination(
request,
Pagination::default(),
FetchSize::default(),
filter,
)
}

/// Query API entry point. Requests queries from `Iroha` peers with pagination.
Expand All @@ -943,12 +968,18 @@ impl Client {
&self,
request: R,
pagination: Pagination,
fetch_size: FetchSize,
) -> QueryResult<<R::Output as QueryOutput>::Target>
where
R::Output: QueryOutput,
<R::Output as TryFrom<Value>>::Error: Into<eyre::Error>,
{
self.request_with_filter_and_pagination(request, pagination, PredicateBox::default())
self.request_with_filter_and_pagination(
request,
pagination,
fetch_size,
PredicateBox::default(),
)
}

/// Query API entry point. Requests queries from `Iroha` peers with sorting.
Expand All @@ -964,7 +995,12 @@ impl Client {
R::Output: QueryOutput,
<R::Output as TryFrom<Value>>::Error: Into<eyre::Error>,
{
self.request_with_pagination_and_sorting(request, Pagination::default(), sorting)
self.request_with_pagination_and_sorting(
request,
Pagination::default(),
FetchSize::default(),
sorting,
)
}

/// Query API entry point. Requests queries from `Iroha` peers.
Expand All @@ -979,7 +1015,7 @@ impl Client {
R::Output: QueryOutput,
<R::Output as TryFrom<Value>>::Error: Into<eyre::Error>,
{
self.request_with_pagination(request, Pagination::default())
self.request_with_pagination(request, Pagination::default(), FetchSize::default())
}

/// Connect (through `WebSocket`) to listen for `Iroha` `pipeline` and `data` events.
Expand Down
45 changes: 35 additions & 10 deletions client/tests/integration/pagination.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,16 @@
use std::num::{NonZeroU32, NonZeroU64};

use eyre::Result;
use iroha_client::client::{asset, QueryResult};
use iroha_client::client::{asset, Client, QueryResult};
use iroha_data_model::{asset::AssetDefinition, prelude::*, query::Pagination};
use test_network::*;

#[test]
fn client_add_asset_quantity_to_existing_asset_should_increase_asset_amount() -> Result<()> {
fn limits_should_work() -> Result<()> {
let (_rt, _peer, client) = <PeerBuilder>::new().with_port(10_690).start_with_runtime();
wait_for_genesis_committed(&vec![client.clone()], 0);

let register: Vec<InstructionExpr> = ('a'..='z') // This is a subtle mistake, I'm glad we can lint it now.
.map(|c| c.to_string())
.map(|name| (name + "#wonderland").parse().expect("Valid"))
.map(|asset_definition_id| {
RegisterExpr::new(AssetDefinition::quantity(asset_definition_id)).into()
})
.collect();
client.submit_all_blocking(register)?;
register_assets(&client)?;

let vec = client
.request_with_pagination(
Expand All @@ -26,8 +19,40 @@ fn client_add_asset_quantity_to_existing_asset_should_increase_asset_amount() ->
limit: NonZeroU32::new(5),
start: NonZeroU64::new(5),
},
FetchSize::default(),
)?
.collect::<QueryResult<Vec<_>>>()?;
assert_eq!(vec.len(), 5);
Ok(())
}

#[test]
fn fetch_size_should_work() -> Result<()> {
let (_rt, _peer, client) = <PeerBuilder>::new().with_port(11_120).start_with_runtime();
wait_for_genesis_committed(&vec![client.clone()], 0);

register_assets(&client)?;

let iter = client.request_with_pagination(
asset::all_definitions(),
Pagination {
limit: NonZeroU32::new(20),
start: NonZeroU64::new(0),
},
FetchSize::new(Some(NonZeroU32::new(12).expect("Valid"))),
)?;
assert_eq!(iter.batch_len(), 12);
Ok(())
}

fn register_assets(client: &Client) -> Result<()> {
let register: Vec<InstructionExpr> = ('a'..='z')
.map(|c| c.to_string())
.map(|name| (name + "#wonderland").parse().expect("Valid"))
.map(|asset_definition_id| {
RegisterExpr::new(AssetDefinition::quantity(asset_definition_id)).into()
})
.collect();
let _ = client.submit_all_blocking(register)?;
Ok(())
}
2 changes: 1 addition & 1 deletion client/tests/integration/permissions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ fn genesis_transactions_are_validated() {
// Starting peer
let (_rt, _peer, test_client) = <PeerBuilder>::new()
.with_genesis(genesis)
.with_port(11_100)
.with_port(11_110)
.start_with_runtime();

// Checking that peer contains no blocks multiple times
Expand Down
28 changes: 28 additions & 0 deletions client/tests/integration/queries/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,31 @@
use iroha_client::client::{self, ClientQueryError};
use iroha_data_model::{
query::{error::QueryExecutionFail, FetchSize, Pagination, MAX_FETCH_SIZE},
ValidationFail,
};
use test_network::*;

mod account;
mod asset;
mod role;

#[test]
fn too_big_fetch_size_is_not_allowed() {
let (_rt, _peer, client) = <PeerBuilder>::new().with_port(11_130).start_with_runtime();
wait_for_genesis_committed(&[client.clone()], 0);

let err = client
.request_with_pagination(
client::asset::all(),
Pagination::default(),
FetchSize::new(Some(MAX_FETCH_SIZE.checked_add(1).unwrap())),
)
.expect_err("Should fail");

assert!(matches!(
err,
ClientQueryError::Validation(ValidationFail::QueryFailed(
QueryExecutionFail::FetchSizeTooBig
))
));
}
Loading

0 comments on commit 63f80a7

Please sign in to comment.