Skip to content

Commit

Permalink
Change: rename Raft::install_complete_snapshot() to `install_full_s…
Browse files Browse the repository at this point in the history
…napshot()`

Rename `Raft::install_complete_snapshot()` to `install_full_snapshot()`;
Rename `RaftNetwork::snapshot()` to `full_snapshot()`;
  • Loading branch information
drmingdrmer committed Feb 27, 2024
1 parent 22cd3bb commit 685fe8d
Show file tree
Hide file tree
Showing 25 changed files with 65 additions and 65 deletions.
2 changes: 1 addition & 1 deletion examples/raft-kv-memstore-generic-snapshot-data/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ This example is similar to the basic raft-kv-memstore example
but focuses on how to handle snapshot with `generic-snapshot-data` enabled.
Other aspects are minimized.

To send a complete snapshot, Refer to implementation of `RaftNetwork::snapshot()` in this example.
To send a complete snapshot, Refer to implementation of `RaftNetwork::full_snapshot()` in this example.

To receive a complete snapshot, Refer to implementation of `api::snapshot()` in this example.

Expand Down
2 changes: 1 addition & 1 deletion examples/raft-kv-memstore-generic-snapshot-data/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub async fn snapshot(app: &mut App, req: String) -> String {
};
let res = app
.raft
.install_complete_snapshot(vote, snapshot)
.install_full_snapshot(vote, snapshot)
.await
.map_err(typ::RaftError::<typ::Infallible>::Fatal);
encode(res)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl RaftNetwork<TypeConfig> for Connection {
}

