Skip to content

Commit

Permalink
feat!: Sign all query parameters, implement query filters in wasm
Browse files Browse the repository at this point in the history
Signed-off-by: Nikita Strygin <[email protected]>
  • Loading branch information
DCNick3 committed May 16, 2024
1 parent a28e9c0 commit ce01b6f
Show file tree
Hide file tree
Showing 20 changed files with 520 additions and 250 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 15 additions & 21 deletions client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,11 +361,11 @@ pub struct Client {
}

/// Query request
#[derive(Debug, Clone, serde::Serialize)]
#[derive(Debug, Clone)]
pub struct QueryRequest {
torii_url: Url,
headers: HashMap<String, String>,
request: crate::data_model::query::QueryRequest<Vec<u8>>,
request: crate::data_model::query::QueryRequest<SignedQuery>,
}

impl QueryRequest {
Expand All @@ -377,12 +377,8 @@ impl QueryRequest {
torii_url: format!("http://{torii_url}").parse().unwrap(),
headers: HashMap::new(),
request: crate::data_model::query::QueryRequest::Query(
crate::data_model::query::QueryWithParameters {
query: Vec::default(),
sorting: Sorting::default(),
pagination: Pagination::default(),
fetch_size: FetchSize::default(),
},
ClientQueryBuilder::new(FindAllAccounts, test_samples::ALICE_ID.clone())
.sign(&test_samples::ALICE_KEYPAIR),
),
}
}
Expand All @@ -395,11 +391,9 @@ impl QueryRequest {
.headers(self.headers);

match self.request {
crate::data_model::query::QueryRequest::Query(query_with_params) => builder
.params(query_with_params.sorting().clone().into_query_parameters())
.params(query_with_params.pagination().into_query_parameters())
.params(query_with_params.fetch_size().into_query_parameters())
.body(query_with_params.query().clone()),
crate::data_model::query::QueryRequest::Query(signed_query) => {
builder.body(signed_query.encode())
}
crate::data_model::query::QueryRequest::Cursor(cursor) => {
builder.params(Vec::from(cursor))
}
Expand Down Expand Up @@ -490,7 +484,7 @@ impl Client {
///
/// # Errors
/// Fails if signature generation fails
pub fn sign_query(&self, query: QueryBuilder) -> SignedQuery {
pub fn sign_query(&self, query: ClientQueryBuilder) -> SignedQuery {
query.sign(&self.key_pair)
}

Expand Down Expand Up @@ -822,17 +816,17 @@ impl Client {
where
<R::Output as TryFrom<QueryOutputBox>>::Error: Into<eyre::Error>,
{
let query_builder = QueryBuilder::new(request, self.account_id.clone()).with_filter(filter);
let request = self.sign_query(query_builder).encode_versioned();
let query_builder = ClientQueryBuilder::new(request, self.account_id.clone())
.with_filter(filter)
.with_pagination(pagination)
.with_sorting(sorting)
.with_fetch_size(fetch_size);
let request = self.sign_query(query_builder);

let query_request = QueryRequest {
torii_url: self.torii_url.clone(),
headers: self.headers.clone(),
request: crate::data_model::query::QueryRequest::Query(
crate::data_model::query::QueryWithParameters::new(
request, sorting, pagination, fetch_size,
),
),
request: crate::data_model::query::QueryRequest::Query(request),
};

(
Expand Down
Binary file modified configs/swarm/executor.wasm
100644 → 100755
Binary file not shown.
90 changes: 19 additions & 71 deletions core/src/query/store.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,23 @@
//! This module contains [`LiveQueryStore`] actor.
use std::{
cmp::Ordering,
num::NonZeroU64,
time::{Duration, Instant},
};

use indexmap::IndexMap;
use iroha_config::parameters::actual::LiveQueryStore as Config;
use iroha_data_model::{
asset::AssetValue,
query::{
cursor::ForwardCursor, error::QueryExecutionFail, pagination::Pagination, sorting::Sorting,
FetchSize, QueryId, QueryOutputBox, DEFAULT_FETCH_SIZE, MAX_FETCH_SIZE,
},
BatchedResponse, BatchedResponseV1, HasMetadata, IdentifiableBox, ValidationFail,
query::{cursor::ForwardCursor, error::QueryExecutionFail, QueryId, QueryOutputBox},
BatchedResponse, BatchedResponseV1, ValidationFail,
};
use iroha_logger::trace;
use parity_scale_codec::{Decode, Encode};
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, oneshot};

use super::{
cursor::{Batch as _, Batched, UnknownCursor},
pagination::Paginate as _,
};
use crate::smartcontracts::query::LazyQueryOutput;
use super::cursor::{Batched, UnknownCursor};
use crate::smartcontracts::query::ProcessedQueryOutput;

/// Query service error.
#[derive(Debug, thiserror::Error, Copy, Clone, Serialize, Deserialize, Encode, Decode)]
Expand Down Expand Up @@ -157,36 +149,26 @@ pub struct LiveQueryStoreHandle {
}

impl LiveQueryStoreHandle {
/// Apply sorting and pagination to the query output.
/// Construct a batched response from a post-processed query output.
///
/// # Errors
///
/// - Returns [`Error::ConnectionClosed`] if [`LiveQueryStore`] is dropped,
/// - Otherwise throws up query output handling errors.
pub fn handle_query_output(
&self,
query_output: LazyQueryOutput<'_>,
sorting: &Sorting,
pagination: Pagination,
fetch_size: FetchSize,
query_output: ProcessedQueryOutput,
) -> Result<BatchedResponse<QueryOutputBox>> {
match query_output {
LazyQueryOutput::QueryOutput(batch) => {
ProcessedQueryOutput::Single(batch) => {
let cursor = ForwardCursor::default();
let result = BatchedResponseV1 { batch, cursor };
Ok(result.into())
}
LazyQueryOutput::Iter(iter) => {
let fetch_size = fetch_size.fetch_size.unwrap_or(DEFAULT_FETCH_SIZE);
if fetch_size > MAX_FETCH_SIZE {
return Err(Error::FetchSizeTooBig);
}

let live_query = Self::apply_sorting_and_pagination(iter, sorting, pagination);
ProcessedQueryOutput::Iter(live_query) => {
let query_id = uuid::Uuid::new_v4().to_string();

let curr_cursor = Some(0);
let live_query = live_query.batched(fetch_size);
self.construct_query_response(query_id, curr_cursor, live_query)
}
}
Expand Down Expand Up @@ -260,57 +242,18 @@ impl LiveQueryStoreHandle {

Ok(query_response.into())
}

fn apply_sorting_and_pagination(
iter: impl Iterator<Item = QueryOutputBox>,
sorting: &Sorting,
pagination: Pagination,
) -> Vec<QueryOutputBox> {
if let Some(key) = &sorting.sort_by_metadata_key {
let mut pairs: Vec<(Option<QueryOutputBox>, QueryOutputBox)> = iter
.map(|value| {
let key = match &value {
QueryOutputBox::Identifiable(IdentifiableBox::Asset(asset)) => {
match asset.value() {
AssetValue::Store(store) => store.get(key).cloned().map(Into::into),
_ => None,
}
}
QueryOutputBox::Identifiable(v) => TryInto::<&dyn HasMetadata>::try_into(v)
.ok()
.and_then(|has_metadata| has_metadata.metadata().get(key))
.cloned()
.map(Into::into),
_ => None,
};
(key, value)
})
.collect();
pairs.sort_by(
|(left_key, _), (right_key, _)| match (left_key, right_key) {
(Some(l), Some(r)) => l.cmp(r),
(Some(_), None) => Ordering::Less,
(None, Some(_)) => Ordering::Greater,
(None, None) => Ordering::Equal,
},
);
pairs
.into_iter()
.map(|(_, val)| val)
.paginate(pagination)
.collect()
} else {
iter.paginate(pagination).collect()
}
}
}

#[cfg(test)]
mod tests {
use iroha_data_model::metadata::MetadataValueBox;
use iroha_data_model::{
metadata::MetadataValueBox,
query::{predicate::PredicateBox, FetchSize, Pagination, Sorting},
};
use nonzero_ext::nonzero;

use super::*;
use crate::smartcontracts::query::LazyQueryOutput;

#[test]
fn query_message_order_preserved() {
Expand All @@ -319,6 +262,7 @@ mod tests {
let query_store_handle = threaded_rt.block_on(async { query_store.start() });

for i in 0..10_000 {
let filter = PredicateBox::default();
let pagination = Pagination::default();
let fetch_size = FetchSize {
fetch_size: Some(nonzero!(1_u32)),
Expand All @@ -331,8 +275,12 @@ mod tests {

let mut counter = 0;

let query_output = query_output
.apply_postprocessing(&filter, &sorting, pagination, fetch_size)
.unwrap();

let (batch, mut cursor) = query_store_handle
.handle_query_output(query_output, &sorting, pagination, fetch_size)
.handle_query_output(query_output)
.unwrap()
.into();
let QueryOutputBox::Vec(v) = batch else {
Expand Down
Loading

0 comments on commit ce01b6f

Please sign in to comment.