From 52532c7ec6dabad2105146f873c2f95c9c8ea704 Mon Sep 17 00:00:00 2001 From: William Wen <44139337+wenym1@users.noreply.github.com> Date: Wed, 8 Jan 2025 16:01:53 +0800 Subject: [PATCH] refactor(storage): remove StateStoreWrite and extract separate StateStoreReadLog trait (#20051) --- .../src/hummock/iterator/change_log.rs | 6 +- .../src/hummock/store/hummock_storage.rs | 7 +- .../hummock/store/local_hummock_storage.rs | 16 - src/storage/src/hummock/utils.rs | 91 ++++- src/storage/src/mem_table.rs | 361 +----------------- src/storage/src/memory.rs | 284 +++++++++++++- src/storage/src/monitor/monitored_store.rs | 5 +- src/storage/src/monitor/traced_store.rs | 5 +- src/storage/src/panic_store.rs | 18 +- src/storage/src/store.rs | 75 ++-- src/storage/src/store_impl.rs | 64 ++-- .../log_store_impl/kv_log_store/reader.rs | 66 ++-- 12 files changed, 483 insertions(+), 515 deletions(-) diff --git a/src/storage/src/hummock/iterator/change_log.rs b/src/storage/src/hummock/iterator/change_log.rs index a10981c35fe16..0fd227020a527 100644 --- a/src/storage/src/hummock/iterator/change_log.rs +++ b/src/storage/src/hummock/iterator/change_log.rs @@ -545,10 +545,10 @@ mod tests { use crate::mem_table::{KeyOp, MemTable, MemTableHummockIterator, MemTableStore}; use crate::memory::MemoryStateStore; use crate::store::{ - ChangeLogValue, NewLocalOptions, OpConsistencyLevel, ReadLogOptions, StateStoreIter, - StateStoreRead, CHECK_BYTES_EQUAL, + ChangeLogValue, NewLocalOptions, OpConsistencyLevel, ReadLogOptions, StateStoreReadLog, + CHECK_BYTES_EQUAL, }; - use crate::StateStore; + use crate::{StateStore, StateStoreIter}; #[tokio::test] async fn test_empty() { diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index 0caae17c6c204..07aff03b4e804 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -79,7 +79,7 @@ impl Drop for HummockStorageShutdownGuard { } /// `HummockStorage` is the entry point of the Hummock state store backend. -/// It implements the `StateStore` and `StateStoreRead` traits but not the `StateStoreWrite` trait +/// It implements the `StateStore` and `StateStoreRead` traits but without any write method /// since all writes should be done via `LocalHummockStorage` to ensure the single writer property /// of hummock. `LocalHummockStorage` instance can be created via `new_local` call. /// Hummock is the state store backend. 
@@ -589,7 +589,6 @@ impl HummockStorage { } impl StateStoreRead for HummockStorage { - type ChangeLogIter = ChangeLogIterator; type Iter = HummockStorageIterator; type RevIter = HummockStorageRevIterator; @@ -635,6 +634,10 @@ impl StateStoreRead for HummockStorage { ); self.rev_iter_inner(key_range, epoch, read_options) } +} + +impl StateStoreReadLog for HummockStorage { + type ChangeLogIter = ChangeLogIterator; async fn iter_log( &self, diff --git a/src/storage/src/hummock/store/local_hummock_storage.rs b/src/storage/src/hummock/store/local_hummock_storage.rs index 989eb878b0798..69952838e9829 100644 --- a/src/storage/src/hummock/store/local_hummock_storage.rs +++ b/src/storage/src/hummock/store/local_hummock_storage.rs @@ -32,7 +32,6 @@ use super::version::{StagingData, VersionUpdate}; use crate::error::StorageResult; use crate::hummock::event_handler::hummock_event_handler::HummockEventSender; use crate::hummock::event_handler::{HummockEvent, HummockReadVersionRef, LocalInstanceGuard}; -use crate::hummock::iterator::change_log::ChangeLogIterator; use crate::hummock::iterator::{ Backward, BackwardUserIterator, ConcatIteratorInner, Forward, HummockIteratorUnion, IteratorFactory, MergeIterator, UserIterator, @@ -244,7 +243,6 @@ impl LocalHummockStorage { } impl StateStoreRead for LocalHummockStorage { - type ChangeLogIter = ChangeLogIterator; type Iter = HummockStorageIterator; type RevIter = HummockStorageRevIterator; @@ -279,20 +277,6 @@ impl StateStoreRead for LocalHummockStorage { self.rev_iter_flushed(key_range, epoch, read_options) .instrument(tracing::trace_span!("hummock_rev_iter")) } - - async fn iter_log( - &self, - epoch_range: (u64, u64), - key_range: TableKeyRange, - options: ReadLogOptions, - ) -> StorageResult { - let version = self.read_version.read().committed().clone(); - let iter = self - .hummock_version_reader - .iter_log(version, epoch_range, key_range, options) - .await?; - Ok(iter) - } } impl LocalStateStore for LocalHummockStorage { diff --git a/src/storage/src/hummock/utils.rs b/src/storage/src/hummock/utils.rs index 6f42260bcc9c1..7f30f63d83e19 100644 --- a/src/storage/src/hummock/utils.rs +++ b/src/storage/src/hummock/utils.rs @@ -23,9 +23,11 @@ use std::time::{Duration, Instant}; use bytes::Bytes; use foyer::CacheHint; +use futures::{pin_mut, Stream, StreamExt}; use parking_lot::Mutex; use risingwave_common::catalog::{TableId, TableOption}; use risingwave_common::config::StorageMemoryConfig; +use risingwave_expr::codegen::try_stream; use risingwave_hummock_sdk::can_concat; use risingwave_hummock_sdk::compaction_group::StateTableId; use risingwave_hummock_sdk::key::{ @@ -35,12 +37,12 @@ use risingwave_hummock_sdk::sstable_info::SstableInfo; use tokio::sync::oneshot::{channel, Receiver, Sender}; use super::{HummockError, HummockResult, SstableStoreRef}; -use crate::error::StorageResult; +use crate::error::{StorageError, StorageResult}; use crate::hummock::local_version::pinned_version::PinnedVersion; use crate::hummock::CachePolicy; use crate::mem_table::{KeyOp, MemTableError}; use crate::monitor::MemoryCollector; -use crate::store::{OpConsistencyLevel, ReadOptions, StateStoreRead}; +use crate::store::{OpConsistencyLevel, ReadOptions, StateStoreKeyedRow, StateStoreRead}; pub fn range_overlap( search_key_range: &R, @@ -716,6 +718,91 @@ impl MemoryCollector for HummockMemoryCollector { } } +#[try_stream(ok = StateStoreKeyedRow, error = StorageError)] +pub(crate) async fn merge_stream<'a>( + mem_table_iter: impl Iterator, &'a KeyOp)> + 'a, + inner_stream: impl 
Stream> + 'static, + table_id: TableId, + epoch: u64, + rev: bool, +) { + let inner_stream = inner_stream.peekable(); + pin_mut!(inner_stream); + + let mut mem_table_iter = mem_table_iter.fuse().peekable(); + + loop { + match (inner_stream.as_mut().peek().await, mem_table_iter.peek()) { + (None, None) => break, + // The mem table side has come to an end, return data from the shared storage. + (Some(_), None) => { + let (key, value) = inner_stream.next().await.unwrap()?; + yield (key, value) + } + // The stream side has come to an end, return data from the mem table. + (None, Some(_)) => { + let (key, key_op) = mem_table_iter.next().unwrap(); + match key_op { + KeyOp::Insert(value) | KeyOp::Update((_, value)) => { + yield (FullKey::new(table_id, key.clone(), epoch), value.clone()) + } + _ => {} + } + } + (Some(Ok((inner_key, _))), Some((mem_table_key, _))) => { + debug_assert_eq!(inner_key.user_key.table_id, table_id); + let mut ret = inner_key.user_key.table_key.cmp(mem_table_key); + if rev { + ret = ret.reverse(); + } + match ret { + Ordering::Less => { + // yield data from storage + let (key, value) = inner_stream.next().await.unwrap()?; + yield (key, value); + } + Ordering::Equal => { + // both memtable and storage contain the key, so we advance both + // iterators and return the data in memory. + + let (_, key_op) = mem_table_iter.next().unwrap(); + let (key, old_value_in_inner) = inner_stream.next().await.unwrap()?; + match key_op { + KeyOp::Insert(value) => { + yield (key.clone(), value.clone()); + } + KeyOp::Delete(_) => {} + KeyOp::Update((old_value, new_value)) => { + debug_assert!(old_value == &old_value_in_inner); + + yield (key, new_value.clone()); + } + } + } + Ordering::Greater => { + // yield data from mem table + let (key, key_op) = mem_table_iter.next().unwrap(); + + match key_op { + KeyOp::Insert(value) => { + yield (FullKey::new(table_id, key.clone(), epoch), value.clone()); + } + KeyOp::Delete(_) => {} + KeyOp::Update(_) => unreachable!( + "memtable update should always be paired with a storage key" + ), + } + } + } + } + (Some(Err(_)), Some(_)) => { + // Throw the error. + return Err(inner_stream.next().await.unwrap().unwrap_err()); + } + } + } +} + #[cfg(test)] mod tests { use std::future::{poll_fn, Future}; diff --git a/src/storage/src/mem_table.rs b/src/storage/src/mem_table.rs index 2694f1433c0fa..61807517a20a4 100644 --- a/src/storage/src/mem_table.rs +++ b/src/storage/src/mem_table.rs @@ -12,37 +12,23 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::cmp::Ordering; use std::collections::btree_map::Entry; use std::collections::BTreeMap; -use std::future::Future; -use std::ops::Bound::{Excluded, Included, Unbounded}; +use std::ops::Bound::{Included, Unbounded}; use std::ops::RangeBounds; -use std::sync::Arc; use bytes::Bytes; -use futures::{pin_mut, Stream, StreamExt}; -use futures_async_stream::try_stream; -use itertools::Itertools; -use risingwave_common::bitmap::Bitmap; -use risingwave_common::catalog::{TableId, TableOption}; -use risingwave_common::hash::{VirtualNode, VnodeBitmapExt}; use risingwave_common_estimate_size::{EstimateSize, KvSize}; -use risingwave_hummock_sdk::key::{prefixed_range_with_vnode, FullKey, TableKey, TableKeyRange}; -use risingwave_hummock_sdk::table_watermark::WatermarkDirection; +use risingwave_hummock_sdk::key::TableKey; use thiserror::Error; use thiserror_ext::AsReport; use tracing::error; -use crate::error::{StorageError, StorageResult}; use crate::hummock::iterator::{Backward, Forward, FromRustIterator, RustIteratorBuilder}; use crate::hummock::shared_buffer::shared_buffer_batch::{SharedBufferBatch, SharedBufferBatchId}; -use crate::hummock::utils::{ - do_delete_sanity_check, do_insert_sanity_check, do_update_sanity_check, sanity_check_enabled, -}; +use crate::hummock::utils::sanity_check_enabled; use crate::hummock::value::HummockValue; use crate::row_serde::value_serde::ValueRowSerde; -use crate::storage_value::StorageValue; use crate::store::*; pub type ImmutableMemtable = SharedBufferBatch; @@ -427,347 +413,6 @@ impl KeyOp { } } -#[try_stream(ok = StateStoreKeyedRow, error = StorageError)] -pub(crate) async fn merge_stream<'a>( - mem_table_iter: impl Iterator, &'a KeyOp)> + 'a, - inner_stream: impl Stream> + 'static, - table_id: TableId, - epoch: u64, - rev: bool, -) { - let inner_stream = inner_stream.peekable(); - pin_mut!(inner_stream); - - let mut mem_table_iter = mem_table_iter.fuse().peekable(); - - loop { - match (inner_stream.as_mut().peek().await, mem_table_iter.peek()) { - (None, None) => break, - // The mem table side has come to an end, return data from the shared storage. - (Some(_), None) => { - let (key, value) = inner_stream.next().await.unwrap()?; - yield (key, value) - } - // The stream side has come to an end, return data from the mem table. - (None, Some(_)) => { - let (key, key_op) = mem_table_iter.next().unwrap(); - match key_op { - KeyOp::Insert(value) | KeyOp::Update((_, value)) => { - yield (FullKey::new(table_id, key.clone(), epoch), value.clone()) - } - _ => {} - } - } - (Some(Ok((inner_key, _))), Some((mem_table_key, _))) => { - debug_assert_eq!(inner_key.user_key.table_id, table_id); - let mut ret = inner_key.user_key.table_key.cmp(mem_table_key); - if rev { - ret = ret.reverse(); - } - match ret { - Ordering::Less => { - // yield data from storage - let (key, value) = inner_stream.next().await.unwrap()?; - yield (key, value); - } - Ordering::Equal => { - // both memtable and storage contain the key, so we advance both - // iterators and return the data in memory. 
- - let (_, key_op) = mem_table_iter.next().unwrap(); - let (key, old_value_in_inner) = inner_stream.next().await.unwrap()?; - match key_op { - KeyOp::Insert(value) => { - yield (key.clone(), value.clone()); - } - KeyOp::Delete(_) => {} - KeyOp::Update((old_value, new_value)) => { - debug_assert!(old_value == &old_value_in_inner); - - yield (key, new_value.clone()); - } - } - } - Ordering::Greater => { - // yield data from mem table - let (key, key_op) = mem_table_iter.next().unwrap(); - - match key_op { - KeyOp::Insert(value) => { - yield (FullKey::new(table_id, key.clone(), epoch), value.clone()); - } - KeyOp::Delete(_) => {} - KeyOp::Update(_) => unreachable!( - "memtable update should always be paired with a storage key" - ), - } - } - } - } - (Some(Err(_)), Some(_)) => { - // Throw the error. - return Err(inner_stream.next().await.unwrap().unwrap_err()); - } - } - } -} - -pub struct MemtableLocalStateStore { - mem_table: MemTable, - inner: S, - - epoch: Option, - - table_id: TableId, - op_consistency_level: OpConsistencyLevel, - table_option: TableOption, - vnodes: Arc, -} - -impl MemtableLocalStateStore { - pub fn new(inner: S, option: NewLocalOptions) -> Self { - Self { - inner, - mem_table: MemTable::new(option.op_consistency_level.clone()), - epoch: None, - table_id: option.table_id, - op_consistency_level: option.op_consistency_level, - table_option: option.table_option, - vnodes: option.vnodes, - } - } - - pub fn inner(&self) -> &S { - &self.inner - } -} - -impl LocalStateStore for MemtableLocalStateStore { - type Iter<'a> = impl StateStoreIter + 'a; - type RevIter<'a> = impl StateStoreIter + 'a; - - async fn get( - &self, - key: TableKey, - read_options: ReadOptions, - ) -> StorageResult> { - match self.mem_table.buffer.get(&key) { - None => self.inner.get(key, self.epoch(), read_options).await, - Some(op) => match op { - KeyOp::Insert(value) | KeyOp::Update((_, value)) => Ok(Some(value.clone())), - KeyOp::Delete(_) => Ok(None), - }, - } - } - - #[allow(clippy::manual_async_fn)] - fn iter( - &self, - key_range: TableKeyRange, - read_options: ReadOptions, - ) -> impl Future>> + Send + '_ { - async move { - let iter = self - .inner - .iter(key_range.clone(), self.epoch(), read_options) - .await?; - Ok(FromStreamStateStoreIter::new(Box::pin(merge_stream( - self.mem_table.iter(key_range), - iter.into_stream(to_owned_item), - self.table_id, - self.epoch(), - false, - )))) - } - } - - #[allow(clippy::manual_async_fn)] - fn rev_iter( - &self, - key_range: TableKeyRange, - read_options: ReadOptions, - ) -> impl Future>> + Send + '_ { - async move { - let iter = self - .inner - .rev_iter(key_range.clone(), self.epoch(), read_options) - .await?; - Ok(FromStreamStateStoreIter::new(Box::pin(merge_stream( - self.mem_table.rev_iter(key_range), - iter.into_stream(to_owned_item), - self.table_id, - self.epoch(), - true, - )))) - } - } - - fn insert( - &mut self, - key: TableKey, - new_val: Bytes, - old_val: Option, - ) -> StorageResult<()> { - match old_val { - None => self.mem_table.insert(key, new_val)?, - Some(old_val) => self.mem_table.update(key, old_val, new_val)?, - }; - Ok(()) - } - - fn delete(&mut self, key: TableKey, old_val: Bytes) -> StorageResult<()> { - Ok(self.mem_table.delete(key, old_val)?) - } - - async fn flush(&mut self) -> StorageResult { - let buffer = self.mem_table.drain().into_parts(); - let mut kv_pairs = Vec::with_capacity(buffer.len()); - for (key, key_op) in buffer { - match key_op { - // Currently, some executors do not strictly comply with these semantics. 
As - // a workaround you may call disable the check by initializing the - // state store with `op_consistency_level=Inconsistent`. - KeyOp::Insert(value) => { - if sanity_check_enabled() { - do_insert_sanity_check( - &key, - &value, - &self.inner, - self.epoch(), - self.table_id, - self.table_option, - &self.op_consistency_level, - ) - .await?; - } - kv_pairs.push((key, StorageValue::new_put(value))); - } - KeyOp::Delete(old_value) => { - if sanity_check_enabled() { - do_delete_sanity_check( - &key, - &old_value, - &self.inner, - self.epoch(), - self.table_id, - self.table_option, - &self.op_consistency_level, - ) - .await?; - } - kv_pairs.push((key, StorageValue::new_delete())); - } - KeyOp::Update((old_value, new_value)) => { - if sanity_check_enabled() { - do_update_sanity_check( - &key, - &old_value, - &new_value, - &self.inner, - self.epoch(), - self.table_id, - self.table_option, - &self.op_consistency_level, - ) - .await?; - } - kv_pairs.push((key, StorageValue::new_put(new_value))); - } - } - } - self.inner.ingest_batch( - kv_pairs, - vec![], - WriteOptions { - epoch: self.epoch(), - table_id: self.table_id, - }, - ) - } - - fn epoch(&self) -> u64 { - self.epoch.expect("should have set the epoch") - } - - fn is_dirty(&self) -> bool { - self.mem_table.is_dirty() - } - - #[allow(clippy::unused_async)] - async fn init(&mut self, options: InitOptions) -> StorageResult<()> { - assert!( - self.epoch.replace(options.epoch.curr).is_none(), - "epoch in local state store of table id {:?} is init for more than once", - self.table_id - ); - - Ok(()) - } - - fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) { - assert!(!self.is_dirty()); - if let Some(value_checker) = opts.switch_op_consistency_level { - self.mem_table.op_consistency_level.update(&value_checker); - } - let prev_epoch = self - .epoch - .replace(next_epoch) - .expect("should have init epoch before seal the first epoch"); - assert!( - next_epoch > prev_epoch, - "new epoch {} should be greater than current epoch: {}", - next_epoch, - prev_epoch - ); - if let Some((direction, watermarks)) = opts.table_watermarks { - let delete_ranges = watermarks - .iter() - .flat_map(|vnode_watermark| { - let inner_range = match direction { - WatermarkDirection::Ascending => { - (Unbounded, Excluded(vnode_watermark.watermark().clone())) - } - WatermarkDirection::Descending => { - (Excluded(vnode_watermark.watermark().clone()), Unbounded) - } - }; - vnode_watermark - .vnode_bitmap() - .iter_vnodes() - .map(move |vnode| { - let (start, end) = - prefixed_range_with_vnode(inner_range.clone(), vnode); - (start.map(|key| key.0.clone()), end.map(|key| key.0.clone())) - }) - }) - .collect_vec(); - if let Err(e) = self.inner.ingest_batch( - Vec::new(), - delete_ranges, - WriteOptions { - epoch: self.epoch(), - table_id: self.table_id, - }, - ) { - error!(error = %e.as_report(), "failed to write delete ranges of table watermark"); - } - } - } - - async fn try_flush(&mut self) -> StorageResult<()> { - Ok(()) - } - - fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Arc { - std::mem::replace(&mut self.vnodes, vnodes) - } - - fn get_table_watermark(&self, _vnode: VirtualNode) -> Option { - // TODO: may store the written table watermark and have a correct implementation - None - } -} - #[cfg(test)] mod tests { use bytes::{BufMut, Bytes, BytesMut}; diff --git a/src/storage/src/memory.rs b/src/storage/src/memory.rs index 5f4a631494113..bd388fc47222b 100644 --- a/src/storage/src/memory.rs +++ b/src/storage/src/memory.rs @@ -14,18 
+14,31 @@ use std::cmp::Ordering; use std::collections::{BTreeMap, BTreeSet, VecDeque}; +use std::future::Future; use std::ops::Bound::{Excluded, Included, Unbounded}; use std::ops::{Bound, RangeBounds}; use std::sync::{Arc, LazyLock}; use bytes::Bytes; +use itertools::Itertools; use parking_lot::RwLock; -use risingwave_common::catalog::TableId; -use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange, UserKey}; +use risingwave_common::bitmap::Bitmap; +use risingwave_common::catalog::{TableId, TableOption}; +use risingwave_common::hash::{VirtualNode, VnodeBitmapExt}; +use risingwave_hummock_sdk::key::{ + prefixed_range_with_vnode, FullKey, TableKey, TableKeyRange, UserKey, +}; +use risingwave_hummock_sdk::table_watermark::WatermarkDirection; use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch}; +use thiserror_ext::AsReport; +use tracing::error; use crate::error::StorageResult; -use crate::mem_table::MemtableLocalStateStore; +use crate::hummock::utils::{ + do_delete_sanity_check, do_insert_sanity_check, do_update_sanity_check, merge_stream, + sanity_check_enabled, +}; +use crate::mem_table::{KeyOp, MemTable}; use crate::storage_value::StorageValue; use crate::store::*; @@ -603,7 +616,6 @@ impl RangeKvStateStore { } impl StateStoreRead for RangeKvStateStore { - type ChangeLogIter = RangeKvStateStoreChangeLogIter; type Iter = RangeKvStateStoreIter; type RevIter = RangeKvStateStoreRevIter; @@ -663,6 +675,10 @@ impl StateStoreRead for RangeKvStateStore { true, )) } +} + +impl StateStoreReadLog for RangeKvStateStore { + type ChangeLogIter = RangeKvStateStoreChangeLogIter; async fn iter_log( &self, @@ -692,8 +708,8 @@ impl StateStoreRead for RangeKvStateStore { } } -impl StateStoreWrite for RangeKvStateStore { - fn ingest_batch( +impl RangeKvStateStore { + pub(crate) fn ingest_batch( &self, mut kv_pairs: Vec<(TableKey, StorageValue)>, delete_ranges: Vec<(Bound, Bound)>, @@ -735,7 +751,7 @@ impl StateStoreWrite for RangeKvStateStore { } impl StateStore for RangeKvStateStore { - type Local = MemtableLocalStateStore; + type Local = RangeKvLocalStateStore; #[allow(clippy::unused_async)] async fn try_wait_epoch( @@ -748,7 +764,259 @@ impl StateStore for RangeKvStateStore { } async fn new_local(&self, option: NewLocalOptions) -> Self::Local { - MemtableLocalStateStore::new(self.clone(), option) + RangeKvLocalStateStore::new(self.clone(), option) + } +} + +pub struct RangeKvLocalStateStore { + mem_table: MemTable, + inner: RangeKvStateStore, + + epoch: Option, + + table_id: TableId, + op_consistency_level: OpConsistencyLevel, + table_option: TableOption, + vnodes: Arc, +} + +impl RangeKvLocalStateStore { + pub fn new(inner: RangeKvStateStore, option: NewLocalOptions) -> Self { + Self { + inner, + mem_table: MemTable::new(option.op_consistency_level.clone()), + epoch: None, + table_id: option.table_id, + op_consistency_level: option.op_consistency_level, + table_option: option.table_option, + vnodes: option.vnodes, + } + } +} + +impl LocalStateStore for RangeKvLocalStateStore { + type Iter<'a> = impl StateStoreIter + 'a; + type RevIter<'a> = impl StateStoreIter + 'a; + + async fn get( + &self, + key: TableKey, + read_options: ReadOptions, + ) -> StorageResult> { + match self.mem_table.buffer.get(&key) { + None => self.inner.get(key, self.epoch(), read_options).await, + Some(op) => match op { + KeyOp::Insert(value) | KeyOp::Update((_, value)) => Ok(Some(value.clone())), + KeyOp::Delete(_) => Ok(None), + }, + } + } + + #[allow(clippy::manual_async_fn)] + fn iter( + &self, + key_range: 
TableKeyRange, + read_options: ReadOptions, + ) -> impl Future>> + Send + '_ { + async move { + let iter = self + .inner + .iter(key_range.clone(), self.epoch(), read_options) + .await?; + Ok(FromStreamStateStoreIter::new(Box::pin(merge_stream( + self.mem_table.iter(key_range), + iter.into_stream(to_owned_item), + self.table_id, + self.epoch(), + false, + )))) + } + } + + #[allow(clippy::manual_async_fn)] + fn rev_iter( + &self, + key_range: TableKeyRange, + read_options: ReadOptions, + ) -> impl Future>> + Send + '_ { + async move { + let iter = self + .inner + .rev_iter(key_range.clone(), self.epoch(), read_options) + .await?; + Ok(FromStreamStateStoreIter::new(Box::pin(merge_stream( + self.mem_table.rev_iter(key_range), + iter.into_stream(to_owned_item), + self.table_id, + self.epoch(), + true, + )))) + } + } + + fn insert( + &mut self, + key: TableKey, + new_val: Bytes, + old_val: Option, + ) -> StorageResult<()> { + match old_val { + None => self.mem_table.insert(key, new_val)?, + Some(old_val) => self.mem_table.update(key, old_val, new_val)?, + }; + Ok(()) + } + + fn delete(&mut self, key: TableKey, old_val: Bytes) -> StorageResult<()> { + Ok(self.mem_table.delete(key, old_val)?) + } + + async fn flush(&mut self) -> StorageResult { + let buffer = self.mem_table.drain().into_parts(); + let mut kv_pairs = Vec::with_capacity(buffer.len()); + for (key, key_op) in buffer { + match key_op { + // Currently, some executors do not strictly comply with these semantics. As + // a workaround you may call disable the check by initializing the + // state store with `op_consistency_level=Inconsistent`. + KeyOp::Insert(value) => { + if sanity_check_enabled() { + do_insert_sanity_check( + &key, + &value, + &self.inner, + self.epoch(), + self.table_id, + self.table_option, + &self.op_consistency_level, + ) + .await?; + } + kv_pairs.push((key, StorageValue::new_put(value))); + } + KeyOp::Delete(old_value) => { + if sanity_check_enabled() { + do_delete_sanity_check( + &key, + &old_value, + &self.inner, + self.epoch(), + self.table_id, + self.table_option, + &self.op_consistency_level, + ) + .await?; + } + kv_pairs.push((key, StorageValue::new_delete())); + } + KeyOp::Update((old_value, new_value)) => { + if sanity_check_enabled() { + do_update_sanity_check( + &key, + &old_value, + &new_value, + &self.inner, + self.epoch(), + self.table_id, + self.table_option, + &self.op_consistency_level, + ) + .await?; + } + kv_pairs.push((key, StorageValue::new_put(new_value))); + } + } + } + self.inner.ingest_batch( + kv_pairs, + vec![], + WriteOptions { + epoch: self.epoch(), + table_id: self.table_id, + }, + ) + } + + fn epoch(&self) -> u64 { + self.epoch.expect("should have set the epoch") + } + + fn is_dirty(&self) -> bool { + self.mem_table.is_dirty() + } + + #[allow(clippy::unused_async)] + async fn init(&mut self, options: InitOptions) -> StorageResult<()> { + assert!( + self.epoch.replace(options.epoch.curr).is_none(), + "epoch in local state store of table id {:?} is init for more than once", + self.table_id + ); + + Ok(()) + } + + fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) { + assert!(!self.is_dirty()); + if let Some(value_checker) = opts.switch_op_consistency_level { + self.mem_table.op_consistency_level.update(&value_checker); + } + let prev_epoch = self + .epoch + .replace(next_epoch) + .expect("should have init epoch before seal the first epoch"); + assert!( + next_epoch > prev_epoch, + "new epoch {} should be greater than current epoch: {}", + next_epoch, + 
prev_epoch + ); + if let Some((direction, watermarks)) = opts.table_watermarks { + let delete_ranges = watermarks + .iter() + .flat_map(|vnode_watermark| { + let inner_range = match direction { + WatermarkDirection::Ascending => { + (Unbounded, Excluded(vnode_watermark.watermark().clone())) + } + WatermarkDirection::Descending => { + (Excluded(vnode_watermark.watermark().clone()), Unbounded) + } + }; + vnode_watermark + .vnode_bitmap() + .iter_vnodes() + .map(move |vnode| { + let (start, end) = + prefixed_range_with_vnode(inner_range.clone(), vnode); + (start.map(|key| key.0.clone()), end.map(|key| key.0.clone())) + }) + }) + .collect_vec(); + if let Err(e) = self.inner.ingest_batch( + Vec::new(), + delete_ranges, + WriteOptions { + epoch: self.epoch(), + table_id: self.table_id, + }, + ) { + error!(error = %e.as_report(), "failed to write delete ranges of table watermark"); + } + } + } + + async fn try_flush(&mut self) -> StorageResult<()> { + Ok(()) + } + + fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Arc { + std::mem::replace(&mut self.vnodes, vnodes) + } + + fn get_table_watermark(&self, _vnode: VirtualNode) -> Option { + // TODO: may store the written table watermark and have a correct implementation + None } } diff --git a/src/storage/src/monitor/monitored_store.rs b/src/storage/src/monitor/monitored_store.rs index 3e795f72ab47a..96a225cc22635 100644 --- a/src/storage/src/monitor/monitored_store.rs +++ b/src/storage/src/monitor/monitored_store.rs @@ -175,7 +175,6 @@ impl MonitoredStateStore { } impl StateStoreRead for MonitoredStateStore { - type ChangeLogIter = impl StateStoreReadChangeLogIter; type Iter = impl StateStoreReadIter; type RevIter = impl StateStoreReadIter; @@ -217,6 +216,10 @@ impl StateStoreRead for MonitoredStateStore { self.inner.rev_iter(key_range, epoch, read_options), ) } +} + +impl StateStoreReadLog for MonitoredStateStore { + type ChangeLogIter = impl StateStoreReadChangeLogIter; fn iter_log( &self, diff --git a/src/storage/src/monitor/traced_store.rs b/src/storage/src/monitor/traced_store.rs index 4fa24d425953b..fc4aecc6c4054 100644 --- a/src/storage/src/monitor/traced_store.rs +++ b/src/storage/src/monitor/traced_store.rs @@ -285,7 +285,6 @@ impl StateStore for TracedStateStore { } impl StateStoreRead for TracedStateStore { - type ChangeLogIter = impl StateStoreReadChangeLogIter; type Iter = impl StateStoreReadIter; type RevIter = impl StateStoreReadIter; @@ -336,6 +335,10 @@ impl StateStoreRead for TracedStateStore { ); self.traced_iter(self.inner.rev_iter(key_range, epoch, read_options), span) } +} + +impl StateStoreReadLog for TracedStateStore { + type ChangeLogIter = impl StateStoreReadChangeLogIter; fn iter_log( &self, diff --git a/src/storage/src/panic_store.rs b/src/storage/src/panic_store.rs index 1ca087696cc6c..1f04281a38a11 100644 --- a/src/storage/src/panic_store.rs +++ b/src/storage/src/panic_store.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::marker::PhantomData; -use std::ops::Bound; use std::sync::Arc; use bytes::Bytes; @@ -23,7 +22,6 @@ use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; use risingwave_hummock_sdk::HummockReadEpoch; use crate::error::StorageResult; -use crate::storage_value::StorageValue; use crate::store::*; /// A panic state store. 
If a workload is fully in-memory, we can use this state store to @@ -32,7 +30,6 @@ use crate::store::*; pub struct PanicStateStore; impl StateStoreRead for PanicStateStore { - type ChangeLogIter = PanicStateStoreIter; type Iter = PanicStateStoreIter; type RevIter = PanicStateStoreIter; @@ -65,6 +62,10 @@ impl StateStoreRead for PanicStateStore { ) -> StorageResult { panic!("should not read from the state store!"); } +} + +impl StateStoreReadLog for PanicStateStore { + type ChangeLogIter = PanicStateStoreIter; async fn iter_log( &self, @@ -76,17 +77,6 @@ impl StateStoreRead for PanicStateStore { } } -impl StateStoreWrite for PanicStateStore { - fn ingest_batch( - &self, - _kv_pairs: Vec<(TableKey, StorageValue)>, - _delete_ranges: Vec<(Bound, Bound)>, - _write_options: WriteOptions, - ) -> StorageResult { - panic!("should not write to the state store!"); - } -} - impl LocalStateStore for PanicStateStore { type Iter<'a> = PanicStateStoreIter; type RevIter<'a> = PanicStateStoreIter; diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index da8fdf206ad5d..531ca629d4d65 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -17,7 +17,6 @@ use std::default::Default; use std::fmt::{Debug, Formatter}; use std::future::Future; use std::marker::PhantomData; -use std::ops::Bound; use std::sync::{Arc, LazyLock}; use bytes::Bytes; @@ -42,7 +41,6 @@ use risingwave_pb::hummock::PbVnodeWatermark; use crate::error::{StorageError, StorageResult}; use crate::hummock::CachePolicy; use crate::monitor::{MonitoredStateStore, MonitoredStorageMetrics}; -use crate::storage_value::StorageValue; pub trait StaticSendSync = Send + Sync + 'static; @@ -59,9 +57,7 @@ impl IterItem for StateStoreReadLogItem { } pub trait StateStoreIter: Send { - fn try_next( - &mut self, - ) -> impl Future>>> + Send + '_; + fn try_next(&mut self) -> impl StorageFuture<'_, Option>>; } pub fn to_owned_item((key, value): StateStoreKeyedRowRef<'_>) -> StorageResult { @@ -238,11 +234,22 @@ pub struct ReadLogOptions { } pub trait StateStoreReadChangeLogIter = StateStoreIter + Send + 'static; +pub trait StorageFuture<'a, T> = Future> + Send + 'a; + +pub trait StateStoreReadLog: StaticSendSync { + type ChangeLogIter: StateStoreReadChangeLogIter; + + fn iter_log( + &self, + epoch_range: (u64, u64), + key_range: TableKeyRange, + options: ReadLogOptions, + ) -> impl StorageFuture<'_, Self::ChangeLogIter>; +} pub trait StateStoreRead: StaticSendSync { type Iter: StateStoreReadIter; type RevIter: StateStoreReadIter; - type ChangeLogIter: StateStoreReadChangeLogIter; /// Point gets a value from the state store. /// The result is based on a snapshot corresponding to the given `epoch`. @@ -252,7 +259,7 @@ pub trait StateStoreRead: StaticSendSync { key: TableKey, epoch: u64, read_options: ReadOptions, - ) -> impl Future>> + Send + '_; + ) -> impl StorageFuture<'_, Option>; /// Point gets a value from the state store. /// The result is based on a snapshot corresponding to the given `epoch`. 
@@ -262,7 +269,7 @@ pub trait StateStoreRead: StaticSendSync { key: TableKey, epoch: u64, read_options: ReadOptions, - ) -> impl Future>> + Send + '_ { + ) -> impl StorageFuture<'_, Option> { self.get_keyed_row(key, epoch, read_options) .map_ok(|v| v.map(|(_, v)| v)) } @@ -277,21 +284,14 @@ pub trait StateStoreRead: StaticSendSync { key_range: TableKeyRange, epoch: u64, read_options: ReadOptions, - ) -> impl Future> + Send + '_; + ) -> impl StorageFuture<'_, Self::Iter>; fn rev_iter( &self, key_range: TableKeyRange, epoch: u64, read_options: ReadOptions, - ) -> impl Future> + Send + '_; - - fn iter_log( - &self, - epoch_range: (u64, u64), - key_range: TableKeyRange, - options: ReadLogOptions, - ) -> impl Future> + Send + '_; + ) -> impl StorageFuture<'_, Self::RevIter>; } pub trait StateStoreReadExt: StaticSendSync { @@ -308,7 +308,7 @@ pub trait StateStoreReadExt: StaticSendSync { epoch: u64, limit: Option, read_options: ReadOptions, - ) -> impl Future>> + Send + '_; + ) -> impl StorageFuture<'_, Vec>; } impl StateStoreReadExt for S { @@ -333,29 +333,6 @@ impl StateStoreReadExt for S { } } -pub trait StateStoreWrite: StaticSendSync { - /// Writes a batch to storage. The batch should be: - /// * Ordered. KV pairs will be directly written to the table, so it must be ordered. - /// * Locally unique. There should not be two or more operations on the same key in one write - /// batch. - /// - /// Ingests a batch of data into the state store. One write batch should never contain operation - /// on the same key. e.g. Put(233, x) then Delete(233). - /// An epoch should be provided to ingest a write batch. It is served as: - /// - A handle to represent an atomic write session. All ingested write batches associated with - /// the same `Epoch` have the all-or-nothing semantics, meaning that partial changes are not - /// queryable and will be rolled back if instructed. - /// - A version of a kv pair. kv pair associated with larger `Epoch` is guaranteed to be newer - /// then kv pair with smaller `Epoch`. Currently this version is only used to derive the - /// per-key modification history (e.g. in compaction), not across different keys. - fn ingest_batch( - &self, - kv_pairs: Vec<(TableKey, StorageValue)>, - delete_ranges: Vec<(Bound, Bound)>, - write_options: WriteOptions, - ) -> StorageResult; -} - #[derive(Clone)] pub struct TryWaitEpochOptions { pub table_id: TableId, @@ -384,7 +361,7 @@ impl From for TracedTryWaitEpochOptions { } } -pub trait StateStore: StateStoreRead + StaticSendSync + Clone { +pub trait StateStore: StateStoreRead + StateStoreReadLog + StaticSendSync + Clone { type Local: LocalStateStore; /// If epoch is `Committed`, we will wait until the epoch is committed and its data is ready to @@ -393,7 +370,7 @@ pub trait StateStore: StateStoreRead + StaticSendSync + Clone { &self, epoch: HummockReadEpoch, options: TryWaitEpochOptions, - ) -> impl Future> + Send + '_; + ) -> impl StorageFuture<'_, ()>; /// Creates a [`MonitoredStateStore`] from this state store, with given `stats`. 
fn monitored(self, storage_metrics: Arc) -> MonitoredStateStore { @@ -416,7 +393,7 @@ pub trait LocalStateStore: StaticSendSync { &self, key: TableKey, read_options: ReadOptions, - ) -> impl Future>> + Send + '_; + ) -> impl StorageFuture<'_, Option>; /// Opens and returns an iterator for given `prefix_hint` and `full_key_range` /// Internally, `prefix_hint` will be used to for checking `bloom_filter` and @@ -427,13 +404,13 @@ pub trait LocalStateStore: StaticSendSync { &self, key_range: TableKeyRange, read_options: ReadOptions, - ) -> impl Future>> + Send + '_; + ) -> impl StorageFuture<'_, Self::Iter<'_>>; fn rev_iter( &self, key_range: TableKeyRange, read_options: ReadOptions, - ) -> impl Future>> + Send + '_; + ) -> impl StorageFuture<'_, Self::RevIter<'_>>; /// Get last persisted watermark for a given vnode. fn get_table_watermark(&self, vnode: VirtualNode) -> Option; @@ -450,9 +427,9 @@ pub trait LocalStateStore: StaticSendSync { /// than the given `epoch` will be deleted. fn delete(&mut self, key: TableKey, old_val: Bytes) -> StorageResult<()>; - fn flush(&mut self) -> impl Future> + Send + '_; + fn flush(&mut self) -> impl StorageFuture<'_, usize>; - fn try_flush(&mut self) -> impl Future> + Send + '_; + fn try_flush(&mut self) -> impl StorageFuture<'_, ()>; fn epoch(&self) -> u64; fn is_dirty(&self) -> bool; @@ -463,7 +440,7 @@ pub trait LocalStateStore: StaticSendSync { /// In some cases like replicated state table, state table may not be empty initially, /// as such we need to wait for `epoch.prev` checkpoint to complete, /// hence this interface is made async. - fn init(&mut self, opts: InitOptions) -> impl Future> + Send + '_; + fn init(&mut self, opts: InitOptions) -> impl StorageFuture<'_, ()>; /// Updates the monotonically increasing write epoch to `new_epoch`. /// All writes after this function is called will be tagged with `new_epoch`. 
In other words, diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index 87883911c58e2..6488e9c3f23b6 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -228,7 +228,7 @@ pub mod verify { use std::fmt::Debug; use std::future::Future; use std::marker::PhantomData; - use std::ops::{Bound, Deref}; + use std::ops::Deref; use std::sync::Arc; use bytes::Bytes; @@ -240,7 +240,6 @@ pub mod verify { use crate::error::StorageResult; use crate::hummock::HummockStorage; - use crate::storage_value::StorageValue; use crate::store::*; use crate::store_impl::AsHummock; @@ -263,6 +262,7 @@ pub mod verify { } } + #[derive(Clone)] pub struct VerifyStateStore { pub actual: A, pub expected: Option, @@ -276,7 +276,6 @@ pub mod verify { } impl StateStoreRead for VerifyStateStore { - type ChangeLogIter = impl StateStoreReadChangeLogIter; type Iter = impl StateStoreReadIter; type RevIter = impl StateStoreReadIter; @@ -342,6 +341,10 @@ pub mod verify { Ok(verify_iter::(actual, expected)) } } + } + + impl StateStoreReadLog for VerifyStateStore { + type ChangeLogIter = impl StateStoreReadChangeLogIter; async fn iter_log( &self, @@ -392,36 +395,6 @@ pub mod verify { } } - impl StateStoreWrite for VerifyStateStore { - fn ingest_batch( - &self, - kv_pairs: Vec<(TableKey, StorageValue)>, - delete_ranges: Vec<(Bound, Bound)>, - write_options: WriteOptions, - ) -> StorageResult { - let actual = self.actual.ingest_batch( - kv_pairs.clone(), - delete_ranges.clone(), - write_options.clone(), - ); - if let Some(expected) = &self.expected { - let expected = expected.ingest_batch(kv_pairs, delete_ranges, write_options); - assert_eq!(actual.is_err(), expected.is_err()); - } - actual - } - } - - impl Clone for VerifyStateStore { - fn clone(&self) -> Self { - Self { - actual: self.actual.clone(), - expected: self.expected.clone(), - _phantom: PhantomData, - } - } - } - impl LocalStateStore for VerifyStateStore { type Iter<'a> = impl StateStoreIter + 'a; type RevIter<'a> = impl StateStoreIter + 'a; @@ -806,7 +779,7 @@ impl AsHummock for SledStateStore { } #[cfg(debug_assertions)] -pub mod boxed_state_store { +mod boxed_state_store { use std::future::Future; use std::ops::{Deref, DerefMut}; use std::sync::Arc; @@ -873,7 +846,10 @@ pub mod boxed_state_store { epoch: u64, read_options: ReadOptions, ) -> StorageResult; + } + #[async_trait::async_trait] + pub trait DynamicDispatchedStateStoreReadLog: StaticSendSync { async fn iter_log( &self, epoch_range: (u64, u64), @@ -912,7 +888,10 @@ pub mod boxed_state_store { self.rev_iter(key_range, epoch, read_options).await?, )) } + } + #[async_trait::async_trait] + impl DynamicDispatchedStateStoreReadLog for S { async fn iter_log( &self, epoch_range: (u64, u64), @@ -1159,7 +1138,6 @@ pub mod boxed_state_store { pub type BoxDynamicDispatchedStateStore = Box; impl StateStoreRead for BoxDynamicDispatchedStateStore { - type ChangeLogIter = BoxStateStoreReadChangeLogIter; type Iter = BoxStateStoreReadIter; type RevIter = BoxStateStoreReadIter; @@ -1189,6 +1167,10 @@ pub mod boxed_state_store { ) -> impl Future> + '_ { self.deref().rev_iter(key_range, epoch, read_options) } + } + + impl StateStoreReadLog for BoxDynamicDispatchedStateStore { + type ChangeLogIter = BoxStateStoreReadChangeLogIter; fn iter_log( &self, @@ -1201,7 +1183,11 @@ pub mod boxed_state_store { } pub trait DynamicDispatchedStateStore: - DynClone + DynamicDispatchedStateStoreRead + DynamicDispatchedStateStoreExt + AsHummock + DynClone + + DynamicDispatchedStateStoreRead + + 
DynamicDispatchedStateStoreReadLog + + DynamicDispatchedStateStoreExt + + AsHummock { } @@ -1214,7 +1200,11 @@ pub mod boxed_state_store { } impl< - S: DynClone + DynamicDispatchedStateStoreRead + DynamicDispatchedStateStoreExt + AsHummock, + S: DynClone + + DynamicDispatchedStateStoreRead + + DynamicDispatchedStateStoreReadLog + + DynamicDispatchedStateStoreExt + + AsHummock, > DynamicDispatchedStateStore for S { } diff --git a/src/stream/src/common/log_store_impl/kv_log_store/reader.rs b/src/stream/src/common/log_store_impl/kv_log_store/reader.rs index c27cf711a3ad0..20c8e7fbcb576 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/reader.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/reader.rs @@ -674,54 +674,65 @@ impl LogReader for KvLogStoreReader { #[cfg(test)] mod tests { - use std::ops::Bound::Unbounded; + use std::collections::{Bound, HashSet}; use bytes::Bytes; use itertools::Itertools; - use risingwave_common::util::epoch::test_epoch; - use risingwave_hummock_sdk::key::TableKey; + use risingwave_common::hash::VirtualNode; + use risingwave_common::util::epoch::{test_epoch, EpochExt}; + use risingwave_hummock_sdk::key::{prefixed_range_with_vnode, KeyPayloadType, TableKey}; + use risingwave_hummock_test::local_state_store_test_utils::LocalStateStoreTestExt; + use risingwave_hummock_test::test_utils::prepare_hummock_test_env; use risingwave_storage::hummock::iterator::test_utils::{ iterator_test_table_key_of, iterator_test_value_of, }; - use risingwave_storage::memory::MemoryStateStore; - use risingwave_storage::storage_value::StorageValue; - use risingwave_storage::store::{ReadOptions, StateStoreRead, StateStoreWrite, WriteOptions}; - use risingwave_storage::StateStoreIter; + use risingwave_storage::store::{ + LocalStateStore, NewLocalOptions, ReadOptions, SealCurrentEpochOptions, StateStoreRead, + }; + use risingwave_storage::{StateStore, StateStoreIter}; use crate::common::log_store_impl::kv_log_store::reader::AutoRebuildStateStoreReadIter; use crate::common::log_store_impl::kv_log_store::test_utils::TEST_TABLE_ID; #[tokio::test] async fn test_auto_rebuild_iter() { - let state_store = MemoryStateStore::new(); + let test_env = prepare_hummock_test_env().await; + test_env.register_table_id(TEST_TABLE_ID).await; + let mut state_store = test_env + .storage + .new_local(NewLocalOptions::for_test(TEST_TABLE_ID)) + .await; + let epoch = test_epoch(1); + test_env + .storage + .start_epoch(epoch, HashSet::from_iter([TEST_TABLE_ID])); + state_store.init_for_test(epoch).await.unwrap(); let key_count = 100; let pairs = (0..key_count) .map(|i| { let key = iterator_test_table_key_of(i); let value = iterator_test_value_of(i); - (TableKey(Bytes::from(key)), StorageValue::new_put(value)) + (TableKey(Bytes::from(key)), Bytes::from(value)) }) .collect_vec(); - let epoch = test_epoch(1); - state_store - .ingest_batch( - pairs.clone(), - vec![], - WriteOptions { - epoch, - table_id: TEST_TABLE_ID, - }, - ) - .unwrap(); + for (key, value) in &pairs { + state_store + .insert(key.clone(), value.clone(), None) + .unwrap(); + } + state_store.flush().await.unwrap(); + state_store.seal_current_epoch(epoch.next_epoch(), SealCurrentEpochOptions::for_test()); + test_env.commit_epoch(epoch).await; + let state_store = test_env.storage.clone(); async fn validate( - mut kv_iter: impl Iterator, StorageValue)>, + mut kv_iter: impl Iterator, Bytes)>, mut iter: impl StateStoreIter, ) { while let Some((key, value)) = iter.try_next().await.unwrap() { let (k, v) = kv_iter.next().unwrap(); 
assert_eq!(key.user_key.table_key, k.to_ref()); - assert_eq!(v.user_value.as_deref(), Some(value)); + assert_eq!(v.as_ref(), value); } assert!(kv_iter.next().is_none()); } @@ -730,10 +741,17 @@ mod tests { table_id: TEST_TABLE_ID, ..Default::default() }; + let key_range = prefixed_range_with_vnode( + ( + Bound::::Unbounded, + Bound::::Unbounded, + ), + VirtualNode::ZERO, + ); let kv_iter = pairs.clone().into_iter(); let iter = state_store - .iter((Unbounded, Unbounded), epoch, read_options.clone()) + .iter(key_range.clone(), epoch, read_options.clone()) .await .unwrap(); validate(kv_iter, iter).await; @@ -755,7 +773,7 @@ mod tests { false } }, - (Unbounded, Unbounded), + key_range.clone(), epoch, read_options, )
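
A minimal sketch of the API surface after this refactor, not part of the patch itself: change-log reads now hang off the new `StateStoreReadLog` trait while point/range reads stay on `StateStoreRead`, and the shared store no longer exposes `ingest_batch`. The helper name `count_log_entries` and its parameters are hypothetical; the trait and iterator calls follow the definitions added above, written as crate-internal code in the style of the patch.

use risingwave_hummock_sdk::key::TableKeyRange;

use crate::error::StorageResult;
use crate::store::{ReadLogOptions, StateStoreReadLog};
use crate::StateStoreIter;

/// Count the change-log entries visible in `epoch_range` for the given key range.
/// Illustrative helper only; any `StateStoreReadLog` implementor (e.g. `HummockStorage`
/// or `RangeKvStateStore`) can be passed here.
async fn count_log_entries(
    store: &impl StateStoreReadLog,
    epoch_range: (u64, u64),
    key_range: TableKeyRange,
    options: ReadLogOptions,
) -> StorageResult<usize> {
    // `iter_log` now comes from `StateStoreReadLog`, no longer from `StateStoreRead`.
    let mut iter = store.iter_log(epoch_range, key_range, options).await?;
    let mut count = 0;
    // Change-log iterators still use the shared `StateStoreIter::try_next` interface;
    // each item pairs a table key with its `ChangeLogValue`.
    while let Some((_key, _change_log_value)) = iter.try_next().await? {
        count += 1;
    }
    Ok(count)
}

Callers that previously wrote through `StateStoreWrite::ingest_batch` instead obtain a `LocalStateStore` via `new_local` and use `insert`/`flush`/`seal_current_epoch`, as the updated kv log store test above demonstrates.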