Skip to content

Commit

Permalink
Refactor: Send heartbeat with dedicated workers
Browse files Browse the repository at this point in the history
Heavy AppendEntries traffic can block heartbeat messages. For example,
a future AppendEntries in a stream RPC may not receive a response indicating
that a follower is alive. In such cases, the leader might time out while trying
to extend its lease, and be considered partitioned from the cluster.

This commit moves heartbeat broadcasting to separate tasks that won't be
blocked by AppendEntries. This ensures the leader is always informed
of the liveness of followers.

Separate log progress notification and clock progress notification:
When ReplicationCore successfully finishes an RPC to a Follower/Learner,
it informs the RaftCore to update the log progress and the clock (heartbeat) progress.
This commit splits these two pieces of information into two `Notification`
variants, in order to make progress handling clearer.

Another improvement is to ignore a heartbeat progress if it is sent with
an older cluster membership config. Because a follower can be removed
and re-added, the obsolete heartbeat progress is invalid. This check is
done by remembering the membership log id in the `HeartbeatEvent`.

`HigherVote` can be sent directly to the Notification channel, so
`replication::Response` does not need the `HigherVote` variant any more.
`Response` is also renamed to `Progress`.
  • Loading branch information
drmingdrmer committed Aug 6, 2024
1 parent 77caec9 commit dad8032
Show file tree
Hide file tree
Showing 20 changed files with 550 additions and 342 deletions.
64 changes: 64 additions & 0 deletions openraft/src/core/heartbeat/event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use std::fmt;

use crate::display_ext::DisplayInstantExt;
use crate::display_ext::DisplayOptionExt;
use crate::replication::ReplicationSessionId;
use crate::type_config::alias::InstantOf;
use crate::LogId;
use crate::RaftTypeConfig;

/// Describes a single heartbeat that is to be broadcast.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct HeartbeatEvent<C: RaftTypeConfig> {
    /// The instant at which this heartbeat is sent.
    ///
    /// The Leader uses this sending time, rather than the time a response is received, to
    /// calculate the quorum acknowledge time.
    pub(crate) time: InstantOf<C>,

    /// The vote of the Leader that submits this heartbeat, together with the log id of the
    /// cluster membership config.
    ///
    /// Only a response that matches this session id is treated as valid. Any other response is
    /// regarded as outdated, i.e., coming from an older leader or an older cluster membership
    /// config, and is ignored.
    pub(crate) session_id: ReplicationSessionId<C>,

    /// The last committed log id known to the Leader.
    ///
    /// When there are no new logs to replicate, the Leader sends a heartbeat carrying this value
    /// so that followers can update their own committed log id.
    pub(crate) committed: Option<LogId<C::NodeId>>,
}

impl<C: RaftTypeConfig> HeartbeatEvent<C> {
    /// Builds a heartbeat event from its sending time, its session id and the committed log id.
    ///
    /// The arguments are stored as-is in the fields of the same name.
    pub(crate) fn new(
        time: InstantOf<C>,
        session_id: ReplicationSessionId<C>,
        committed: Option<LogId<C::NodeId>>,
    ) -> Self {
        Self { time, session_id, committed }
    }
}

impl<C: RaftTypeConfig> fmt::Display for HeartbeatEvent<C> {
    /// Renders the event as `(time=.., leader_vote: .., committed: ..)`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let sent_at = self.time.display();
        let committed = self.committed.display();

        // Note: the `leader_vote` label is filled with the whole session id.
        write!(
            f,
            "(time={}, leader_vote: {}, committed: {})",
            sent_at, self.session_id, committed
        )
    }
}
97 changes: 97 additions & 0 deletions openraft/src/core/heartbeat/handle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
use std::collections::BTreeMap;
use std::sync::Arc;

use tracing::Instrument;
use tracing::Level;
use tracing::Span;

use crate::async_runtime::watch::WatchSender;
use crate::core::heartbeat::event::HeartbeatEvent;
use crate::core::heartbeat::worker::HeartbeatWorker;
use crate::core::notification::Notification;
use crate::type_config::alias::JoinHandleOf;
use crate::type_config::alias::MpscUnboundedSenderOf;
use crate::type_config::alias::OneshotSenderOf;
use crate::type_config::alias::WatchReceiverOf;
use crate::type_config::alias::WatchSenderOf;
use crate::type_config::TypeConfigExt;
use crate::Config;
use crate::RaftNetworkFactory;
use crate::RaftTypeConfig;

/// Holds the heartbeat broadcast channel and the per-target heartbeat worker tasks.
pub(crate) struct HeartbeatWorkersHandle<C: RaftTypeConfig> {
    /// The node id used in log output and handed to every spawned worker as its `id`.
    pub(crate) id: C::NodeId,

    /// The shared config; each spawned worker receives a clone of this `Arc`.
    pub(crate) config: Arc<Config>,

