diff --git a/include/libnuraft/async.hxx b/include/libnuraft/async.hxx index 872db482..1ead4de8 100644 --- a/include/libnuraft/async.hxx +++ b/include/libnuraft/async.hxx @@ -36,7 +36,7 @@ limitations under the License. namespace nuraft { -enum cmd_result_code { +enum cmd_result_code : int32_t { OK = 0, CANCELLED = -1, TIMEOUT = -2, diff --git a/src/asio_service.cxx b/src/asio_service.cxx index ff84ce86..f8f2d4f3 100644 --- a/src/asio_service.cxx +++ b/src/asio_service.cxx @@ -124,6 +124,9 @@ limitations under the License. // If set, each log entry will contain a CRC on the payload. #define CRC_ON_PAYLOAD (0x10) +// If set, RPC message (response) includes result code +#define INCLUDE_RESULT_CODE (0x20) + // ======================= namespace nuraft { @@ -735,6 +738,7 @@ class rpc_session try { ptr resp_ctx = resp->get_ctx(); int32 resp_ctx_size = (resp_ctx) ? resp_ctx->size() : 0; + int32 result_code_size = sizeof(int32_t); uint32_t flags = 0x0; size_t resp_meta_size = 0; @@ -759,6 +763,13 @@ class rpc_session size_t carried_data_size = resp_meta_size + resp_hint_size + resp_ctx_size; + if (req->get_type() == msg_type::client_request || + req->get_type() == msg_type::add_server_request || + req->get_type() == msg_type::remove_server_request) { + flags |= INCLUDE_RESULT_CODE; + carried_data_size += result_code_size; + } + int buf_size = RPC_RESP_HEADER_SIZE + carried_data_size; ptr resp_buf = buffer::alloc(buf_size); buffer_serializer bs(resp_buf); @@ -798,6 +809,11 @@ class rpc_session bs.put_buffer(*resp_ctx); } + /* Put result code at the end to avoid breaking backward compatibility */ + if (flags & INCLUDE_RESULT_CODE) { + bs.put_i32(resp->get_result_code()); + } + aa::write( ssl_enabled_, ssl_socket_, socket_, asio::buffer(resp_buf->data_begin(), resp_buf->size()), [this, self, resp_buf] @@ -1688,8 +1704,9 @@ class asio_rpc_client size_t bytes_transferred) { if ( !(flags & INCLUDE_META) && - !(flags & INCLUDE_HINT) ) { - // Neither meta nor hint exists, + !(flags & INCLUDE_HINT) && + !(flags & INCLUDE_RESULT_CODE)) { + // Neither meta nor hint nor result code exists, // just use the buffer as it is for ctx. ctx_buf->pos(0); rsp->set_ctx(ctx_buf); @@ -1739,9 +1756,21 @@ class asio_rpc_client assert(remaining_len >= 0); if (remaining_len) { // It has context, read it. - ptr actual_ctx = buffer::alloc(remaining_len); + size_t ctx_len = remaining_len; + if (flags & INCLUDE_RESULT_CODE) { + ctx_len -= sizeof(int32_t); + } + ptr actual_ctx = buffer::alloc(ctx_len); bs.get_buffer(actual_ctx); rsp->set_ctx(actual_ctx); + remaining_len -= ctx_len; + } + + // 4) Result code + if (flags & INCLUDE_RESULT_CODE) { + assert((size_t)remaining_len >= sizeof(int32_t)); + cmd_result_code res = static_cast(bs.get_i32()); + rsp->set_result_code(res); } operation_timer_.cancel();