From 9ea796a6d54ad5cd3c2104dc7cc83d5d92edf937 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Sat, 1 Jul 2023 12:25:34 +0800 Subject: [PATCH] Refactor: split RaftInner into separate file (#885) * Chore: move "raft" to a separate dir * Refactor: split RaftInner into separate file Moved the `RaftInner` struct and its methods into a separate `raft_inner.rs` file. `Raft` is a control handle for application, internally used methods should be provided by `RaftInner`, not `Raft`. --- openraft/src/{raft.rs => raft/mod.rs} | 110 ++-------------------- openraft/src/raft/raft_inner.rs | 126 ++++++++++++++++++++++++++ 2 files changed, 136 insertions(+), 100 deletions(-) rename openraft/src/{raft.rs => raft/mod.rs} (92%) create mode 100644 openraft/src/raft/raft_inner.rs diff --git a/openraft/src/raft.rs b/openraft/src/raft/mod.rs similarity index 92% rename from openraft/src/raft.rs rename to openraft/src/raft/mod.rs index e7408ca51..f391f54b8 100644 --- a/openraft/src/raft.rs +++ b/openraft/src/raft/mod.rs @@ -1,5 +1,7 @@ //! Public Raft interface and data types. +mod raft_inner; + use std::collections::BTreeMap; use std::fmt; use std::fmt::Debug; @@ -27,7 +29,6 @@ use crate::core::replication_lag; use crate::core::sm; use crate::core::RaftCore; use crate::core::Tick; -use crate::core::TickHandle; use crate::display_ext::DisplayOptionExt; use crate::display_ext::DisplaySlice; use crate::engine::Engine; @@ -46,6 +47,7 @@ use crate::metrics::RaftMetrics; use crate::metrics::Wait; use crate::network::RaftNetworkFactory; use crate::node::Node; +use crate::raft::raft_inner::RaftInner; use crate::storage::RaftLogStorage; use crate::storage::RaftStateMachine; use crate::AppData; @@ -155,24 +157,6 @@ where Done(Result<(), Fatal>), } -struct RaftInner -where - C: RaftTypeConfig, - N: RaftNetworkFactory, - LS: RaftLogStorage, -{ - id: C::NodeId, - config: Arc, - runtime_config: Arc, - tick_handle: TickHandle, - tx_api: mpsc::UnboundedSender>, - rx_metrics: watch::Receiver>, - // TODO(xp): it does not need to be a async mutex. - #[allow(clippy::type_complexity)] - tx_shutdown: Mutex>>, - core_state: Mutex>, -} - /// The Raft API. /// /// This type implements the full Raft spec, and is the interface to a running Raft node. @@ -356,7 +340,7 @@ where /// Returns error when RaftCore has [`Fatal`] error, e.g. shut down or having storage error. /// It is not affected by `Raft::enable_elect(false)`. pub async fn trigger_elect(&self) -> Result<(), Fatal> { - self.send_external_command(ExternalCommand::Elect, "trigger_elect").await + self.inner.send_external_command(ExternalCommand::Elect, "trigger_elect").await } /// Trigger a heartbeat at once and return at once. @@ -364,14 +348,14 @@ where /// Returns error when RaftCore has [`Fatal`] error, e.g. shut down or having storage error. /// It is not affected by `Raft::enable_heartbeat(false)`. pub async fn trigger_heartbeat(&self) -> Result<(), Fatal> { - self.send_external_command(ExternalCommand::Heartbeat, "trigger_heartbeat").await + self.inner.send_external_command(ExternalCommand::Heartbeat, "trigger_heartbeat").await } /// Trigger to build a snapshot at once and return at once. /// /// Returns error when RaftCore has [`Fatal`] error, e.g. shut down or having storage error. pub async fn trigger_snapshot(&self) -> Result<(), Fatal> { - self.send_external_command(ExternalCommand::Snapshot, "trigger_snapshot").await + self.inner.send_external_command(ExternalCommand::Snapshot, "trigger_snapshot").await } /// Initiate the log purge up to and including the given `upto` log index. @@ -390,21 +374,7 @@ where /// /// [`max_in_snapshot_log_to_keep`]: `crate::Config::max_in_snapshot_log_to_keep` pub async fn purge_log(&self, upto: u64) -> Result<(), Fatal> { - self.send_external_command(ExternalCommand::PurgeLog { upto }, "purge_log").await - } - - async fn send_external_command( - &self, - cmd: ExternalCommand, - cmd_desc: impl fmt::Display + Default, - ) -> Result<(), Fatal> { - let send_res = self.inner.tx_api.send(RaftMsg::ExternalCommand { cmd }); - - if send_res.is_err() { - let fatal = self.get_core_stopped_error("sending external command to RaftCore", Some(cmd_desc)).await; - return Err(fatal); - } - Ok(()) + self.inner.send_external_command(ExternalCommand::PurgeLog { upto }, "purge_log").await } /// Submit an AppendEntries RPC to this Raft node. @@ -751,7 +721,7 @@ where let send_res = self.inner.tx_api.send(mes); if send_res.is_err() { - let fatal = self.get_core_stopped_error("sending tx to RaftCore", sum).await; + let fatal = self.inner.get_core_stopped_error("sending tx to RaftCore", sum).await; return Err(RaftError::Fatal(fatal)); } @@ -761,73 +731,13 @@ where match recv_res { Ok(x) => x.map_err(|e| RaftError::APIError(e)), Err(_) => { - let fatal = self.get_core_stopped_error("receiving rx from RaftCore", sum).await; + let fatal = self.inner.get_core_stopped_error("receiving rx from RaftCore", sum).await; tracing::error!(error = debug(&fatal), "core_call fatal error"); Err(RaftError::Fatal(fatal)) } } } - async fn get_core_stopped_error( - &self, - when: impl fmt::Display, - message_summary: Option, - ) -> Fatal { - // Wait for the core task to finish. - self.join_core_task().await; - - // Retrieve the result. - let core_res = { - let state = self.inner.core_state.lock().await; - if let CoreState::Done(core_task_res) = &*state { - core_task_res.clone() - } else { - unreachable!("RaftCore should have already quit") - } - }; - - tracing::error!( - core_result = debug(&core_res), - "failure {}; message: {}", - when, - message_summary.unwrap_or_default() - ); - - match core_res { - // A normal quit is still an unexpected "stop" to the caller. - Ok(_) => Fatal::Stopped, - Err(e) => e, - } - } - - /// Wait for RaftCore task to finish and record the returned value from the task. - #[tracing::instrument(level = "debug", skip_all)] - async fn join_core_task(&self) { - let mut state = self.inner.core_state.lock().await; - match &mut *state { - CoreState::Running(handle) => { - let res = handle.await; - tracing::info!(res = debug(&res), "RaftCore exited"); - - let core_task_res = match res { - Err(err) => { - if C::AsyncRuntime::is_panic(&err) { - Err(Fatal::Panicked) - } else { - Err(Fatal::Stopped) - } - } - Ok(returned_res) => returned_res, - }; - - *state = CoreState::Done(core_task_res); - } - CoreState::Done(_) => { - // RaftCore has already quit, nothing to do - } - } - } - /// Send a request to the Raft core loop in a fire-and-forget manner. /// /// The request functor will be called with a mutable reference to both the state machine @@ -898,7 +808,7 @@ where let send_res = tx.send(()); tracing::info!("sending shutdown signal to RaftCore, sending res: {:?}", send_res); } - self.join_core_task().await; + self.inner.join_core_task().await; self.inner.tick_handle.shutdown().await; // TODO(xp): API change: replace `JoinError` with `Fatal`, diff --git a/openraft/src/raft/raft_inner.rs b/openraft/src/raft/raft_inner.rs new file mode 100644 index 000000000..67337a372 --- /dev/null +++ b/openraft/src/raft/raft_inner.rs @@ -0,0 +1,126 @@ +use std::fmt; +use std::sync::Arc; + +use tokio::sync::mpsc; +use tokio::sync::oneshot; +use tokio::sync::watch; +use tokio::sync::Mutex; + +use crate::config::RuntimeConfig; +use crate::core::TickHandle; +use crate::error::Fatal; +use crate::raft::CoreState; +use crate::raft::ExternalCommand; +use crate::raft::RaftMsg; +use crate::storage::RaftLogStorage; +use crate::AsyncRuntime; +use crate::Config; +use crate::RaftMetrics; +use crate::RaftNetworkFactory; +use crate::RaftTypeConfig; + +/// RaftInner is the internal handle and provides internally used APIs to communicate with +/// `RaftCore`. +pub(in crate::raft) struct RaftInner +where + C: RaftTypeConfig, + N: RaftNetworkFactory, + LS: RaftLogStorage, +{ + pub(in crate::raft) id: C::NodeId, + pub(in crate::raft) config: Arc, + pub(in crate::raft) runtime_config: Arc, + pub(in crate::raft) tick_handle: TickHandle, + pub(in crate::raft) tx_api: mpsc::UnboundedSender>, + pub(in crate::raft) rx_metrics: watch::Receiver>, + + // TODO(xp): it does not need to be a async mutex. + #[allow(clippy::type_complexity)] + pub(in crate::raft) tx_shutdown: Mutex>>, + pub(in crate::raft) core_state: Mutex>, +} + +impl RaftInner +where + C: RaftTypeConfig, + N: RaftNetworkFactory, + LS: RaftLogStorage, +{ + /// Send an [`ExternalCommand`] to RaftCore to execute in the `RaftCore` thread. + /// + /// It returns at once. + pub(in crate::raft) async fn send_external_command( + &self, + cmd: ExternalCommand, + cmd_desc: impl fmt::Display + Default, + ) -> Result<(), Fatal> { + let send_res = self.tx_api.send(RaftMsg::ExternalCommand { cmd }); + + if send_res.is_err() { + let fatal = self.get_core_stopped_error("sending external command to RaftCore", Some(cmd_desc)).await; + return Err(fatal); + } + Ok(()) + } + + /// Get the error that caused RaftCore to stop. + pub(in crate::raft) async fn get_core_stopped_error( + &self, + when: impl fmt::Display, + message_summary: Option, + ) -> Fatal { + // Wait for the core task to finish. + self.join_core_task().await; + + // Retrieve the result. + let core_res = { + let state = self.core_state.lock().await; + if let CoreState::Done(core_task_res) = &*state { + core_task_res.clone() + } else { + unreachable!("RaftCore should have already quit") + } + }; + + tracing::error!( + core_result = debug(&core_res), + "failure {}; message: {}", + when, + message_summary.unwrap_or_default() + ); + + match core_res { + // A normal quit is still an unexpected "stop" to the caller. + Ok(_) => Fatal::Stopped, + Err(e) => e, + } + } + + /// Wait for `RaftCore` task to finish and record the returned value from the task. + #[tracing::instrument(level = "debug", skip_all)] + pub(in crate::raft) async fn join_core_task(&self) { + let mut state = self.core_state.lock().await; + match &mut *state { + CoreState::Running(handle) => { + let res = handle.await; + tracing::info!(res = debug(&res), "RaftCore exited"); + + let core_task_res = match res { + Err(err) => { + if C::AsyncRuntime::is_panic(&err) { + Err(Fatal::Panicked) + } else { + Err(Fatal::Stopped) + } + } + Ok(returned_res) => returned_res, + }; + + *state = CoreState::Done(core_task_res); + } + CoreState::Done(_) => { + // RaftCore has already quit, nothing to do + } + } + } +}