diff --git a/openraft/src/storage/helper.rs b/openraft/src/storage/helper.rs index a1defb3a2..d188654a2 100644 --- a/openraft/src/storage/helper.rs +++ b/openraft/src/storage/helper.rs @@ -2,6 +2,7 @@ use std::marker::PhantomData; use std::sync::Arc; use std::time::Duration; +use anyerror::AnyError; use validit::Valid; use crate::display_ext::DisplayOptionExt; @@ -93,10 +94,7 @@ where let start = last_applied.next_index(); let end = committed.next_index(); - tracing::info!("re-apply log {}..{} to state machine", start, end); - - let entries = log_reader.try_get_log_entries(start..end).await?; - self.state_machine.apply(entries).await?; + self.reapply_committed(start, end).await?; last_applied = committed; } @@ -170,6 +168,57 @@ where }) } + /// Read log entries from [`RaftLogReader`] in chunks, and apply them to the state machine. + pub(crate) async fn reapply_committed(&mut self, mut start: u64, end: u64) -> Result<(), StorageError> { + let chunk_size = 64; + + tracing::info!( + "re-apply log [{}..{}) in {} item chunks to state machine", + chunk_size, + start, + end + ); + + let mut log_reader = self.log_store.get_log_reader().await; + + while start < end { + let chunk_end = std::cmp::min(end, start + chunk_size); + let entries = log_reader.try_get_log_entries(start..chunk_end).await?; + + let first = entries.first().map(|x| x.get_log_id().index); + let last = entries.last().map(|x| x.get_log_id().index); + + let make_err = || { + let err = AnyError::error(format!( + "Failed to get log entries, expected index: [{}, {}), got [{:?}, {:?})", + start, chunk_end, first, last + )); + + tracing::error!("{}", err); + err + }; + + if first != Some(start) { + return Err(StorageError::read_log_at_index(start, make_err())); + } + if last != Some(chunk_end - 1) { + return Err(StorageError::read_log_at_index(chunk_end - 1, make_err())); + } + + tracing::info!( + "re-apply {} log entries: [{}, {}),", + chunk_end - start, + start, + chunk_end + ); + self.state_machine.apply(entries).await?; + + start = chunk_end; + } + + Ok(()) + } + /// Returns the last 2 membership config found in log or state machine. /// /// A raft node needs to store at most 2 membership config log: diff --git a/stores/memstore/Cargo.toml b/stores/memstore/Cargo.toml index 37e4f072f..c3041f5ac 100644 --- a/stores/memstore/Cargo.toml +++ b/stores/memstore/Cargo.toml @@ -24,6 +24,7 @@ tracing = { workspace = true } [dev-dependencies] [features] +bt = ["openraft/bt"] [package.metadata.docs.rs] all-features = true diff --git a/stores/rocksstore/Cargo.toml b/stores/rocksstore/Cargo.toml index 00d06be00..ba96dff24 100644 --- a/stores/rocksstore/Cargo.toml +++ b/stores/rocksstore/Cargo.toml @@ -32,5 +32,8 @@ tracing = { workspace = true } [dev-dependencies] tempfile = { version = "3.4.0" } +[features] +bt = ["openraft/bt"] + [package.metadata.docs.rs] all-features = true diff --git a/stores/sledstore/Cargo.toml b/stores/sledstore/Cargo.toml index 75a9fef77..108207d86 100644 --- a/stores/sledstore/Cargo.toml +++ b/stores/sledstore/Cargo.toml @@ -26,5 +26,8 @@ tracing = { workspace = true } [dev-dependencies] tempfile = { version = "3.4.0" } +[features] +bt = ["openraft/bt"] + [package.metadata.docs.rs] all-features = true