Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: add rkyv support #879

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ maplit = "1.0.2"
pin-utils = "0.1.0"
pretty_assertions = "1.0.0"
rand = "0.8"
rkyv = { version = "0.7.42", features=["validation"] }
serde = { version="1.0.114", features=["derive", "rc"]}
serde_json = "1.0.57"
tempfile = { version = "3.4.0" }
Expand Down
5 changes: 5 additions & 0 deletions openraft/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pin-utils = { workspace = true }
rand = { workspace = true }
serde = { workspace = true, optional = true }
serde_json = { workspace = true, optional = true }
rkyv = { workspace = true, optional = true }
clap = { workspace = true }
tempfile = { workspace = true, optional = true }
thiserror = { workspace = true }
Expand Down Expand Up @@ -58,6 +59,10 @@ bt = ["anyerror/backtrace", "anyhow/backtrace"]
# If you'd like to use `serde` to serialize messages.
serde = ["dep:serde"]

# Add rkyv::Archive, rkyv:Deserialize and rkyv::Serialize derives and archive(check_bytes) to data types.
# If you'd like to use `rkyv` to serialize messages.
rkyv = ["dep:rkyv"]

# Turn on this feature it allows at most ONE quorum-granted leader for each term.
# This is the way standard raft does, by making the LeaderId a partial order value.
#
Expand Down
5 changes: 5 additions & 0 deletions openraft/src/change_members.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ use crate::NodeId;
#[derive(Debug, Clone)]
#[derive(PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
#[cfg_attr(
feature = "rkyv",
derive(rkyv::Archive, rkyv::Deserialize, rkyv::Serialize),
archive(check_bytes)
)]
pub enum ChangeMembers<NID: NodeId, N: Node> {
/// Upgrade learners to voters.
///
Expand Down
5 changes: 5 additions & 0 deletions openraft/src/core/server_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@
#[derive(Debug, Clone, Copy, Default)]
#[derive(PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
#[cfg_attr(
feature = "rkyv",
derive(rkyv::Archive, rkyv::Deserialize, rkyv::Serialize),
archive(check_bytes)
)]
pub enum ServerState {
/// The node is completely passive; replicating entries, but neither voting nor timing out.
#[default]
Expand Down
3 changes: 3 additions & 0 deletions openraft/src/docs/feature_flags/feature-flags.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ By default openraft enables no features.
- `serde`: derives `serde::Serialize, serde::Deserialize` for type that are used
in storage and network, such as `Vote` or `AppendEntriesRequest`.

- `rkyv`: derives `rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, rkyv::CheckBytes`
for types that are used in storage and network, such as `Vote` or `AppendEntriesRequest`.

- `single-term-leader`: allows only one leader to be elected in each `term`.
This is the standard raft policy, which increases election confliction rate
but reduce `LogId`(`(term, node_id, index)` to `(term, index)`) size.
Expand Down
5 changes: 5 additions & 0 deletions openraft/src/entry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ pub use traits::RaftPayload;

/// A Raft log entry.
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
#[cfg_attr(
feature = "rkyv",
derive(rkyv::Archive, rkyv::Deserialize, rkyv::Serialize),
archive(check_bytes)
)]
pub struct Entry<C>
where C: RaftTypeConfig
{
Expand Down
5 changes: 5 additions & 0 deletions openraft/src/entry/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ use crate::RaftTypeConfig;
/// Log entry payload variants.
#[derive(PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
#[cfg_attr(
feature = "rkyv",
derive(rkyv::Archive, rkyv::Deserialize, rkyv::Serialize),
archive(check_bytes)
)]
pub enum EntryPayload<C: RaftTypeConfig> {
/// An empty payload committed by a new cluster leader.
Blank,
Expand Down
35 changes: 35 additions & 0 deletions openraft/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ use crate::Vote;
derive(serde::Deserialize, serde::Serialize),
serde(bound = "E:serde::Serialize + for <'d> serde::Deserialize<'d>")
)]
#[cfg_attr(
feature = "rkyv",
derive(rkyv::Archive, rkyv::Deserialize, rkyv::Serialize),
archive(check_bytes)
)]

