From dad80320fc87cd7d1b8f1805b5bebaac085a5ea5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Sat, 3 Aug 2024 13:51:13 +0800 Subject: [PATCH] Refactor: Send heartbeat with dedicated workers Heavy AppendEntries traffic can block heartbeat messages. For example, future AppendEntries in stream RPC may not receive a response indicating a follower is alive. In such cases, the leader might fail to extend its lease in time and be considered partitioned from the cluster. This commit moves heartbeat broadcasting to separate tasks that won't be blocked by AppendEntries. This ensures the leader is always informed of the liveness of followers. Separate log progress notification and clock progress notification: When ReplicationCore successfully finishes an RPC to a Follower/Learner, it informs the RaftCore to update log progress and clock (heartbeat) progress. This commit splits these two pieces of information into two `Notification` variants, in order to make progress handling clearer. Another improvement is to ignore a heartbeat progress if it is sent with an older cluster membership config. Because a follower can be removed and re-added, the obsolete heartbeat progress is invalid. This check is done by remembering the membership log id in the `HeartbeatEvent`. `HigherVote` can be sent directly to the Notification channel. replication::Response no longer needs the `HigherVote` variant. 
And `Response` is renamed to `Progress` --- openraft/src/core/heartbeat/event.rs | 64 ++++++++ openraft/src/core/heartbeat/handle.rs | 97 +++++++++++ openraft/src/core/heartbeat/mod.rs | 3 + openraft/src/core/heartbeat/worker.rs | 114 +++++++++++++ openraft/src/core/mod.rs | 1 + openraft/src/core/notification.rs | 28 +++- openraft/src/core/raft_core.rs | 77 ++++----- openraft/src/engine/command.rs | 20 ++- .../src/engine/handler/leader_handler/mod.rs | 15 +- .../leader_handler/send_heartbeat_test.rs | 54 ++----- .../engine/handler/replication_handler/mod.rs | 91 +++-------- .../src/engine/handler/vote_handler/mod.rs | 3 +- openraft/src/progress/inflight/mod.rs | 7 +- openraft/src/raft/message/append_entries.rs | 6 + openraft/src/raft/mod.rs | 2 + openraft/src/replication/mod.rs | 83 ++++++---- .../src/replication/replication_session_id.rs | 14 +- openraft/src/replication/response.rs | 151 +++++------------- openraft/src/storage/callback.rs | 3 +- .../t60_snapshot_chunk_size.rs | 59 ++----- 20 files changed, 550 insertions(+), 342 deletions(-) create mode 100644 openraft/src/core/heartbeat/event.rs create mode 100644 openraft/src/core/heartbeat/handle.rs create mode 100644 openraft/src/core/heartbeat/mod.rs create mode 100644 openraft/src/core/heartbeat/worker.rs diff --git a/openraft/src/core/heartbeat/event.rs b/openraft/src/core/heartbeat/event.rs new file mode 100644 index 000000000..931f0810b --- /dev/null +++ b/openraft/src/core/heartbeat/event.rs @@ -0,0 +1,64 @@ +use std::fmt; + +use crate::display_ext::DisplayInstantExt; +use crate::display_ext::DisplayOptionExt; +use crate::replication::ReplicationSessionId; +use crate::type_config::alias::InstantOf; +use crate::LogId; +use crate::RaftTypeConfig; + +/// The information for broadcasting a heartbeat. +#[derive(Debug, Clone, Copy)] +#[derive(PartialEq, Eq)] +pub struct HeartbeatEvent +where C: RaftTypeConfig +{ + /// The timestamp when this heartbeat is sent. 
+ /// + /// The Leader use this sending time to calculate the quorum acknowledge time, but not the + /// receiving timestamp. + pub(crate) time: InstantOf, + + /// The vote of the Leader that submit this heartbeat and the log id of the cluster config. + /// + /// The response that matches this session id is considered as a valid response. + /// Otherwise, it is considered as an outdated response from older leader or older cluster + /// membership config and will be ignored. + pub(crate) session_id: ReplicationSessionId, + + /// The last known committed log id of the Leader. + /// + /// When there are no new logs to replicate, the Leader sends a heartbeat to replicate committed + /// log id to followers to update their committed log id. + pub(crate) committed: Option>, +} + +impl HeartbeatEvent +where C: RaftTypeConfig +{ + pub(crate) fn new( + time: InstantOf, + session_id: ReplicationSessionId, + committed: Option>, + ) -> Self { + Self { + time, + session_id, + committed, + } + } +} + +impl fmt::Display for HeartbeatEvent +where C: RaftTypeConfig +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "(time={}, leader_vote: {}, committed: {})", + self.time.display(), + self.session_id, + self.committed.display() + ) + } +} diff --git a/openraft/src/core/heartbeat/handle.rs b/openraft/src/core/heartbeat/handle.rs new file mode 100644 index 000000000..42065ba0e --- /dev/null +++ b/openraft/src/core/heartbeat/handle.rs @@ -0,0 +1,97 @@ +use std::collections::BTreeMap; +use std::sync::Arc; + +use tracing::Instrument; +use tracing::Level; +use tracing::Span; + +use crate::async_runtime::watch::WatchSender; +use crate::core::heartbeat::event::HeartbeatEvent; +use crate::core::heartbeat::worker::HeartbeatWorker; +use crate::core::notification::Notification; +use crate::type_config::alias::JoinHandleOf; +use crate::type_config::alias::MpscUnboundedSenderOf; +use crate::type_config::alias::OneshotSenderOf; +use 
crate::type_config::alias::WatchReceiverOf; +use crate::type_config::alias::WatchSenderOf; +use crate::type_config::TypeConfigExt; +use crate::Config; +use crate::RaftNetworkFactory; +use crate::RaftTypeConfig; + +pub(crate) struct HeartbeatWorkersHandle +where C: RaftTypeConfig +{ + pub(crate) id: C::NodeId, + + pub(crate) config: Arc, + + /// Inform the heartbeat task to broadcast heartbeat message. + /// + /// A Leader will periodically update this value to trigger sending heartbeat messages. + pub(crate) tx: WatchSenderOf>>, + + /// The receiving end of heartbeat command. + /// + /// A separate task will have a clone of this receiver to receive and execute heartbeat command. + pub(crate) rx: WatchReceiverOf>>, + + pub(crate) workers: BTreeMap, JoinHandleOf)>, +} + +impl HeartbeatWorkersHandle +where C: RaftTypeConfig +{ + pub(crate) fn new(id: C::NodeId, config: Arc) -> Self { + let (tx, rx) = C::watch_channel(None); + + Self { + id, + config, + tx, + rx, + workers: Default::default(), + } + } + + pub(crate) fn broadcast(&self, event: HeartbeatEvent) { + tracing::debug!("id={} send_heartbeat {}", self.id, event); + let _ = self.tx.send(Some(event)); + } + + pub(crate) async fn spawn_workers( + &mut self, + network_factory: &mut NF, + tx_notification: &MpscUnboundedSenderOf>, + targets: impl IntoIterator, + ) where + NF: RaftNetworkFactory, + { + for (target, node) in targets { + tracing::debug!("id={} spawn HeartbeatWorker target={}", self.id, target); + let network = network_factory.new_client(target, &node).await; + + let worker = HeartbeatWorker { + id: self.id, + rx: self.rx.clone(), + network, + target, + node, + config: self.config.clone(), + tx_notification: tx_notification.clone(), + }; + + let span = tracing::span!(parent: &Span::current(), Level::DEBUG, "heartbeat", id=display(self.id), target=display(target)); + + let (tx_shutdown, rx_shutdown) = C::oneshot(); + + let worker_handle = C::spawn(worker.run(rx_shutdown).instrument(span)); + 
self.workers.insert(target, (tx_shutdown, worker_handle)); + } + } + + pub(crate) fn shutdown(&mut self) { + self.workers.clear(); + tracing::info!("id={} HeartbeatWorker are shutdown", self.id); + } +} diff --git a/openraft/src/core/heartbeat/mod.rs b/openraft/src/core/heartbeat/mod.rs new file mode 100644 index 000000000..ddc964b27 --- /dev/null +++ b/openraft/src/core/heartbeat/mod.rs @@ -0,0 +1,3 @@ +pub(crate) mod event; +pub(crate) mod handle; +pub(crate) mod worker; diff --git a/openraft/src/core/heartbeat/worker.rs b/openraft/src/core/heartbeat/worker.rs new file mode 100644 index 000000000..8ff245f05 --- /dev/null +++ b/openraft/src/core/heartbeat/worker.rs @@ -0,0 +1,114 @@ +use std::fmt; +use std::ops::Deref; +use std::sync::Arc; +use std::time::Duration; + +use futures::FutureExt; + +use crate::async_runtime::watch::WatchReceiver; +use crate::async_runtime::MpscUnboundedSender; +use crate::core::heartbeat::event::HeartbeatEvent; +use crate::core::notification::Notification; +use crate::network::v2::RaftNetworkV2; +use crate::network::RPCOption; +use crate::raft::AppendEntriesRequest; +use crate::type_config::alias::MpscUnboundedSenderOf; +use crate::type_config::alias::OneshotReceiverOf; +use crate::type_config::alias::WatchReceiverOf; +use crate::type_config::TypeConfigExt; +use crate::Config; +use crate::RaftTypeConfig; + +/// A dedicate worker sending heartbeat to a specific follower. +pub struct HeartbeatWorker +where + C: RaftTypeConfig, + N: RaftNetworkV2, +{ + pub(crate) id: C::NodeId, + + /// The receiver will be changed when a new heartbeat is needed to be sent. + pub(crate) rx: WatchReceiverOf>>, + + pub(crate) network: N, + + pub(crate) target: C::NodeId, + + #[allow(dead_code)] + pub(crate) node: C::Node, + + pub(crate) config: Arc, + + /// For sending back result to the [`RaftCore`]. 
+ /// + /// [`RaftCore`]: crate::core::RaftCore + pub(crate) tx_notification: MpscUnboundedSenderOf>, +} + +impl fmt::Display for HeartbeatWorker +where + C: RaftTypeConfig, + N: RaftNetworkV2, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "HeartbeatWorker(id={}, target={})", self.id, self.target) + } +} + +impl HeartbeatWorker +where + C: RaftTypeConfig, + N: RaftNetworkV2, +{ + pub(crate) async fn run(mut self, mut rx_shutdown: OneshotReceiverOf) { + loop { + tracing::debug!("{} is waiting for a new heartbeat event.", self); + + futures::select! { + _ = (&mut rx_shutdown).fuse() => { + tracing::info!("{} is shutdown.", self); + return; + }, + _ = self.rx.changed().fuse() => {}, + } + + let heartbeat: Option> = *self.rx.borrow_watched(); + + // None is the initial value of the WatchReceiver, ignore it. + let Some(heartbeat) = heartbeat else { + continue; + }; + + let timeout = Duration::from_millis(self.config.heartbeat_interval); + let option = RPCOption::new(timeout); + + let payload = AppendEntriesRequest { + vote: *heartbeat.session_id.leader_vote.deref(), + prev_log_id: None, + leader_commit: heartbeat.committed, + entries: vec![], + }; + + let res = C::timeout(timeout, self.network.append_entries(payload, option)).await; + tracing::debug!("{} sent a heartbeat: {}, result: {:?}", self, heartbeat, res); + + match res { + Ok(Ok(_)) => { + let res = self.tx_notification.send(Notification::HeartbeatProgress { + session_id: heartbeat.session_id, + sending_time: heartbeat.time, + target: self.target, + }); + + if res.is_err() { + tracing::error!("{} failed to send a heartbeat progress to RaftCore. quit", self); + return; + } + } + _ => { + tracing::warn!("{} failed to send a heartbeat: {:?}", self, res); + } + } + } + } +} diff --git a/openraft/src/core/mod.rs b/openraft/src/core/mod.rs index 1a7a4850c..b7bba56aa 100644 --- a/openraft/src/core/mod.rs +++ b/openraft/src/core/mod.rs @@ -5,6 +5,7 @@ //! 
storage or forward messages to other raft nodes. pub(crate) mod balancer; +pub(crate) mod heartbeat; pub(crate) mod notification; mod raft_core; pub(crate) mod raft_msg; diff --git a/openraft/src/core/notification.rs b/openraft/src/core/notification.rs index f5cdb4d29..36dff9a63 100644 --- a/openraft/src/core/notification.rs +++ b/openraft/src/core/notification.rs @@ -1,9 +1,12 @@ use std::fmt; use crate::core::sm; +use crate::display_ext::DisplayInstantExt; use crate::raft::VoteResponse; use crate::raft_state::IOId; use crate::replication; +use crate::replication::ReplicationSessionId; +use crate::type_config::alias::InstantOf; use crate::RaftTypeConfig; use crate::StorageError; use crate::Vote; @@ -47,7 +50,13 @@ where C: RaftTypeConfig LocalIO { io_id: IOId }, /// Result of executing a command sent from network worker. - Network { response: replication::Response }, + ReplicationProgress { progress: replication::Progress }, + + HeartbeatProgress { + session_id: ReplicationSessionId, + sending_time: InstantOf, + target: C::NodeId, + }, /// Result of executing a command sent from state machine worker. 
StateMachine { command_result: sm::CommandResult }, @@ -96,8 +105,21 @@ where C: RaftTypeConfig } Self::StorageError { error } => write!(f, "StorageError: {}", error), Self::LocalIO { io_id } => write!(f, "IOFlushed: {}", io_id), - Self::Network { response } => { - write!(f, "{}", response) + Self::ReplicationProgress { progress } => { + write!(f, "{}", progress) + } + Self::HeartbeatProgress { + session_id: leader_vote, + sending_time, + target, + } => { + write!( + f, + "HeartbeatProgress: target={}, leader_vote: {}, sending_time: {}", + target, + leader_vote, + sending_time.display(), + ) } Self::StateMachine { command_result } => { write!(f, "{}", command_result) diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 48af51fc0..a6ee487ab 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -24,6 +24,8 @@ use crate::async_runtime::TryRecvError; use crate::config::Config; use crate::config::RuntimeConfig; use crate::core::balancer::Balancer; +use crate::core::heartbeat::event::HeartbeatEvent; +use crate::core::heartbeat::handle::HeartbeatWorkersHandle; use crate::core::notification::Notification; use crate::core::raft_msg::external_command::ExternalCommand; use crate::core::raft_msg::AppendEntriesTx; @@ -38,7 +40,6 @@ use crate::display_ext::DisplayOptionExt; use crate::display_ext::DisplayResultExt; use crate::display_ext::DisplaySlice; use crate::display_ext::DisplaySliceExt; -use crate::engine::handler::replication_handler::SendNone; use crate::engine::Command; use crate::engine::Condition; use crate::engine::Engine; @@ -79,7 +80,6 @@ use crate::raft::VoteRequest; use crate::raft::VoteResponse; use crate::raft_state::io_state::io_id::IOId; use crate::raft_state::LogStateReader; -use crate::replication; use crate::replication::request::Replicate; use crate::replication::request_id::RequestId; use crate::replication::response::ReplicationResult; @@ -197,6 +197,8 @@ where /// A mapping of node IDs the 
replication state of the target node. pub(crate) replications: BTreeMap>, + pub(crate) heartbeat_handle: HeartbeatWorkersHandle, + #[allow(dead_code)] pub(crate) tx_api: MpscUnboundedSenderOf>, pub(crate) rx_api: MpscUnboundedReceiverOf>, @@ -841,6 +843,8 @@ where pub async fn remove_all_replication(&mut self) { tracing::info!("remove all replication"); + self.heartbeat_handle.shutdown(); + let nodes = std::mem::take(&mut self.replications); tracing::debug!( @@ -974,7 +978,7 @@ where // Keep replicating to a target if the replication stream to it is idle. if let Ok(mut lh) = self.engine.leader_handler() { - lh.replication_handler().initiate_replication(SendNone::False); + lh.replication_handler().initiate_replication(); } self.run_engine_commands().await?; } @@ -1418,39 +1422,28 @@ where } } - Notification::Network { response } => { - // - match response { - replication::Response::Progress { - target, - request_id: id, - result, - session_id, - } => { - // If vote or membership changes, ignore the message. - // There is chance delayed message reports a wrong state. - if self.does_replication_session_match(&session_id, "UpdateReplicationMatched") { - self.handle_replication_progress(target, id, result); - } - } - - replication::Response::HigherVote { - target, - higher, - sender_vote, - } => { - tracing::info!( - target = display(target), - higher_vote = display(&higher), - sender_vote = display(&sender_vote), - "received Notification::HigherVote: {}", - func_name!() - ); + Notification::ReplicationProgress { progress } => { + // If vote or membership changes, ignore the message. + // There is chance delayed message reports a wrong state. + if self.does_replication_session_match(&progress.session_id, "ReplicationProgress") { + self.handle_replication_progress(progress.target, progress.request_id, progress.result); + } + } - if self.does_vote_match(&sender_vote, "HigherVote") { - // Rejected vote change is ok. 
- let _ = self.engine.vote_handler().update_vote(&higher); - } + Notification::HeartbeatProgress { + session_id, + sending_time, + target, + } => { + if self.does_replication_session_match(&session_id, "HeartbeatProgress") { + tracing::debug!( + session_id = display(session_id), + target = display(target), + sending_time = display(sending_time.display()), + "HeartbeatProgress" + ); + if self.engine.leader.is_some() { + self.engine.replication_handler().update_leader_clock(target, sending_time); } } } @@ -1562,7 +1555,7 @@ where fn handle_replication_progress( &mut self, target: C::NodeId, - request_id: RequestId, + request_id: u64, result: Result, String>, ) { tracing::debug!( @@ -1782,6 +1775,9 @@ where let _ = node.tx_repl.send(Replicate::Committed(committed)); } } + Command::BroadcastHeartbeat { session_id, committed } => { + self.heartbeat_handle.broadcast(HeartbeatEvent::new(C::now(), session_id, committed)) + } Command::SaveCommitted { committed } => { self.log_store.save_committed(Some(committed)).await?; } @@ -1819,6 +1815,15 @@ where let handle = self.spawn_replication_stream(*target, *matching).await; self.replications.insert(*target, handle); } + + let effective = self.engine.state.membership_state.effective().clone(); + + let nodes = targets.into_iter().map(|p| { + let node_id = p.0; + (node_id, effective.get_node(&node_id).unwrap().clone()) + }); + + self.heartbeat_handle.spawn_workers(&mut self.network_factory, &self.tx_notification, nodes).await; } Command::StateMachine { command } => { let io_id = command.get_submit_io(); diff --git a/openraft/src/engine/command.rs b/openraft/src/engine/command.rs index 651e62e9c..fd962573f 100644 --- a/openraft/src/engine/command.rs +++ b/openraft/src/engine/command.rs @@ -19,6 +19,7 @@ use crate::raft::SnapshotResponse; use crate::raft::VoteRequest; use crate::raft::VoteResponse; use crate::raft_state::IOId; +use crate::replication::ReplicationSessionId; use crate::type_config::alias::OneshotSenderOf; use 
crate::vote::CommittedVote; use crate::LogId; @@ -66,6 +67,12 @@ where C: RaftTypeConfig /// Replicate the committed log id to other nodes ReplicateCommitted { committed: Option> }, + /// Broadcast heartbeat to all other nodes. + BroadcastHeartbeat { + session_id: ReplicationSessionId, + committed: Option>, + }, + /// Save the committed log id to [`RaftLogStorage`]. /// /// Upon startup, the saved committed log ids will be re-applied to state machine to restore the @@ -147,6 +154,14 @@ where C: RaftTypeConfig Command::ReplicateCommitted { committed } => { write!(f, "ReplicateCommitted: {}", committed.display()) } + Command::BroadcastHeartbeat { session_id, committed } => { + write!( + f, + "BroadcastHeartbeat: session_id:{}, committed:{}", + session_id, + committed.display() + ) + } Command::SaveCommitted { committed } => write!(f, "SaveCommitted: {}", committed), Command::Apply { already_committed, @@ -188,7 +203,8 @@ where match (self, other) { (Command::UpdateIOProgress { when, io_id }, Command::UpdateIOProgress { when: wb, io_id: ab }, ) => when == wb && io_id == ab, (Command::AppendInputEntries { committed_vote: vote, entries }, Command::AppendInputEntries { committed_vote: vb, entries: b }, ) => vote == vb && entries == b, - (Command::ReplicateCommitted { committed }, Command::ReplicateCommitted { committed: b }, ) => committed == b, + (Command::ReplicateCommitted { committed }, Command::ReplicateCommitted { committed: b }, ) => committed == b, + (Command::BroadcastHeartbeat { session_id, committed }, Command::BroadcastHeartbeat { session_id: sb, committed: b }, ) => session_id == sb && committed == b, (Command::SaveCommitted { committed }, Command::SaveCommitted { committed: b }) => committed == b, (Command::Apply { already_committed, upto, }, Command::Apply { already_committed: b_committed, upto: b_upto, }, ) => already_committed == b_committed && upto == b_upto, (Command::Replicate { target, req }, Command::Replicate { target: b_target, req: other_req, }, 
) => target == b_target && req == other_req, @@ -226,6 +242,7 @@ where C: RaftTypeConfig Command::PurgeLog { .. } => CommandKind::Log, Command::ReplicateCommitted { .. } => CommandKind::Network, + Command::BroadcastHeartbeat { .. } => CommandKind::Network, Command::Replicate { .. } => CommandKind::Network, Command::BroadcastTransferLeader { .. } => CommandKind::Network, Command::SendVote { .. } => CommandKind::Network, @@ -251,6 +268,7 @@ where C: RaftTypeConfig Command::PurgeLog { upto } => Some(Condition::Snapshot { log_id: Some(*upto) }), Command::ReplicateCommitted { .. } => None, + Command::BroadcastHeartbeat { .. } => None, Command::Replicate { .. } => None, Command::BroadcastTransferLeader { .. } => None, Command::SendVote { .. } => None, diff --git a/openraft/src/engine/handler/leader_handler/mod.rs b/openraft/src/engine/handler/leader_handler/mod.rs index 5f8a61042..368f09ecc 100644 --- a/openraft/src/engine/handler/leader_handler/mod.rs +++ b/openraft/src/engine/handler/leader_handler/mod.rs @@ -1,5 +1,4 @@ use crate::engine::handler::replication_handler::ReplicationHandler; -use crate::engine::handler::replication_handler::SendNone; use crate::engine::Command; use crate::engine::EngineConfig; use crate::engine::EngineOutput; @@ -9,6 +8,7 @@ use crate::proposer::LeaderQuorumSet; use crate::raft::message::TransferLeaderRequest; use crate::raft_state::IOId; use crate::raft_state::LogStateReader; +use crate::replication::ReplicationSessionId; use crate::type_config::alias::LogIdOf; use crate::RaftLogId; use crate::RaftState; @@ -92,13 +92,18 @@ where C: RaftTypeConfig rh.append_membership(&log_id, &m); } - rh.initiate_replication(SendNone::False); + rh.initiate_replication(); } #[tracing::instrument(level = "debug", skip_all)] - pub(crate) fn send_heartbeat(&mut self) -> () { - let mut rh = self.replication_handler(); - rh.initiate_replication(SendNone::True); + pub(crate) fn send_heartbeat(&mut self) { + let membership_log_id = 
self.state.membership_state.effective().log_id(); + let session_id = ReplicationSessionId::new(self.leader.committed_vote, *membership_log_id); + + self.output.push_command(Command::BroadcastHeartbeat { + session_id, + committed: self.state.committed().copied(), + }); } /// Get the log id for a linearizable read. diff --git a/openraft/src/engine/handler/leader_handler/send_heartbeat_test.rs b/openraft/src/engine/handler/leader_handler/send_heartbeat_test.rs index 55fe77ab3..e05f6c550 100644 --- a/openraft/src/engine/handler/leader_handler/send_heartbeat_test.rs +++ b/openraft/src/engine/handler/leader_handler/send_heartbeat_test.rs @@ -2,18 +2,12 @@ use std::sync::Arc; use std::time::Duration; use maplit::btreeset; -#[allow(unused_imports)] use pretty_assertions::assert_eq; -#[allow(unused_imports)] -use pretty_assertions::assert_ne; -#[allow(unused_imports)] -use pretty_assertions::assert_str_eq; use crate::engine::testing::UTConfig; use crate::engine::Command; use crate::engine::Engine; -use crate::progress::Inflight; -use crate::progress::Progress; +use crate::replication::ReplicationSessionId; use crate::testing::log_id; use crate::type_config::TypeConfigExt; use crate::utime::Leased; @@ -63,47 +57,31 @@ fn test_leader_send_heartbeat() -> anyhow::Result<()> { eng.leader_handler()?.send_heartbeat(); assert_eq!( vec![ - Command::Replicate { - target: 2, - req: Inflight::logs(None, Some(log_id(2, 1, 3))).with_id(1), - }, - Command::Replicate { - target: 3, - req: Inflight::logs(None, Some(log_id(2, 1, 3))).with_id(1), + // + Command::BroadcastHeartbeat { + session_id: ReplicationSessionId::new(Vote::new(3, 1).into_committed(), Some(log_id(2, 1, 3))), + committed: Some(log_id(0, 1, 0)) }, ], eng.output.take_commands() ); } - // No RPC will be sent if there are inflight RPC + // Heartbeat will be resent { eng.output.clear_commands(); eng.leader_handler()?.send_heartbeat(); - assert!(eng.output.take_commands().is_empty()); - } - - // No data to send, sending a 
heartbeat is to send empty RPC: - { - let l = eng.leader_handler()?; - let _ = l.leader.progress.update_with(&2, |ent| ent.update_matching(1, Some(log_id(2, 1, 3))).unwrap()); - let _ = l.leader.progress.update_with(&3, |ent| ent.update_matching(1, Some(log_id(2, 1, 3))).unwrap()); + assert_eq!( + vec![ + // + Command::BroadcastHeartbeat { + session_id: ReplicationSessionId::new(Vote::new(3, 1).into_committed(), Some(log_id(2, 1, 3))), + committed: Some(log_id(0, 1, 0)) + }, + ], + eng.output.take_commands() + ); } - eng.output.clear_commands(); - eng.leader_handler()?.send_heartbeat(); - assert_eq!( - vec![ - Command::Replicate { - target: 2, - req: Inflight::logs(Some(log_id(2, 1, 3)), Some(log_id(2, 1, 3))).with_id(1), - }, - Command::Replicate { - target: 3, - req: Inflight::logs(Some(log_id(2, 1, 3)), Some(log_id(2, 1, 3))).with_id(1), - }, - ], - eng.output.take_commands() - ); Ok(()) } diff --git a/openraft/src/engine/handler/replication_handler/mod.rs b/openraft/src/engine/handler/replication_handler/mod.rs index 3f1085ff1..dbd1b07b5 100644 --- a/openraft/src/engine/handler/replication_handler/mod.rs +++ b/openraft/src/engine/handler/replication_handler/mod.rs @@ -13,7 +13,6 @@ use crate::progress::Progress; use crate::proposer::Leader; use crate::proposer::LeaderQuorumSet; use crate::raft_state::LogStateReader; -use crate::replication::request_id::RequestId; use crate::replication::response::ReplicationResult; use crate::type_config::alias::InstantOf; use crate::EffectiveMembership; @@ -46,16 +45,6 @@ where C: RaftTypeConfig pub(crate) output: &'x mut EngineOutput, } -/// An option about whether to send an RPC to follower/learner even when there is no data to send. -/// -/// Sending none data serves as a heartbeat. 
-#[derive(Debug)] -#[derive(PartialEq, Eq)] -pub(crate) enum SendNone { - False, - True, -} - impl<'x, C> ReplicationHandler<'x, C> where C: RaftTypeConfig { @@ -88,7 +77,7 @@ where C: RaftTypeConfig self.rebuild_progresses(); self.rebuild_replication_streams(); - self.initiate_replication(SendNone::False); + self.initiate_replication(); } /// Rebuild leader's replication progress to reflect replication changes. @@ -118,33 +107,6 @@ where C: RaftTypeConfig } } - /// Update replication progress when a successful response is received. - #[tracing::instrument(level = "debug", skip_all)] - pub(crate) fn update_success_progress( - &mut self, - target: C::NodeId, - request_id: RequestId, - result: ReplicationResult, - ) { - // No matter what the result is, the validity of the leader is granted by a follower. - self.update_leader_clock(target, result.sending_time); - - let id = request_id.request_id(); - let Some(id) = id else { - tracing::debug!(request_id = display(request_id), "no data for this request, return"); - return; - }; - - match result.result { - Ok(matching) => { - self.update_matching(target, id, matching); - } - Err(conflict) => { - self.update_conflicting(target, id, conflict); - } - } - } - /// Update progress when replicated data(logs or snapshot) matches on follower/learner and is /// accepted. 
#[tracing::instrument(level = "debug", skip_all)] @@ -272,7 +234,7 @@ where C: RaftTypeConfig pub(crate) fn update_progress( &mut self, target: C::NodeId, - request_id: RequestId, + request_id: u64, repl_res: Result, String>, ) { tracing::debug!( @@ -285,9 +247,14 @@ where C: RaftTypeConfig ); match repl_res { - Ok(p) => { - self.update_success_progress(target, request_id, p); - } + Ok(p) => match p.0 { + Ok(matching) => { + self.update_matching(target, request_id, matching); + } + Err(conflict) => { + self.update_conflicting(target, request_id, conflict); + } + }, Err(err_str) => { tracing::warn!( request_id = display(request_id), @@ -295,21 +262,17 @@ where C: RaftTypeConfig "update progress error" ); - if request_id == RequestId::HeartBeat { - tracing::warn!("heartbeat error: {}, no update to inflight data", err_str); - } else { - // Reset inflight state and it will retry. - let p = self.leader.progress.get_mut(&target).unwrap(); + // Reset inflight state and it will retry. + let p = self.leader.progress.get_mut(&target).unwrap(); - debug_assert!( - p.inflight.is_my_id(request_id), - "inflight({:?}) id should match: {}", - p.inflight, - request_id - ); + debug_assert!( + p.inflight.is_my_id(request_id), + "inflight({:?}) id should match: {}", + p.inflight, + request_id + ); - p.inflight = Inflight::None; - } + p.inflight = Inflight::None; } }; @@ -339,7 +302,7 @@ where C: RaftTypeConfig /// /// `send_none` specifies whether to force to send a message even when there is no data to send. 
#[tracing::instrument(level = "debug", skip_all)] - pub(crate) fn initiate_replication(&mut self, send_none: SendNone) { + pub(crate) fn initiate_replication(&mut self) { tracing::debug!(progress = debug(&self.leader.progress), "{}", func_name!()); for (id, prog_entry) in self.leader.progress.iter_mut() { @@ -357,19 +320,7 @@ where C: RaftTypeConfig Self::send_to_target(self.output, id, inflight); } Err(e) => { - tracing::debug!( - "no data to replicate for node-{}: current inflight: {:?}, send_none: {:?}", - id, - e, - send_none - ); - - #[allow(clippy::collapsible_if)] - if e == &Inflight::None { - if send_none == SendNone::True { - Self::send_to_target(self.output, id, e); - } - } + tracing::debug!("no data to replicate for node-{}: current inflight: {:?}", id, e,); } } } diff --git a/openraft/src/engine/handler/vote_handler/mod.rs b/openraft/src/engine/handler/vote_handler/mod.rs index 69dd28a0d..a64d225a1 100644 --- a/openraft/src/engine/handler/vote_handler/mod.rs +++ b/openraft/src/engine/handler/vote_handler/mod.rs @@ -4,7 +4,6 @@ use std::time::Duration; use crate::core::raft_msg::ResultSender; use crate::engine::handler::leader_handler::LeaderHandler; use crate::engine::handler::replication_handler::ReplicationHandler; -use crate::engine::handler::replication_handler::SendNone; use crate::engine::handler::server_state_handler::ServerStateHandler; use crate::engine::Command; use crate::engine::Condition; @@ -208,7 +207,7 @@ where C: RaftTypeConfig self.leader_handler() .leader_append_entries(vec![C::Entry::new_blank(LogId::::default())]); } else { - self.replication_handler().initiate_replication(SendNone::False); + self.replication_handler().initiate_replication(); } } diff --git a/openraft/src/progress/inflight/mod.rs b/openraft/src/progress/inflight/mod.rs index d09106b7f..f42db7772 100644 --- a/openraft/src/progress/inflight/mod.rs +++ b/openraft/src/progress/inflight/mod.rs @@ -9,7 +9,6 @@ use validit::Validate; use 
crate::display_ext::DisplayOptionExt; use crate::log_id_range::LogIdRange; -use crate::replication::request_id::RequestId; use crate::LogId; use crate::LogIdOptionExt; use crate::RaftTypeConfig; @@ -115,11 +114,11 @@ where C: RaftTypeConfig } } - pub(crate) fn is_my_id(&self, res_id: RequestId) -> bool { + pub(crate) fn is_my_id(&self, res_id: u64) -> bool { match self { Inflight::None => false, - Inflight::Logs { id, .. } => RequestId::AppendEntries { id: *id } == res_id, - Inflight::Snapshot { id, .. } => RequestId::Snapshot { id: *id } == res_id, + Inflight::Logs { id, .. } => *id == res_id, + Inflight::Snapshot { id, .. } => *id == res_id, } } diff --git a/openraft/src/raft/message/append_entries.rs b/openraft/src/raft/message/append_entries.rs index b43b31a8e..726344644 100644 --- a/openraft/src/raft/message/append_entries.rs +++ b/openraft/src/raft/message/append_entries.rs @@ -7,6 +7,12 @@ use crate::RaftTypeConfig; use crate::Vote; /// An RPC sent by a cluster leader to replicate log entries (§5.3), and as a heartbeat (§5.2). +/// +/// In Openraft a heartbeat [`AppendEntriesRequest`] message could have `prev_log_id=None` and +/// `entries` empty. Which means: to append nothing at the very beginning position on the Follower, +/// which is always valid. Because `prev_log_id` is used to assert `entries` to be consecutive with +/// the previous log entries, and `prev_log_id=None` is the very beginning position and there are no +/// previous log entries. 
#[derive(Clone)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] pub struct AppendEntriesRequest { diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index c9852f1ed..3d8d3ba89 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -51,6 +51,7 @@ use crate::base::BoxFuture; use crate::base::BoxOnce; use crate::config::Config; use crate::config::RuntimeConfig; +use crate::core::heartbeat::handle::HeartbeatWorkersHandle; use crate::core::raft_msg::external_command::ExternalCommand; use crate::core::raft_msg::RaftMsg; use crate::core::replication_lag; @@ -297,6 +298,7 @@ where C: RaftTypeConfig replications: Default::default(), + heartbeat_handle: HeartbeatWorkersHandle::new(id, config.clone()), tx_api: tx_api.clone(), rx_api, diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index 87fefb93d..580f90ec5 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -16,8 +16,8 @@ pub(crate) use replication_session_id::ReplicationSessionId; use request::Data; use request::DataWithId; use request::Replicate; +pub(crate) use response::Progress; use response::ReplicationResult; -pub(crate) use response::Response; use tracing_futures::Instrument; use crate::async_runtime::MpscUnboundedReceiver; @@ -265,12 +265,10 @@ where return Err(closed); } ReplicationError::HigherVote(h) => { - let _ = self.tx_raft_core.send(Notification::Network { - response: Response::HigherVote { - target: self.target, - higher: h.higher, - sender_vote: *self.session_id.vote_ref(), - }, + let _ = self.tx_raft_core.send(Notification::HigherVote { + target: self.target, + higher: h.higher, + sender_vote: *self.session_id.vote_ref(), }); return Ok(()); } @@ -461,14 +459,21 @@ where match append_resp { AppendEntriesResponse::Success => { + self.notify_heartbeat_progress(leader_time); + let matching = sending_range.last; - let next = self.finish_success_append(matching, 
leader_time, log_ids); - Ok(next) + self.notify_progress(log_ids.request_id(), ReplicationResult(Ok(matching))); + + Ok(self.next_action_to_send(matching, log_ids)) } AppendEntriesResponse::PartialSuccess(matching) => { Self::debug_assert_partial_success(&sending_range, &matching); - let next = self.finish_success_append(matching, leader_time, log_ids); - Ok(next) + + self.notify_heartbeat_progress(leader_time); + + self.notify_progress(log_ids.request_id(), ReplicationResult(Ok(matching))); + + Ok(self.next_action_to_send(matching, log_ids)) } AppendEntriesResponse::HigherVote(vote) => { debug_assert!( @@ -489,7 +494,10 @@ where debug_assert!(conflict.is_some(), "prev_log_id=None never conflict"); let conflict = conflict.unwrap(); - self.send_progress(request_id, ReplicationResult::new(leader_time, Err(conflict))); + + // Conflict is also a successful replication RPC, because the leadership is acknowledged. + self.notify_heartbeat_progress(leader_time); + self.notify_progress(request_id, ReplicationResult(Err(conflict))); Ok(None) } @@ -499,8 +507,12 @@ where /// Send the error result to RaftCore. /// RaftCore will then submit another replication command. fn send_progress_error(&mut self, request_id: RequestId, err: RPCError) { - let _ = self.tx_raft_core.send(Notification::Network { - response: Response::Progress { + // If there is no id, it is a heartbeat and do not need to notify RaftCore + let Some(request_id) = request_id.request_id() else { + return; + }; + let _ = self.tx_raft_core.send(Notification::ReplicationProgress { + progress: Progress { target: self.target, request_id, result: Err(err.to_string()), @@ -509,8 +521,22 @@ where }); } - /// Send the success replication result(log matching or conflict) to RaftCore. - fn send_progress(&mut self, request_id: RequestId, replication_result: ReplicationResult) { + /// A successful replication implies a successful heartbeat. + /// This method notify [`RaftCore`] with a heartbeat progress. 
+ /// + /// [`RaftCore`]: crate::core::RaftCore + fn notify_heartbeat_progress(&mut self, sending_time: InstantOf) { + let _ = self.tx_raft_core.send({ + Notification::HeartbeatProgress { + session_id: self.session_id, + target: self.target, + sending_time, + } + }); + } + + /// Notify RaftCore with the success replication result(log matching or conflict). + fn notify_progress(&mut self, request_id: RequestId, replication_result: ReplicationResult) { tracing::debug!( request_id = display(request_id), target = display(self.target), @@ -520,7 +546,7 @@ where func_name!() ); - match replication_result.result { + match replication_result.0 { Ok(matching) => { self.validate_matching(matching); self.matching = matching; @@ -530,9 +556,15 @@ where } } + // If there is no request id, it is a heartbeat RPC, + // no need to notify RaftCore with progress. + let Some(request_id) = request_id.request_id() else { + return; + }; + let _ = self.tx_raft_core.send({ - Notification::Network { - response: Response::Progress { + Notification::ReplicationProgress { + progress: Progress { session_id: self.session_id, request_id, target: self.target, @@ -822,25 +854,18 @@ where })); } - self.send_progress( - request_id, - ReplicationResult::new(start_time, Ok(snapshot_meta.last_log_id)), - ); + self.notify_heartbeat_progress(start_time); + self.notify_progress(request_id, ReplicationResult(Ok(snapshot_meta.last_log_id))); Ok(None) } - /// Update matching and build a return value for a successful append-entries RPC. - /// /// If there are more logs to send, it returns a new `Some(Data::Logs)` to send. 
- fn finish_success_append( + fn next_action_to_send( &mut self, matching: Option>, - leader_time: InstantOf, log_ids: DataWithId>, ) -> Option> { - self.send_progress(log_ids.request_id(), ReplicationResult::new(leader_time, Ok(matching))); - if matching < log_ids.data().last { Some(Data::new_logs( log_ids.request_id(), diff --git a/openraft/src/replication/replication_session_id.rs b/openraft/src/replication/replication_session_id.rs index 9e2d044b4..24ad0b2d0 100644 --- a/openraft/src/replication/replication_session_id.rs +++ b/openraft/src/replication/replication_session_id.rs @@ -30,11 +30,12 @@ use crate::Vote; /// But the delayed message `{target=c, matched=log_id-1}` may be process by raft core and make raft /// core believe node `c` already has `log_id=1`, and commit it. #[derive(Debug, Clone, Copy)] +#[derive(PartialEq, Eq)] pub(crate) struct ReplicationSessionId where C: RaftTypeConfig { /// The Leader or Candidate this replication belongs to. - pub(crate) vote: CommittedVote, + pub(crate) leader_vote: CommittedVote, /// The log id of the membership log this replication works for. 
pub(crate) membership_log_id: Option>, @@ -44,7 +45,12 @@ impl Display for ReplicationSessionId where C: RaftTypeConfig { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "{}/{}", self.vote, self.membership_log_id.display()) + write!( + f, + "(leader_vote:{}, membership_log_id:{})", + self.leader_vote, + self.membership_log_id.display() + ) } } @@ -53,12 +59,12 @@ where C: RaftTypeConfig { pub(crate) fn new(vote: CommittedVote, membership_log_id: Option>) -> Self { Self { - vote, + leader_vote: vote, membership_log_id, } } pub(crate) fn vote_ref(&self) -> &Vote { - &self.vote + &self.leader_vote } } diff --git a/openraft/src/replication/response.rs b/openraft/src/replication/response.rs index 3340f1fb0..fae9ccd65 100644 --- a/openraft/src/replication/response.rs +++ b/openraft/src/replication/response.rs @@ -1,136 +1,74 @@ use std::fmt; -use crate::display_ext::DisplayInstantExt; use crate::display_ext::DisplayOptionExt; use crate::display_ext::DisplayResultExt; -use crate::replication::request_id::RequestId; use crate::replication::ReplicationSessionId; -use crate::type_config::alias::InstantOf; use crate::type_config::alias::LogIdOf; use crate::RaftTypeConfig; -use crate::Vote; /// The response of replication command. +/// +/// Update the `matched` log id of a replication target. +/// Sent by a replication task `ReplicationCore`. #[derive(Debug)] -pub(crate) enum Response +pub(crate) struct Progress where C: RaftTypeConfig { - // /// Logs that are submitted to append has been persisted to disk. - // LogPersisted {}, - /// Update the `matched` log id of a replication target. - /// Sent by a replication task `ReplicationCore`. - Progress { - /// The ID of the target node for which the match index is to be updated. - target: C::NodeId, + /// The ID of the target node for which the match index is to be updated. + pub(crate) target: C::NodeId, - /// The id of the subject that submit this replication action. 
- request_id: RequestId, + /// The id of the subject that submit this replication action. + pub(crate) request_id: u64, - /// The request by this leader has been successfully handled by the target node, - /// or an error in string. - /// - /// A successful result can still be log matching or log conflicting. - /// In either case, the request is considered accepted, i.e., this leader is still valid to - /// the target node. - /// - /// The result also track the time when this request is sent. - result: Result, String>, - - /// In which session this message is sent. - /// - /// This session id identifies a certain leader(by vote) that is replicating to a certain - /// group of nodes. - /// - /// A message should be discarded if it does not match the present vote and - /// membership_log_id. - session_id: ReplicationSessionId, - }, - - /// ReplicationCore has seen a higher `vote`. - /// Sent by a replication task `ReplicationCore`. - HigherVote { - /// The ID of the target node from which the new term was observed. - target: C::NodeId, - - /// The higher vote observed. - higher: Vote, + /// The request by this leader has been successfully handled by the target node, + /// or an error in string. + /// + /// A successful result can still be log matching or log conflicting. + /// In either case, the request is considered accepted, i.e., this leader is still valid to + /// the target node. + /// + /// The result also track the time when this request is sent. + pub(crate) result: Result, String>, - /// Which state(a Leader or Candidate) sent this message - sender_vote: Vote, - // TODO: need this? - // /// The cluster this replication works for. - // membership_log_id: Option>, - }, + /// In which session this message is sent. + /// + /// This session id identifies a certain leader(by vote) that is replicating to a certain + /// group of nodes. + /// + /// A message should be discarded if it does not match the present vote and + /// membership_log_id. 
+ pub(crate) session_id: ReplicationSessionId, } -impl fmt::Display for Response +impl fmt::Display for Progress where C: RaftTypeConfig { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::Progress { - target, - request_id, - result, - session_id, - } => { - write!( - f, - "replication::Progress: target={}, request_id: {}, result: {}, session_id: {}", - target, - request_id, - result.display(), - session_id - ) - } - - Self::HigherVote { - target, - higher, - sender_vote, - } => { - write!( - f, - "replication::Seen a higher vote: target={}, higher: {}, sender_vote: {}", - target, higher, sender_vote - ) - } - } + write!( + f, + "replication::Progress: target={}, request_id: {}, result: {}, session_id: {}", + self.target, + self.request_id, + self.result.display(), + self.session_id + ) } } /// Result of an append-entries replication +/// +/// Ok for matching, Err for conflict. #[derive(Clone, Debug)] -pub(crate) struct ReplicationResult { - /// The timestamp when this request is sent. - /// - /// It is used to update the lease for leader. - pub(crate) sending_time: InstantOf, - - /// Ok for matching, Err for conflict. 
- pub(crate) result: Result>, LogIdOf>, -} +pub(crate) struct ReplicationResult(pub(crate) Result>, LogIdOf>); impl fmt::Display for ReplicationResult where C: RaftTypeConfig { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{{sending_time:{}, result:", self.sending_time.display())?; - - match &self.result { - Ok(matching) => write!(f, "Match:{}", matching.display())?, - Err(conflict) => write!(f, "Conflict:{}", conflict)?, + match &self.0 { + Ok(matching) => write!(f, "(Match:{})", matching.display()), + Err(conflict) => write!(f, "(Conflict:{})", conflict), } - - write!(f, "}}") - } -} - -impl ReplicationResult -where C: RaftTypeConfig -{ - pub(crate) fn new(sending_time: InstantOf, result: Result>, LogIdOf>) -> Self { - Self { sending_time, result } } } @@ -139,18 +77,17 @@ mod tests { use crate::engine::testing::UTConfig; use crate::replication::response::ReplicationResult; use crate::testing::log_id; - use crate::type_config::TypeConfigExt; #[test] fn test_replication_result_display() { // NOTE that with single-term-leader, log id is `1-3` - let result = ReplicationResult::::new(UTConfig::<()>::now(), Ok(Some(log_id(1, 2, 3)))); - let want = format!(", result:Match:{}}}", log_id(1, 2, 3)); + let result = ReplicationResult::(Ok(Some(log_id(1, 2, 3)))); + let want = format!("(Match:{})", log_id(1, 2, 3)); assert!(result.to_string().ends_with(&want), "{}", result.to_string()); - let result = ReplicationResult::::new(UTConfig::<()>::now(), Err(log_id(1, 2, 3))); - let want = format!(", result:Conflict:{}}}", log_id(1, 2, 3)); + let result = ReplicationResult::(Err(log_id(1, 2, 3))); + let want = format!("(Conflict:{})", log_id(1, 2, 3)); assert!(result.to_string().ends_with(&want), "{}", result.to_string()); } } diff --git a/openraft/src/storage/callback.rs b/openraft/src/storage/callback.rs index 5deca6a25..5b5cbc19f 100644 --- a/openraft/src/storage/callback.rs +++ b/openraft/src/storage/callback.rs @@ -87,7 +87,8 @@ where C: 
RaftTypeConfig } Notification::HigherVote { .. } | Notification::StorageError { .. } - | Notification::Network { .. } + | Notification::ReplicationProgress { .. } + | Notification::HeartbeatProgress { .. } | Notification::StateMachine { .. } | Notification::Tick { .. } => { unreachable!("Unexpected notification: {}", self.notification) diff --git a/tests/tests/snapshot_streaming/t60_snapshot_chunk_size.rs b/tests/tests/snapshot_streaming/t60_snapshot_chunk_size.rs index 069cb0ddb..4c463b38e 100644 --- a/tests/tests/snapshot_streaming/t60_snapshot_chunk_size.rs +++ b/tests/tests/snapshot_streaming/t60_snapshot_chunk_size.rs @@ -3,16 +3,15 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; -use openraft::CommittedLeaderId; +use openraft::testing::log_id; use openraft::Config; -use openraft::LogId; use openraft::ServerState; use openraft::SnapshotPolicy; use crate::fixtures::ut_harness; use crate::fixtures::RaftRouter; -/// Test transfer snapshot in small chnuks +/// Test transfer snapshot in small chunks /// /// What does this test do? 
/// @@ -29,6 +28,7 @@ async fn snapshot_chunk_size() -> Result<()> { snapshot_policy: SnapshotPolicy::LogsSinceLast(snapshot_threshold), snapshot_max_chunk_size: 10, enable_heartbeat: false, + max_in_snapshot_log_to_keep: 0, ..Default::default() } .validate()?, @@ -65,22 +65,14 @@ async fn snapshot_chunk_size() -> Result<()> { "send log to trigger snapshot", ) .await?; + router.wait_for_snapshot(&btreeset![0], log_id(1, 0, log_index), timeout(), "snapshot").await?; + router.assert_storage_state(1, log_index, Some(0), log_id(1, 0, log_index), want_snap).await?; + + let n0 = router.get_raft_handle(&0)?; + n0.trigger().purge_log(log_index).await?; router - .wait_for_snapshot( - &btreeset![0], - LogId::new(CommittedLeaderId::new(1, 0), log_index), - None, - "snapshot", - ) - .await?; - router - .assert_storage_state( - 1, - log_index, - Some(0), - LogId::new(CommittedLeaderId::new(1, 0), log_index), - want_snap, - ) + .wait(&0, timeout()) + .purged(Some(log_id(1, 0, 9)), "purge Leader-0 all in snapshot logs") .await?; } @@ -90,24 +82,9 @@ async fn snapshot_chunk_size() -> Result<()> { router.add_learner(0, 1).await.expect("failed to add new node as learner"); log_index += 1; - router.wait_for_log(&btreeset![0, 1], Some(log_index), None, "add learner").await?; - router - .wait_for_snapshot( - &btreeset![1], - LogId::new(CommittedLeaderId::new(1, 0), log_index), - None, - "", - ) - .await?; - - router - .wait_for_snapshot( - &btreeset![0], - LogId::new(CommittedLeaderId::new(1, 0), log_index - 1), - None, - "", - ) - .await?; + router.wait_for_log(&btreeset![0, 1], Some(log_index), timeout(), "add learner").await?; + router.wait(&1, timeout()).applied_index(Some(log_index), "sync all data to learner-1").await?; + router.wait(&1, timeout()).snapshot(log_id(1, 0, log_index - 1), "learner-1 snapshot").await?; // after add_learner, log_index + 1, // leader has only log_index log in snapshot, cause it has compacted before add_learner @@ -120,13 +97,11 @@ async fn 
snapshot_chunk_size() -> Result<()> { 1, log_index, Some(0), - LogId::new(CommittedLeaderId::new(1, 0), log_index), + log_id(1, 0, log_index), &Some(((log_index - 1).into(), 1)), ) .await?; - // learner has log_index + 1 log in snapshot, cause it do compact after add_learner, - // so learner's snapshot include add_learner log let (mut store, mut sm) = router.get_storage_handle(&1)?; router .assert_storage_state_with_sto( @@ -136,8 +111,8 @@ async fn snapshot_chunk_size() -> Result<()> { 1, log_index, Some(0), - LogId::new(CommittedLeaderId::new(1, 0), log_index), - &Some(((log_index).into(), 1)), + log_id(1, 0, log_index), + &Some(((log_index - 1).into(), 1)), ) .await?; } @@ -146,5 +121,5 @@ async fn snapshot_chunk_size() -> Result<()> { } fn timeout() -> Option { - Some(Duration::from_millis(1000)) + Some(Duration::from_millis(1_000)) }