Skip to content

Commit

Permalink
feat(queries): expose the remaining number of items in rust client API
Browse files Browse the repository at this point in the history
Signed-off-by: ⭐️NINIKA⭐️ <[email protected]>
  • Loading branch information
DCNick3 committed Aug 28, 2024
1 parent 47647b2 commit 76149b1
Show file tree
Hide file tree
Showing 8 changed files with 79 additions and 27 deletions.
12 changes: 6 additions & 6 deletions client/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,27 +159,27 @@ impl QueryExecutor for Client {
fn start_query(
&self,
query: QueryWithParams,
) -> Result<(QueryOutputBatchBox, Option<Self::Cursor>), Self::Error> {
) -> Result<(QueryOutputBatchBox, u64, Option<Self::Cursor>), Self::Error> {
let request_head = self.get_query_request_head();

let request = QueryRequest::Start(query);

let response = request_head.assemble(request).build()?.send()?;
let response = decode_iterable_query_response(&response)?;

let (batch, cursor) = response.into_parts();
let (batch, remaining_items, cursor) = response.into_parts();

let cursor = cursor.map(|cursor| QueryCursor {
request_head,
cursor,
});

Ok((batch, cursor))
Ok((batch, remaining_items, cursor))
}

fn continue_query(
cursor: Self::Cursor,
) -> Result<(QueryOutputBatchBox, Option<Self::Cursor>), Self::Error> {
) -> Result<(QueryOutputBatchBox, u64, Option<Self::Cursor>), Self::Error> {
let QueryCursor {
request_head,
cursor,
Expand All @@ -190,14 +190,14 @@ impl QueryExecutor for Client {
let response = request_head.assemble(request).build()?.send()?;
let response = decode_iterable_query_response(&response)?;

let (batch, cursor) = response.into_parts();
let (batch, remaining_items, cursor) = response.into_parts();

let cursor = cursor.map(|cursor| QueryCursor {
request_head,
cursor,
});

Ok((batch, cursor))
Ok((batch, remaining_items, cursor))
}
}

Expand Down
30 changes: 29 additions & 1 deletion client/tests/integration/pagination.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,33 @@ fn limits_should_work() -> Result<()> {
Ok(())
}

#[test]
fn reported_length_should_be_accurate() -> Result<()> {
let (_rt, _peer, client) = <PeerBuilder>::new().with_port(10_690).start_with_runtime();
wait_for_genesis_committed(&vec![client.clone()], 0);

register_assets(&client)?;

let mut iter = client
.query(asset::all_definitions())
.with_pagination(Pagination {
limit: Some(nonzero!(7_u64)),
offset: 1,
})
.with_fetch_size(FetchSize::new(Some(nonzero!(3_u64))))
.execute()?;

assert_eq!(iter.len(), 7);

for _ in 0..4 {
iter.next().unwrap().unwrap();
}

assert_eq!(iter.len(), 3);

Ok(())
}

#[test]
fn fetch_size_should_work() -> Result<()> {
// use the lower-level API to inspect the batch size
Expand All @@ -49,9 +76,10 @@ fn fetch_size_should_work() -> Result<()> {
FetchSize::new(Some(nonzero!(3_u64))),
),
);
let (first_batch, _continue_cursor) = client.start_query(query)?;
let (first_batch, remaining_items, _continue_cursor) = client.start_query(query)?;

assert_eq!(first_batch.len(), 3);
assert_eq!(remaining_items, 4);

Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions client_cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1176,11 +1176,11 @@ mod json {
// we can't really do type-erased iterable queries in a nice way right now...
use iroha::data_model::query::builder::QueryExecutor;

let (mut first_batch, mut continue_cursor) =
let (mut first_batch, _remaining_items, mut continue_cursor) =
client.start_query(query)?;

while let Some(cursor) = continue_cursor {
let (next_batch, next_continue_cursor) =
let (next_batch, _remaining_items, next_continue_cursor) =
<Client as QueryExecutor>::continue_query(cursor)?;

first_batch.extend(next_batch);
Expand Down
4 changes: 2 additions & 2 deletions core/src/query/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ mod tests {
)
.unwrap();

let (batch, mut current_cursor) = query_store_handle
let (batch, _remaining_items, mut current_cursor) = query_store_handle
.handle_iter_start(query_output, &ALICE_ID)
.unwrap()
.into_parts();
Expand All @@ -359,7 +359,7 @@ mod tests {
let Ok(batched) = query_store_handle.handle_iter_continue(cursor) else {
break;
};
let (batch, cursor) = batched.into_parts();
let (batch, _remaining_items, cursor) = batched.into_parts();
counter += batch.len();

current_cursor = cursor;
Expand Down
30 changes: 23 additions & 7 deletions data_model/src/query/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,30 +34,31 @@ pub trait QueryExecutor {
query: SingularQueryBox,
) -> Result<SingularQueryOutputBox, Self::Error>;

/// Starts an iterable query and returns the first batch of results.
/// Starts an iterable query and returns the first batch of results, the remaining number of results and a cursor to continue the query.
///
/// # Errors
///
/// Returns an error if the query execution fails.
fn start_query(
&self,
query: QueryWithParams,
) -> Result<(QueryOutputBatchBox, Option<Self::Cursor>), Self::Error>;
) -> Result<(QueryOutputBatchBox, u64, Option<Self::Cursor>), Self::Error>;

/// Continues an iterable query from the given cursor and returns the next batch of results.
/// Continues an iterable query from the given cursor and returns the next batch of results, the remaining number of results and a cursor to continue the query.
///
/// # Errors
///
/// Returns an error if the query execution fails.
fn continue_query(
cursor: Self::Cursor,
) -> Result<(QueryOutputBatchBox, Option<Self::Cursor>), Self::Error>;
) -> Result<(QueryOutputBatchBox, u64, Option<Self::Cursor>), Self::Error>;
}

/// An iterator over results of an iterable query.
#[derive(Debug)]
pub struct QueryIterator<E: QueryExecutor, T> {
current_batch_iter: vec::IntoIter<T>,
remaining_items: u64,
continue_cursor: Option<E::Cursor>,
}

Expand All @@ -73,12 +74,14 @@ where
/// Returns an error if the type of the batch does not match the expected type `T`.
pub fn new(
first_batch: QueryOutputBatchBox,
remaining_items: u64,
continue_cursor: Option<E::Cursor>,
) -> Result<Self, <Vec<T> as TryFrom<QueryOutputBatchBox>>::Error> {
let batch: Vec<T> = first_batch.try_into()?;

Ok(Self {
current_batch_iter: batch.into_iter(),
remaining_items,
continue_cursor,
})
}
Expand All @@ -102,7 +105,7 @@ where
let cursor = self.continue_cursor.take()?;

// get a next batch from iroha
let (batch, cursor) = match E::continue_query(cursor) {
let (batch, remaining_items, cursor) = match E::continue_query(cursor) {
Ok(r) => r,
Err(e) => return Some(Err(e)),
};
Expand All @@ -115,11 +118,23 @@ where
.expect("BUG: iroha returned unexpected type in iterable query");

self.current_batch_iter = batch.into_iter();
self.remaining_items = remaining_items;

self.next()
}
}

impl<E, T> ExactSizeIterator for QueryIterator<E, T>
where
E: QueryExecutor,
Vec<T>: TryFrom<QueryOutputBatchBox>,
<Vec<T> as TryFrom<QueryOutputBatchBox>>::Error: core::fmt::Debug,
{
fn len(&self) -> usize {
self.current_batch_iter.len() + self.remaining_items as usize
}
}

/// An error that can occur when constraining the number of results of an iterable query to one.
#[derive(
Debug,
Expand Down Expand Up @@ -256,9 +271,10 @@ where
},
};

