Skip to content

Commit

Permalink
Refactor: split RaftInner into separate file (#885)
Browse files Browse the repository at this point in the history
* Chore: move "raft" to a separate dir

* Refactor: split RaftInner into separate file

Moved the `RaftInner` struct and its methods into a separate
`raft_inner.rs` file. `Raft` is a control handle for application,
internally used methods should be provided by `RaftInner`, not
`Raft`.
  • Loading branch information
drmingdrmer committed Jul 1, 2023
1 parent 86b46a0 commit 9ea796a
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 100 deletions.
110 changes: 10 additions & 100 deletions openraft/src/raft.rs → openraft/src/raft/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Public Raft interface and data types.

mod raft_inner;

use std::collections::BTreeMap;
use std::fmt;
use std::fmt::Debug;
Expand Down Expand Up @@ -27,7 +29,6 @@ use crate::core::replication_lag;
use crate::core::sm;
use crate::core::RaftCore;
use crate::core::Tick;
use crate::core::TickHandle;
use crate::display_ext::DisplayOptionExt;
use crate::display_ext::DisplaySlice;
use crate::engine::Engine;
Expand All @@ -46,6 +47,7 @@ use crate::metrics::RaftMetrics;
use crate::metrics::Wait;
use crate::network::RaftNetworkFactory;
use crate::node::Node;
use crate::raft::raft_inner::RaftInner;
use crate::storage::RaftLogStorage;
use crate::storage::RaftStateMachine;
use crate::AppData;
Expand Down Expand Up @@ -155,24 +157,6 @@ where
Done(Result<(), Fatal<NID>>),
}

struct RaftInner<C, N, LS>
where
C: RaftTypeConfig,
N: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
{
id: C::NodeId,
config: Arc<Config>,
runtime_config: Arc<RuntimeConfig>,
tick_handle: TickHandle<C>,
tx_api: mpsc::UnboundedSender<RaftMsg<C, N, LS>>,
rx_metrics: watch::Receiver<RaftMetrics<C::NodeId, C::Node>>,
// TODO(xp): it does not need to be a async mutex.
#[allow(clippy::type_complexity)]
tx_shutdown: Mutex<Option<oneshot::Sender<()>>>,
core_state: Mutex<CoreState<C::NodeId, C::AsyncRuntime>>,
}

/// The Raft API.
///
/// This type implements the full Raft spec, and is the interface to a running Raft node.
Expand Down Expand Up @@ -356,22 +340,22 @@ where
/// Returns error when RaftCore has [`Fatal`] error, e.g. shut down or having storage error.
/// It is not affected by `Raft::enable_elect(false)`.
pub async fn trigger_elect(&self) -> Result<(), Fatal<C::NodeId>> {
self.send_external_command(ExternalCommand::Elect, "trigger_elect").await
self.inner.send_external_command(ExternalCommand::Elect, "trigger_elect").await
}

/// Trigger a heartbeat at once and return at once.
///
/// Returns error when RaftCore has [`Fatal`] error, e.g. shut down or having storage error.
/// It is not affected by `Raft::enable_heartbeat(false)`.
pub async fn trigger_heartbeat(&self) -> Result<(), Fatal<C::NodeId>> {
self.send_external_command(ExternalCommand::Heartbeat, "trigger_heartbeat").await
self.inner.send_external_command(ExternalCommand::Heartbeat, "trigger_heartbeat").await
}

/// Trigger to build a snapshot at once and return at once.
///
/// Returns error when RaftCore has [`Fatal`] error, e.g. shut down or having storage error.
pub async fn trigger_snapshot(&self) -> Result<(), Fatal<C::NodeId>> {
self.send_external_command(ExternalCommand::Snapshot, "trigger_snapshot").await
self.inner.send_external_command(ExternalCommand::Snapshot, "trigger_snapshot").await
}

