diff --git a/openraft/src/lib.rs b/openraft/src/lib.rs index dcfaabbc3..79a44964e 100644 --- a/openraft/src/lib.rs +++ b/openraft/src/lib.rs @@ -65,6 +65,7 @@ pub mod raft; pub mod storage; pub mod testing; pub mod timer; +pub mod type_config; pub(crate) mod engine; pub(crate) mod log_id_range; @@ -89,6 +90,7 @@ pub use macros::add_async_trait; pub use network::RPCTypes; pub use network::RaftNetwork; pub use network::RaftNetworkFactory; +pub use type_config::RaftTypeConfig; pub use crate::async_runtime::AsyncRuntime; pub use crate::async_runtime::TokioRuntime; @@ -114,7 +116,6 @@ pub use crate::node::EmptyNode; pub use crate::node::Node; pub use crate::node::NodeId; pub use crate::raft::Raft; -pub use crate::raft::RaftTypeConfig; pub use crate::raft_state::MembershipState; pub use crate::raft_state::RaftState; pub use crate::raft_types::SnapshotId; diff --git a/openraft/src/raft/core_state.rs b/openraft/src/raft/core_state.rs new file mode 100644 index 000000000..c924ca310 --- /dev/null +++ b/openraft/src/raft/core_state.rs @@ -0,0 +1,16 @@ +use crate::error::Fatal; +use crate::AsyncRuntime; +use crate::NodeId; + +/// The running state of RaftCore +pub(in crate::raft) enum CoreState +where + NID: NodeId, + A: AsyncRuntime, +{ + /// The RaftCore task is still running. + Running(A::JoinHandle>>), + + /// The RaftCore task has finished. The return value of the task is stored. + Done(Result<(), Fatal>), +} diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index 5a1707edb..dedaee489 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -2,6 +2,9 @@ mod message; mod raft_inner; +mod trigger; + +pub(in crate::raft) mod core_state; use std::fmt::Debug; use std::marker::PhantomData; @@ -9,6 +12,7 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; +use core_state::CoreState; use maplit::btreemap; pub use message::AppendEntriesRequest; pub use message::AppendEntriesResponse; @@ -17,9 +21,6 @@ pub use message::InstallSnapshotRequest; pub use message::InstallSnapshotResponse; pub use message::VoteRequest; pub use message::VoteResponse; -use tokio::io::AsyncRead; -use tokio::io::AsyncSeek; -use tokio::io::AsyncWrite; use tokio::sync::mpsc; use tokio::sync::oneshot; use tokio::sync::watch; @@ -31,7 +32,6 @@ use tracing::Level; use crate::config::Config; use crate::config::RuntimeConfig; use crate::core::command_state::CommandState; -use crate::core::raft_msg::external_command::ExternalCommand; use crate::core::raft_msg::RaftMsg; use crate::core::replication_lag; use crate::core::sm; @@ -39,8 +39,6 @@ use crate::core::RaftCore; use crate::core::Tick; use crate::engine::Engine; use crate::engine::EngineConfig; -use crate::entry::FromAppData; -use crate::entry::RaftEntry; use crate::error::CheckIsLeaderError; use crate::error::ClientWriteError; use crate::error::Fatal; @@ -51,69 +49,19 @@ use crate::membership::IntoNodes; use crate::metrics::RaftMetrics; use crate::metrics::Wait; use crate::network::RaftNetworkFactory; -use crate::node::Node; use crate::raft::raft_inner::RaftInner; +use crate::raft::trigger::Trigger; use crate::storage::RaftLogStorage; use crate::storage::RaftStateMachine; -use crate::AppData; -use crate::AppDataResponse; use crate::AsyncRuntime; use crate::ChangeMembers; use crate::LogId; use crate::LogIdOptionExt; use crate::MessageSummary; -use crate::NodeId; -use crate::OptionalSend; use crate::RaftState; +pub use crate::RaftTypeConfig; use crate::StorageHelper; -/// Configuration of types used by the [`Raft`] core engine. -/// -/// The (empty) implementation structure defines request/response types, node ID type -/// and the like. Refer to the documentation of associated types for more information. -/// -/// ## Note -/// -/// Since Rust cannot automatically infer traits for various inner types using this config -/// type as a parameter, this trait simply uses all the traits required for various types -/// as its supertraits as a workaround. To ease the declaration, the macro -/// `declare_raft_types` is provided, which can be used to declare the type easily. -/// -/// Example: -/// ```ignore -/// openraft::declare_raft_types!( -/// /// Declare the type configuration for `MemStore`. -/// pub Config: D = ClientRequest, R = ClientResponse, NodeId = MemNodeId -/// ); -/// ``` -pub trait RaftTypeConfig: - Sized + Send + Sync + Debug + Clone + Copy + Default + Eq + PartialEq + Ord + PartialOrd + 'static -{ - /// Application-specific request data passed to the state machine. - type D: AppData; - - /// Application-specific response data returned by the state machine. - type R: AppDataResponse; - - /// A Raft node's ID. - type NodeId: NodeId; - - /// Raft application level node data - type Node: Node; - - /// Raft log entry, which can be built from an AppData. - type Entry: RaftEntry + FromAppData; - - /// Snapshot data for exposing a snapshot for reading & writing. - /// - /// See the [storage chapter of the guide](https://datafuselabs.github.io/openraft/getting-started.html#implement-raftstorage) - /// for details on where and how this is used. - type SnapshotData: AsyncRead + AsyncWrite + AsyncSeek + OptionalSend + Sync + Unpin + 'static; - - /// Asynchronous runtime type. - type AsyncRuntime: AsyncRuntime; -} - /// Define types for a Raft type configuration. /// /// Since Rust has some limitations when deriving traits for types with generic arguments @@ -125,8 +73,14 @@ pub trait RaftTypeConfig: /// Example: /// ```ignore /// openraft::declare_raft_types!( -/// /// Declare the type configuration for `MemStore`. -/// pub Config: D = ClientRequest, R = ClientResponse, NodeId = MemNodeId +/// pub Config: +/// D = ClientRequest, +/// R = ClientResponse, +/// NodeId = u64, +/// Node = openraft::BasicNode, +/// Entry = openraft::Entry, +/// SnapshotData = Cursor>, +/// AsyncRuntime = openraft::TokioRuntime, /// ); /// ``` #[macro_export] @@ -146,19 +100,6 @@ macro_rules! declare_raft_types { }; } -/// The running state of RaftCore -enum CoreState -where - NID: NodeId, - A: AsyncRuntime, -{ - /// The RaftCore task is still running. - Running(A::JoinHandle>>), - - /// The RaftCore task has finished. The return value of the task is stored. - Done(Result<(), Fatal>), -} - /// The Raft API. /// /// This type implements the full Raft spec, and is the interface to a running Raft node. @@ -337,46 +278,39 @@ where self.inner.runtime_config.enable_elect.store(enabled, Ordering::Relaxed); } - /// Trigger election at once and return at once. + /// Return a handle to manually trigger raft actions, such as elect or build snapshot. /// - /// Returns error when RaftCore has [`Fatal`] error, e.g. shut down or having storage error. - /// It is not affected by `Raft::enable_elect(false)`. + /// Example: + /// ```ignore + /// let raft = Raft::new(...).await?; + /// raft.trigger().elect().await?; + /// ``` + pub fn trigger(&self) -> Trigger { + Trigger::new(self.inner.as_ref()) + } + + /// Trigger election at once and return at once. + #[deprecated(note = "use `Raft::trigger().elect()` instead")] pub async fn trigger_elect(&self) -> Result<(), Fatal> { - self.inner.send_external_command(ExternalCommand::Elect, "trigger_elect").await + self.trigger().elect().await } /// Trigger a heartbeat at once and return at once. - /// - /// Returns error when RaftCore has [`Fatal`] error, e.g. shut down or having storage error. - /// It is not affected by `Raft::enable_heartbeat(false)`. + #[deprecated(note = "use `Raft::trigger().heartbeat()` instead")] pub async fn trigger_heartbeat(&self) -> Result<(), Fatal> { - self.inner.send_external_command(ExternalCommand::Heartbeat, "trigger_heartbeat").await + self.trigger().heartbeat().await } /// Trigger to build a snapshot at once and return at once. - /// - /// Returns error when RaftCore has [`Fatal`] error, e.g. shut down or having storage error. + #[deprecated(note = "use `Raft::trigger().snapshot()` instead")] pub async fn trigger_snapshot(&self) -> Result<(), Fatal> { - self.inner.send_external_command(ExternalCommand::Snapshot, "trigger_snapshot").await + self.trigger().snapshot().await } /// Initiate the log purge up to and including the given `upto` log index. - /// - /// Logs that are not included in a snapshot will **NOT** be purged. - /// In such scenario it will delete as many log as possible. - /// The [`max_in_snapshot_log_to_keep`] config is not taken into account - /// when purging logs. - /// - /// It returns error only when RaftCore has [`Fatal`] error, e.g. shut down or having storage - /// error. - /// - /// Openraft won't purge logs at once, e.g. it may be delayed by several seconds, because if it - /// is a leader and a replication task has been replicating the logs to a follower, the logs - /// can't be purged until the replication task is finished. - /// - /// [`max_in_snapshot_log_to_keep`]: `crate::Config::max_in_snapshot_log_to_keep` + #[deprecated(note = "use `Raft::trigger().purge_log()` instead")] pub async fn purge_log(&self, upto: u64) -> Result<(), Fatal> { - self.inner.send_external_command(ExternalCommand::PurgeLog { upto }, "purge_log").await + self.trigger().purge_log(upto).await } /// Submit an AppendEntries RPC to this Raft node. diff --git a/openraft/src/raft/raft_inner.rs b/openraft/src/raft/raft_inner.rs index 5ecaf4258..0bcb0afcf 100644 --- a/openraft/src/raft/raft_inner.rs +++ b/openraft/src/raft/raft_inner.rs @@ -11,7 +11,7 @@ use crate::core::raft_msg::external_command::ExternalCommand; use crate::core::raft_msg::RaftMsg; use crate::core::TickHandle; use crate::error::Fatal; -use crate::raft::CoreState; +use crate::raft::core_state::CoreState; use crate::storage::RaftLogStorage; use crate::AsyncRuntime; use crate::Config; diff --git a/openraft/src/raft/trigger.rs b/openraft/src/raft/trigger.rs new file mode 100644 index 000000000..ad388ba59 --- /dev/null +++ b/openraft/src/raft/trigger.rs @@ -0,0 +1,71 @@ +//! Trigger an action to RaftCore by external caller. + +use crate::core::raft_msg::external_command::ExternalCommand; +use crate::error::Fatal; +use crate::raft::RaftInner; +use crate::storage::RaftLogStorage; +use crate::RaftNetworkFactory; +use crate::RaftTypeConfig; + +/// Trigger is an interface to trigger an action to RaftCore by external caller. +pub struct Trigger<'r, C, N, LS> +where + C: RaftTypeConfig, + N: RaftNetworkFactory, + LS: RaftLogStorage, +{ + raft_inner: &'r RaftInner, +} + +impl<'r, C, N, LS> Trigger<'r, C, N, LS> +where + C: RaftTypeConfig, + N: RaftNetworkFactory, + LS: RaftLogStorage, +{ + pub(in crate::raft) fn new(raft_inner: &'r RaftInner) -> Self { + Self { raft_inner } + } + + /// Trigger election at once and return at once. + /// + /// Returns error when RaftCore has [`Fatal`] error, e.g. shut down or having storage error. + /// It is not affected by `Raft::enable_elect(false)`. + pub async fn elect(&self) -> Result<(), Fatal> { + self.raft_inner.send_external_command(ExternalCommand::Elect, "trigger_elect").await + } + + /// Trigger a heartbeat at once and return at once. + /// + /// Returns error when RaftCore has [`Fatal`] error, e.g. shut down or having storage error. + /// It is not affected by `Raft::enable_heartbeat(false)`. + pub async fn heartbeat(&self) -> Result<(), Fatal> { + self.raft_inner.send_external_command(ExternalCommand::Heartbeat, "trigger_heartbeat").await + } + + /// Trigger to build a snapshot at once and return at once. + /// + /// Returns error when RaftCore has [`Fatal`] error, e.g. shut down or having storage error. + pub async fn snapshot(&self) -> Result<(), Fatal> { + self.raft_inner.send_external_command(ExternalCommand::Snapshot, "trigger_snapshot").await + } + + /// Initiate the log purge up to and including the given `upto` log index. + /// + /// Logs that are not included in a snapshot will **NOT** be purged. + /// In such scenario it will delete as many log as possible. + /// The [`max_in_snapshot_log_to_keep`] config is not taken into account + /// when purging logs. + /// + /// It returns error only when RaftCore has [`Fatal`] error, e.g. shut down or having storage + /// error. + /// + /// Openraft won't purge logs at once, e.g. it may be delayed by several seconds, because if it + /// is a leader and a replication task has been replicating the logs to a follower, the logs + /// can't be purged until the replication task is finished. + /// + /// [`max_in_snapshot_log_to_keep`]: `crate::Config::max_in_snapshot_log_to_keep` + pub async fn purge_log(&self, upto: u64) -> Result<(), Fatal> { + self.raft_inner.send_external_command(ExternalCommand::PurgeLog { upto }, "purge_log").await + } +} diff --git a/openraft/src/type_config.rs b/openraft/src/type_config.rs new file mode 100644 index 000000000..6753a5eb3 --- /dev/null +++ b/openraft/src/type_config.rs @@ -0,0 +1,68 @@ +use std::fmt::Debug; + +use tokio::io::AsyncRead; +use tokio::io::AsyncSeek; +use tokio::io::AsyncWrite; + +use crate::entry::FromAppData; +use crate::entry::RaftEntry; +use crate::AppData; +use crate::AppDataResponse; +use crate::AsyncRuntime; +use crate::Node; +use crate::NodeId; +use crate::OptionalSend; + +/// Configuration of types used by the [`Raft`] core engine. +/// +/// The (empty) implementation structure defines request/response types, node ID type +/// and the like. Refer to the documentation of associated types for more information. +/// +/// ## Note +/// +/// Since Rust cannot automatically infer traits for various inner types using this config +/// type as a parameter, this trait simply uses all the traits required for various types +/// as its supertraits as a workaround. To ease the declaration, the macro +/// `declare_raft_types` is provided, which can be used to declare the type easily. +/// +/// Example: +/// ```ignore +/// openraft::declare_raft_types!( +/// pub Config: +/// D = ClientRequest, +/// R = ClientResponse, +/// NodeId = u64, +/// Node = openraft::BasicNode, +/// Entry = openraft::Entry, +/// SnapshotData = Cursor>, +/// AsyncRuntime = openraft::TokioRuntime, +/// ); +/// ``` +/// [`Raft`]: crate::Raft +pub trait RaftTypeConfig: + Sized + Send + Sync + Debug + Clone + Copy + Default + Eq + PartialEq + Ord + PartialOrd + 'static +{ + /// Application-specific request data passed to the state machine. + type D: AppData; + + /// Application-specific response data returned by the state machine. + type R: AppDataResponse; + + /// A Raft node's ID. + type NodeId: NodeId; + + /// Raft application level node data + type Node: Node; + + /// Raft log entry, which can be built from an AppData. + type Entry: RaftEntry + FromAppData; + + /// Snapshot data for exposing a snapshot for reading & writing. + /// + /// See the [storage chapter of the guide](https://datafuselabs.github.io/openraft/getting-started.html#implement-raftstorage) + /// for details on where and how this is used. + type SnapshotData: AsyncRead + AsyncWrite + AsyncSeek + OptionalSend + Sync + Unpin + 'static; + + /// Asynchronous runtime type. + type AsyncRuntime: AsyncRuntime; +} diff --git a/tests/tests/client_api/t12_trigger_purge_log.rs b/tests/tests/client_api/t12_trigger_purge_log.rs index 16ec10da3..0efed021f 100644 --- a/tests/tests/client_api/t12_trigger_purge_log.rs +++ b/tests/tests/client_api/t12_trigger_purge_log.rs @@ -41,7 +41,7 @@ async fn trigger_purge_log() -> anyhow::Result<()> { tracing::info!(log_index, "--- trigger snapshot for node-0"); { let n0 = router.get_raft_handle(&0)?; - n0.trigger_snapshot().await?; + n0.trigger().snapshot().await?; router.wait(&0, timeout()).snapshot(log_id(1, 0, log_index), "node-1 snapshot").await?; } @@ -60,7 +60,7 @@ async fn trigger_purge_log() -> anyhow::Result<()> { tracing::info!(log_index, "--- purge log for node 0"); { let n0 = router.get_raft_handle(&0)?; - n0.purge_log(snapshot_index).await?; + n0.trigger().purge_log(snapshot_index).await?; router .wait(&0, timeout()) @@ -70,7 +70,7 @@ async fn trigger_purge_log() -> anyhow::Result<()> { ) .await?; - n0.purge_log(log_index).await?; + n0.trigger().purge_log(log_index).await?; let res = router .wait(&0, timeout()) .purged( diff --git a/tests/tests/client_api/t13_trigger_snapshot.rs b/tests/tests/client_api/t13_trigger_snapshot.rs index 22911a676..7670783f9 100644 --- a/tests/tests/client_api/t13_trigger_snapshot.rs +++ b/tests/tests/client_api/t13_trigger_snapshot.rs @@ -28,7 +28,7 @@ async fn trigger_snapshot() -> anyhow::Result<()> { tracing::info!(log_index, "--- trigger snapshot for node-1"); { let n1 = router.get_raft_handle(&1)?; - n1.trigger_snapshot().await?; + n1.trigger().snapshot().await?; router .wait(&1, timeout()) @@ -48,7 +48,7 @@ async fn trigger_snapshot() -> anyhow::Result<()> { tracing::info!(log_index, "--- trigger snapshot for node-0"); { let n0 = router.get_raft_handle(&0)?; - n0.trigger_snapshot().await?; + n0.trigger().snapshot().await?; router .wait(&0, timeout()) diff --git a/tests/tests/membership/t30_elect_with_new_config.rs b/tests/tests/membership/t30_elect_with_new_config.rs index 4cc4274bf..630d45020 100644 --- a/tests/tests/membership/t30_elect_with_new_config.rs +++ b/tests/tests/membership/t30_elect_with_new_config.rs @@ -45,7 +45,7 @@ async fn leader_election_after_changing_0_to_01234() -> Result<()> { // Let node-1 become leader. let node_1 = router.get_raft_handle(&1)?; - node_1.trigger_elect().await?; + node_1.trigger().elect().await?; log_index += 1; // leader initial blank log router diff --git a/tests/tests/metrics/t10_purged.rs b/tests/tests/metrics/t10_purged.rs index bfecc230c..aaf4740e8 100644 --- a/tests/tests/metrics/t10_purged.rs +++ b/tests/tests/metrics/t10_purged.rs @@ -35,7 +35,7 @@ async fn metrics_purged() -> Result<()> { tracing::info!(log_index, "--- trigger snapshot"); { let n0 = router.get_raft_handle(&0)?; - n0.trigger_snapshot().await?; + n0.trigger().snapshot().await?; n0.wait(timeout()).snapshot(log_id(1, 0, log_index), "build snapshot").await?; tracing::info!(log_index, "--- metrics reports purged log id"); diff --git a/tests/tests/metrics/t30_leader_metrics.rs b/tests/tests/metrics/t30_leader_metrics.rs index a3da46819..70bbb55fb 100644 --- a/tests/tests/metrics/t30_leader_metrics.rs +++ b/tests/tests/metrics/t30_leader_metrics.rs @@ -158,7 +158,7 @@ async fn leader_metrics() -> Result<()> { // Let the leader lease expire sleep(Duration::from_millis(700)).await; - n1.trigger_elect().await?; + n1.trigger().elect().await?; n1.wait(timeout()).state(ServerState::Leader, "node-1 becomes leader").await?; n1.wait(timeout()).metrics(|x| x.replication.is_some(), "node-1 starts replication").await?; diff --git a/tests/tests/replication/t50_append_entries_backoff_rejoin.rs b/tests/tests/replication/t50_append_entries_backoff_rejoin.rs index 5432b1cf0..720ba7e43 100644 --- a/tests/tests/replication/t50_append_entries_backoff_rejoin.rs +++ b/tests/tests/replication/t50_append_entries_backoff_rejoin.rs @@ -45,7 +45,7 @@ async fn append_entries_backoff_rejoin() -> Result<()> { // Timeout leader lease otherwise vote-request will be rejected by node-2 tokio::time::sleep(Duration::from_millis(1_000)).await; - n1.trigger_elect().await?; + n1.trigger().elect().await?; n1.wait(timeout()).state(ServerState::Leader, "node-1 elect").await?; } diff --git a/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_append.rs b/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_append.rs index fe6f11a8a..1d1f7054e 100644 --- a/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_append.rs +++ b/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_append.rs @@ -43,7 +43,7 @@ async fn building_snapshot_does_not_block_append() -> Result<()> { log_index += router.client_request_many(0, "0", 10).await?; router.wait(&1, timeout()).log(Some(log_index), "written 10 logs").await?; - follower.trigger_snapshot().await?; + follower.trigger().snapshot().await?; tracing::info!(log_index, "--- sleep 500 ms to make sure snapshot is started"); tokio::time::sleep(Duration::from_millis(500)).await; diff --git a/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_apply.rs b/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_apply.rs index cd040c437..69500c9f1 100644 --- a/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_apply.rs +++ b/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_apply.rs @@ -47,7 +47,7 @@ async fn building_snapshot_does_not_block_apply() -> Result<()> { log_index += router.client_request_many(0, "0", 10).await?; router.wait(&1, timeout()).log(Some(log_index), "written 10 logs").await?; - follower.trigger_snapshot().await?; + follower.trigger().snapshot().await?; tracing::info!(log_index, "--- sleep 500 ms to make sure snapshot is started"); tokio::time::sleep(Duration::from_millis(500)).await; diff --git a/tests/tests/snapshot_streaming/t20_startup_snapshot.rs b/tests/tests/snapshot_streaming/t20_startup_snapshot.rs index dca8a1435..a6fbb98ef 100644 --- a/tests/tests/snapshot_streaming/t20_startup_snapshot.rs +++ b/tests/tests/snapshot_streaming/t20_startup_snapshot.rs @@ -33,7 +33,7 @@ async fn startup_build_snapshot() -> anyhow::Result<()> { log_index += router.client_request_many(0, "0", (20 - 1 - log_index) as usize).await?; router.wait(&0, timeout()).log(Some(log_index), "node-0 applied all requests").await?; - router.get_raft_handle(&0)?.trigger_snapshot().await?; + router.get_raft_handle(&0)?.trigger().snapshot().await?; router.wait(&0, timeout()).snapshot(log_id(1, 0, log_index), "node-0 snapshot").await?; } diff --git a/tests/tests/snapshot_streaming/t30_purge_in_snapshot_logs.rs b/tests/tests/snapshot_streaming/t30_purge_in_snapshot_logs.rs index 0405556c8..766c53e67 100644 --- a/tests/tests/snapshot_streaming/t30_purge_in_snapshot_logs.rs +++ b/tests/tests/snapshot_streaming/t30_purge_in_snapshot_logs.rs @@ -40,7 +40,7 @@ async fn purge_in_snapshot_logs() -> Result<()> { tracing::info!(log_index, "--- build snapshot on leader, check purged log"); { log_index += router.client_request_many(0, "0", 10).await?; - leader.trigger_snapshot().await?; + leader.trigger().snapshot().await?; leader .wait(timeout()) .snapshot( @@ -66,7 +66,7 @@ async fn purge_in_snapshot_logs() -> Result<()> { log_index += router.client_request_many(0, "0", 5).await?; router.wait(&0, timeout()).log(Some(log_index), "write another 5 logs").await?; - leader.trigger_snapshot().await?; + leader.trigger().snapshot().await?; leader .wait(timeout()) .snapshot( diff --git a/tests/tests/snapshot_streaming/t34_replication_does_not_block_purge.rs b/tests/tests/snapshot_streaming/t34_replication_does_not_block_purge.rs index 5c966eb3a..fffcf9a23 100644 --- a/tests/tests/snapshot_streaming/t34_replication_does_not_block_purge.rs +++ b/tests/tests/snapshot_streaming/t34_replication_does_not_block_purge.rs @@ -46,7 +46,7 @@ async fn replication_does_not_block_purge() -> Result<()> { { log_index += router.client_request_many(0, "0", 10).await?; - leader.trigger_snapshot().await?; + leader.trigger().snapshot().await?; leader .wait(timeout()) .snapshot(LogId::new(CommittedLeaderId::new(1, 0), log_index), "built snapshot") diff --git a/tests/tests/snapshot_streaming/t90_issue_808_snapshot_to_unreachable_node_should_not_block.rs b/tests/tests/snapshot_streaming/t90_issue_808_snapshot_to_unreachable_node_should_not_block.rs index 82a6141b6..bb38d5bd7 100644 --- a/tests/tests/snapshot_streaming/t90_issue_808_snapshot_to_unreachable_node_should_not_block.rs +++ b/tests/tests/snapshot_streaming/t90_issue_808_snapshot_to_unreachable_node_should_not_block.rs @@ -40,7 +40,7 @@ async fn snapshot_to_unreachable_node_should_not_block() -> Result<()> { tracing::info!(log_index, "--- build a snapshot"); { - n0.trigger_snapshot().await?; + n0.trigger().snapshot().await?; n0.wait(timeout()).snapshot(log_id(1, 0, log_index), "snapshot").await?; n0.wait(timeout()).purged(Some(log_id(1, 0, log_index)), "logs in snapshot are purged").await?;