/// A real application should replace this method with customized implementation.
async fn snapshot(
async fn full_snapshot(
&mut self,
vote: Vote<NodeId>,
snapshot: Snapshot<TypeConfig>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ pub fn log_panic(panic: &PanicInfo) {
///
/// - Setup a single node cluster, write some logs, take a snapshot;
/// - Add a learner node-2 to receive snapshot replication, via the complete-snapshot API:
/// - The sending end sends snapshot with `RaftNetwork::snapshot()`;
/// - The sending end sends snapshot with `RaftNetwork::full_snapshot()`;
/// - The receiving end deliver the received snapshot to `Raft` with
/// `Raft::install_complete_snapshot()`.
/// `Raft::install_full_snapshot()`.
#[tokio::test]
async fn test_cluster() {
std::panic::set_hook(Box::new(|panic| {
Expand Down
2 changes: 1 addition & 1 deletion examples/raft-kv-memstore-opendal-snapshot-data/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ This example is similar to the basic raft-kv-memstore example
but focuses on how to store and fetch snapshot data from remote storage.
Other aspects are minimized.

To send a complete snapshot, Refer to implementation of `RaftNetwork::snapshot()` in this example.
To send a complete snapshot, Refer to implementation of `RaftNetwork::full_snapshot()` in this example.

To receive a complete snapshot, Refer to implementation of `api::snapshot()` in this example.

Expand Down
2 changes: 1 addition & 1 deletion examples/raft-kv-memstore-opendal-snapshot-data/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub async fn snapshot(app: &mut App, req: String) -> String {
};
let res = app
.raft
.install_complete_snapshot(vote, snapshot)
.install_full_snapshot(vote, snapshot)
.await
.map_err(typ::RaftError::<typ::Infallible>::Fatal);
encode(res)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl RaftNetwork<TypeConfig> for Connection {
}

/// A real application should replace this method with customized implementation.
async fn snapshot(
async fn full_snapshot(
&mut self,
vote: Vote<NodeId>,
snapshot: Snapshot<TypeConfig>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ pub fn log_panic(panic: &PanicInfo) {
///
/// - Setup a single node cluster, write some logs, take a snapshot;
/// - Add a learner node-2 to receive snapshot replication, via the complete-snapshot API:
/// - The sending end sends snapshot with `RaftNetwork::snapshot()`;
/// - The sending end sends snapshot with `RaftNetwork::full_snapshot()`;
/// - The receiving end deliver the received snapshot to `Raft` with
/// `Raft::install_complete_snapshot()`.
/// `Raft::install_full_snapshot()`.
#[tokio::test]
async fn test_cluster() {
std::panic::set_hook(Box::new(|panic| {
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1115,8 +1115,8 @@ where
RaftMsg::BeginReceivingSnapshot { vote, tx } => {
self.engine.handle_begin_receiving_snapshot(vote, tx);
}
RaftMsg::InstallCompleteSnapshot { vote, snapshot, tx } => {
self.engine.handle_install_complete_snapshot(vote, snapshot, tx);
RaftMsg::InstallFullSnapshot { vote, snapshot, tx } => {
self.engine.handle_install_full_snapshot(vote, snapshot, tx);
}
RaftMsg::CheckIsLeaderRequest { tx } => {
if self.engine.state.is_leader(&self.engine.config.id) {
Expand Down
6 changes: 3 additions & 3 deletions openraft/src/core/raft_msg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ where C: RaftTypeConfig
tx: VoteTx<C::NodeId>,
},

InstallCompleteSnapshot {
InstallFullSnapshot {
vote: Vote<C::NodeId>,
snapshot: Snapshot<C>,
tx: ResultSender<SnapshotResponse<C::NodeId>>,
Expand Down Expand Up @@ -124,8 +124,8 @@ where C: RaftTypeConfig
RaftMsg::BeginReceivingSnapshot { vote, .. } => {
format!("BeginReceivingSnapshot: vote: {}", vote)
}
RaftMsg::InstallCompleteSnapshot { vote, snapshot, .. } => {
format!("InstallCompleteSnapshot: vote: {}, snapshot: {}", vote, snapshot)
RaftMsg::InstallFullSnapshot { vote, snapshot, .. } => {
format!("InstallFullSnapshot: vote: {}, snapshot: {}", vote, snapshot)
}
RaftMsg::ClientWriteRequest { .. } => "ClientWriteRequest".to_string(),
RaftMsg::CheckIsLeaderRequest { .. } => "CheckIsLeaderRequest".to_string(),
Expand Down
14 changes: 7 additions & 7 deletions openraft/src/core/sm/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ where C: RaftTypeConfig
Command::new(payload)
}

pub(crate) fn install_complete_snapshot(snapshot: Snapshot<C>) -> Self {
let payload = CommandPayload::InstallCompleteSnapshot { snapshot };
pub(crate) fn install_full_snapshot(snapshot: Snapshot<C>) -> Self {
let payload = CommandPayload::InstallFullSnapshot { snapshot };
Command::new(payload)
}

Expand Down Expand Up @@ -98,7 +98,7 @@ where C: RaftTypeConfig
tx: ResultSender<Box<SnapshotDataOf<C>>, HigherVote<C::NodeId>>,
},

InstallCompleteSnapshot {
InstallFullSnapshot {
snapshot: Snapshot<C>,
},

Expand All @@ -115,8 +115,8 @@ where C: RaftTypeConfig
match self {
CommandPayload::BuildSnapshot => write!(f, "BuildSnapshot"),
CommandPayload::GetSnapshot { .. } => write!(f, "GetSnapshot"),
CommandPayload::InstallCompleteSnapshot { snapshot } => {
write!(f, "InstallCompleteSnapshot: meta: {:?}", snapshot.meta)
CommandPayload::InstallFullSnapshot { snapshot } => {
write!(f, "InstallFullSnapshot: meta: {:?}", snapshot.meta)
}
CommandPayload::BeginReceivingSnapshot { .. } => {
write!(f, "BeginReceivingSnapshot")
Expand All @@ -136,8 +136,8 @@ where C: RaftTypeConfig
(CommandPayload::GetSnapshot { .. }, CommandPayload::GetSnapshot { .. }) => true,
(CommandPayload::BeginReceivingSnapshot { .. }, CommandPayload::BeginReceivingSnapshot { .. }) => true,
(
CommandPayload::InstallCompleteSnapshot { snapshot: s1 },
CommandPayload::InstallCompleteSnapshot { snapshot: s2 },
CommandPayload::InstallFullSnapshot { snapshot: s1 },
CommandPayload::InstallFullSnapshot { snapshot: s2 },
) => s1.meta == s2.meta,
(CommandPayload::Apply { entries: entries1 }, CommandPayload::Apply { entries: entries2 }) => {
// Entry may not be `Eq`, we just compare log id.
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/core/sm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ where
self.get_snapshot(tx).await?;
// GetSnapshot does not respond to RaftCore
}
CommandPayload::InstallCompleteSnapshot { snapshot } => {
CommandPayload::InstallFullSnapshot { snapshot } => {
tracing::info!("{}: install complete snapshot", func_name!());

let meta = snapshot.meta.clone();
Expand Down
6 changes: 3 additions & 3 deletions openraft/src/docs/feature_flags/feature-flags.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ This feature is introduced in 0.9.0

On the sending end (leader that sends snapshot to follower):

- Without `generic-snapshot-data`: [`RaftNetwork::snapshot()`]
- Without `generic-snapshot-data`: [`RaftNetwork::full_snapshot()`]
provides a default implementation that invokes the chunk-based API
[`RaftNetwork::install_snapshot()`] for transmit.

- With `generic-snapshot-data` enabled: [`RaftNetwork::snapshot()`]
- With `generic-snapshot-data` enabled: [`RaftNetwork::full_snapshot()`]
must be implemented to provide application customized snapshot transmission.
Application does not need to implement [`RaftNetwork::install_snapshot()`].

Expand Down Expand Up @@ -96,5 +96,5 @@ emit log record.
See: [tracing doc: emitting-log-records](https://docs.rs/tracing/latest/tracing/#emitting-log-records)


[`RaftNetwork::snapshot()`]: crate::network::RaftNetwork::snapshot
[`RaftNetwork::full_snapshot()`]: crate::network::RaftNetwork::full_snapshot
[`RaftNetwork::install_snapshot()`]: crate::network::RaftNetwork::install_snapshot
14 changes: 7 additions & 7 deletions openraft/src/docs/getting_started/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,11 @@ and receiving messages between Raft nodes.
Here is the list of methods that need to be implemented for the [`RaftNetwork`] trait:


| [`RaftNetwork`] method | forward request | to target |
|------------------------|--------------------------|---------------------------------------------------|
| [`append_entries()`] | [`AppendEntriesRequest`] | remote node [`Raft::append_entries()`] |
| [`snapshot()`] | [`Snapshot`] | remote node [`Raft::install_complete_snapshot()`] |
| [`vote()`] | [`VoteRequest`] | remote node [`Raft::vote()`] |
| [`RaftNetwork`] method | forward request | to target |
|------------------------|--------------------------|------------------------------------------------|
| [`append_entries()`] | [`AppendEntriesRequest`] | remote node [`Raft::append_entries()`] |
| [`full_snapshot()`] | [`Snapshot`] | remote node [`Raft::install_full_snapshot()`] |
| [`vote()`] | [`VoteRequest`] | remote node [`Raft::vote()`] |

[Mem KV Network](https://github.com/datafuselabs/openraft/blob/main/examples/raft-kv-memstore/src/network/raft_network_impl.rs)
demonstrates how to forward messages to other Raft nodes using [`reqwest`](https://docs.rs/reqwest/latest/reqwest/) as network transport layer.
Expand Down Expand Up @@ -351,7 +351,7 @@ Additionally, two test scripts for setting up a cluster are available:
[`Raft`]: `crate::Raft`
[`Raft::append_entries()`]: `crate::Raft::append_entries`
[`Raft::vote()`]: `crate::Raft::vote`
[`Raft::install_complete_snapshot()`]: `crate::Raft::install_complete_snapshot`
[`Raft::install_full_snapshot()`]: `crate::Raft::install_full_snapshot`

[`AppendEntriesRequest`]: `crate::raft::AppendEntriesRequest`
[`VoteRequest`]: `crate::raft::VoteRequest`
Expand Down Expand Up @@ -398,7 +398,7 @@ Additionally, two test scripts for setting up a cluster are available:
[`RaftNetwork`]: `crate::network::RaftNetwork`
[`append_entries()`]: `crate::RaftNetwork::append_entries`
[`vote()`]: `crate::RaftNetwork::vote`
[`snapshot()`]: `crate::RaftNetwork::snapshot`
[`full_snapshot()`]: `crate::RaftNetwork::full_snapshot`


[`RaftSnapshotBuilder`]: `crate::storage::RaftSnapshotBuilder`
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/engine/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ where
AppendEntries(ValueSender<Result<AppendEntriesResponse<NID>, Infallible>>),
ReceiveSnapshotChunk(ValueSender<Result<(), InstallSnapshotError>>),
InstallSnapshot(ValueSender<Result<InstallSnapshotResponse<NID>, InstallSnapshotError>>),
InstallCompleteSnapshot(ValueSender<Result<SnapshotResponse<NID>, Infallible>>),
InstallFullSnapshot(ValueSender<Result<SnapshotResponse<NID>, Infallible>>),
Initialize(ValueSender<Result<(), InitializeError<NID, N>>>),
}

Expand All @@ -253,7 +253,7 @@ where
Respond::AppendEntries(x) => x.send(),
Respond::ReceiveSnapshotChunk(x) => x.send(),
Respond::InstallSnapshot(x) => x.send(),
Respond::InstallCompleteSnapshot(x) => x.send(),
Respond::InstallFullSnapshot(x) => x.send(),
Respond::Initialize(x) => x.send(),
}
}
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ where C: RaftTypeConfig

/// Install a completely received snapshot on a follower.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn handle_install_complete_snapshot(
pub(crate) fn handle_install_full_snapshot(
&mut self,
vote: Vote<C::NodeId>,
snapshot: Snapshot<C>,
Expand All @@ -467,7 +467,7 @@ where C: RaftTypeConfig
};

let mut fh = self.following_handler();
fh.install_complete_snapshot(snapshot);
fh.install_full_snapshot(snapshot);
let res = Ok(SnapshotResponse {
vote: *self.state.vote_ref(),
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ fn test_install_snapshot_lt_last_snapshot() -> anyhow::Result<()> {
// `snapshot_meta.last_log_id`.
let mut eng = eng();

eng.following_handler().install_complete_snapshot(Snapshot {
eng.following_handler().install_full_snapshot(Snapshot {
meta: SnapshotMeta {
last_log_id: Some(log_id(2, 1, 2)),
last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()),
Expand Down Expand Up @@ -86,7 +86,7 @@ fn test_install_snapshot_lt_committed() -> anyhow::Result<()> {
// Although in this case the state machine is not affected.
let mut eng = eng();

eng.following_handler().install_complete_snapshot(Snapshot {
eng.following_handler().install_full_snapshot(Snapshot {
meta: SnapshotMeta {
last_log_id: Some(log_id(4, 1, 5)),
last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()),
Expand All @@ -113,7 +113,7 @@ fn test_install_snapshot_not_conflict() -> anyhow::Result<()> {
// Snapshot will be installed and there are no conflicting logs.
let mut eng = eng();

eng.following_handler().install_complete_snapshot(Snapshot {
eng.following_handler().install_full_snapshot(Snapshot {
meta: SnapshotMeta {
last_log_id: Some(log_id(4, 1, 6)),
last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()),
Expand All @@ -140,7 +140,7 @@ fn test_install_snapshot_not_conflict() -> anyhow::Result<()> {
vec![
//
Command::from(
sm::Command::install_complete_snapshot(Snapshot {
sm::Command::install_full_snapshot(Snapshot {
meta: SnapshotMeta {
last_log_id: Some(log_id(4, 1, 6)),
last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()),
Expand Down Expand Up @@ -187,7 +187,7 @@ fn test_install_snapshot_conflict() -> anyhow::Result<()> {
eng
};

eng.following_handler().install_complete_snapshot(Snapshot {
eng.following_handler().install_full_snapshot(Snapshot {
meta: SnapshotMeta {
last_log_id: Some(log_id(5, 1, 6)),
last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()),
Expand Down Expand Up @@ -215,7 +215,7 @@ fn test_install_snapshot_conflict() -> anyhow::Result<()> {
//
Command::DeleteConflictLog { since: log_id(2, 1, 4) },
Command::from(
sm::Command::install_complete_snapshot(Snapshot {
sm::Command::install_full_snapshot(Snapshot {
meta: SnapshotMeta {
last_log_id: Some(log_id(5, 1, 6)),
last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()),
Expand All @@ -238,7 +238,7 @@ fn test_install_snapshot_advance_last_log_id() -> anyhow::Result<()> {
// Snapshot will be installed and there are no conflicting logs.
let mut eng = eng();

eng.following_handler().install_complete_snapshot(Snapshot {
eng.following_handler().install_full_snapshot(Snapshot {
meta: SnapshotMeta {
last_log_id: Some(log_id(100, 1, 100)),
last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()),
Expand Down Expand Up @@ -268,7 +268,7 @@ fn test_install_snapshot_advance_last_log_id() -> anyhow::Result<()> {
assert_eq!(
vec![
Command::from(
sm::Command::install_complete_snapshot(Snapshot {
sm::Command::install_full_snapshot(Snapshot {
meta: SnapshotMeta {
last_log_id: Some(log_id(100, 1, 100)),
last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()),
Expand All @@ -293,7 +293,7 @@ fn test_install_snapshot_update_accepted() -> anyhow::Result<()> {
// Snapshot will be installed and `accepted` should be updated.
let mut eng = eng();

eng.following_handler().install_complete_snapshot(Snapshot {
eng.following_handler().install_full_snapshot(Snapshot {
meta: SnapshotMeta {
last_log_id: Some(log_id(100, 1, 100)),
last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()),
Expand Down
6 changes: 3 additions & 3 deletions openraft/src/engine/handler/following_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,9 @@ where C: RaftTypeConfig
/// Refer to [`snapshot_replication`](crate::docs::protocol::replication::snapshot_replication)
/// for the reason the following workflow is needed.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn install_complete_snapshot(&mut self, snapshot: Snapshot<C>) {
pub(crate) fn install_full_snapshot(&mut self, snapshot: Snapshot<C>) {
let meta = &snapshot.meta;
tracing::info!("install_complete_snapshot: meta:{:?}", meta);
tracing::info!("install_full_snapshot: meta:{:?}", meta);

let snap_last_log_id = meta.last_log_id;

Expand Down Expand Up @@ -285,7 +285,7 @@ where C: RaftTypeConfig
meta.last_membership.clone(),
));

self.output.push_command(Command::from(sm::Command::install_complete_snapshot(snapshot)));
self.output.push_command(Command::from(sm::Command::install_full_snapshot(snapshot)));

self.state.purge_upto = Some(snap_last_log_id);
self.log_handler().purge_log();
Expand Down
6 changes: 3 additions & 3 deletions openraft/src/network/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,12 @@ where C: RaftTypeConfig
///
/// The `vote` is the leader vote which is used to check if the leader is still valid by a
/// follower.
/// When the follower finished receiving snapshot, it calls `Raft::install_complete_snapshot()`
/// When the follower finished receiving snapshot, it calls `Raft::install_full_snapshot()`
/// with this vote.
///
/// `cancel` get `Ready` when the caller decides to cancel this snapshot transmission.
#[cfg(feature = "generic-snapshot-data")]
async fn snapshot(
async fn full_snapshot(
&mut self,
vote: Vote<C::NodeId>,
snapshot: Snapshot<C>,
Expand All @@ -86,7 +86,7 @@ where C: RaftTypeConfig
) -> Result<SnapshotResponse<C::NodeId>, StreamingError<C, Fatal<C::NodeId>>>;

#[cfg(not(feature = "generic-snapshot-data"))]
async fn snapshot(
async fn full_snapshot(
&mut self,
vote: Vote<C::NodeId>,
snapshot: Snapshot<C>,
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/network/stream_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl<C: RaftTypeConfig> SnapshotTransport<C> for Chunked {
/// Stream snapshot by chunks.
///
/// This function is for backward compatibility and provides a default implement for
/// `RaftNetwork::snapshot()` upon `RafNetwork::install_snapshot()`. This implementation
/// `RaftNetwork::full_snapshot()` upon `RafNetwork::install_snapshot()`. This implementation
/// requires `SnapshotData` to be `AsyncRead + AsyncSeek`.
///
/// The argument `vote` is the leader's vote which is used to check if the leader is still valid
Expand Down Expand Up @@ -96,7 +96,7 @@ impl<C: RaftTypeConfig> SnapshotTransport<C> for Chunked {
snapshot.snapshot.seek(SeekFrom::Start(offset)).await.sto_res(subject_verb)?;

// Safe unwrap(): this function is called only by default implementation of
// `RaftNetwork::snapshot()` and it is always set.
// `RaftNetwork::full_snapshot()` and it is always set.
let chunk_size = option.snapshot_chunk_size().unwrap();
let mut buf = Vec::with_capacity(chunk_size);
while buf.capacity() > buf.len() {
Expand Down
Loading

0 comments on commit 685fe8d

Please sign in to comment.