Skip to content

Commit

Permalink
Refactor: move raft message types to separate files (#886)
Browse files Browse the repository at this point in the history
- `raft::message::AppendEntriesRequest`
- `raft::message::AppendEntriesResponse`
- `raft::message::ClientWriteResponse`
- `raft::message::InstallSnapshotRequest`
- `raft::message::InstallSnapshotResponse`
- `raft::message::VoteRequest`
- `raft::message::VoteResponse`
  • Loading branch information
drmingdrmer committed Jul 1, 2023
1 parent 9ea796a commit 732304b
Show file tree
Hide file tree
Showing 6 changed files with 291 additions and 242 deletions.
117 changes: 117 additions & 0 deletions openraft/src/raft/message/append_entries.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
use std::fmt;

use crate::display_ext::DisplayOptionExt;
use crate::display_ext::DisplaySlice;
use crate::LogId;
use crate::MessageSummary;
use crate::NodeId;
use crate::RaftTypeConfig;
use crate::Vote;

/// An RPC sent by a cluster leader to replicate log entries (§5.3), and as a heartbeat (§5.2).
#[derive(Clone)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub struct AppendEntriesRequest<C: RaftTypeConfig> {
pub vote: Vote<C::NodeId>,

pub prev_log_id: Option<LogId<C::NodeId>>,

/// The new log entries to store.
///
/// This may be empty when the leader is sending heartbeats. Entries
/// are batched for efficiency.
pub entries: Vec<C::Entry>,

/// The leader's committed log id.
pub leader_commit: Option<LogId<C::NodeId>>,
}

impl<C: RaftTypeConfig> fmt::Debug for AppendEntriesRequest<C>
where C::D: fmt::Debug
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("AppendEntriesRequest")
.field("vote", &self.vote)
.field("prev_log_id", &self.prev_log_id)
.field("entries", &self.entries)
.field("leader_commit", &self.leader_commit)
.finish()
}
}

impl<C: RaftTypeConfig> MessageSummary<AppendEntriesRequest<C>> for AppendEntriesRequest<C> {
fn summary(&self) -> String {
format!(
"vote={}, prev_log_id={}, leader_commit={}, entries={}",
self.vote,
self.prev_log_id.summary(),
self.leader_commit.summary(),
DisplaySlice::<_>(self.entries.as_slice())
)
}
}

/// The response to an `AppendEntriesRequest`.
///
/// [`RaftNetwork::send_append_entries`] returns this type only when received an RPC reply.
/// Otherwise it should return [`RPCError`].
///
/// [`RPCError`]: crate::error::RPCError
/// [`RaftNetwork::send_append_entries`]: crate::network::RaftNetwork::send_append_entries
#[derive(Debug)]
#[derive(PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub enum AppendEntriesResponse<NID: NodeId> {
/// Successfully replicated all log entries to the target node.
Success,

/// Successfully sent the first portion of log entries.
///
/// [`RaftNetwork::send_append_entries`] can return a partial success.
/// For example, it tries to send log entries `[1-2..3-10]`, the application is allowed to send
/// just `[1-2..1-3]` and return `PartialSuccess(1-3)`,
///
/// ### Caution
///
/// The returned matching log id must be **greater than or equal to** the first log
/// id([`AppendEntriesRequest::prev_log_id`]) of the entries to send. If no RPC reply is
/// received, [`RaftNetwork::send_append_entries`] must return an [`RPCError`] to inform
/// Openraft that the first log id([`AppendEntriesRequest::prev_log_id`]) may not match on
/// the remote target node.
///
/// [`RPCError`]: crate::error::RPCError
/// [`RaftNetwork::send_append_entries`]: crate::network::RaftNetwork::send_append_entries
PartialSuccess(Option<LogId<NID>>),

/// The first log id([`AppendEntriesRequest::prev_log_id`]) of the entries to send does not
/// match on the remote target node.
Conflict,

