Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
lihzeng committed Aug 23, 2024
1 parent eceae11 commit 60b3781
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 66 deletions.
32 changes: 4 additions & 28 deletions include/libnuraft/peer.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,6 @@ public:
, rsv_msg_(nullptr)
, rsv_msg_handler_(nullptr)
, last_streamed_log_idx_(0)
, max_log_gap_in_stream_( ctx.get_params()->max_log_gap_in_stream_ )
, flying_append_entry_request_(0)
, l_(logger)
{
reset_ls_timer();
Expand Down Expand Up @@ -220,7 +218,8 @@ public:

void send_req(ptr<peer> myself,
ptr<req_msg>& req,
rpc_handler& handler);
rpc_handler& handler,
bool streaming = false);

void shutdown();

Expand Down Expand Up @@ -314,28 +313,11 @@ public:
last_streamed_log_idx_.compare_exchange_strong(expected, idx);
}

bool is_streaming() {
return max_log_gap_in_stream_ > 0;
}

int32 get_max_log_gap_in_stream() {
return max_log_gap_in_stream_;
}

void reset_stream() {
last_streamed_log_idx_.store(0);
}

bool start_append_entry() {
ulong expected = 0;
return flying_append_entry_request_.compare_exchange_strong(expected, 1);
}

void add_append_entry_request() {
flying_append_entry_request_.fetch_add(1);
}

void try_set_free(msg_type type);
void try_set_free(msg_type type, bool streaming);

bool is_lost() const { return lost_by_leader_; }
void set_lost() { lost_by_leader_ = true; }
Expand All @@ -346,6 +328,7 @@ private:
ptr<rpc_client> my_rpc_client,
ptr<req_msg>& req,
ptr<rpc_result>& pending_result,
bool streaming,
ptr<resp_msg>& resp,
ptr<rpc_exception>& err);

Expand Down Expand Up @@ -558,13 +541,6 @@ private:
*/
std::atomic<ulong> last_streamed_log_idx_;

int32 max_log_gap_in_stream_;

/**
* if `true`, this peer is in stream mode.
*/
std::atomic<ulong> flying_append_entry_request_;

/**
* Logger instance.
*/
Expand Down
2 changes: 1 addition & 1 deletion include/libnuraft/raft_server.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -929,7 +929,7 @@ protected:
void request_vote(bool force_vote);
void request_append_entries();
bool request_append_entries(ptr<peer> p);
bool send_request(ptr<peer>& p, ptr<req_msg>& msg, rpc_handler& m_handler);
bool send_request(ptr<peer>& p, ptr<req_msg>& msg, rpc_handler& m_handler, bool streaming = false);
void handle_peer_resp(ptr<resp_msg>& resp, ptr<rpc_exception>& err);
void handle_append_entries_resp(resp_msg& resp);
void handle_install_snapshot_resp(resp_msg& resp);
Expand Down
1 change: 1 addition & 0 deletions scripts/test/runtests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ set -e
./tests/failure_test --abort-on-failure
./tests/asio_service_test --abort-on-failure
./tests/asio_service_stream_test --abort-on-failure
./tests/asio_stream_test --abort-on-failure
45 changes: 19 additions & 26 deletions src/handle_append_entries.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ bool raft_server::request_append_entries(ptr<peer> p) {
}
}

