Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add optional CRC per log entry. #454

Merged
merged 4 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ set(RAFT_CORE
${ROOT_SRC}/handle_user_cmd.cxx
${ROOT_SRC}/handle_vote.cxx
${ROOT_SRC}/launcher.cxx
${ROOT_SRC}/log_entry.cxx
${ROOT_SRC}/peer.cxx
${ROOT_SRC}/raft_server.cxx
${ROOT_SRC}/snapshot.cxx
Expand Down
5 changes: 4 additions & 1 deletion examples/in_memory_log_store.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ ptr<log_entry> inmem_log_store::make_clone(const ptr<log_entry>& entry) {
( entry->get_term(),
buffer::clone( entry->get_buf() ),
entry->get_val_type(),
entry->get_timestamp() );
entry->get_timestamp(),
entry->has_crc32(),
entry->get_crc32(),
false );
return clone;
}

Expand Down
13 changes: 13 additions & 0 deletions include/libnuraft/asio_service_options.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,19 @@ struct asio_service_options {
*/
bool crc_on_entire_message_;

/**
* If `true`, each log entry will contain a CRC checksum of the entry's
* payload.
*
* To support this feature, the log store implementation should be able to
* store and retrieve the CRC checksum when it reads log entries.
*
* This feature is not backward compatible. To enable this feature, there
* should not be any member running with the old version before supporting
* this flag.
*/
bool crc_on_payload_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initialization in the constructor is missing.


/**
* Callback function that will be invoked when the received message is corrupted.
* The first `buffer` contains the raw binary of message header,
Expand Down
32 changes: 25 additions & 7 deletions include/libnuraft/log_entry.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ limitations under the License.
#include "buffer.hxx"
#include "log_val_type.hxx"
#include "ptr.hxx"
#include <cstdint>

#ifdef _NO_EXCEPTION
#include <cassert>
Expand All @@ -38,12 +39,10 @@ public:
log_entry(ulong term,
const ptr<buffer>& buff,
log_val_type value_type = log_val_type::app_log,
uint64_t log_timestamp = 0)
: term_(term)
, value_type_(value_type)
, buff_(buff)
, timestamp_us_(log_timestamp)
{}
uint64_t log_timestamp = 0,
bool has_crc32 = false,
uint32_t crc32 = 0,
bool compute_crc = true);

__nocopy__(log_entry);

Expand Down Expand Up @@ -92,6 +91,18 @@ public:
timestamp_us_ = t;
}

bool has_crc32() const {
return has_crc32_;
}

uint32_t get_crc32() const {
return crc32_;
}

void set_crc32(uint32_t crc) {
crc32_ = crc;
}
Comment on lines +98 to +104
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For backward compatibility (for the previous logs that do not have CRC, and they can be mixed with new logs with CRC), we should have one more field indicating if CRC exists or not, let's say has_crc32_.

Otherwise, we cannot distinguish non-existing CRC and CRC=0. Also we need an API returning that flag:

bool has_crc32() const () { ... }

that will be used in asio layer (replacing crc32 != 0).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do.


ptr<buffer> serialize() {
buff_->pos(0);
ptr<buffer> buf = buffer::alloc( sizeof(ulong) +
Expand Down Expand Up @@ -135,10 +146,17 @@ private:

/**
* The timestamp (since epoch) when this log entry was generated
* in microseconds. Used only when `log_entry_timestamp_` in
* in microseconds. Used only when `replicate_log_timestamp_` in
* `asio_service_options` is set.
*/
uint64_t timestamp_us_;

/**
* CRC32 checksum of this log entry.
* Used only when `crc_on_payload` in `asio_service_options` is set.
*/
bool has_crc32_;
uint32_t crc32_;
};

}
Expand Down
37 changes: 36 additions & 1 deletion src/asio_service.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ limitations under the License.
// If set, CRC number represents the entire message.
#define CRC_ON_ENTIRE_MESSAGE (0x8)

// If set, each log entry will contain a CRC on the payload.
#define CRC_ON_PAYLOAD (0x10)

// =======================