/// Seen a vote `v` that does not hold `mine_vote >= v`.
/// And a leader's vote(committed vote) must be total order with other vote.
/// Therefore it has to be a higher vote: `mine_vote < v`
HigherVote(Vote<NID>),
}

impl<NID: NodeId> AppendEntriesResponse<NID> {
pub fn is_success(&self) -> bool {
matches!(*self, AppendEntriesResponse::Success)
}

pub fn is_conflict(&self) -> bool {
matches!(*self, AppendEntriesResponse::Conflict)
}
}

impl<NID: NodeId> fmt::Display for AppendEntriesResponse<NID> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
AppendEntriesResponse::Success => write!(f, "Success"),
AppendEntriesResponse::PartialSuccess(m) => {
write!(f, "PartialSuccess({})", m.display())
}
AppendEntriesResponse::HigherVote(vote) => write!(f, "Higher vote, {}", vote),
AppendEntriesResponse::Conflict => write!(f, "Conflict"),
}
}
}
42 changes: 42 additions & 0 deletions openraft/src/raft/message/client_write.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use std::fmt::Debug;

use crate::AppDataResponse;

Check warning on line 3 in openraft/src/raft/message/client_write.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly)

unused import: `crate::AppDataResponse`

Check warning on line 3 in openraft/src/raft/message/client_write.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, single-term-leader)

unused import: `crate::AppDataResponse`

Check warning on line 3 in openraft/src/raft/message/client_write.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, single-term-leader)

unused import: `crate::AppDataResponse`

Check warning on line 3 in openraft/src/raft/message/client_write.rs

View workflow job for this annotation

GitHub Actions / openraft-test-bench (nightly)

unused import: `crate::AppDataResponse`

Check warning on line 3 in openraft/src/raft/message/client_write.rs

View workflow job for this annotation

GitHub Actions / coverage

unused import: `crate::AppDataResponse`

Check warning on line 3 in openraft/src/raft/message/client_write.rs

View workflow job for this annotation

GitHub Actions / openraft-test (nightly, 0, single-term-leader)

unused import: `crate::AppDataResponse`

Check warning on line 3 in openraft/src/raft/message/client_write.rs

View workflow job for this annotation

GitHub Actions / openraft-test (nightly, 0, single-term-leader)

unused import: `crate::AppDataResponse`

Check warning on line 3 in openraft/src/raft/message/client_write.rs

View workflow job for this annotation

GitHub Actions / openraft-test (stable, 0)

unused import: `crate::AppDataResponse`

Check warning on line 3 in openraft/src/raft/message/client_write.rs

View workflow job for this annotation

GitHub Actions / openraft-test (nightly, 30)

unused import: `crate::AppDataResponse`
use crate::LogId;
use crate::Membership;
use crate::MessageSummary;
use crate::RaftTypeConfig;

/// The response to a client-request.
#[cfg_attr(
feature = "serde",
derive(serde::Deserialize, serde::Serialize),
serde(bound = "C::R: AppDataResponse")
)]
pub struct ClientWriteResponse<C: RaftTypeConfig> {
/// The id of the log that is applied.
pub log_id: LogId<C::NodeId>,

/// Application specific response data.
pub data: C::R,

/// If the log entry is a change-membership entry.
pub membership: Option<Membership<C::NodeId, C::Node>>,
}

impl<C: RaftTypeConfig> Debug for ClientWriteResponse<C>
where C::R: Debug
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ClientWriteResponse")
.field("log_id", &self.log_id)
.field("data", &self.data)
.field("membership", &self.membership)
.finish()
}
}

impl<C: RaftTypeConfig> MessageSummary<ClientWriteResponse<C>> for ClientWriteResponse<C> {
fn summary(&self) -> String {
format!("log_id: {}, membership: {:?}", self.log_id, self.membership)
}
}
47 changes: 47 additions & 0 deletions openraft/src/raft/message/install_snapshot.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use crate::MessageSummary;
use crate::NodeId;
use crate::RaftTypeConfig;
use crate::SnapshotMeta;
use crate::Vote;