/// Initiate the log purge up to and including the given `upto` log index.
Expand All @@ -390,21 +374,7 @@ where
///
/// [`max_in_snapshot_log_to_keep`]: `crate::Config::max_in_snapshot_log_to_keep`
pub async fn purge_log(&self, upto: u64) -> Result<(), Fatal<C::NodeId>> {
self.send_external_command(ExternalCommand::PurgeLog { upto }, "purge_log").await
}

async fn send_external_command(
&self,
cmd: ExternalCommand,
cmd_desc: impl fmt::Display + Default,
) -> Result<(), Fatal<C::NodeId>> {
let send_res = self.inner.tx_api.send(RaftMsg::ExternalCommand { cmd });

if send_res.is_err() {
let fatal = self.get_core_stopped_error("sending external command to RaftCore", Some(cmd_desc)).await;
return Err(fatal);
}
Ok(())
self.inner.send_external_command(ExternalCommand::PurgeLog { upto }, "purge_log").await
}

/// Submit an AppendEntries RPC to this Raft node.
Expand Down Expand Up @@ -751,7 +721,7 @@ where
let send_res = self.inner.tx_api.send(mes);

if send_res.is_err() {
let fatal = self.get_core_stopped_error("sending tx to RaftCore", sum).await;
let fatal = self.inner.get_core_stopped_error("sending tx to RaftCore", sum).await;
return Err(RaftError::Fatal(fatal));
}

Expand All @@ -761,73 +731,13 @@ where
match recv_res {
Ok(x) => x.map_err(|e| RaftError::APIError(e)),
Err(_) => {
let fatal = self.get_core_stopped_error("receiving rx from RaftCore", sum).await;
let fatal = self.inner.get_core_stopped_error("receiving rx from RaftCore", sum).await;
tracing::error!(error = debug(&fatal), "core_call fatal error");
Err(RaftError::Fatal(fatal))
}
}
}

async fn get_core_stopped_error(
&self,
when: impl fmt::Display,
message_summary: Option<impl fmt::Display + Default>,
) -> Fatal<C::NodeId> {
// Wait for the core task to finish.
self.join_core_task().await;

// Retrieve the result.
let core_res = {
let state = self.inner.core_state.lock().await;
if let CoreState::Done(core_task_res) = &*state {
core_task_res.clone()
} else {
unreachable!("RaftCore should have already quit")
}
};

tracing::error!(
core_result = debug(&core_res),
"failure {}; message: {}",
when,
message_summary.unwrap_or_default()
);

match core_res {
// A normal quit is still an unexpected "stop" to the caller.
Ok(_) => Fatal::Stopped,
Err(e) => e,
}
}

/// Wait for RaftCore task to finish and record the returned value from the task.
#[tracing::instrument(level = "debug", skip_all)]
async fn join_core_task(&self) {
let mut state = self.inner.core_state.lock().await;
match &mut *state {
CoreState::Running(handle) => {
let res = handle.await;
tracing::info!(res = debug(&res), "RaftCore exited");

let core_task_res = match res {
Err(err) => {
if C::AsyncRuntime::is_panic(&err) {
Err(Fatal::Panicked)
} else {
Err(Fatal::Stopped)
}
}
Ok(returned_res) => returned_res,
};

*state = CoreState::Done(core_task_res);
}
CoreState::Done(_) => {
// RaftCore has already quit, nothing to do
}
}
}

/// Send a request to the Raft core loop in a fire-and-forget manner.
///
/// The request functor will be called with a mutable reference to both the state machine
Expand Down Expand Up @@ -898,7 +808,7 @@ where
let send_res = tx.send(());
tracing::info!("sending shutdown signal to RaftCore, sending res: {:?}", send_res);
}
self.join_core_task().await;
self.inner.join_core_task().await;
self.inner.tick_handle.shutdown().await;

// TODO(xp): API change: replace `JoinError` with `Fatal`,
Expand Down
126 changes: 126 additions & 0 deletions openraft/src/raft/raft_inner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
use std::fmt;
use std::sync::Arc;

