diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index dafb4878b..7bfab27c0 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -650,6 +650,9 @@ where C: RaftTypeConfig self.state.accept_io(IOId::new_log_io(vote.into_committed(), last_log_id)); + // No need to submit UpdateIOProgress command, + // IO progress is updated by the new blank log + self.leader_handler() .unwrap() .leader_append_entries(vec![C::Entry::new_blank(LogId::::default())]); diff --git a/openraft/src/engine/handler/vote_handler/become_leader_test.rs b/openraft/src/engine/handler/vote_handler/become_leader_test.rs index 684a266a2..b7d5a7fb5 100644 --- a/openraft/src/engine/handler/vote_handler/become_leader_test.rs +++ b/openraft/src/engine/handler/vote_handler/become_leader_test.rs @@ -12,6 +12,7 @@ use crate::engine::ReplicationProgress; use crate::entry::RaftEntry; use crate::log_id_range::LogIdRange; use crate::progress::entry::ProgressEntry; +use crate::raft_state::IOId; use crate::replication::request::Replicate; use crate::testing::log_id; use crate::type_config::alias::EntryOf; @@ -57,6 +58,10 @@ fn test_become_leader() -> anyhow::Result<()> { assert_eq!(ServerState::Leader, eng.state.server_state); assert_eq!(eng.output.take_commands(), vec![ + Command::UpdateIOProgress { + when: None, + io_id: IOId::new_log_io(Vote::new(2, 1).into_committed(), None) + }, Command::RebuildReplicationStreams { targets: vec![ReplicationProgress(0, ProgressEntry::empty(0))] }, diff --git a/openraft/src/engine/handler/vote_handler/mod.rs b/openraft/src/engine/handler/vote_handler/mod.rs index a64d225a1..abdd1f52e 100644 --- a/openraft/src/engine/handler/vote_handler/mod.rs +++ b/openraft/src/engine/handler/vote_handler/mod.rs @@ -196,6 +196,11 @@ where C: RaftTypeConfig self.state.accept_io(IOId::new_log_io(leader_vote, last_log_id)); + self.output.push_command(Command::UpdateIOProgress { + when: None, + io_id: IOId::new_log_io(leader_vote, last_log_id), + }); + self.server_state_handler().update_server_state_if_changed(); let mut rh = self.replication_handler(); diff --git a/openraft/src/engine/tests/startup_test.rs b/openraft/src/engine/tests/startup_test.rs index b73402036..cb4d66812 100644 --- a/openraft/src/engine/tests/startup_test.rs +++ b/openraft/src/engine/tests/startup_test.rs @@ -13,6 +13,7 @@ use crate::entry::RaftEntry; use crate::log_id_range::LogIdRange; use crate::progress::entry::ProgressEntry; use crate::progress::Inflight; +use crate::raft_state::IOId; use crate::replication::request::Replicate; use crate::testing::log_id; use crate::type_config::TypeConfigExt; @@ -44,6 +45,7 @@ fn eng() -> Engine { eng } +/// It is a Leader but not yet append any logs. #[test] fn test_startup_as_leader_without_logs() -> anyhow::Result<()> { let mut eng = eng(); @@ -67,7 +69,10 @@ fn test_startup_as_leader_without_logs() -> anyhow::Result<()> { assert_eq!(leader.last_log_id(), Some(&log_id(2, 2, 4))); assert_eq!( vec![ - // + Command::UpdateIOProgress { + when: None, + io_id: IOId::new_log_io(Vote::new(2, 2).into_committed(), Some(log_id(1, 1, 3))) + }, Command::RebuildReplicationStreams { targets: vec![ReplicationProgress(3, ProgressEntry { matching: None, @@ -115,7 +120,10 @@ fn test_startup_as_leader_with_proposed_logs() -> anyhow::Result<()> { assert_eq!(leader.last_log_id(), Some(&log_id(1, 2, 6))); assert_eq!( vec![ - // + Command::UpdateIOProgress { + when: None, + io_id: IOId::new_log_io(Vote::new(1, 2).into_committed(), Some(log_id(1, 2, 6))) + }, Command::RebuildReplicationStreams { targets: vec![ReplicationProgress(3, ProgressEntry { matching: None, diff --git a/openraft/src/proposer/leader.rs b/openraft/src/proposer/leader.rs index 36595fc51..4a81d0508 100644 --- a/openraft/src/proposer/leader.rs +++ b/openraft/src/proposer/leader.rs @@ -118,7 +118,7 @@ where let last_log_id = last_leader_log_id.last().copied(); - let mut leader = Self { + let leader = Self { transfer_to: None, committed_vote: vote, next_heartbeat: C::now(), @@ -130,11 +130,6 @@ where clock_progress: VecProgress::new(quorum_set, learner_ids, || None), }; - // Update progress for this Leader. - // Note that Leader not being a voter is allowed. - let leader_node_id = vote.leader_id().voted_for().unwrap(); - let _ = leader.progress.update(&leader_node_id, ProgressEntry::new(last_log_id)); - leader } diff --git a/stores/memstore/src/lib.rs b/stores/memstore/src/lib.rs index 566d715c2..9de0315cd 100644 --- a/stores/memstore/src/lib.rs +++ b/stores/memstore/src/lib.rs @@ -9,6 +9,8 @@ use std::collections::HashMap; use std::fmt::Debug; use std::io::Cursor; use std::ops::RangeBounds; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; use std::sync::Arc; use std::sync::Mutex; @@ -139,6 +141,11 @@ impl BlockConfig { pub struct MemLogStore { last_purged_log_id: RwLock>>, + /// Saving committed log id is optional in Openraft. + /// + /// This flag switches on the saving for testing purposes. + pub enable_saving_committed: AtomicBool, + committed: RwLock>>, /// The Raft log. Logs are stored in serialized json. @@ -157,6 +164,7 @@ impl MemLogStore { Self { last_purged_log_id: RwLock::new(None), + enable_saving_committed: AtomicBool::new(true), committed: RwLock::new(None), log, block, @@ -350,13 +358,23 @@ impl RaftLogStorage for Arc { } async fn save_committed(&mut self, committed: Option>) -> Result<(), StorageError> { - tracing::debug!(?committed, "save_committed"); + let enabled = self.enable_saving_committed.load(Ordering::Relaxed); + tracing::debug!(?committed, "save_committed, enabled: {}", enabled); + if !enabled { + return Ok(()); + } let mut c = self.committed.write().await; *c = committed; Ok(()) } async fn read_committed(&mut self) -> Result>, StorageError> { + let enabled = self.enable_saving_committed.load(Ordering::Relaxed); + tracing::debug!("read_committed, enabled: {}", enabled); + if !enabled { + return Ok(None); + } + Ok(*self.committed.read().await) } diff --git a/tests/tests/fixtures/mod.rs b/tests/tests/fixtures/mod.rs index 47f5cd5a3..e7b0b4aed 100644 --- a/tests/tests/fixtures/mod.rs +++ b/tests/tests/fixtures/mod.rs @@ -266,6 +266,9 @@ pub struct TypedRaftRouter { #[allow(clippy::type_complexity)] nodes: Arc>>, + /// Whether to save the committed entries to the RaftLogStorage. + pub enable_saving_committed: bool, + /// Whether to fail a network RPC that is sent from/to a node. /// And it defines what kind of error to return. fail_rpc: Arc>>, @@ -315,6 +318,7 @@ impl Builder { TypedRaftRouter { config: self.config, nodes: Default::default(), + enable_saving_committed: true, fail_rpc: Default::default(), send_delay: Arc::new(AtomicU64::new(send_delay)), append_entries_quota: Arc::new(Mutex::new(None)), @@ -474,6 +478,7 @@ impl TypedRaftRouter { pub fn new_store(&mut self) -> (MemLogStore, MemStateMachine) { let (log, sm) = openraft_memstore::new_mem_store(); + log.enable_saving_committed.store(self.enable_saving_committed, Ordering::Relaxed); (log, sm) } diff --git a/tests/tests/life_cycle/t50_single_leader_restart_re_apply_logs.rs b/tests/tests/life_cycle/t50_single_leader_restart_re_apply_logs.rs index c5e588a8d..c69310436 100644 --- a/tests/tests/life_cycle/t50_single_leader_restart_re_apply_logs.rs +++ b/tests/tests/life_cycle/t50_single_leader_restart_re_apply_logs.rs @@ -13,6 +13,9 @@ use crate::fixtures::RaftRouter; /// A single leader should re-apply all logs upon startup, /// because itself is a quorum. +/// +/// This test disables save_committed() to ensure that logs are still re-applied because the leader +/// itself forms a quorum. #[tracing::instrument] #[test_harness::test(harness = ut_harness)] async fn single_leader_restart_re_apply_logs() -> anyhow::Result<()> { @@ -25,6 +28,7 @@ async fn single_leader_restart_re_apply_logs() -> anyhow::Result<()> { ); let mut router = RaftRouter::new(config.clone()); + router.enable_saving_committed = false; tracing::info!("--- bring up cluster of 1 node"); let mut log_index = router.new_cluster(btreeset! {0}, btreeset! {}).await?;