diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 25c498c..79c4953 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -2,7 +2,7 @@ name: Rust on: push: - branches: [ "master" ] + branches: [ "master", "develop" ] pull_request: branches: [ "master" ] diff --git a/fin_db/Cargo.toml b/fin_db/Cargo.toml index 8d2e92a..74c48b4 100644 --- a/fin_db/Cargo.toml +++ b/fin_db/Cargo.toml @@ -6,8 +6,8 @@ edition = "2021" [dependencies] ruc = "1.0" -fmerk = "0.1" +fmerk = { git = "https://github.com/FindoraNetwork/fmerk.git", tag = "v2.1.1"} storage = { path = "../storage", version = "0.2" } [features] -iterator = ["storage/iterator"] \ No newline at end of file +iterator = ["storage/iterator"] diff --git a/fin_db/src/lib.rs b/fin_db/src/lib.rs index 8cea604..7ab00ee 100644 --- a/fin_db/src/lib.rs +++ b/fin_db/src/lib.rs @@ -1,4 +1,8 @@ -use fmerk::{rocksdb, tree::Tree, BatchEntry, Merk, Op}; +use fmerk::{ + rocksdb::{self}, + tree::Tree, + BatchEntry, Merk, Op, +}; use ruc::*; use std::path::{Path, PathBuf}; use storage::db::{DbIter, IterOrder, KVBatch, KValue, MerkleDB}; @@ -28,13 +32,15 @@ impl FinDB { /// /// path, one will be created. pub fn open>(path: P) -> Result { - let db = Merk::open(path).map_err(|_| eg!("Failed to open db"))?; + let db = Merk::open(path).map_err(|e| eg!("Failed to open db {}", e))?; Ok(Self { db }) } /// Closes db and deletes all data from disk. pub fn destroy(self) -> Result<()> { - self.db.destroy().map_err(|_| eg!("Failed to destory db")) + self.db + .destroy() + .map_err(|e| eg!("Failed to destory db {}", e)) } } @@ -52,14 +58,14 @@ impl MerkleDB for FinDB { fn get(&self, key: &[u8]) -> Result>> { self.db .get(key) - .map_err(|_| eg!("Failed to get data from db")) + .map_err(|e| eg!("Failed to get data from db {}", e)) } /// Gets an auxiliary value. 
fn get_aux(&self, key: &[u8]) -> Result>> { self.db .get_aux(key) - .map_err(|_| eg!("Failed to get aux from db")) + .map_err(|e| eg!("Failed to get aux from db {}", e)) } /// Puts a batch of KVs @@ -93,17 +99,26 @@ impl MerkleDB for FinDB { IterOrder::Desc => Box::new(self.db.iter_opt_aux(rocksdb::IteratorMode::End, readopts)), } } + fn db_all_iterator(&self, order: IterOrder) -> DbIter<'_> + { + let readopts = rocksdb::ReadOptions::default(); + match order { + IterOrder::Asc => Box::new(self.db.iter_opt(rocksdb::IteratorMode::Start, readopts)), + IterOrder::Desc => Box::new(self.db.iter_opt(rocksdb::IteratorMode::End, readopts)), + } + } + /// Commits changes. fn commit(&mut self, aux: KVBatch, flush: bool) -> Result<()> { let batch_aux = to_batch(aux); self.db .commit(batch_aux.as_ref()) - .map_err(|_| eg!("Failed to commit to db"))?; + .map_err(|e| eg!("Failed to commit to db {}", e))?; if flush { self.db .flush() - .map_err(|_| eg!("Failed to flush memtables"))?; + .map_err(|e| eg!("Failed to flush memtables {}", e))?; } Ok(()) } @@ -112,7 +127,7 @@ impl MerkleDB for FinDB { fn snapshot>(&self, path: P) -> Result<()> { self.db .snapshot(path) - .map_err(|_| eg!("Failed to take snapshot"))?; + .map_err(|e| eg!("Failed to take snapshot {}", e))?; Ok(()) } @@ -121,6 +136,10 @@ impl MerkleDB for FinDB { let kv = Tree::decode(kv_pair.0.to_vec(), &kv_pair.1); (kv.key().to_vec(), kv.value().to_vec()) } + + fn clean_aux(&mut self) -> Result<()> { + self.db.clean_aux().map_err(|e| eg!(e)) + } } /// Rocks db @@ -246,6 +265,15 @@ impl MerkleDB for RocksDB { self.iter(lower, upper, order) } + fn db_all_iterator(&self, order: IterOrder) -> DbIter<'_> + { + let readopts = rocksdb::ReadOptions::default(); + match order { + IterOrder::Asc => Box::new(self.iter_opt(rocksdb::IteratorMode::Start, readopts)), + IterOrder::Desc => Box::new(self.iter_opt(rocksdb::IteratorMode::End, readopts)), + } + } + /// Commits changes. 
fn commit(&mut self, kvs: KVBatch, flush: bool) -> Result<()> { // write batch @@ -255,7 +283,7 @@ impl MerkleDB for RocksDB { if flush { self.db .flush() - .map_err(|_| eg!("Failed to flush memtables"))?; + .map_err(|e| eg!("Failed to flush memtables {}", e))?; } Ok(()) @@ -273,4 +301,19 @@ impl MerkleDB for RocksDB { fn decode_kv(&self, kv_pair: (Box<[u8]>, Box<[u8]>)) -> KValue { (kv_pair.0.to_vec(), kv_pair.1.to_vec()) } + + fn clean_aux(&mut self) -> Result<()> { + // let state_cf = self.db.cf_handle(CF_STATE).unwrap(); + // let mut batch = rocksdb::WriteBatch::default(); + // for (key, _) in self.db.iterator_cf(state_cf, IteratorMode::Start) { + // batch.delete_cf(state_cf, key); + // } + + // let mut opts = rocksdb::WriteOptions::default(); + // opts.set_sync(false); + // self.db.write_opt(batch, &opts).c(d!())?; + // self.db.flush_cf(state_cf).c(d!())?; + + Ok(()) + } } diff --git a/mem_db/src/lib.rs b/mem_db/src/lib.rs index 6d9e0c9..ee71020 100644 --- a/mem_db/src/lib.rs +++ b/mem_db/src/lib.rs @@ -84,6 +84,28 @@ impl MerkleDB for MemoryDB { Ok(()) } + fn db_all_iterator(&self, order: IterOrder) -> DbIter<'_> + { + let lower_key: &[u8] = b"0"; + + let lower = lower_key.to_vec().into_boxed_slice(); + let upper = lower_key.to_vec().into_boxed_slice(); + + match order { + IterOrder::Asc => Box::new( + self.inner + .range::, _>((Included(&lower), Excluded(&upper))) + .filter_map(|(k, v)| v.as_ref().map(|v| (k.clone(), v.clone()))), + ), + IterOrder::Desc => Box::new( + self.inner + .range::, _>((Included(&lower), Excluded(&upper))) + .filter_map(|(k, v)| v.as_ref().map(|v| (k.clone(), v.clone()))) + .rev(), + ), + } + } + fn iter(&self, lower: &[u8], upper: &[u8], order: IterOrder) -> DbIter<'_> { let lower = lower.to_vec().into_boxed_slice(); let upper = upper.to_vec().into_boxed_slice(); @@ -142,6 +164,11 @@ impl MerkleDB for MemoryDB { fn decode_kv(&self, kv_pair: (Box<[u8]>, Box<[u8]>)) -> KValue { (kv_pair.0.to_vec(), kv_pair.1.to_vec()) } + + fn 
clean_aux(&mut self) -> Result<()> { + self.aux.clear(); + Ok(()) + } } impl Drop for MemoryDB { diff --git a/storage/Cargo.toml b/storage/Cargo.toml index e8759f9..39723aa 100644 --- a/storage/Cargo.toml +++ b/storage/Cargo.toml @@ -17,4 +17,6 @@ temp_db = { path = "../temp_db", version = "0.2" } mem_db = { path = "../mem_db", version = "0.2" } [features] +default = [ "optimize_get_ver" ] iterator = [] +optimize_get_ver = [] diff --git a/storage/src/db.rs b/storage/src/db.rs index abf9127..2196600 100644 --- a/storage/src/db.rs +++ b/storage/src/db.rs @@ -9,6 +9,7 @@ pub type KVEntry = (StoreKey, Option>); pub type KVBatch = Vec; pub type DbIter<'a> = Box, Box<[u8]>)> + 'a>; +#[derive(Debug)] pub enum IterOrder { Asc, Desc, @@ -28,13 +29,18 @@ pub trait MerkleDB { fn iter_aux(&self, lower: &[u8], upper: &[u8], order: IterOrder) -> DbIter<'_>; + fn db_all_iterator(&self, order: IterOrder) -> DbIter<'_>; + fn commit(&mut self, kvs: KVBatch, flush: bool) -> Result<()>; fn snapshot>(&self, path: P) -> Result<()>; fn decode_kv(&self, kv_pair: (Box<[u8]>, Box<[u8]>)) -> KValue; + #[inline] fn as_mut(&mut self) -> &mut Self { self } + + fn clean_aux(&mut self) -> Result<()>; } diff --git a/storage/src/lib.rs b/storage/src/lib.rs index 52118d5..fb10f64 100755 --- a/storage/src/lib.rs +++ b/storage/src/lib.rs @@ -1,4 +1,111 @@ /// The merkle db +/// +#[deny( +////The following are allowed by default lints according to +////https://doc.rust-lang.org/rustc/lints/listing/allowed-by-default.html +absolute_paths_not_starting_with_crate, +//box_pointers, async trait must use it +//elided_lifetimes_in_paths, //allow anonymous lifetime +explicit_outlives_requirements, +keyword_idents, +macro_use_extern_crate, +meta_variable_misuse, +missing_abi, +//missing_copy_implementations, +//missing_debug_implementations, +//missing_docs, +//must_not_suspend, unstable +//non_ascii_idents, +//non_exhaustive_omitted_patterns, unstable +noop_method_call, +pointer_structural_match, 
+rust_2021_incompatible_closure_captures, +rust_2021_incompatible_or_patterns, +//rust_2021_prefixes_incompatible_syntax, +rust_2021_prelude_collisions, +single_use_lifetimes, +trivial_casts, +trivial_numeric_casts, +unreachable_pub, +unsafe_code, +unsafe_op_in_unsafe_fn, +stable_features, +//unused_crate_dependencies +unused_extern_crates, +unused_import_braces, +unused_lifetimes, +unused_qualifications, +unused_results, +variant_size_differences, +warnings, //treat all warnings as errors +clippy::all, +//clippy::pedantic, +clippy::cargo, +//The followings are selected restriction lints for rust 1.57 +clippy::as_conversions, +clippy::clone_on_ref_ptr, +clippy::create_dir, +clippy::dbg_macro, +clippy::decimal_literal_representation, +//clippy::default_numeric_fallback, too verbose when dealing with numbers +clippy::disallowed_script_idents, +clippy::else_if_without_else, +//clippy::exhaustive_enums, +clippy::exhaustive_structs, +clippy::exit, +clippy::expect_used, +clippy::filetype_is_file, +clippy::float_arithmetic, +clippy::float_cmp_const, +clippy::get_unwrap, +clippy::if_then_some_else_none, +//clippy::implicit_return, it's idiomatic Rust code. 
+clippy::indexing_slicing, +//clippy::inline_asm_x86_att_syntax, +//clippy::inline_asm_x86_intel_syntax, +clippy::integer_arithmetic, +clippy::integer_division, +clippy::let_underscore_must_use, +clippy::lossy_float_literal, +clippy::map_err_ignore, +clippy::mem_forget, +//clippy::missing_docs_in_private_items, +clippy::missing_enforced_import_renames, +clippy::missing_inline_in_public_items, +//clippy::mod_module_files, mod.rs file lis used +clippy::modulo_arithmetic, +clippy::multiple_inherent_impl, +//clippy::panic, allow in application code +//clippy::panic_in_result_fn, not necessary as panic is banned +clippy::pattern_type_mismatch, +clippy::print_stderr, +clippy::print_stdout, +clippy::rc_buffer, +clippy::rc_mutex, +clippy::rest_pat_in_fully_bound_structs, +clippy::same_name_method, +clippy::self_named_module_files, +//clippy::shadow_reuse, it’s a common pattern in Rust code +//clippy::shadow_same, it’s a common pattern in Rust code +clippy::shadow_unrelated, +clippy::str_to_string, +clippy::string_add, +clippy::string_to_string, +clippy::todo, +clippy::unimplemented, +clippy::unnecessary_self_imports, +clippy::unneeded_field_pattern, +//clippy::unreachable, allow unreachable panic,which is out of expectation +clippy::unwrap_in_result, +clippy::unwrap_used, +//clippy::use_debug, debug is allow for debug log +clippy::verbose_file_reads, +clippy::wildcard_enum_match_arm, +)] +#[allow( +clippy::panic, //allow debug_assert,panic in production code +clippy::multiple_crate_versions, //caused by the dependency, can't be fixed +)] pub mod db; pub mod state; pub mod store; diff --git a/storage/src/state/cache.rs b/storage/src/state/cache.rs index d653f15..b8dcee1 100644 --- a/storage/src/state/cache.rs +++ b/storage/src/state/cache.rs @@ -215,9 +215,8 @@ impl SessionedCache { return true; } for delta in self.stack.iter().rev() { - match delta.get(key).map(|v| v.is_none()) { - Some(v) => return v, - None => {} + if let Some(v) = delta.get(key).map(|v| v.is_none()) { + 
return v; } } if self.base.get(key) == Some(&None) { diff --git a/storage/src/state/chain_state.rs b/storage/src/state/chain_state.rs index 973eb5e..a638e84 100644 --- a/storage/src/state/chain_state.rs +++ b/storage/src/state/chain_state.rs @@ -9,11 +9,21 @@ use crate::{ store::Prefix, }; use ruc::*; -use std::{ops::Range, path::Path, str}; +use std::{ + cmp::Ordering, + collections::{BTreeMap, VecDeque}, + ops::Range, + path::Path, + str, +}; const HEIGHT_KEY: &[u8; 6] = b"Height"; +const BASE_HEIGHT_KEY: &[u8; 10] = b"BaseHeight"; +const SNAPSHOT_KEY: &[u8; 8] = b"Snapshot"; const AUX_VERSION: &[u8; 10] = b"AuxVersion"; -const CUR_AUX_VERSION: u64 = 0x01; +const AUX_VERSION_00: u64 = 0x00; +const AUX_VERSION_01: u64 = 0x01; +const AUX_VERSION_02: u64 = 0x02; const SPLIT_BGN: &str = "_"; const TOMBSTONE: [u8; 1] = [206u8]; @@ -23,15 +33,36 @@ pub const HASH_LENGTH: usize = 32; /// A zero-filled `Hash`. same with fmerk. pub const NULL_HASH: [u8; HASH_LENGTH] = [0; HASH_LENGTH]; +#[derive(Debug, Clone)] +pub struct SnapShotInfo { + pub start: u64, + pub end: u64, + pub count: u64, +} + /// Concrete ChainState struct containing a reference to an instance of MerkleDB, a name and /// current tree height. pub struct ChainState { name: String, ver_window: u64, + interval: u64, + snapshot_info: VecDeque, + // the min height of the versioned keys + min_height: u64, + pinned_height: BTreeMap, version: u64, db: D, } +/// Configurable options +#[derive(Default, Clone, Debug)] +pub struct ChainStateOpts { + pub name: Option, + pub ver_window: u64, + pub interval: u64, + pub cleanup_aux: bool, +} + /// Implementation of of the concrete ChainState struct impl ChainState { /// Creates a new instance of the ChainState. 
@@ -41,30 +72,151 @@ impl ChainState { /// /// Returns the implicit struct pub fn new(db: D, name: String, ver_window: u64) -> Self { - let mut db_name = String::from("chain-state"); - if !name.is_empty() { - db_name = name; + let opts = ChainStateOpts { + name: if name.is_empty() { None } else { Some(name) }, + ver_window, + cleanup_aux: false, + ..Default::default() + }; + + Self::create_with_opts(db, opts) + } + /// Create a new instance of ChainState with user specified options + /// + pub fn create_with_opts(db: D, opts: ChainStateOpts) -> Self { + let db_name = opts.name.unwrap_or_else(|| String::from("chain-state")); + + if opts.interval == 1 { + panic!("snapshot interval cannot be One") + } + + if opts.ver_window < opts.interval { + panic!("version window is smaller than snapshot interval"); + } + // ver_window is larger than snapshot_interval + // ver_window should align at snapshot_interval + if opts.interval != 0 && opts.ver_window % opts.interval != 0 { + panic!("ver_window should align at snapshot interval"); + } + + if opts.ver_window == 0 && opts.cleanup_aux { + panic!("perform an cleanup_aux and construct base on a no-version chain"); } let mut cs = ChainState { name: db_name, - ver_window, + ver_window: opts.ver_window, + interval: opts.interval, + snapshot_info: Default::default(), + min_height: 0, + pinned_height: Default::default(), version: Default::default(), db, }; - cs.version = cs.get_aux_version().expect("Need a valid version"); + if opts.cleanup_aux { + cs.clean_aux().unwrap(); + // move all keys to base + cs.construct_base(); + } - cs.clean_aux_db(); + let mut base_height = None; + let mut prev_interval = 0; + match cs.get_aux_version().expect("Need a valid version") { + None => { + // initializing + // version will be updated in `commit_db_with_meta` + cs.version = AUX_VERSION_00; + } + Some(AUX_VERSION_01) => { + // Version_01 + // 1. 
versioned keys are seperated into two sections: `base` and `VER` + + let h = cs.height().expect("Failed to get height"); + base_height = match h.cmp(&opts.ver_window) { + Ordering::Greater => Some(h.saturating_sub(opts.ver_window)), + _ => None, + }; + // version will be updated in `commit_db_with_meta` + cs.version = AUX_VERSION_01; + } + Some(AUX_VERSION_02) => { + // Version_02 + // 1. `base_height` is persistent in aux db + // 2. add snapshots for speedup get_ver + base_height = cs + .base_height() + .expect("Failed to read base_height from aux db"); + prev_interval = cs + .snapshot_meta() + .expect("Failed to read snapshot meta from aux db") + .expect("missing snapshot meta"); + + cs.version = AUX_VERSION_02; + } + Some(_) => { + panic!("Invalid db version"); + } + } + + println!( + "{} {} {:?} {}", + cs.name, cs.version, base_height, prev_interval + ); + + let mut batch = KVBatch::new(); + cs.clean_aux_db(&mut base_height, &mut batch); + cs.build_snapshots(base_height, prev_interval, opts.interval, &mut batch); + cs.commit_db_with_meta(batch); cs } + /// Pin the ChainState at specified height + /// + pub fn pin_at(&mut self, height: u64) -> Result<()> { + let current = self.height()?; + if current < height { + return Err(eg!("pin at future height")); + } + if height < self.min_height { + return Err(eg!("pin at too old height")); + } + if self.ver_window == 0 { + return Err(eg!("pin on non-versioned chain")); + } + + let entry = self.pinned_height.entry(height).or_insert(0); + *entry = entry.saturating_add(1); + Ok(()) + } + + /// Unpin the ChainState at specified height + /// + pub fn unpin_at(&mut self, height: u64) { + let remove = match self.pinned_height.get_mut(&height) { + Some(count) if *count > 0 => { + *count = count.saturating_sub(1); + *count == 0 + } + _ => unreachable!(), + }; + if remove { + assert_eq!(self.pinned_height.remove(&height), Some(0)); + } + } + /// Gets a value for the given key from the primary data section in RocksDB pub fn 
get(&self, key: &[u8]) -> Result>> { self.db.get(key) } + // ver_window == 0 -> ver_window = 100 + // current height = 10000, cf internal + // [0,10000] -> base prefix saved to aux + + // [9900, 10000] versioned, [0,9899] -> base prefix saved to aux + /// Gets a value for the given key from the auxiliary data section in RocksDB. /// /// This section of data is not used for root hash calculations. @@ -75,15 +227,16 @@ impl ChainState { /// Get aux database version /// /// The default version is ox00 - pub fn get_aux_version(&self) -> Result { + fn get_aux_version(&self) -> Result> { if let Some(version) = self.get_aux(AUX_VERSION.to_vec().as_ref())? { let ver_str = String::from_utf8(version).c(d!("Invalid aux version string"))?; - return ver_str + let ver = ver_str .parse::() - .c(d!("aux version should be a valid 64-bit long integer")); + .c(d!("aux version should be a valid 64-bit long integer"))?; + Ok(Some(ver)) + } else { + Ok(None) } - - Ok(0x00) } /// Iterates MerkleDB for a given range of keys. @@ -113,6 +266,24 @@ impl ChainState { true } + pub fn all_iterator(&self, order: IterOrder, func: &mut dyn FnMut(KValue) -> bool) -> bool { + // Get DB iterator + let mut db_iter = self.db.db_all_iterator(order); + let mut stop = false; + + // Loop through each entry in range + while !stop { + let kv_pair = match db_iter.next() { + Some(result) => result, + None => break, + }; + + let entry = self.db.decode_kv(kv_pair); + stop = func(entry); + } + true + } + /// Iterates MerkleDB allocated in auxiliary section for a given range of keys. /// /// Executes a closure passed as a parameter with the corresponding key value pairs. 
@@ -176,7 +347,6 @@ impl ChainState { //Build range keys for window limits let pruning_height = Self::height_str(height - self.ver_window - 1); let pruning_prefix = Prefix::new("VER".as_bytes()).push(pruning_height.as_bytes()); - // move key-value pairs of left window side to baseline self.iterate_aux( &pruning_prefix.begin(), @@ -207,7 +377,7 @@ impl ChainState { /// prefixed to each key. /// /// This is to keep a versioned history of KV pairs. - fn build_aux_batch(&self, height: u64, batch: &[KVEntry]) -> Result { + fn build_aux_batch(&mut self, height: u64, batch: &[KVEntry]) -> Result { let mut aux_batch = KVBatch::new(); if self.ver_window != 0 { // Copy keys from batch to aux batch while prefixing them with the current height @@ -222,7 +392,32 @@ impl ChainState { .collect(); // Prune Aux data in the db - self.prune_aux_batch(height, &mut aux_batch)?; + let upper = self.pinned_height.keys().min().map_or(height, |min| *min); + let last_upper = self.min_height.saturating_add(self.ver_window); + // the versioned keys before H = upper - ver_window - 1 are moved to base, H is included + for h in last_upper..=upper { + self.prune_aux_batch(h, &mut aux_batch)?; + } + + let last_min_height = self.min_height; + // update the left side of version window + self.min_height = if upper > self.ver_window { + upper.saturating_sub(self.ver_window) + } else { + // we only build base if height > ver_window + 0 + }; + if last_min_height > self.min_height { + self.min_height = last_min_height; + } else if self.min_height > 0 { + // Store the base height in auxiliary batch + aux_batch.push(( + BASE_HEIGHT_KEY.to_vec(), + Some((self.min_height - 1).to_string().into_bytes()), + )); + } + + self.build_snapshots_at_height(height, last_min_height, &mut aux_batch); } // Store the current height in auxiliary batch @@ -342,6 +537,32 @@ impl ChainState { Ok(0u64) } + // Get max height of keys stored in `base` + fn base_height(&self) -> Result> { + let height = 
self.db.get_aux(BASE_HEIGHT_KEY).c(d!())?; + if let Some(value) = height { + let height_str = String::from_utf8(value).c(d!())?; + let height = height_str.parse::().c(d!())?; + + Ok(Some(height)) + } else { + Ok(None) + } + } + + // Get the snapshot metadata + fn snapshot_meta(&self) -> Result> { + let raw = self.db.get_aux(SNAPSHOT_KEY).c(d!())?; + if let Some(value) = raw { + let meta_str = String::from_utf8(value).c(d!())?; + let meta = meta_str.parse::().c(d!())?; + + Ok(Some(meta)) + } else { + Ok(None) + } + } + /// Build a prefix for a versioned key pub fn versioned_key_prefix(height: u64) -> Prefix { Prefix::new("VER".as_bytes()).push(Self::height_str(height).as_bytes()) @@ -360,8 +581,13 @@ impl ChainState { format!("{:020}", height) } + /// Build a prefix for a snapshot key + pub(crate) fn snapshot_key_prefix(height: u64) -> Prefix { + Prefix::new("SNAPSHOT".as_bytes()).push(Self::height_str(height).as_bytes()) + } + /// Build a prefix for a base key - pub fn base_key_prefix() -> Prefix { + pub(crate) fn base_key_prefix() -> Prefix { Prefix::new("BASE".as_bytes()).push(Self::height_str(0).as_bytes()) } @@ -382,16 +608,6 @@ impl ChainState { Ok(key[2..].join(SPLIT_BGN)) } - /// Returns the Name of the ChainState - pub fn name(&self) -> &str { - self.name.as_str() - } - - /// This function will prune the tree of spent transaction outputs to reduce memory usage - pub fn prune_tree() { - unimplemented!() - } - /// Build the chain-state from height 1 to height H /// /// Returns a batch with KV pairs valid at height H @@ -400,11 +616,26 @@ impl ChainState { /// - Option-1: Considering renaming as `build_state_delta()` in future /// - Option-2: Considering add a flag `delta_or_full` parameter in future pub fn build_state(&self, height: u64, prefix: Option) -> KVBatch { + self.build_state_to(None, height, prefix, false) + } + + // height range is [s, e] + // build versioned keys between [s,e] and save them under `prefix` + fn build_state_to( + &self, + s: Option, 
+ e: u64, + prefix: Option, + keep_tombstone: bool, + ) -> KVBatch { //New map to store KV pairs let mut map = KVMap::new(); let lower = Prefix::new("VER".as_bytes()); - let upper = Prefix::new("VER".as_bytes()).push(Self::height_str(height + 1).as_bytes()); + if let Some(start) = s { + lower.push(Self::height_str(start).as_bytes()); + } + let upper = Prefix::new("VER".as_bytes()).push(Self::height_str(e + 1).as_bytes()); self.iterate_aux( lower.begin().as_ref(), @@ -416,34 +647,150 @@ impl ChainState { return false; } //If value was deleted in the version history, delete it in the map - if v.eq(&TOMBSTONE) { + if !keep_tombstone && v.eq(&TOMBSTONE) { map.remove(raw_key.as_bytes()); } else { //update map with current KV - map.insert(raw_key.as_bytes().to_vec(), Some(v)); + if let Some(prefix) = &prefix { + map.insert(prefix.push(raw_key.as_bytes()).as_ref().to_vec(), Some(v)); + } else { + map.insert(raw_key.as_bytes().to_vec(), Some(v)); + } } false }, ); - if let Some(prefix) = prefix { - let kvs: Vec<_> = map - .into_iter() - .map(|(k, v)| (prefix.push(&k).as_ref().to_vec(), v)) - .collect(); - kvs - } else { - map.into_iter().collect::>() + map.into_iter().collect::>() + } + + // Remove versioned keys before `height(included)` + // Need a `commit` to actually remove these keys from persistent storage + fn remove_versioned_keys_before(&self, height: u64) -> KVBatch { + //Define upper and lower bounds for iteration + let lower = Prefix::new("VER".as_bytes()); + let upper = Prefix::new("VER".as_bytes()).push(Self::height_str(height + 1).as_bytes()); + + //Create an empty batch + let mut batch = KVBatch::new(); + + //Iterate aux data and delete keys within bounds + self.iterate_aux( + lower.begin().as_ref(), + upper.as_ref(), + IterOrder::Asc, + &mut |(k, _v)| -> bool { + //Delete the key from aux db + batch.push((k, None)); + false + }, + ); + + batch + } + + /// Get the value of a key at a given height + /// + /// Returns the value of the given key at a 
particular height + /// Returns None if the key was deleted or invalid at height H + #[cfg(feature = "optimize_get_ver")] + pub fn get_ver(&self, key: &[u8], height: u64) -> Result>> { + if self.ver_window == 0 { + return Err(eg!("non-versioned chain")); + } + + let cur_height = self.height().c(d!("error reading current height"))?; + + if self.interval != 0 { + let height = if cur_height <= height { + cur_height + } else { + height + }; + return self.find_versioned_key_with_snapshots(key, height); + } + //Make sure that this key exists to avoid expensive query + let val = self.get(key).c(d!("error getting value"))?; + if val.is_none() { + return Ok(None); + } + + //Need to set lower and upper bound as the height can get very large + let mut lower_bound = 1; + let upper_bound = height; + if height >= cur_height { + return Ok(val); + } + if cur_height > self.ver_window { + lower_bound = cur_height.saturating_sub(self.ver_window); + } + + if lower_bound > self.min_height { + lower_bound = self.min_height } + + match lower_bound.cmp(&height.saturating_add(1)) { + Ordering::Greater => { + // The keys at querying height are moved to base and override by later height + // We cannot determine version info of the querying key + return Err(eg!("height too old, no versioning info")); + } + Ordering::Equal => { + // Search it in baseline if the querying height is moved to base but not override + let key = Self::base_key(key); + return self.get_aux(&key).c(d!("error reading aux value")); + } + _ => { + // Perform another search in versioned keys + } + } + + // Iterate in descending order from upper bound until a value is found + let mut val: Result>> = Ok(None); + let mut stop = false; + let lower_key = Self::versioned_key(key, lower_bound); + let upper_key = Self::versioned_key(key, upper_bound.saturating_add(1)); + let _ = self.iterate_aux(&lower_key, &upper_key, IterOrder::Desc, &mut |( + ver_k, + v, + )| { + match Self::get_raw_versioned_key(&ver_k) { + Ok(k) => { + if 
k.as_bytes().eq(key) { + if !v.eq(&TOMBSTONE) { + val = Ok(Some(v)); + } + stop = true; + return true; + } + false + } + Err(e) => { + val = Err(e).c(d!("error reading aux value")); + stop = true; + true + } + } + }); + + if stop { + return val; + } + + // Search it in baseline + let key = Self::base_key(key); + self.get_aux(&key).c(d!("error reading aux value")) } /// Get the value of a key at a given height /// /// Returns the value of the given key at a particular height /// Returns None if the key was deleted or invalid at height H + #[cfg(not(feature = "optimize_get_ver"))] pub fn get_ver(&self, key: &[u8], height: u64) -> Result>> { //Make sure that this key exists to avoid expensive query - if self.get(key).c(d!("error getting value"))?.is_none() { + let val = self.get(key).c(d!("error getting value"))?; + if val.is_none() { return Ok(None); } @@ -452,11 +799,22 @@ impl ChainState { let upper_bound = height; let cur_height = self.height().c(d!("error reading current height"))?; if height >= cur_height { - return self.get(key); + return Ok(val); } if cur_height > self.ver_window { lower_bound = cur_height.saturating_sub(self.ver_window); } + + if lower_bound > self.min_height { + lower_bound = self.min_height + } + + // The keys at querying height are moved to base and override by later height + // So we cannot determine version info of the querying key + if lower_bound > height.saturating_add(1) { + return Err(eg!("height too old, no versioning info")); + } + //Iterate in descending order from upper bound until a value is found for h in (lower_bound..upper_bound.saturating_add(1)).rev() { let key = Self::versioned_key(key, h); @@ -479,71 +837,461 @@ impl ChainState { } } + // simple commit to db + fn commit_db_with_meta(&mut self, mut batch: KVBatch) { + // Update aux version if needed + if self.version != AUX_VERSION_02 { + batch.push(( + AUX_VERSION.to_vec(), + Some(AUX_VERSION_02.to_string().into_bytes()), + )); + } + + //Commit this batch to db + if 
self.db.commit(batch, true).is_err() { + println!("error building base chain state"); + return; + } + + // Read back to make sure previous commit works well and update in-memory field + self.version = self + .get_aux_version() + .expect("cannot read back version") + .expect("Need a valid version"); + } + + fn build_snapshots_at_height( + &mut self, + height: u64, + last_min_height: u64, + aux_batch: &mut KVBatch, + ) { + if self.interval < 2 { + // snapshot is disabled when interval == 0 + // snapshot interval can not be One + return; + } + + // Versioned keys before height `min_height` have been pruned and moved to `base`, + // if there is a snapshot at height `min_height-1`, it should be removed too. + // This could be multiple removals if `unpin` operations occurred. + for snapshot_at in last_min_height..self.min_height { + if snapshot_at > 0 && snapshot_at % self.interval == 0 { + let mut batch = self.remove_snapshot(snapshot_at); + aux_batch.append(&mut batch); + while let Some(last) = self.snapshot_info.front() { + if last.end <= snapshot_at { + self.snapshot_info.pop_front(); + } else { + break; + } + } + } + } + + // create last snapshot if necessary + if height > 1 && height.saturating_sub(1) % self.interval == 0 { + let e = height.saturating_sub(1); + let s = if e == self.interval { + 0 + } else { + e - self.interval + 1 + }; + let mut batch = self.create_snapshot(s, e); + let count = batch.len() as u64; + self.snapshot_info.push_back(SnapShotInfo { + start: s, + end: e, + count, + }); + aux_batch.append(&mut batch); + } + } + + fn build_snapshots( + &mut self, + base_height: Option, + prev_interval: u64, + interval: u64, + batch: &mut KVBatch, + ) { + let height = self.height().expect("Failed to read chain height"); + + batch.push(( + SNAPSHOT_KEY.to_vec(), + Some(interval.to_string().into_bytes()), + )); + + if prev_interval != 0 && prev_interval != interval { + // remove all previous snapshots + let mut snapshot_at = prev_interval; + while snapshot_at 
<= height { + let mut snapshot = self.remove_snapshot(snapshot_at); + snapshot_at = snapshot_at.saturating_add(prev_interval); + if !batch.is_empty() { + batch.append(&mut snapshot); + } + } + } + + if interval != 0 { + //create snapshots + let mut s = base_height.map(|v| v.saturating_add(1)).unwrap_or_default(); + let mut e = if s != 0 && s % interval == 0 { + s + } else { + (s / interval + 1) * interval + }; + + while e <= height { + // create snapshot + let count = if prev_interval != interval { + let mut snapshot = self.create_snapshot(s, e); + let count = snapshot.len(); + batch.append(&mut snapshot); + count as u64 + } else { + // reconstruct snapshot info + self.count_in_snapshot(e) + }; + self.snapshot_info.push_back(SnapShotInfo { + start: s, + end: e, + count, + }); + s = e; + e = e.saturating_add(interval); + } + } + } + /// When creating a new chain-state instance, any residual aux data outside the current window /// needs to be cleared as to not waste memory or disrupt the versioning behaviour. 
- fn clean_aux_db(&mut self) { + fn clean_aux_db(&mut self, base_height: &mut Option, batch: &mut KVBatch) { + // A ChainState with pinned height, should never call this function + assert!(self.pinned_height.is_empty()); + //Get current height - let current_height = self.height().unwrap_or(0); + let current_height = self.height().expect("failed to get chain height"); if current_height == 0 { return; } if current_height < self.ver_window + 1 { + // ver_window is increased, just update the min_height + if let Some(h) = base_height { + self.min_height = h.saturating_add(1); + } return; } - //Get batch for state at H = current_height - ver_window - let mut batch = self.build_state( - current_height - self.ver_window, - Some(Self::base_key_prefix()), - ); - // Update aux version if needed - if self.version != CUR_AUX_VERSION { - batch.push(( + self.min_height = current_height - self.ver_window; + //Get batch for state at H = current_height - ver_window - 1, H is included + let mut base_batch = self.build_state(self.min_height - 1, Some(Self::base_key_prefix())); + let current_base = match base_height { + Some(h) if *h >= self.min_height => { + assert!(base_batch.is_empty()); + self.min_height = h.saturating_add(1); + *h + } + _ => self.min_height.saturating_sub(1), + }; + batch.append(&mut base_batch); + // Store the base height in auxiliary batch + batch.push(( + BASE_HEIGHT_KEY.to_vec(), + Some(current_base.to_string().into_bytes()), + )); + *base_height = Some(current_base); + + // Remove the versioned keys before H = current_height - self.ver_window - 1, H is included. 
+ let mut removed_keys = self.remove_versioned_keys_before(self.min_height - 1); + + batch.append(&mut removed_keys); + } + + fn construct_base(&mut self) { + let height = self.height().expect("Failed to get chain height"); + + let mut batch = vec![ + ( AUX_VERSION.to_vec(), - Some(CUR_AUX_VERSION.to_string().into_bytes()), - )); + Some(AUX_VERSION_02.to_string().into_bytes()), + ), + ( + BASE_HEIGHT_KEY.to_vec(), + Some(height.to_string().into_bytes()), + ), + (SNAPSHOT_KEY.to_vec(), Some(0.to_string().into_bytes())), + ]; + println!("{} construct base {}", self.name, height); + + self.all_iterator(IterOrder::Asc, &mut |(k, v)| -> bool { + let base_key = Self::base_key(&k); + batch.push((base_key, Some(v))); + false + }); + + //Commit this batch to db + self.db + .commit(batch, true) + .expect("error constructing chain base state"); + } + + /// Gets current versioning range of the chain-state + /// + /// returns a range of the current versioning window [lower, upper) + pub fn get_ver_range(&self) -> Result> { + let upper = self.height().c(d!("error reading current height"))?; + let mut lower = 0; + if upper > self.ver_window { + lower = upper.saturating_sub(self.ver_window); } - //Commit this batch at base height H - if self.db.commit(batch, true).is_err() { - println!("error building base chain state"); - return; + if let Some(&pinned) = self.pinned_height.keys().min() { + if pinned < lower { + lower = pinned; + } } - // Read back to make sure previous commit works well and update in-memory field - self.version = self.get_aux_version().expect("cannot read back version"); + Ok(lower..upper) + } - //Define upper and lower bounds for iteration - let lower = Prefix::new("VER".as_bytes()); - let upper = Prefix::new("VER".as_bytes()) - .push(Self::height_str(current_height - self.ver_window).as_bytes()); + pub fn clean_aux(&mut self) -> Result<()> { + let height = self.height().expect("Failed to read chain height"); + let batch = vec![(HEIGHT_KEY.to_vec(), 
Some(height.to_string().into_bytes()))]; - //Create an empty batch - let mut batch = KVBatch::new(); + self.db.clean_aux()?; + self.db.commit(batch, true) + } + + /// get current pinned height + /// + pub fn current_pinned_height(&self) -> Vec { + self.pinned_height.keys().cloned().collect() + } + + /// Get current version window in database + pub fn current_window(&self) -> Result<(u64, u64)> { + if self.ver_window == 0 { + return Err(eg!("Not supported for an non-versioned chain")); + } + let current = self.height()?; + + Ok((self.min_height, current)) + } + + // create an new snapshot at height `end` including versioned keys in height range [start, end] + // snapshot prefix "SNAPSHOT-{end}" + fn create_snapshot(&self, start: u64, end: u64) -> KVBatch { + self.build_state_to(Some(start), end, Some(Self::snapshot_key_prefix(end)), true) + } + + fn remove_snapshot(&self, height: u64) -> KVBatch { + let mut map = KVMap::new(); + + let lower = Prefix::new("SNAPSHOT".as_bytes()).push(Self::height_str(height).as_bytes()); + let upper = + Prefix::new("SNAPSHOT".as_bytes()).push(Self::height_str(height + 1).as_bytes()); - //Iterate aux data and delete keys within bounds self.iterate_aux( - lower.begin().as_ref(), + lower.as_ref(), + upper.as_ref(), + IterOrder::Asc, + &mut |(k, _)| -> bool { + // Only remove versioned kv pairs + let raw_key = Self::get_raw_versioned_key(&k).unwrap_or_default(); + if raw_key.is_empty() { + return false; + } + // Mark this key to be deleted + map.insert(k, None); + false + }, + ); + + map.into_iter().collect::>() + } + + fn count_in_snapshot(&self, height: u64) -> u64 { + let lower = Prefix::new("SNAPSHOT".as_bytes()).push(Self::height_str(height).as_bytes()); + let upper = + Prefix::new("SNAPSHOT".as_bytes()).push(Self::height_str(height + 1).as_bytes()); + + let mut count = 0u64; + + self.iterate_aux( + lower.as_ref(), upper.as_ref(), IterOrder::Asc, &mut |(k, _v)| -> bool { - //Delete the key from aux db - batch.push((k, None)); + let 
raw_key = Self::get_raw_versioned_key(&k).unwrap_or_default(); + if raw_key.is_empty() { + return false; + } + count = count.saturating_add(1); false }, ); - //commit aux batch - let _ = self.db.commit(batch, true); + count } - /// Gets current versioning range of the chain-state - /// - /// returns a range of the current versioning window [lower, upper) - pub fn get_ver_range(&self) -> Result> { - let upper = self.height().c(d!("error reading current height"))?; - let mut lower = 1; - if upper > self.ver_window { - lower = upper.saturating_sub(self.ver_window); + fn find_versioned_key_with_range( + &self, + lower: Vec, + upper: Vec, + key: &[u8], + order: IterOrder, + ) -> Result<(bool, Option>)> { + let mut val = Ok((false, None)); + + let _ = self.iterate_aux(&lower, &upper, order, &mut |(ver_k, v)| { + if let Ok(k) = Self::get_raw_versioned_key(&ver_k) { + if k.as_bytes().eq(key) { + val = Ok((true, if !v.eq(&TOMBSTONE) { Some(v) } else { None })); + return true; + } + } + false + }); + val + } + + fn last_snapshot(&self, height: u64) -> Option { + let mut last = None; + for (i, ss) in self.snapshot_info.iter().enumerate() { + if ss.start > height { + assert_eq!(i, 0); + return None; + } else if ss.end == height { + return Some(i); + } else if ss.end < height { + last = Some(i); + continue; + } else { + assert!(ss.start <= height && ss.end > height); + return if i > 0 { + Some(i.saturating_sub(1)) + } else { + None + }; + } } - Ok(lower..upper) + last + } + + /// Get snapshot info + pub fn get_snapshots_info(&self) -> Vec { + self.snapshot_info.iter().cloned().collect() + } + + /// The height of last snapshot before `height(included)` + pub fn last_snapshot_before(&self, height: u64) -> Option { + let interval = self.interval; + if interval >= 2 { + Some(if height % interval == 0 { + height + } else { + height / interval * interval + }) + } else { + None + } + } + + /// The height of oldest snapshot + pub fn oldest_snapshot(&self) -> Option { + let interval = 
self.interval; + let min_height = self.get_ver_range().ok()?.start; + if interval >= 2 { + Some(if min_height % interval == 0 { + min_height + } else { + (min_height / interval + 1) * interval + }) + } else { + None + } + } + + fn find_versioned_key_with_snapshots( + &self, + key: &[u8], + height: u64, + ) -> Result>> { + // The keys at querying height are moved to base and overridden by later height + // So we cannot determine version info of the querying key + if self.min_height > height { + return if self.min_height == height.saturating_add(1) { + // search in the `base` + let key = Self::base_key(key); + self.get_aux(&key).c(d!("error reading aux value")) + } else { + Err(eg!("height too old, no versioning info")) + }; + } + + let last = self.last_snapshot(height); + + let s = if let Some(idx) = last { + let ss = self + .snapshot_info + .get(idx) + .ok_or(eg!("cannot find snapshot information!"))?; + ss.end + } else { + // the query height is less than all of the snapshots + self.min_height + }; + + if s <= height { + // search versioned key which is beyond snapshots + let lower = Self::versioned_key(key, s); + let upper = Self::versioned_key(key, height.saturating_add(1)); + let (stop, val) = + self.find_versioned_key_with_range(lower, upper, key, IterOrder::Desc)?; + if stop { + return Ok(val); + } + } + + if let Some(last) = last { + for idx in (0..=last).rev() { + let ss = self + .snapshot_info + .get(idx) + .ok_or(eg!("cannot find snapshot info!"))?; + let height = ss.end; + if ss.count != 0 { + if let Some(v) = + self.get_aux(Self::snapshot_key_prefix(height).push(key).as_ref())? 
+ { + return Ok(if v.eq(&TOMBSTONE) { None } else { Some(v) }); + } + } + } + + // search in base + let key = Self::base_key(key); + self.get_aux(&key).c(d!("error reading aux value")) + } + + // Move all the data before the specified height to base + pub fn height_internal_to_base(&mut self, height: u64) -> Result<()> { + let mut batch = KVBatch::new(); + + self.all_iterator(IterOrder::Asc, &mut |(k, v)| -> bool { + let base_key = Self::base_key(&k); + batch.push((base_key, Some(v))); + false + }); + + batch.push(( + BASE_HEIGHT_KEY.to_vec(), + Some(height.to_string().into_bytes()), + )); + if self.db.commit(batch, true).is_err() { + panic!("error move before a certain height chain state"); + } + Ok(()) + } } diff --git a/storage/src/state/mod.rs b/storage/src/state/mod.rs index 7c94c7b..b068e20 100644 --- a/storage/src/state/mod.rs +++ b/storage/src/state/mod.rs @@ -6,7 +6,7 @@ pub mod chain_state; use crate::db::{IterOrder, KValue, MerkleDB}; pub use cache::{KVMap, KVecMap, SessionedCache}; -pub use chain_state::ChainState; +pub use chain_state::{ChainState, ChainStateOpts}; use parking_lot::RwLock; use ruc::*; use std::sync::Arc; @@ -18,6 +18,16 @@ use std::sync::Arc; pub struct State { chain_state: Arc>>, cache: SessionedCache, + height_cap: Option, +} + +impl Drop for State { + fn drop(&mut self) { + if let Some(height) = self.height_cap { + self.chain_state.write().unpin_at(height); + self.height_cap = None; + } + } } impl State { @@ -29,6 +39,7 @@ impl State { Self { chain_state: self.chain_state.clone(), cache: self.cache.clone(), + height_cap: None, } } @@ -50,6 +61,7 @@ impl State { // lock whole State object for now chain_state: cs, cache: SessionedCache::new(is_merkle), + height_cap: None, } } @@ -58,9 +70,20 @@ impl State { State { chain_state: self.chain_state.clone(), cache: self.cache.clone(), + height_cap: None, } } + /// Creates a State at specific height + pub fn state_at(&self, height: u64) -> Result { + 
self.chain_state.write().pin_at(height)?; + Ok(State { + chain_state: self.chain_state.clone(), + cache: SessionedCache::new(self.cache.is_merkle()), + height_cap: Some(height), + }) + } + /// Returns the chain state of the store. pub fn chain_state(&self) -> Arc>> { self.chain_state.clone() @@ -84,11 +107,18 @@ impl State { //If the key isn't found in the cache then query the chain state directly let cs = self.chain_state.read(); - cs.get(key) + match self.height_cap { + Some(height) => cs.get_ver(key, height), + None => cs.get(key), + } } pub fn get_ver(&self, key: &[u8], height: u64) -> Result>> { - self.chain_state.read().get_ver(key, height) + let query_at = match self.height_cap { + Some(cap) if cap < height => cap, + _ => height, + }; + self.chain_state.read().get_ver(key, query_at) } /// Queries whether a key exists in the current state. @@ -101,7 +131,10 @@ impl State { return Ok(true); } let cs = self.chain_state.read(); - cs.exists(key) + match self.height_cap { + Some(height) => cs.get_ver(key, height).map(|v| v.is_some()), + None => cs.exists(key), + } } /// Sets a key value pair in the cache @@ -144,7 +177,7 @@ impl State { } /// Iterates the cache for a given prefix - pub fn iterate_cache(&self, prefix: &[u8], map: &mut KVecMap) { + pub(crate) fn iterate_cache(&self, prefix: &[u8], map: &mut KVecMap) { self.cache.iter_prefix(prefix, map); } @@ -152,6 +185,9 @@ impl State { /// /// The cache gets persisted to the MerkleDB and then cleared pub fn commit(&mut self, height: u64) -> Result<(Vec, u64)> { + if self.height_cap.is_some() { + return Err(eg!("Not support commit a state with height cap")); + } let mut cs = self.chain_state.write(); //Get batch for current block and remove uncessary DELETE. 
@@ -200,12 +236,20 @@ impl State { /// Return the current height of the Merkle tree pub fn height(&self) -> Result { let cs = self.chain_state.read(); - cs.height() + let current = cs.height()?; + Ok(match self.height_cap { + Some(cap) if cap < current => cap, + _ => current, + }) } /// Returns the root hash of the last commit pub fn root_hash(&self) -> Vec { - let cs = self.chain_state.read(); - cs.root_hash() + if self.height_cap.is_some() { + vec![] + } else { + let cs = self.chain_state.read(); + cs.root_hash() + } } } diff --git a/storage/tests/chain_state.rs b/storage/tests/chain_state.rs new file mode 100644 index 0000000..28fc0ac --- /dev/null +++ b/storage/tests/chain_state.rs @@ -0,0 +1,814 @@ +use fin_db::FinDB; +use std::{env::temp_dir, time::SystemTime}; +use storage::{ + db::MerkleDB, + state::{ChainState, ChainStateOpts}, +}; +use temp_db::TempFinDB; + +#[test] +fn test_current_window() { + let ver_window = 2; + let fdb = TempFinDB::new().expect("failed to create temp findb"); + let mut chain = ChainState::new(fdb, "test".to_string(), ver_window); + + assert!(chain.current_window().map(|t| t == (0, 0)).unwrap()); + + assert!(chain.commit(vec![], 1, true).is_ok()); + assert!(chain.commit(vec![], 2, true).is_ok()); + + assert!(chain.current_window().map(|t| t == (0, 2)).unwrap()); + assert!(chain.commit(vec![], 3, true).is_ok()); + // current > ver_window + 1 => 3 == 2 + 1 + assert!(chain.current_window().map(|t| t == (1, 3)).unwrap()); + assert!(chain.commit(vec![], 4, true).is_ok()); + // current > ver_window + 1 => 4 > 2 + 1 + assert!(chain.current_window().map(|t| t == (2, 4)).unwrap()); + + assert!(chain.commit(vec![], 5, true).is_ok()); + assert!(chain.current_window().map(|t| t == (3, 5)).unwrap()); +} + +#[test] +fn test_pin_height() { + let ver_window = 3; + let fdb = TempFinDB::new().expect("failed to create temp findb"); + let mut chain = ChainState::new(fdb, "test".to_string(), ver_window); + assert!(chain.commit(vec![], 1, true).is_ok()); 
+ assert!(chain.commit(vec![], 2, true).is_ok()); + assert!(chain.commit(vec![], 3, true).is_ok()); + assert!(chain.commit(vec![], 4, true).is_ok()); + assert!(chain.pin_at(1).is_ok()); + assert!(chain.pin_at(2).is_ok()); + assert_eq!(chain.current_pinned_height(), vec![1, 2]); + + assert!(chain.pin_at(1).is_ok()); + assert_eq!(chain.current_pinned_height(), vec![1, 2]); + assert!(chain.pin_at(3).is_ok()); + assert_eq!(chain.current_pinned_height(), vec![1, 2, 3]); +} + +#[test] +fn test_pin_extend_window() { + let ver_window = 2; + let fdb = TempFinDB::new().expect("failed to create temp findb"); + let mut chain = ChainState::new(fdb, "test".to_string(), ver_window); + + assert!(chain.commit(vec![], 1, true).is_ok()); + assert!(chain.commit(vec![], 2, true).is_ok()); + assert!(chain.commit(vec![], 3, true).is_ok()); + assert!(chain.current_window().map(|t| t == (1, 3)).is_ok()); + + assert!(chain.pin_at(1).is_ok()); + assert!(chain.commit(vec![], 4, true).is_ok()); + assert!(chain.current_window().map(|t| t == (1, 4)).is_ok()); +} + +#[test] +fn test_pin_height_error() { + let ver_window = 1; + let fdb = TempFinDB::new().expect("failed to create temp findb"); + let mut chain = ChainState::new(fdb, "test".to_string(), ver_window); + + // future height + assert!(chain.pin_at(1).is_err()); + assert!(chain.commit(vec![], 1, true).is_ok()); + assert!(chain.commit(vec![], 2, true).is_ok()); + assert!(chain.commit(vec![], 3, true).is_ok()); + // too old height + assert!(chain.pin_at(1).is_err()); +} + +#[test] +fn test_unpin_height() { + let ver_window = 1; + let fdb = TempFinDB::new().expect("failed to create temp findb"); + let mut chain = ChainState::new(fdb, "test".to_string(), ver_window); + + assert!(chain.commit(vec![], 1, true).is_ok()); + assert!(chain.commit(vec![], 2, true).is_ok()); + assert!(chain.pin_at(1).is_ok()); + assert!(chain.pin_at(1).is_ok()); + chain.unpin_at(1); + assert_eq!(chain.current_pinned_height(), vec![1]); + chain.unpin_at(1); + 
assert_eq!(chain.current_pinned_height(), Vec::::new()); + assert!(chain.commit(vec![], 3, true).is_ok()); + assert!(chain.pin_at(2).is_ok()); + assert!(chain.pin_at(3).is_ok()); + chain.unpin_at(3); + assert_eq!(chain.current_pinned_height(), vec![2]); +} + +#[test] +fn test_unpin_shrink_window() { + let ver_window = 2; + let fdb = TempFinDB::new().expect("failed to create temp findb"); + let mut chain = ChainState::new(fdb, "test".to_string(), ver_window); + + assert!(chain.commit(vec![], 1, true).is_ok()); + assert!(chain.commit(vec![], 2, true).is_ok()); + assert!(chain.commit(vec![], 3, true).is_ok()); + assert!(chain.pin_at(1).is_ok()); + assert!(chain.commit(vec![], 4, true).is_ok()); + chain.unpin_at(1); + assert!(chain.current_window().map(|t| t == (1, 4)).is_ok()); + // next commit following unpin_at + assert!(chain.commit(vec![], 5, true).is_ok()); + assert!(chain.current_window().map(|t| t == (3, 5)).is_ok()); +} + +#[test] +fn test_zero_window() { + let fdb = TempFinDB::new().expect("failed to create temp findb"); + let mut chain = ChainState::new(fdb, "test".to_string(), 0); + + for h in 1..4 { + assert!(chain.commit(vec![], h, true).is_ok()); + } + + assert!(chain.current_window().is_err()); + + assert!(chain.pin_at(4).is_err()); +} + +#[test] +fn test_create_snapshot_1() { + let fdb = TempFinDB::new().expect("failed to create temp findb"); + let opts = ChainStateOpts { + name: Some("test".to_string()), + ver_window: 10, + interval: 0, + cleanup_aux: false, + }; + let mut chain = ChainState::create_with_opts(fdb, opts); + assert!(chain.get_snapshots_info().is_empty()); + + for h in 0..20 { + assert!(chain.commit(vec![], h, true).is_ok()); + assert!(chain.get_snapshots_info().is_empty()); + } +} + +#[test] +#[should_panic] +fn test_create_snapshot_2() { + let fdb = TempFinDB::new().expect("failed to create temp findb"); + let opts = ChainStateOpts { + name: Some("test".to_string()), + ver_window: 10, + interval: 1, + cleanup_aux: false, + }; + let _ = 
ChainState::create_with_opts(fdb, opts); +} + +#[test] +#[should_panic] +fn test_create_snapshot_2_1() { + let fdb = TempFinDB::new().expect("failed to create temp findb"); + let opts = ChainStateOpts { + name: Some("test".to_string()), + ver_window: 0, + interval: 2, + cleanup_aux: false, + }; + let _ = ChainState::create_with_opts(fdb, opts); +} + +#[test] +#[should_panic] +fn test_create_snapshot_2_2() { + let fdb = TempFinDB::new().expect("failed to create temp findb"); + let opts = ChainStateOpts { + name: Some("test".to_string()), + ver_window: 3, + interval: 2, + cleanup_aux: false, + }; + let _ = ChainState::create_with_opts(fdb, opts); +} + +#[test] +fn test_create_snapshot_3() { + let ver_window = 12; + let interval = 3; + let fdb = TempFinDB::new().expect("failed to create temp findb"); + let opts = ChainStateOpts { + name: Some("test".to_string()), + ver_window, + interval, + cleanup_aux: false, + }; + let snapshot_created_at = interval.saturating_add(1); + let snapshot_dropped_at = opts.ver_window.saturating_add(interval); + let mut chain = ChainState::create_with_opts(fdb, opts); + + println!("{:?}", chain.get_snapshots_info()); + assert!(chain.get_snapshots_info().is_empty()); + + for h in 0..snapshot_created_at { + assert!(chain.commit(vec![], h, true).is_ok()); + assert!(chain.get_snapshots_info().is_empty()); + } + + for h in snapshot_created_at..snapshot_dropped_at { + assert!(chain.commit(vec![], h, true).is_ok()); + + let snapshots = chain.get_snapshots_info(); + let latest = snapshots.last().unwrap(); + assert_eq!(latest.end, h.saturating_sub(1) / interval * interval); + assert_eq!(latest.count, 0); + let first = snapshots.first().unwrap(); + assert_eq!(first.end, snapshot_created_at.saturating_sub(1)); + } + + for h in snapshot_dropped_at..20 { + assert!(chain.commit(vec![], h, true).is_ok()); + + let snapshots = chain.get_snapshots_info(); + let latest = snapshots.last().unwrap(); + assert_eq!(latest.end, h.saturating_sub(1) / interval * 
interval); + + let first = snapshots.first().unwrap(); + let min_height = chain.get_ver_range().unwrap().start; + let mut snapshot_at = chain.last_snapshot_before(min_height).unwrap(); + if snapshot_at < min_height { + // At this case, the snapshot at `snapshot_at` has been removed in last commit + snapshot_at += interval; + } + assert_eq!(first.end, snapshot_at); + } +} + +#[test] +fn test_create_snapshot_3_1() { + let ver_window = 21; + let interval = 7; + let fdb = TempFinDB::new().expect("failed to create temp findb"); + let opts = ChainStateOpts { + name: Some("test".to_string()), + ver_window, + interval, + cleanup_aux: false, + }; + + let snapshot_dropped_at = opts.ver_window.saturating_add(interval); + let mut chain = ChainState::create_with_opts(fdb, opts); + + for h in 0..snapshot_dropped_at { + assert!(chain.commit(vec![], h, true).is_ok()); + } + + let height = snapshot_dropped_at.saturating_sub(ver_window); + assert!(chain.pin_at(height).is_ok()); + + // commit to create more snapshots + for h in snapshot_dropped_at..100 { + assert!(chain.commit(vec![], h, true).is_ok()); + } + + // pin first, then calculate oldest snapshot + let last_snapshot = chain.oldest_snapshot().unwrap(); + let snapshots = chain.get_snapshots_info(); + let first = snapshots.first().unwrap(); + assert_eq!(first.end, last_snapshot); + + chain.unpin_at(height); + // commit to remove snapshots + assert!(chain.commit(vec![], 101, true).is_ok()); + let snapshots = chain.get_snapshots_info(); + let first = snapshots.first().unwrap(); + let snapshot_at = chain.oldest_snapshot().unwrap(); + assert_eq!(first.end, snapshot_at); +} + +fn gen_cs(ver_window: u64, interval: u64) -> ChainState { + let fdb = TempFinDB::new().expect("failed to create temp findb"); + let opts = ChainStateOpts { + name: Some("test".to_string()), + ver_window, + interval, + cleanup_aux: false, + }; + ChainState::create_with_opts(fdb, opts) +} + +fn apply_operations( + chain: &mut ChainState, + operations: Vec<(u64, 
Option>)>, + height_cap: u64, +) { + let key = b"test_key".to_vec(); + let mut h = 0; + for e in operations { + while h < e.0 { + chain.commit(vec![], h, false).unwrap(); + h += 1; + } + let batch = vec![(key.clone(), e.1)]; + chain.commit(batch, e.0, false).unwrap(); + h += 1; + } + + while h < height_cap { + chain.commit(vec![], h, false).unwrap(); + h += 1; + } +} + +fn verify_expectations( + chain: &ChainState, + expectations: Vec<(u64, Option>)>, +) { + for e in expectations { + let val = match chain.get_ver(b"test_key", e.0) { + Err(e) if e.to_string().contains("no versioning info") => None, + Ok(v) => v, + _ => { + panic!("failed at height {}", e.0); + } + }; + if val != e.1 { + println!( + "Error at {} expect: {:?} actual: {:?}", + e.0, + e.1.as_ref().and_then(|v| String::from_utf8(v.clone()).ok()), + val.as_ref().and_then(|v| String::from_utf8(v.clone()).ok()) + ); + } + assert_eq!(val, e.1); + } +} + +#[test] +fn test_get_ver_with_snapshots() { + let mut chain = gen_cs(110, 11); + + let operations = vec![ + (3, Some(b"test-val3".to_vec())), + (4, Some(b"test-val4".to_vec())), + (7, None), + (15, Some(b"test-val15".to_vec())), + ]; + apply_operations(&mut chain, operations, 50); + + let expectations = vec![ + (3, Some(b"test-val3".to_vec())), + (4, Some(b"test-val4".to_vec())), + (6, Some(b"test-val4".to_vec())), + (7, None), + (8, None), + (10, None), + (15, Some(b"test-val15".to_vec())), + (20, Some(b"test-val15".to_vec())), + ]; + verify_expectations(&chain, expectations); +} + +#[test] +fn test_get_ver_with_snapshots_2() { + for interval in 5..20 { + let mut chain = gen_cs(interval * 10, interval); + + let operations = vec![ + (3, Some(b"test-val3".to_vec())), + (4, Some(b"test-val4".to_vec())), + (7, None), + (15, Some(b"test-val15".to_vec())), + ]; + apply_operations(&mut chain, operations, 50); + + let expectations = vec![ + (3, Some(b"test-val3".to_vec())), + (4, Some(b"test-val4".to_vec())), + (6, Some(b"test-val4".to_vec())), + (7, None), + (8, 
None), + (10, None), + (15, Some(b"test-val15".to_vec())), + (20, Some(b"test-val15".to_vec())), + ]; + verify_expectations(&chain, expectations); + } +} + +#[test] +fn test_get_ver_with_snapshots_3() { + let mut chain = gen_cs(55, 11); + + let operations = vec![ + (3, Some(b"test-val3".to_vec())), + (4, Some(b"test-val4".to_vec())), + (60, None), + (77, Some(b"test-val77".to_vec())), + ]; + apply_operations(&mut chain, operations, 100); + + // min_height is 100 - 55 = 45 + let expectations = vec![ + (3, None), + (4, None), // squashed in base + (44, Some(b"test-val4".to_vec())), // in the base + (45, Some(b"test-val4".to_vec())), // in the ver_window + (60, None), + (61, None), + (77, Some(b"test-val77".to_vec())), + (80, Some(b"test-val77".to_vec())), + ]; + verify_expectations(&chain, expectations); +} + +#[test] +fn test_commit_at_zero() { + let mut chain = gen_cs(100, 0); + + let key = vec![0u8; 12]; + let val = vec![0u8; 12]; + + chain + .commit(vec![(key.clone(), Some(val.clone()))], 0, true) + .unwrap(); + chain.commit(vec![], 1, true).unwrap(); + chain.commit(vec![], 1, true).unwrap(); + + assert_eq!(chain.get(key.as_slice()).unwrap(), Some(val.clone())); + assert_eq!(chain.get_ver(key.as_slice(), 0).unwrap(), Some(val.clone())); + assert_eq!(chain.get_ver(key.as_slice(), 1).unwrap(), Some(val)); +} + +fn gen_findb_cs( + exist: Option, + ver_window: u64, + interval: u64, +) -> (String, ChainState) { + gen_findb_cs_v2(exist, ver_window, interval, false) +} + +fn gen_findb_cs_v2( + exist: Option, + ver_window: u64, + interval: u64, + cleanup_aux: bool, +) -> (String, ChainState) { + let path = exist.unwrap_or_else(|| { + let time = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_nanos(); + let mut path = temp_dir(); + path.push(format!("findb-{}", time)); + path.as_os_str().to_str().unwrap().to_string() + }); + + let fdb = FinDB::open(&path).unwrap_or_else(|_| panic!("Failed to open a findb at {}", path)); + let opts = 
ChainStateOpts { + name: Some("findb".to_string()), + ver_window, + interval, + cleanup_aux, + }; + + (path, ChainState::create_with_opts(fdb, opts)) +} + +fn commit_n(chain: &mut ChainState, n: u64) { + commit_range(chain, 0, n); +} + +fn commit_range(chain: &mut ChainState, s: u64, e: u64) { + let key = b"test_key".to_vec(); + for h in s..e { + let val = format!("val-{}", h); + let batch = vec![(key.clone(), Some(val.into_bytes()))]; + chain.commit(batch, h, false).unwrap(); + } +} + +fn compare_n(chain: &ChainState, s: u64, e: u64) { + let mut expectations = vec![]; + for h in s..e { + let val = format!("val-{}", h); + expectations.push((h, Some(val.into_bytes()))); + } + verify_expectations(chain, expectations); +} + +fn expect_same(chain: &ChainState, s: u64, e: u64, val: Option>) { + let mut expectations = vec![]; + for h in s..e { + expectations.push((h, val.clone())); + } + verify_expectations(chain, expectations); +} + +#[test] +fn test_chain_reload_with_snapshots() { + let (path, cs) = gen_findb_cs(None, 0, 0); + drop(cs); + let (path, cs) = gen_findb_cs(Some(path), 100, 10); + drop(cs); + let (path, cs) = gen_findb_cs(Some(path), 111, 3); + drop(cs); + let (path, cs) = gen_findb_cs(Some(path), 20, 4); + drop(cs); + let (_path, cs) = gen_findb_cs(Some(path), 0, 0); + drop(cs); +} + +#[test] +fn test_chain_reload_with_snapshots_1() { + println!("ver_window 100, interval 10"); + let (path, mut cs) = gen_findb_cs(None, 100, 10); + commit_n(&mut cs, 200); + expect_same(&cs, 0, 98, None); + compare_n(&cs, 98, 200); + expect_same(&cs, 200, 210, Some(format!("val-{}", 199).into_bytes())); + drop(cs); + + println!("ver_window 100, interval 5"); + let (path, cs) = gen_findb_cs(Some(path), 100, 5); + // current height 199, min_height 99 + // height 98 is in the base but not squashed + expect_same(&cs, 0, 98, None); + compare_n(&cs, 98, 200); + expect_same(&cs, 200, 210, Some(format!("val-{}", 199).into_bytes())); + drop(cs); + + println!("ver_window 111, interval 
3"); + let (_, cs) = gen_findb_cs(Some(path), 111, 3); + // current height 199, ver_window 111 min_height 88 + // height 87 is in the base but not squashed + // but we don't have versioned keys before height 98 + expect_same(&cs, 0, 98, None); + compare_n(&cs, 98, 200); + expect_same(&cs, 200, 210, Some(format!("val-{}", 199).into_bytes())); + drop(cs); +} + +#[test] +fn test_chain_reload_with_snapshots_2() { + println!("ver_window 3333, interval 3"); + let (path, mut cs) = gen_findb_cs(None, 3333, 3); + commit_n(&mut cs, 4000); + drop(cs); + + println!("ver_window 3333, interval 11"); + let (_, cs) = gen_findb_cs(Some(path), 3333, 11); + expect_same(&cs, 0, 665, None); + compare_n(&cs, 666, 4000); + expect_same(&cs, 4000, 4010, Some(format!("val-{}", 3999).into_bytes())); + drop(cs); +} + +#[test] +fn test_chain_reload_with_snapshots_3() { + println!("ver_window 140, interval 7"); + let (path, mut cs) = gen_findb_cs(None, 140, 7); + // first commits + commit_n(&mut cs, 100); + drop(cs); + + println!("ver_window 140, interval 10"); + let (path, mut cs) = gen_findb_cs(Some(path), 140, 10); + // second commits + commit_range(&mut cs, 100, 200); + drop(cs); + + println!("ver_window 140, interval 14"); + let (_path, cs) = gen_findb_cs(Some(path), 140, 14); + expect_same(&cs, 0, 58, None); + compare_n(&cs, 59, 200); + expect_same(&cs, 200, 230, Some(format!("val-{}", 199).into_bytes())); + drop(cs); +} + +#[test] +fn test_chain_reload_with_snapshots_4() { + println!("ver_window 140, interval 10"); + let (path, mut cs) = gen_findb_cs(None, 140, 10); + // first commits + commit_n(&mut cs, 100); + drop(cs); + + println!("ver_window 140, interval 7"); + let (path, mut cs) = gen_findb_cs(Some(path), 140, 7); + // second commits + commit_range(&mut cs, 100, 200); + drop(cs); + + println!("ver_window 140, interval 0"); + let (path, cs) = gen_findb_cs(Some(path), 140, 0); + expect_same(&cs, 0, 58, None); + compare_n(&cs, 59, 200); + expect_same(&cs, 200, 230, 
Some(format!("val-{}", 199).into_bytes())); + drop(cs); + + std::fs::remove_dir_all(path).unwrap(); +} + +#[test] +// different ver_window with same interval +fn test_chain_reload_with_ver_window_1() { + println!("ver_window 100 interval 5"); + let (path, mut cs) = gen_findb_cs(None, 100, 5); + commit_n(&mut cs, 100); + // no base now + compare_n(&cs, 0, 100); + expect_same(&cs, 100, 130, Some(format!("val-{}", 99).into_bytes())); + assert_eq!(cs.get(b"test_key").unwrap(), Some(b"val-99".to_vec())); + drop(cs); + + println!("ver_window 90 interval 5"); + let (path, mut cs) = gen_findb_cs(Some(path), 90, 5); + // current height 99, min_height 9, base_height 8 + expect_same(&cs, 0, 8, None); + compare_n(&cs, 8, 100); + expect_same(&cs, 100, 130, Some(format!("val-{}", 99).into_bytes())); + assert_eq!(cs.get(b"test_key").unwrap(), Some(b"val-99".to_vec())); + + // advance forward + commit_range(&mut cs, 100, 120); + expect_same(&cs, 0, 28, None); + compare_n(&cs, 28, 120); + expect_same(&cs, 120, 130, Some(format!("val-{}", 119).into_bytes())); + drop(cs); + + println!("ver_window 110 interval 5"); + let (path, mut cs) = gen_findb_cs(Some(path), 110, 5); + expect_same(&cs, 0, 28, None); + compare_n(&cs, 28, 120); + expect_same(&cs, 120, 130, Some(format!("val-{}", 119).into_bytes())); + + // advance forward + commit_range(&mut cs, 120, 150); + // current height 149, min_height 39, base_height 38 + expect_same(&cs, 0, 38, None); + compare_n(&cs, 38, 150); + expect_same(&cs, 150, 151, Some(format!("val-{}", 149).into_bytes())); + + drop(cs); + + std::fs::remove_dir_all(path).unwrap(); +} + +#[test] +// same ver_window with different interval +fn test_chain_reload_with_ver_window_2() { + println!("ver_window 100 interval 5"); + let (path, mut cs) = gen_findb_cs(None, 100, 5); + commit_n(&mut cs, 100); + // no base now + compare_n(&cs, 0, 100); + expect_same(&cs, 100, 130, Some(format!("val-{}", 99).into_bytes())); + assert_eq!(cs.get(b"test_key").unwrap(), 
Some(b"val-99".to_vec())); + drop(cs); + + println!("ver_window 100 interval 2"); + let (path, cs) = gen_findb_cs(Some(path), 100, 2); + // no base now + compare_n(&cs, 0, 100); + expect_same(&cs, 100, 130, Some(format!("val-{}", 99).into_bytes())); + assert_eq!(cs.get(b"test_key").unwrap(), Some(b"val-99".to_vec())); + drop(cs); + + println!("ver_window 100 interval 20"); + let (path, mut cs) = gen_findb_cs(Some(path), 100, 20); + // no base now + compare_n(&cs, 0, 100); + expect_same(&cs, 100, 130, Some(format!("val-{}", 99).into_bytes())); + assert_eq!(cs.get(b"test_key").unwrap(), Some(b"val-99".to_vec())); + + // move forward + commit_range(&mut cs, 100, 120); + // current_height 119, min_height 19, base_height 18 + expect_same(&cs, 0, 18, None); + compare_n(&cs, 18, 120); + expect_same(&cs, 120, 130, Some(format!("val-{}", 119).into_bytes())); + + drop(cs); + + std::fs::remove_dir_all(path).unwrap(); +} + +#[test] +// different ver_window with different interval +fn test_chain_reload_with_ver_window_3() { + println!("ver_window 100 interval 5"); + let (path, mut cs) = gen_findb_cs(None, 100, 5); + commit_n(&mut cs, 100); + // no base now + compare_n(&cs, 0, 100); + expect_same(&cs, 100, 130, Some(format!("val-{}", 99).into_bytes())); + assert_eq!(cs.get(b"test_key").unwrap(), Some(b"val-99".to_vec())); + drop(cs); + + println!("ver_window 150 interval 3"); + let (path, mut cs) = gen_findb_cs(Some(path), 150, 3); + // no base now + compare_n(&cs, 0, 100); + expect_same(&cs, 100, 130, Some(format!("val-{}", 99).into_bytes())); + assert_eq!(cs.get(b"test_key").unwrap(), Some(b"val-99".to_vec())); + + // move forward + commit_range(&mut cs, 100, 160); + // current_height 159 min_height 9 base_height 8 + expect_same(&cs, 0, 8, None); + compare_n(&cs, 8, 160); + expect_same(&cs, 160, 165, Some(format!("val-{}", 159).into_bytes())); + drop(cs); + + println!("ver_window 120 interval 12"); + let (path, mut cs) = gen_findb_cs(Some(path), 120, 12); + // current_height 
159 min_height 39 base_height 38 + expect_same(&cs, 0, 38, None); + compare_n(&cs, 38, 160); + expect_same(&cs, 160, 165, Some(format!("val-{}", 159).into_bytes())); + + // move forward + commit_range(&mut cs, 160, 170); + // current_height 169 min_height 49 base_height 48 + expect_same(&cs, 0, 48, None); + compare_n(&cs, 48, 170); + expect_same(&cs, 170, 175, Some(format!("val-{}", 169).into_bytes())); + + drop(cs); + + std::fs::remove_dir_all(path).unwrap(); +} + +#[test] +fn test_chain_no_version() { + println!("ver_window 0 interval 0"); + let (path, mut cs) = gen_findb_cs_v2(None, 0, 0, false); + // key: b"test-key" + commit_n(&mut cs, 99); + + let val = format!("val-{}", 99); + let batch = vec![ + // this key will miss when changing to versioned-chain + (b"another_test_key".to_vec(), Some(val.clone().into_bytes())), + (b"test_key".to_vec(), Some(val.into_bytes())), + ]; + cs.commit(batch, 99, false).unwrap(); + + assert!(cs.get_ver(b"test_key", 10).is_err()); + assert_eq!(cs.get(b"test_key").unwrap(), Some(b"val-99".to_vec())); + drop(cs); + + println!("ver_window 100 interval 5"); + let (path, mut cs) = gen_findb_cs_v2(Some(path), 100, 5, true); + // current_height 99 base_height 99 + expect_same(&cs, 0, 99, None); + assert_eq!( + cs.get_ver(b"test_key", 99).unwrap(), + Some(b"val-99".to_vec()) + ); + assert_eq!( + cs.get_ver(b"another_test_key", 99).unwrap(), + Some(b"val-99".to_vec()) + ); + assert_eq!(cs.get(b"test_key").unwrap(), Some(b"val-99".to_vec())); + // move forward + commit_range(&mut cs, 100, 150); + // current_height 149, base_height 99 + assert_eq!( + cs.get_ver(b"test_key", 100).unwrap(), + Some(b"val-100".to_vec()) + ); + assert_eq!( + cs.get_ver(b"another_test_key", 100).unwrap(), + Some(b"val-99".to_vec()) + ); + + drop(cs); + + std::fs::remove_dir_all(path).unwrap(); +} + +#[test] +fn test_chain_no_version_1() { + println!("ver_window 100 interval 5"); + let (path, mut cs) = gen_findb_cs_v2(None, 100, 5, false); + commit_n(&mut cs, 120); 
+ // current_height 119, base_height 18 + expect_same(&cs, 0, 18, None); + compare_n(&cs, 18, 120); + expect_same(&cs, 120, 130, Some(b"val-119".to_vec())); + drop(cs); + + println!("ver_window 0 interval 0"); + let (path, mut cs) = gen_findb_cs_v2(Some(path), 0, 0, false); + commit_range(&mut cs, 120, 130); + drop(cs); + + // cleanup aux and reconstruct base + let (path, cs) = gen_findb_cs_v2(Some(path), 100, 5, true); + // current_height 129 base_height 129 + expect_same(&cs, 0, 129, None); + compare_n(&cs, 129, 130); + expect_same(&cs, 130, 140, Some(b"val-129".to_vec())); + drop(cs); + + std::fs::remove_dir_all(path).unwrap(); +} diff --git a/storage/tests/state.rs b/storage/tests/state.rs index d808e62..50049cd 100644 --- a/storage/tests/state.rs +++ b/storage/tests/state.rs @@ -1,9 +1,11 @@ use fin_db::FinDB; +use mem_db::MemoryDB; +use parking_lot::RwLock; use rand::Rng; -use std::thread; +use std::{sync::Arc, thread}; use storage::{ db::{IterOrder, KVBatch, KValue, MerkleDB}, - state::ChainState, + state::{ChainState, ChainStateOpts, State}, store::Prefix, }; use temp_db::{TempFinDB, TempRocksDB}; @@ -22,6 +24,18 @@ fn gen_cs_rocks(path: String) -> ChainState { ChainState::new(fdb, "test_db".to_string(), 0) } +/// create chain state of `RocksDB` +fn gen_cs_rocks_fresh(path: String) -> ChainState { + let fdb = TempRocksDB::open(path).expect("failed to open rocksdb"); + let opts = ChainStateOpts { + name: Some("test_db".to_string()), + ver_window: 0, + interval: 0, + cleanup_aux: true, + }; + ChainState::create_with_opts(fdb, opts) +} + #[test] fn test_new_chain_state() { let path = thread::current().name().unwrap().to_owned(); @@ -435,6 +449,44 @@ fn test_prune_aux_batch() { } } +#[test] +fn test_height_internal_to_base() { + let path = thread::current().name().unwrap().to_owned(); + let fdb = TempFinDB::open(path).expect("failed to open db"); + let mut cs = ChainState::new(fdb, "test_db".to_string(), 100); + + let batch_size = 7; + let mut batch: KVBatch = 
KVBatch::new(); + for j in 0..batch_size { + let key = format!("key-{}", j); + let val = format!("val-{}", j); + batch.push((Vec::from(key), Some(Vec::from(val)))); + } + + let _ = cs.commit(batch, 10, false); + for k in 0..batch_size { + let key = ChainState::::versioned_key(format!("key-{}", k).as_bytes(), 10); + let value = format!("val-{}", k); + // println!("versioned_key:{:?}, versioned_value:{:?}", std::str::from_utf8(&key).unwrap(), value); + assert_eq!( + cs.get_aux(key.as_slice()).unwrap().unwrap().as_slice(), + value.as_bytes() + ) + } + + cs.height_internal_to_base(10).unwrap(); + for k in 0..batch_size { + let key = ChainState::::base_key(format!("key-{}", k).as_bytes()); + let value = format!("val-{}", k); + // println!("height_internal_to_base_key:{:?}", std::str::from_utf8(&key).unwrap()); + assert_eq!( + cs.get_aux(key.as_slice()).unwrap().unwrap().as_slice(), + value.as_bytes() + ) + } + assert_eq!(cs.get_aux(b"BaseHeight").unwrap().unwrap(), b"10".to_vec()); +} + #[test] fn test_build_state() { let path = thread::current().name().unwrap().to_owned(); @@ -540,6 +592,97 @@ fn test_clean_aux_db() { } } +#[test] +#[should_panic] +fn test_clean_aux() { + // test FinDB + let path_base = thread::current().name().unwrap().to_owned(); + let mut fin_path = path_base.clone(); + fin_path.push_str("fin"); + let mut fdb = FinDB::open(fin_path).unwrap(); + fdb.commit(vec![(b"k11".to_vec(), Some(b"v11".to_vec()))], false) + .unwrap(); + assert_eq!(fdb.get_aux(b"k11").unwrap().unwrap(), b"v11".to_vec()); + fdb.clean_aux().unwrap(); + assert_eq!(fdb.get_aux(b"k11").unwrap(), None); + + // test TempFinDB + let mut tfin_path = path_base.clone(); + tfin_path.push_str("tfin"); + let mut tfdb = TempFinDB::open(tfin_path).unwrap(); + tfdb.commit(vec![(b"k11".to_vec(), Some(b"v11".to_vec()))], false) + .unwrap(); + assert_eq!(tfdb.get_aux(b"k11").unwrap().unwrap(), b"v11".to_vec()); + tfdb.clean_aux().unwrap(); + assert_eq!(tfdb.get_aux(b"k11").unwrap(), None); + + // // 
test RocksDB + // let mut rocks_path = path_base.clone(); + // rocks_path.push_str("rocks"); + // let mut rdb = RocksDB::open(rocks_path).unwrap(); + // rdb.commit(vec![(b"k11".to_vec(), Some(b"v11".to_vec()))], false) + // .unwrap(); + // assert_eq!(rdb.get_aux(b"k11").unwrap().unwrap(), b"v11".to_vec()); + // rdb.clean_aux().unwrap(); + // assert_eq!(rdb.get_aux(b"k11").unwrap(), None); + + // // test TempRocksDB + // let mut trocks_path = path_base.clone(); + // trocks_path.push_str("trocks"); + // let mut trdb = TempRocksDB::open(trocks_path).expect("failed to open db"); + // trdb.commit(vec![(b"k11".to_vec(), Some(b"v11".to_vec()))], false) + // .unwrap(); + // assert_eq!(trdb.get_aux(b"k11").unwrap().unwrap(), b"v11".to_vec()); + // trdb.clean_aux().unwrap(); + // assert_eq!(trdb.get_aux(b"k11").unwrap(), None); + + // test MemoryDB + let mut mdb = MemoryDB::new(); + mdb.commit(vec![(b"height".to_vec(), Some(b"100".to_vec()))], false) + .unwrap(); + assert_eq!(mdb.get_aux(b"height").unwrap().unwrap(), b"100".to_vec()); + mdb.clean_aux().unwrap(); + assert_eq!(mdb.get_aux(b"k11").unwrap(), None); + + // test ChainState on FinDB + let mut cs_fn_path = path_base.clone(); + cs_fn_path.push_str("cs_fin"); + let mut cs_tfdb = gen_cs(cs_fn_path); + cs_tfdb + .commit(vec![(b"k10".to_vec(), Some(b"v10".to_vec()))], 25, true) + .unwrap(); + assert_eq!( + cs_tfdb.get_aux(&b"Height".to_vec()).unwrap(), + Some(b"25".to_vec()) + ); + cs_tfdb.clean_aux().unwrap(); + assert_eq!( + cs_tfdb.get_aux(&b"Height".to_vec()).unwrap(), + Some(b"25".to_vec()) + ); + std::mem::drop(cs_tfdb); + let mut cs_fin_path = path_base.clone(); + cs_fin_path.push_str("cs_fin"); + let _ = gen_cs_rocks_fresh(cs_fin_path); + + // // test ChainState on RocksDB + // let mut cs_rocks_path = path_base.clone(); + // cs_rocks_path.push_str("cs_rocks"); + // let mut cs_rocks = gen_cs_rocks(cs_rocks_path); + // cs_rocks + // .commit(vec![(b"k10".to_vec(), Some(b"v10".to_vec()))], 25, true) + // .unwrap(); + 
// assert_eq!( + // cs_rocks.get_aux(&b"Height".to_vec()).unwrap(), + // Some(b"25".to_vec()) + // ); + // std::mem::drop(cs_rocks); + // let mut cs_rocks_path = path_base.clone(); + // cs_rocks_path.push_str("cs_rocks"); + // let cs_rocks = gen_cs_rocks_fresh(cs_rocks_path); + // assert_eq!(cs_rocks.get_aux(&b"Height".to_vec()).unwrap(), None); +} + #[test] fn test_get_ver() { //Create new Chain State with new database @@ -732,3 +875,107 @@ fn test_snapshot() { let snap_path_1 = format!("{}_{}_snap", path, 1); let _ = TempFinDB::open(snap_path_1).expect("failed to open db snapshot"); } + +#[test] +fn test_state_at() { + let fdb = TempFinDB::new().expect("failed to create fin db"); + let chain = Arc::new(RwLock::new(ChainState::new(fdb, "test_db".to_string(), 2))); + let state = State::new(chain.clone(), true); + + assert!(chain + .write() + .commit( + vec![ + (b"k10".to_vec(), Some(b"v110".to_vec())), + (b"k20".to_vec(), Some(b"v120".to_vec())), + ], + 1, + true, + ) + .is_ok()); + + assert!(chain + .write() + .commit( + vec![ + (b"k10".to_vec(), Some(b"v210".to_vec())), + (b"k20".to_vec(), Some(b"v220".to_vec())), + ], + 2, + true, + ) + .is_ok()); + + let state_1 = state + .state_at(1) + .expect("failed to create state at height 1"); + + let state_2 = state + .state_at(2) + .expect("failed to create state at height 2"); + + assert!(chain + .write() + .commit( + vec![ + (b"k10".to_vec(), Some(b"v310".to_vec())), + (b"k20".to_vec(), Some(b"v320".to_vec())), + ], + 3, + true, + ) + .is_ok()); + + assert!(state_1 + .get(b"k10") + .map_or(false, |v| v == Some(b"v110".to_vec()))); + + drop(state_1); + + assert!(state + .get(b"k10") + .map_or(false, |v| v == Some(b"v310".to_vec()))); + + assert!(state_2 + .get(b"k10") + .map_or(false, |v| v == Some(b"v210".to_vec()))); + drop(state_2); + + assert!(chain + .write() + .commit( + vec![ + (b"k10".to_vec(), Some(b"v410".to_vec())), + (b"k20".to_vec(), Some(b"v420".to_vec())), + ], + 4, + true, + ) + .is_ok()); + + 
assert!(state + .get_ver(b"k10", 1) + .map_or(false, |v| v == Some(b"v110".to_vec()))); + assert!(state + .get_ver(b"k10", 2) + .map_or(false, |v| v == Some(b"v210".to_vec()))); + + // Keys at height 2 are moved to base after this commit + assert!(chain + .write() + .commit( + vec![ + (b"k10".to_vec(), Some(b"v510".to_vec())), + (b"k20".to_vec(), Some(b"v520".to_vec())), + ], + 5, + true, + ) + .is_ok()); + + // Keys at height 1 is in base now and override by height 2 + assert!(state.get_ver(b"k10", 1).is_err()); + assert!(state + .get_ver(b"k10", 2) + .map_or(false, |v| v == Some(b"v210".to_vec()))); +} diff --git a/temp_db/Cargo.toml b/temp_db/Cargo.toml index 7839f13..44d04f3 100644 --- a/temp_db/Cargo.toml +++ b/temp_db/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" [dependencies] ruc = "1.0" -fmerk = "0.1" +fmerk = { git = "https://github.com/FindoraNetwork/fmerk.git", tag = "v2.1.1"} storage = { path = "../storage", version = "0.2" } fin_db = { path = "../fin_db", version = "0.2" } diff --git a/temp_db/src/fin.rs b/temp_db/src/fin.rs index 94d1834..def2868 100644 --- a/temp_db/src/fin.rs +++ b/temp_db/src/fin.rs @@ -58,7 +58,9 @@ impl MerkleDB for TempFinDB { fn iter_aux(&self, lower: &[u8], upper: &[u8], order: IterOrder) -> DbIter<'_> { self.deref().iter_aux(lower, upper, order) } - + fn db_all_iterator(&self, order: IterOrder) -> DbIter<'_>{ + self.deref().db_all_iterator(order) + } fn commit(&mut self, aux: KVBatch, flush: bool) -> Result<()> { self.deref_mut().commit(aux, flush) } @@ -70,6 +72,10 @@ impl MerkleDB for TempFinDB { fn decode_kv(&self, kv_pair: (Box<[u8]>, Box<[u8]>)) -> KValue { self.deref().decode_kv(kv_pair) } + + fn clean_aux(&mut self) -> Result<()> { + self.deref_mut().clean_aux() + } } impl Deref for TempFinDB { diff --git a/temp_db/src/rocks.rs b/temp_db/src/rocks.rs index 773b1e3..4d668fb 100644 --- a/temp_db/src/rocks.rs +++ b/temp_db/src/rocks.rs @@ -58,7 +58,9 @@ impl MerkleDB for TempRocksDB { fn iter_aux(&self, lower: &[u8], 
upper: &[u8], order: IterOrder) -> DbIter<'_> { self.deref().iter(lower, upper, order) } - + fn db_all_iterator(&self, order: IterOrder) -> DbIter<'_>{ + self.deref().db_all_iterator(order) + } fn commit(&mut self, kvs: KVBatch, flush: bool) -> Result<()> { self.deref_mut().commit(kvs, flush) } @@ -70,6 +72,10 @@ impl MerkleDB for TempRocksDB { fn decode_kv(&self, kv_pair: (Box<[u8]>, Box<[u8]>)) -> KValue { self.deref().decode_kv(kv_pair) } + + fn clean_aux(&mut self) -> Result<()> { + self.deref_mut().clean_aux() + } } impl Deref for TempRocksDB {