Skip to content

Commit

Permalink
minor fix
Browse files Browse the repository at this point in the history
  • Loading branch information
lihzeng committed Aug 23, 2024
1 parent 1181dee commit eceae11
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 9 deletions.
1 change: 0 additions & 1 deletion include/libnuraft/raft_server.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -893,7 +893,6 @@ protected:
void apply_to_not_responding_peers(const std::function<void(const ptr<peer>&)>&, int expiry = 0);

ptr<resp_msg> handle_append_entries(req_msg& req);
bool try_start_append(ptr<peer>& p, bool make_busy_success);
ptr<resp_msg> handle_prevote_req(req_msg& req);
ptr<resp_msg> handle_vote_req(req_msg& req);
ptr<resp_msg> handle_cli_req_prelock(req_msg& req, const req_ext_params& ext_params);
Expand Down
25 changes: 19 additions & 6 deletions src/handle_append_entries.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -288,19 +288,30 @@ bool raft_server::request_append_entries(ptr<peer> p) {
msg = create_append_entries_req(p, last_streamed_log_idx);
m_handler = resp_handler_;
if (msg) {
p_tr("send request to %d\n in stream mode, last streamed log idx: %ld",
(int)p->get_id(), last_streamed_log_idx);
if (msg->get_type() == msg_type::append_entries_request) {
// two condition
// 1. streaming is on
// 2. streaming is off, but it is the first request
if (last_streamed_log_idx == 0
&& p->start_append_entry() == 0) {
&& p->start_append_entry()) {
p_tr("send first request to %d\n in stream mode, "
"start idx: %ld", (int)p->get_id(),
msg->get_last_log_idx());
return send_request(p, msg, m_handler);
} else if (last_streamed_log_idx > 0) {
p_tr("send following request to %d\n in stream mode, "
"start idx: %ld", (int)p->get_id(),
msg->get_last_log_idx());
p->add_append_entry_request();
return send_request(p, msg, m_handler);
}
} else {
if (p->make_busy()) {
p_tr("send %s request to %d\n in stream mode",
msg_type_to_string(msg->get_type()).c_str(),
(int)p->get_id());
return send_request(p, msg, m_handler);
}
}
Expand Down Expand Up @@ -368,7 +379,9 @@ bool raft_server::request_append_entries(ptr<peer> p) {



bool raft_server::send_request(ptr<peer>& p, ptr<req_msg>& msg, rpc_handler& m_handler) {
bool raft_server::send_request(ptr<peer>& p,
ptr<req_msg>& msg,
rpc_handler& m_handler) {
if (!p->is_manual_free()) {
// Actual recovery.
if ( p->get_long_puase_warnings() >=
Expand Down Expand Up @@ -482,7 +495,7 @@ ptr<req_msg> raft_server::create_append_entries_req(ptr<peer>& pp ,ulong last_st
last_log_idx + 1 + ctx_->get_params()->max_append_size_ );
if (last_streamed_log_idx > 0) {
end_idx = std::min( end_idx, last_log_idx + (p.get_max_log_gap_in_stream() -
last_streamed_log_idx - p.get_next_log_idx() + 1) );
(last_streamed_log_idx - p.get_next_log_idx() + 1)) );
}

// NOTE: If this is a retry, probably the follower is down.
Expand Down Expand Up @@ -1084,13 +1097,13 @@ void raft_server::handle_append_entries_resp(resp_msg& resp) {
commit( committed_index );

// Try enable stream here
ulong accepted_pre_commit_idx = resp.get_next_idx() +
ulong accepted_precommit_idx = resp.get_next_idx() +
p->get_max_log_gap_in_stream();
if (p->is_streaming() &&
p->get_last_streamed_log_idx() == 0 &&
resp.get_next_idx() > 0 && p->is_streaming() &&
resp.get_next_idx() > 0 &&
p->get_last_sent_idx() < resp.get_next_idx() &&
precommit_index_ < accepted_pre_commit_idx) {
precommit_index_ < accepted_precommit_idx) {
p->set_last_streamed_log_idx(0, resp.get_next_idx() - 1);
}

Expand Down
5 changes: 3 additions & 2 deletions src/peer.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,9 @@ void peer::try_set_free(msg_type type) {
}

if (type == msg_type::append_entries_request) {
flying_append_entry_request_.fetch_sub(1);
if (!is_streaming()) {
if (is_streaming()) {
flying_append_entry_request_.fetch_sub(1);
} else {
set_free();
}
}
Expand Down

0 comments on commit eceae11

Please sign in to comment.