    /// Sending end used to tell the heartbeat tasks to broadcast a heartbeat message.
    ///
    /// A Leader periodically updates this value to trigger sending heartbeat messages.
    pub(crate) tx: WatchSenderOf<C, Option<HeartbeatEvent<C>>>,

    /// Receiving end of the heartbeat command.
    ///
    /// Every worker task gets a clone of this receiver to receive and execute heartbeat commands.
    pub(crate) rx: WatchReceiverOf<C, Option<HeartbeatEvent<C>>>,

    /// The running workers, keyed by target node id.
    ///
    /// Each entry holds the shutdown sender and the join handle of the worker spawned for that
    /// target.
    pub(crate) workers: BTreeMap<C::NodeId, (OneshotSenderOf<C, ()>, JoinHandleOf<C, ()>)>,
}

impl<C> HeartbeatWorkersHandle<C>
where C: RaftTypeConfig
{
pub(crate) fn new(id: C::NodeId, config: Arc<Config>) -> Self {
let (tx, rx) = C::watch_channel(None);

Self {
id,
config,
tx,
rx,
workers: Default::default(),
}
}

pub(crate) fn broadcast(&self, event: HeartbeatEvent<C>) {
tracing::debug!("id={} send_heartbeat {}", self.id, event);
let _ = self.tx.send(Some(event));
}

pub(crate) async fn spawn_workers<NF>(
&mut self,
network_factory: &mut NF,
tx_notification: &MpscUnboundedSenderOf<C, Notification<C>>,
targets: impl IntoIterator<Item = (C::NodeId, C::Node)>,
) where
NF: RaftNetworkFactory<C>,
{
for (target, node) in targets {
tracing::debug!("id={} spawn HeartbeatWorker target={}", self.id, target);
let network = network_factory.new_client(target, &node).await;

let worker = HeartbeatWorker {
id: self.id,
rx: self.rx.clone(),
network,
target,
node,
config: self.config.clone(),
tx_notification: tx_notification.clone(),
};

let span = tracing::span!(parent: &Span::current(), Level::DEBUG, "heartbeat", id=display(self.id), target=display(target));

let (tx_shutdown, rx_shutdown) = C::oneshot();

let worker_handle = C::spawn(worker.run(rx_shutdown).instrument(span));
self.workers.insert(target, (tx_shutdown, worker_handle));
}
}

pub(crate) fn shutdown(&mut self) {
self.workers.clear();
tracing::info!("id={} HeartbeatWorker are shutdown", self.id);
}
}
3 changes: 3 additions & 0 deletions openraft/src/core/heartbeat/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub(crate) mod event;
pub(crate) mod handle;
pub(crate) mod worker;
114 changes: 114 additions & 0 deletions openraft/src/core/heartbeat/worker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use std::fmt;
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;

use futures::FutureExt;

use crate::async_runtime::watch::WatchReceiver;
use crate::async_runtime::MpscUnboundedSender;
use crate::core::heartbeat::event::HeartbeatEvent;
use crate::core::notification::Notification;
use crate::network::v2::RaftNetworkV2;
use crate::network::RPCOption;
use crate::raft::AppendEntriesRequest;
use crate::type_config::alias::MpscUnboundedSenderOf;
use crate::type_config::alias::OneshotReceiverOf;
use crate::type_config::alias::WatchReceiverOf;
use crate::type_config::TypeConfigExt;
use crate::Config;
use crate::RaftTypeConfig;

/// A dedicated worker that sends heartbeats to one specific target node.
pub struct HeartbeatWorker<C: RaftTypeConfig, N: RaftNetworkV2<C>> {
    /// The node id shown in this worker's log output.
    pub(crate) id: C::NodeId,

    /// Watch receiver whose value changes whenever a new heartbeat needs to be sent.
    pub(crate) rx: WatchReceiverOf<C, Option<HeartbeatEvent<C>>>,

    /// The network client used to send the heartbeat RPC.
    pub(crate) network: N,

    /// The id of the node this worker sends heartbeats to.
    pub(crate) target: C::NodeId,

    /// The node information of the target; currently not read by the worker.
    #[allow(dead_code)]
    pub(crate) node: C::Node,

    /// The shared config; `heartbeat_interval` is used as the RPC timeout in milliseconds.
    pub(crate) config: Arc<Config>,

    /// For sending the result back to the [`RaftCore`].
    ///
    /// [`RaftCore`]: crate::core::RaftCore
    pub(crate) tx_notification: MpscUnboundedSenderOf<C, Notification<C>>,
}

impl<C, N> fmt::Display for HeartbeatWorker<C, N>
where
    C: RaftTypeConfig,
    N: RaftNetworkV2<C>,
{
    /// Renders the worker as `HeartbeatWorker(id=.., target=..)`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { id, target, .. } = self;
        write!(f, "HeartbeatWorker(id={}, target={})", id, target)
    }
}

