Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(queries): Report amount of remaining items in query #5016

Merged
merged 3 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions client/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,27 +159,27 @@ impl QueryExecutor for Client {
fn start_query(
&self,
query: QueryWithParams,
) -> Result<(QueryOutputBatchBox, Option<Self::Cursor>), Self::Error> {
) -> Result<(QueryOutputBatchBox, u64, Option<Self::Cursor>), Self::Error> {
let request_head = self.get_query_request_head();

let request = QueryRequest::Start(query);

let response = request_head.assemble(request).build()?.send()?;
let response = decode_iterable_query_response(&response)?;

let (batch, cursor) = response.into_parts();
let (batch, remaining_items, cursor) = response.into_parts();

let cursor = cursor.map(|cursor| QueryCursor {
request_head,
cursor,
});

Ok((batch, cursor))
Ok((batch, remaining_items, cursor))
}

fn continue_query(
cursor: Self::Cursor,
) -> Result<(QueryOutputBatchBox, Option<Self::Cursor>), Self::Error> {
) -> Result<(QueryOutputBatchBox, u64, Option<Self::Cursor>), Self::Error> {
let QueryCursor {
request_head,
cursor,
Expand All @@ -190,14 +190,14 @@ impl QueryExecutor for Client {
let response = request_head.assemble(request).build()?.send()?;
let response = decode_iterable_query_response(&response)?;

let (batch, cursor) = response.into_parts();
let (batch, remaining_items, cursor) = response.into_parts();

let cursor = cursor.map(|cursor| QueryCursor {
request_head,
cursor,
});

Ok((batch, cursor))
Ok((batch, remaining_items, cursor))
}
}

Expand Down
30 changes: 29 additions & 1 deletion client/tests/integration/pagination.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,33 @@ fn limits_should_work() -> Result<()> {
Ok(())
}

#[test]
fn reported_length_should_be_accurate() -> Result<()> {
let (_rt, _peer, client) = <PeerBuilder>::new().with_port(11_170).start_with_runtime();
wait_for_genesis_committed(&vec![client.clone()], 0);

register_assets(&client)?;

let mut iter = client
.query(asset::all_definitions())
.with_pagination(Pagination {
limit: Some(nonzero!(7_u64)),
offset: 1,
})
.with_fetch_size(FetchSize::new(Some(nonzero!(3_u64))))
.execute()?;

assert_eq!(iter.len(), 7);

for _ in 0..4 {
iter.next().unwrap().unwrap();
}

assert_eq!(iter.len(), 3);

Ok(())
}

#[test]
fn fetch_size_should_work() -> Result<()> {
// use the lower-level API to inspect the batch size
Expand All @@ -49,9 +76,10 @@ fn fetch_size_should_work() -> Result<()> {
FetchSize::new(Some(nonzero!(3_u64))),
),
);
let (first_batch, _continue_cursor) = client.start_query(query)?;
let (first_batch, remaining_items, _continue_cursor) = client.start_query(query)?;

assert_eq!(first_batch.len(), 3);
assert_eq!(remaining_items, 4);

Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions client_cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1176,11 +1176,11 @@ mod json {
// we can't really do type-erased iterable queries in a nice way right now...
use iroha::data_model::query::builder::QueryExecutor;

let (mut first_batch, mut continue_cursor) =
let (mut first_batch, _remaining_items, mut continue_cursor) =
client.start_query(query)?;

while let Some(cursor) = continue_cursor {
let (next_batch, next_continue_cursor) =
let (next_batch, _remaining_items, next_continue_cursor) =
<Client as QueryExecutor>::continue_query(cursor)?;

first_batch.extend(next_batch);
Expand Down
16 changes: 14 additions & 2 deletions core/src/query/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ trait BatchedTrait {
&mut self,
cursor: u64,
) -> Result<(QueryOutputBatchBox, Option<NonZeroU64>), UnknownCursor>;
fn remaining(&self) -> u64;
}

struct BatchedInner<I> {
Expand All @@ -22,7 +23,7 @@ struct BatchedInner<I> {

impl<I> BatchedTrait for BatchedInner<I>
where
I: Iterator,
I: ExactSizeIterator,
QueryOutputBatchBox: From<Vec<I::Item>>,
{
fn next_batch(
Expand Down Expand Up @@ -76,6 +77,10 @@ where
.map(|cursor| NonZeroU64::new(cursor).expect("Cursor is never 0")),
))
}

fn remaining(&self) -> u64 {
self.iter.len() as u64
}
}

/// Unknown cursor error.
Expand All @@ -100,7 +105,7 @@ impl QueryBatchedErasedIterator {
/// Creates a new batched iterator. Boxes the inner iterator to erase its type.
pub fn new<I>(iter: I, batch_size: NonZeroU64) -> Self
where
I: Iterator + Send + Sync + 'static,
I: ExactSizeIterator + Send + Sync + 'static,
QueryOutputBatchBox: From<Vec<I::Item>>,
{
Self {
Expand Down Expand Up @@ -128,4 +133,11 @@ impl QueryBatchedErasedIterator {
) -> Result<(QueryOutputBatchBox, Option<NonZeroU64>), UnknownCursor> {
self.inner.next_batch(cursor)
}

/// Returns the number of remaining elements in the iterator.
///
/// You should not rely on the reported amount being correct for safety, same as [`ExactSizeIterator::len`].
pub fn remaining(&self) -> u64 {
self.inner.remaining()
}
}
6 changes: 6 additions & 0 deletions core/src/query/pagination.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ impl<I: Iterator> Iterator for Paginated<I> {
}
}

impl<I: ExactSizeIterator> ExactSizeIterator for Paginated<I> {
fn len(&self) -> usize {
self.0.len()
}
}

#[cfg(test)]
mod tests {
use iroha_data_model::query::parameters::Pagination;
Expand Down
31 changes: 24 additions & 7 deletions core/src/query/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,18 +189,19 @@ impl LiveQueryStore {
&self,
query_id: QueryId,
cursor: NonZeroU64,
) -> Result<(QueryOutputBatchBox, Option<NonZeroU64>)> {
) -> Result<(QueryOutputBatchBox, u64, Option<NonZeroU64>)> {
trace!(%query_id, "Advancing existing query");
let QueryInfo {
mut live_query,
authority,
..
} = self.remove(&query_id).ok_or(UnknownCursor)?;
let (next_batch, next_cursor) = live_query.next_batch(cursor.get())?;
let remaining = live_query.remaining();
if next_cursor.is_some() {
self.insert(query_id, live_query, authority);
}
Ok((next_batch, next_cursor))
Ok((next_batch, remaining, next_cursor))
}

fn check_capacity(&self, authority: &AccountId) -> Result<()> {
Expand Down Expand Up @@ -248,12 +249,20 @@ impl LiveQueryStoreHandle {
let curr_cursor = 0;
let (batch, next_cursor) = live_query.next_batch(curr_cursor)?;

// NOTE: we are checking remaining items _after_ the first batch is taken
let remaining_items = live_query.remaining();

// if the cursor is `None` - the query has ended, we can remove it from the store
if next_cursor.is_some() {
self.store
.insert_new_query(query_id.clone(), live_query, authority.clone())?;
}
Ok(Self::construct_query_response(batch, query_id, next_cursor))
Ok(Self::construct_query_response(
batch,
remaining_items,
query_id,
next_cursor,
))
}

/// Retrieve next batch of query output using `cursor`.
Expand All @@ -265,9 +274,15 @@ impl LiveQueryStoreHandle {
&self,
ForwardCursor { query, cursor }: ForwardCursor,
) -> Result<QueryOutput> {
let (batch, next_cursor) = self.store.get_query_next_batch(query.clone(), cursor)?;
let (batch, remaining, next_cursor) =
self.store.get_query_next_batch(query.clone(), cursor)?;

Ok(Self::construct_query_response(batch, query, next_cursor))
Ok(Self::construct_query_response(
batch,
remaining,
query,
next_cursor,
))
}

/// Remove query from the storage if there is any.
Expand All @@ -277,11 +292,13 @@ impl LiveQueryStoreHandle {

fn construct_query_response(
batch: QueryOutputBatchBox,
remaining_items: u64,
query_id: QueryId,
cursor: Option<NonZeroU64>,
) -> QueryOutput {
QueryOutput::new(
batch,
remaining_items,
cursor.map(|cursor| ForwardCursor {
query: query_id,
cursor,
Expand Down Expand Up @@ -330,7 +347,7 @@ mod tests {
)
.unwrap();

let (batch, mut current_cursor) = query_store_handle
let (batch, _remaining_items, mut current_cursor) = query_store_handle
.handle_iter_start(query_output, &ALICE_ID)
.unwrap()
.into_parts();
Expand All @@ -342,7 +359,7 @@ mod tests {
let Ok(batched) = query_store_handle.handle_iter_continue(cursor) else {
break;
};
let (batch, cursor) = batched.into_parts();
let (batch, _remaining_items, cursor) = batched.into_parts();
counter += batch.len();

current_cursor = cursor;
Expand Down
30 changes: 23 additions & 7 deletions data_model/src/query/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,30 +34,31 @@ pub trait QueryExecutor {
query: SingularQueryBox,
) -> Result<SingularQueryOutputBox, Self::Error>;

/// Starts an iterable query and returns the first batch of results.
/// Starts an iterable query and returns the first batch of results, the remaining number of results and a cursor to continue the query.
///
/// # Errors
///
/// Returns an error if the query execution fails.
fn start_query(
&self,
query: QueryWithParams,
) -> Result<(QueryOutputBatchBox, Option<Self::Cursor>), Self::Error>;
) -> Result<(QueryOutputBatchBox, u64, Option<Self::Cursor>), Self::Error>;

/// Continues an iterable query from the given cursor and returns the next batch of results.
/// Continues an iterable query from the given cursor and returns the next batch of results, the remaining number of results and a cursor to continue the query.
///
/// # Errors
///
/// Returns an error if the query execution fails.
fn continue_query(
cursor: Self::Cursor,
) -> Result<(QueryOutputBatchBox, Option<Self::Cursor>), Self::Error>;
) -> Result<(QueryOutputBatchBox, u64, Option<Self::Cursor>), Self::Error>;
}

/// An iterator over results of an iterable query.
#[derive(Debug)]
pub struct QueryIterator<E: QueryExecutor, T> {
current_batch_iter: vec::IntoIter<T>,
remaining_items: u64,
continue_cursor: Option<E::Cursor>,
}

Expand All @@ -73,12 +74,14 @@ where
/// Returns an error if the type of the batch does not match the expected type `T`.
pub fn new(
first_batch: QueryOutputBatchBox,
remaining_items: u64,
continue_cursor: Option<E::Cursor>,
) -> Result<Self, <Vec<T> as TryFrom<QueryOutputBatchBox>>::Error> {
let batch: Vec<T> = first_batch.try_into()?;

Ok(Self {
current_batch_iter: batch.into_iter(),
remaining_items,
continue_cursor,
})
}
Expand All @@ -102,7 +105,7 @@ where
let cursor = self.continue_cursor.take()?;

// get a next batch from iroha
let (batch, cursor) = match E::continue_query(cursor) {
let (batch, remaining_items, cursor) = match E::continue_query(cursor) {
Ok(r) => r,
Err(e) => return Some(Err(e)),
};
Expand All @@ -115,11 +118,23 @@ where
.expect("BUG: iroha returned unexpected type in iterable query");

self.current_batch_iter = batch.into_iter();
self.remaining_items = remaining_items;

self.next()
}
}

impl<E, T> ExactSizeIterator for QueryIterator<E, T>
where
E: QueryExecutor,
Vec<T>: TryFrom<QueryOutputBatchBox>,
<Vec<T> as TryFrom<QueryOutputBatchBox>>::Error: core::fmt::Debug,
{
fn len(&self) -> usize {
self.current_batch_iter.len() + self.remaining_items as usize
}
}

/// An error that can occur when constraining the number of results of an iterable query to one.
#[derive(
Debug,
Expand Down Expand Up @@ -256,9 +271,10 @@ where
},
};

let (first_batch, continue_cursor) = self.query_executor.start_query(query)?;
let (first_batch, remaining_items, continue_cursor) =
self.query_executor.start_query(query)?;

let iterator = QueryIterator::<E, Q::Item>::new(first_batch, continue_cursor)
let iterator = QueryIterator::<E, Q::Item>::new(first_batch, remaining_items, continue_cursor)
.expect(
"INTERNAL BUG: iroha returned unexpected type in iterable query. Is there a schema mismatch?",
);
Expand Down
13 changes: 10 additions & 3 deletions data_model/src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ mod model {
pub struct QueryOutput {
/// A single batch of results
pub batch: QueryOutputBatchBox,
/// The number of items in the query remaining to be fetched after this batch
pub remaining_items: u64,
/// If not `None`, contains a cursor that can be used to fetch the next batch of results. Otherwise the current batch is the last one.
pub continue_cursor: Option<ForwardCursor>,
}
Expand Down Expand Up @@ -308,16 +310,21 @@ impl SingularQuery for SingularQueryBox {

impl QueryOutput {
/// Create a new [`QueryOutput`] from the iroha response parts.
pub fn new(batch: QueryOutputBatchBox, continue_cursor: Option<ForwardCursor>) -> Self {
pub fn new(
batch: QueryOutputBatchBox,
remaining_items: u64,
continue_cursor: Option<ForwardCursor>,
) -> Self {
Self {
batch,
remaining_items,
continue_cursor,
}
}

/// Split this [`QueryOutput`] into its constituent parts.
pub fn into_parts(self) -> (QueryOutputBatchBox, Option<ForwardCursor>) {
(self.batch, self.continue_cursor)
pub fn into_parts(self) -> (QueryOutputBatchBox, u64, Option<ForwardCursor>) {
(self.batch, self.remaining_items, self.continue_cursor)
}
}

Expand Down
Loading
Loading