Skip to content

Commit

Permalink
Change: move external command trigger to dedicated Trigger struct (#888)
Browse files Browse the repository at this point in the history
* Refactor: move RaftTypeConfig to separate file

* Change: move external command trigger to dedicated Trigger struct

Moved trigger election, heartbeat, snapshot and purge log from `Raft`
to a new `Trigger` struct, to separate externally trigger actions from
the main Raft API.

---

Marked the old trigger methods in `Raft` as deprecated, and recommended
using the new `Trigger` struct instead.

The main motivation of these changes is to organize the Raft API in a
more structured way, by extracting trigger actions into a dedicated
struct, instead of mixing them together in the `Raft` API.
  • Loading branch information
drmingdrmer committed Jul 1, 2023
1 parent 7eccd06 commit 4c488a6
Show file tree
Hide file tree
Showing 18 changed files with 207 additions and 117 deletions.
3 changes: 2 additions & 1 deletion openraft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ pub mod raft;
pub mod storage;
pub mod testing;
pub mod timer;
pub mod type_config;

pub(crate) mod engine;
pub(crate) mod log_id_range;
Expand All @@ -89,6 +90,7 @@ pub use macros::add_async_trait;
pub use network::RPCTypes;
pub use network::RaftNetwork;
pub use network::RaftNetworkFactory;
pub use type_config::RaftTypeConfig;

pub use crate::async_runtime::AsyncRuntime;
pub use crate::async_runtime::TokioRuntime;
Expand All @@ -114,7 +116,6 @@ pub use crate::node::EmptyNode;
pub use crate::node::Node;
pub use crate::node::NodeId;
pub use crate::raft::Raft;
pub use crate::raft::RaftTypeConfig;
pub use crate::raft_state::MembershipState;
pub use crate::raft_state::RaftState;
pub use crate::raft_types::SnapshotId;
Expand Down
16 changes: 16 additions & 0 deletions openraft/src/raft/core_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use crate::error::Fatal;
use crate::AsyncRuntime;
use crate::NodeId;

/// The running state of RaftCore
pub(in crate::raft) enum CoreState<NID, A>
where
NID: NodeId,
A: AsyncRuntime,
{
/// The RaftCore task is still running.
Running(A::JoinHandle<Result<(), Fatal<NID>>>),

/// The RaftCore task has finished. The return value of the task is stored.
Done(Result<(), Fatal<NID>>),
}
132 changes: 33 additions & 99 deletions openraft/src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@

mod message;
mod raft_inner;
mod trigger;

pub(in crate::raft) mod core_state;

use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use core_state::CoreState;
use maplit::btreemap;
pub use message::AppendEntriesRequest;
pub use message::AppendEntriesResponse;
Expand All @@ -17,9 +21,6 @@ pub use message::InstallSnapshotRequest;
pub use message::InstallSnapshotResponse;
pub use message::VoteRequest;
pub use message::VoteResponse;
use tokio::io::AsyncRead;
use tokio::io::AsyncSeek;
use tokio::io::AsyncWrite;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::sync::watch;
Expand All @@ -31,16 +32,13 @@ use tracing::Level;
use crate::config::Config;
use crate::config::RuntimeConfig;
use crate::core::command_state::CommandState;
use crate::core::raft_msg::external_command::ExternalCommand;
use crate::core::raft_msg::RaftMsg;
use crate::core::replication_lag;
use crate::core::sm;
use crate::core::RaftCore;
use crate::core::Tick;
use crate::engine::Engine;
use crate::engine::EngineConfig;
use crate::entry::FromAppData;
use crate::entry::RaftEntry;
use crate::error::CheckIsLeaderError;
use crate::error::ClientWriteError;
use crate::error::Fatal;
Expand All @@ -51,69 +49,19 @@ use crate::membership::IntoNodes;
use crate::metrics::RaftMetrics;
use crate::metrics::Wait;
use crate::network::RaftNetworkFactory;
use crate::node::Node;
use crate::raft::raft_inner::RaftInner;
use crate::raft::trigger::Trigger;
use crate::storage::RaftLogStorage;
use crate::storage::RaftStateMachine;
use crate::AppData;
use crate::AppDataResponse;
use crate::AsyncRuntime;
use crate::ChangeMembers;
use crate::LogId;
use crate::LogIdOptionExt;
use crate::MessageSummary;
use crate::NodeId;
use crate::OptionalSend;
use crate::RaftState;
pub use crate::RaftTypeConfig;
use crate::StorageHelper;

/// Configuration of types used by the [`Raft`] core engine.
///
/// The (empty) implementation structure defines request/response types, node ID type
/// and the like. Refer to the documentation of associated types for more information.
///
/// ## Note
///
/// Since Rust cannot automatically infer traits for various inner types using this config
/// type as a parameter, this trait simply uses all the traits required for various types
/// as its supertraits as a workaround. To ease the declaration, the macro
/// `declare_raft_types` is provided, which can be used to declare the type easily.
///
/// Example:
/// ```ignore
/// openraft::declare_raft_types!(
/// /// Declare the type configuration for `MemStore`.
/// pub Config: D = ClientRequest, R = ClientResponse, NodeId = MemNodeId
/// );
/// ```
pub trait RaftTypeConfig:
Sized + Send + Sync + Debug + Clone + Copy + Default + Eq + PartialEq + Ord + PartialOrd + 'static
{
/// Application-specific request data passed to the state machine.
type D: AppData;

/// Application-specific response data returned by the state machine.
type R: AppDataResponse;

/// A Raft node's ID.
type NodeId: NodeId;

/// Raft application level node data
type Node: Node;

/// Raft log entry, which can be built from an AppData.
type Entry: RaftEntry<Self::NodeId, Self::Node> + FromAppData<Self::D>;

/// Snapshot data for exposing a snapshot for reading & writing.
///
/// See the [storage chapter of the guide](https://datafuselabs.github.io/openraft/getting-started.html#implement-raftstorage)
/// for details on where and how this is used.
type SnapshotData: AsyncRead + AsyncWrite + AsyncSeek + OptionalSend + Sync + Unpin + 'static;

/// Asynchronous runtime type.
type AsyncRuntime: AsyncRuntime;
}

/// Define types for a Raft type configuration.
///
/// Since Rust has some limitations when deriving traits for types with generic arguments
Expand All @@ -125,8 +73,14 @@ pub trait RaftTypeConfig:
/// Example:
/// ```ignore
/// openraft::declare_raft_types!(
/// /// Declare the type configuration for `MemStore`.
/// pub Config: D = ClientRequest, R = ClientResponse, NodeId = MemNodeId
/// pub Config:
/// D = ClientRequest,
/// R = ClientResponse,
/// NodeId = u64,
/// Node = openraft::BasicNode,
/// Entry = openraft::Entry<TypeConfig>,
/// SnapshotData = Cursor<Vec<u8>>,
/// AsyncRuntime = openraft::TokioRuntime,
/// );
/// ```
#[macro_export]
Expand All @@ -146,19 +100,6 @@ macro_rules! declare_raft_types {
};
}

