Skip to content

Commit

Permalink
[fix] #4057: Fix query store message ordering issue
Browse files Browse the repository at this point in the history
Signed-off-by: Shanin Roman <[email protected]>
  • Loading branch information
Erigara committed Nov 15, 2023
1 parent e9c6fe4 commit 7cd2a33
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 26 deletions.
Binary file modified configs/peer/executor.wasm
Binary file not shown.
94 changes: 68 additions & 26 deletions core/src/query/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,7 @@ impl LiveQueryStore {
const ALL_HANDLERS_DROPPED: &str =
"All handler to LiveQueryStore are dropped. Shutting down...";

let (insert_sender, mut insert_receiver) = mpsc::channel(1);
let (remove_sender, mut remove_receiver) = mpsc::channel::<(String, oneshot::Sender<_>)>(1);
let (message_sender, mut message_receiver) = mpsc::channel(1);

let mut idle_interval = tokio::time::interval(self.query_idle_time);

Expand All @@ -114,31 +113,29 @@ impl LiveQueryStore {
self.queries
.retain(|_, (_, last_access_time)| last_access_time.elapsed() <= self.query_idle_time);
},
to_insert = insert_receiver.recv() => {
let Some((query_id, live_query)) = to_insert else {
msg = message_receiver.recv() => {
let Some(msg) = msg else {
iroha_logger::info!("{ALL_HANDLERS_DROPPED}");
break;
};
self.insert(query_id, live_query)
}
to_remove = remove_receiver.recv() => {
let Some((query_id, response_sender)) = to_remove else {
iroha_logger::info!("{ALL_HANDLERS_DROPPED}");
break;
};
let live_query_opt = self.remove(&query_id);
let _ = response_sender.send(live_query_opt);

match msg {
Message::Insert(query_id, live_query) => {
self.insert(query_id, live_query)
}
Message::Remove(query_id, response_sender) => {
let live_query_opt = self.remove(&query_id);
let _ = response_sender.send(live_query_opt);
}
}
}
else => break,
}
tokio::task::yield_now().await;
}
});

LiveQueryStoreHandle {
insert_sender,
remove_sender,
}
LiveQueryStoreHandle { message_sender }
}

fn insert(&mut self, query_id: String, live_query: LiveQuery) {
Expand All @@ -150,14 +147,15 @@ impl LiveQueryStore {
}
}

enum Message {
Insert(String, Batched<Vec<Value>>),
Remove(String, oneshot::Sender<Option<Batched<Vec<Value>>>>),
}

/// Handle to interact with [`LiveQueryStore`].
#[derive(Clone)]
pub struct LiveQueryStoreHandle {
/// Sender to insert a new query with specified id.
insert_sender: mpsc::Sender<(String, LiveQuery)>,
/// Sender to send a tuple of query id and another sender, which will be
/// used by [`LiveQueryStore`] to write a response with optional live query.
remove_sender: mpsc::Sender<(String, oneshot::Sender<Option<LiveQuery>>)>,
message_sender: mpsc::Sender<Message>,
}

impl LiveQueryStoreHandle {
Expand Down Expand Up @@ -210,16 +208,16 @@ impl LiveQueryStoreHandle {
}

fn insert(&self, query_id: String, live_query: LiveQuery) -> Result<()> {
self.insert_sender
.blocking_send((query_id, live_query))
self.message_sender
.blocking_send(Message::Insert(query_id, live_query))
.map_err(|_| Error::ConnectionClosed)
}

fn remove(&self, query_id: String) -> Result<Option<LiveQuery>> {
let (sender, receiver) = oneshot::channel();

self.remove_sender
.blocking_send((query_id, sender))
self.message_sender
.blocking_send(Message::Remove(query_id, sender))
.or(Err(Error::ConnectionClosed))?;

receiver.blocking_recv().or(Err(Error::ConnectionClosed))
Expand Down Expand Up @@ -288,3 +286,47 @@ impl LiveQueryStoreHandle {
}
}
}

#[cfg(test)]
mod tests {
use std::num::NonZeroU32;

use super::*;

#[test]
fn query_message_order_preserved() {
let query_store = LiveQueryStore::test();
let threaded_rt = tokio::runtime::Runtime::new().unwrap();
let query_store_handle = threaded_rt.block_on(async { query_store.start() });

for i in 0..10_000 {
let pagination = Pagination::default();
let fetch_size = FetchSize {
fetch_size: NonZeroU32::new(1),
};
let sorting = Sorting::default();

let query_output = LazyValue::Iter(Box::new((0..100).map(|_| Value::Bool(false))));

let mut counter = 0;

let (batch, mut cursor) = query_store_handle
.handle_query_output(query_output, &sorting, pagination, fetch_size)
.unwrap()
.into();
let Value::Vec(v) = batch else { panic!("not expected result") };
counter += v.len();

while cursor.cursor.is_some() {
let Ok(batched) = query_store_handle.handle_query_cursor(cursor) else { break };
let (batch, new_cursor) = batched.into();
let Value::Vec(v) = batch else { panic!("not expected result") };
counter += v.len();

cursor = new_cursor;
}

assert_eq!(counter, 100, "failed on {i} iteration");
}
}
}

0 comments on commit 7cd2a33

Please sign in to comment.