From eceae119b14c1df7b083df9059e32bb9d3381015 Mon Sep 17 00:00:00 2001 From: lihzeng Date: Thu, 22 Aug 2024 19:41:47 -0700 Subject: [PATCH] minor fix --- include/libnuraft/raft_server.hxx | 1 - src/handle_append_entries.cxx | 25 +++++++++++++++++++------ src/peer.cxx | 5 +++-- 3 files changed, 22 insertions(+), 9 deletions(-) diff --git a/include/libnuraft/raft_server.hxx b/include/libnuraft/raft_server.hxx index 2d44abf..c1e9623 100644 --- a/include/libnuraft/raft_server.hxx +++ b/include/libnuraft/raft_server.hxx @@ -893,7 +893,6 @@ protected: void apply_to_not_responding_peers(const std::function&)>&, int expiry = 0); ptr handle_append_entries(req_msg& req); - bool try_start_append(ptr& p, bool make_busy_success); ptr handle_prevote_req(req_msg& req); ptr handle_vote_req(req_msg& req); ptr handle_cli_req_prelock(req_msg& req, const req_ext_params& ext_params); diff --git a/src/handle_append_entries.cxx b/src/handle_append_entries.cxx index d31e89c..7c406c9 100644 --- a/src/handle_append_entries.cxx +++ b/src/handle_append_entries.cxx @@ -288,19 +288,30 @@ bool raft_server::request_append_entries(ptr p) { msg = create_append_entries_req(p, last_streamed_log_idx); m_handler = resp_handler_; if (msg) { + p_tr("send request to %d\n in stream mode, last streamed log idx: %ld", + (int)p->get_id(), last_streamed_log_idx); if (msg->get_type() == msg_type::append_entries_request) { // two condition // 1. streaming is on // 2. streaming is off, but it is the first request if (last_streamed_log_idx == 0 - && p->start_append_entry() == 0) { + && p->start_append_entry()) { + p_tr("send first request to %d\n in stream mode, " + "start idx: %ld", (int)p->get_id(), + msg->get_last_log_idx()); return send_request(p, msg, m_handler); } else if (last_streamed_log_idx > 0) { + p_tr("send following request to %d\n in stream mode, " + "start idx: %ld", (int)p->get_id(), + msg->get_last_log_idx()); p->add_append_entry_request(); return send_request(p, msg, m_handler); } } else { if (p->make_busy()) { + p_tr("send %s request to %d\n in stream mode", + msg_type_to_string(msg->get_type()).c_str(), + (int)p->get_id()); return send_request(p, msg, m_handler); } } @@ -368,7 +379,9 @@ bool raft_server::request_append_entries(ptr p) { -bool raft_server::send_request(ptr& p, ptr& msg, rpc_handler& m_handler) { +bool raft_server::send_request(ptr& p, + ptr& msg, + rpc_handler& m_handler) { if (!p->is_manual_free()) { // Actual recovery. if ( p->get_long_puase_warnings() >= @@ -482,7 +495,7 @@ ptr raft_server::create_append_entries_req(ptr& pp ,ulong last_st last_log_idx + 1 + ctx_->get_params()->max_append_size_ ); if (last_streamed_log_idx > 0) { end_idx = std::min( end_idx, last_log_idx + (p.get_max_log_gap_in_stream() - - last_streamed_log_idx - p.get_next_log_idx() + 1) ); + (last_streamed_log_idx - p.get_next_log_idx() + 1)) ); } // NOTE: If this is a retry, probably the follower is down. @@ -1084,13 +1097,13 @@ void raft_server::handle_append_entries_resp(resp_msg& resp) { commit( committed_index ); // Try enable stream here - ulong accepted_pre_commit_idx = resp.get_next_idx() + + ulong accepted_precommit_idx = resp.get_next_idx() + p->get_max_log_gap_in_stream(); if (p->is_streaming() && p->get_last_streamed_log_idx() == 0 && - resp.get_next_idx() > 0 && p->is_streaming() && + resp.get_next_idx() > 0 && p->get_last_sent_idx() < resp.get_next_idx() && - precommit_index_ < accepted_pre_commit_idx) { + precommit_index_ < accepted_precommit_idx) { p->set_last_streamed_log_idx(0, resp.get_next_idx() - 1); } diff --git a/src/peer.cxx b/src/peer.cxx index 7228073..cd1a315 100644 --- a/src/peer.cxx +++ b/src/peer.cxx @@ -189,8 +189,9 @@ void peer::try_set_free(msg_type type) { } if (type == msg_type::append_entries_request) { - flying_append_entry_request_.fetch_sub(1); - if (!is_streaming()) { + if (is_streaming()) { + flying_append_entry_request_.fetch_sub(1); + } else { set_free(); } }