diff --git a/memstore/Cargo.toml b/memstore/Cargo.toml index 45fa8df4d..2cfb9428a 100644 --- a/memstore/Cargo.toml +++ b/memstore/Cargo.toml @@ -14,7 +14,7 @@ license = { workspace = true } repository = { workspace = true } [dependencies] -openraft = { path= "../openraft", version = "0.8.4", features=["serde"] } +openraft = { path= "../openraft", version = "0.8.4", features=["serde", "bt"] } serde = { workspace = true } serde_json = { workspace = true } diff --git a/memstore/src/lib.rs b/memstore/src/lib.rs index 9d7a079d2..cc6f10740 100644 --- a/memstore/src/lib.rs +++ b/memstore/src/lib.rs @@ -115,6 +115,8 @@ pub enum BlockOperation { pub struct MemStore { last_purged_log_id: RwLock>>, + committed: RwLock>>, + /// The Raft log. Logs are stored in serialized json. log: RwLock>, @@ -142,6 +144,7 @@ impl MemStore { Self { last_purged_log_id: RwLock::new(None), + committed: RwLock::new(None), log, sm, block: Mutex::new(BTreeMap::new()), @@ -322,6 +325,17 @@ impl RaftStorage for Arc { Ok(*self.vote.read().await) } + async fn save_committed(&mut self, committed: Option>) -> Result<(), StorageError> { + tracing::debug!(?committed, "save_committed"); + let mut c = self.committed.write().await; + *c = committed; + Ok(()) + } + + async fn read_committed(&mut self) -> Result>, StorageError> { + Ok(*self.committed.read().await) + } + async fn last_applied_state( &mut self, ) -> Result<(Option>, StoredMembership), StorageError> { diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 6f519a0ff..3489d7f31 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -1583,6 +1583,7 @@ where ref already_committed, ref upto, } => { + self.log_store.save_committed(Some(*upto)).await?; self.apply_to_state_machine(seq, already_committed.next_index(), upto.index).await?; } Command::Replicate { req, target } => { diff --git a/openraft/src/engine/command.rs b/openraft/src/engine/command.rs index dd6352e22..8717c8ea1 100644 --- a/openraft/src/engine/command.rs +++ b/openraft/src/engine/command.rs @@ -154,6 +154,7 @@ where C: RaftTypeConfig Command::StateMachine { .. } => CommandKind::StateMachine, // Apply is firstly handled by RaftCore, then forwarded to state machine worker. + // TODO: Apply also write `committed` to log-store, which should be run in CommandKind::Log Command::Apply { .. } => CommandKind::Main, } } @@ -168,6 +169,7 @@ where C: RaftTypeConfig Command::AppendEntry { .. } => None, Command::AppendInputEntries { .. } => None, Command::ReplicateCommitted { .. } => None, + // TODO: Apply also write `committed` to log-store, which should be run in CommandKind::Log Command::Apply { .. } => None, Command::Replicate { .. } => None, Command::RebuildReplicationStreams { .. } => None, diff --git a/openraft/src/storage/adapter.rs b/openraft/src/storage/adapter.rs index 9a72e0426..525fedebb 100644 --- a/openraft/src/storage/adapter.rs +++ b/openraft/src/storage/adapter.rs @@ -138,6 +138,14 @@ where S::read_vote(self.storage_mut().await.deref_mut()).await } + async fn save_committed(&mut self, committed: Option>) -> Result<(), StorageError> { + S::save_committed(self.storage_mut().await.deref_mut(), committed).await + } + + async fn read_committed(&mut self) -> Result>, StorageError> { + S::read_committed(self.storage_mut().await.deref_mut()).await + } + async fn get_log_reader(&mut self) -> Self::LogReader { S::get_log_reader(self.storage_mut().await.deref_mut()).await } diff --git a/openraft/src/storage/helper.rs b/openraft/src/storage/helper.rs index 66be9b78e..a3863f10d 100644 --- a/openraft/src/storage/helper.rs +++ b/openraft/src/storage/helper.rs @@ -1,11 +1,13 @@ use std::marker::PhantomData; use std::sync::Arc; +use crate::display_ext::DisplayOptionExt; use crate::engine::LogIdList; use crate::entry::RaftPayload; use crate::log_id::RaftLogId; use crate::raft_state::IOState; use crate::raft_state::LogIOId; +use crate::storage::RaftLogReaderExt; use crate::storage::RaftLogStorage; use crate::storage::RaftStateMachine; use crate::utime::UTime; @@ -64,10 +66,42 @@ where let vote = self.log_store.read_vote().await?; let vote = vote.unwrap_or_default(); + let mut committed = self.log_store.read_committed().await?; + let st = self.log_store.get_log_state().await?; let mut last_purged_log_id = st.last_purged_log_id; let mut last_log_id = st.last_log_id; - let (last_applied, _) = self.state_machine.applied_state().await?; + + let (mut last_applied, _) = self.state_machine.applied_state().await?; + + tracing::info!( + vote = display(&vote), + last_purged_log_id = display(last_purged_log_id.display()), + last_applied = display(last_applied.display()), + committed = display(committed.display()), + last_log_id = display(last_log_id.display()), + "get_initial_state" + ); + + // TODO: It is possible `committed < last_applied` because when installing snapshot, + // new committed should be saved, but not yet. + if committed < last_applied { + committed = last_applied; + } + + // Re-apply log entries to recover SM to latest state. + if last_applied < committed { + let start = last_applied.next_index(); + let end = committed.next_index(); + + tracing::info!("re-apply log {}..{} to state machine", start, end); + + let entries = self.log_store.get_log_entries(start..end).await?; + self.state_machine.apply(entries).await?; + + last_applied = committed; + } + let mem_state = self.get_membership().await?; // Clean up dirty state: snapshot is installed but logs are not cleaned. @@ -75,8 +109,20 @@ where self.log_store.purge(last_applied.unwrap()).await?; last_log_id = last_applied; last_purged_log_id = last_applied; + + tracing::info!( + "Clean the hole between last_log_id({}) and last_applied({}) by purging logs to {}", + last_log_id.display(), + last_applied.display(), + last_applied.display(), + ); } + tracing::info!( + "load key log ids from ({},{}]", + last_purged_log_id.display(), + last_log_id.display() + ); let log_ids = LogIdList::load_log_ids(last_purged_log_id, last_log_id, self.log_store).await?; // TODO: `flushed` is not set. diff --git a/openraft/src/storage/mod.rs b/openraft/src/storage/mod.rs index a138b6eb6..36156af04 100644 --- a/openraft/src/storage/mod.rs +++ b/openraft/src/storage/mod.rs @@ -208,6 +208,33 @@ where C: RaftTypeConfig async fn read_vote(&mut self) -> Result>, StorageError>; + /// Saves the last committed log id to storage. + /// + /// # Optional feature + /// + /// If the state machine flushes state to disk before returning from `apply()`, + /// then the application does not need to implement this method. + /// + /// Otherwise, i.e., the state machine just relies on periodical snapshot to persist state to + /// disk: + /// + /// - If the `committed` log id is saved, the state machine will be recovered to the state + /// corresponding to this `committed` log id upon system startup, i.e., the state at the point + /// when the committed log id was applied. + /// + /// - If the `committed` log id is not saved, Openraft will just recover the state machine to + /// the state of the last snapshot taken. + async fn save_committed(&mut self, _committed: Option>) -> Result<(), StorageError> { + // By default `committed` log id is not saved + Ok(()) + } + + /// Return the last saved committed log id by [`Self::save_committed`]. + async fn read_committed(&mut self) -> Result>, StorageError> { + // By default `committed` log id is not saved and this method just return None. + Ok(None) + } + // --- Log /// Returns the last deleted log id and the last log id. diff --git a/openraft/src/storage/v2.rs b/openraft/src/storage/v2.rs index 4087ec80c..96bd8ceae 100644 --- a/openraft/src/storage/v2.rs +++ b/openraft/src/storage/v2.rs @@ -77,6 +77,33 @@ where C: RaftTypeConfig /// Return the last saved vote by [`Self::save_vote`]. async fn read_vote(&mut self) -> Result>, StorageError>; + /// Saves the last committed log id to storage. + /// + /// # Optional feature + /// + /// If the state machine flushes state to disk before returning from `apply()`, + /// then the application does not need to implement this method. + /// + /// Otherwise, i.e., the state machine just relies on periodical snapshot to persist state to + /// disk: + /// + /// - If the `committed` log id is saved, the state machine will be recovered to the state + /// corresponding to this `committed` log id upon system startup, i.e., the state at the point + /// when the committed log id was applied. + /// + /// - If the `committed` log id is not saved, Openraft will just recover the state machine to + /// the state of the last snapshot taken. + async fn save_committed(&mut self, _committed: Option>) -> Result<(), StorageError> { + // By default `committed` log id is not saved + Ok(()) + } + + /// Return the last saved committed log id by [`Self::save_committed`]. + async fn read_committed(&mut self) -> Result>, StorageError> { + // By default `committed` log id is not saved and this method just return None. + Ok(None) + } + /// Append log entries and call the `callback` once logs are persisted on disk. /// /// It should returns immediately after saving the input log entries in memory, and calls the diff --git a/openraft/src/testing/suite.rs b/openraft/src/testing/suite.rs index 797b61822..50af89c94 100644 --- a/openraft/src/testing/suite.rs +++ b/openraft/src/testing/suite.rs @@ -91,6 +91,7 @@ where run_fut(run_test(builder, Self::get_initial_state_last_log_gt_sm))?; run_fut(run_test(builder, Self::get_initial_state_last_log_lt_sm))?; run_fut(run_test(builder, Self::get_initial_state_log_ids))?; + run_fut(run_test(builder, Self::get_initial_state_re_apply_committed))?; run_fut(run_test(builder, Self::save_vote))?; run_fut(run_test(builder, Self::get_log_entries))?; run_fut(run_test(builder, Self::try_get_log_entry))?; @@ -593,6 +594,43 @@ where Ok(()) } + /// Test if committed logs are re-applied. + pub async fn get_initial_state_re_apply_committed( + mut store: LS, + mut sm: SM, + ) -> Result<(), StorageError> { + Self::default_vote(&mut store).await?; + + append(&mut store, [ + blank_ent_0::(1, 2), + blank_ent_0::(1, 3), + blank_ent_0::(1, 4), + blank_ent_0::(1, 5), + ]) + .await?; + store.purge(log_id_0(1, 1)).await?; + + apply(&mut sm, [blank_ent_0::(1, 2)]).await?; + + store.save_committed(Some(log_id_0(1, 4))).await?; + let got = store.read_committed().await?; + if got.is_none() { + tracing::info!("This implementation does not store committed log id, skip test re-applying committed logs"); + return Ok(()); + } + + let initial = StorageHelper::new(&mut store, &mut sm).get_initial_state().await?; + + assert_eq!(Some(&log_id_0(1, 4)), initial.io_applied(), "last_applied is updated"); + assert_eq!( + Some(log_id_0(1, 4)), + sm.applied_state().await?.0, + "last_applied is updated" + ); + + Ok(()) + } + pub async fn save_vote(mut store: LS, mut sm: SM) -> Result<(), StorageError> { store.save_vote(&Vote::new(100, NODE_ID.into())).await?; diff --git a/tests/tests/log_store/main.rs b/tests/tests/log_store/main.rs new file mode 100644 index 000000000..7e6af4778 --- /dev/null +++ b/tests/tests/log_store/main.rs @@ -0,0 +1,11 @@ +#![cfg_attr(feature = "bt", feature(error_generic_member_access))] +#![cfg_attr(feature = "bt", feature(provide_any))] + +#[macro_use] +#[path = "../fixtures/mod.rs"] +mod fixtures; + +// The number indicate the preferred running order for these case. +// The later tests may depend on the earlier ones. + +mod t10_save_committed; diff --git a/tests/tests/log_store/t10_save_committed.rs b/tests/tests/log_store/t10_save_committed.rs new file mode 100644 index 000000000..415896d05 --- /dev/null +++ b/tests/tests/log_store/t10_save_committed.rs @@ -0,0 +1,46 @@ +use std::sync::Arc; +use std::time::Duration; + +use anyhow::Result; +use maplit::btreeset; +use openraft::storage::RaftLogStorage; +use openraft::testing::log_id; +use openraft::Config; + +use crate::fixtures::init_default_ut_tracing; +use crate::fixtures::RaftRouter; + +/// Before applying log, write `committed` log id to log store. +#[async_entry::test(worker_threads = 4, init = "init_default_ut_tracing()", tracing_span = "debug")] +async fn write_committed_log_id_to_log_store() -> Result<()> { + let config = Arc::new( + Config { + enable_tick: false, + ..Default::default() + } + .validate()?, + ); + + let mut router = RaftRouter::new(config.clone()); + + tracing::info!("--- initializing cluster"); + let mut log_index = router.new_cluster(btreeset! {0,1,2}, btreeset! {}).await?; + + log_index += router.client_request_many(0, "0", 10).await?; + + for i in [0, 1, 2] { + router.wait(&i, timeout()).log(Some(log_index), "write logs").await?; + } + + for id in [0, 1, 2] { + let (_, mut ls, _) = router.remove_node(id).unwrap(); + let committed = ls.read_committed().await?; + assert_eq!(Some(log_id(1, 0, log_index)), committed, "node-{} committed", id); + } + + Ok(()) +} + +fn timeout() -> Option { + Some(Duration::from_millis(1000)) +}