Skip to content

Commit

Permalink
Refactor: simplify HigherVote notification
Browse files Browse the repository at this point in the history
`HigherVote` can be sent directly to Notification channel.
replication::Response does not need `HigherVote` variant any more.
And `Response` is renamed to `Progress`
  • Loading branch information
drmingdrmer committed Aug 3, 2024
1 parent ad1330a commit 5132324
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 128 deletions.
6 changes: 3 additions & 3 deletions openraft/src/core/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ where C: RaftTypeConfig
LocalIO { io_id: IOId<C> },

/// Result of executing a command sent from network worker.
Network { response: replication::Response<C> },
ReplicationProgress { progress: replication::Progress<C> },

/// Result of executing a command sent from state machine worker.
StateMachine { command_result: sm::CommandResult<C> },
Expand Down Expand Up @@ -96,8 +96,8 @@ where C: RaftTypeConfig
}
Self::StorageError { error } => write!(f, "StorageError: {}", error),
Self::LocalIO { io_id } => write!(f, "IOFlushed: {}", io_id),
Self::Network { response } => {
write!(f, "{}", response)
Self::ReplicationProgress { progress } => {
write!(f, "{}", progress)
}
Self::StateMachine { command_result } => {
write!(f, "{}", command_result)
Expand Down
40 changes: 5 additions & 35 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ use crate::raft::VoteRequest;
use crate::raft::VoteResponse;
use crate::raft_state::io_state::io_id::IOId;
use crate::raft_state::LogStateReader;
use crate::replication;
use crate::replication::request::Replicate;
use crate::replication::request_id::RequestId;
use crate::replication::response::ReplicationResult;
Expand Down Expand Up @@ -1418,40 +1417,11 @@ where
}
}

