diff --git a/src/ledger_log.rs b/src/ledger_log.rs index f729995ea..9dd0ce77e 100644 --- a/src/ledger_log.rs +++ b/src/ledger_log.rs @@ -27,6 +27,11 @@ pub(crate) struct LedgerLog { cache_size: usize, cache: VecDeque>, store: AppendLog>>, + // Keep track of the number of appended objects which have not yet been committed. We need this + // to detect when we are inserting at the end of the log or in the middle, as the two casese are + // handled differently and `self.store.iter().len()` does not update until a new version is + // committed. + pending_inserts: usize, // Send handle for a channel where we stream resource. stream: BroadcastSender, @@ -54,6 +59,7 @@ impl LedgerLog { file_pattern, 1u64 << 20, // 1 MB )?, + pending_inserts: 0, stream: channel().0, stream_pos: 0, }) @@ -110,6 +116,7 @@ impl LedgerLog { cache_size, cache, store, + pending_inserts: 0, stream: channel().0, stream_pos, }) @@ -126,6 +133,7 @@ impl LedgerLog { pub(crate) fn store_resource(&mut self, resource: Option) -> Result<(), PersistenceError> { self.store.store_resource(&resource)?; + self.pending_inserts += 1; if self.cache.len() >= self.cache_size { self.cache.pop_front(); self.cache_start += 1; @@ -140,7 +148,7 @@ impl LedgerLog { { // If there are missing objects between what we currently have and `object`, pad with // placeholders. - let len = self.store.iter().len(); + let len = self.store.iter().len() + self.pending_inserts; let target_len = std::cmp::max(index, len); for i in len..target_len { tracing::debug!("storing placeholders for position {}/{target_len}", len + i); @@ -173,6 +181,7 @@ impl LedgerLog { pub(crate) async fn commit_version(&mut self) -> Result<(), PersistenceError> { tracing::debug!("committing new version of LedgerLog"); self.store.commit_version()?; + self.pending_inserts = 0; // Broadcast any newly-appended objects which extend the in-order available prefix. let mut i = self.stream_pos; @@ -193,7 +202,9 @@ impl LedgerLog { } pub(crate) fn revert_version(&mut self) -> Result<(), PersistenceError> { - self.store.revert_version() + self.store.revert_version()?; + self.pending_inserts = 0; + Ok(()) } }