diff --git a/CMakeLists.txt b/CMakeLists.txt index c14f9d34..a01e80a4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -254,6 +254,7 @@ set(RAFT_CORE ${ROOT_SRC}/handle_user_cmd.cxx ${ROOT_SRC}/handle_vote.cxx ${ROOT_SRC}/launcher.cxx + ${ROOT_SRC}/log_entry.cxx ${ROOT_SRC}/peer.cxx ${ROOT_SRC}/raft_server.cxx ${ROOT_SRC}/snapshot.cxx diff --git a/examples/in_memory_log_store.cxx b/examples/in_memory_log_store.cxx index 78e56493..930281d5 100644 --- a/examples/in_memory_log_store.cxx +++ b/examples/in_memory_log_store.cxx @@ -54,7 +54,10 @@ ptr inmem_log_store::make_clone(const ptr& entry) { ( entry->get_term(), buffer::clone( entry->get_buf() ), entry->get_val_type(), - entry->get_timestamp() ); + entry->get_timestamp(), + entry->has_crc32(), + entry->get_crc32(), + false ); return clone; } diff --git a/include/libnuraft/asio_service_options.hxx b/include/libnuraft/asio_service_options.hxx index 16e4dce7..b1ef5533 100644 --- a/include/libnuraft/asio_service_options.hxx +++ b/include/libnuraft/asio_service_options.hxx @@ -123,6 +123,7 @@ struct asio_service_options { , custom_resolver_(nullptr) , replicate_log_timestamp_(false) , crc_on_entire_message_(false) + , crc_on_payload_(false) , corrupted_msg_handler_(nullptr) {} @@ -243,7 +244,7 @@ struct asio_service_options { * restore the timestamp when it reads log entries. * * This feature is not backward compatible. To enable this feature, there - * should not be any member running with old version before supprting + * should not be any member running with old version before supporting * this flag. */ bool replicate_log_timestamp_; @@ -254,6 +255,19 @@ struct asio_service_options { */ bool crc_on_entire_message_; + /** + * If `true`, each log entry will contain a CRC checksum of the entry's + * payload. + * + * To support this feature, the log store implementation should be able to + * store and retrieve the CRC checksum when it reads log entries. + * + * This feature is not backward compatible. To enable this feature, there + * should not be any member running with the old version before supporting + * this flag. + */ + bool crc_on_payload_; + /** * Callback function that will be invoked when the received message is corrupted. * The first `buffer` contains the raw binary of message header, diff --git a/include/libnuraft/log_entry.hxx b/include/libnuraft/log_entry.hxx index c371b060..f620fa44 100644 --- a/include/libnuraft/log_entry.hxx +++ b/include/libnuraft/log_entry.hxx @@ -25,6 +25,7 @@ limitations under the License. #include "buffer.hxx" #include "log_val_type.hxx" #include "ptr.hxx" +#include #ifdef _NO_EXCEPTION #include @@ -38,12 +39,10 @@ public: log_entry(ulong term, const ptr& buff, log_val_type value_type = log_val_type::app_log, - uint64_t log_timestamp = 0) - : term_(term) - , value_type_(value_type) - , buff_(buff) - , timestamp_us_(log_timestamp) - {} + uint64_t log_timestamp = 0, + bool has_crc32 = false, + uint32_t crc32 = 0, + bool compute_crc = true); __nocopy__(log_entry); @@ -92,6 +91,18 @@ public: timestamp_us_ = t; } + bool has_crc32() const { + return has_crc32_; + } + + uint32_t get_crc32() const { + return crc32_; + } + + void set_crc32(uint32_t crc) { + crc32_ = crc; + } + ptr serialize() { buff_->pos(0); ptr buf = buffer::alloc( sizeof(ulong) + @@ -135,10 +146,17 @@ private: /** * The timestamp (since epoch) when this log entry was generated - * in microseconds. Used only when `log_entry_timestamp_` in + * in microseconds. Used only when `replicate_log_timestamp_` in * `asio_service_options` is set. */ uint64_t timestamp_us_; + + /** + * CRC32 checksum of this log entry. + * Used only when `crc_on_payload` in `asio_service_options` is set. + */ + bool has_crc32_; + uint32_t crc32_; }; } diff --git a/src/asio_service.cxx b/src/asio_service.cxx index e3e0e0d8..dd99f645 100644 --- a/src/asio_service.cxx +++ b/src/asio_service.cxx @@ -124,6 +124,9 @@ limitations under the License. // If set, CRC number represents the entire message. #define CRC_ON_ENTIRE_MESSAGE (0x8) +// If set, each log entry will contain a CRC on the payload. +#define CRC_ON_PAYLOAD (0x10) + // ======================= namespace nuraft { @@ -596,6 +599,9 @@ class rpc_session if (flags_ & INCLUDE_LOG_TIMESTAMP) { LOG_ENTRY_SIZE += 8; } + if (flags_ & CRC_ON_PAYLOAD) { + LOG_ENTRY_SIZE += 5; + } while (log_ctx_size > ss.pos()) { if (log_ctx_size - ss.pos() < LOG_ENTRY_SIZE) { @@ -613,6 +619,8 @@ class rpc_session ulong term = ss.get_u64(); log_val_type val_type = (log_val_type)ss.get_u8(); uint64_t timestamp = (flags_ & INCLUDE_LOG_TIMESTAMP) ? ss.get_u64() : 0; + bool has_crc32 = (flags_ & CRC_ON_PAYLOAD) ? (ss.get_u8() != 0) : false; + uint32_t crc32 = (flags_ & CRC_ON_PAYLOAD) ? ss.get_u32() : 0; size_t val_size = ss.get_i32(); if (log_ctx_size - ss.pos() < val_size) { @@ -632,7 +640,26 @@ class rpc_session ptr buf( buffer::alloc(val_size) ); ss.get_buffer(buf); ptr entry( - cs_new(term, buf, val_type, timestamp) ); + cs_new(term, buf, val_type, timestamp, has_crc32, crc32, false) ); + + if ((flags_ & CRC_ON_PAYLOAD) && has_crc32) { + // Verify CRC. + uint32_t crc_payload = crc32_8( buf->data_begin(), + buf->size(), + 0 ); + if (crc_payload != crc32) { + p_er("log entry CRC mismatch: local calculation %x, " + "from message %x", crc_payload, crc32); + + if (impl_->get_options().corrupted_msg_handler_) { + impl_->get_options().corrupted_msg_handler_(header_, log_ctx); + } + + this->stop(); + return; + } + } + req->log_entries().push_back(entry); } } @@ -1207,6 +1234,10 @@ class asio_rpc_client LOG_ENTRY_SIZE += 8; flags |= INCLUDE_LOG_TIMESTAMP; } + if (impl_->get_options().crc_on_payload_) { + LOG_ENTRY_SIZE += 5; + flags |= CRC_ON_PAYLOAD; + } for (auto& entry: req->log_entries()) { ptr& le = entry; @@ -1226,6 +1257,10 @@ class asio_rpc_client if (impl_->get_options().replicate_log_timestamp_) { ss.put_u64( le->get_timestamp() ); } + if (impl_->get_options().crc_on_payload_) { + ss.put_u8(le->has_crc32() ? 1 : 0); + ss.put_u32(le->get_crc32()); + } ss.put_i32( le->get_buf().size() ); ss.put_raw( le->get_buf().data_begin(), le->get_buf().size() ); #endif diff --git a/src/log_entry.cxx b/src/log_entry.cxx new file mode 100644 index 00000000..72c9b20a --- /dev/null +++ b/src/log_entry.cxx @@ -0,0 +1,26 @@ +#include "crc32.hxx" +#include "log_entry.hxx" + +namespace nuraft { +log_entry::log_entry(ulong term, + const ptr& buff, + log_val_type value_type, + uint64_t log_timestamp, + bool has_crc32, + uint32_t crc32, + bool compute_crc) + : term_(term) + , value_type_(value_type) + , buff_(buff) + , timestamp_us_(log_timestamp) + , has_crc32_(has_crc32) + , crc32_(crc32) + { + if (!buff_ && !has_crc32 && compute_crc) { + has_crc32_ = true; + crc32_ = crc32_8( buff->data_begin(), + buff->size(), + 0 ); + } + } +} \ No newline at end of file diff --git a/tests/unit/raft_package_asio.hxx b/tests/unit/raft_package_asio.hxx index 8ba313b2..a66dbb4a 100644 --- a/tests/unit/raft_package_asio.hxx +++ b/tests/unit/raft_package_asio.hxx @@ -125,6 +125,7 @@ public: if (useCrcOnEntireMessage) { asio_opt.crc_on_entire_message_ = true; + asio_opt.crc_on_payload_ = true; asio_opt.corrupted_msg_handler_ = [&](ptr header, ptr ctx){ abort();