Skip to content

Commit

Permalink
Chore: replace LogId.index with LogId.index()
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Jan 10, 2025
1 parent 52629cb commit 0c5ed58
Show file tree
Hide file tree
Showing 31 changed files with 85 additions and 77 deletions.
8 changes: 4 additions & 4 deletions cluster_benchmark/tests/benchmark/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ impl RaftSnapshotBuilder<TypeConfig> for Arc<StateMachineStore> {
let snapshot_idx = self.snapshot_idx.fetch_add(1, Ordering::Relaxed);

let snapshot_id = if let Some(last) = last_applied_log {
format!("{}-{}-{}", last.leader_id, last.index, snapshot_idx)
format!("{}-{}-{}", last.leader_id, last.index(), snapshot_idx)
} else {
format!("--{}", snapshot_idx)
};
Expand Down Expand Up @@ -214,7 +214,7 @@ impl RaftLogStorage<TypeConfig> for Arc<LogStore> {
#[tracing::instrument(level = "debug", skip(self))]
async fn truncate(&mut self, log_id: LogIdOf<TypeConfig>) -> Result<(), StorageError<TypeConfig>> {
let mut log = self.log.write().await;
log.split_off(&log_id.index);
log.split_off(&log_id.index());

Ok(())
}
Expand All @@ -227,7 +227,7 @@ impl RaftLogStorage<TypeConfig> for Arc<LogStore> {
}

let mut log = self.log.write().await;
*log = log.split_off(&(log_id.index + 1));
*log = log.split_off(&(log_id.index() + 1));

Ok(())
}
Expand All @@ -237,7 +237,7 @@ impl RaftLogStorage<TypeConfig> for Arc<LogStore> {
where I: IntoIterator<Item = Entry<TypeConfig>> + Send {
{
let mut log = self.log.write().await;
log.extend(entries.into_iter().map(|entry| (entry.get_log_id().index, entry)));
log.extend(entries.into_iter().map(|entry| (entry.get_log_id().index(), entry)));
}
callback.io_completed(Ok(()));
Ok(())
Expand Down
6 changes: 3 additions & 3 deletions examples/memstore/src/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,15 @@ impl<C: RaftTypeConfig> LogStoreInner<C> {
where I: IntoIterator<Item = C::Entry> {
// Simple implementation that calls the flush-before-return `append_to_log`.
for entry in entries {
self.log.insert(entry.get_log_id().index, entry);
self.log.insert(entry.get_log_id().index(), entry);
}
callback.io_completed(Ok(()));

Ok(())
}

async fn truncate(&mut self, log_id: LogIdOf<C>) -> Result<(), StorageError<C>> {
let keys = self.log.range(log_id.index..).map(|(k, _v)| *k).collect::<Vec<_>>();
let keys = self.log.range(log_id.index()..).map(|(k, _v)| *k).collect::<Vec<_>>();
for key in keys {
self.log.remove(&key);
}
Expand All @@ -121,7 +121,7 @@ impl<C: RaftTypeConfig> LogStoreInner<C> {
}

{
let keys = self.log.range(..=log_id.index).map(|(k, _v)| *k).collect::<Vec<_>>();
let keys = self.log.range(..=log_id.index()).map(|(k, _v)| *k).collect::<Vec<_>>();
for key in keys {
self.log.remove(&key);
}
Expand Down
2 changes: 1 addition & 1 deletion examples/raft-kv-memstore-grpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl From<LogId> for pb::LogId {
fn from(log_id: LogId) -> Self {
pb::LogId {
term: log_id.leader_id,
index: log_id.index,
index: log_id.index(),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion examples/raft-kv-memstore-grpc/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl RaftSnapshotBuilder<TypeConfig> for Arc<StateMachineStore> {
};

let snapshot_id = if let Some(last) = last_applied_log {
format!("{}-{}-{}", last.leader_id, last.index, snapshot_idx)
format!("{}-{}-{}", last.leader_id, last.index(), snapshot_idx)
} else {
format!("--{}", snapshot_idx)
};
Expand Down
2 changes: 1 addition & 1 deletion examples/raft-kv-memstore-network-v2/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl RaftSnapshotBuilder<TypeConfig> for Arc<StateMachineStore> {
};

let snapshot_id = if let Some(last) = last_applied_log {
format!("{}-{}-{}", last.leader_id, last.index, snapshot_idx)
format!("{}-{}-{}", last.leader_id, last.index(), snapshot_idx)
} else {
format!("--{}", snapshot_idx)
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl RaftSnapshotBuilder<TypeConfig> for Arc<StateMachineStore> {
};

let snapshot_id = if let Some(last) = last_applied_log {
format!("{}-{}-{}", last.leader_id, last.index, snapshot_idx)
format!("{}-{}-{}", last.leader_id, last.index(), snapshot_idx)
} else {
format!("--{}", snapshot_idx)
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ async fn run_test(rafts: &[typ::Raft], router: Router) {
{
let metrics = raft1.metrics().borrow().clone();
println!("node 1 metrics: {:#?}", metrics);
assert_eq!(Some(3), metrics.snapshot.map(|x| x.index));
assert_eq!(Some(3), metrics.purged.map(|x| x.index));
assert_eq!(Some(3), metrics.snapshot.map(|x| x.index()));
assert_eq!(Some(3), metrics.purged.map(|x| x.index()));
}

println!("=== add-learner node-2");
Expand All @@ -132,8 +132,8 @@ async fn run_test(rafts: &[typ::Raft], router: Router) {
{
let metrics = raft2.metrics().borrow().clone();
println!("node 2 metrics: {:#?}", metrics);
assert_eq!(Some(3), metrics.snapshot.map(|x| x.index));
assert_eq!(Some(3), metrics.purged.map(|x| x.index));
assert_eq!(Some(3), metrics.snapshot.map(|x| x.index()));
assert_eq!(Some(3), metrics.purged.map(|x| x.index()));
}

// In this example, the snapshot is just a copy of the state machine.
Expand Down
8 changes: 4 additions & 4 deletions examples/raft-kv-memstore-singlethreaded/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ impl RaftSnapshotBuilder<TypeConfig> for Rc<StateMachineStore> {
};

let snapshot_id = if let Some(last) = last_applied_log {
format!("{}-{}-{}", last.leader_id, last.index, snapshot_idx)
format!("{}-{}-{}", last.leader_id, last.index(), snapshot_idx)
} else {
format!("--{}", snapshot_idx)
};
Expand Down Expand Up @@ -307,7 +307,7 @@ impl RaftLogStorage<TypeConfig> for Rc<LogStore> {
// Simple implementation that calls the flush-before-return `append_to_log`.
let mut log = self.log.borrow_mut();
for entry in entries {
log.insert(entry.log_id.index, entry);
log.insert(entry.log_id.index(), entry);
}
callback.io_completed(Ok(()));

Expand All @@ -319,7 +319,7 @@ impl RaftLogStorage<TypeConfig> for Rc<LogStore> {
tracing::debug!("delete_log: [{:?}, +oo)", log_id);

let mut log = self.log.borrow_mut();
let keys = log.range(log_id.index..).map(|(k, _v)| *k).collect::<Vec<_>>();
let keys = log.range(log_id.index()..).map(|(k, _v)| *k).collect::<Vec<_>>();
for key in keys {
log.remove(&key);
}
Expand All @@ -340,7 +340,7 @@ impl RaftLogStorage<TypeConfig> for Rc<LogStore> {
{
let mut log = self.log.borrow_mut();

let keys = log.range(..=log_id.index).map(|(k, _v)| *k).collect::<Vec<_>>();
let keys = log.range(..=log_id.index()).map(|(k, _v)| *k).collect::<Vec<_>>();
for key in keys {
log.remove(&key);
}
Expand Down
2 changes: 1 addition & 1 deletion examples/raft-kv-memstore/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl RaftSnapshotBuilder<TypeConfig> for Arc<StateMachineStore> {

let snapshot_idx = self.snapshot_idx.fetch_add(1, Ordering::Relaxed) + 1;
let snapshot_id = if let Some(last) = last_applied_log {
format!("{}-{}-{}", last.leader_id, last.index, snapshot_idx)
format!("{}-{}-{}", last.leader_id, last.index(), snapshot_idx)
} else {
format!("--{}", snapshot_idx)
};
Expand Down
2 changes: 1 addition & 1 deletion examples/raft-kv-rocksdb/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ impl RaftSnapshotBuilder<TypeConfig> for StateMachineStore {
};

let snapshot_id = if let Some(last) = last_applied_log {
format!("{}-{}-{}", last.leader_id, last.index, self.snapshot_idx)
format!("{}-{}-{}", last.leader_id, last.index(), self.snapshot_idx)
} else {
format!("--{}", self.snapshot_idx)
};
Expand Down
2 changes: 1 addition & 1 deletion examples/rocksstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl RaftSnapshotBuilder<TypeConfig> for RocksStateMachine {
let snapshot_idx: u64 = rand::thread_rng().gen_range(0..1000);

let snapshot_id = if let Some(last) = last_applied_log {
format!("{}-{}-{}", last.leader_id, last.index, snapshot_idx)
format!("{}-{}-{}", last.leader_id, last.index(), snapshot_idx)
} else {
format!("--{}", snapshot_idx)
};
Expand Down
10 changes: 5 additions & 5 deletions examples/rocksstore/src/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ where C: RaftTypeConfig

let entry: EntryOf<C> = serde_json::from_slice(&val).map_err(read_logs_err)?;

assert_eq!(id, entry.get_log_id().index);
assert_eq!(id, entry.get_log_id().index());

res.push(entry);
}
Expand Down Expand Up @@ -158,8 +158,8 @@ where C: RaftTypeConfig
async fn append<I>(&mut self, entries: I, callback: IOFlushed<C>) -> Result<(), StorageError<C>>
where I: IntoIterator<Item = EntryOf<C>> + Send {
for entry in entries {
let id = id_to_bin(entry.get_log_id().index);
assert_eq!(bin_to_id(&id), entry.get_log_id().index);
let id = id_to_bin(entry.get_log_id().index());
assert_eq!(bin_to_id(&id), entry.get_log_id().index());
self.db
.put_cf(
self.cf_logs(),
Expand All @@ -179,7 +179,7 @@ where C: RaftTypeConfig
async fn truncate(&mut self, log_id: LogIdOf<C>) -> Result<(), StorageError<C>> {
tracing::debug!("truncate: [{:?}, +oo)", log_id);

let from = id_to_bin(log_id.index);
let from = id_to_bin(log_id.index());
let to = id_to_bin(0xff_ff_ff_ff_ff_ff_ff_ff);
self.db.delete_range_cf(self.cf_logs(), &from, &to).map_err(|e| StorageError::write_logs(&e))?;

Expand All @@ -196,7 +196,7 @@ where C: RaftTypeConfig
self.put_meta::<meta::LastPurged>(&log_id)?;

let from = id_to_bin(0);
let to = id_to_bin(log_id.index + 1);
let to = id_to_bin(log_id.index() + 1);
self.db.delete_range_cf(self.cf_logs(), &from, &to).map_err(|e| StorageError::write_logs(&e))?;

// Purging does not need to be persistent.
Expand Down
10 changes: 5 additions & 5 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ where
// TODO: it should returns membership config error etc. currently this is done by the
// caller.
lh.leader_append_entries(entries);
let index = lh.state.last_log_id().unwrap().index;
let index = lh.state.last_log_id().unwrap().index();

// Install callback channels.
if let Some(tx) = tx {
Expand Down Expand Up @@ -767,10 +767,10 @@ where
tracing::debug!("{}: {}..={}", func_name!(), first, last);

debug_assert!(
first.index <= last.index,
first.index() <= last.index(),
"first.index {} should <= last.index {}",
first.index,
last.index
first.index(),
last.index()
);

let cmd = sm::Command::apply(first, last.clone());
Expand Down Expand Up @@ -1793,7 +1793,7 @@ where
self.log_store.truncate(since.clone()).await?;

// Inform clients waiting for logs to be applied.
let removed = self.client_resp_channels.split_off(&since.index);
let removed = self.client_resp_channels.split_off(&since.index());
if !removed.is_empty() {
let leader_id = self.current_leader();
let leader_node = self.get_leader_node(leader_id.clone());
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/core/sm/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ where
// so that an Entry does not need to be Clone,
// and no references will be used by apply

let since = first.index;
let end = last.index + 1;
let since = first.index();
let end = last.index() + 1;

let entries = self.log_reader.try_get_log_entries(since..end).await?;
if entries.len() != (end - since) as usize {
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -591,13 +591,13 @@ where C: RaftTypeConfig
return;
}

if index > snapshot_last_log_id.index {
if index > snapshot_last_log_id.index() {
tracing::info!(
"can not purge logs not in a snapshot; index: {}, last in snapshot log id: {}",
index,
snapshot_last_log_id
);
index = snapshot_last_log_id.index;
index = snapshot_last_log_id.index();
}

// Safe unwrap: `index` is ensured to be present in the above code.
Expand Down
12 changes: 6 additions & 6 deletions openraft/src/engine/handler/following_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ where C: RaftTypeConfig
);

if let Some(x) = entries.first() {
debug_assert!(x.get_log_id().index == prev_log_id.next_index());
debug_assert!(x.get_log_id().index() == prev_log_id.next_index());
}

let last_log_id = entries.last().map(|x| x.get_log_id().clone());
Expand All @@ -84,7 +84,7 @@ where C: RaftTypeConfig
// the entries after it has to be deleted first.
// Raft requires log ids are in total order by (term,index).
// Otherwise the log id with max index makes committed entry invisible in election.
self.truncate_logs(entries[since].get_log_id().index);
self.truncate_logs(entries[since].get_log_id().index());

let entries = entries.split_off(since);
self.do_append_entries(entries);
Expand Down Expand Up @@ -117,10 +117,10 @@ where C: RaftTypeConfig
) -> Result<(), RejectAppendEntries<C>> {
if let Some(prev) = prev_log_id {
if !self.state.has_log_id(prev) {
let local = self.state.get_log_id(prev.index);
let local = self.state.get_log_id(prev.index());
tracing::debug!(local = display(DisplayOption(&local)), "prev_log_id does not match");

self.truncate_logs(prev.index);
self.truncate_logs(prev.index());
return Err(RejectAppendEntries::ByConflictingLogId {
local,
expect: prev.clone(),
Expand All @@ -144,7 +144,7 @@ where C: RaftTypeConfig
pub(crate) fn do_append_entries(&mut self, entries: Vec<C::Entry>) {
debug_assert!(!entries.is_empty());
debug_assert_eq!(
entries[0].get_log_id().index,
entries[0].get_log_id().index(),
self.state.log_ids.last().cloned().next_index(),
);
debug_assert!(Some(entries[0].get_log_id()) > self.state.log_ids.last());
Expand Down Expand Up @@ -307,7 +307,7 @@ where C: RaftTypeConfig
return None;
}

let local = self.state.get_log_id(snap_last_log_id.index);
let local = self.state.get_log_id(snap_last_log_id.index());
if let Some(local) = local {
if local != snap_last_log_id {
// Conflict, delete all non-committed logs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ fn test_calc_purge_upto() -> anyhow::Result<()> {

if let Some(last_purged) = last_purged {
eng.state.log_ids.purge(&last_purged);
eng.state.purged_next = last_purged.index + 1;
eng.state.purged_next = last_purged.index() + 1;
}
eng.state.snapshot_meta.last_log_id = snapshot_last_log_id;
let got = eng.log_handler().calc_purge_upto();
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/engine/handler/replication_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ where C: RaftTypeConfig

let mut updater = progress::entry::update::Updater::new(self.config, prog_entry);

updater.update_conflicting(conflict.index);
updater.update_conflicting(conflict.index());
}

/// Enable one-time replication reset for a specific node upon log reversion detection.
Expand Down
Loading

0 comments on commit 0c5ed58

Please sign in to comment.