diff --git a/src/meta/src/hummock/manager/checkpoint.rs b/src/meta/src/hummock/manager/checkpoint.rs index 5f9664b64a011..bb47248686129 100644 --- a/src/meta/src/hummock/manager/checkpoint.rs +++ b/src/meta/src/hummock/manager/checkpoint.rs @@ -179,7 +179,7 @@ impl HummockManager { .change_log_delta .values() .flat_map(|change_log| { - let new_log = change_log.new_log.as_ref().unwrap(); + let new_log = &change_log.new_log; new_log .new_value .iter() diff --git a/src/storage/hummock_sdk/src/change_log.rs b/src/storage/hummock_sdk/src/change_log.rs index 5a7bf0143c764..5a5c4a647ecbe 100644 --- a/src/storage/hummock_sdk/src/change_log.rs +++ b/src/storage/hummock_sdk/src/change_log.rs @@ -190,11 +190,11 @@ pub fn build_table_change_log_delta<'a>( TableId::new(table_id), ChangeLogDelta { truncate_epoch, - new_log: Some(EpochNewChangeLog { + new_log: EpochNewChangeLog { new_value: vec![], old_value: vec![], epochs: epochs.clone(), - }), + }, }, ) }) @@ -203,7 +203,7 @@ pub fn build_table_change_log_delta<'a>( for table_id in &sst.table_ids { match table_change_log.get_mut(&TableId::new(*table_id)) { Some(log) => { - log.new_log.as_mut().unwrap().old_value.push(sst.clone()); + log.new_log.old_value.push(sst.clone()); } None => { warn!(table_id, ?sst, "old value sst contains non-log-store table"); @@ -214,7 +214,7 @@ pub fn build_table_change_log_delta<'a>( for sst in new_value_ssts { for table_id in &sst.table_ids { if let Some(log) = table_change_log.get_mut(&TableId::new(*table_id)) { - log.new_log.as_mut().unwrap().new_value.push(sst.clone()); + log.new_log.new_value.push(sst.clone()); } } } @@ -224,7 +224,7 @@ pub fn build_table_change_log_delta<'a>( #[derive(Debug, PartialEq, Clone)] pub struct ChangeLogDeltaCommon { pub truncate_epoch: u64, - pub new_log: Option>, + pub new_log: EpochNewChangeLogCommon, } pub type ChangeLogDelta = ChangeLogDeltaCommon; @@ -236,7 +236,7 @@ where fn from(val: &ChangeLogDeltaCommon) -> Self { Self { truncate_epoch: val.truncate_epoch, - new_log: val.new_log.as_ref().map(|a| a.into()), + new_log: Some((&val.new_log).into()), } } } @@ -248,7 +248,7 @@ where fn from(val: &PbChangeLogDelta) -> Self { Self { truncate_epoch: val.truncate_epoch, - new_log: val.new_log.as_ref().map(|a| a.into()), + new_log: val.new_log.as_ref().unwrap().into(), } } } @@ -260,7 +260,7 @@ where fn from(val: ChangeLogDeltaCommon) -> Self { Self { truncate_epoch: val.truncate_epoch, - new_log: val.new_log.map(|a| a.into()), + new_log: Some(val.new_log.into()), } } } @@ -272,7 +272,7 @@ where fn from(val: PbChangeLogDelta) -> Self { Self { truncate_epoch: val.truncate_epoch, - new_log: val.new_log.map(|a| a.into()), + new_log: val.new_log.unwrap().into(), } } } diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index ee316f75ffd65..eb4bb30e69dc3 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -693,7 +693,7 @@ impl HummockVersion { changed_table_info: &HashMap>, ) { for (table_id, change_log_delta) in change_log_delta { - let new_change_log = change_log_delta.new_log.as_ref().unwrap(); + let new_change_log = &change_log_delta.new_log; match table_change_log.entry(*table_id) { Entry::Occupied(entry) => { let change_log = entry.into_mut(); diff --git a/src/storage/hummock_sdk/src/frontend_version.rs b/src/storage/hummock_sdk/src/frontend_version.rs index 4840b402a292b..11eb045efbe22 100644 --- a/src/storage/hummock_sdk/src/frontend_version.rs +++ b/src/storage/hummock_sdk/src/frontend_version.rs @@ -151,14 +151,12 @@ impl FrontendHummockVersionDelta { *table_id, ChangeLogDeltaCommon { truncate_epoch: change_log_delta.truncate_epoch, - new_log: change_log_delta.new_log.as_ref().map(|new_log| { - EpochNewChangeLogCommon { - // Here we need to determine if value is null but don't care what the value is, so we fill him in using `()` - new_value: vec![(); new_log.new_value.len()], - old_value: vec![(); new_log.old_value.len()], - epochs: new_log.epochs.clone(), - } - }), + new_log: EpochNewChangeLogCommon { + // Here we need to determine if value is null but don't care what the value is, so we fill him in using `()` + new_value: vec![(); change_log_delta.new_log.new_value.len()], + old_value: vec![(); change_log_delta.new_log.old_value.len()], + epochs: change_log_delta.new_log.epochs.clone(), + }, }, ) }) @@ -187,11 +185,17 @@ impl FrontendHummockVersionDelta { ( table_id.table_id, PbChangeLogDelta { - new_log: delta.new_log.as_ref().map(|new_log| PbEpochNewChangeLog { + new_log: Some(PbEpochNewChangeLog { // Here we need to determine if value is null but don't care what the value is, so we fill him in using `PbSstableInfo::default()` - old_value: vec![PbSstableInfo::default(); new_log.old_value.len()], - new_value: vec![PbSstableInfo::default(); new_log.new_value.len()], - epochs: new_log.epochs.clone(), + old_value: vec![ + PbSstableInfo::default(); + delta.new_log.old_value.len() + ], + new_value: vec![ + PbSstableInfo::default(); + delta.new_log.new_value.len() + ], + epochs: delta.new_log.epochs.clone(), }), truncate_epoch: delta.truncate_epoch, }, @@ -228,14 +232,18 @@ impl FrontendHummockVersionDelta { TableId::new(*table_id), ChangeLogDeltaCommon { truncate_epoch: change_log_delta.truncate_epoch, - new_log: change_log_delta.new_log.as_ref().map(|new_log| { - EpochNewChangeLogCommon { - // Here we need to determine if value is null but don't care what the value is, so we fill him in using `()` - new_value: vec![(); new_log.new_value.len()], - old_value: vec![(); new_log.old_value.len()], - epochs: new_log.epochs.clone(), - } - }), + new_log: change_log_delta + .new_log + .as_ref() + .map(|new_log| { + EpochNewChangeLogCommon { + // Here we need to determine if value is null but don't care what the value is, so we fill him in using `()` + new_value: vec![(); new_log.new_value.len()], + old_value: vec![(); new_log.old_value.len()], + epochs: new_log.epochs.clone(), + } + }) + .unwrap(), }, ) }) diff --git a/src/storage/hummock_sdk/src/time_travel.rs b/src/storage/hummock_sdk/src/time_travel.rs index 0a651260b3789..9f628808909ff 100644 --- a/src/storage/hummock_sdk/src/time_travel.rs +++ b/src/storage/hummock_sdk/src/time_travel.rs @@ -191,15 +191,15 @@ impl From<(&HummockVersionDelta, &HashSet)> for IncompleteHummockV } debug_assert!(log_delta .new_log - .as_ref() - .map(|d| { - d.new_value.iter().chain(d.old_value.iter()).all(|s| { - s.table_ids - .iter() - .any(|tid| time_travel_table_ids.contains(tid)) - }) - }) - .unwrap_or(true)); + .new_value + .iter() + .chain(log_delta.new_log.old_value.iter()) + .all(|s| { + s.table_ids + .iter() + .any(|tid| time_travel_table_ids.contains(tid)) + })); + Some((*table_id, PbChangeLogDelta::from(log_delta).into())) }) .collect(), diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index 09e96860cc839..4c90e6cae47f1 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -570,7 +570,7 @@ where }) .chain(self.change_log_delta.values().flat_map(|delta| { // TODO: optimization: strip table change log - let new_log = delta.new_log.as_ref().unwrap(); + let new_log = &delta.new_log; new_log.new_value.iter().chain(new_log.old_value.iter()) })) } @@ -623,8 +623,8 @@ where ( TableId::new(*table_id), ChangeLogDeltaCommon { - new_log: log_delta.new_log.as_ref().map(Into::into), truncate_epoch: log_delta.truncate_epoch, + new_log: log_delta.new_log.as_ref().unwrap().into(), }, ) }) @@ -752,7 +752,7 @@ where ( TableId::new(*table_id), ChangeLogDeltaCommon { - new_log: log_delta.new_log.clone().map(Into::into), + new_log: log_delta.new_log.clone().unwrap().into(), truncate_epoch: log_delta.truncate_epoch, }, )