use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::sync::watch;
use tokio::sync::Mutex;

use crate::config::RuntimeConfig;
use crate::core::TickHandle;
use crate::error::Fatal;
use crate::raft::CoreState;
use crate::raft::ExternalCommand;
use crate::raft::RaftMsg;
use crate::storage::RaftLogStorage;
use crate::AsyncRuntime;
use crate::Config;
use crate::RaftMetrics;
use crate::RaftNetworkFactory;
use crate::RaftTypeConfig;

/// RaftInner is the internal handle and provides internally used APIs to communicate with
/// `RaftCore`.
pub(in crate::raft) struct RaftInner<C, N, LS>
where
C: RaftTypeConfig,
N: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
{
pub(in crate::raft) id: C::NodeId,
pub(in crate::raft) config: Arc<Config>,
pub(in crate::raft) runtime_config: Arc<RuntimeConfig>,
pub(in crate::raft) tick_handle: TickHandle<C>,
pub(in crate::raft) tx_api: mpsc::UnboundedSender<RaftMsg<C, N, LS>>,
pub(in crate::raft) rx_metrics: watch::Receiver<RaftMetrics<C::NodeId, C::Node>>,

// TODO(xp): it does not need to be a async mutex.
#[allow(clippy::type_complexity)]
pub(in crate::raft) tx_shutdown: Mutex<Option<oneshot::Sender<()>>>,
pub(in crate::raft) core_state: Mutex<CoreState<C::NodeId, C::AsyncRuntime>>,
}

impl<C, N, LS> RaftInner<C, N, LS>
where
C: RaftTypeConfig,
N: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
{
/// Send an [`ExternalCommand`] to RaftCore to execute in the `RaftCore` thread.
///
/// It returns at once.
pub(in crate::raft) async fn send_external_command(
&self,
cmd: ExternalCommand,
cmd_desc: impl fmt::Display + Default,
) -> Result<(), Fatal<C::NodeId>> {
let send_res = self.tx_api.send(RaftMsg::ExternalCommand { cmd });

if send_res.is_err() {
let fatal = self.get_core_stopped_error("sending external command to RaftCore", Some(cmd_desc)).await;
return Err(fatal);
}
Ok(())
}

/// Get the error that caused RaftCore to stop.
pub(in crate::raft) async fn get_core_stopped_error(
&self,
when: impl fmt::Display,
message_summary: Option<impl fmt::Display + Default>,
) -> Fatal<C::NodeId> {
// Wait for the core task to finish.
self.join_core_task().await;

// Retrieve the result.
let core_res = {
let state = self.core_state.lock().await;
if let CoreState::Done(core_task_res) = &*state {
core_task_res.clone()
} else {
unreachable!("RaftCore should have already quit")
}
};

tracing::error!(
core_result = debug(&core_res),
"failure {}; message: {}",
when,
message_summary.unwrap_or_default()
);

match core_res {
// A normal quit is still an unexpected "stop" to the caller.
Ok(_) => Fatal::Stopped,
Err(e) => e,
}
}

/// Wait for `RaftCore` task to finish and record the returned value from the task.
#[tracing::instrument(level = "debug", skip_all)]
pub(in crate::raft) async fn join_core_task(&self) {
let mut state = self.core_state.lock().await;
match &mut *state {
CoreState::Running(handle) => {
let res = handle.await;
tracing::info!(res = debug(&res), "RaftCore exited");

let core_task_res = match res {
Err(err) => {
if C::AsyncRuntime::is_panic(&err) {
Err(Fatal::Panicked)
} else {
Err(Fatal::Stopped)
}
}
Ok(returned_res) => returned_res,
};

*state = CoreState::Done(core_task_res);
}
CoreState::Done(_) => {
// RaftCore has already quit, nothing to do
}
}
}
}

0 comments on commit 9ea796a

Please sign in to comment.