/// The running state of RaftCore
enum CoreState<NID, A>
where
NID: NodeId,
A: AsyncRuntime,
{
/// The RaftCore task is still running.
Running(A::JoinHandle<Result<(), Fatal<NID>>>),

/// The RaftCore task has finished. The return value of the task is stored.
Done(Result<(), Fatal<NID>>),
}

/// The Raft API.
///
/// This type implements the full Raft spec, and is the interface to a running Raft node.
Expand Down Expand Up @@ -337,46 +278,39 @@ where
self.inner.runtime_config.enable_elect.store(enabled, Ordering::Relaxed);
}

/// Trigger election at once and return at once.
/// Return a handle to manually trigger raft actions, such as elect or build snapshot.
///
/// Returns error when RaftCore has [`Fatal`] error, e.g. shut down or having storage error.
/// It is not affected by `Raft::enable_elect(false)`.
/// Example:
/// ```ignore
/// let raft = Raft::new(...).await?;
/// raft.trigger().elect().await?;
/// ```
pub fn trigger(&self) -> Trigger<C, N, LS> {
Trigger::new(self.inner.as_ref())
}

/// Trigger election at once and return at once.
#[deprecated(note = "use `Raft::trigger().elect()` instead")]
pub async fn trigger_elect(&self) -> Result<(), Fatal<C::NodeId>> {
self.inner.send_external_command(ExternalCommand::Elect, "trigger_elect").await
self.trigger().elect().await
}

/// Trigger a heartbeat at once and return at once.
///
/// Returns error when RaftCore has [`Fatal`] error, e.g. shut down or having storage error.
/// It is not affected by `Raft::enable_heartbeat(false)`.
#[deprecated(note = "use `Raft::trigger().heartbeat()` instead")]
pub async fn trigger_heartbeat(&self) -> Result<(), Fatal<C::NodeId>> {
self.inner.send_external_command(ExternalCommand::Heartbeat, "trigger_heartbeat").await
self.trigger().heartbeat().await
}

/// Trigger to build a snapshot at once and return at once.
///
/// Returns error when RaftCore has [`Fatal`] error, e.g. shut down or having storage error.
#[deprecated(note = "use `Raft::trigger().snapshot()` instead")]
pub async fn trigger_snapshot(&self) -> Result<(), Fatal<C::NodeId>> {
self.inner.send_external_command(ExternalCommand::Snapshot, "trigger_snapshot").await
self.trigger().snapshot().await
}

