diff --git a/openraft/src/core/heartbeat/event.rs b/openraft/src/core/heartbeat/event.rs new file mode 100644 index 000000000..931f0810b --- /dev/null +++ b/openraft/src/core/heartbeat/event.rs @@ -0,0 +1,64 @@ +use std::fmt; + +use crate::display_ext::DisplayInstantExt; +use crate::display_ext::DisplayOptionExt; +use crate::replication::ReplicationSessionId; +use crate::type_config::alias::InstantOf; +use crate::LogId; +use crate::RaftTypeConfig; + +/// The information for broadcasting a heartbeat. +#[derive(Debug, Clone, Copy)] +#[derive(PartialEq, Eq)] +pub struct HeartbeatEvent +where C: RaftTypeConfig +{ + /// The timestamp when this heartbeat is sent. + /// + /// The Leader use this sending time to calculate the quorum acknowledge time, but not the + /// receiving timestamp. + pub(crate) time: InstantOf, + + /// The vote of the Leader that submit this heartbeat and the log id of the cluster config. + /// + /// The response that matches this session id is considered as a valid response. + /// Otherwise, it is considered as an outdated response from older leader or older cluster + /// membership config and will be ignored. + pub(crate) session_id: ReplicationSessionId, + + /// The last known committed log id of the Leader. + /// + /// When there are no new logs to replicate, the Leader sends a heartbeat to replicate committed + /// log id to followers to update their committed log id. + pub(crate) committed: Option>, +} + +impl HeartbeatEvent +where C: RaftTypeConfig +{ + pub(crate) fn new( + time: InstantOf, + session_id: ReplicationSessionId, + committed: Option>, + ) -> Self { + Self { + time, + session_id, + committed, + } + } +} + +impl fmt::Display for HeartbeatEvent +where C: RaftTypeConfig +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "(time={}, leader_vote: {}, committed: {})", + self.time.display(), + self.session_id, + self.committed.display() + ) + } +} diff --git a/openraft/src/core/heartbeat/handle.rs b/openraft/src/core/heartbeat/handle.rs new file mode 100644 index 000000000..42065ba0e --- /dev/null +++ b/openraft/src/core/heartbeat/handle.rs @@ -0,0 +1,97 @@ +use std::collections::BTreeMap; +use std::sync::Arc; + +use tracing::Instrument; +use tracing::Level; +use tracing::Span; + +use crate::async_runtime::watch::WatchSender; +use crate::core::heartbeat::event::HeartbeatEvent; +use crate::core::heartbeat::worker::HeartbeatWorker; +use crate::core::notification::Notification; +use crate::type_config::alias::JoinHandleOf; +use crate::type_config::alias::MpscUnboundedSenderOf; +use crate::type_config::alias::OneshotSenderOf; +use crate::type_config::alias::WatchReceiverOf; +use crate::type_config::alias::WatchSenderOf; +use crate::type_config::TypeConfigExt; +use crate::Config; +use crate::RaftNetworkFactory; +use crate::RaftTypeConfig; + +pub(crate) struct HeartbeatWorkersHandle +where C: RaftTypeConfig +{ + pub(crate) id: C::NodeId, + + pub(crate) config: Arc, + + /// Inform the heartbeat task to broadcast heartbeat message. + /// + /// A Leader will periodically update this value to trigger sending heartbeat messages. + pub(crate) tx: WatchSenderOf>>, + + /// The receiving end of heartbeat command. + /// + /// A separate task will have a clone of this receiver to receive and execute heartbeat command. + pub(crate) rx: WatchReceiverOf>>, + + pub(crate) workers: BTreeMap, JoinHandleOf)>, +} + +impl HeartbeatWorkersHandle +where C: RaftTypeConfig +{ + pub(crate) fn new(id: C::NodeId, config: Arc) -> Self { + let (tx, rx) = C::watch_channel(None); + + Self { + id, + config, + tx, + rx, + workers: Default::default(), + } + } + + pub(crate) fn broadcast(&self, event: HeartbeatEvent) { + tracing::debug!("id={} send_heartbeat {}", self.id, event); + let _ = self.tx.send(Some(event)); + } + + pub(crate) async fn spawn_workers( + &mut self, + network_factory: &mut NF, + tx_notification: &MpscUnboundedSenderOf>, + targets: impl IntoIterator, + ) where + NF: RaftNetworkFactory, + { + for (target, node) in targets { + tracing::debug!("id={} spawn HeartbeatWorker target={}", self.id, target); + let network = network_factory.new_client(target, &node).await; + + let worker = HeartbeatWorker { + id: self.id, + rx: self.rx.clone(), + network, + target, + node, + config: self.config.clone(), + tx_notification: tx_notification.clone(), + }; + + let span = tracing::span!(parent: &Span::current(), Level::DEBUG, "heartbeat", id=display(self.id), target=display(target)); + + let (tx_shutdown, rx_shutdown) = C::oneshot(); + + let worker_handle = C::spawn(worker.run(rx_shutdown).instrument(span)); + self.workers.insert(target, (tx_shutdown, worker_handle)); + } + } + + pub(crate) fn shutdown(&mut self) { + self.workers.clear(); + tracing::info!("id={} HeartbeatWorker are shutdown", self.id); + } +} diff --git a/openraft/src/core/heartbeat/mod.rs b/openraft/src/core/heartbeat/mod.rs new file mode 100644 index 000000000..ddc964b27 --- /dev/null +++ b/openraft/src/core/heartbeat/mod.rs @@ -0,0 +1,3 @@ +pub(crate) mod event; +pub(crate) mod handle; +pub(crate) mod worker; diff --git a/openraft/src/core/heartbeat/worker.rs b/openraft/src/core/heartbeat/worker.rs new file mode 100644 index 000000000..8ff245f05 --- /dev/null +++ b/openraft/src/core/heartbeat/worker.rs @@ -0,0 +1,114 @@ +use std::fmt; +use std::ops::Deref; +use std::sync::Arc; +use std::time::Duration; + +use futures::FutureExt; + +use crate::async_runtime::watch::WatchReceiver; +use crate::async_runtime::MpscUnboundedSender; +use crate::core::heartbeat::event::HeartbeatEvent; +use crate::core::notification::Notification; +use crate::network::v2::RaftNetworkV2; +use crate::network::RPCOption; +use crate::raft::AppendEntriesRequest; +use crate::type_config::alias::MpscUnboundedSenderOf; +use crate::type_config::alias::OneshotReceiverOf; +use crate::type_config::alias::WatchReceiverOf; +use crate::type_config::TypeConfigExt; +use crate::Config; +use crate::RaftTypeConfig; + +/// A dedicate worker sending heartbeat to a specific follower. +pub struct HeartbeatWorker +where + C: RaftTypeConfig, + N: RaftNetworkV2, +{ + pub(crate) id: C::NodeId, + + /// The receiver will be changed when a new heartbeat is needed to be sent. + pub(crate) rx: WatchReceiverOf>>, + + pub(crate) network: N, + + pub(crate) target: C::NodeId, + + #[allow(dead_code)] + pub(crate) node: C::Node, + + pub(crate) config: Arc, + + /// For sending back result to the [`RaftCore`]. + /// + /// [`RaftCore`]: crate::core::RaftCore + pub(crate) tx_notification: MpscUnboundedSenderOf>, +} + +impl fmt::Display for HeartbeatWorker +where + C: RaftTypeConfig, + N: RaftNetworkV2, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "HeartbeatWorker(id={}, target={})", self.id, self.target) + } +} + +impl HeartbeatWorker +where + C: RaftTypeConfig, + N: RaftNetworkV2, +{ + pub(crate) async fn run(mut self, mut rx_shutdown: OneshotReceiverOf) { + loop { + tracing::debug!("{} is waiting for a new heartbeat event.", self); + + futures::select! { + _ = (&mut rx_shutdown).fuse() => { + tracing::info!("{} is shutdown.", self); + return; + }, + _ = self.rx.changed().fuse() => {}, + } + + let heartbeat: Option> = *self.rx.borrow_watched(); + + // None is the initial value of the WatchReceiver, ignore it. + let Some(heartbeat) = heartbeat else { + continue; + }; + + let timeout = Duration::from_millis(self.config.heartbeat_interval); + let option = RPCOption::new(timeout); + + let payload = AppendEntriesRequest { + vote: *heartbeat.session_id.leader_vote.deref(), + prev_log_id: None, + leader_commit: heartbeat.committed, + entries: vec![], + }; + + let res = C::timeout(timeout, self.network.append_entries(payload, option)).await; + tracing::debug!("{} sent a heartbeat: {}, result: {:?}", self, heartbeat, res); + + match res { + Ok(Ok(_)) => { + let res = self.tx_notification.send(Notification::HeartbeatProgress { + session_id: heartbeat.session_id, + sending_time: heartbeat.time, + target: self.target, + }); + + if res.is_err() { + tracing::error!("{} failed to send a heartbeat progress to RaftCore. quit", self); + return; + } + } + _ => { + tracing::warn!("{} failed to send a heartbeat: {:?}", self, res); + } + } + } + } +} diff --git a/openraft/src/core/mod.rs b/openraft/src/core/mod.rs index 1a7a4850c..b7bba56aa 100644 --- a/openraft/src/core/mod.rs +++ b/openraft/src/core/mod.rs @@ -5,6 +5,7 @@ //! storage or forward messages to other raft nodes. pub(crate) mod balancer; +pub(crate) mod heartbeat; pub(crate) mod notification; mod raft_core; pub(crate) mod raft_msg; diff --git a/openraft/src/core/notification.rs b/openraft/src/core/notification.rs index f5cdb4d29..36dff9a63 100644 --- a/openraft/src/core/notification.rs +++ b/openraft/src/core/notification.rs @@ -1,9 +1,12 @@ use std::fmt; use crate::core::sm; +use crate::display_ext::DisplayInstantExt; use crate::raft::VoteResponse; use crate::raft_state::IOId; use crate::replication; +use crate::replication::ReplicationSessionId; +use crate::type_config::alias::InstantOf; use crate::RaftTypeConfig; use crate::StorageError; use crate::Vote; @@ -47,7 +50,13 @@ where C: RaftTypeConfig LocalIO { io_id: IOId }, /// Result of executing a command sent from network worker. - Network { response: replication::Response }, + ReplicationProgress { progress: replication::Progress }, + + HeartbeatProgress { + session_id: ReplicationSessionId, + sending_time: InstantOf, + target: C::NodeId, + }, /// Result of executing a command sent from state machine worker. StateMachine { command_result: sm::CommandResult }, @@ -96,8 +105,21 @@ where C: RaftTypeConfig } Self::StorageError { error } => write!(f, "StorageError: {}", error), Self::LocalIO { io_id } => write!(f, "IOFlushed: {}", io_id), - Self::Network { response } => { - write!(f, "{}", response) + Self::ReplicationProgress { progress } => { + write!(f, "{}", progress) + } + Self::HeartbeatProgress { + session_id: leader_vote, + sending_time, + target, + } => { + write!( + f, + "HeartbeatProgress: target={}, leader_vote: {}, sending_time: {}", + target, + leader_vote, + sending_time.display(), + ) } Self::StateMachine { command_result } => { write!(f, "{}", command_result) diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 48af51fc0..a6ee487ab 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -24,6 +24,8 @@ use crate::async_runtime::TryRecvError; use crate::config::Config; use crate::config::RuntimeConfig; use crate::core::balancer::Balancer; +use crate::core::heartbeat::event::HeartbeatEvent; +use crate::core::heartbeat::handle::HeartbeatWorkersHandle; use crate::core::notification::Notification; use crate::core::raft_msg::external_command::ExternalCommand; use crate::core::raft_msg::AppendEntriesTx; @@ -38,7 +40,6 @@ use crate::display_ext::DisplayOptionExt; use crate::display_ext::DisplayResultExt; use crate::display_ext::DisplaySlice; use crate::display_ext::DisplaySliceExt; -use crate::engine::handler::replication_handler::SendNone; use crate::engine::Command; use crate::engine::Condition; use crate::engine::Engine; @@ -79,7 +80,6 @@ use crate::raft::VoteRequest; use crate::raft::VoteResponse; use crate::raft_state::io_state::io_id::IOId; use crate::raft_state::LogStateReader; -use crate::replication; use crate::replication::request::Replicate; use crate::replication::request_id::RequestId; use crate::replication::response::ReplicationResult; @@ -197,6 +197,8 @@ where /// A mapping of node IDs the replication state of the target node. pub(crate) replications: BTreeMap>, + pub(crate) heartbeat_handle: HeartbeatWorkersHandle, + #[allow(dead_code)] pub(crate) tx_api: MpscUnboundedSenderOf>, pub(crate) rx_api: MpscUnboundedReceiverOf>, @@ -841,6 +843,8 @@ where pub async fn remove_all_replication(&mut self) { tracing::info!("remove all replication"); + self.heartbeat_handle.shutdown(); + let nodes = std::mem::take(&mut self.replications); tracing::debug!( @@ -974,7 +978,7 @@ where // Keep replicating to a target if the replication stream to it is idle. if let Ok(mut lh) = self.engine.leader_handler() { - lh.replication_handler().initiate_replication(SendNone::False); + lh.replication_handler().initiate_replication(); } self.run_engine_commands().await?; } @@ -1418,39 +1422,28 @@ where } } - Notification::Network { response } => { - // - match response { - replication::Response::Progress { - target, - request_id: id, - result, - session_id, - } => { - // If vote or membership changes, ignore the message. - // There is chance delayed message reports a wrong state. - if self.does_replication_session_match(&session_id, "UpdateReplicationMatched") { - self.handle_replication_progress(target, id, result); - } - } - - replication::Response::HigherVote { - target, - higher, - sender_vote, - } => { - tracing::info!( - target = display(target), - higher_vote = display(&higher), - sender_vote = display(&sender_vote), - "received Notification::HigherVote: {}", - func_name!() - ); + Notification::ReplicationProgress { progress } => { + // If vote or membership changes, ignore the message. + // There is chance delayed message reports a wrong state. + if self.does_replication_session_match(&progress.session_id, "ReplicationProgress") { + self.handle_replication_progress(progress.target, progress.request_id, progress.result); + } + } - if self.does_vote_match(&sender_vote, "HigherVote") { - // Rejected vote change is ok. - let _ = self.engine.vote_handler().update_vote(&higher); - } + Notification::HeartbeatProgress { + session_id, + sending_time, + target, + } => { + if self.does_replication_session_match(&session_id, "HeartbeatProgress") { + tracing::debug!( + session_id = display(session_id), + target = display(target), + sending_time = display(sending_time.display()), + "HeartbeatProgress" + ); + if self.engine.leader.is_some() { + self.engine.replication_handler().update_leader_clock(target, sending_time); } } } @@ -1562,7 +1555,7 @@ where fn handle_replication_progress( &mut self, target: C::NodeId, - request_id: RequestId, + request_id: u64, result: Result, String>, ) { tracing::debug!( @@ -1782,6 +1775,9 @@ where let _ = node.tx_repl.send(Replicate::Committed(committed)); } } + Command::BroadcastHeartbeat { session_id, committed } => { + self.heartbeat_handle.broadcast(HeartbeatEvent::new(C::now(), session_id, committed)) + } Command::SaveCommitted { committed } => { self.log_store.save_committed(Some(committed)).await?; } @@ -1819,6 +1815,15 @@ where let handle = self.spawn_replication_stream(*target, *matching).await; self.replications.insert(*target, handle); } + + let effective = self.engine.state.membership_state.effective().clone(); + + let nodes = targets.into_iter().map(|p| { + let node_id = p.0; + (node_id, effective.get_node(&node_id).unwrap().clone()) + }); + + self.heartbeat_handle.spawn_workers(&mut self.network_factory, &self.tx_notification, nodes).await; } Command::StateMachine { command } => { let io_id = command.get_submit_io(); diff --git a/openraft/src/engine/command.rs b/openraft/src/engine/command.rs index 651e62e9c..fd962573f 100644 --- a/openraft/src/engine/command.rs +++ b/openraft/src/engine/command.rs @@ -19,6 +19,7 @@ use crate::raft::SnapshotResponse; use crate::raft::VoteRequest; use crate::raft::VoteResponse; use crate::raft_state::IOId; +use crate::replication::ReplicationSessionId; use crate::type_config::alias::OneshotSenderOf; use crate::vote::CommittedVote; use crate::LogId; @@ -66,6 +67,12 @@ where C: RaftTypeConfig /// Replicate the committed log id to other nodes ReplicateCommitted { committed: Option> }, + /// Broadcast heartbeat to all other nodes. + BroadcastHeartbeat { + session_id: ReplicationSessionId, + committed: Option>, + }, + /// Save the committed log id to [`RaftLogStorage`]. /// /// Upon startup, the saved committed log ids will be re-applied to state machine to restore the @@ -147,6 +154,14 @@ where C: RaftTypeConfig Command::ReplicateCommitted { committed } => { write!(f, "ReplicateCommitted: {}", committed.display()) } + Command::BroadcastHeartbeat { session_id, committed } => { + write!( + f, + "BroadcastHeartbeat: session_id:{}, committed:{}", + session_id, + committed.display() + ) + } Command::SaveCommitted { committed } => write!(f, "SaveCommitted: {}", committed), Command::Apply { already_committed, @@ -188,7 +203,8 @@ where match (self, other) { (Command::UpdateIOProgress { when, io_id }, Command::UpdateIOProgress { when: wb, io_id: ab }, ) => when == wb && io_id == ab, (Command::AppendInputEntries { committed_vote: vote, entries }, Command::AppendInputEntries { committed_vote: vb, entries: b }, ) => vote == vb && entries == b, - (Command::ReplicateCommitted { committed }, Command::ReplicateCommitted { committed: b }, ) => committed == b, + (Command::ReplicateCommitted { committed }, Command::ReplicateCommitted { committed: b }, ) => committed == b, + (Command::BroadcastHeartbeat { session_id, committed }, Command::BroadcastHeartbeat { session_id: sb, committed: b }, ) => session_id == sb && committed == b, (Command::SaveCommitted { committed }, Command::SaveCommitted { committed: b }) => committed == b, (Command::Apply { already_committed, upto, }, Command::Apply { already_committed: b_committed, upto: b_upto, }, ) => already_committed == b_committed && upto == b_upto, (Command::Replicate { target, req }, Command::Replicate { target: b_target, req: other_req, }, ) => target == b_target && req == other_req, @@ -226,6 +242,7 @@ where C: RaftTypeConfig Command::PurgeLog { .. } => CommandKind::Log, Command::ReplicateCommitted { .. } => CommandKind::Network, + Command::BroadcastHeartbeat { .. } => CommandKind::Network, Command::Replicate { .. } => CommandKind::Network, Command::BroadcastTransferLeader { .. } => CommandKind::Network, Command::SendVote { .. } => CommandKind::Network, @@ -251,6 +268,7 @@ where C: RaftTypeConfig Command::PurgeLog { upto } => Some(Condition::Snapshot { log_id: Some(*upto) }), Command::ReplicateCommitted { .. } => None, + Command::BroadcastHeartbeat { .. } => None, Command::Replicate { .. } => None, Command::BroadcastTransferLeader { .. } => None, Command::SendVote { .. } => None, diff --git a/openraft/src/engine/handler/leader_handler/mod.rs b/openraft/src/engine/handler/leader_handler/mod.rs index 5f8a61042..368f09ecc 100644 --- a/openraft/src/engine/handler/leader_handler/mod.rs +++ b/openraft/src/engine/handler/leader_handler/mod.rs @@ -1,5 +1,4 @@ use crate::engine::handler::replication_handler::ReplicationHandler; -use crate::engine::handler::replication_handler::SendNone; use crate::engine::Command; use crate::engine::EngineConfig; use crate::engine::EngineOutput; @@ -9,6 +8,7 @@ use crate::proposer::LeaderQuorumSet; use crate::raft::message::TransferLeaderRequest; use crate::raft_state::IOId; use crate::raft_state::LogStateReader; +use crate::replication::ReplicationSessionId; use crate::type_config::alias::LogIdOf; use crate::RaftLogId; use crate::RaftState; @@ -92,13 +92,18 @@ where C: RaftTypeConfig rh.append_membership(&log_id, &m); } - rh.initiate_replication(SendNone::False); + rh.initiate_replication(); } #[tracing::instrument(level = "debug", skip_all)] - pub(crate) fn send_heartbeat(&mut self) -> () { - let mut rh = self.replication_handler(); - rh.initiate_replication(SendNone::True); + pub(crate) fn send_heartbeat(&mut self) { + let membership_log_id = self.state.membership_state.effective().log_id(); + let session_id = ReplicationSessionId::new(self.leader.committed_vote, *membership_log_id); + + self.output.push_command(Command::BroadcastHeartbeat { + session_id, + committed: self.state.committed().copied(), + }); } /// Get the log id for a linearizable read. diff --git a/openraft/src/engine/handler/leader_handler/send_heartbeat_test.rs b/openraft/src/engine/handler/leader_handler/send_heartbeat_test.rs index 55fe77ab3..e05f6c550 100644 --- a/openraft/src/engine/handler/leader_handler/send_heartbeat_test.rs +++ b/openraft/src/engine/handler/leader_handler/send_heartbeat_test.rs @@ -2,18 +2,12 @@ use std::sync::Arc; use std::time::Duration; use maplit::btreeset; -#[allow(unused_imports)] use pretty_assertions::assert_eq; -#[allow(unused_imports)] -use pretty_assertions::assert_ne; -#[allow(unused_imports)] -use pretty_assertions::assert_str_eq; use crate::engine::testing::UTConfig; use crate::engine::Command; use crate::engine::Engine; -use crate::progress::Inflight; -use crate::progress::Progress; +use crate::replication::ReplicationSessionId; use crate::testing::log_id; use crate::type_config::TypeConfigExt; use crate::utime::Leased; @@ -63,47 +57,31 @@ fn test_leader_send_heartbeat() -> anyhow::Result<()> { eng.leader_handler()?.send_heartbeat(); assert_eq!( vec![ - Command::Replicate { - target: 2, - req: Inflight::logs(None, Some(log_id(2, 1, 3))).with_id(1), - }, - Command::Replicate { - target: 3, - req: Inflight::logs(None, Some(log_id(2, 1, 3))).with_id(1), + // + Command::BroadcastHeartbeat { + session_id: ReplicationSessionId::new(Vote::new(3, 1).into_committed(), Some(log_id(2, 1, 3))), + committed: Some(log_id(0, 1, 0)) }, ], eng.output.take_commands() ); } - // No RPC will be sent if there are inflight RPC + // Heartbeat will be resent { eng.output.clear_commands(); eng.leader_handler()?.send_heartbeat(); - assert!(eng.output.take_commands().is_empty()); - } - - // No data to send, sending a heartbeat is to send empty RPC: - { - let l = eng.leader_handler()?; - let _ = l.leader.progress.update_with(&2, |ent| ent.update_matching(1, Some(log_id(2, 1, 3))).unwrap()); - let _ = l.leader.progress.update_with(&3, |ent| ent.update_matching(1, Some(log_id(2, 1, 3))).unwrap()); + assert_eq!( + vec![ + // + Command::BroadcastHeartbeat { + session_id: ReplicationSessionId::new(Vote::new(3, 1).into_committed(), Some(log_id(2, 1, 3))), + committed: Some(log_id(0, 1, 0)) + }, + ], + eng.output.take_commands() + ); } - eng.output.clear_commands(); - eng.leader_handler()?.send_heartbeat(); - assert_eq!( - vec![ - Command::Replicate { - target: 2, - req: Inflight::logs(Some(log_id(2, 1, 3)), Some(log_id(2, 1, 3))).with_id(1), - }, - Command::Replicate { - target: 3, - req: Inflight::logs(Some(log_id(2, 1, 3)), Some(log_id(2, 1, 3))).with_id(1), - }, - ], - eng.output.take_commands() - ); Ok(()) } diff --git a/openraft/src/engine/handler/replication_handler/mod.rs b/openraft/src/engine/handler/replication_handler/mod.rs index 3f1085ff1..dbd1b07b5 100644 --- a/openraft/src/engine/handler/replication_handler/mod.rs +++ b/openraft/src/engine/handler/replication_handler/mod.rs @@ -13,7 +13,6 @@ use crate::progress::Progress; use crate::proposer::Leader; use crate::proposer::LeaderQuorumSet; use crate::raft_state::LogStateReader; -use crate::replication::request_id::RequestId; use crate::replication::response::ReplicationResult; use crate::type_config::alias::InstantOf; use crate::EffectiveMembership; @@ -46,16 +45,6 @@ where C: RaftTypeConfig pub(crate) output: &'x mut EngineOutput, } -/// An option about whether to send an RPC to follower/learner even when there is no data to send. -/// -/// Sending none data serves as a heartbeat. -#[derive(Debug)] -#[derive(PartialEq, Eq)] -pub(crate) enum SendNone { - False, - True, -} - impl<'x, C> ReplicationHandler<'x, C> where C: RaftTypeConfig { @@ -88,7 +77,7 @@ where C: RaftTypeConfig self.rebuild_progresses(); self.rebuild_replication_streams(); - self.initiate_replication(SendNone::False); + self.initiate_replication(); } /// Rebuild leader's replication progress to reflect replication changes. @@ -118,33 +107,6 @@ where C: RaftTypeConfig } } - /// Update replication progress when a successful response is received. - #[tracing::instrument(level = "debug", skip_all)] - pub(crate) fn update_success_progress( - &mut self, - target: C::NodeId, - request_id: RequestId, - result: ReplicationResult, - ) { - // No matter what the result is, the validity of the leader is granted by a follower. - self.update_leader_clock(target, result.sending_time); - - let id = request_id.request_id(); - let Some(id) = id else { - tracing::debug!(request_id = display(request_id), "no data for this request, return"); - return; - }; - - match result.result { - Ok(matching) => { - self.update_matching(target, id, matching); - } - Err(conflict) => { - self.update_conflicting(target, id, conflict); - } - } - } - /// Update progress when replicated data(logs or snapshot) matches on follower/learner and is /// accepted. #[tracing::instrument(level = "debug", skip_all)] @@ -272,7 +234,7 @@ where C: RaftTypeConfig pub(crate) fn update_progress( &mut self, target: C::NodeId, - request_id: RequestId, + request_id: u64, repl_res: Result, String>, ) { tracing::debug!( @@ -285,9 +247,14 @@ where C: RaftTypeConfig ); match repl_res { - Ok(p) => { - self.update_success_progress(target, request_id, p); - } + Ok(p) => match p.0 { + Ok(matching) => { + self.update_matching(target, request_id, matching); + } + Err(conflict) => { + self.update_conflicting(target, request_id, conflict); + } + }, Err(err_str) => { tracing::warn!( request_id = display(request_id), @@ -295,21 +262,17 @@ where C: RaftTypeConfig "update progress error" ); - if request_id == RequestId::HeartBeat { - tracing::warn!("heartbeat error: {}, no update to inflight data", err_str); - } else { - // Reset inflight state and it will retry. - let p = self.leader.progress.get_mut(&target).unwrap(); + // Reset inflight state and it will retry. + let p = self.leader.progress.get_mut(&target).unwrap(); - debug_assert!( - p.inflight.is_my_id(request_id), - "inflight({:?}) id should match: {}", - p.inflight, - request_id - ); + debug_assert!( + p.inflight.is_my_id(request_id), + "inflight({:?}) id should match: {}", + p.inflight, + request_id + ); - p.inflight = Inflight::None; - } + p.inflight = Inflight::None; } }; @@ -339,7 +302,7 @@ where C: RaftTypeConfig /// /// `send_none` specifies whether to force to send a message even when there is no data to send. #[tracing::instrument(level = "debug", skip_all)] - pub(crate) fn initiate_replication(&mut self, send_none: SendNone) { + pub(crate) fn initiate_replication(&mut self) { tracing::debug!(progress = debug(&self.leader.progress), "{}", func_name!()); for (id, prog_entry) in self.leader.progress.iter_mut() { @@ -357,19 +320,7 @@ where C: RaftTypeConfig Self::send_to_target(self.output, id, inflight); } Err(e) => { - tracing::debug!( - "no data to replicate for node-{}: current inflight: {:?}, send_none: {:?}", - id, - e, - send_none - ); - - #[allow(clippy::collapsible_if)] - if e == &Inflight::None { - if send_none == SendNone::True { - Self::send_to_target(self.output, id, e); - } - } + tracing::debug!("no data to replicate for node-{}: current inflight: {:?}", id, e,); } } } diff --git a/openraft/src/engine/handler/vote_handler/mod.rs b/openraft/src/engine/handler/vote_handler/mod.rs index 69dd28a0d..a64d225a1 100644 --- a/openraft/src/engine/handler/vote_handler/mod.rs +++ b/openraft/src/engine/handler/vote_handler/mod.rs @@ -4,7 +4,6 @@ use std::time::Duration; use crate::core::raft_msg::ResultSender; use crate::engine::handler::leader_handler::LeaderHandler; use crate::engine::handler::replication_handler::ReplicationHandler; -use crate::engine::handler::replication_handler::SendNone; use crate::engine::handler::server_state_handler::ServerStateHandler; use crate::engine::Command; use crate::engine::Condition; @@ -208,7 +207,7 @@ where C: RaftTypeConfig self.leader_handler() .leader_append_entries(vec![C::Entry::new_blank(LogId::::default())]); } else { - self.replication_handler().initiate_replication(SendNone::False); + self.replication_handler().initiate_replication(); } } diff --git a/openraft/src/progress/inflight/mod.rs b/openraft/src/progress/inflight/mod.rs index d09106b7f..f42db7772 100644 --- a/openraft/src/progress/inflight/mod.rs +++ b/openraft/src/progress/inflight/mod.rs @@ -9,7 +9,6 @@ use validit::Validate; use crate::display_ext::DisplayOptionExt; use crate::log_id_range::LogIdRange; -use crate::replication::request_id::RequestId; use crate::LogId; use crate::LogIdOptionExt; use crate::RaftTypeConfig; @@ -115,11 +114,11 @@ where C: RaftTypeConfig } } - pub(crate) fn is_my_id(&self, res_id: RequestId) -> bool { + pub(crate) fn is_my_id(&self, res_id: u64) -> bool { match self { Inflight::None => false, - Inflight::Logs { id, .. } => RequestId::AppendEntries { id: *id } == res_id, - Inflight::Snapshot { id, .. } => RequestId::Snapshot { id: *id } == res_id, + Inflight::Logs { id, .. } => *id == res_id, + Inflight::Snapshot { id, .. } => *id == res_id, } } diff --git a/openraft/src/raft/message/append_entries.rs b/openraft/src/raft/message/append_entries.rs index b43b31a8e..726344644 100644 --- a/openraft/src/raft/message/append_entries.rs +++ b/openraft/src/raft/message/append_entries.rs @@ -7,6 +7,12 @@ use crate::RaftTypeConfig; use crate::Vote; /// An RPC sent by a cluster leader to replicate log entries (§5.3), and as a heartbeat (§5.2). +/// +/// In Openraft a heartbeat [`AppendEntriesRequest`] message could have `prev_log_id=None` and +/// `entries` empty. Which means: to append nothing at the very beginning position on the Follower, +/// which is always valid. Because `prev_log_id` is used to assert `entries` to be consecutive with +/// the previous log entries, and `prev_log_id=None` is the very beginning position and there are no +/// previous log entries. #[derive(Clone)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] pub struct AppendEntriesRequest { diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index c9852f1ed..3d8d3ba89 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -51,6 +51,7 @@ use crate::base::BoxFuture; use crate::base::BoxOnce; use crate::config::Config; use crate::config::RuntimeConfig; +use crate::core::heartbeat::handle::HeartbeatWorkersHandle; use crate::core::raft_msg::external_command::ExternalCommand; use crate::core::raft_msg::RaftMsg; use crate::core::replication_lag; @@ -297,6 +298,7 @@ where C: RaftTypeConfig replications: Default::default(), + heartbeat_handle: HeartbeatWorkersHandle::new(id, config.clone()), tx_api: tx_api.clone(), rx_api, diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index 87fefb93d..580f90ec5 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -16,8 +16,8 @@ pub(crate) use replication_session_id::ReplicationSessionId; use request::Data; use request::DataWithId; use request::Replicate; +pub(crate) use response::Progress; use response::ReplicationResult; -pub(crate) use response::Response; use tracing_futures::Instrument; use crate::async_runtime::MpscUnboundedReceiver; @@ -265,12 +265,10 @@ where return Err(closed); } ReplicationError::HigherVote(h) => { - let _ = self.tx_raft_core.send(Notification::Network { - response: Response::HigherVote { - target: self.target, - higher: h.higher, - sender_vote: *self.session_id.vote_ref(), - }, + let _ = self.tx_raft_core.send(Notification::HigherVote { + target: self.target, + higher: h.higher, + sender_vote: *self.session_id.vote_ref(), }); return Ok(()); } @@ -461,14 +459,21 @@ where match append_resp { AppendEntriesResponse::Success => { + self.notify_heartbeat_progress(leader_time); + let matching = sending_range.last; - let next = self.finish_success_append(matching, leader_time, log_ids); - Ok(next) + self.notify_progress(log_ids.request_id(), ReplicationResult(Ok(matching))); + + Ok(self.next_action_to_send(matching, log_ids)) } AppendEntriesResponse::PartialSuccess(matching) => { Self::debug_assert_partial_success(&sending_range, &matching); - let next = self.finish_success_append(matching, leader_time, log_ids); - Ok(next) + + self.notify_heartbeat_progress(leader_time); + + self.notify_progress(log_ids.request_id(), ReplicationResult(Ok(matching))); + + Ok(self.next_action_to_send(matching, log_ids)) } AppendEntriesResponse::HigherVote(vote) => { debug_assert!( @@ -489,7 +494,10 @@ where debug_assert!(conflict.is_some(), "prev_log_id=None never conflict"); let conflict = conflict.unwrap(); - self.send_progress(request_id, ReplicationResult::new(leader_time, Err(conflict))); + + // Conflict is also a successful replication RPC, because the leadership is acknowledged. + self.notify_heartbeat_progress(leader_time); + self.notify_progress(request_id, ReplicationResult(Err(conflict))); Ok(None) } @@ -499,8 +507,12 @@ where /// Send the error result to RaftCore. /// RaftCore will then submit another replication command. fn send_progress_error(&mut self, request_id: RequestId, err: RPCError) { - let _ = self.tx_raft_core.send(Notification::Network { - response: Response::Progress { + // If there is no id, it is a heartbeat and do not need to notify RaftCore + let Some(request_id) = request_id.request_id() else { + return; + }; + let _ = self.tx_raft_core.send(Notification::ReplicationProgress { + progress: Progress { target: self.target, request_id, result: Err(err.to_string()), @@ -509,8 +521,22 @@ where }); } - /// Send the success replication result(log matching or conflict) to RaftCore. - fn send_progress(&mut self, request_id: RequestId, replication_result: ReplicationResult) { + /// A successful replication implies a successful heartbeat. + /// This method notify [`RaftCore`] with a heartbeat progress. + /// + /// [`RaftCore`]: crate::core::RaftCore + fn notify_heartbeat_progress(&mut self, sending_time: InstantOf) { + let _ = self.tx_raft_core.send({ + Notification::HeartbeatProgress { + session_id: self.session_id, + target: self.target, + sending_time, + } + }); + } + + /// Notify RaftCore with the success replication result(log matching or conflict). + fn notify_progress(&mut self, request_id: RequestId, replication_result: ReplicationResult) { tracing::debug!( request_id = display(request_id), target = display(self.target), @@ -520,7 +546,7 @@ where func_name!() ); - match replication_result.result { + match replication_result.0 { Ok(matching) => { self.validate_matching(matching); self.matching = matching; @@ -530,9 +556,15 @@ where } } + // If there is no request id, it is a heartbeat RPC, + // no need to notify RaftCore with progress. + let Some(request_id) = request_id.request_id() else { + return; + }; + let _ = self.tx_raft_core.send({ - Notification::Network { - response: Response::Progress { + Notification::ReplicationProgress { + progress: Progress { session_id: self.session_id, request_id, target: self.target, @@ -822,25 +854,18 @@ where })); } - self.send_progress( - request_id, - ReplicationResult::new(start_time, Ok(snapshot_meta.last_log_id)), - ); + self.notify_heartbeat_progress(start_time); + self.notify_progress(request_id, ReplicationResult(Ok(snapshot_meta.last_log_id))); Ok(None) } - /// Update matching and build a return value for a successful append-entries RPC. - /// /// If there are more logs to send, it returns a new `Some(Data::Logs)` to send. - fn finish_success_append( + fn next_action_to_send( &mut self, matching: Option>, - leader_time: InstantOf, log_ids: DataWithId>, ) -> Option> { - self.send_progress(log_ids.request_id(), ReplicationResult::new(leader_time, Ok(matching))); - if matching < log_ids.data().last { Some(Data::new_logs( log_ids.request_id(), diff --git a/openraft/src/replication/replication_session_id.rs b/openraft/src/replication/replication_session_id.rs index 9e2d044b4..24ad0b2d0 100644 --- a/openraft/src/replication/replication_session_id.rs +++ b/openraft/src/replication/replication_session_id.rs @@ -30,11 +30,12 @@ use crate::Vote; /// But the delayed message `{target=c, matched=log_id-1}` may be process by raft core and make raft /// core believe node `c` already has `log_id=1`, and commit it. #[derive(Debug, Clone, Copy)] +#[derive(PartialEq, Eq)] pub(crate) struct ReplicationSessionId where C: RaftTypeConfig { /// The Leader or Candidate this replication belongs to. - pub(crate) vote: CommittedVote, + pub(crate) leader_vote: CommittedVote, /// The log id of the membership log this replication works for. pub(crate) membership_log_id: Option>, @@ -44,7 +45,12 @@ impl Display for ReplicationSessionId where C: RaftTypeConfig { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "{}/{}", self.vote, self.membership_log_id.display()) + write!( + f, + "(leader_vote:{}, membership_log_id:{})", + self.leader_vote, + self.membership_log_id.display() + ) } } @@ -53,12 +59,12 @@ where C: RaftTypeConfig { pub(crate) fn new(vote: CommittedVote, membership_log_id: Option>) -> Self { Self { - vote, + leader_vote: vote, membership_log_id, } } pub(crate) fn vote_ref(&self) -> &Vote { - &self.vote + &self.leader_vote } } diff --git a/openraft/src/replication/response.rs b/openraft/src/replication/response.rs index 3340f1fb0..fae9ccd65 100644 --- a/openraft/src/replication/response.rs +++ b/openraft/src/replication/response.rs @@ -1,136 +1,74 @@ use std::fmt; -use crate::display_ext::DisplayInstantExt; use crate::display_ext::DisplayOptionExt; use crate::display_ext::DisplayResultExt; -use crate::replication::request_id::RequestId; use crate::replication::ReplicationSessionId; -use crate::type_config::alias::InstantOf; use crate::type_config::alias::LogIdOf; use crate::RaftTypeConfig; -use crate::Vote; /// The response of replication command. +/// +/// Update the `matched` log id of a replication target. +/// Sent by a replication task `ReplicationCore`. #[derive(Debug)] -pub(crate) enum Response +pub(crate) struct Progress where C: RaftTypeConfig { - // /// Logs that are submitted to append has been persisted to disk. - // LogPersisted {}, - /// Update the `matched` log id of a replication target. - /// Sent by a replication task `ReplicationCore`. - Progress { - /// The ID of the target node for which the match index is to be updated. - target: C::NodeId, + /// The ID of the target node for which the match index is to be updated. + pub(crate) target: C::NodeId, - /// The id of the subject that submit this replication action. - request_id: RequestId, + /// The id of the subject that submit this replication action. + pub(crate) request_id: u64, - /// The request by this leader has been successfully handled by the target node, - /// or an error in string. - /// - /// A successful result can still be log matching or log conflicting. - /// In either case, the request is considered accepted, i.e., this leader is still valid to - /// the target node. - /// - /// The result also track the time when this request is sent. - result: Result, String>, - - /// In which session this message is sent. - /// - /// This session id identifies a certain leader(by vote) that is replicating to a certain - /// group of nodes. - /// - /// A message should be discarded if it does not match the present vote and - /// membership_log_id. - session_id: ReplicationSessionId, - }, - - /// ReplicationCore has seen a higher `vote`. - /// Sent by a replication task `ReplicationCore`. - HigherVote { - /// The ID of the target node from which the new term was observed. - target: C::NodeId, - - /// The higher vote observed. - higher: Vote, + /// The request by this leader has been successfully handled by the target node, + /// or an error in string. + /// + /// A successful result can still be log matching or log conflicting. + /// In either case, the request is considered accepted, i.e., this leader is still valid to + /// the target node. + /// + /// The result also track the time when this request is sent. + pub(crate) result: Result, String>, - /// Which state(a Leader or Candidate) sent this message - sender_vote: Vote, - // TODO: need this? - // /// The cluster this replication works for. - // membership_log_id: Option>, - }, + /// In which session this message is sent. + /// + /// This session id identifies a certain leader(by vote) that is replicating to a certain + /// group of nodes. + /// + /// A message should be discarded if it does not match the present vote and + /// membership_log_id. + pub(crate) session_id: ReplicationSessionId, } -impl fmt::Display for Response +impl fmt::Display for Progress where C: RaftTypeConfig { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::Progress { - target, - request_id, - result, - session_id, - } => { - write!( - f, - "replication::Progress: target={}, request_id: {}, result: {}, session_id: {}", - target, - request_id, - result.display(), - session_id - ) - } - - Self::HigherVote { - target, - higher, - sender_vote, - } => { - write!( - f, - "replication::Seen a higher vote: target={}, higher: {}, sender_vote: {}", - target, higher, sender_vote - ) - } - } + write!( + f, + "replication::Progress: target={}, request_id: {}, result: {}, session_id: {}", + self.target, + self.request_id, + self.result.display(), + self.session_id + ) } } /// Result of an append-entries replication +/// +/// Ok for matching, Err for conflict. #[derive(Clone, Debug)] -pub(crate) struct ReplicationResult { - /// The timestamp when this request is sent. - /// - /// It is used to update the lease for leader. - pub(crate) sending_time: InstantOf, - - /// Ok for matching, Err for conflict. - pub(crate) result: Result>, LogIdOf>, -} +pub(crate) struct ReplicationResult(pub(crate) Result>, LogIdOf>); impl fmt::Display for ReplicationResult where C: RaftTypeConfig { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{{sending_time:{}, result:", self.sending_time.display())?; - - match &self.result { - Ok(matching) => write!(f, "Match:{}", matching.display())?, - Err(conflict) => write!(f, "Conflict:{}", conflict)?, + match &self.0 { + Ok(matching) => write!(f, "(Match:{})", matching.display()), + Err(conflict) => write!(f, "(Conflict:{})", conflict), } - - write!(f, "}}") - } -} - -impl ReplicationResult -where C: RaftTypeConfig -{ - pub(crate) fn new(sending_time: InstantOf, result: Result>, LogIdOf>) -> Self { - Self { sending_time, result } } } @@ -139,18 +77,17 @@ mod tests { use crate::engine::testing::UTConfig; use crate::replication::response::ReplicationResult; use crate::testing::log_id; - use crate::type_config::TypeConfigExt; #[test] fn test_replication_result_display() { // NOTE that with single-term-leader, log id is `1-3` - let result = ReplicationResult::::new(UTConfig::<()>::now(), Ok(Some(log_id(1, 2, 3)))); - let want = format!(", result:Match:{}}}", log_id(1, 2, 3)); + let result = ReplicationResult::(Ok(Some(log_id(1, 2, 3)))); + let want = format!("(Match:{})", log_id(1, 2, 3)); assert!(result.to_string().ends_with(&want), "{}", result.to_string()); - let result = ReplicationResult::::new(UTConfig::<()>::now(), Err(log_id(1, 2, 3))); - let want = format!(", result:Conflict:{}}}", log_id(1, 2, 3)); + let result = ReplicationResult::(Err(log_id(1, 2, 3))); + let want = format!("(Conflict:{})", log_id(1, 2, 3)); assert!(result.to_string().ends_with(&want), "{}", result.to_string()); } } diff --git a/openraft/src/storage/callback.rs b/openraft/src/storage/callback.rs index 5deca6a25..5b5cbc19f 100644 --- a/openraft/src/storage/callback.rs +++ b/openraft/src/storage/callback.rs @@ -87,7 +87,8 @@ where C: RaftTypeConfig } Notification::HigherVote { .. } | Notification::StorageError { .. } - | Notification::Network { .. } + | Notification::ReplicationProgress { .. } + | Notification::HeartbeatProgress { .. } | Notification::StateMachine { .. } | Notification::Tick { .. } => { unreachable!("Unexpected notification: {}", self.notification) diff --git a/tests/tests/snapshot_streaming/t60_snapshot_chunk_size.rs b/tests/tests/snapshot_streaming/t60_snapshot_chunk_size.rs index 069cb0ddb..4c463b38e 100644 --- a/tests/tests/snapshot_streaming/t60_snapshot_chunk_size.rs +++ b/tests/tests/snapshot_streaming/t60_snapshot_chunk_size.rs @@ -3,16 +3,15 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; -use openraft::CommittedLeaderId; +use openraft::testing::log_id; use openraft::Config; -use openraft::LogId; use openraft::ServerState; use openraft::SnapshotPolicy; use crate::fixtures::ut_harness; use crate::fixtures::RaftRouter; -/// Test transfer snapshot in small chnuks +/// Test transfer snapshot in small chunks /// /// What does this test do? /// @@ -29,6 +28,7 @@ async fn snapshot_chunk_size() -> Result<()> { snapshot_policy: SnapshotPolicy::LogsSinceLast(snapshot_threshold), snapshot_max_chunk_size: 10, enable_heartbeat: false, + max_in_snapshot_log_to_keep: 0, ..Default::default() } .validate()?, @@ -65,22 +65,14 @@ async fn snapshot_chunk_size() -> Result<()> { "send log to trigger snapshot", ) .await?; + router.wait_for_snapshot(&btreeset![0], log_id(1, 0, log_index), timeout(), "snapshot").await?; + router.assert_storage_state(1, log_index, Some(0), log_id(1, 0, log_index), want_snap).await?; + + let n0 = router.get_raft_handle(&0)?; + n0.trigger().purge_log(log_index).await?; router - .wait_for_snapshot( - &btreeset![0], - LogId::new(CommittedLeaderId::new(1, 0), log_index), - None, - "snapshot", - ) - .await?; - router - .assert_storage_state( - 1, - log_index, - Some(0), - LogId::new(CommittedLeaderId::new(1, 0), log_index), - want_snap, - ) + .wait(&0, timeout()) + .purged(Some(log_id(1, 0, 9)), "purge Leader-0 all in snapshot logs") .await?; } @@ -90,24 +82,9 @@ async fn snapshot_chunk_size() -> Result<()> { router.add_learner(0, 1).await.expect("failed to add new node as learner"); log_index += 1; - router.wait_for_log(&btreeset![0, 1], Some(log_index), None, "add learner").await?; - router - .wait_for_snapshot( - &btreeset![1], - LogId::new(CommittedLeaderId::new(1, 0), log_index), - None, - "", - ) - .await?; - - router - .wait_for_snapshot( - &btreeset![0], - LogId::new(CommittedLeaderId::new(1, 0), log_index - 1), - None, - "", - ) - .await?; + router.wait_for_log(&btreeset![0, 1], Some(log_index), timeout(), "add learner").await?; + router.wait(&1, timeout()).applied_index(Some(log_index), "sync all data to learner-1").await?; + router.wait(&1, timeout()).snapshot(log_id(1, 0, log_index - 1), "learner-1 snapshot").await?; // after add_learner, log_index + 1, // leader has only log_index log in snapshot, cause it has compacted before add_learner @@ -120,13 +97,11 @@ async fn snapshot_chunk_size() -> Result<()> { 1, log_index, Some(0), - LogId::new(CommittedLeaderId::new(1, 0), log_index), + log_id(1, 0, log_index), &Some(((log_index - 1).into(), 1)), ) .await?; - // learner has log_index + 1 log in snapshot, cause it do compact after add_learner, - // so learner's snapshot include add_learner log let (mut store, mut sm) = router.get_storage_handle(&1)?; router .assert_storage_state_with_sto( @@ -136,8 +111,8 @@ async fn snapshot_chunk_size() -> Result<()> { 1, log_index, Some(0), - LogId::new(CommittedLeaderId::new(1, 0), log_index), - &Some(((log_index).into(), 1)), + log_id(1, 0, log_index), + &Some(((log_index - 1).into(), 1)), ) .await?; } @@ -146,5 +121,5 @@ async fn snapshot_chunk_size() -> Result<()> { } fn timeout() -> Option { - Some(Duration::from_millis(1000)) + Some(Duration::from_millis(1_000)) }