Skip to content

Commit

Permalink
Refactor: Remove request_id from replication
Browse files Browse the repository at this point in the history
The `request_id` was used to track request/response pairs between
`RaftCore` and `ReplicationCore` in a one-request, one-response mode. To
support streaming replication, which requires a one-request,
many-response mode, `request_id` has been removed in this
commit.
  • Loading branch information
drmingdrmer committed Aug 13, 2024
1 parent 6197f05 commit 2ae4550
Show file tree
Hide file tree
Showing 19 changed files with 180 additions and 532 deletions.
59 changes: 9 additions & 50 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ use crate::core::sm;
use crate::core::ServerState;
use crate::display_ext::DisplayInstantExt;
use crate::display_ext::DisplayOptionExt;
use crate::display_ext::DisplayResultExt;
use crate::display_ext::DisplaySlice;
use crate::display_ext::DisplaySliceExt;
use crate::engine::Command;
Expand Down Expand Up @@ -68,7 +67,6 @@ use crate::network::RPCOption;
use crate::network::RPCTypes;
use crate::network::RaftNetworkFactory;
use crate::progress::entry::ProgressEntry;
use crate::progress::Inflight;
use crate::progress::Progress;
use crate::quorum::QuorumSet;
use crate::raft::message::TransferLeaderRequest;
Expand All @@ -81,8 +79,6 @@ use crate::raft::VoteResponse;
use crate::raft_state::io_state::io_id::IOId;
use crate::raft_state::LogStateReader;
use crate::replication::request::Replicate;
use crate::replication::request_id::RequestId;
use crate::replication::response::ReplicationResult;
use crate::replication::ReplicationCore;
use crate::replication::ReplicationHandle;
use crate::replication::ReplicationSessionId;
Expand Down Expand Up @@ -1426,7 +1422,11 @@ where
// If vote or membership changes, ignore the message.
// There is chance delayed message reports a wrong state.
if self.does_replication_session_match(&progress.session_id, "ReplicationProgress") {
self.handle_replication_progress(progress.target, progress.request_id, progress.result);
tracing::debug!(progress = display(&progress), "recv Notification::ReplicationProgress");

// replication_handler() won't panic because:
// The leader is still valid because progress.session_id.leader_vote does not change.
self.engine.replication_handler().update_progress(progress.target, progress.result);
}
}

Expand All @@ -1442,9 +1442,9 @@ where
sending_time = display(sending_time.display()),
"HeartbeatProgress"
);
if self.engine.leader.is_some() {
self.engine.replication_handler().update_leader_clock(target, sending_time);
}
// replication_handler() won't panic because:
// The leader is still valid because progress.session_id.leader_vote does not change.
self.engine.replication_handler().update_leader_clock(target, sending_time);
}
}

Expand Down Expand Up @@ -1551,33 +1551,6 @@ where
self.engine.elect();
}

#[tracing::instrument(level = "debug", skip_all)]
fn handle_replication_progress(
&mut self,
target: C::NodeId,
request_id: u64,
result: Result<ReplicationResult<C>, String>,
) {
tracing::debug!(
target = display(target),
request_id = display(request_id),
result = display(result.display()),
"handle_replication_progress"
);

#[allow(clippy::collapsible_if)]
if tracing::enabled!(Level::DEBUG) {
if !self.replications.contains_key(&target) {
tracing::warn!("leader has removed target: {}", target);
};
}

// A leader may have stepped down.
if self.engine.leader.is_some() {
self.engine.replication_handler().update_progress(target, request_id, result);
}
}

/// If a message is sent by a previous server state but is received by current server state,
/// it is a stale message and should be just ignored.
fn does_vote_match(&self, sender_vote: &Vote<C::NodeId>, msg: impl fmt::Display) -> bool {
Expand Down Expand Up @@ -1790,21 +1763,7 @@ where
}
Command::Replicate { req, target } => {
let node = self.replications.get(&target).expect("replication to target node exists");

match req {
Inflight::None => {
let _ = node.tx_repl.send(Replicate::Heartbeat);
}
Inflight::Logs { id, log_id_range } => {
let _ = node.tx_repl.send(Replicate::logs(RequestId::new_append_entries(id), log_id_range));
}
Inflight::Snapshot { id, last_log_id } => {
// unwrap: The replication channel must not be dropped or it is a bug.
node.tx_repl.send(Replicate::snapshot(RequestId::new_snapshot(id), last_log_id)).map_err(
|_e| StorageError::read_snapshot(None, AnyError::error("replication channel closed")),
)?;
}
}
let _ = node.tx_repl.send(req);
}
Command::BroadcastTransferLeader { req } => self.broadcast_transfer_leader(req).await,

Expand Down
2 changes: 1 addition & 1 deletion openraft/src/docs/protocol/snapshot_replication.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ Installing snapshot includes two steps:
The final step is to purge logs up to [`snapshot_meta.last_log_id`].
This step is necessary because:

- 1) A local log that is <= [`snapshot_meta.last_log_id`] may conflict with the leader, and can not be used anymore.
- 1) A local log that is `<=` [`snapshot_meta.last_log_id`] may conflict with the leader, and can not be used anymore.

- 2) There may be a hole in the logs, if `snapshot_last_log_id > local_last_log_id`:

Expand Down
4 changes: 2 additions & 2 deletions openraft/src/engine/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ use crate::engine::CommandKind;
use crate::error::Infallible;
use crate::error::InitializeError;
use crate::error::InstallSnapshotError;
use crate::progress::Inflight;
use crate::raft::message::TransferLeaderRequest;
use crate::raft::AppendEntriesResponse;
use crate::raft::InstallSnapshotResponse;
use crate::raft::SnapshotResponse;
use crate::raft::VoteRequest;
use crate::raft::VoteResponse;
use crate::raft_state::IOId;
use crate::replication::request::Replicate;
use crate::replication::ReplicationSessionId;
use crate::type_config::alias::OneshotSenderOf;
use crate::vote::CommittedVote;
Expand Down Expand Up @@ -96,7 +96,7 @@ where C: RaftTypeConfig
},

/// Replicate log entries or snapshot to a target.
Replicate { target: C::NodeId, req: Inflight<C> },
Replicate { target: C::NodeId, req: Replicate<C> },