namespace nuraft {
Expand Down Expand Up @@ -596,6 +599,9 @@ class rpc_session
if (flags_ & INCLUDE_LOG_TIMESTAMP) {
LOG_ENTRY_SIZE += 8;
}
if (flags_ & CRC_ON_PAYLOAD) {
LOG_ENTRY_SIZE += 5;
}

while (log_ctx_size > ss.pos()) {
if (log_ctx_size - ss.pos() < LOG_ENTRY_SIZE) {
Expand All @@ -613,6 +619,8 @@ class rpc_session
ulong term = ss.get_u64();
log_val_type val_type = (log_val_type)ss.get_u8();
uint64_t timestamp = (flags_ & INCLUDE_LOG_TIMESTAMP) ? ss.get_u64() : 0;
bool has_crc32 = (flags_ & CRC_ON_PAYLOAD) ? (ss.get_u8() != 0) : false;
uint32_t crc32 = (flags_ & CRC_ON_PAYLOAD) ? ss.get_u32() : 0;

size_t val_size = ss.get_i32();
if (log_ctx_size - ss.pos() < val_size) {
Expand All @@ -632,7 +640,26 @@ class rpc_session
ptr<buffer> buf( buffer::alloc(val_size) );
ss.get_buffer(buf);
ptr<log_entry> entry(
cs_new<log_entry>(term, buf, val_type, timestamp) );
cs_new<log_entry>(term, buf, val_type, timestamp, has_crc32, crc32, false) );

if ((flags_ & CRC_ON_PAYLOAD) && has_crc32) {
// Verify CRC.
uint32_t crc_payload = crc32_8( buf->data_begin(),
buf->size(),
0 );
if (crc_payload != crc32) {
p_er("log entry CRC mismatch: local calculation %x, "
"from message %x", crc_payload, crc32);

if (impl_->get_options().corrupted_msg_handler_) {
impl_->get_options().corrupted_msg_handler_(header_, log_ctx);
}

this->stop();
return;
}
}

req->log_entries().push_back(entry);
}
}
Expand Down Expand Up @@ -1207,6 +1234,10 @@ class asio_rpc_client
LOG_ENTRY_SIZE += 8;
flags |= INCLUDE_LOG_TIMESTAMP;
}
if (impl_->get_options().crc_on_payload_) {
LOG_ENTRY_SIZE += 5;
flags |= CRC_ON_PAYLOAD;
}

for (auto& entry: req->log_entries()) {
ptr<log_entry>& le = entry;
Expand All @@ -1226,6 +1257,10 @@ class asio_rpc_client
if (impl_->get_options().replicate_log_timestamp_) {
ss.put_u64( le->get_timestamp() );
}
if (impl_->get_options().crc_on_payload_) {
ss.put_u8(le->has_crc32() ? 1 : 0);
ss.put_u32(le->get_crc32());
}
ss.put_i32( le->get_buf().size() );
ss.put_raw( le->get_buf().data_begin(), le->get_buf().size() );
#endif
Expand Down
26 changes: 26 additions & 0 deletions src/log_entry.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#include "crc32.hxx"
#include "log_entry.hxx"

namespace nuraft {
log_entry::log_entry(ulong term,
const ptr<buffer>& buff,
log_val_type value_type,
uint64_t log_timestamp,
bool has_crc32,
uint32_t crc32,
bool compute_crc)
: term_(term)
, value_type_(value_type)
, buff_(buff)
, timestamp_us_(log_timestamp)
, has_crc32_(has_crc32)
, crc32_(crc32)
{
if (!buff_ && !has_crc32 && compute_crc) {
has_crc32_ = true;
crc32_ = crc32_8( buff->data_begin(),
buff->size(),
0 );
}
}
}
1 change: 1 addition & 0 deletions tests/unit/raft_package_asio.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ public:

if (useCrcOnEntireMessage) {
asio_opt.crc_on_entire_message_ = true;
asio_opt.crc_on_payload_ = true;
asio_opt.corrupted_msg_handler_ =
[&](ptr<buffer> header, ptr<buffer> ctx){
abort();
Expand Down