let (first_batch, continue_cursor) = self.query_executor.start_query(query)?;
let (first_batch, remaining_items, continue_cursor) =
self.query_executor.start_query(query)?;

let iterator = QueryIterator::<E, Q::Item>::new(first_batch, continue_cursor)
let iterator = QueryIterator::<E, Q::Item>::new(first_batch, remaining_items, continue_cursor)
.expect(
"INTERNAL BUG: iroha returned unexpected type in iterable query. Is there a schema mismatch?",
);
Expand Down
4 changes: 2 additions & 2 deletions data_model/src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,8 @@ impl QueryOutput {
}

/// Split this [`QueryOutput`] into its constituent parts.
pub fn into_parts(self) -> (QueryOutputBatchBox, Option<ForwardCursor>) {
(self.batch, self.continue_cursor)
pub fn into_parts(self) -> (QueryOutputBatchBox, u64, Option<ForwardCursor>) {
(self.batch, self.remaining_items, self.continue_cursor)
}
}

Expand Down
20 changes: 14 additions & 6 deletions smart_contract/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,28 +149,36 @@ impl QueryExecutor for SmartContractQueryExecutor {
fn start_query(
&self,
query: QueryWithParams,
) -> Result<(QueryOutputBatchBox, Option<Self::Cursor>), Self::Error> {
) -> Result<(QueryOutputBatchBox, u64, Option<Self::Cursor>), Self::Error> {
let QueryResponse::Iterable(output) = execute_query(&QueryRequest::Start(query))? else {
dbg_panic("BUG: iroha returned unexpected type in iterable query");
};

let (batch, cursor) = output.into_parts();
let (batch, remaining_items, cursor) = output.into_parts();

Ok((batch, cursor.map(|cursor| QueryCursor { cursor })))
Ok((
batch,
remaining_items,
cursor.map(|cursor| QueryCursor { cursor }),
))
}

fn continue_query(
cursor: Self::Cursor,
) -> Result<(QueryOutputBatchBox, Option<Self::Cursor>), Self::Error> {
) -> Result<(QueryOutputBatchBox, u64, Option<Self::Cursor>), Self::Error> {
let QueryResponse::Iterable(output) =
execute_query(&QueryRequest::Continue(cursor.cursor))?
else {
dbg_panic("BUG: iroha returned unexpected type in iterable query");
};

let (batch, cursor) = output.into_parts();
let (batch, remaining_items, cursor) = output.into_parts();

Ok((batch, cursor.map(|cursor| QueryCursor { cursor })))
Ok((
batch,
remaining_items,
cursor.map(|cursor| QueryCursor { cursor }),
))
}
}

Expand Down
2 changes: 1 addition & 1 deletion wasm_samples/query_assets_and_save_cursor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ fn main(owner: AccountId) {
pub cursor: ForwardCursor,
}

let (_batch, cursor) = SmartContractQueryExecutor
let (_batch, _remaining_items, cursor) = SmartContractQueryExecutor
.start_query(QueryWithParams::new(
QueryWithFilter::new(FindAssets, CompoundPredicate::PASS).into(),
QueryParams::new(
Expand Down

0 comments on commit 76149b1

Please sign in to comment.