/// Broadcast transfer Leader message to all other nodes.
BroadcastTransferLeader { req: TransferLeaderRequest<C> },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ use crate::engine::Command;
use crate::engine::Engine;
use crate::engine::ReplicationProgress;
use crate::entry::RaftEntry;
use crate::log_id_range::LogIdRange;
use crate::progress::entry::ProgressEntry;
use crate::progress::Inflight;
use crate::raft_state::IOId;
use crate::raft_state::LogStateReader;
use crate::replication::request::Replicate;
use crate::testing::blank_ent;
use crate::testing::log_id;
use crate::type_config::TypeConfigExt;
Expand Down Expand Up @@ -153,11 +154,11 @@ fn test_leader_append_entries_normal() -> anyhow::Result<()> {
},
Command::Replicate {
target: 2,
req: Inflight::logs(None, Some(log_id(3, 1, 6))).with_id(1),
req: Replicate::logs(LogIdRange::new(None, Some(log_id(3, 1, 6)))),
},
Command::Replicate {
target: 3,
req: Inflight::logs(None, Some(log_id(3, 1, 6))).with_id(1),
req: Replicate::logs(LogIdRange::new(None, Some(log_id(3, 1, 6)))),
},
],
eng.output.take_commands()
Expand Down Expand Up @@ -285,7 +286,7 @@ fn test_leader_append_entries_with_membership_log() -> anyhow::Result<()> {
},
Command::Replicate {
target: 2,
req: Inflight::logs(None, Some(log_id(3, 1, 6))).with_id(1),
req: Replicate::logs(LogIdRange::new(None, Some(log_id(3, 1, 6))))
},
],
eng.output.take_commands()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,32 +155,27 @@ fn test_leader_append_membership_update_learner_process() -> anyhow::Result<()>
if let Some(l) = &mut eng.leader.as_mut() {
assert_eq!(
&ProgressEntry::new(Some(log_id(1, 1, 4)))
.with_inflight(Inflight::logs(Some(log_id(1, 1, 4)), Some(log_id(5, 1, 10))).with_id(1))
.with_curr_inflight_id(1),
.with_inflight(Inflight::logs(Some(log_id(1, 1, 4)), Some(log_id(5, 1, 10)))),
l.progress.get(&4),
"learner-4 progress should be transferred to voter progress"
);

assert_eq!(
&ProgressEntry::new(Some(log_id(1, 1, 3)))
.with_inflight(Inflight::logs(Some(log_id(1, 1, 3)), Some(log_id(5, 1, 10))).with_id(1))
.with_curr_inflight_id(1),
.with_inflight(Inflight::logs(Some(log_id(1, 1, 3)), Some(log_id(5, 1, 10)))),
l.progress.get(&3),
"voter-3 progress should be transferred to learner progress"
);

assert_eq!(
&ProgressEntry::new(Some(log_id(1, 1, 5)))
.with_inflight(Inflight::logs(Some(log_id(1, 1, 5)), Some(log_id(5, 1, 10))).with_id(1))
.with_curr_inflight_id(1),
.with_inflight(Inflight::logs(Some(log_id(1, 1, 5)), Some(log_id(5, 1, 10)))),
l.progress.get(&5),
"learner-5 has previous value"
);

assert_eq!(
&ProgressEntry::empty(11)
.with_inflight(Inflight::logs(None, Some(log_id(5, 1, 10))).with_id(1))
.with_curr_inflight_id(1),
&ProgressEntry::empty(11).with_inflight(Inflight::logs(None, Some(log_id(5, 1, 10)))),
l.progress.get(&6)
);
} else {
Expand Down
63 changes: 16 additions & 47 deletions openraft/src/engine/handler/replication_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::progress::Progress;
use crate::proposer::Leader;
use crate::proposer::LeaderQuorumSet;
use crate::raft_state::LogStateReader;
use crate::replication::request::Replicate;
use crate::replication::response::ReplicationResult;
use crate::type_config::alias::InstantOf;
use crate::EffectiveMembership;
Expand Down Expand Up @@ -145,10 +146,9 @@ where C: RaftTypeConfig
/// Update progress when replicated data(logs or snapshot) matches on follower/learner and is
/// accepted.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn update_matching(&mut self, node_id: C::NodeId, inflight_id: u64, log_id: Option<LogId<C::NodeId>>) {
pub(crate) fn update_matching(&mut self, node_id: C::NodeId, log_id: Option<LogId<C::NodeId>>) {
tracing::debug!(
node_id = display(node_id),
inflight_id = display(inflight_id),
log_id = display(log_id.display()),
"{}",
func_name!()
Expand All @@ -161,13 +161,7 @@ where C: RaftTypeConfig
let quorum_accepted = *self
.leader
.progress
.update_with(&node_id, |prog_entry| {
let res = prog_entry.update_matching(inflight_id, log_id);
if let Err(e) = &res {
tracing::error!(error = display(e), "update_matching");
panic!("update_matching error: {}", e);
}
})
.update_with(&node_id, |prog_entry| prog_entry.update_matching(log_id))
.expect("it should always update existing progress");

tracing::debug!(
Expand Down Expand Up @@ -213,33 +207,19 @@ where C: RaftTypeConfig
/// Update progress when replicated data(logs or snapshot) does not match follower/learner state
/// and is rejected.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn update_conflicting(&mut self, target: C::NodeId, inflight_id: u64, conflict: LogId<C::NodeId>) {
pub(crate) fn update_conflicting(&mut self, target: C::NodeId, conflict: LogId<C::NodeId>) {
// TODO(2): test it?

let prog_entry = self.leader.progress.get_mut(&target).unwrap();

debug_assert_eq!(
prog_entry.inflight.get_id(),
Some(inflight_id),
"inflight({:?}) id should match: {}",
prog_entry.inflight,
inflight_id
);

prog_entry.update_conflicting(inflight_id, conflict.index).unwrap();
prog_entry.update_conflicting(conflict.index);
}

/// Update replication progress when a response is received.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn update_progress(
&mut self,
target: C::NodeId,
request_id: u64,
repl_res: Result<ReplicationResult<C>, String>,
) {
pub(crate) fn update_progress(&mut self, target: C::NodeId, repl_res: Result<ReplicationResult<C>, String>) {
tracing::debug!(
target = display(target),
request_id = display(request_id),
result = display(repl_res.display()),
progress = display(&self.leader.progress),
"{}",
Expand All @@ -249,29 +229,17 @@ where C: RaftTypeConfig
match repl_res {
Ok(p) => match p.0 {
Ok(matching) => {
self.update_matching(target, request_id, matching);
self.update_matching(target, matching);
}
Err(conflict) => {
self.update_conflicting(target, request_id, conflict);
self.update_conflicting(target, conflict);
}
},
Err(err_str) => {
tracing::warn!(
request_id = display(request_id),
result = display(&err_str),
"update progress error"
);
tracing::warn!(result = display(&err_str), "update progress error");

// Reset inflight state and it will retry.
let p = self.leader.progress.get_mut(&target).unwrap();

debug_assert!(
p.inflight.is_my_id(request_id),
"inflight({:?}) id should match: {}",
p.inflight,
request_id
);

p.inflight = Inflight::None;
}
};
Expand Down Expand Up @@ -328,10 +296,12 @@ where C: RaftTypeConfig

#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn send_to_target(output: &mut EngineOutput<C>, target: &C::NodeId, inflight: &Inflight<C>) {
output.push_command(Command::Replicate {
target: *target,
req: *inflight,
});
let req = match inflight {
Inflight::None => unreachable!("no data to send"),
Inflight::Logs { log_id_range } => Replicate::logs(*log_id_range),
Inflight::Snapshot { last_log_id } => Replicate::snapshot(*last_log_id),
};
output.push_command(Command::Replicate { target: *target, req });
}

/// Try to run a pending purge job, if no tasks are using the logs to be purged.
Expand Down Expand Up @@ -403,8 +373,7 @@ where C: RaftTypeConfig
// TODO: It should be self.state.last_log_id() but None is ok.
prog_entry.inflight = Inflight::logs(None, upto);

let inflight_id = prog_entry.inflight.get_id().unwrap();
self.update_matching(id, inflight_id, upto);
self.update_matching(id, upto);
}
}

Expand Down
Loading

0 comments on commit 2ae4550

Please sign in to comment.