/// An RPC sent by the Raft leader to send chunks of a snapshot to a follower (§7).
#[derive(Clone, Debug)]
#[derive(PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub struct InstallSnapshotRequest<C: RaftTypeConfig> {
pub vote: Vote<C::NodeId>,

/// Metadata of a snapshot: snapshot_id, last_log_ed membership etc.
pub meta: SnapshotMeta<C::NodeId, C::Node>,

/// The byte offset where this chunk of data is positioned in the snapshot file.
pub offset: u64,
/// The raw bytes of the snapshot chunk, starting at `offset`.
pub data: Vec<u8>,

/// Will be `true` if this is the last chunk in the snapshot.
pub done: bool,
}

impl<C: RaftTypeConfig> MessageSummary<InstallSnapshotRequest<C>> for InstallSnapshotRequest<C> {
fn summary(&self) -> String {
format!(
"vote={}, meta={}, offset={}, len={}, done={}",
self.vote,
self.meta,
self.offset,
self.data.len(),
self.done
)
}
}

/// The response to an `InstallSnapshotRequest`.
#[derive(Debug)]
#[derive(PartialEq, Eq)]
#[derive(derive_more::Display)]
#[display(fmt = "{{vote:{}}}", vote)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub struct InstallSnapshotResponse<NID: NodeId> {
pub vote: Vote<NID>,
}
18 changes: 18 additions & 0 deletions openraft/src/raft/message/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
//! Raft protocol messages and types.
//!
//! Request and response types for an application to talk to the Raft,
//! and are also used by network layer to talk to other Raft nodes.

mod append_entries;
mod install_snapshot;
mod vote;

mod client_write;

pub use append_entries::AppendEntriesRequest;
pub use append_entries::AppendEntriesResponse;
pub use client_write::ClientWriteResponse;
pub use install_snapshot::InstallSnapshotRequest;
pub use install_snapshot::InstallSnapshotResponse;
pub use vote::VoteRequest;
pub use vote::VoteResponse;
59 changes: 59 additions & 0 deletions openraft/src/raft/message/vote.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use std::fmt;

use crate::display_ext::DisplayOptionExt;
use crate::LogId;
use crate::MessageSummary;
use crate::NodeId;
use crate::Vote;

/// An RPC sent by candidates to gather votes (§5.2).
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub struct VoteRequest<NID: NodeId> {
pub vote: Vote<NID>,
pub last_log_id: Option<LogId<NID>>,
}

impl<NID: NodeId> fmt::Display for VoteRequest<NID> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{{vote:{}, last_log:{}}}", self.vote, self.last_log_id.display(),)
}
}

impl<NID: NodeId> MessageSummary<VoteRequest<NID>> for VoteRequest<NID> {
fn summary(&self) -> String {
self.to_string()
}
}

impl<NID: NodeId> VoteRequest<NID> {
pub fn new(vote: Vote<NID>, last_log_id: Option<LogId<NID>>) -> Self {
Self { vote, last_log_id }
}
}

/// The response to a `VoteRequest`.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub struct VoteResponse<NID: NodeId> {
/// vote after a node handling vote-request.
/// Thus `resp.vote >= req.vote` always holds.
pub vote: Vote<NID>,

/// Will be true if the candidate received a vote from the responder.
pub vote_granted: bool,

/// The last log id stored on the remote voter.
pub last_log_id: Option<LogId<NID>>,
}

impl<NID: NodeId> MessageSummary<VoteResponse<NID>> for VoteResponse<NID> {
fn summary(&self) -> String {
format!(
"{{granted:{}, {}, last_log:{:?}}}",
self.vote_granted,
self.vote,
self.last_log_id.map(|x| x.to_string())
)
}
}
Loading

0 comments on commit 732304b

Please sign in to comment.