Skip to content

Commit

Permalink
refactor(storage): remove option of new log (#20055)
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k authored Jan 9, 2025
1 parent bb4dec2 commit 9d76985
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 43 deletions.
2 changes: 1 addition & 1 deletion src/meta/src/hummock/manager/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ impl HummockManager {
.change_log_delta
.values()
.flat_map(|change_log| {
let new_log = change_log.new_log.as_ref().unwrap();
let new_log = &change_log.new_log;
new_log
.new_value
.iter()
Expand Down
18 changes: 9 additions & 9 deletions src/storage/hummock_sdk/src/change_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,11 @@ pub fn build_table_change_log_delta<'a>(
TableId::new(table_id),
ChangeLogDelta {
truncate_epoch,
new_log: Some(EpochNewChangeLog {
new_log: EpochNewChangeLog {
new_value: vec![],
old_value: vec![],
epochs: epochs.clone(),
}),
},
},
)
})
Expand All @@ -203,7 +203,7 @@ pub fn build_table_change_log_delta<'a>(
for table_id in &sst.table_ids {
match table_change_log.get_mut(&TableId::new(*table_id)) {
Some(log) => {
log.new_log.as_mut().unwrap().old_value.push(sst.clone());
log.new_log.old_value.push(sst.clone());
}
None => {
warn!(table_id, ?sst, "old value sst contains non-log-store table");
Expand All @@ -214,7 +214,7 @@ pub fn build_table_change_log_delta<'a>(
for sst in new_value_ssts {
for table_id in &sst.table_ids {
if let Some(log) = table_change_log.get_mut(&TableId::new(*table_id)) {
log.new_log.as_mut().unwrap().new_value.push(sst.clone());
log.new_log.new_value.push(sst.clone());
}
}
}
Expand All @@ -224,7 +224,7 @@ pub fn build_table_change_log_delta<'a>(
#[derive(Debug, PartialEq, Clone)]
pub struct ChangeLogDeltaCommon<T> {
pub truncate_epoch: u64,
pub new_log: Option<EpochNewChangeLogCommon<T>>,
pub new_log: EpochNewChangeLogCommon<T>,
}

pub type ChangeLogDelta = ChangeLogDeltaCommon<SstableInfo>;
Expand All @@ -236,7 +236,7 @@ where
fn from(val: &ChangeLogDeltaCommon<T>) -> Self {
Self {
truncate_epoch: val.truncate_epoch,
new_log: val.new_log.as_ref().map(|a| a.into()),
new_log: Some((&val.new_log).into()),
}
}
}
Expand All @@ -248,7 +248,7 @@ where
fn from(val: &PbChangeLogDelta) -> Self {
Self {
truncate_epoch: val.truncate_epoch,
new_log: val.new_log.as_ref().map(|a| a.into()),
new_log: val.new_log.as_ref().unwrap().into(),
}
}
}
Expand All @@ -260,7 +260,7 @@ where
fn from(val: ChangeLogDeltaCommon<T>) -> Self {
Self {
truncate_epoch: val.truncate_epoch,
new_log: val.new_log.map(|a| a.into()),
new_log: Some(val.new_log.into()),
}
}
}
Expand All @@ -272,7 +272,7 @@ where
fn from(val: PbChangeLogDelta) -> Self {
Self {
truncate_epoch: val.truncate_epoch,
new_log: val.new_log.map(|a| a.into()),
new_log: val.new_log.unwrap().into(),
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,7 @@ impl HummockVersion {
changed_table_info: &HashMap<TableId, Option<StateTableInfo>>,
) {
for (table_id, change_log_delta) in change_log_delta {
let new_change_log = change_log_delta.new_log.as_ref().unwrap();
let new_change_log = &change_log_delta.new_log;
match table_change_log.entry(*table_id) {
Entry::Occupied(entry) => {
let change_log = entry.into_mut();
Expand Down
48 changes: 28 additions & 20 deletions src/storage/hummock_sdk/src/frontend_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,12 @@ impl FrontendHummockVersionDelta {
*table_id,
ChangeLogDeltaCommon {
truncate_epoch: change_log_delta.truncate_epoch,
new_log: change_log_delta.new_log.as_ref().map(|new_log| {
EpochNewChangeLogCommon {
// Here we need to determine if value is null but don't care what the value is, so we fill him in using `()`
new_value: vec![(); new_log.new_value.len()],
old_value: vec![(); new_log.old_value.len()],
epochs: new_log.epochs.clone(),
}
}),
new_log: EpochNewChangeLogCommon {
// Here we need to determine if value is null but don't care what the value is, so we fill him in using `()`
new_value: vec![(); change_log_delta.new_log.new_value.len()],
old_value: vec![(); change_log_delta.new_log.old_value.len()],
epochs: change_log_delta.new_log.epochs.clone(),
},
},
)
})
Expand Down Expand Up @@ -187,11 +185,17 @@ impl FrontendHummockVersionDelta {
(
table_id.table_id,
PbChangeLogDelta {
new_log: delta.new_log.as_ref().map(|new_log| PbEpochNewChangeLog {
new_log: Some(PbEpochNewChangeLog {
// Here we need to determine if value is null but don't care what the value is, so we fill him in using `PbSstableInfo::default()`
old_value: vec![PbSstableInfo::default(); new_log.old_value.len()],
new_value: vec![PbSstableInfo::default(); new_log.new_value.len()],
epochs: new_log.epochs.clone(),
old_value: vec![
PbSstableInfo::default();
delta.new_log.old_value.len()
],
new_value: vec![
PbSstableInfo::default();
delta.new_log.new_value.len()
],
epochs: delta.new_log.epochs.clone(),
}),
truncate_epoch: delta.truncate_epoch,
},
Expand Down Expand Up @@ -228,14 +232,18 @@ impl FrontendHummockVersionDelta {
TableId::new(*table_id),
ChangeLogDeltaCommon {
truncate_epoch: change_log_delta.truncate_epoch,
new_log: change_log_delta.new_log.as_ref().map(|new_log| {
EpochNewChangeLogCommon {
// Here we need to determine if value is null but don't care what the value is, so we fill him in using `()`
new_value: vec![(); new_log.new_value.len()],
old_value: vec![(); new_log.old_value.len()],
epochs: new_log.epochs.clone(),
}
}),
new_log: change_log_delta
.new_log
.as_ref()
.map(|new_log| {
EpochNewChangeLogCommon {
// Here we need to determine if value is null but don't care what the value is, so we fill him in using `()`
new_value: vec![(); new_log.new_value.len()],
old_value: vec![(); new_log.old_value.len()],
epochs: new_log.epochs.clone(),
}
})
.unwrap(),
},
)
})
Expand Down
18 changes: 9 additions & 9 deletions src/storage/hummock_sdk/src/time_travel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,15 +191,15 @@ impl From<(&HummockVersionDelta, &HashSet<StateTableId>)> for IncompleteHummockV
}
debug_assert!(log_delta
.new_log
.as_ref()
.map(|d| {
d.new_value.iter().chain(d.old_value.iter()).all(|s| {
s.table_ids
.iter()
.any(|tid| time_travel_table_ids.contains(tid))
})
})
.unwrap_or(true));
.new_value
.iter()
.chain(log_delta.new_log.old_value.iter())
.all(|s| {
s.table_ids
.iter()
.any(|tid| time_travel_table_ids.contains(tid))
}));

Some((*table_id, PbChangeLogDelta::from(log_delta).into()))
})
.collect(),
Expand Down
6 changes: 3 additions & 3 deletions src/storage/hummock_sdk/src/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ where
})
.chain(self.change_log_delta.values().flat_map(|delta| {
// TODO: optimization: strip table change log
let new_log = delta.new_log.as_ref().unwrap();
let new_log = &delta.new_log;
new_log.new_value.iter().chain(new_log.old_value.iter())
}))
}
Expand Down Expand Up @@ -623,8 +623,8 @@ where
(
TableId::new(*table_id),
ChangeLogDeltaCommon {
new_log: log_delta.new_log.as_ref().map(Into::into),
truncate_epoch: log_delta.truncate_epoch,
new_log: log_delta.new_log.as_ref().unwrap().into(),
},
)
})
Expand Down Expand Up @@ -752,7 +752,7 @@ where
(
TableId::new(*table_id),
ChangeLogDeltaCommon {
new_log: log_delta.new_log.clone().map(Into::into),
new_log: log_delta.new_log.clone().unwrap().into(),
truncate_epoch: log_delta.truncate_epoch,
},
)
Expand Down

0 comments on commit 9d76985

Please sign in to comment.