Skip to content

Commit

Permalink
refactor(storage): remove StateStoreWrite and extract separate StateS…
Browse files Browse the repository at this point in the history
…toreReadLog trait (#20051)
  • Loading branch information
wenym1 authored Jan 8, 2025
1 parent 1ac4375 commit 52532c7
Show file tree
Hide file tree
Showing 12 changed files with 483 additions and 515 deletions.
6 changes: 3 additions & 3 deletions src/storage/src/hummock/iterator/change_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -545,10 +545,10 @@ mod tests {
use crate::mem_table::{KeyOp, MemTable, MemTableHummockIterator, MemTableStore};
use crate::memory::MemoryStateStore;
use crate::store::{
ChangeLogValue, NewLocalOptions, OpConsistencyLevel, ReadLogOptions, StateStoreIter,
StateStoreRead, CHECK_BYTES_EQUAL,
ChangeLogValue, NewLocalOptions, OpConsistencyLevel, ReadLogOptions, StateStoreReadLog,
CHECK_BYTES_EQUAL,
};
use crate::StateStore;
use crate::{StateStore, StateStoreIter};

#[tokio::test]
async fn test_empty() {
Expand Down
7 changes: 5 additions & 2 deletions src/storage/src/hummock/store/hummock_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl Drop for HummockStorageShutdownGuard {
}

/// `HummockStorage` is the entry point of the Hummock state store backend.
/// It implements the `StateStore` and `StateStoreRead` traits but not the `StateStoreWrite` trait
/// It implements the `StateStore` and `StateStoreRead` traits but without any write method
/// since all writes should be done via `LocalHummockStorage` to ensure the single writer property
/// of hummock. `LocalHummockStorage` instance can be created via `new_local` call.
/// Hummock is the state store backend.
Expand Down Expand Up @@ -589,7 +589,6 @@ impl HummockStorage {
}

impl StateStoreRead for HummockStorage {
type ChangeLogIter = ChangeLogIterator;
type Iter = HummockStorageIterator;
type RevIter = HummockStorageRevIterator;

Expand Down Expand Up @@ -635,6 +634,10 @@ impl StateStoreRead for HummockStorage {
);
self.rev_iter_inner(key_range, epoch, read_options)
}
}

impl StateStoreReadLog for HummockStorage {
type ChangeLogIter = ChangeLogIterator;

async fn iter_log(
&self,
Expand Down
16 changes: 0 additions & 16 deletions src/storage/src/hummock/store/local_hummock_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use super::version::{StagingData, VersionUpdate};
use crate::error::StorageResult;
use crate::hummock::event_handler::hummock_event_handler::HummockEventSender;
use crate::hummock::event_handler::{HummockEvent, HummockReadVersionRef, LocalInstanceGuard};
use crate::hummock::iterator::change_log::ChangeLogIterator;
use crate::hummock::iterator::{
Backward, BackwardUserIterator, ConcatIteratorInner, Forward, HummockIteratorUnion,
IteratorFactory, MergeIterator, UserIterator,
Expand Down Expand Up @@ -244,7 +243,6 @@ impl LocalHummockStorage {
}

impl StateStoreRead for LocalHummockStorage {
type ChangeLogIter = ChangeLogIterator;
type Iter = HummockStorageIterator;
type RevIter = HummockStorageRevIterator;

Expand Down Expand Up @@ -279,20 +277,6 @@ impl StateStoreRead for LocalHummockStorage {
self.rev_iter_flushed(key_range, epoch, read_options)
.instrument(tracing::trace_span!("hummock_rev_iter"))
}

/// Builds a change-log iterator for `key_range` over `epoch_range`.
///
/// The committed version currently held by this instance's read version is cloned and handed,
/// together with the unchanged arguments, to the version reader, which constructs the iterator.
/// Any error reported by the version reader is returned to the caller.
async fn iter_log(
    &self,
    epoch_range: (u64, u64),
    key_range: TableKeyRange,
    options: ReadLogOptions,
) -> StorageResult<Self::ChangeLogIter> {
    // Take the snapshot in its own statement so that the temporary returned by `read()`
    // (presumably a lock guard — confirm) is dropped before the `.await` below.
    let committed_version = self.read_version.read().committed().clone();
    Ok(self
        .hummock_version_reader
        .iter_log(committed_version, epoch_range, key_range, options)
        .await?)
}
}

impl LocalStateStore for LocalHummockStorage {
Expand Down
91 changes: 89 additions & 2 deletions src/storage/src/hummock/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ use std::time::{Duration, Instant};

use bytes::Bytes;
use foyer::CacheHint;
use futures::{pin_mut, Stream, StreamExt};
use parking_lot::Mutex;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::config::StorageMemoryConfig;
use risingwave_expr::codegen::try_stream;
use risingwave_hummock_sdk::can_concat;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::key::{
Expand All @@ -35,12 +37,12 @@ use risingwave_hummock_sdk::sstable_info::SstableInfo;
use tokio::sync::oneshot::{channel, Receiver, Sender};

use super::{HummockError, HummockResult, SstableStoreRef};
use crate::error::StorageResult;
use crate::error::{StorageError, StorageResult};
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::CachePolicy;
use crate::mem_table::{KeyOp, MemTableError};
use crate::monitor::MemoryCollector;
use crate::store::{OpConsistencyLevel, ReadOptions, StateStoreRead};
use crate::store::{OpConsistencyLevel, ReadOptions, StateStoreKeyedRow, StateStoreRead};

pub fn range_overlap<R, B>(
search_key_range: &R,
Expand Down Expand Up @@ -716,6 +718,91 @@ impl MemoryCollector for HummockMemoryCollector {
}
}

/// Merges the pending operations of a mem table with a stream of rows read from the underlying
/// storage into a single stream of `StateStoreKeyedRow`s.
///
/// Both inputs are consumed in lock-step by comparing their table keys, so they are expected to be
/// ordered by table key in the same direction (the merge itself does not verify this). When a key
/// is present on both sides the mem table entry wins: an insert or update replaces the stored
/// value, and a delete suppresses the row entirely.
///
/// # Arguments
///
/// * `mem_table_iter` - pairs of table key and buffered `KeyOp` from the mem table.
/// * `inner_stream` - keyed rows (or errors) read from the storage side.
/// * `table_id`, `epoch` - used to build the `FullKey` of rows that exist only in the mem table.
/// * `rev` - when `true` the key comparison is reversed, for inputs iterated in reverse order.
///
/// # Errors
///
/// The first error produced by `inner_stream` is propagated and terminates the merged stream.
///
/// # Panics
///
/// Panics if a mem table `KeyOp::Update` is ordered before the next storage key while the storage
/// side still has rows, since an update is expected to be paired with a stored key.
#[try_stream(ok = StateStoreKeyedRow, error = StorageError)]
pub(crate) async fn merge_stream<'a>(
    mem_table_iter: impl Iterator<Item = (&'a TableKey<Bytes>, &'a KeyOp)> + 'a,
    inner_stream: impl Stream<Item = StorageResult<StateStoreKeyedRow>> + 'static,
    table_id: TableId,
    epoch: u64,
    rev: bool,
) {
    let inner_stream = inner_stream.peekable();
    pin_mut!(inner_stream);

    let mut mem_table_iter = mem_table_iter.fuse().peekable();

    loop {
        match (inner_stream.as_mut().peek().await, mem_table_iter.peek()) {
            (None, None) => break,
            // The mem table side has come to an end, return data from the shared storage.
            // A pending storage error is surfaced here by the `?`.
            (Some(_), None) => {
                let (key, value) = inner_stream.next().await.unwrap()?;
                yield (key, value);
            }
            // The stream side has come to an end, return data from the mem table.
            (None, Some(_)) => {
                let (key, key_op) = mem_table_iter.next().unwrap();
                match key_op {
                    KeyOp::Insert(value) | KeyOp::Update((_, value)) => {
                        yield (FullKey::new(table_id, key.clone(), epoch), value.clone());
                    }
                    // A delete of a key that is not in storage produces no row.
                    KeyOp::Delete(_) => {}
                }
            }
            (Some(Ok((inner_key, _))), Some((mem_table_key, _))) => {
                debug_assert_eq!(inner_key.user_key.table_id, table_id);
                let ordering = inner_key.user_key.table_key.cmp(mem_table_key);
                // For reverse iteration the side with the larger key must be emitted first.
                let ordering = if rev { ordering.reverse() } else { ordering };
                match ordering {
                    Ordering::Less => {
                        // yield data from storage
                        let (key, value) = inner_stream.next().await.unwrap()?;
                        yield (key, value);
                    }
                    Ordering::Equal => {
                        // both memtable and storage contain the key, so we advance both
                        // iterators and return the data in memory.
                        let (_, key_op) = mem_table_iter.next().unwrap();
                        let (key, old_value_in_inner) = inner_stream.next().await.unwrap()?;
                        match key_op {
                            KeyOp::Insert(value) => {
                                // `key` is owned and not used afterwards, so move it instead
                                // of cloning.
                                yield (key, value.clone());
                            }
                            KeyOp::Delete(_) => {}
                            KeyOp::Update((old_value, new_value)) => {
                                debug_assert!(old_value == &old_value_in_inner);

                                yield (key, new_value.clone());
                            }
                        }
                    }
                    Ordering::Greater => {
                        // yield data from mem table
                        let (key, key_op) = mem_table_iter.next().unwrap();

                        match key_op {
                            KeyOp::Insert(value) => {
                                yield (FullKey::new(table_id, key.clone(), epoch), value.clone());
                            }
                            KeyOp::Delete(_) => {}
                            KeyOp::Update(_) => unreachable!(
                                "memtable update should always be paired with a storage key"
                            ),
                        }
                    }
                }
            }
            (Some(Err(_)), Some(_)) => {
                // Throw the error.
                return Err(inner_stream.next().await.unwrap().unwrap_err());
            }
        }
    }
}

#[cfg(test)]
mod tests {
use std::future::{poll_fn, Future};
Expand Down
Loading

0 comments on commit 52532c7

Please sign in to comment.