diff --git a/client/src/query.rs b/client/src/query.rs index 532d685aa8e..c676b31e609 100644 --- a/client/src/query.rs +++ b/client/src/query.rs @@ -159,7 +159,7 @@ impl QueryExecutor for Client { fn start_query( &self, query: QueryWithParams, - ) -> Result<(QueryOutputBatchBox, Option), Self::Error> { + ) -> Result<(QueryOutputBatchBox, u64, Option), Self::Error> { let request_head = self.get_query_request_head(); let request = QueryRequest::Start(query); @@ -167,19 +167,19 @@ impl QueryExecutor for Client { let response = request_head.assemble(request).build()?.send()?; let response = decode_iterable_query_response(&response)?; - let (batch, cursor) = response.into_parts(); + let (batch, remaining_items, cursor) = response.into_parts(); let cursor = cursor.map(|cursor| QueryCursor { request_head, cursor, }); - Ok((batch, cursor)) + Ok((batch, remaining_items, cursor)) } fn continue_query( cursor: Self::Cursor, - ) -> Result<(QueryOutputBatchBox, Option), Self::Error> { + ) -> Result<(QueryOutputBatchBox, u64, Option), Self::Error> { let QueryCursor { request_head, cursor, @@ -190,14 +190,14 @@ impl QueryExecutor for Client { let response = request_head.assemble(request).build()?.send()?; let response = decode_iterable_query_response(&response)?; - let (batch, cursor) = response.into_parts(); + let (batch, remaining_items, cursor) = response.into_parts(); let cursor = cursor.map(|cursor| QueryCursor { request_head, cursor, }); - Ok((batch, cursor)) + Ok((batch, remaining_items, cursor)) } } diff --git a/client/tests/integration/pagination.rs b/client/tests/integration/pagination.rs index b741a4f11ac..2431f5abb42 100644 --- a/client/tests/integration/pagination.rs +++ b/client/tests/integration/pagination.rs @@ -24,6 +24,33 @@ fn limits_should_work() -> Result<()> { Ok(()) } +#[test] +fn reported_length_should_be_accurate() -> Result<()> { + let (_rt, _peer, client) = ::new().with_port(11_170).start_with_runtime(); + wait_for_genesis_committed(&vec![client.clone()], 0); + + register_assets(&client)?; + + let mut iter = client + .query(asset::all_definitions()) + .with_pagination(Pagination { + limit: Some(nonzero!(7_u64)), + offset: 1, + }) + .with_fetch_size(FetchSize::new(Some(nonzero!(3_u64)))) + .execute()?; + + assert_eq!(iter.len(), 7); + + for _ in 0..4 { + iter.next().unwrap().unwrap(); + } + + assert_eq!(iter.len(), 3); + + Ok(()) +} + #[test] fn fetch_size_should_work() -> Result<()> { // use the lower-level API to inspect the batch size @@ -49,9 +76,10 @@ fn fetch_size_should_work() -> Result<()> { FetchSize::new(Some(nonzero!(3_u64))), ), ); - let (first_batch, _continue_cursor) = client.start_query(query)?; + let (first_batch, remaining_items, _continue_cursor) = client.start_query(query)?; assert_eq!(first_batch.len(), 3); + assert_eq!(remaining_items, 4); Ok(()) } diff --git a/client_cli/src/main.rs b/client_cli/src/main.rs index 18f2a491cea..d7c4edd1c87 100644 --- a/client_cli/src/main.rs +++ b/client_cli/src/main.rs @@ -1176,11 +1176,11 @@ mod json { // we can't really do type-erased iterable queries in a nice way right now... use iroha::data_model::query::builder::QueryExecutor; - let (mut first_batch, mut continue_cursor) = + let (mut first_batch, _remaining_items, mut continue_cursor) = client.start_query(query)?; while let Some(cursor) = continue_cursor { - let (next_batch, next_continue_cursor) = + let (next_batch, _remaining_items, next_continue_cursor) = ::continue_query(cursor)?; first_batch.extend(next_batch); diff --git a/core/src/query/cursor.rs b/core/src/query/cursor.rs index 9ef2cad752f..db2378f0f23 100644 --- a/core/src/query/cursor.rs +++ b/core/src/query/cursor.rs @@ -12,6 +12,7 @@ trait BatchedTrait { &mut self, cursor: u64, ) -> Result<(QueryOutputBatchBox, Option), UnknownCursor>; + fn remaining(&self) -> u64; } struct BatchedInner { @@ -22,7 +23,7 @@ struct BatchedInner { impl BatchedTrait for BatchedInner where - I: Iterator, + I: ExactSizeIterator, QueryOutputBatchBox: From>, { fn next_batch( @@ -76,6 +77,10 @@ where .map(|cursor| NonZeroU64::new(cursor).expect("Cursor is never 0")), )) } + + fn remaining(&self) -> u64 { + self.iter.len() as u64 + } } /// Unknown cursor error. @@ -100,7 +105,7 @@ impl QueryBatchedErasedIterator { /// Creates a new batched iterator. Boxes the inner iterator to erase its type. pub fn new(iter: I, batch_size: NonZeroU64) -> Self where - I: Iterator + Send + Sync + 'static, + I: ExactSizeIterator + Send + Sync + 'static, QueryOutputBatchBox: From>, { Self { @@ -128,4 +133,11 @@ impl QueryBatchedErasedIterator { ) -> Result<(QueryOutputBatchBox, Option), UnknownCursor> { self.inner.next_batch(cursor) } + + /// Returns the number of remaining elements in the iterator. + /// + /// You should not rely on the reported amount being correct for safety, same as [`ExactSizeIterator::len`]. + pub fn remaining(&self) -> u64 { + self.inner.remaining() + } } diff --git a/core/src/query/pagination.rs b/core/src/query/pagination.rs index 5185211a970..baad40b5dd3 100644 --- a/core/src/query/pagination.rs +++ b/core/src/query/pagination.rs @@ -45,6 +45,12 @@ impl Iterator for Paginated { } } +impl ExactSizeIterator for Paginated { + fn len(&self) -> usize { + self.0.len() + } +} + #[cfg(test)] mod tests { use iroha_data_model::query::parameters::Pagination; diff --git a/core/src/query/store.rs b/core/src/query/store.rs index 4e4113698b9..ca07126ac7e 100644 --- a/core/src/query/store.rs +++ b/core/src/query/store.rs @@ -189,7 +189,7 @@ impl LiveQueryStore { &self, query_id: QueryId, cursor: NonZeroU64, - ) -> Result<(QueryOutputBatchBox, Option)> { + ) -> Result<(QueryOutputBatchBox, u64, Option)> { trace!(%query_id, "Advancing existing query"); let QueryInfo { mut live_query, @@ -197,10 +197,11 @@ impl LiveQueryStore { .. } = self.remove(&query_id).ok_or(UnknownCursor)?; let (next_batch, next_cursor) = live_query.next_batch(cursor.get())?; + let remaining = live_query.remaining(); if next_cursor.is_some() { self.insert(query_id, live_query, authority); } - Ok((next_batch, next_cursor)) + Ok((next_batch, remaining, next_cursor)) } fn check_capacity(&self, authority: &AccountId) -> Result<()> { @@ -248,12 +249,20 @@ impl LiveQueryStoreHandle { let curr_cursor = 0; let (batch, next_cursor) = live_query.next_batch(curr_cursor)?; + // NOTE: we are checking remaining items _after_ the first batch is taken + let remaining_items = live_query.remaining(); + // if the cursor is `None` - the query has ended, we can remove it from the store if next_cursor.is_some() { self.store .insert_new_query(query_id.clone(), live_query, authority.clone())?; } - Ok(Self::construct_query_response(batch, query_id, next_cursor)) + Ok(Self::construct_query_response( + batch, + remaining_items, + query_id, + next_cursor, + )) } /// Retrieve next batch of query output using `cursor`. @@ -265,9 +274,15 @@ impl LiveQueryStoreHandle { &self, ForwardCursor { query, cursor }: ForwardCursor, ) -> Result { - let (batch, next_cursor) = self.store.get_query_next_batch(query.clone(), cursor)?; + let (batch, remaining, next_cursor) = + self.store.get_query_next_batch(query.clone(), cursor)?; - Ok(Self::construct_query_response(batch, query, next_cursor)) + Ok(Self::construct_query_response( + batch, + remaining, + query, + next_cursor, + )) } /// Remove query from the storage if there is any. @@ -277,11 +292,13 @@ impl LiveQueryStoreHandle { fn construct_query_response( batch: QueryOutputBatchBox, + remaining_items: u64, query_id: QueryId, cursor: Option, ) -> QueryOutput { QueryOutput::new( batch, + remaining_items, cursor.map(|cursor| ForwardCursor { query: query_id, cursor, @@ -330,7 +347,7 @@ mod tests { ) .unwrap(); - let (batch, mut current_cursor) = query_store_handle + let (batch, _remaining_items, mut current_cursor) = query_store_handle .handle_iter_start(query_output, &ALICE_ID) .unwrap() .into_parts(); @@ -342,7 +359,7 @@ mod tests { let Ok(batched) = query_store_handle.handle_iter_continue(cursor) else { break; }; - let (batch, cursor) = batched.into_parts(); + let (batch, _remaining_items, cursor) = batched.into_parts(); counter += batch.len(); current_cursor = cursor; diff --git a/data_model/src/query/builder.rs b/data_model/src/query/builder.rs index c704e818629..bafe5e1df1d 100644 --- a/data_model/src/query/builder.rs +++ b/data_model/src/query/builder.rs @@ -34,7 +34,7 @@ pub trait QueryExecutor { query: SingularQueryBox, ) -> Result; - /// Starts an iterable query and returns the first batch of results. + /// Starts an iterable query and returns the first batch of results, the remaining number of results and a cursor to continue the query. /// /// # Errors /// @@ -42,22 +42,23 @@ pub trait QueryExecutor { fn start_query( &self, query: QueryWithParams, - ) -> Result<(QueryOutputBatchBox, Option), Self::Error>; + ) -> Result<(QueryOutputBatchBox, u64, Option), Self::Error>; - /// Continues an iterable query from the given cursor and returns the next batch of results. + /// Continues an iterable query from the given cursor and returns the next batch of results, the remaining number of results and a cursor to continue the query. /// /// # Errors /// /// Returns an error if the query execution fails. fn continue_query( cursor: Self::Cursor, - ) -> Result<(QueryOutputBatchBox, Option), Self::Error>; + ) -> Result<(QueryOutputBatchBox, u64, Option), Self::Error>; } /// An iterator over results of an iterable query. #[derive(Debug)] pub struct QueryIterator { current_batch_iter: vec::IntoIter, + remaining_items: u64, continue_cursor: Option, } @@ -73,12 +74,14 @@ where /// Returns an error if the type of the batch does not match the expected type `T`. pub fn new( first_batch: QueryOutputBatchBox, + remaining_items: u64, continue_cursor: Option, ) -> Result as TryFrom>::Error> { let batch: Vec = first_batch.try_into()?; Ok(Self { current_batch_iter: batch.into_iter(), + remaining_items, continue_cursor, }) } @@ -102,7 +105,7 @@ where let cursor = self.continue_cursor.take()?; // get a next batch from iroha - let (batch, cursor) = match E::continue_query(cursor) { + let (batch, remaining_items, cursor) = match E::continue_query(cursor) { Ok(r) => r, Err(e) => return Some(Err(e)), }; @@ -115,11 +118,23 @@ where .expect("BUG: iroha returned unexpected type in iterable query"); self.current_batch_iter = batch.into_iter(); + self.remaining_items = remaining_items; self.next() } } +impl ExactSizeIterator for QueryIterator +where + E: QueryExecutor, + Vec: TryFrom, + as TryFrom>::Error: core::fmt::Debug, +{ + fn len(&self) -> usize { + self.current_batch_iter.len() + self.remaining_items as usize + } +} + /// An error that can occur when constraining the number of results of an iterable query to one. #[derive( Debug, @@ -256,9 +271,10 @@ where }, }; - let (first_batch, continue_cursor) = self.query_executor.start_query(query)?; + let (first_batch, remaining_items, continue_cursor) = + self.query_executor.start_query(query)?; - let iterator = QueryIterator::::new(first_batch, continue_cursor) + let iterator = QueryIterator::::new(first_batch, remaining_items, continue_cursor) .expect( "INTERNAL BUG: iroha returned unexpected type in iterable query. Is there a schema mismatch?", ); diff --git a/data_model/src/query/mod.rs b/data_model/src/query/mod.rs index 0c0c6a6ce13..ae3776d84b5 100644 --- a/data_model/src/query/mod.rs +++ b/data_model/src/query/mod.rs @@ -166,6 +166,8 @@ mod model { pub struct QueryOutput { /// A single batch of results pub batch: QueryOutputBatchBox, + /// The number of items in the query remaining to be fetched after this batch + pub remaining_items: u64, /// If not `None`, contains a cursor that can be used to fetch the next batch of results. Otherwise the current batch is the last one. pub continue_cursor: Option, } @@ -308,16 +310,21 @@ impl SingularQuery for SingularQueryBox { impl QueryOutput { /// Create a new [`QueryOutput`] from the iroha response parts. - pub fn new(batch: QueryOutputBatchBox, continue_cursor: Option) -> Self { + pub fn new( + batch: QueryOutputBatchBox, + remaining_items: u64, + continue_cursor: Option, + ) -> Self { Self { batch, + remaining_items, continue_cursor, } } /// Split this [`QueryOutput`] into its constituent parts. - pub fn into_parts(self) -> (QueryOutputBatchBox, Option) { - (self.batch, self.continue_cursor) + pub fn into_parts(self) -> (QueryOutputBatchBox, u64, Option) { + (self.batch, self.remaining_items, self.continue_cursor) } } diff --git a/docs/source/references/schema.json b/docs/source/references/schema.json index fdb1c12995f..97e3038a873 100644 --- a/docs/source/references/schema.json +++ b/docs/source/references/schema.json @@ -3078,6 +3078,10 @@ "name": "batch", "type": "QueryOutputBatchBox" }, + { + "name": "remaining_items", + "type": "u64" + }, { "name": "continue_cursor", "type": "Option" diff --git a/smart_contract/src/lib.rs b/smart_contract/src/lib.rs index 52c3556bb6f..f0fc3d7ba6f 100644 --- a/smart_contract/src/lib.rs +++ b/smart_contract/src/lib.rs @@ -149,28 +149,36 @@ impl QueryExecutor for SmartContractQueryExecutor { fn start_query( &self, query: QueryWithParams, - ) -> Result<(QueryOutputBatchBox, Option), Self::Error> { + ) -> Result<(QueryOutputBatchBox, u64, Option), Self::Error> { let QueryResponse::Iterable(output) = execute_query(&QueryRequest::Start(query))? else { dbg_panic("BUG: iroha returned unexpected type in iterable query"); }; - let (batch, cursor) = output.into_parts(); + let (batch, remaining_items, cursor) = output.into_parts(); - Ok((batch, cursor.map(|cursor| QueryCursor { cursor }))) + Ok(( + batch, + remaining_items, + cursor.map(|cursor| QueryCursor { cursor }), + )) } fn continue_query( cursor: Self::Cursor, - ) -> Result<(QueryOutputBatchBox, Option), Self::Error> { + ) -> Result<(QueryOutputBatchBox, u64, Option), Self::Error> { let QueryResponse::Iterable(output) = execute_query(&QueryRequest::Continue(cursor.cursor))? else { dbg_panic("BUG: iroha returned unexpected type in iterable query"); }; - let (batch, cursor) = output.into_parts(); + let (batch, remaining_items, cursor) = output.into_parts(); - Ok((batch, cursor.map(|cursor| QueryCursor { cursor }))) + Ok(( + batch, + remaining_items, + cursor.map(|cursor| QueryCursor { cursor }), + )) } } diff --git a/wasm_samples/query_assets_and_save_cursor/src/lib.rs b/wasm_samples/query_assets_and_save_cursor/src/lib.rs index 8c719530978..e4a03191a6e 100644 --- a/wasm_samples/query_assets_and_save_cursor/src/lib.rs +++ b/wasm_samples/query_assets_and_save_cursor/src/lib.rs @@ -35,7 +35,7 @@ fn main(owner: AccountId) { pub cursor: ForwardCursor, } - let (_batch, cursor) = SmartContractQueryExecutor + let (_batch, _remaining_items, cursor) = SmartContractQueryExecutor .start_query(QueryWithParams::new( QueryWithFilter::new(FindAssets, CompoundPredicate::PASS).into(), QueryParams::new(