if (p->is_streaming()) {
if (ctx_->get_params()->max_log_gap_in_stream_ > 0) {
// If reserved message exists, process it first.
ptr<req_msg> msg = p->get_rsv_msg();
rpc_handler m_handler = p->get_rsv_msg_handler();
Expand All @@ -290,23 +290,12 @@ bool raft_server::request_append_entries(ptr<peer> p) {
if (msg) {
p_tr("send request to %d\n in stream mode, last streamed log idx: %ld",
(int)p->get_id(), last_streamed_log_idx);
if (msg->get_type() == msg_type::append_entries_request) {
// two condition
// 1. streaming is on
// 2. streaming is off, but it is the first request
if (last_streamed_log_idx == 0
&& p->start_append_entry()) {
p_tr("send first request to %d\n in stream mode, "
"start idx: %ld", (int)p->get_id(),
msg->get_last_log_idx());
return send_request(p, msg, m_handler);
} else if (last_streamed_log_idx > 0) {
p_tr("send following request to %d\n in stream mode, "
"start idx: %ld", (int)p->get_id(),
msg->get_last_log_idx());
p->add_append_entry_request();
return send_request(p, msg, m_handler);
}
if (msg->get_type() == msg_type::append_entries_request &&
last_streamed_log_idx > 0) {
p_tr("send following request to %d\n in stream mode, "
"start idx: %ld", (int)p->get_id(),
msg->get_last_log_idx());
return send_request(p, msg, m_handler, true);
} else {
if (p->make_busy()) {
p_tr("send %s request to %d\n in stream mode",
Expand Down Expand Up @@ -381,7 +370,8 @@ bool raft_server::request_append_entries(ptr<peer> p) {

bool raft_server::send_request(ptr<peer>& p,
ptr<req_msg>& msg,
rpc_handler& m_handler) {
rpc_handler& m_handler,
bool streaming) {
if (!p->is_manual_free()) {
// Actual recovery.
if ( p->get_long_puase_warnings() >=
Expand Down Expand Up @@ -493,9 +483,11 @@ ptr<req_msg> raft_server::create_append_entries_req(ptr<peer>& pp ,ulong last_st
// return nullptr to indicate such errors.
ulong end_idx = std::min( cur_nxt_idx,
last_log_idx + 1 + ctx_->get_params()->max_append_size_ );
if (last_streamed_log_idx > 0) {
end_idx = std::min( end_idx, last_log_idx + (p.get_max_log_gap_in_stream() -
(last_streamed_log_idx - p.get_next_log_idx() + 1)) );
// max_acceptable_stream_log may be smaller than 0
ulong max_acceptable_stream_log = ctx_->get_params()->max_log_gap_in_stream_ -
(last_streamed_log_idx - p.get_next_log_idx() + 1);
if (last_streamed_log_idx > 0 && max_acceptable_stream_log > 0) {
end_idx = std::min( end_idx, last_log_idx + max_acceptable_stream_log );
}

// NOTE: If this is a retry, probably the follower is down.
Expand Down Expand Up @@ -1097,13 +1089,14 @@ void raft_server::handle_append_entries_resp(resp_msg& resp) {
commit( committed_index );

// Try enable stream here
ulong accepted_precommit_idx = resp.get_next_idx() +
p->get_max_log_gap_in_stream();
if (p->is_streaming() &&
int32 max_gap_in_stream = ctx_->get_params()->max_log_gap_in_stream_;
ulong acceptable_precommit_idx = resp.get_next_idx() +
max_gap_in_stream;
if (max_gap_in_stream > 0 &&
p->get_last_streamed_log_idx() == 0 &&
resp.get_next_idx() > 0 &&
p->get_last_sent_idx() < resp.get_next_idx() &&
precommit_index_ < accepted_precommit_idx) {
precommit_index_ < acceptable_precommit_idx) {
p->set_last_streamed_log_idx(0, resp.get_next_idx() - 1);
}

Expand Down
21 changes: 10 additions & 11 deletions src/peer.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ namespace nuraft {

void peer::send_req( ptr<peer> myself,
ptr<req_msg>& req,
rpc_handler& handler )
rpc_handler& handler,
bool streaming )
{
if (abandoned_) {
p_er("peer %d has been shut down, cannot send request",
Expand Down Expand Up @@ -63,6 +64,7 @@ void peer::send_req( ptr<peer> myself,
rpc_local,
req,
pending,
streaming,
std::placeholders::_1,
std::placeholders::_2 );
if (rpc_local) {
Expand All @@ -80,6 +82,7 @@ void peer::handle_rpc_result( ptr<peer> myself,
ptr<rpc_client> my_rpc_client,
ptr<req_msg>& req,
ptr<rpc_result>& pending_result,
bool streaming,
ptr<resp_msg>& resp,
ptr<rpc_exception>& err )
{
Expand Down Expand Up @@ -117,7 +120,7 @@ void peer::handle_rpc_result( ptr<peer> myself,
// WARNING:
// `set_free()` should be protected by `rpc_protector_`, otherwise
// it may free the peer even though new RPC client is already created.
try_set_free(req->get_type());
try_set_free(req->get_type(), streaming);
}

reset_active_timer();
Expand Down Expand Up @@ -152,7 +155,8 @@ void peer::handle_rpc_result( ptr<peer> myself,
uint64_t given_rpc_id = my_rpc_client ? my_rpc_client->get_id() : 0;
if (cur_rpc_id == given_rpc_id) {
rpc_.reset();
try_set_free(req->get_type());
reset_stream();
try_set_free(req->get_type(), streaming);
} else {
// WARNING (MONSTOR-9378):
// RPC client has been reset before this request returns
Expand All @@ -171,7 +175,7 @@ void peer::handle_rpc_result( ptr<peer> myself,
}
}

void peer::try_set_free(msg_type type) {
void peer::try_set_free(msg_type type, bool streaming) {
const static std::unordered_set<int> msg_types_to_free( {
// msg_type::append_entries_request,
msg_type::install_snapshot_request,
Expand All @@ -188,12 +192,8 @@ void peer::try_set_free(msg_type type) {
set_free();
}

if (type == msg_type::append_entries_request) {
if (is_streaming()) {
flying_append_entry_request_.fetch_sub(1);
} else {
set_free();
}
if (type == msg_type::append_entries_request && !streaming) {
set_free();
}
}

Expand Down Expand Up @@ -241,7 +241,6 @@ bool peer::recreate_rpc(ptr<srv_config>& config,
reset_active_timer();

reset_stream();
flying_append_entry_request_.store(0);
set_free();
set_manual_free();
return true;
Expand Down

0 comments on commit 60b3781

Please sign in to comment.