/// Initiate the log purge up to and including the given `upto` log index.
///
/// Logs that are not included in a snapshot will **NOT** be purged.
/// In such scenario it will delete as many log as possible.
/// The [`max_in_snapshot_log_to_keep`] config is not taken into account
/// when purging logs.
///
/// It returns error only when RaftCore has [`Fatal`] error, e.g. shut down or having storage
/// error.
///
/// Openraft won't purge logs at once, e.g. it may be delayed by several seconds, because if it
/// is a leader and a replication task has been replicating the logs to a follower, the logs
/// can't be purged until the replication task is finished.
///
/// [`max_in_snapshot_log_to_keep`]: `crate::Config::max_in_snapshot_log_to_keep`
#[deprecated(note = "use `Raft::trigger().purge_log()` instead")]
pub async fn purge_log(&self, upto: u64) -> Result<(), Fatal<C::NodeId>> {
self.inner.send_external_command(ExternalCommand::PurgeLog { upto }, "purge_log").await
self.trigger().purge_log(upto).await
}

/// Submit an AppendEntries RPC to this Raft node.
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/raft/raft_inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::core::raft_msg::external_command::ExternalCommand;
use crate::core::raft_msg::RaftMsg;
use crate::core::TickHandle;
use crate::error::Fatal;
use crate::raft::CoreState;
use crate::raft::core_state::CoreState;
use crate::storage::RaftLogStorage;
use crate::AsyncRuntime;
use crate::Config;
Expand Down
71 changes: 71 additions & 0 deletions openraft/src/raft/trigger.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
//! Trigger an action to RaftCore by external caller.

use crate::core::raft_msg::external_command::ExternalCommand;
use crate::error::Fatal;
use crate::raft::RaftInner;
use crate::storage::RaftLogStorage;
use crate::RaftNetworkFactory;
use crate::RaftTypeConfig;

/// Trigger is an interface to trigger an action to RaftCore by external caller.
pub struct Trigger<'r, C, N, LS>
where
C: RaftTypeConfig,
N: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
{
raft_inner: &'r RaftInner<C, N, LS>,
}

impl<'r, C, N, LS> Trigger<'r, C, N, LS>
where
C: RaftTypeConfig,
N: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
{
pub(in crate::raft) fn new(raft_inner: &'r RaftInner<C, N, LS>) -> Self {
Self { raft_inner }
}

/// Trigger election at once and return at once.
///
/// Returns error when RaftCore has [`Fatal`] error, e.g. shut down or having storage error.
/// It is not affected by `Raft::enable_elect(false)`.
pub async fn elect(&self) -> Result<(), Fatal<C::NodeId>> {
self.raft_inner.send_external_command(ExternalCommand::Elect, "trigger_elect").await
}

/// Trigger a heartbeat at once and return at once.
///
/// Returns error when RaftCore has [`Fatal`] error, e.g. shut down or having storage error.
/// It is not affected by `Raft::enable_heartbeat(false)`.
pub async fn heartbeat(&self) -> Result<(), Fatal<C::NodeId>> {
self.raft_inner.send_external_command(ExternalCommand::Heartbeat, "trigger_heartbeat").await
}

/// Trigger to build a snapshot at once and return at once.
///
/// Returns error when RaftCore has [`Fatal`] error, e.g. shut down or having storage error.
pub async fn snapshot(&self) -> Result<(), Fatal<C::NodeId>> {
self.raft_inner.send_external_command(ExternalCommand::Snapshot, "trigger_snapshot").await
}

/// Initiate the log purge up to and including the given `upto` log index.
///
/// Logs that are not included in a snapshot will **NOT** be purged.
/// In such scenario it will delete as many log as possible.
/// The [`max_in_snapshot_log_to_keep`] config is not taken into account
/// when purging logs.
///
/// It returns error only when RaftCore has [`Fatal`] error, e.g. shut down or having storage
/// error.
///
/// Openraft won't purge logs at once, e.g. it may be delayed by several seconds, because if it
/// is a leader and a replication task has been replicating the logs to a follower, the logs
/// can't be purged until the replication task is finished.
///
/// [`max_in_snapshot_log_to_keep`]: `crate::Config::max_in_snapshot_log_to_keep`
pub async fn purge_log(&self, upto: u64) -> Result<(), Fatal<C::NodeId>> {
self.raft_inner.send_external_command(ExternalCommand::PurgeLog { upto }, "purge_log").await
}
}
Loading

0 comments on commit 4c488a6

Please sign in to comment.