Notification::Network { response } => {
//
match response {
replication::Response::Progress {
target,
request_id: id,
result,
session_id,
} => {
// If vote or membership changes, ignore the message.
// There is chance delayed message reports a wrong state.
if self.does_replication_session_match(&session_id, "UpdateReplicationMatched") {
self.handle_replication_progress(target, id, result);
}
}

replication::Response::HigherVote {
target,
higher,
sender_vote,
} => {
tracing::info!(
target = display(target),
higher_vote = display(&higher),
sender_vote = display(&sender_vote),
"received Notification::HigherVote: {}",
func_name!()
);

if self.does_vote_match(&sender_vote, "HigherVote") {
// Rejected vote change is ok.
let _ = self.engine.vote_handler().update_vote(&higher);
}
}
Notification::ReplicationProgress { progress } => {
// If vote or membership changes, ignore the message.
// There is chance delayed message reports a wrong state.
if self.does_replication_session_match(&progress.session_id, "UpdateReplicationMatched") {
self.handle_replication_progress(progress.target, progress.request_id, progress.result);
}
}

Expand Down
20 changes: 9 additions & 11 deletions openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ pub(crate) use replication_session_id::ReplicationSessionId;
use request::Data;
use request::DataWithId;
use request::Replicate;
pub(crate) use response::Progress;
use response::ReplicationResult;
pub(crate) use response::Response;
use tracing_futures::Instrument;

use crate::async_runtime::MpscUnboundedReceiver;
Expand Down Expand Up @@ -265,12 +265,10 @@ where
return Err(closed);
}
ReplicationError::HigherVote(h) => {
let _ = self.tx_raft_core.send(Notification::Network {
response: Response::HigherVote {
target: self.target,
higher: h.higher,
sender_vote: *self.session_id.vote_ref(),
},
let _ = self.tx_raft_core.send(Notification::HigherVote {
target: self.target,
higher: h.higher,
sender_vote: *self.session_id.vote_ref(),
});
return Ok(());
}
Expand Down Expand Up @@ -499,8 +497,8 @@ where
/// Send the error result to RaftCore.
/// RaftCore will then submit another replication command.
fn send_progress_error(&mut self, request_id: RequestId, err: RPCError<C>) {
let _ = self.tx_raft_core.send(Notification::Network {
response: Response::Progress {
let _ = self.tx_raft_core.send(Notification::ReplicationProgress {
progress: Progress {
target: self.target,
request_id,
result: Err(err.to_string()),
Expand Down Expand Up @@ -531,8 +529,8 @@ where
}

let _ = self.tx_raft_core.send({
Notification::Network {
response: Response::Progress {
Notification::ReplicationProgress {
progress: Progress {
session_id: self.session_id,
request_id,
target: self.target,
Expand Down
115 changes: 37 additions & 78 deletions openraft/src/replication/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,94 +8,53 @@ use crate::replication::ReplicationSessionId;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::LogIdOf;
use crate::RaftTypeConfig;
use crate::Vote;

/// The response of replication command.
///
/// Update the `matched` log id of a replication target.
/// Sent by a replication task `ReplicationCore`.
#[derive(Debug)]
pub(crate) enum Response<C>
pub(crate) struct Progress<C>
where C: RaftTypeConfig
{
// /// Logs that are submitted to append has been persisted to disk.
// LogPersisted {},
/// Update the `matched` log id of a replication target.
/// Sent by a replication task `ReplicationCore`.
Progress {
/// The ID of the target node for which the match index is to be updated.
target: C::NodeId,

/// The id of the subject that submit this replication action.
request_id: RequestId,

/// The request by this leader has been successfully handled by the target node,
/// or an error in string.
///
/// A successful result can still be log matching or log conflicting.
/// In either case, the request is considered accepted, i.e., this leader is still valid to
/// the target node.
///
/// The result also track the time when this request is sent.
result: Result<ReplicationResult<C>, String>,

/// In which session this message is sent.
///
/// This session id identifies a certain leader(by vote) that is replicating to a certain
/// group of nodes.
///
/// A message should be discarded if it does not match the present vote and
/// membership_log_id.
session_id: ReplicationSessionId<C>,
},

/// ReplicationCore has seen a higher `vote`.
/// Sent by a replication task `ReplicationCore`.
HigherVote {
/// The ID of the target node from which the new term was observed.
target: C::NodeId,

/// The higher vote observed.
higher: Vote<C::NodeId>,

/// Which state(a Leader or Candidate) sent this message
sender_vote: Vote<C::NodeId>,
// TODO: need this?
// /// The cluster this replication works for.
// membership_log_id: Option<LogId<C::NodeId>>,
},
/// The ID of the target node for which the match index is to be updated.
pub(crate) target: C::NodeId,

/// The id of the subject that submit this replication action.
pub(crate) request_id: RequestId,

/// The request by this leader has been successfully handled by the target node,
/// or an error in string.
///
/// A successful result can still be log matching or log conflicting.
/// In either case, the request is considered accepted, i.e., this leader is still valid to
/// the target node.
///
/// The result also track the time when this request is sent.
pub(crate) result: Result<ReplicationResult<C>, String>,

/// In which session this message is sent.
///
/// This session id identifies a certain leader(by vote) that is replicating to a certain
/// group of nodes.
///
/// A message should be discarded if it does not match the present vote and
/// membership_log_id.
pub(crate) session_id: ReplicationSessionId<C>,
}

impl<C> fmt::Display for Response<C>
impl<C> fmt::Display for Progress<C>
where C: RaftTypeConfig
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Progress {
target,
request_id,
result,
session_id,
} => {
write!(
f,
"replication::Progress: target={}, request_id: {}, result: {}, session_id: {}",
target,
request_id,
result.display(),
session_id
)
}

Self::HigherVote {
target,
higher,
sender_vote,
} => {
write!(
f,
"replication::Seen a higher vote: target={}, higher: {}, sender_vote: {}",
target, higher, sender_vote
)
}
}
write!(
f,
"replication::Progress: target={}, request_id: {}, result: {}, session_id: {}",
self.target,
self.request_id,
self.result.display(),
self.session_id
)
}
}

Expand Down
2 changes: 1 addition & 1 deletion openraft/src/storage/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ where C: RaftTypeConfig
}
Notification::HigherVote { .. }
| Notification::StorageError { .. }
| Notification::Network { .. }
| Notification::ReplicationProgress { .. }
| Notification::StateMachine { .. }
| Notification::Tick { .. } => {
unreachable!("Unexpected notification: {}", self.notification)
Expand Down

0 comments on commit 5132324

Please sign in to comment.