Skip to content

Commit

Permalink
Refactor: Improve log entry processing on startup
Browse files Browse the repository at this point in the history
- Implement chunk-based reading of committed log entries when
  re-applying to state machine upon startup.

- Add validation for log entry indexes, to avoid applying wrong entries
  to state machine.
  • Loading branch information
drmingdrmer committed Sep 14, 2024
1 parent 681d04d commit cd91cee
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 4 deletions.
57 changes: 53 additions & 4 deletions openraft/src/storage/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;

use anyerror::AnyError;
use validit::Valid;

use crate::display_ext::DisplayOptionExt;
Expand Down Expand Up @@ -93,10 +94,7 @@ where
let start = last_applied.next_index();
let end = committed.next_index();

tracing::info!("re-apply log {}..{} to state machine", start, end);

let entries = log_reader.try_get_log_entries(start..end).await?;
self.state_machine.apply(entries).await?;
self.reapply_committed(start, end).await?;

last_applied = committed;
}
Expand Down Expand Up @@ -170,6 +168,57 @@ where
})
}

/// Read log entries from [`RaftLogReader`] in chunks, and apply them to the state machine.
pub(crate) async fn reapply_committed(&mut self, mut start: u64, end: u64) -> Result<(), StorageError<C>> {
let chunk_size = 64;

tracing::info!(
"re-apply log [{}..{}) in {} item chunks to state machine",
chunk_size,
start,
end
);

let mut log_reader = self.log_store.get_log_reader().await;

while start < end {
let chunk_end = std::cmp::min(end, start + chunk_size);
let entries = log_reader.try_get_log_entries(start..chunk_end).await?;

let first = entries.first().map(|x| x.get_log_id().index);
let last = entries.last().map(|x| x.get_log_id().index);

let make_err = || {
let err = AnyError::error(format!(
"Failed to get log entries, expected index: [{}, {}), got [{:?}, {:?})",
start, chunk_end, first, last
));

tracing::error!("{}", err);
err
};

if first != Some(start) {
return Err(StorageError::read_log_at_index(start, make_err()));
}
if last != Some(chunk_end - 1) {
return Err(StorageError::read_log_at_index(chunk_end - 1, make_err()));
}

tracing::info!(
"re-apply {} log entries: [{}, {}),",
chunk_end - start,
start,
chunk_end
);
self.state_machine.apply(entries).await?;

start = chunk_end;
}

Ok(())
}

/// Returns the last 2 membership config found in log or state machine.
///
/// A raft node needs to store at most 2 membership config log:
Expand Down
1 change: 1 addition & 0 deletions stores/memstore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ tracing = { workspace = true }
[dev-dependencies]

[features]
bt = ["openraft/bt"]

[package.metadata.docs.rs]
all-features = true
3 changes: 3 additions & 0 deletions stores/rocksstore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,8 @@ tracing = { workspace = true }
[dev-dependencies]
tempfile = { version = "3.4.0" }

[features]
bt = ["openraft/bt"]

[package.metadata.docs.rs]
all-features = true
3 changes: 3 additions & 0 deletions stores/sledstore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,8 @@ tracing = { workspace = true }
[dev-dependencies]
tempfile = { version = "3.4.0" }

[features]
bt = ["openraft/bt"]

[package.metadata.docs.rs]
all-features = true

0 comments on commit cd91cee

Please sign in to comment.