Skip to content

Commit

Permalink
feat: load number of pages from disk in merge strategy (#1248)
Browse files Browse the repository at this point in the history
In order to create merge strategy, we need to know for each `PageMap`
its size on disk and in memory. We used to propagate the in memory size
from DSM thread. It is complicated code wise and makes the lazy
`PageMaps` loading harder.
This change loads pagemaps in the `TipThread` in parallel to get the
in-memory size.
  • Loading branch information
pakhomov-dfinity authored Sep 6, 2024
1 parent e40a730 commit 656d7a6
Show file tree
Hide file tree
Showing 16 changed files with 226 additions and 111 deletions.
7 changes: 6 additions & 1 deletion Cargo.Bazel.Fuzzing.json.lock
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"checksum": "323662bf314beb68f50ba1391490209302c5a558ddf4886bd46056e6ca7bcded",
"checksum": "d5df3c4abc15a738eace4dc09b45a30daafdb0323821fdd79230310a49331ded",
"crates": {
"abnf 0.12.0": {
"name": "abnf",
Expand Down Expand Up @@ -18580,6 +18580,10 @@
"id": "ssh2 0.9.4",
"target": "ssh2"
},
{
"id": "static_assertions 1.1.0",
"target": "static_assertions"
},
{
"id": "strum 0.26.2",
"target": "strum"
Expand Down Expand Up @@ -78069,6 +78073,7 @@
"slog-term 2.9.1",
"socket2 0.5.7",
"ssh2 0.9.4",
"static_assertions 1.1.0",
"strum 0.26.2",
"strum_macros 0.26.2",
"stubborn-io 0.3.2",
Expand Down
1 change: 1 addition & 0 deletions Cargo.Bazel.Fuzzing.toml.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3238,6 +3238,7 @@ dependencies = [
"slog-term",
"socket2 0.5.7",
"ssh2",
"static_assertions",
"strum 0.26.2",
"strum_macros 0.26.2",
"stubborn-io",
Expand Down
7 changes: 6 additions & 1 deletion Cargo.Bazel.json.lock
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"checksum": "f03bd2f4387f522e48344c064f7dc4ae070df531bdf960f3e7c1b1048a9738d9",
"checksum": "6814fe01f53a8d40665d111193d917353581d59aafc6b27cfe2accd176331ca2",
"crates": {
"abnf 0.12.0": {
"name": "abnf",
Expand Down Expand Up @@ -18381,6 +18381,10 @@
"id": "ssh2 0.9.4",
"target": "ssh2"
},
{
"id": "static_assertions 1.1.0",
"target": "static_assertions"
},
{
"id": "strum 0.26.2",
"target": "strum"
Expand Down Expand Up @@ -78289,6 +78293,7 @@
"slog-term 2.9.1",
"socket2 0.5.7",
"ssh2 0.9.4",
"static_assertions 1.1.0",
"strum 0.26.2",
"strum_macros 0.26.2",
"stubborn-io 0.3.2",
Expand Down
1 change: 1 addition & 0 deletions Cargo.Bazel.toml.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3227,6 +3227,7 @@ dependencies = [
"slog-term",
"socket2 0.5.7",
"ssh2",
"static_assertions",
"strum 0.26.2",
"strum_macros 0.26.2",
"stubborn-io",
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,7 @@ slog-async = { version = "2.8.0", features = ["nested-values"] }
slog-scope = "4.4.0"
slog-term = "2.9.1"
socket2 = { version = "0.5.7", features = ["all"] }
static_assertions = "1.1.0"
strum = { version = "0.26.2", features = ["derive"] }
strum_macros = "0.26.2"
syn = { version = "1.0.109", features = ["fold", "full"] }
Expand Down
3 changes: 3 additions & 0 deletions bazel/external_crates.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -1244,6 +1244,9 @@ def external_crates_repository(name, cargo_lockfile, lockfile, sanitizers_enable
"ssh2": crate.spec(
version = "0.9.4",
),
"static_assertions": crate.spec(
version = "1.1.0",
),
"strum": crate.spec(
version = "^0.26.2",
features = [
Expand Down
1 change: 1 addition & 0 deletions rs/replicated_state/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ DEPENDENCIES = [
"@crate_index//:rayon",
"@crate_index//:serde",
"@crate_index//:slog",
"@crate_index//:static_assertions",
"@crate_index//:strum",
"@crate_index//:tempfile",
"@crate_index//:uuid",
Expand Down
1 change: 1 addition & 0 deletions rs/replicated_state/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ rand_chacha = { workspace = true }
rayon = { workspace = true }
serde = { workspace = true }
slog = { workspace = true }
static_assertions = { workspace = true }
strum = { workspace = true }
strum_macros = { workspace = true }
tempfile = { workspace = true }
Expand Down
111 changes: 91 additions & 20 deletions rs/replicated_state/src/page_map/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use std::{
collections::{BTreeMap, BTreeSet},
fs::{File, OpenOptions},
io::Write,
io::{Read, Seek, SeekFrom, Write},
ops::Range,
path::{Path, PathBuf},
sync::Arc,
Expand Down Expand Up @@ -822,7 +822,7 @@ fn check_mapping_correctness(mapping: &Mapping, path: &Path) -> Result<(), Persi
/// 50GiB only contains the last page, we would have only the shard number 7.
pub struct ShardTag {}
pub type Shard = AmountOf<ShardTag, u64>;
pub type StorageResult<T> = Result<T, Box<dyn std::error::Error>>;
pub type StorageResult<T> = Result<T, Box<dyn std::error::Error + Send>>;

/// Provide information from `StateLayout` about paths of a specific `PageMap`.
pub trait StorageLayout {
Expand All @@ -836,27 +836,51 @@ pub trait StorageLayout {
fn existing_overlays(&self) -> StorageResult<Vec<PathBuf>>;

/// Get the height of an existing overlay path.
fn overlay_height(&self, overlay: &Path) -> Result<Height, Box<dyn std::error::Error>>;
fn overlay_height(&self, overlay: &Path) -> StorageResult<Height>;

/// Get the shard of an existing overlay path.
fn overlay_shard(&self, overlay: &Path) -> Result<Shard, Box<dyn std::error::Error>>;
fn overlay_shard(&self, overlay: &Path) -> StorageResult<Shard>;
}

impl dyn StorageLayout + '_ {
pub fn storage_size(&self) -> StorageResult<u64> {
pub fn storage_size_bytes(&self) -> StorageResult<u64> {
let mut result = 0;
for path in self.existing_files()? {
result += std::fs::metadata(&path)
.map_err(|err: _| PersistenceError::FileSystemError {
path: path.display().to_string(),
context: format!("Failed get existing file length: {}", path.display()),
internal_error: err.to_string(),
.map_err(|err: _| {
Box::new(PersistenceError::FileSystemError {
path: path.display().to_string(),
context: format!("Failed get existing file length: {}", path.display()),
internal_error: err.to_string(),
}) as Box<dyn std::error::Error + Send>
})?
.len();
}
Ok(result)
}

/// Number of pages required to load the PageMap into memory.
/// Implementation ignores the zero pages that we don't load, so it's the index of last page + 1.
pub fn memory_size_pages(&self) -> StorageResult<usize> {
let mut result = 0;
if let Some(base) = self.existing_base() {
result = (std::fs::metadata(&base)
.map_err(|err: _| {
Box::new(PersistenceError::FileSystemError {
path: base.display().to_string(),
context: format!("Failed get existing file length: {}", base.display()),
internal_error: err.to_string(),
}) as Box<dyn std::error::Error + Send>
})?
.len() as usize)
/ PAGE_SIZE;
}
for overlay in self.existing_overlays()? {
result = std::cmp::max(result, Self::num_overlay_logical_pages(&overlay)?);
}
debug_assert_eq!(result, Storage::load(self).unwrap().num_logical_pages());
Ok(result)
}
fn existing_base(&self) -> Option<PathBuf> {
if self.base().exists() {
Some(self.base().to_path_buf())
Expand Down Expand Up @@ -884,6 +908,48 @@ impl dyn StorageLayout + '_ {
}
Ok(result)
}

// Read the number of memory pages from overlay.
// Basically it's the index of the last page, which we read based on the offset from the end of
// the file plus some error handling.
fn num_overlay_logical_pages(overlay: &Path) -> StorageResult<usize> {
let to_storage_err = |err: std::io::Error| -> Box<dyn std::error::Error + Send> {
Box::new(PersistenceError::FileSystemError {
path: overlay.display().to_string(),
context: "Failed to get number of memory pages".to_string(),
internal_error: err.to_string(),
}) as Box<dyn std::error::Error + Send>
};

let mut file = OpenOptions::new()
.read(true)
.open(overlay)
.map_err(to_storage_err)?;

let mut version_buf = [0u8; VERSION_NUM_BYTES];
file.seek(SeekFrom::End(-(VERSION_NUM_BYTES as i64)))
.map_err(to_storage_err)?;
file.read_exact(&mut version_buf).map_err(to_storage_err)?;
static_assertions::const_assert_eq!(MAX_SUPPORTED_OVERLAY_VERSION as u32, 0);
let version = u32::from_le_bytes(version_buf);
if version > MAX_SUPPORTED_OVERLAY_VERSION as u32 {
return Err(Box::new(PersistenceError::VersionMismatch {
path: overlay.display().to_string(),
file_version: version,
supported: MAX_SUPPORTED_OVERLAY_VERSION,
}) as Box<dyn std::error::Error + Send>);
}

let mut last_page_index_range_buf = [[0u8; 8]; 3];
file.seek(SeekFrom::End(
-((VERSION_NUM_BYTES + SIZE_NUM_BYTES + PAGE_INDEX_RANGE_NUM_BYTES) as i64),
))
.map_err(to_storage_err)?;
file.read_exact(last_page_index_range_buf.as_flattened_mut())
.map_err(to_storage_err)?;
let last_page_index_range = PageIndexRange::from(&last_page_index_range_buf);
Ok(last_page_index_range.end_page.get() as usize)
}
}

/// Whether to merge into a base file or an overlay.
Expand Down Expand Up @@ -1000,7 +1066,7 @@ impl MergeCandidate {
num_pages: u64,
lsmt_config: &LsmtConfig,
metrics: &StorageMetrics,
) -> Result<Vec<MergeCandidate>, Box<dyn std::error::Error>> {
) -> StorageResult<Vec<MergeCandidate>> {
if layout.base().exists() && num_pages > lsmt_config.shard_num_pages {
Self::split_to_shards(layout, height, num_pages, lsmt_config)
} else {
Expand All @@ -1012,13 +1078,13 @@ impl MergeCandidate {
pub fn merge_to_base(
layout: &dyn StorageLayout,
num_pages: u64,
) -> Result<Option<MergeCandidate>, Box<dyn std::error::Error>> {
) -> StorageResult<Option<MergeCandidate>> {
let existing_overlays = layout.existing_overlays()?;
let base_path = layout.base();
if existing_overlays.is_empty() {
Ok(None)
} else {
let storage_size = layout.storage_size()?;
let storage_size = layout.storage_size_bytes()?;
Ok(Some(MergeCandidate {
overlays: existing_overlays.to_vec(),
base: if base_path.exists() {
Expand Down Expand Up @@ -1135,7 +1201,7 @@ impl MergeCandidate {
height: Height,
num_pages: u64,
lsmt_config: &LsmtConfig,
) -> Result<Vec<MergeCandidate>, Box<dyn std::error::Error>> {
) -> StorageResult<Vec<MergeCandidate>> {
let dst_overlays: Vec<_> = (0..num_shards(num_pages, lsmt_config))
.map(|shard| layout.overlay(height, Shard::new(shard)).to_path_buf())
.collect();
Expand All @@ -1154,7 +1220,7 @@ impl MergeCandidate {
} else {
None
};
let storage_size = layout.storage_size()?;
let storage_size = layout.storage_size_bytes()?;
Ok(vec![MergeCandidate {
overlays: layout.existing_overlays()?,
base,
Expand All @@ -1178,7 +1244,7 @@ impl MergeCandidate {
num_pages: u64,
lsmt_config: &LsmtConfig,
metrics: &StorageMetrics,
) -> Result<Vec<MergeCandidate>, Box<dyn std::error::Error>> {
) -> StorageResult<Vec<MergeCandidate>> {
let existing_base = layout.existing_base();

let mut result = Vec::new();
Expand Down Expand Up @@ -1208,14 +1274,19 @@ impl MergeCandidate {
.iter()
.map(|path| {
Ok(std::fs::metadata(path)
.map_err(|err: _| PersistenceError::FileSystemError {
path: path.display().to_string(),
context: format!("Failed get existing file length: {}", path.display()),
internal_error: err.to_string(),
.map_err(|err: _| {
Box::new(PersistenceError::FileSystemError {
path: path.display().to_string(),
context: format!(
"Failed get existing file length: {}",
path.display()
),
internal_error: err.to_string(),
}) as Box<dyn std::error::Error + Send>
})?
.len())
})
.collect::<Result<_, PersistenceError>>()?;
.collect::<StorageResult<_>>()?;
let existing_overlays = &existing_files[existing_base.iter().len()..];

metrics
Expand Down
12 changes: 9 additions & 3 deletions rs/replicated_state/src/page_map/storage/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,12 +357,12 @@ fn storage_files(dir: &Path) -> StorageFiles {

/// Verify that the storage in the `dir` directory is equivalent to `expected`.
fn verify_storage(dir: &Path, expected: &PageDelta) {
let storage = Storage::load(&ShardedTestStorageLayout {
let storage_layout = ShardedTestStorageLayout {
dir_path: dir.to_path_buf(),
base: dir.join("vmemory_0.bin"),
overlay_suffix: "vmemory_0.overlay".to_owned(),
})
.unwrap();
};
let storage = Storage::load(&storage_layout).unwrap();

let expected_num_pages = if let Some(max) = expected.max_page_index() {
max.get() + 1
Expand All @@ -373,6 +373,12 @@ fn verify_storage(dir: &Path, expected: &PageDelta) {

// Verify `num_logical_pages`.
assert_eq!(expected_num_pages, storage.num_logical_pages() as u64);
assert_eq!(
expected_num_pages as usize,
(&storage_layout as &dyn StorageLayout)
.memory_size_pages()
.unwrap()
);

// Verify every single page in the range.
for index in 0..storage.num_logical_pages() as u64 {
Expand Down
14 changes: 7 additions & 7 deletions rs/replicated_state/src/page_map/test_utils.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::page_map::storage::{Shard, StorageLayout};
use crate::page_map::storage::{Shard, StorageLayout, StorageResult};
use ic_types::Height;
use std::path::{Path, PathBuf};

Expand All @@ -16,13 +16,13 @@ impl StorageLayout for TestStorageLayout {
assert_eq!(shard.get(), 0);
self.overlay_dst.clone()
}
fn existing_overlays(&self) -> Result<Vec<PathBuf>, Box<dyn std::error::Error>> {
fn existing_overlays(&self) -> StorageResult<Vec<PathBuf>> {
Ok(self.existing_overlays.clone())
}
fn overlay_height(&self, _path: &Path) -> Result<Height, Box<dyn std::error::Error>> {
fn overlay_height(&self, _path: &Path) -> StorageResult<Height> {
unimplemented!()
}
fn overlay_shard(&self, _path: &Path) -> Result<Shard, Box<dyn std::error::Error>> {
fn overlay_shard(&self, _path: &Path) -> StorageResult<Shard> {
unimplemented!()
}
}
Expand Down Expand Up @@ -53,7 +53,7 @@ impl StorageLayout for ShardedTestStorageLayout {
self.overlay_suffix
))
}
fn existing_overlays(&self) -> Result<Vec<PathBuf>, Box<dyn std::error::Error>> {
fn existing_overlays(&self) -> StorageResult<Vec<PathBuf>> {
let mut result: Vec<_> = std::fs::read_dir(&self.dir_path)
.unwrap()
.filter(|entry| {
Expand All @@ -72,13 +72,13 @@ impl StorageLayout for ShardedTestStorageLayout {
result.sort_unstable();
Ok(result)
}
fn overlay_height(&self, path: &Path) -> Result<Height, Box<dyn std::error::Error>> {
fn overlay_height(&self, path: &Path) -> StorageResult<Height> {
let file = path.file_name().unwrap();
Ok(Height::from(
file.to_str().unwrap()[0..6].parse::<u64>().unwrap(),
))
}
fn overlay_shard(&self, path: &Path) -> Result<Shard, Box<dyn std::error::Error>> {
fn overlay_shard(&self, path: &Path) -> StorageResult<Shard> {
let file = path.file_name().unwrap();
Ok(Shard::from(
file.to_str().unwrap()[7..10].parse::<u64>().unwrap(),
Expand Down
Loading

0 comments on commit 656d7a6

Please sign in to comment.