From eeeff6f3ef85b17fd486fc3a235a7ff04f54ac04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Fri, 2 Aug 2024 11:01:13 +0800 Subject: [PATCH] Feature: Transfer Leader Call `Raft.trigger().transfer_leader(to)` to inform the raft node to transfer its leadership to Node `to`. This feature is enabled only when: the application uses `RaftNetworkV2` and implements the `RaftNetworkV2::transfer_leader()` methods. This method provides a default implementation that returns an `Unreachable`, and such an error will be just ignored. Application upgrading Openraft from older version does not need to modify any codes, unless TransferLeader is required. Upgrade tip: Implement `RaftNetworkV2::transfer_leader()` to send the `TransferLeaderRequest` to the target node. The target node that receives this request should then pass it to `Raft::transfer_leader()`. --- openraft/src/core/raft_core.rs | 96 ++++++++++++++--- .../src/core/raft_msg/external_command.rs | 6 ++ openraft/src/core/raft_msg/mod.rs | 4 +- openraft/src/engine/command.rs | 10 ++ openraft/src/engine/engine_impl.rs | 19 +++- .../src/engine/handler/leader_handler/mod.rs | 13 +++ .../leader_handler/transfer_leader_test.rs | 76 +++++++++++++ .../src/engine/handler/vote_handler/mod.rs | 2 - openraft/src/error.rs | 3 + openraft/src/error/streaming_error.rs | 14 +++ openraft/src/network/rpc_type.rs | 1 + openraft/src/network/v2/network.rs | 23 +++- openraft/src/proposer/leader.rs | 20 ++++ openraft/src/raft/message/mod.rs | 2 + openraft/src/raft/message/transfer_leader.rs | 56 ++++++++++ openraft/src/raft/mod.rs | 53 +++++---- openraft/src/raft/trigger.rs | 9 ++ openraft/src/raft_state/mod.rs | 24 +++-- openraft/src/replication/mod.rs | 3 + openraft/src/utime.rs | 14 ++- .../t10_conflict_with_empty_entries.rs | 2 +- .../append_entries/t10_see_higher_vote.rs | 2 +- .../t11_append_entries_with_bigger_term.rs | 2 +- tests/tests/client_api/t14_transfer_leader.rs | 56 ++++++++-- 
tests/tests/fixtures/mod.rs | 101 ++++++++++++------ .../snapshot_building/t10_build_snapshot.rs | 2 +- ...building_snapshot_does_not_block_append.rs | 2 +- ..._building_snapshot_does_not_block_apply.rs | 2 +- .../t31_snapshot_overrides_membership.rs | 2 +- .../t33_snapshot_delete_conflict_logs.rs | 18 ++-- 30 files changed, 525 insertions(+), 112 deletions(-) create mode 100644 openraft/src/engine/handler/leader_handler/transfer_leader_test.rs create mode 100644 openraft/src/raft/message/transfer_leader.rs diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index cbf826fcc..214108a81 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -70,6 +70,7 @@ use crate::progress::entry::ProgressEntry; use crate::progress::Inflight; use crate::progress::Progress; use crate::quorum::QuorumSet; +use crate::raft::message::TransferLeaderRequest; use crate::raft::responder::Responder; use crate::raft::AppendEntriesRequest; use crate::raft::AppendEntriesResponse; @@ -472,15 +473,22 @@ where /// The result of applying it to state machine is sent to `resp_tx`, if it is not `None`. /// The calling side may not receive a result from `resp_tx`, if raft is shut down. #[tracing::instrument(level = "debug", skip_all, fields(id = display(self.id)))] - pub fn write_entry(&mut self, entry: C::Entry, resp_tx: Option>) -> bool { + pub fn write_entry(&mut self, entry: C::Entry, resp_tx: Option>) { tracing::debug!(payload = display(&entry), "write_entry"); - let (mut lh, tx) = if let Some((lh, tx)) = self.engine.get_leader_handler_or_reject(resp_tx) { - (lh, tx) - } else { - return false; + let Some((mut lh, tx)) = self.engine.get_leader_handler_or_reject(resp_tx) else { + return; }; + // If the leader is transferring leadership, forward writes to the new leader. 
+ if let Some(to) = lh.leader.get_transfer_to() {
+ if let Some(tx) = tx {
+ let err = lh.state.new_forward_to_leader(*to);
+ tx.send(Err(ClientWriteError::ForwardToLeader(err)));
+ }
+ return;
+ }
+ let entries = vec![entry]; // TODO: it should returns membership config error etc. currently this is done by the // caller. @@ -491,28 +499,31 @@ if let Some(tx) = tx { self.client_resp_channels.insert(index, tx); } - - true } - /// Send a heartbeat message to every followers/learners. - /// - /// Currently heartbeat is a blank log + /// Send a heartbeat message to every follower and learner. #[tracing::instrument(level = "debug", skip_all, fields(id = display(self.id)))] - pub fn send_heartbeat(&mut self, emitter: impl fmt::Display) -> bool { + pub(crate) fn send_heartbeat(&mut self, emitter: impl fmt::Display) -> bool { tracing::debug!(now = display(C::now().display()), "send_heartbeat"); - let mut lh = if let Some((lh, _)) = self.engine.get_leader_handler_or_reject(None) { - lh - } else { + let Some((mut lh, _)) = self.engine.get_leader_handler_or_reject(None) else { tracing::debug!( now = display(C::now().display()), - "{} failed to send heartbeat", + "{} failed to send heartbeat, not a Leader", emitter ); return false; }; + if lh.leader.get_transfer_to().is_some() { + tracing::debug!( + now = display(C::now().display()), + "{} is transferring leadership, skip sending heartbeat", + emitter + ); + return false; + } + lh.send_heartbeat(); tracing::debug!("{} triggered sending heartbeat", emitter); @@ -1108,6 +1119,54 @@ where } } + /// Broadcast a TransferLeader request to every other voter in parallel.
+ #[tracing::instrument(level = "trace", skip_all)] + async fn broadcast_transfer_leader(&mut self, req: TransferLeaderRequest) { + let voter_ids = self.engine.state.membership_state.effective().voter_ids(); + + for target in voter_ids { + if target == self.id { + continue; + } + + let r = req.clone(); + + // Safe unwrap(): target must be in membership + let target_node = self.engine.state.membership_state.effective().get_node(&target).unwrap().clone(); + let mut client = self.network_factory.new_client(target, &target_node).await; + + let ttl = Duration::from_millis(self.config.election_timeout_min); + let option = RPCOption::new(ttl); + + let fut = async move { + let tm_res = C::timeout(ttl, client.transfer_leader(r, option)).await; + let res = match tm_res { + Ok(res) => res, + Err(timeout) => { + tracing::error!({error = display(timeout), target = display(target)}, "timeout sending transfer_leader"); + return; + } + }; + + if let Err(e) = res { + tracing::error!({error = display(e), target = display(target)}, "error sending transfer_leader"); + } else { + tracing::info!("Done transfer_leader sent to {}", target); + } + }; + + let span = tracing::debug_span!( + parent: &Span::current(), + "send_transfer_leader", + target = display(target) + ); + + // False positive lint warning(`non-binding `let` on a future`): https://github.com/rust-lang/rust-clippy/issues/9932 + #[allow(clippy::let_underscore_future)] + let _ = C::spawn(fut.instrument(span)); + } + } + #[tracing::instrument(level = "debug", skip_all)] pub(super) fn handle_vote_request(&mut self, req: VoteRequest, tx: VoteTx) { tracing::info!(req = display(&req), func = func_name!()); @@ -1187,7 +1246,7 @@ where RaftMsg::ExternalCoreRequest { req } => { req(&self.engine.state); } - RaftMsg::TransferLeader { + RaftMsg::HandleTransferLeader { from: current_leader_vote, to, } => { @@ -1227,6 +1286,9 @@ where ExternalCommand::PurgeLog { upto } => { self.engine.trigger_purge_log(upto); } + 
ExternalCommand::TriggerTransferLeader { to } => { + self.engine.trigger_transfer_leader(to); + } ExternalCommand::StateMachineCommand { sm_cmd } => { let res = self.sm_handle.send(sm_cmd); if let Err(e) = res { @@ -1743,6 +1805,8 @@ where } } } + Command::BroadcastTransferLeader { req } => self.broadcast_transfer_leader(req).await, + Command::RebuildReplicationStreams { targets } => { self.remove_all_replication().await; diff --git a/openraft/src/core/raft_msg/external_command.rs b/openraft/src/core/raft_msg/external_command.rs index e80aaa263..a54f99e5c 100644 --- a/openraft/src/core/raft_msg/external_command.rs +++ b/openraft/src/core/raft_msg/external_command.rs @@ -33,6 +33,9 @@ pub(crate) enum ExternalCommand { /// [`max_in_snapshot_log_to_keep`]: `crate::Config::max_in_snapshot_log_to_keep` PurgeLog { upto: u64 }, + /// Submit a command to inform RaftCore to transfer leadership to the specified node. + TriggerTransferLeader { to: C::NodeId }, + /// Send a [`sm::Command`] to [`sm::worker::Worker`]. /// This command is run in the sm task. StateMachineCommand { sm_cmd: sm::Command }, @@ -66,6 +69,9 @@ where C: RaftTypeConfig ExternalCommand::PurgeLog { upto } => { write!(f, "PurgeLog[..={}]", upto) } + ExternalCommand::TriggerTransferLeader { to } => { + write!(f, "TriggerTransferLeader: to {}", to) + } ExternalCommand::StateMachineCommand { sm_cmd } => { write!(f, "StateMachineCommand: {}", sm_cmd) } diff --git a/openraft/src/core/raft_msg/mod.rs b/openraft/src/core/raft_msg/mod.rs index 4d5b9edd1..1a811d5f4 100644 --- a/openraft/src/core/raft_msg/mod.rs +++ b/openraft/src/core/raft_msg/mod.rs @@ -99,7 +99,7 @@ where C: RaftTypeConfig /// /// If this node is `to`, reset Leader lease and start election. /// Otherwise, just reset Leader lease so that the node `to` can become Leader. - TransferLeader { + HandleTransferLeader { /// The vote of the Leader that is transferring the leadership. from: Vote, /// The assigned node to be the next Leader. 
@@ -140,7 +140,7 @@ where C: RaftTypeConfig write!(f, "ChangeMembership: {:?}, retain: {}", changes, retain,) } RaftMsg::ExternalCoreRequest { .. } => write!(f, "External Request"), - RaftMsg::TransferLeader { from, to } => { + RaftMsg::HandleTransferLeader { from, to } => { write!(f, "TransferLeader: from_leader: vote={}, to: {}", from, to) } RaftMsg::ExternalCommand { cmd } => { diff --git a/openraft/src/engine/command.rs b/openraft/src/engine/command.rs index 390d9cf2f..651e62e9c 100644 --- a/openraft/src/engine/command.rs +++ b/openraft/src/engine/command.rs @@ -12,6 +12,7 @@ use crate::error::Infallible; use crate::error::InitializeError; use crate::error::InstallSnapshotError; use crate::progress::Inflight; +use crate::raft::message::TransferLeaderRequest; use crate::raft::AppendEntriesResponse; use crate::raft::InstallSnapshotResponse; use crate::raft::SnapshotResponse; @@ -69,6 +70,8 @@ where C: RaftTypeConfig /// /// Upon startup, the saved committed log ids will be re-applied to state machine to restore the /// latest state. + /// + /// [`RaftLogStorage`]: crate::storage::RaftLogStorage SaveCommitted { committed: LogId }, /// Commit log entries that are already persisted in the store, upto `upto`, inclusive. @@ -88,6 +91,9 @@ where C: RaftTypeConfig /// Replicate log entries or snapshot to a target. Replicate { target: C::NodeId, req: Inflight }, + /// Broadcast transfer Leader message to all other nodes. + BroadcastTransferLeader { req: TransferLeaderRequest }, + /// Membership config changed, need to update replication streams. /// The Runtime has to close all old replications and start new ones. /// Because a replication stream should only report state for one membership config. 
@@ -149,6 +155,7 @@ where C: RaftTypeConfig Command::Replicate { target, req } => { write!(f, "Replicate: target={}, req: {}", target, req) } + Command::BroadcastTransferLeader { req } => write!(f, "TransferLeader: {}", req), Command::RebuildReplicationStreams { targets } => { write!(f, "RebuildReplicationStreams: {}", targets.display_n::<10>()) } @@ -185,6 +192,7 @@ where (Command::SaveCommitted { committed }, Command::SaveCommitted { committed: b }) => committed == b, (Command::Apply { already_committed, upto, }, Command::Apply { already_committed: b_committed, upto: b_upto, }, ) => already_committed == b_committed && upto == b_upto, (Command::Replicate { target, req }, Command::Replicate { target: b_target, req: other_req, }, ) => target == b_target && req == other_req, + (Command::BroadcastTransferLeader { req }, Command::BroadcastTransferLeader { req: b, }, ) => req == b, (Command::RebuildReplicationStreams { targets }, Command::RebuildReplicationStreams { targets: b }, ) => targets == b, (Command::SaveVote { vote }, Command::SaveVote { vote: b }) => vote == b, (Command::SendVote { vote_req }, Command::SendVote { vote_req: b }, ) => vote_req == b, @@ -219,6 +227,7 @@ where C: RaftTypeConfig Command::ReplicateCommitted { .. } => CommandKind::Network, Command::Replicate { .. } => CommandKind::Network, + Command::BroadcastTransferLeader { .. } => CommandKind::Network, Command::SendVote { .. } => CommandKind::Network, Command::Apply { .. } => CommandKind::StateMachine, @@ -243,6 +252,7 @@ where C: RaftTypeConfig Command::ReplicateCommitted { .. } => None, Command::Replicate { .. } => None, + Command::BroadcastTransferLeader { .. } => None, Command::SendVote { .. } => None, Command::Apply { .. 
} => None, diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index aada6fbc3..34ae9edd5 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -279,7 +279,7 @@ where C: RaftTypeConfig tracing::info!( my_vote = display(&**local_leased_vote), my_last_log_id = display(self.state.last_log_id().display()), - lease = display(local_leased_vote.time_info(now)), + lease = display(local_leased_vote.display_lease_info(now)), "Engine::handle_vote_req" ); @@ -288,7 +288,7 @@ where C: RaftTypeConfig if !local_leased_vote.is_expired(now, Duration::from_millis(0)) { tracing::info!( "reject vote-request: leader lease has not yet expire: {}", - local_leased_vote.time_info(now) + local_leased_vote.display_lease_info(now) ); return VoteResponse::new(self.state.vote_ref(), self.state.last_log_id().copied(), false); @@ -605,6 +605,21 @@ where C: RaftTypeConfig self.log_handler().update_purge_upto(log_id); self.try_purge_log(); } + + pub(crate) fn trigger_transfer_leader(&mut self, to: C::NodeId) { + tracing::info!(to = display(to), "{}", func_name!()); + + let Some((mut lh, _)) = self.get_leader_handler_or_reject(None) else { + tracing::info!( + to = display(to), + "{}: this node is not a Leader, ignore transfer Leader", + func_name!() + ); + return; + }; + + lh.transfer_leader(to); + } } /// Supporting util diff --git a/openraft/src/engine/handler/leader_handler/mod.rs b/openraft/src/engine/handler/leader_handler/mod.rs index f32a76d99..5f8a61042 100644 --- a/openraft/src/engine/handler/leader_handler/mod.rs +++ b/openraft/src/engine/handler/leader_handler/mod.rs @@ -6,6 +6,7 @@ use crate::engine::EngineOutput; use crate::entry::RaftPayload; use crate::proposer::Leader; use crate::proposer::LeaderQuorumSet; +use crate::raft::message::TransferLeaderRequest; use crate::raft_state::IOId; use crate::raft_state::LogStateReader; use crate::type_config::alias::LogIdOf; @@ -19,6 +20,8 @@ mod append_entries_test; mod 
get_read_log_id_test; #[cfg(test)] mod send_heartbeat_test; +#[cfg(test)] +mod transfer_leader_test; /// Handle leader operations. /// @@ -107,6 +110,16 @@ where C: RaftTypeConfig std::cmp::max(self.leader.noop_log_id, committed) } + /// Disable proposing new logs for this Leader, and transfer Leader to another node + pub(crate) fn transfer_leader(&mut self, to: C::NodeId) { + self.leader.mark_transfer(to); + self.state.vote.disable_lease(); + + self.output.push_command(Command::BroadcastTransferLeader { + req: TransferLeaderRequest::new(*self.leader.committed_vote, to, self.leader.last_log_id().copied()), + }); + } + pub(crate) fn replication_handler(&mut self) -> ReplicationHandler { ReplicationHandler { config: self.config, diff --git a/openraft/src/engine/handler/leader_handler/transfer_leader_test.rs b/openraft/src/engine/handler/leader_handler/transfer_leader_test.rs new file mode 100644 index 000000000..5521fab46 --- /dev/null +++ b/openraft/src/engine/handler/leader_handler/transfer_leader_test.rs @@ -0,0 +1,76 @@ +use std::sync::Arc; +use std::time::Duration; + +use maplit::btreeset; +#[allow(unused_imports)] +use pretty_assertions::assert_eq; +#[allow(unused_imports)] +use pretty_assertions::assert_ne; +#[allow(unused_imports)] +use pretty_assertions::assert_str_eq; + +use crate::engine::testing::UTConfig; +use crate::engine::Command; +use crate::engine::Engine; +use crate::raft::TransferLeaderRequest; +use crate::testing::log_id; +use crate::type_config::TypeConfigExt; +use crate::utime::Leased; +use crate::EffectiveMembership; +use crate::Membership; +use crate::MembershipState; +use crate::Vote; + +fn m23() -> Membership { + Membership::::new(vec![btreeset! {2,3}], btreeset! 
{1,2,3}) +} + +fn eng() -> Engine { + let mut eng = Engine::testing_default(0); + eng.state.enable_validation(false); // Disable validation for incomplete state + + eng.config.id = 1; + eng.state.vote = Leased::new( + UTConfig::<()>::now(), + Duration::from_millis(500), + Vote::new_committed(3, 1), + ); + eng.state.log_ids.append(log_id(1, 1, 1)); + eng.state.log_ids.append(log_id(2, 1, 3)); + eng.state.membership_state = MembershipState::new( + Arc::new(EffectiveMembership::new(Some(log_id(1, 1, 1)), m23())), + Arc::new(EffectiveMembership::new(Some(log_id(2, 1, 3)), m23())), + ); + eng.testing_new_leader(); + eng.state.server_state = eng.calc_server_state(); + + eng +} + +#[test] +fn test_leader_send_heartbeat() -> anyhow::Result<()> { + let mut eng = eng(); + eng.output.take_commands(); + + let mut lh = eng.leader_handler()?; + + lh.transfer_leader(2); + + assert_eq!(lh.leader.transfer_to, Some(2)); + + let lease_info = lh.state.vote.lease_info(); + assert_eq!(lease_info.1, Duration::default()); + assert_eq!(lease_info.2, false); + + assert_eq!( + vec![ + // + Command::BroadcastTransferLeader { + req: TransferLeaderRequest::new(Vote::new_committed(3, 1), 2, Some(log_id(2, 1, 3))), + }, + ], + eng.output.take_commands() + ); + + Ok(()) +} diff --git a/openraft/src/engine/handler/vote_handler/mod.rs b/openraft/src/engine/handler/vote_handler/mod.rs index 71ef25b92..69dd28a0d 100644 --- a/openraft/src/engine/handler/vote_handler/mod.rs +++ b/openraft/src/engine/handler/vote_handler/mod.rs @@ -98,8 +98,6 @@ where C: RaftTypeConfig /// /// Note: This method does not check last-log-id. handle-vote-request has to deal with /// last-log-id itself. - /// - /// This method also implies calling [`Self::update_last_seen`]. 
#[tracing::instrument(level = "debug", skip_all)] pub(crate) fn update_vote(&mut self, vote: &Vote) -> Result<(), RejectVoteRequest> { // Partial ord compare: diff --git a/openraft/src/error.rs b/openraft/src/error.rs index 34553e2b6..3c7557db9 100644 --- a/openraft/src/error.rs +++ b/openraft/src/error.rs @@ -458,6 +458,9 @@ impl fmt::Display for PayloadTooLarge { RPCTypes::InstallSnapshot => { write!(f, "bytes:{}", self.bytes_hint)?; } + RPCTypes::TransferLeader => { + unreachable!("TransferLeader rpc should not have payload") + } } write!(f, ")")?; diff --git a/openraft/src/error/streaming_error.rs b/openraft/src/error/streaming_error.rs index 705cd5a2f..d5983c107 100644 --- a/openraft/src/error/streaming_error.rs +++ b/openraft/src/error/streaming_error.rs @@ -64,6 +64,20 @@ impl From>> for ReplicationError From> for StreamingError { + fn from(value: RPCError) -> Self { + match value { + RPCError::Timeout(e) => StreamingError::Timeout(e), + RPCError::Unreachable(e) => StreamingError::Unreachable(e), + RPCError::PayloadTooLarge(_e) => { + unreachable!("PayloadTooLarge should not be converted to StreamingError") + } + RPCError::Network(e) => StreamingError::Network(e), + RPCError::RemoteError(e) => StreamingError::RemoteError(e), + } + } +} + impl From> for ReplicationError { fn from(e: StreamingError) -> Self { match e { diff --git a/openraft/src/network/rpc_type.rs b/openraft/src/network/rpc_type.rs index c4fba3b8d..381079a8d 100644 --- a/openraft/src/network/rpc_type.rs +++ b/openraft/src/network/rpc_type.rs @@ -8,6 +8,7 @@ pub enum RPCTypes { Vote, AppendEntries, InstallSnapshot, + TransferLeader, } impl fmt::Display for RPCTypes { diff --git a/openraft/src/network/v2/network.rs b/openraft/src/network/v2/network.rs index 499fc6679..d5de5c353 100644 --- a/openraft/src/network/v2/network.rs +++ b/openraft/src/network/v2/network.rs @@ -1,14 +1,17 @@ use std::future::Future; use std::time::Duration; +use anyerror::AnyError; use openraft_macros::add_async_trait; 
use openraft_macros::since; use crate::error::RPCError; use crate::error::ReplicationClosed; use crate::error::StreamingError; +use crate::error::Unreachable; use crate::network::Backoff; use crate::network::RPCOption; +use crate::raft::message::TransferLeaderRequest; use crate::raft::AppendEntriesRequest; use crate::raft::AppendEntriesResponse; use crate::raft::SnapshotResponse; @@ -64,10 +67,12 @@ where C: RaftTypeConfig /// /// The `vote` is the leader vote which is used to check if the leader is still valid by a /// follower. - /// When the follower finished receiving snapshot, it calls `Raft::install_full_snapshot()` + /// When the follower finished receiving snapshot, it calls [`Raft::install_full_snapshot()`] /// with this vote. /// /// `cancel` get `Ready` when the caller decides to cancel this snapshot transmission. + /// + /// [`Raft::install_full_snapshot()`]: crate::raft::Raft::install_full_snapshot async fn full_snapshot( &mut self, vote: Vote, @@ -76,6 +81,22 @@ where C: RaftTypeConfig option: RPCOption, ) -> Result, StreamingError>; + /// Send a TransferLeader message to the target node. + /// + /// The node that receives this message should pass it to [`Raft::handle_transfer_leader()`]. + /// + /// This method provides a default implementation that just returns an [`Unreachable`] error, + /// which is ignored. If the application does not implement it, other nodes just wait for the + /// Leader lease to time out and then restart the election. + /// + /// [`Raft::handle_transfer_leader()`]: crate::raft::Raft::handle_transfer_leader + #[since(version = "0.10.0")] + async fn transfer_leader(&mut self, _req: TransferLeaderRequest, _option: RPCOption) -> Result<(), RPCError> { + return Err(RPCError::Unreachable(Unreachable::new(&AnyError::error( + "transfer_leader not implemented", + )))); + } + /// Build a backoff instance if the target node is temporarily (or permanently) unreachable. 
/// /// When a [`Unreachable`](`crate::error::Unreachable`) error is returned from the `Network` diff --git a/openraft/src/proposer/leader.rs b/openraft/src/proposer/leader.rs index 93e24cd9e..36595fc51 100644 --- a/openraft/src/proposer/leader.rs +++ b/openraft/src/proposer/leader.rs @@ -34,6 +34,15 @@ use crate::RaftTypeConfig; pub(crate) struct Leader> where C: RaftTypeConfig { + /// The node this Leader is transferring its leadership to, if any. + /// + /// `Some(to)` means the current Leader is in the process of transferring + /// leadership to node `to`. + /// + /// Proposing new logs is disabled while a leadership transfer is in + /// progress. + pub(crate) transfer_to: Option, + /// The vote this leader works in. /// /// `self.voting` may be in progress requesting vote for a higher vote. @@ -110,6 +119,7 @@ where let last_log_id = last_leader_log_id.last().copied(); let mut leader = Self { + transfer_to: None, committed_vote: vote, next_heartbeat: C::now(), last_log_id, @@ -144,6 +154,14 @@ where &self.committed_vote } + pub(crate) fn mark_transfer(&mut self, to: C::NodeId) { + self.transfer_to = Some(to); + } + + pub(crate) fn get_transfer_to(&self) -> Option<&C::NodeId> { + self.transfer_to.as_ref() + } + /// Assign log ids to the entries. /// /// This method updates `self.last_log_id`. 
@@ -151,6 +169,8 @@ where &mut self, entries: impl IntoIterator, ) { + debug_assert!(self.transfer_to.is_none(), "leader is disabled to propose new log"); + let committed_leader_id = self.committed_vote.committed_leader_id(); let first = LogId::new(committed_leader_id, self.last_log_id().next_index()); diff --git a/openraft/src/raft/message/mod.rs b/openraft/src/raft/message/mod.rs index 64ae2d458..413687a4d 100644 --- a/openraft/src/raft/message/mod.rs +++ b/openraft/src/raft/message/mod.rs @@ -5,6 +5,7 @@ mod append_entries; mod install_snapshot; +mod transfer_leader; mod vote; mod client_write; @@ -16,5 +17,6 @@ pub use client_write::ClientWriteResult; pub use install_snapshot::InstallSnapshotRequest; pub use install_snapshot::InstallSnapshotResponse; pub use install_snapshot::SnapshotResponse; +pub use transfer_leader::TransferLeaderRequest; pub use vote::VoteRequest; pub use vote::VoteResponse; diff --git a/openraft/src/raft/message/transfer_leader.rs b/openraft/src/raft/message/transfer_leader.rs new file mode 100644 index 000000000..118cc4dba --- /dev/null +++ b/openraft/src/raft/message/transfer_leader.rs @@ -0,0 +1,56 @@ +use std::fmt; + +use crate::display_ext::DisplayOptionExt; +use crate::LogId; +use crate::RaftTypeConfig; +use crate::Vote; + +#[derive(Clone, Debug)] +#[derive(PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] +pub struct TransferLeaderRequest +where C: RaftTypeConfig +{ + /// The vote of the Leader that is transferring the leadership. + pub(crate) from: Vote, + + /// The assigned node to be the next Leader. + pub(crate) to: C::NodeId, + + /// The last log id the `to` node should at least have to become Leader. 
+ pub(crate) last_log_id: Option>, +} + +impl TransferLeaderRequest +where C: RaftTypeConfig +{ + pub fn new(from: Vote, to: C::NodeId, last_log_id: Option>) -> Self { + Self { from, to, last_log_id } + } + + pub fn from(&self) -> &Vote { + &self.from + } + + pub fn to(&self) -> &C::NodeId { + &self.to + } + + pub fn last_log_id(&self) -> Option<&LogId> { + self.last_log_id.as_ref() + } +} + +impl fmt::Display for TransferLeaderRequest +where C: RaftTypeConfig +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "(from={}, to={}, last_log_id={})", + self.from, + self.to, + self.last_log_id.display() + ) + } +} diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index 05b94f94e..79f6e2cb4 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -35,6 +35,7 @@ pub use message::ClientWriteResult; pub use message::InstallSnapshotRequest; pub use message::InstallSnapshotResponse; pub use message::SnapshotResponse; +pub use message::TransferLeaderRequest; pub use message::VoteRequest; pub use message::VoteResponse; use openraft_macros::since; @@ -638,32 +639,44 @@ where C: RaftTypeConfig /// expected log entries are flushed. /// Otherwise, it just resets the Leader lease to allow the `to` node to become the Leader. /// - /// To implement Leader transfer, Call this method on every node in the cluster with the same - /// arguments. - // TODO: Explain what the `from` Leader node do. + /// The application calls + /// [`Raft::trigger().transfer_leader()`](crate::raft::trigger::Trigger::transfer_leader) to + /// submit Transfer Leader command. Then, the current Leader will broadcast it to every node in + /// the cluster via [`RaftNetworkV2::transfer_leader`] and the implementation on the remote node + /// responds to transfer leader request by calling this method. 
+ /// + /// [`RaftNetworkV2::transfer_leader`]: crate::network::v2::RaftNetworkV2::transfer_leader #[since(version = "0.10.0")] - pub async fn transfer_leader( - &self, - from: Vote, - to: C::NodeId, - flushed_log: Option>, - ) -> Result<(), Fatal> { + pub async fn handle_transfer_leader(&self, req: TransferLeaderRequest) -> Result<(), Fatal> { // Reset the Leader lease at once and quit, if this is not the assigned next leader. // Only the assigned next Leader waits for the log to be flushed. - if to != self.inner.id { - self.inner.send_msg(RaftMsg::TransferLeader { from, to }).await?; - return Ok(()); + if req.to == self.inner.id { + self.ensure_log_flushed_for_transfer_leader(&req).await?; } + let raft_msg = RaftMsg::HandleTransferLeader { + from: req.from, + to: req.to, + }; + + self.inner.send_msg(raft_msg).await?; + + Ok(()) + } + + /// Wait for the log to be flushed to make sure the RequestVote.last_log_id is upto date, then + /// TransferLeader will be able to proceed. + async fn ensure_log_flushed_for_transfer_leader(&self, req: &TransferLeaderRequest) -> Result<(), Fatal> { // If the next Leader is this node, wait for the log to be flushed to make sure the // RequestVote.last_log_id is upto date. 
// Condition satisfied to become Leader - let ok = |m: &RaftMetrics| (from == m.vote && m.last_log_index.next_index() >= flushed_log.next_index()); + let ok = + |m: &RaftMetrics| (req.from == m.vote && m.last_log_index.next_index() >= req.last_log_id.next_index()); // Condition failed to become Leader #[allow(clippy::neg_cmp_op_on_partial_ord)] - let fail = |m: &RaftMetrics| !(from >= m.vote); + let fail = |m: &RaftMetrics| !(req.from >= m.vote); let timeout = Some(Duration::from_millis(self.inner.config.election_timeout_min)); let metrics_res = @@ -674,7 +687,7 @@ where C: RaftTypeConfig if fail(&metrics) { tracing::warn!( "Vote changed, give up Leader-transfer; expected vote: {}, metrics: {}", - from, + req.from, metrics ); return Ok(()); @@ -682,23 +695,21 @@ where C: RaftTypeConfig tracing::info!( "Leader-transfer condition satisfied, submit Leader-transfer message; \ expected: (vote: {}, flushed_log: {})", - from, - flushed_log.display(), + req.from, + req.last_log_id.display(), ); } Err(err) => { tracing::warn!( "Leader-transfer condition fail to satisfy, still submit Leader-transfer; \ expected: (vote: {}; flushed_log: {}), error: {}", - from, - flushed_log.display(), + req.from, + req.last_log_id.display(), err ); } }; - self.inner.send_msg(RaftMsg::TransferLeader { from, to }).await?; - Ok(()) } diff --git a/openraft/src/raft/trigger.rs b/openraft/src/raft/trigger.rs index 496b8c659..a2d40659d 100644 --- a/openraft/src/raft/trigger.rs +++ b/openraft/src/raft/trigger.rs @@ -77,4 +77,13 @@ where C: RaftTypeConfig pub async fn purge_log(&self, upto: u64) -> Result<(), Fatal> { self.raft_inner.send_external_command(ExternalCommand::PurgeLog { upto }, "purge_log").await } + + /// Submit a command to inform RaftCore to transfer leadership to the specified node. + /// + /// If this node is not a Leader, it is just ignored. 
+ pub async fn transfer_leader(&self, to: C::NodeId) -> Result<(), Fatal> { + self.raft_inner + .send_external_command(ExternalCommand::TriggerTransferLeader { to }, "transfer_leader") + .await + } } diff --git a/openraft/src/raft_state/mod.rs b/openraft/src/raft_state/mod.rs index a4fe80153..f1ba9df76 100644 --- a/openraft/src/raft_state/mod.rs +++ b/openraft/src/raft_state/mod.rs @@ -409,16 +409,22 @@ where C: RaftTypeConfig // Safe unwrap(): vote that is committed has to already have voted for some node. let id = vote.leader_id().voted_for().unwrap(); - // leader may not step down after being removed from `voters`. - // It does not have to be a voter, being in membership is just enough - let node = self.membership_state.effective().get_node(&id); - if let Some(n) = node { - return ForwardToLeader::new(id, n.clone()); - } else { - tracing::debug!("id={} is not in membership, when getting leader id", id); - } - }; + return self.new_forward_to_leader(id); + } ForwardToLeader::empty() } + + pub(crate) fn new_forward_to_leader(&self, to: C::NodeId) -> ForwardToLeader { + // leader may not step down after being removed from `voters`. 
+ // It does not have to be a voter, being in membership is just enough + let node = self.membership_state.effective().get_node(&to); + + if let Some(n) = node { + ForwardToLeader::new(to, n.clone()) + } else { + tracing::debug!("id={} is not in membership, when getting leader id", to); + ForwardToLeader::empty() + } + } } diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index 071acc720..081ff7827 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -350,6 +350,9 @@ where // TODO: handle too large tracing::error!("InstallSnapshot RPC is too large, but it is not supported yet"); } + RPCTypes::TransferLeader => { + unreachable!("TransferLeader RPC should not be too large") + } } } diff --git a/openraft/src/utime.rs b/openraft/src/utime.rs index 69de05552..36e2d8196 100644 --- a/openraft/src/utime.rs +++ b/openraft/src/utime.rs @@ -80,15 +80,21 @@ impl Leased { self.last_update } + /// Return a tuple of the last updated time, lease duration, and whether the lease is enabled. + #[allow(dead_code)] + pub(crate) fn lease_info(&self) -> (Option, Duration, bool) { + (self.last_update, self.lease, self.lease_enabled) + } + /// Return a Display instance that shows the last updated time and lease duration relative to /// `now`. - pub(crate) fn time_info(&self, now: I) -> impl fmt::Display + '_ { - struct DisplayTimeInfo<'a, T, I: Instant> { + pub(crate) fn display_lease_info(&self, now: I) -> impl fmt::Display + '_ { + struct DisplayLeaseInfo<'a, T, I: Instant> { now: I, leased: &'a Leased, } - impl<'a, T, I> fmt::Display for DisplayTimeInfo<'a, T, I> + impl<'a, T, I> fmt::Display for DisplayLeaseInfo<'a, T, I> where I: Instant { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -113,7 +119,7 @@ impl Leased { } } - DisplayTimeInfo { now, leased: self } + DisplayLeaseInfo { now, leased: self } } /// Consumes this object and returns the inner data. 
diff --git a/tests/tests/append_entries/t10_conflict_with_empty_entries.rs b/tests/tests/append_entries/t10_conflict_with_empty_entries.rs index c0ca53561..4fae50ec1 100644 --- a/tests/tests/append_entries/t10_conflict_with_empty_entries.rs +++ b/tests/tests/append_entries/t10_conflict_with_empty_entries.rs @@ -2,8 +2,8 @@ use std::sync::Arc; use std::time::Duration; use anyhow::Result; +use openraft::network::v2::RaftNetworkV2; use openraft::network::RPCOption; -use openraft::network::RaftNetwork; use openraft::network::RaftNetworkFactory; use openraft::raft::AppendEntriesRequest; use openraft::testing::blank_ent; diff --git a/tests/tests/append_entries/t10_see_higher_vote.rs b/tests/tests/append_entries/t10_see_higher_vote.rs index 8f4327589..e28670814 100644 --- a/tests/tests/append_entries/t10_see_higher_vote.rs +++ b/tests/tests/append_entries/t10_see_higher_vote.rs @@ -3,8 +3,8 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; +use openraft::network::v2::RaftNetworkV2; use openraft::network::RPCOption; -use openraft::network::RaftNetwork; use openraft::network::RaftNetworkFactory; use openraft::raft::VoteRequest; use openraft::CommittedLeaderId; diff --git a/tests/tests/append_entries/t11_append_entries_with_bigger_term.rs b/tests/tests/append_entries/t11_append_entries_with_bigger_term.rs index 12a084fa4..a7e8fbb56 100644 --- a/tests/tests/append_entries/t11_append_entries_with_bigger_term.rs +++ b/tests/tests/append_entries/t11_append_entries_with_bigger_term.rs @@ -3,8 +3,8 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; +use openraft::network::v2::RaftNetworkV2; use openraft::network::RPCOption; -use openraft::network::RaftNetwork; use openraft::network::RaftNetworkFactory; use openraft::raft::AppendEntriesRequest; use openraft::testing::log_id; diff --git a/tests/tests/client_api/t14_transfer_leader.rs b/tests/tests/client_api/t14_transfer_leader.rs index c886eb939..b7c481a2c 100644 --- 
a/tests/tests/client_api/t14_transfer_leader.rs +++ b/tests/tests/client_api/t14_transfer_leader.rs @@ -2,14 +2,17 @@ use std::sync::Arc; use std::time::Duration; use maplit::btreeset; +use openraft::raft::TransferLeaderRequest; use openraft::Config; use openraft::ServerState; use crate::fixtures::ut_harness; use crate::fixtures::RaftRouter; -/// Call [`transfer_leader`](openraft::raft::Raft::transfer_leader) on every non-leader node to -/// force establish a new leader. +/// Test handling of transfer leader request. +/// +/// Call [`handle_transfer_leader`](openraft::raft::Raft::handle_transfer_leader) on every +/// non-leader node to force establish a new leader. #[tracing::instrument] #[test_harness::test(harness = ut_harness)] async fn transfer_leader() -> anyhow::Result<()> { @@ -35,10 +38,12 @@ async fn transfer_leader() -> anyhow::Result<()> { let leader_vote = metrics.vote; let last_log_id = metrics.last_applied; + let req = TransferLeaderRequest::new(leader_vote, 2, last_log_id); + tracing::info!("--- transfer Leader from 0 to 2"); { - n1.transfer_leader(leader_vote, 2, last_log_id).await?; - n2.transfer_leader(leader_vote, 2, last_log_id).await?; + n1.handle_transfer_leader(req.clone()).await?; + n2.handle_transfer_leader(req.clone()).await?; n2.wait(timeout()).state(ServerState::Leader, "node-2 become leader").await?; n0.wait(timeout()).state(ServerState::Follower, "node-0 become follower").await?; @@ -46,8 +51,10 @@ async fn transfer_leader() -> anyhow::Result<()> { tracing::info!("--- can NOT transfer Leader from 2 to 0 with an old vote"); { - n0.transfer_leader(leader_vote, 0, last_log_id).await?; - n1.transfer_leader(leader_vote, 0, last_log_id).await?; + let req = TransferLeaderRequest::new(leader_vote, 0, last_log_id); + + n0.handle_transfer_leader(req.clone()).await?; + n1.handle_transfer_leader(req.clone()).await?; let n0_res = n0 .wait(Some(Duration::from_millis(1_000))) @@ -60,6 +67,43 @@ async fn transfer_leader() -> anyhow::Result<()> { 
Ok(()) } +/// Test trigger transfer leader on the Leader. +/// +/// Call [`trigger().transfer_leader()`](openraft::raft::trigger::Trigger::transfer_leader) on the +/// Leader node to force establish a new leader. +#[tracing::instrument] +#[test_harness::test(harness = ut_harness)] +async fn trigger_transfer_leader() -> anyhow::Result<()> { + let config = Arc::new( + Config { + election_timeout_min: 150, + election_timeout_max: 300, + ..Default::default() + } + .validate()?, + ); + + let mut router = RaftRouter::new(config.clone()); + + tracing::info!("--- initializing cluster"); + let _log_index = router.new_cluster(btreeset! {0,1,2}, btreeset! {}).await?; + + let n0 = router.get_raft_handle(&0)?; + let n1 = router.get_raft_handle(&1)?; + let n2 = router.get_raft_handle(&2)?; + + tracing::info!("--- trigger transfer Leader from 0 to 2"); + { + n0.trigger().transfer_leader(2).await?; + + n2.wait(timeout()).state(ServerState::Leader, "node-2 become leader").await?; + n0.wait(timeout()).state(ServerState::Follower, "node-0 become follower").await?; + n1.wait(timeout()).state(ServerState::Follower, "node-1 become follower").await?; + } + + Ok(()) +} + fn timeout() -> Option { Some(Duration::from_millis(500)) } diff --git a/tests/tests/fixtures/mod.rs b/tests/tests/fixtures/mod.rs index 0f135b4ea..b2e70e141 100644 --- a/tests/tests/fixtures/mod.rs +++ b/tests/tests/fixtures/mod.rs @@ -9,6 +9,7 @@ use std::collections::BTreeSet; use std::collections::HashMap; use std::env; use std::fmt; +use std::future::Future; use std::panic::PanicInfo; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; @@ -25,22 +26,22 @@ use openraft::error::CheckIsLeaderError; use openraft::error::ClientWriteError; use openraft::error::Fatal; use openraft::error::Infallible; -use openraft::error::InstallSnapshotError; use openraft::error::NetworkError; use openraft::error::PayloadTooLarge; use openraft::error::RPCError; use openraft::error::RaftError; -use 
openraft::error::RemoteError; +use openraft::error::ReplicationClosed; +use openraft::error::StreamingError; use openraft::error::Unreachable; use openraft::metrics::Wait; use openraft::network::RPCOption; -use openraft::network::RaftNetwork; use openraft::network::RaftNetworkFactory; use openraft::raft::AppendEntriesRequest; use openraft::raft::AppendEntriesResponse; use openraft::raft::ClientWriteResponse; use openraft::raft::InstallSnapshotRequest; -use openraft::raft::InstallSnapshotResponse; +use openraft::raft::SnapshotResponse; +use openraft::raft::TransferLeaderRequest; use openraft::raft::VoteRequest; use openraft::raft::VoteResponse; use openraft::storage::RaftLogStorage; @@ -48,6 +49,7 @@ use openraft::storage::RaftStateMachine; use openraft::Config; use openraft::LogId; use openraft::LogIdOptionExt; +use openraft::OptionalSend; use openraft::RPCTypes; use openraft::Raft; use openraft::RaftLogId; @@ -56,6 +58,7 @@ use openraft::RaftMetrics; use openraft::RaftState; use openraft::RaftTypeConfig; use openraft::ServerState; +use openraft::Snapshot; use openraft::Vote; use openraft_memstore::ClientRequest; use openraft_memstore::ClientResponse; @@ -182,6 +185,7 @@ impl fmt::Display for Direction { } } +use openraft::network::v2::RaftNetworkV2; use Direction::NetRecv; use Direction::NetSend; @@ -196,11 +200,8 @@ pub enum RPCErrorType { } impl RPCErrorType { - fn make_error(&self, id: C::NodeId, dir: Direction) -> RPCError> - where - C: RaftTypeConfig, - E: std::error::Error, - { + fn make_error(&self, id: C::NodeId, dir: Direction) -> RPCError + where C: RaftTypeConfig { let msg = format!("error {} id={}", dir, id); match self { @@ -214,6 +215,9 @@ impl RPCErrorType { RPCTypes::InstallSnapshot => { unreachable!("InstallSnapshot RPC should not be too large") } + RPCTypes::TransferLeader => { + unreachable!("TransferLeader RPC should not be too large") + } }, } } @@ -226,7 +230,9 @@ pub type PreHookResult = Result<(), RPCError>; pub enum RPCRequest { 
AppendEntries(AppendEntriesRequest), InstallSnapshot(InstallSnapshotRequest), + InstallFullSnapshot(Snapshot), Vote(VoteRequest), + TransferLeader(TransferLeaderRequest), } impl RPCRequest { @@ -234,7 +240,9 @@ impl RPCRequest { match self { RPCRequest::AppendEntries(_) => RPCTypes::AppendEntries, RPCRequest::InstallSnapshot(_) => RPCTypes::InstallSnapshot, + RPCRequest::InstallFullSnapshot(_) => RPCTypes::InstallSnapshot, RPCRequest::Vote(_) => RPCTypes::Vote, + RPCRequest::TransferLeader(_) => RPCTypes::TransferLeader, } } } @@ -966,14 +974,7 @@ impl TypedRaftRouter { } #[tracing::instrument(level = "debug", skip(self))] - pub fn emit_rpc_error( - &self, - id: MemNodeId, - target: MemNodeId, - ) -> Result<(), RPCError>> - where - E: std::error::Error, - { + pub fn emit_rpc_error(&self, id: MemNodeId, target: MemNodeId) -> Result<(), RPCError> { let fails = self.fail_rpc.lock().unwrap(); for key in [(id, NetSend), (target, NetRecv)] { @@ -1002,13 +1003,13 @@ pub struct RaftRouterNetwork { owner: TypedRaftRouter, } -impl RaftNetwork for RaftRouterNetwork { +impl RaftNetworkV2 for RaftRouterNetwork { /// Send an AppendEntries RPC to the target Raft node (§5). async fn append_entries( &mut self, mut rpc: AppendEntriesRequest, _option: RPCOption, - ) -> Result, RPCError>> { + ) -> Result, RPCError> { let from_id = rpc.vote.leader_id().voted_for().unwrap(); tracing::debug!("append_entries to id={} {}", self.target, rpc); @@ -1054,7 +1055,12 @@ impl RaftNetwork for RaftRouterNetwork { let resp = node.append_entries(rpc).await; tracing::debug!("append_entries: recv resp from id={} {:?}", self.target, resp); - let resp = resp.map_err(|e| RemoteError::new(self.target, e))?; + let resp = resp.map_err(|e| { + RPCError::Unreachable(Unreachable::new(&AnyError::error(format!( + "error: {} target={}", + e, self.target + )))) + })?; // If entries are truncated by quota, return an partial success response. 
if let Some(truncated) = truncated { @@ -1067,24 +1073,29 @@ impl RaftNetwork for RaftRouterNetwork { } } - /// Send an InstallSnapshot RPC to the target Raft node (§7). - async fn install_snapshot( + async fn full_snapshot( &mut self, - rpc: InstallSnapshotRequest, + vote: Vote, + snapshot: Snapshot, + _cancel: impl Future + OptionalSend + 'static, _option: RPCOption, - ) -> Result, RPCError>> - { - let from_id = rpc.vote.leader_id().voted_for().unwrap(); + ) -> Result, StreamingError> { + let from_id = vote.leader_id().voted_for().unwrap(); self.owner.count_rpc(RPCTypes::InstallSnapshot); - self.owner.call_rpc_pre_hook(rpc.clone(), from_id, self.target)?; + self.owner.call_rpc_pre_hook(snapshot.clone(), from_id, self.target)?; self.owner.emit_rpc_error(from_id, self.target)?; self.owner.rand_send_delay().await; let node = self.owner.get_raft_handle(&self.target)?; - let resp = node.install_snapshot(rpc).await; - let resp = resp.map_err(|e| RemoteError::new(self.target, e))?; + let resp = node.install_full_snapshot(vote, snapshot).await; + let resp = resp.map_err(|e| { + RPCError::Unreachable(Unreachable::new(&AnyError::error(format!( + "error: {} target={}", + e, self.target + )))) + })?; Ok(resp) } @@ -1094,7 +1105,7 @@ impl RaftNetwork for RaftRouterNetwork { &mut self, rpc: VoteRequest, _option: RPCOption, - ) -> Result, RPCError>> { + ) -> Result, RPCError> { let from_id = rpc.vote.leader_id().voted_for().unwrap(); self.owner.count_rpc(RPCTypes::Vote); @@ -1105,10 +1116,38 @@ impl RaftNetwork for RaftRouterNetwork { let node = self.owner.get_raft_handle(&self.target)?; let resp = node.vote(rpc).await; - let resp = resp.map_err(|e| RemoteError::new(self.target, e))?; + let resp = resp.map_err(|e| { + RPCError::Unreachable(Unreachable::new(&AnyError::error(format!( + "error: {} target={}", + e, self.target + )))) + })?; Ok(resp) } + + async fn transfer_leader( + &mut self, + rpc: TransferLeaderRequest, + _option: RPCOption, + ) -> Result<(), RPCError> { + let 
from_id = rpc.from().leader_id().voted_for().unwrap(); + + self.owner.count_rpc(RPCTypes::TransferLeader); + self.owner.call_rpc_pre_hook(rpc.clone(), from_id, self.target)?; + self.owner.emit_rpc_error(from_id, self.target)?; + self.owner.rand_send_delay().await; + + let node = self.owner.get_raft_handle(&self.target)?; + + let resp = node.handle_transfer_leader(rpc).await; + resp.map_err(|e| { + RPCError::Unreachable(Unreachable::new(&AnyError::error(format!( + "error: {} target={}", + e, self.target + )))) + }) + } } pub enum ValueTest { diff --git a/tests/tests/snapshot_building/t10_build_snapshot.rs b/tests/tests/snapshot_building/t10_build_snapshot.rs index 4b69a315f..ce8299061 100644 --- a/tests/tests/snapshot_building/t10_build_snapshot.rs +++ b/tests/tests/snapshot_building/t10_build_snapshot.rs @@ -3,8 +3,8 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; +use openraft::network::v2::RaftNetworkV2; use openraft::network::RPCOption; -use openraft::network::RaftNetwork; use openraft::network::RaftNetworkFactory; use openraft::raft::AppendEntriesRequest; use openraft::storage::RaftLogStorageExt; diff --git a/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_append.rs b/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_append.rs index c6301c21e..461c36211 100644 --- a/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_append.rs +++ b/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_append.rs @@ -3,8 +3,8 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; +use openraft::network::v2::RaftNetworkV2; use openraft::network::RPCOption; -use openraft::network::RaftNetwork; use openraft::network::RaftNetworkFactory; use openraft::raft::AppendEntriesRequest; use openraft::testing::blank_ent; diff --git a/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_apply.rs b/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_apply.rs index 
2acc1fac6..7b3e43427 100644 --- a/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_apply.rs +++ b/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_apply.rs @@ -3,8 +3,8 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; +use openraft::network::v2::RaftNetworkV2; use openraft::network::RPCOption; -use openraft::network::RaftNetwork; use openraft::network::RaftNetworkFactory; use openraft::raft::AppendEntriesRequest; use openraft::testing::blank_ent; diff --git a/tests/tests/snapshot_streaming/t31_snapshot_overrides_membership.rs b/tests/tests/snapshot_streaming/t31_snapshot_overrides_membership.rs index bbe820f8d..cb3d4b506 100644 --- a/tests/tests/snapshot_streaming/t31_snapshot_overrides_membership.rs +++ b/tests/tests/snapshot_streaming/t31_snapshot_overrides_membership.rs @@ -3,8 +3,8 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; +use openraft::network::v2::RaftNetworkV2; use openraft::network::RPCOption; -use openraft::network::RaftNetwork; use openraft::network::RaftNetworkFactory; use openraft::raft::AppendEntriesRequest; use openraft::storage::StorageHelper; diff --git a/tests/tests/snapshot_streaming/t33_snapshot_delete_conflict_logs.rs b/tests/tests/snapshot_streaming/t33_snapshot_delete_conflict_logs.rs index f2d0b1c2b..5fd79869f 100644 --- a/tests/tests/snapshot_streaming/t33_snapshot_delete_conflict_logs.rs +++ b/tests/tests/snapshot_streaming/t33_snapshot_delete_conflict_logs.rs @@ -3,11 +3,10 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; +use openraft::network::v2::RaftNetworkV2; use openraft::network::RPCOption; -use openraft::network::RaftNetwork; use openraft::network::RaftNetworkFactory; use openraft::raft::AppendEntriesRequest; -use openraft::raft::InstallSnapshotRequest; use openraft::storage::RaftLogStorage; use openraft::storage::RaftLogStorageExt; use openraft::storage::RaftStateMachine; @@ -148,18 +147,15 @@ async fn 
snapshot_delete_conflicting_logs() -> Result<()> { b.build_snapshot().await? }; - let req = InstallSnapshotRequest { - vote: sto0.read_vote().await?.unwrap(), - meta: snap.meta.clone(), - offset: 0, - data: snap.snapshot.into_inner(), - done: true, - }; - + let vote = sto0.read_vote().await?.unwrap(); let option = RPCOption::new(Duration::from_millis(1_000)); #[allow(deprecated)] - router.new_client(1, &()).await.install_snapshot(req, option).await?; + router + .new_client(1, &()) + .await + .full_snapshot(vote, snap, futures::future::pending(), option) + .await?; tracing::info!(log_index, "--- DONE installing snapshot");