Skip to content

Commit

Permalink
Chore: Replace LogId<C> with LogIdOf<C>
Browse files Browse the repository at this point in the history
This change prepares for a future update where `LogId` will become an
associated type of `RaftTypeConfig`. By replacing `LogId` with `LogIdOf`
now, the transition to `C::LogId` in a future commit will require
minimal code changes.
  • Loading branch information
drmingdrmer committed Jan 9, 2025
1 parent d4b4b02 commit e723789
Show file tree
Hide file tree
Showing 61 changed files with 296 additions and 292 deletions.
12 changes: 6 additions & 6 deletions cluster_benchmark/tests/benchmark/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use openraft::alias::LogIdOf;
use openraft::alias::SnapshotDataOf;
use openraft::storage::IOFlushed;
use openraft::storage::LogState;
Expand All @@ -18,7 +19,6 @@ use openraft::storage::RaftStateMachine;
use openraft::storage::Snapshot;
use openraft::Entry;
use openraft::EntryPayload;
use openraft::LogId;
use openraft::OptionalSend;
use openraft::RaftLogId;
use openraft::SnapshotMeta;
Expand Down Expand Up @@ -61,14 +61,14 @@ pub struct StoredSnapshot {

#[derive(Serialize, Deserialize, Debug, Default, Clone)]
pub struct StateMachine {
pub last_applied_log: Option<LogId<TypeConfig>>,
pub last_applied_log: Option<LogIdOf<TypeConfig>>,
pub last_membership: StoredMembership<TypeConfig>,
}

pub struct LogStore {
vote: RwLock<Option<Vote<TypeConfig>>>,
log: RwLock<BTreeMap<u64, Entry<TypeConfig>>>,
last_purged_log_id: RwLock<Option<LogId<TypeConfig>>>,
last_purged_log_id: RwLock<Option<LogIdOf<TypeConfig>>>,
}

impl LogStore {
Expand Down Expand Up @@ -212,15 +212,15 @@ impl RaftLogStorage<TypeConfig> for Arc<LogStore> {
}

#[tracing::instrument(level = "debug", skip(self))]
async fn truncate(&mut self, log_id: LogId<TypeConfig>) -> Result<(), StorageError<TypeConfig>> {
async fn truncate(&mut self, log_id: LogIdOf<TypeConfig>) -> Result<(), StorageError<TypeConfig>> {
let mut log = self.log.write().await;
log.split_off(&log_id.index);

Ok(())
}

#[tracing::instrument(level = "debug", skip_all)]
async fn purge(&mut self, log_id: LogId<TypeConfig>) -> Result<(), StorageError<TypeConfig>> {
async fn purge(&mut self, log_id: LogIdOf<TypeConfig>) -> Result<(), StorageError<TypeConfig>> {
{
let mut p = self.last_purged_log_id.write().await;
*p = Some(log_id);
Expand Down Expand Up @@ -253,7 +253,7 @@ impl RaftLogStorage<TypeConfig> for Arc<LogStore> {
impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
async fn applied_state(
&mut self,
) -> Result<(Option<LogId<TypeConfig>>, StoredMembership<TypeConfig>), StorageError<TypeConfig>> {
) -> Result<(Option<LogIdOf<TypeConfig>>, StoredMembership<TypeConfig>), StorageError<TypeConfig>> {
let sm = self.sm.read().await;
Ok((sm.last_applied_log, sm.last_membership.clone()))
}
Expand Down
24 changes: 12 additions & 12 deletions examples/memstore/src/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use std::fmt::Debug;
use std::ops::RangeBounds;
use std::sync::Arc;

use openraft::alias::LogIdOf;
use openraft::alias::VoteOf;
use openraft::storage::IOFlushed;
use openraft::LogId;
use openraft::LogState;
use openraft::RaftLogId;
use openraft::RaftTypeConfig;
Expand All @@ -24,13 +24,13 @@ pub struct LogStore<C: RaftTypeConfig> {
#[derive(Debug)]
pub struct LogStoreInner<C: RaftTypeConfig> {
/// The last purged log id.
last_purged_log_id: Option<LogId<C>>,
last_purged_log_id: Option<LogIdOf<C>>,

/// The Raft log.
log: BTreeMap<u64, C::Entry>,

/// The commit log id.
committed: Option<LogId<C>>,
committed: Option<LogIdOf<C>>,

/// The current granted vote.
vote: Option<VoteOf<C>>,
Expand Down Expand Up @@ -75,12 +75,12 @@ impl<C: RaftTypeConfig> LogStoreInner<C> {
})
}

async fn save_committed(&mut self, committed: Option<LogId<C>>) -> Result<(), StorageError<C>> {
async fn save_committed(&mut self, committed: Option<LogIdOf<C>>) -> Result<(), StorageError<C>> {
self.committed = committed;
Ok(())
}

async fn read_committed(&mut self) -> Result<Option<LogId<C>>, StorageError<C>> {
async fn read_committed(&mut self) -> Result<Option<LogIdOf<C>>, StorageError<C>> {
Ok(self.committed.clone())
}

Expand All @@ -104,7 +104,7 @@ impl<C: RaftTypeConfig> LogStoreInner<C> {
Ok(())
}

async fn truncate(&mut self, log_id: LogId<C>) -> Result<(), StorageError<C>> {
async fn truncate(&mut self, log_id: LogIdOf<C>) -> Result<(), StorageError<C>> {
let keys = self.log.range(log_id.index..).map(|(k, _v)| *k).collect::<Vec<_>>();
for key in keys {
self.log.remove(&key);
Expand All @@ -113,7 +113,7 @@ impl<C: RaftTypeConfig> LogStoreInner<C> {
Ok(())
}

async fn purge(&mut self, log_id: LogId<C>) -> Result<(), StorageError<C>> {
async fn purge(&mut self, log_id: LogIdOf<C>) -> Result<(), StorageError<C>> {
{
let ld = &mut self.last_purged_log_id;
assert!(ld.as_ref() <= Some(&log_id));
Expand All @@ -135,10 +135,10 @@ mod impl_log_store {
use std::fmt::Debug;
use std::ops::RangeBounds;

use openraft::alias::LogIdOf;
use openraft::alias::VoteOf;
use openraft::storage::IOFlushed;
use openraft::storage::RaftLogStorage;
use openraft::LogId;
use openraft::LogState;
use openraft::RaftLogReader;
use openraft::RaftTypeConfig;
Expand Down Expand Up @@ -173,12 +173,12 @@ mod impl_log_store {
inner.get_log_state().await
}

async fn save_committed(&mut self, committed: Option<LogId<C>>) -> Result<(), StorageError<C>> {
async fn save_committed(&mut self, committed: Option<LogIdOf<C>>) -> Result<(), StorageError<C>> {
let mut inner = self.inner.lock().await;
inner.save_committed(committed).await
}

async fn read_committed(&mut self) -> Result<Option<LogId<C>>, StorageError<C>> {
async fn read_committed(&mut self) -> Result<Option<LogIdOf<C>>, StorageError<C>> {
let mut inner = self.inner.lock().await;
inner.read_committed().await
}
Expand All @@ -194,12 +194,12 @@ mod impl_log_store {
inner.append(entries, callback).await
}

async fn truncate(&mut self, log_id: LogId<C>) -> Result<(), StorageError<C>> {
async fn truncate(&mut self, log_id: LogIdOf<C>) -> Result<(), StorageError<C>> {
let mut inner = self.inner.lock().await;
inner.truncate(log_id).await
}

async fn purge(&mut self, log_id: LogId<C>) -> Result<(), StorageError<C>> {
async fn purge(&mut self, log_id: LogIdOf<C>) -> Result<(), StorageError<C>> {
let mut inner = self.inner.lock().await;
inner.purge(log_id).await
}
Expand Down
6 changes: 3 additions & 3 deletions openraft/src/core/heartbeat/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::display_ext::DisplayInstantExt;
use crate::display_ext::DisplayOptionExt;
use crate::replication::ReplicationSessionId;
use crate::type_config::alias::InstantOf;
use crate::LogId;
use crate::type_config::alias::LogIdOf;
use crate::RaftTypeConfig;

/// The information for broadcasting a heartbeat.
Expand All @@ -30,13 +30,13 @@ where C: RaftTypeConfig
///
/// When there are no new logs to replicate, the Leader sends a heartbeat to replicate committed
/// log id to followers to update their committed log id.
pub(crate) committed: Option<LogId<C>>,
pub(crate) committed: Option<LogIdOf<C>>,
}

impl<C> HeartbeatEvent<C>
where C: RaftTypeConfig
{
pub(crate) fn new(time: InstantOf<C>, session_id: ReplicationSessionId<C>, committed: Option<LogId<C>>) -> Self {
pub(crate) fn new(time: InstantOf<C>, session_id: ReplicationSessionId<C>, committed: Option<LogIdOf<C>>) -> Self {
Self {
time,
session_id,
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/core/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ where C: RaftTypeConfig
leader_vote: CommittedVote<C>,
// TODO: need this?
// /// The cluster this replication works for.
// membership_log_id: Option<LogId<C>>,
// membership_log_id: Option<LogIdOf<C>>,
},

/// [`StorageError`] error has taken place locally(not on remote node),
Expand Down
18 changes: 9 additions & 9 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ use crate::runtime::RaftRuntime;
use crate::storage::IOFlushed;
use crate::storage::RaftLogStorage;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::LogIdOf;
use crate::type_config::alias::MpscUnboundedReceiverOf;
use crate::type_config::alias::MpscUnboundedSenderOf;
use crate::type_config::alias::OneshotReceiverOf;
Expand All @@ -101,7 +102,6 @@ use crate::vote::RaftLeaderId;
use crate::vote::RaftVote;
use crate::ChangeMembers;
use crate::Instant;
use crate::LogId;
use crate::Membership;
use crate::OptionalSend;
use crate::RaftTypeConfig;
Expand All @@ -110,7 +110,7 @@ use crate::StorageError;
/// A temp struct to hold the data for a node that is being applied.
#[derive(Debug)]
pub(crate) struct ApplyingEntry<C: RaftTypeConfig> {
log_id: LogId<C>,
log_id: LogIdOf<C>,
membership: Option<Membership<C>>,
}

Expand All @@ -127,7 +127,7 @@ where C: RaftTypeConfig
}

impl<C: RaftTypeConfig> ApplyingEntry<C> {
pub(crate) fn new(log_id: LogId<C>, membership: Option<Membership<C>>) -> Self {
pub(crate) fn new(log_id: LogIdOf<C>, membership: Option<Membership<C>>) -> Self {
Self { log_id, membership }
}
}
Expand All @@ -136,7 +136,7 @@ impl<C: RaftTypeConfig> ApplyingEntry<C> {
pub(crate) struct ApplyResult<C: RaftTypeConfig> {
pub(crate) since: u64,
pub(crate) end: u64,
pub(crate) last_applied: LogId<C>,
pub(crate) last_applied: LogIdOf<C>,
pub(crate) applying_entries: Vec<ApplyingEntry<C>>,
pub(crate) apply_results: Vec<C::R>,
}
Expand Down Expand Up @@ -469,7 +469,7 @@ where
}
};

let ent = C::Entry::new_membership(LogId::default(), new_membership);
let ent = C::Entry::new_membership(LogIdOf::<C>::default(), new_membership);
self.write_entry(ent, Some(tx));
}

Expand Down Expand Up @@ -548,7 +548,7 @@ where
.map(|(id, p)| {
(
id.clone(),
<ProgressEntry<C> as Borrow<Option<LogId<C>>>>::borrow(p).clone(),
<ProgressEntry<C> as Borrow<Option<LogIdOf<C>>>>::borrow(p).clone(),
)
})
.collect(),
Expand Down Expand Up @@ -670,7 +670,7 @@ where

let membership = Membership::from(member_nodes);

let entry = C::Entry::new_membership(LogId::default(), membership);
let entry = C::Entry::new_membership(LogIdOf::<C>::default(), membership);
let res = self.engine.initialize(entry);

// If there is an error, respond at once.
Expand Down Expand Up @@ -761,8 +761,8 @@ where
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) async fn apply_to_state_machine(
&mut self,
first: LogId<C>,
last: LogId<C>,
first: LogIdOf<C>,
last: LogIdOf<C>,
) -> Result<(), StorageError<C>> {
tracing::debug!("{}: {}..={}", func_name!(), first, last);

Expand Down
22 changes: 11 additions & 11 deletions openraft/src/engine/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ use crate::raft::VoteResponse;
use crate::raft_state::IOId;
use crate::replication::request::Replicate;
use crate::replication::ReplicationSessionId;
use crate::type_config::alias::LogIdOf;
use crate::type_config::alias::OneshotSenderOf;
use crate::type_config::alias::VoteOf;
use crate::vote::committed::CommittedVote;
use crate::LogId;
use crate::OptionalSend;
use crate::RaftTypeConfig;

Expand Down Expand Up @@ -65,12 +65,12 @@ where C: RaftTypeConfig
},

/// Replicate the committed log id to other nodes
ReplicateCommitted { committed: Option<LogId<C>> },
ReplicateCommitted { committed: Option<LogIdOf<C>> },

/// Broadcast heartbeat to all other nodes.
BroadcastHeartbeat {
session_id: ReplicationSessionId<C>,
committed: Option<LogId<C>>,
committed: Option<LogIdOf<C>>,
},

/// Save the committed log id to [`RaftLogStorage`].
Expand All @@ -79,7 +79,7 @@ where C: RaftTypeConfig
/// latest state.
///
/// [`RaftLogStorage`]: crate::storage::RaftLogStorage
SaveCommitted { committed: LogId<C> },
SaveCommitted { committed: LogIdOf<C> },

/// Commit log entries that are already persisted in the store, upto `upto`, inclusive.
///
Expand All @@ -91,8 +91,8 @@ where C: RaftTypeConfig
/// [`RaftLogStorage::save_committed()`]: crate::storage::RaftLogStorage::save_committed
/// [`RaftStateMachine::apply()`]: crate::storage::RaftStateMachine::apply
Apply {
already_committed: Option<LogId<C>>,
upto: LogId<C>,
already_committed: Option<LogIdOf<C>>,
upto: LogIdOf<C>,
},

/// Replicate log entries or snapshot to a target.
Expand All @@ -118,11 +118,11 @@ where C: RaftTypeConfig
SendVote { vote_req: VoteRequest<C> },

/// Purge log from the beginning to `upto`, inclusive.
PurgeLog { upto: LogId<C> },
PurgeLog { upto: LogIdOf<C> },

/// Delete logs that conflict with the leader from a follower/learner since log id `since`,
/// inclusive.
TruncateLog { since: LogId<C> },
TruncateLog { since: LogIdOf<C> },

/// A command send to state machine worker [`sm::worker::Worker`].
///
Expand Down Expand Up @@ -296,14 +296,14 @@ where C: RaftTypeConfig
/// This is only used by [`Raft::initialize()`], because when initializing there is no leader.
///
/// [`Raft::initialize()`]: `crate::Raft::initialize()`
LogFlushed { log_id: Option<LogId<C>> },
LogFlushed { log_id: Option<LogIdOf<C>> },

/// Wait until the log is applied to the state machine.
#[allow(dead_code)]
Applied { log_id: Option<LogId<C>> },
Applied { log_id: Option<LogIdOf<C>> },

/// Wait until snapshot is built and includes the log id.
Snapshot { log_id: Option<LogId<C>> },
Snapshot { log_id: Option<LogIdOf<C>> },
}

impl<C> fmt::Display for Condition<C>
Expand Down
Loading

0 comments on commit e723789

Please sign in to comment.