pub enum RaftError<NID, E = Infallible>
where NID: NodeId
{
Expand Down Expand Up @@ -470,6 +476,35 @@ pub struct EmptyMembership {}
#[error("infallible")]
pub enum Infallible {}

#[cfg(feature = "rkyv")]
mod rkyv_serialization {
//! Manual implementations for the required `rkyv` traits since it is not
//! possible to derive them on an enum with no variant. All implementations
//! use `unreachable` since `Infallible` cannot be constructed.

impl rkyv::Archive for super::Infallible {
type Archived = ();
type Resolver = ();

#[inline]
unsafe fn resolve(&self, _: usize, _: Self::Resolver, _: *mut Self::Archived) {
unreachable!()
}
}

impl<S: rkyv::Fallible + ?Sized> rkyv::Serialize<S> for super::Infallible {
fn serialize(&self, _: &mut S) -> Result<Self::Resolver, S::Error> {
unreachable!()
}
}

impl<D: rkyv::Fallible + ?Sized> rkyv::Deserialize<super::Infallible, D> for u16 {
fn deserialize(&self, _: &mut D) -> Result<super::Infallible, D::Error> {
unreachable!()
}
}
}

/// A place holder to mark RaftError won't have a ForwardToLeader variant.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
Expand Down
5 changes: 5 additions & 0 deletions openraft/src/log_id/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ use crate::NodeId;
/// parts: a leader id, which refers to the leader that proposed this log, and an integer index.
#[derive(Debug, Default, Copy, Clone, PartialOrd, Ord, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
#[cfg_attr(
feature = "rkyv",
derive(rkyv::Archive, rkyv::Deserialize, rkyv::Serialize),
archive(check_bytes)
)]
pub struct LogId<NID: NodeId> {
/// The id of the leader that proposed this log
pub leader_id: CommittedLeaderId<NID>,
Expand Down
5 changes: 5 additions & 0 deletions openraft/src/membership/membership.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ use crate::NodeId;
/// of a majority of every config.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
#[cfg_attr(
feature = "rkyv",
derive(rkyv::Archive, rkyv::Deserialize, rkyv::Serialize),
archive(check_bytes)
)]
pub struct Membership<NID, N>
where
N: Node,
Expand Down
5 changes: 5 additions & 0 deletions openraft/src/membership/stored_membership.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ use crate::NodeId;
#[derive(Clone, Debug, Default)]
#[derive(PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
#[cfg_attr(
feature = "rkyv",
derive(rkyv::Archive, rkyv::Deserialize, rkyv::Serialize),
archive(check_bytes)
)]
pub struct StoredMembership<NID, N>
where
N: Node,
Expand Down
5 changes: 5 additions & 0 deletions openraft/src/metrics/raft_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ use crate::Vote;
/// A set of metrics describing the current state of a Raft node.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
#[cfg_attr(
feature = "rkyv",
derive(rkyv::Archive, rkyv::Deserialize, rkyv::Serialize),
archive(check_bytes)
)]
pub struct RaftMetrics<NID, N>
where
NID: NodeId,
Expand Down
35 changes: 35 additions & 0 deletions openraft/src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1060,6 +1060,11 @@ impl fmt::Display for ExternalCommand {
/// An RPC sent by a cluster leader to replicate log entries (§5.3), and as a heartbeat (§5.2).
#[derive(Clone)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
#[cfg_attr(
feature = "rkyv",
derive(rkyv::Archive, rkyv::Deserialize, rkyv::Serialize),
archive(check_bytes)
)]
pub struct AppendEntriesRequest<C: RaftTypeConfig> {
pub vote: Vote<C::NodeId>,

Expand Down Expand Up @@ -1110,6 +1115,11 @@ impl<C: RaftTypeConfig> MessageSummary<AppendEntriesRequest<C>> for AppendEntrie
#[derive(Debug)]
#[derive(PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
#[cfg_attr(
feature = "rkyv",
derive(rkyv::Archive, rkyv::Deserialize, rkyv::Serialize),
archive(check_bytes)
)]
pub enum AppendEntriesResponse<NID: NodeId> {
/// Successfully replicated all log entries to the target node.
Success,
Expand Down Expand Up @@ -1168,6 +1178,11 @@ impl<NID: NodeId> fmt::Display for AppendEntriesResponse<NID> {
/// An RPC sent by candidates to gather votes (§5.2).
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
#[cfg_attr(
feature = "rkyv",
derive(rkyv::Archive, rkyv::Deserialize, rkyv::Serialize),
archive(check_bytes)
)]
pub struct VoteRequest<NID: NodeId> {
pub vote: Vote<NID>,
pub last_log_id: Option<LogId<NID>>,
Expand All @@ -1194,6 +1209,11 @@ impl<NID: NodeId> VoteRequest<NID> {
/// The response to a `VoteRequest`.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
#[cfg_attr(
feature = "rkyv",
derive(rkyv::Archive, rkyv::Deserialize, rkyv::Serialize),
archive(check_bytes)
)]
pub struct VoteResponse<NID: NodeId> {
/// vote after a node handling vote-request.
/// Thus `resp.vote >= req.vote` always holds.
Expand Down Expand Up @@ -1221,6 +1241,11 @@ impl<NID: NodeId> MessageSummary<VoteResponse<NID>> for VoteResponse<NID> {
#[derive(Clone, Debug)]
#[derive(PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
#[cfg_attr(
feature = "rkyv",
derive(rkyv::Archive, rkyv::Deserialize, rkyv::Serialize),
archive(check_bytes)
)]
pub struct InstallSnapshotRequest<C: RaftTypeConfig> {
pub vote: Vote<C::NodeId>,

Expand Down Expand Up @@ -1255,6 +1280,11 @@ impl<C: RaftTypeConfig> MessageSummary<InstallSnapshotRequest<C>> for InstallSna
#[derive(derive_more::Display)]
#[display(fmt = "{{vote:{}}}", vote)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
#[cfg_attr(
feature = "rkyv",
derive(rkyv::Archive, rkyv::Deserialize, rkyv::Serialize),
archive(check_bytes)
)]
pub struct InstallSnapshotResponse<NID: NodeId> {
pub vote: Vote<NID>,
}
Expand All @@ -1265,6 +1295,11 @@ pub struct InstallSnapshotResponse<NID: NodeId> {
derive(serde::Deserialize, serde::Serialize),
serde(bound = "C::R: AppDataResponse")
)]
#[cfg_attr(
feature = "rkyv",
derive(rkyv::Archive, rkyv::Deserialize, rkyv::Serialize),
archive(check_bytes)
)]
pub struct ClientWriteResponse<C: RaftTypeConfig> {
/// The id of the log that is applied.
pub log_id: LogId<C::NodeId>,
Expand Down
5 changes: 5 additions & 0 deletions openraft/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ use crate::Vote;

#[derive(Debug, Clone, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
#[cfg_attr(
feature = "rkyv",
derive(rkyv::Archive, rkyv::Deserialize, rkyv::Serialize),
archive(check_bytes)
)]
pub struct SnapshotMeta<NID, N>
where
NID: NodeId,
Expand Down
5 changes: 5 additions & 0 deletions openraft/src/vote/leader_id/leader_id_adv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ use crate::NodeId;
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
#[derive(PartialOrd, Ord)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
#[cfg_attr(
feature = "rkyv",
derive(rkyv::Archive, rkyv::Deserialize, rkyv::Serialize),
archive(check_bytes)
)]
pub struct LeaderId<NID>
where NID: NodeId
{
Expand Down
10 changes: 10 additions & 0 deletions openraft/src/vote/leader_id/leader_id_std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@

#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
#[cfg_attr(
feature = "rkyv",
derive(rkyv::Archive, rkyv::Deserialize, rkyv::Serialize),
archive(check_bytes)
)]
pub struct LeaderId<NID>
where NID: NodeId
{
Expand Down Expand Up @@ -76,6 +81,11 @@
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
#[derive(PartialOrd, Ord)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
#[cfg_attr(
feature = "rkyv",
derive(rkyv::Archive, rkyv::Deserialize, rkyv::Serialize),
archive(check_bytes)
)]
#[cfg_attr(feature = "serde", serde(transparent))]
pub struct CommittedLeaderId<NID> {
pub term: u64,
Expand All @@ -97,7 +107,7 @@

#[cfg(test)]
mod tests {
use crate::CommittedLeaderId;

Check warning on line 110 in openraft/src/vote/leader_id/leader_id_std.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, single-term-leader)

unused import: `crate::CommittedLeaderId`

Check warning on line 110 in openraft/src/vote/leader_id/leader_id_std.rs

View workflow job for this annotation

GitHub Actions / openraft-test (nightly, 0, single-term-leader)

unused import: `crate::CommittedLeaderId`
use crate::LeaderId;

#[cfg(feature = "serde")]
Expand Down
5 changes: 5 additions & 0 deletions openraft/src/vote/vote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ use crate::NodeId;
/// `Vote` represent the privilege of a node.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
#[cfg_attr(
feature = "rkyv",
derive(rkyv::Archive, rkyv::Deserialize, rkyv::Serialize),
archive(check_bytes)
)]
pub struct Vote<NID: NodeId> {
/// The id of the node that tries to become the leader.
pub leader_id: LeaderId<NID>,
Expand Down
Loading