Skip to content

Commit

Permalink
Merge branch 'master' into Follower-loss-notification
Browse files Browse the repository at this point in the history
  • Loading branch information
greensky00 committed Jul 18, 2024
2 parents e45952a + 35083c6 commit 480b1ad
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 6 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/cmake.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
timeout-minutes: 30

steps:
- uses: actions/checkout@a5ac7e51b41094c92402da3b24376905380afc29 # pin@v2
- uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # pin@v2

- name: Fix kernel mmap rnd bits
# Asan in llvm 14 provided in ubuntu 22.04 is incompatible with
Expand All @@ -52,7 +52,7 @@ jobs:
run: ./github_action_build.sh

- name: Code Coverage Metrics
uses: codecov/codecov-action@125fc84a9a348dbcf27191600683ec096ec9021c # pin@v3
uses: codecov/codecov-action@e28ff129e5465c2c0dcc6f003fc735cb6ae0c673 # pin@v3
with:
verbose: true
files: ./build/raft_cov.info.cleaned
2 changes: 1 addition & 1 deletion include/libnuraft/async.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ limitations under the License.

namespace nuraft {

enum cmd_result_code {
enum cmd_result_code : int32_t {
OK = 0,
CANCELLED = -1,
TIMEOUT = -2,
Expand Down
45 changes: 42 additions & 3 deletions src/asio_service.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ limitations under the License.
// If set, each log entry will contain a CRC on the payload.
#define CRC_ON_PAYLOAD (0x10)

// If set, RPC message (response) includes result code
#define INCLUDE_RESULT_CODE (0x20)

// =======================

namespace nuraft {
Expand Down Expand Up @@ -735,6 +738,7 @@ class rpc_session
try {
ptr<buffer> resp_ctx = resp->get_ctx();
int32 resp_ctx_size = (resp_ctx) ? resp_ctx->size() : 0;
int32 result_code_size = sizeof(int32_t);

uint32_t flags = 0x0;
size_t resp_meta_size = 0;
Expand All @@ -759,6 +763,13 @@ class rpc_session

size_t carried_data_size = resp_meta_size + resp_hint_size + resp_ctx_size;

if (req->get_type() == msg_type::client_request ||
req->get_type() == msg_type::add_server_request ||
req->get_type() == msg_type::remove_server_request) {
flags |= INCLUDE_RESULT_CODE;
carried_data_size += result_code_size;
}

int buf_size = RPC_RESP_HEADER_SIZE + carried_data_size;
ptr<buffer> resp_buf = buffer::alloc(buf_size);
buffer_serializer bs(resp_buf);
Expand Down Expand Up @@ -798,6 +809,11 @@ class rpc_session
bs.put_buffer(*resp_ctx);
}

/* Put result code at the end to avoid breaking backward compatibility */
if (flags & INCLUDE_RESULT_CODE) {
bs.put_i32(resp->get_result_code());
}

aa::write( ssl_enabled_, ssl_socket_, socket_,
asio::buffer(resp_buf->data_begin(), resp_buf->size()),
[this, self, resp_buf]
Expand Down Expand Up @@ -1407,6 +1423,15 @@ class asio_rpc_client
send_timeout_ms,
std::placeholders::_1,
std::placeholders::_2 ) );
if (send_timeout_ms != 0) {
operation_timer_.expires_after
( std::chrono::duration_cast<std::chrono::nanoseconds>
( std::chrono::milliseconds( send_timeout_ms ) ) );
operation_timer_.async_wait(
std::bind( &asio_rpc_client::cancel_socket,
this,
std::placeholders::_1 ) );
}
} else {
ptr<resp_msg> rsp;
ptr<rpc_exception> except
Expand Down Expand Up @@ -1477,6 +1502,7 @@ class asio_rpc_client
std::error_code err,
asio::ip::tcp::resolver::iterator itor)
{
operation_timer_.cancel();
if (!err) {
p_in( "%p connected to %s:%s (as a client)",
this, host_.c_str(), port_.c_str() );
Expand Down Expand Up @@ -1678,8 +1704,9 @@ class asio_rpc_client
size_t bytes_transferred)
{
if ( !(flags & INCLUDE_META) &&
!(flags & INCLUDE_HINT) ) {
// Neither meta nor hint exists,
!(flags & INCLUDE_HINT) &&
!(flags & INCLUDE_RESULT_CODE)) {
// Neither meta nor hint nor result code exists,
// just use the buffer as it is for ctx.
ctx_buf->pos(0);
rsp->set_ctx(ctx_buf);
Expand Down Expand Up @@ -1729,9 +1756,21 @@ class asio_rpc_client
assert(remaining_len >= 0);
if (remaining_len) {
// It has context, read it.
ptr<buffer> actual_ctx = buffer::alloc(remaining_len);
size_t ctx_len = remaining_len;
if (flags & INCLUDE_RESULT_CODE) {
ctx_len -= sizeof(int32_t);
}
ptr<buffer> actual_ctx = buffer::alloc(ctx_len);
bs.get_buffer(actual_ctx);
rsp->set_ctx(actual_ctx);
remaining_len -= ctx_len;
}

// 4) Result code
if (flags & INCLUDE_RESULT_CODE) {
assert((size_t)remaining_len >= sizeof(int32_t));
cmd_result_code res = static_cast<cmd_result_code>(bs.get_i32());
rsp->set_result_code(res);
}

operation_timer_.cancel();
Expand Down

0 comments on commit 480b1ad

Please sign in to comment.