Skip to content

Commit

Permalink
Merge pull request #167 from EspressoSystems/fix/multi-leaf-decide
Browse files Browse the repository at this point in the history
Fix LedgerLog to handle multiple inserts between commits.
  • Loading branch information
jbearer committed Aug 7, 2023
2 parents e45122f + f1574d8 commit d5e9dae
Showing 1 changed file with 13 additions and 2 deletions.
15 changes: 13 additions & 2 deletions src/ledger_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ pub(crate) struct LedgerLog<T: Serialize + DeserializeOwned> {
cache_size: usize,
cache: VecDeque<Option<T>>,
store: AppendLog<BincodeLoadStore<Option<T>>>,
// Keep track of the number of appended objects which have not yet been committed. We need this
// to detect when we are inserting at the end of the log or in the middle, as the two casese are
// handled differently and `self.store.iter().len()` does not update until a new version is
// committed.
pending_inserts: usize,

// Send handle for a channel where we stream resource.
stream: BroadcastSender<T>,
Expand Down Expand Up @@ -54,6 +59,7 @@ impl<T: Serialize + DeserializeOwned + Clone> LedgerLog<T> {
file_pattern,
1u64 << 20, // 1 MB
)?,
pending_inserts: 0,
stream: channel().0,
stream_pos: 0,
})
Expand Down Expand Up @@ -110,6 +116,7 @@ impl<T: Serialize + DeserializeOwned + Clone> LedgerLog<T> {
cache_size,
cache,
store,
pending_inserts: 0,
stream: channel().0,
stream_pos,
})
Expand All @@ -126,6 +133,7 @@ impl<T: Serialize + DeserializeOwned + Clone> LedgerLog<T> {

pub(crate) fn store_resource(&mut self, resource: Option<T>) -> Result<(), PersistenceError> {
self.store.store_resource(&resource)?;
self.pending_inserts += 1;
if self.cache.len() >= self.cache_size {
self.cache.pop_front();
self.cache_start += 1;
Expand All @@ -140,7 +148,7 @@ impl<T: Serialize + DeserializeOwned + Clone> LedgerLog<T> {
{
// If there are missing objects between what we currently have and `object`, pad with
// placeholders.
let len = self.store.iter().len();
let len = self.store.iter().len() + self.pending_inserts;
let target_len = std::cmp::max(index, len);
for i in len..target_len {
tracing::debug!("storing placeholders for position {}/{target_len}", len + i);
Expand Down Expand Up @@ -173,6 +181,7 @@ impl<T: Serialize + DeserializeOwned + Clone> LedgerLog<T> {
pub(crate) async fn commit_version(&mut self) -> Result<(), PersistenceError> {
tracing::debug!("committing new version of LedgerLog");
self.store.commit_version()?;
self.pending_inserts = 0;

// Broadcast any newly-appended objects which extend the in-order available prefix.
let mut i = self.stream_pos;
Expand All @@ -193,7 +202,9 @@ impl<T: Serialize + DeserializeOwned + Clone> LedgerLog<T> {
}

pub(crate) fn revert_version(&mut self) -> Result<(), PersistenceError> {
self.store.revert_version()
self.store.revert_version()?;
self.pending_inserts = 0;
Ok(())
}
}

Expand Down

0 comments on commit d5e9dae

Please sign in to comment.