diff --git a/openraft/src/raft/message/append_entries.rs b/openraft/src/raft/message/append_entries.rs new file mode 100644 index 000000000..5980110d8 --- /dev/null +++ b/openraft/src/raft/message/append_entries.rs @@ -0,0 +1,117 @@ +use std::fmt; + +use crate::display_ext::DisplayOptionExt; +use crate::display_ext::DisplaySlice; +use crate::LogId; +use crate::MessageSummary; +use crate::NodeId; +use crate::RaftTypeConfig; +use crate::Vote; + +/// An RPC sent by a cluster leader to replicate log entries (§5.3), and as a heartbeat (§5.2). +#[derive(Clone)] +#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] +pub struct AppendEntriesRequest { + pub vote: Vote, + + pub prev_log_id: Option>, + + /// The new log entries to store. + /// + /// This may be empty when the leader is sending heartbeats. Entries + /// are batched for efficiency. + pub entries: Vec, + + /// The leader's committed log id. + pub leader_commit: Option>, +} + +impl fmt::Debug for AppendEntriesRequest +where C::D: fmt::Debug +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("AppendEntriesRequest") + .field("vote", &self.vote) + .field("prev_log_id", &self.prev_log_id) + .field("entries", &self.entries) + .field("leader_commit", &self.leader_commit) + .finish() + } +} + +impl MessageSummary> for AppendEntriesRequest { + fn summary(&self) -> String { + format!( + "vote={}, prev_log_id={}, leader_commit={}, entries={}", + self.vote, + self.prev_log_id.summary(), + self.leader_commit.summary(), + DisplaySlice::<_>(self.entries.as_slice()) + ) + } +} + +/// The response to an `AppendEntriesRequest`. +/// +/// [`RaftNetwork::send_append_entries`] returns this type only when received an RPC reply. +/// Otherwise it should return [`RPCError`]. +/// +/// [`RPCError`]: crate::error::RPCError +/// [`RaftNetwork::send_append_entries`]: crate::network::RaftNetwork::send_append_entries +#[derive(Debug)] +#[derive(PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] +pub enum AppendEntriesResponse { + /// Successfully replicated all log entries to the target node. + Success, + + /// Successfully sent the first portion of log entries. + /// + /// [`RaftNetwork::send_append_entries`] can return a partial success. + /// For example, it tries to send log entries `[1-2..3-10]`, the application is allowed to send + /// just `[1-2..1-3]` and return `PartialSuccess(1-3)`, + /// + /// ### Caution + /// + /// The returned matching log id must be **greater than or equal to** the first log + /// id([`AppendEntriesRequest::prev_log_id`]) of the entries to send. If no RPC reply is + /// received, [`RaftNetwork::send_append_entries`] must return an [`RPCError`] to inform + /// Openraft that the first log id([`AppendEntriesRequest::prev_log_id`]) may not match on + /// the remote target node. + /// + /// [`RPCError`]: crate::error::RPCError + /// [`RaftNetwork::send_append_entries`]: crate::network::RaftNetwork::send_append_entries + PartialSuccess(Option>), + + /// The first log id([`AppendEntriesRequest::prev_log_id`]) of the entries to send does not + /// match on the remote target node. + Conflict, + + /// Seen a vote `v` that does not hold `mine_vote >= v`. + /// And a leader's vote(committed vote) must be total order with other vote. + /// Therefore it has to be a higher vote: `mine_vote < v` + HigherVote(Vote), +} + +impl AppendEntriesResponse { + pub fn is_success(&self) -> bool { + matches!(*self, AppendEntriesResponse::Success) + } + + pub fn is_conflict(&self) -> bool { + matches!(*self, AppendEntriesResponse::Conflict) + } +} + +impl fmt::Display for AppendEntriesResponse { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + AppendEntriesResponse::Success => write!(f, "Success"), + AppendEntriesResponse::PartialSuccess(m) => { + write!(f, "PartialSuccess({})", m.display()) + } + AppendEntriesResponse::HigherVote(vote) => write!(f, "Higher vote, {}", vote), + AppendEntriesResponse::Conflict => write!(f, "Conflict"), + } + } +} diff --git a/openraft/src/raft/message/client_write.rs b/openraft/src/raft/message/client_write.rs new file mode 100644 index 000000000..b063c8c89 --- /dev/null +++ b/openraft/src/raft/message/client_write.rs @@ -0,0 +1,42 @@ +use std::fmt::Debug; + +use crate::AppDataResponse; +use crate::LogId; +use crate::Membership; +use crate::MessageSummary; +use crate::RaftTypeConfig; + +/// The response to a client-request. +#[cfg_attr( + feature = "serde", + derive(serde::Deserialize, serde::Serialize), + serde(bound = "C::R: AppDataResponse") +)] +pub struct ClientWriteResponse { + /// The id of the log that is applied. + pub log_id: LogId, + + /// Application specific response data. + pub data: C::R, + + /// If the log entry is a change-membership entry. + pub membership: Option>, +} + +impl Debug for ClientWriteResponse +where C::R: Debug +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ClientWriteResponse") + .field("log_id", &self.log_id) + .field("data", &self.data) + .field("membership", &self.membership) + .finish() + } +} + +impl MessageSummary> for ClientWriteResponse { + fn summary(&self) -> String { + format!("log_id: {}, membership: {:?}", self.log_id, self.membership) + } +} diff --git a/openraft/src/raft/message/install_snapshot.rs b/openraft/src/raft/message/install_snapshot.rs new file mode 100644 index 000000000..3372e3f81 --- /dev/null +++ b/openraft/src/raft/message/install_snapshot.rs @@ -0,0 +1,47 @@ +use crate::MessageSummary; +use crate::NodeId; +use crate::RaftTypeConfig; +use crate::SnapshotMeta; +use crate::Vote; + +/// An RPC sent by the Raft leader to send chunks of a snapshot to a follower (§7). +#[derive(Clone, Debug)] +#[derive(PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] +pub struct InstallSnapshotRequest { + pub vote: Vote, + + /// Metadata of a snapshot: snapshot_id, last_log_ed membership etc. + pub meta: SnapshotMeta, + + /// The byte offset where this chunk of data is positioned in the snapshot file. + pub offset: u64, + /// The raw bytes of the snapshot chunk, starting at `offset`. + pub data: Vec, + + /// Will be `true` if this is the last chunk in the snapshot. + pub done: bool, +} + +impl MessageSummary> for InstallSnapshotRequest { + fn summary(&self) -> String { + format!( + "vote={}, meta={}, offset={}, len={}, done={}", + self.vote, + self.meta, + self.offset, + self.data.len(), + self.done + ) + } +} + +/// The response to an `InstallSnapshotRequest`. +#[derive(Debug)] +#[derive(PartialEq, Eq)] +#[derive(derive_more::Display)] +#[display(fmt = "{{vote:{}}}", vote)] +#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] +pub struct InstallSnapshotResponse { + pub vote: Vote, +} diff --git a/openraft/src/raft/message/mod.rs b/openraft/src/raft/message/mod.rs new file mode 100644 index 000000000..786fca895 --- /dev/null +++ b/openraft/src/raft/message/mod.rs @@ -0,0 +1,18 @@ +//! Raft protocol messages and types. +//! +//! Request and response types for an application to talk to the Raft, +//! and are also used by network layer to talk to other Raft nodes. + +mod append_entries; +mod install_snapshot; +mod vote; + +mod client_write; + +pub use append_entries::AppendEntriesRequest; +pub use append_entries::AppendEntriesResponse; +pub use client_write::ClientWriteResponse; +pub use install_snapshot::InstallSnapshotRequest; +pub use install_snapshot::InstallSnapshotResponse; +pub use vote::VoteRequest; +pub use vote::VoteResponse; diff --git a/openraft/src/raft/message/vote.rs b/openraft/src/raft/message/vote.rs new file mode 100644 index 000000000..7c7336145 --- /dev/null +++ b/openraft/src/raft/message/vote.rs @@ -0,0 +1,59 @@ +use std::fmt; + +use crate::display_ext::DisplayOptionExt; +use crate::LogId; +use crate::MessageSummary; +use crate::NodeId; +use crate::Vote; + +/// An RPC sent by candidates to gather votes (§5.2). +#[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] +pub struct VoteRequest { + pub vote: Vote, + pub last_log_id: Option>, +} + +impl fmt::Display for VoteRequest { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{{vote:{}, last_log:{}}}", self.vote, self.last_log_id.display(),) + } +} + +impl MessageSummary> for VoteRequest { + fn summary(&self) -> String { + self.to_string() + } +} + +impl VoteRequest { + pub fn new(vote: Vote, last_log_id: Option>) -> Self { + Self { vote, last_log_id } + } +} + +/// The response to a `VoteRequest`. +#[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] +pub struct VoteResponse { + /// vote after a node handling vote-request. + /// Thus `resp.vote >= req.vote` always holds. + pub vote: Vote, + + /// Will be true if the candidate received a vote from the responder. + pub vote_granted: bool, + + /// The last log id stored on the remote voter. + pub last_log_id: Option>, +} + +impl MessageSummary> for VoteResponse { + fn summary(&self) -> String { + format!( + "{{granted:{}, {}, last_log:{:?}}}", + self.vote_granted, + self.vote, + self.last_log_id.map(|x| x.to_string()) + ) + } +} diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index f391f54b8..59861d4fd 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -1,5 +1,6 @@ //! Public Raft interface and data types. +mod message; mod raft_inner; use std::collections::BTreeMap; @@ -11,6 +12,13 @@ use std::sync::Arc; use std::time::Duration; use maplit::btreemap; +pub use message::AppendEntriesRequest; +pub use message::AppendEntriesResponse; +pub use message::ClientWriteResponse; +pub use message::InstallSnapshotRequest; +pub use message::InstallSnapshotResponse; +pub use message::VoteRequest; +pub use message::VoteResponse; use tokio::io::AsyncRead; use tokio::io::AsyncSeek; use tokio::io::AsyncWrite; @@ -29,8 +37,6 @@ use crate::core::replication_lag; use crate::core::sm; use crate::core::RaftCore; use crate::core::Tick; -use crate::display_ext::DisplayOptionExt; -use crate::display_ext::DisplaySlice; use crate::engine::Engine; use crate::engine::EngineConfig; use crate::entry::FromAppData; @@ -56,14 +62,11 @@ use crate::AsyncRuntime; use crate::ChangeMembers; use crate::LogId; use crate::LogIdOptionExt; -use crate::Membership; use crate::MessageSummary; use crate::NodeId; use crate::OptionalSend; use crate::RaftState; -use crate::SnapshotMeta; use crate::StorageHelper; -use crate::Vote; /// Configuration of types used by the [`Raft`] core engine. /// @@ -967,240 +970,3 @@ impl fmt::Display for ExternalCommand { } } } - -/// An RPC sent by a cluster leader to replicate log entries (§5.3), and as a heartbeat (§5.2). -#[derive(Clone)] -#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] -pub struct AppendEntriesRequest { - pub vote: Vote, - - pub prev_log_id: Option>, - - /// The new log entries to store. - /// - /// This may be empty when the leader is sending heartbeats. Entries - /// are batched for efficiency. - pub entries: Vec, - - /// The leader's committed log id. - pub leader_commit: Option>, -} - -impl Debug for AppendEntriesRequest -where C::D: Debug -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("AppendEntriesRequest") - .field("vote", &self.vote) - .field("prev_log_id", &self.prev_log_id) - .field("entries", &self.entries) - .field("leader_commit", &self.leader_commit) - .finish() - } -} - -impl MessageSummary> for AppendEntriesRequest { - fn summary(&self) -> String { - format!( - "vote={}, prev_log_id={}, leader_commit={}, entries={}", - self.vote, - self.prev_log_id.summary(), - self.leader_commit.summary(), - DisplaySlice::<_>(self.entries.as_slice()) - ) - } -} - -/// The response to an `AppendEntriesRequest`. -/// -/// [`RaftNetwork::send_append_entries`] returns this type only when received an RPC reply. -/// Otherwise it should return [`RPCError`]. -/// -/// [`RPCError`]: crate::error::RPCError -/// [`RaftNetwork::send_append_entries`]: crate::network::RaftNetwork::send_append_entries -#[derive(Debug)] -#[derive(PartialEq, Eq)] -#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] -pub enum AppendEntriesResponse { - /// Successfully replicated all log entries to the target node. - Success, - - /// Successfully sent the first portion of log entries. - /// - /// [`RaftNetwork::send_append_entries`] can return a partial success. - /// For example, it tries to send log entries `[1-2..3-10]`, the application is allowed to send - /// just `[1-2..1-3]` and return `PartialSuccess(1-3)`, - /// - /// ### Caution - /// - /// The returned matching log id must be **greater than or equal to** the first log - /// id([`AppendEntriesRequest::prev_log_id`]) of the entries to send. If no RPC reply is - /// received, [`RaftNetwork::send_append_entries`] must return an [`RPCError`] to inform - /// Openraft that the first log id([`AppendEntriesRequest::prev_log_id`]) may not match on - /// the remote target node. - /// - /// [`RPCError`]: crate::error::RPCError - /// [`RaftNetwork::send_append_entries`]: crate::network::RaftNetwork::send_append_entries - PartialSuccess(Option>), - - /// The first log id([`AppendEntriesRequest::prev_log_id`]) of the entries to send does not - /// match on the remote target node. - Conflict, - - /// Seen a vote `v` that does not hold `mine_vote >= v`. - /// And a leader's vote(committed vote) must be total order with other vote. - /// Therefore it has to be a higher vote: `mine_vote < v` - HigherVote(Vote), -} - -impl AppendEntriesResponse { - pub fn is_success(&self) -> bool { - matches!(*self, AppendEntriesResponse::Success) - } - - pub fn is_conflict(&self) -> bool { - matches!(*self, AppendEntriesResponse::Conflict) - } -} - -impl fmt::Display for AppendEntriesResponse { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - AppendEntriesResponse::Success => write!(f, "Success"), - AppendEntriesResponse::PartialSuccess(m) => { - write!(f, "PartialSuccess({})", m.display()) - } - AppendEntriesResponse::HigherVote(vote) => write!(f, "Higher vote, {}", vote), - AppendEntriesResponse::Conflict => write!(f, "Conflict"), - } - } -} - -/// An RPC sent by candidates to gather votes (§5.2). -#[derive(Debug, Clone, PartialEq, Eq)] -#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] -pub struct VoteRequest { - pub vote: Vote, - pub last_log_id: Option>, -} - -impl fmt::Display for VoteRequest { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{{vote:{}, last_log:{}}}", self.vote, self.last_log_id.display(),) - } -} - -impl MessageSummary> for VoteRequest { - fn summary(&self) -> String { - self.to_string() - } -} - -impl VoteRequest { - pub fn new(vote: Vote, last_log_id: Option>) -> Self { - Self { vote, last_log_id } - } -} - -/// The response to a `VoteRequest`. -#[derive(Debug, Clone, PartialEq, Eq)] -#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] -pub struct VoteResponse { - /// vote after a node handling vote-request. - /// Thus `resp.vote >= req.vote` always holds. - pub vote: Vote, - - /// Will be true if the candidate received a vote from the responder. - pub vote_granted: bool, - - /// The last log id stored on the remote voter. - pub last_log_id: Option>, -} - -impl MessageSummary> for VoteResponse { - fn summary(&self) -> String { - format!( - "{{granted:{}, {}, last_log:{:?}}}", - self.vote_granted, - self.vote, - self.last_log_id.map(|x| x.to_string()) - ) - } -} - -/// An RPC sent by the Raft leader to send chunks of a snapshot to a follower (§7). -#[derive(Clone, Debug)] -#[derive(PartialEq, Eq)] -#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] -pub struct InstallSnapshotRequest { - pub vote: Vote, - - /// Metadata of a snapshot: snapshot_id, last_log_ed membership etc. - pub meta: SnapshotMeta, - - /// The byte offset where this chunk of data is positioned in the snapshot file. - pub offset: u64, - /// The raw bytes of the snapshot chunk, starting at `offset`. - pub data: Vec, - - /// Will be `true` if this is the last chunk in the snapshot. - pub done: bool, -} - -impl MessageSummary> for InstallSnapshotRequest { - fn summary(&self) -> String { - format!( - "vote={}, meta={}, offset={}, len={}, done={}", - self.vote, - self.meta, - self.offset, - self.data.len(), - self.done - ) - } -} - -/// The response to an `InstallSnapshotRequest`. -#[derive(Debug)] -#[derive(PartialEq, Eq)] -#[derive(derive_more::Display)] -#[display(fmt = "{{vote:{}}}", vote)] -#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] -pub struct InstallSnapshotResponse { - pub vote: Vote, -} - -/// The response to a client-request. -#[cfg_attr( - feature = "serde", - derive(serde::Deserialize, serde::Serialize), - serde(bound = "C::R: AppDataResponse") -)] -pub struct ClientWriteResponse { - /// The id of the log that is applied. - pub log_id: LogId, - - /// Application specific response data. - pub data: C::R, - - /// If the log entry is a change-membership entry. - pub membership: Option>, -} - -impl Debug for ClientWriteResponse -where C::R: Debug -{ - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("ClientWriteResponse") - .field("log_id", &self.log_id) - .field("data", &self.data) - .field("membership", &self.membership) - .finish() - } -} - -impl MessageSummary> for ClientWriteResponse { - fn summary(&self) -> String { - format!("log_id: {}, membership: {:?}", self.log_id, self.membership) - } -}