impl<C, N> HeartbeatWorker<C, N>
where
    C: RaftTypeConfig,
    N: RaftNetworkV2<C>,
{
    /// Runs the worker loop: wait for a heartbeat event, send it to the target, report the result.
    ///
    /// On every iteration the worker waits for either `rx_shutdown` to resolve or the watched
    /// heartbeat value to change. For each heartbeat event it sends an `AppendEntriesRequest`
    /// with no entries and no `prev_log_id`, using `config.heartbeat_interval` milliseconds as
    /// the timeout. On success a [`Notification::HeartbeatProgress`] is sent through
    /// `tx_notification`; on failure or timeout a warning is logged and the loop continues.
    ///
    /// The loop ends when:
    /// - `rx_shutdown` resolves;
    /// - the heartbeat watch channel is closed, i.e., no more events can arrive;
    /// - the notification cannot be delivered through `tx_notification`.
    pub(crate) async fn run(mut self, mut rx_shutdown: OneshotReceiverOf<C, ()>) {
        loop {
            tracing::debug!("{} is waiting for a new heartbeat event.", self);

            futures::select! {
                _ = (&mut rx_shutdown).fuse() => {
                    tracing::info!("{} is shutdown.", self);
                    return;
                },
                changed = self.rx.changed().fuse() => {
                    // An error means the sending end is gone. Without this check the closed
                    // channel would be reported as "changed" on every iteration and the stale
                    // last heartbeat would be re-sent in a busy loop.
                    if changed.is_err() {
                        tracing::info!("{} heartbeat event channel is closed. quit", self);
                        return;
                    }
                },
            }

            let heartbeat: Option<HeartbeatEvent<C>> = *self.rx.borrow_watched();

            // None is the initial value of the WatchReceiver, ignore it.
            let Some(heartbeat) = heartbeat else {
                continue;
            };

            // The same duration bounds the RPC via its option and the outer timeout wrapper.
            let timeout = Duration::from_millis(self.config.heartbeat_interval);
            let option = RPCOption::new(timeout);

            // A heartbeat is an AppendEntries request without entries; it only carries the
            // leader vote and the committed log id.
            let payload = AppendEntriesRequest {
                vote: *heartbeat.session_id.leader_vote.deref(),
                prev_log_id: None,
                leader_commit: heartbeat.committed,
                entries: vec![],
            };

            let res = C::timeout(timeout, self.network.append_entries(payload, option)).await;
            tracing::debug!("{} sent a heartbeat: {}, result: {:?}", self, heartbeat, res);

            match res {
                Ok(Ok(_)) => {
                    let res = self.tx_notification.send(Notification::HeartbeatProgress {
                        session_id: heartbeat.session_id,
                        sending_time: heartbeat.time,
                        target: self.target,
                    });

                    // The receiving side is gone; there is nobody left to report to.
                    if res.is_err() {
                        tracing::error!("{} failed to send a heartbeat progress to RaftCore. quit", self);
                        return;
                    }
                }
                // Either the RPC returned an error or the timeout elapsed.
                _ => {
                    tracing::warn!("{} failed to send a heartbeat: {:?}", self, res);
                }
            }
        }
    }
}
1 change: 1 addition & 0 deletions openraft/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
//! storage or forward messages to other raft nodes.

pub(crate) mod balancer;
pub(crate) mod heartbeat;
pub(crate) mod notification;
mod raft_core;
pub(crate) mod raft_msg;
Expand Down
28 changes: 25 additions & 3 deletions openraft/src/core/notification.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use std::fmt;

use crate::core::sm;
use crate::display_ext::DisplayInstantExt;
use crate::raft::VoteResponse;
use crate::raft_state::IOId;
use crate::replication;
use crate::replication::ReplicationSessionId;
use crate::type_config::alias::InstantOf;
use crate::RaftTypeConfig;
use crate::StorageError;
use crate::Vote;
Expand Down Expand Up @@ -47,7 +50,13 @@ where C: RaftTypeConfig
LocalIO { io_id: IOId<C> },

/// Result of executing a command sent from network worker.
Network { response: replication::Response<C> },
ReplicationProgress { progress: replication::Progress<C> },

HeartbeatProgress {
session_id: ReplicationSessionId<C>,
sending_time: InstantOf<C>,
target: C::NodeId,
},

/// Result of executing a command sent from state machine worker.
StateMachine { command_result: sm::CommandResult<C> },
Expand Down Expand Up @@ -96,8 +105,21 @@ where C: RaftTypeConfig
}
Self::StorageError { error } => write!(f, "StorageError: {}", error),
Self::LocalIO { io_id } => write!(f, "IOFlushed: {}", io_id),
Self::Network { response } => {
write!(f, "{}", response)
Self::ReplicationProgress { progress } => {
write!(f, "{}", progress)
}
Self::HeartbeatProgress {
session_id: leader_vote,
sending_time,
target,
} => {
write!(
f,
"HeartbeatProgress: target={}, leader_vote: {}, sending_time: {}",
target,
leader_vote,
sending_time.display(),
)
}
Self::StateMachine { command_result } => {
write!(f, "{}", command_result)
Expand Down
Loading

0 comments on commit dad8032

Please sign in to comment.