Skip to content

Commit

Permalink
feat(queries): Report amount of remaining items in query (#5016)
Browse files Browse the repository at this point in the history
Signed-off-by: ⭐️NINIKA⭐️ <[email protected]>
  • Loading branch information
DCNick3 authored Sep 4, 2024
1 parent 6613210 commit 36bbf04
Show file tree
Hide file tree
Showing 11 changed files with 133 additions and 35 deletions.
12 changes: 6 additions & 6 deletions client/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,27 +159,27 @@ impl QueryExecutor for Client {
fn start_query(
&self,
query: QueryWithParams,
) -> Result<(QueryOutputBatchBox, Option<Self::Cursor>), Self::Error> {
) -> Result<(QueryOutputBatchBox, u64, Option<Self::Cursor>), Self::Error> {
let request_head = self.get_query_request_head();

let request = QueryRequest::Start(query);

let response = request_head.assemble(request).build()?.send()?;
let response = decode_iterable_query_response(&response)?;

let (batch, cursor) = response.into_parts();
let (batch, remaining_items, cursor) = response.into_parts();

let cursor = cursor.map(|cursor| QueryCursor {
request_head,
cursor,
});

Ok((batch, cursor))
Ok((batch, remaining_items, cursor))
}

fn continue_query(
cursor: Self::Cursor,
) -> Result<(QueryOutputBatchBox, Option<Self::Cursor>), Self::Error> {
) -> Result<(QueryOutputBatchBox, u64, Option<Self::Cursor>), Self::Error> {
let QueryCursor {
request_head,
cursor,
Expand All @@ -190,14 +190,14 @@ impl QueryExecutor for Client {
let response = request_head.assemble(request).build()?.send()?;
let response = decode_iterable_query_response(&response)?;

let (batch, cursor) = response.into_parts();
let (batch, remaining_items, cursor) = response.into_parts();

let cursor = cursor.map(|cursor| QueryCursor {
request_head,
cursor,
});

Ok((batch, cursor))
Ok((batch, remaining_items, cursor))
}
}

Expand Down
30 changes: 29 additions & 1 deletion client/tests/integration/pagination.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,33 @@ fn limits_should_work() -> Result<()> {
Ok(())
}

#[test]
fn reported_length_should_be_accurate() -> Result<()> {
let (_rt, _peer, client) = <PeerBuilder>::new().with_port(11_170).start_with_runtime();
wait_for_genesis_committed(&vec![client.clone()], 0);

register_assets(&client)?;

let mut iter = client
.query(asset::all_definitions())
.with_pagination(Pagination {
limit: Some(nonzero!(7_u64)),
offset: 1,
})
.with_fetch_size(FetchSize::new(Some(nonzero!(3_u64))))
.execute()?;

assert_eq!(iter.len(), 7);

for _ in 0..4 {
iter.next().unwrap().unwrap();
}

assert_eq!(iter.len(), 3);

Ok(())
}

#[test]
fn fetch_size_should_work() -> Result<()> {
// use the lower-level API to inspect the batch size
Expand All @@ -49,9 +76,10 @@ fn fetch_size_should_work() -> Result<()> {
FetchSize::new(Some(nonzero!(3_u64))),
),
);
let (first_batch, _continue_cursor) = client.start_query(query)?;
let (first_batch, remaining_items, _continue_cursor) = client.start_query(query)?;

assert_eq!(first_batch.len(), 3);
assert_eq!(remaining_items, 4);

Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions client_cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1176,11 +1176,11 @@ mod json {
// we can't really do type-erased iterable queries in a nice way right now...
use iroha::data_model::query::builder::QueryExecutor;

let (mut first_batch, mut continue_cursor) =
let (mut first_batch, _remaining_items, mut continue_cursor) =
client.start_query(query)?;

while let Some(cursor) = continue_cursor {
let (next_batch, next_continue_cursor) =
let (next_batch, _remaining_items, next_continue_cursor) =
<Client as QueryExecutor>::continue_query(cursor)?;

first_batch.extend(next_batch);
Expand Down
16 changes: 14 additions & 2 deletions core/src/query/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ trait BatchedTrait {
&mut self,
cursor: u64,
) -> Result<(QueryOutputBatchBox, Option<NonZeroU64>), UnknownCursor>;
fn remaining(&self) -> u64;
}

struct BatchedInner<I> {
Expand All @@ -22,7 +23,7 @@ struct BatchedInner<I> {

impl<I> BatchedTrait for BatchedInner<I>
where
I: Iterator,
I: ExactSizeIterator,
QueryOutputBatchBox: From<Vec<I::Item>>,
{
fn next_batch(
Expand Down Expand Up @@ -76,6 +77,10 @@ where
.map(|cursor| NonZeroU64::new(cursor).expect("Cursor is never 0")),
))
}

fn remaining(&self) -> u64 {
self.iter.len() as u64
}
}

/// Unknown cursor error.
Expand All @@ -100,7 +105,7 @@ impl QueryBatchedErasedIterator {
/// Creates a new batched iterator. Boxes the inner iterator to erase its type.
pub fn new<I>(iter: I, batch_size: NonZeroU64) -> Self
where
I: Iterator + Send + Sync + 'static,
I: ExactSizeIterator + Send + Sync + 'static,
QueryOutputBatchBox: From<Vec<I::Item>>,
{
Self {
Expand Down Expand Up @@ -128,4 +133,11 @@ impl QueryBatchedErasedIterator {
) -> Result<(QueryOutputBatchBox, Option<NonZeroU64>), UnknownCursor> {
self.inner.next_batch(cursor)
}

/// Returns the number of remaining elements in the iterator.
///
/// You should not rely on the reported amount being correct for safety, same as [`ExactSizeIterator::len`].
pub fn remaining(&self) -> u64 {
self.inner.remaining()
}
}
6 changes: 6 additions & 0 deletions core/src/query/pagination.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ impl<I: Iterator> Iterator for Paginated<I> {
}
}

impl<I: ExactSizeIterator> ExactSizeIterator for Paginated<I> {
fn len(&self) -> usize {
self.0.len()
}
}

#[cfg(test)]
mod tests {
use iroha_data_model::query::parameters::Pagination;
Expand Down
31 changes: 24 additions & 7 deletions core/src/query/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,18 +189,19 @@ impl LiveQueryStore {
&self,
query_id: QueryId,
cursor: NonZeroU64,
) -> Result<(QueryOutputBatchBox, Option<NonZeroU64>)> {
) -> Result<(QueryOutputBatchBox, u64, Option<NonZeroU64>)> {
trace!(%query_id, "Advancing existing query");
let QueryInfo {
mut live_query,
authority,
..
} = self.remove(&query_id).ok_or(UnknownCursor)?;
let (next_batch, next_cursor) = live_query.next_batch(cursor.get())?;
let remaining = live_query.remaining();
if next_cursor.is_some() {
self.insert(query_id, live_query, authority);
}
Ok((next_batch, next_cursor))
Ok((next_batch, remaining, next_cursor))
}

fn check_capacity(&self, authority: &AccountId) -> Result<()> {
Expand Down Expand Up @@ -248,12 +249,20 @@ impl LiveQueryStoreHandle {
let curr_cursor = 0;
let (batch, next_cursor) = live_query.next_batch(curr_cursor)?;

// NOTE: we are checking remaining items _after_ the first batch is taken
let remaining_items = live_query.remaining();

// if the cursor is `None` - the query has ended, we can remove it from the store
if next_cursor.is_some() {
self.store
.insert_new_query(query_id.clone(), live_query, authority.clone())?;
}
Ok(Self::construct_query_response(batch, query_id, next_cursor))
Ok(Self::construct_query_response(
batch,
remaining_items,
query_id,
next_cursor,
))
}

/// Retrieve next batch of query output using `cursor`.
Expand All @@ -265,9 +274,15 @@ impl LiveQueryStoreHandle {
&self,
ForwardCursor { query, cursor }: ForwardCursor,
) -> Result<QueryOutput> {
let (batch, next_cursor) = self.store.get_query_next_batch(query.clone(), cursor)?;
let (batch, remaining, next_cursor) =
self.store.get_query_next_batch(query.clone(), cursor)?;

Ok(Self::construct_query_response(batch, query, next_cursor))
Ok(Self::construct_query_response(
batch,
remaining,
query,
next_cursor,
))
}

/// Remove query from the storage if there is any.
Expand All @@ -277,11 +292,13 @@ impl LiveQueryStoreHandle {

fn construct_query_response(
batch: QueryOutputBatchBox,
remaining_items: u64,
query_id: QueryId,
cursor: Option<NonZeroU64>,
) -> QueryOutput {
QueryOutput::new(
batch,
remaining_items,
cursor.map(|cursor| ForwardCursor {
query: query_id,
cursor,
Expand Down Expand Up @@ -330,7 +347,7 @@ mod tests {
)
.unwrap();

let (batch, mut current_cursor) = query_store_handle
let (batch, _remaining_items, mut current_cursor) = query_store_handle
.handle_iter_start(query_output, &ALICE_ID)
.unwrap()
.into_parts();
Expand All @@ -342,7 +359,7 @@ mod tests {
let Ok(batched) = query_store_handle.handle_iter_continue(cursor) else {
break;
};
let (batch, cursor) = batched.into_parts();
let (batch, _remaining_items, cursor) = batched.into_parts();
counter += batch.len();

current_cursor = cursor;
Expand Down
30 changes: 23 additions & 7 deletions data_model/src/query/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,30 +34,31 @@ pub trait QueryExecutor {
query: SingularQueryBox,
) -> Result<SingularQueryOutputBox, Self::Error>;

/// Starts an iterable query and returns the first batch of results.
/// Starts an iterable query and returns the first batch of results, the remaining number of results and a cursor to continue the query.
///
/// # Errors
///
/// Returns an error if the query execution fails.
fn start_query(
&self,
query: QueryWithParams,
) -> Result<(QueryOutputBatchBox, Option<Self::Cursor>), Self::Error>;
) -> Result<(QueryOutputBatchBox, u64, Option<Self::Cursor>), Self::Error>;

/// Continues an iterable query from the given cursor and returns the next batch of results.
/// Continues an iterable query from the given cursor and returns the next batch of results, the remaining number of results and a cursor to continue the query.
///
/// # Errors
///
/// Returns an error if the query execution fails.
fn continue_query(
cursor: Self::Cursor,
) -> Result<(QueryOutputBatchBox, Option<Self::Cursor>), Self::Error>;
) -> Result<(QueryOutputBatchBox, u64, Option<Self::Cursor>), Self::Error>;
}

/// An iterator over results of an iterable query.
#[derive(Debug)]
pub struct QueryIterator<E: QueryExecutor, T> {
current_batch_iter: vec::IntoIter<T>,
remaining_items: u64,
continue_cursor: Option<E::Cursor>,
}

Expand All @@ -73,12 +74,14 @@ where
/// Returns an error if the type of the batch does not match the expected type `T`.
pub fn new(
first_batch: QueryOutputBatchBox,
remaining_items: u64,
continue_cursor: Option<E::Cursor>,
) -> Result<Self, <Vec<T> as TryFrom<QueryOutputBatchBox>>::Error> {
let batch: Vec<T> = first_batch.try_into()?;

Ok(Self {
current_batch_iter: batch.into_iter(),
remaining_items,
continue_cursor,
})
}
Expand All @@ -102,7 +105,7 @@ where
let cursor = self.continue_cursor.take()?;

// get a next batch from iroha
let (batch, cursor) = match E::continue_query(cursor) {
let (batch, remaining_items, cursor) = match E::continue_query(cursor) {
Ok(r) => r,
Err(e) => return Some(Err(e)),
};
Expand All @@ -115,11 +118,23 @@ where
.expect("BUG: iroha returned unexpected type in iterable query");

self.current_batch_iter = batch.into_iter();
self.remaining_items = remaining_items;

self.next()
}
}

impl<E, T> ExactSizeIterator for QueryIterator<E, T>
where
E: QueryExecutor,
Vec<T>: TryFrom<QueryOutputBatchBox>,
<Vec<T> as TryFrom<QueryOutputBatchBox>>::Error: core::fmt::Debug,
{
fn len(&self) -> usize {
self.current_batch_iter.len() + self.remaining_items as usize
}
}

/// An error that can occur when constraining the number of results of an iterable query to one.
#[derive(
Debug,
Expand Down Expand Up @@ -256,9 +271,10 @@ where
},
};

let (first_batch, continue_cursor) = self.query_executor.start_query(query)?;
let (first_batch, remaining_items, continue_cursor) =
self.query_executor.start_query(query)?;

let iterator = QueryIterator::<E, Q::Item>::new(first_batch, continue_cursor)
let iterator = QueryIterator::<E, Q::Item>::new(first_batch, remaining_items, continue_cursor)
.expect(
"INTERNAL BUG: iroha returned unexpected type in iterable query. Is there a schema mismatch?",
);
Expand Down
13 changes: 10 additions & 3 deletions data_model/src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ mod model {
pub struct QueryOutput {
/// A single batch of results
pub batch: QueryOutputBatchBox,
/// The number of items in the query remaining to be fetched after this batch
pub remaining_items: u64,
/// If not `None`, contains a cursor that can be used to fetch the next batch of results. Otherwise the current batch is the last one.
pub continue_cursor: Option<ForwardCursor>,
}
Expand Down Expand Up @@ -308,16 +310,21 @@ impl SingularQuery for SingularQueryBox {

impl QueryOutput {
/// Create a new [`QueryOutput`] from the iroha response parts.
pub fn new(batch: QueryOutputBatchBox, continue_cursor: Option<ForwardCursor>) -> Self {
pub fn new(
batch: QueryOutputBatchBox,
remaining_items: u64,
continue_cursor: Option<ForwardCursor>,
) -> Self {
Self {
batch,
remaining_items,
continue_cursor,
}
}

/// Split this [`QueryOutput`] into its constituent parts.
pub fn into_parts(self) -> (QueryOutputBatchBox, Option<ForwardCursor>) {
(self.batch, self.continue_cursor)
pub fn into_parts(self) -> (QueryOutputBatchBox, u64, Option<ForwardCursor>) {
(self.batch, self.remaining_items, self.continue_cursor)
}
}

Expand Down
Loading

0 comments on commit 36bbf04

Please sign in to comment.