Force alignment for all chunk buffers #225

Merged 5 commits on Oct 6, 2023

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -28,7 +28,7 @@ loom = { version = "0.5.1", optional = true }
siphasher = "0.3.10"

[dev-dependencies]
env_logger = "0.10.0"
env_logger = { version = "0.10.0", default-features = false, features = ["auto-color", "humantime"] }
fdlimit = "0.2.1"
rand = { version = "0.8.2", features = ["small_rng"] }
tempfile = "3.2"
2 changes: 1 addition & 1 deletion admin/Cargo.toml
@@ -7,7 +7,7 @@ edition = "2018"

[dependencies]
clap = { version = "4", features = ["derive"] }
env_logger = "0.10.0"
env_logger = { version = "0.10.0", default-features = false, features = ["auto-color", "humantime"] }
fdlimit = "0.2.1"
log = "0.4.8"
parity-db = { path = ".." }
2 changes: 1 addition & 1 deletion src/db.rs
@@ -87,7 +87,7 @@ impl Borrow<[u8]> for RcValue {

impl Borrow<Vec<u8>> for RcValue {
fn borrow(&self) -> &Vec<u8> {
self.value().borrow()
self.value()
}
}

4 changes: 2 additions & 2 deletions src/file.rs
@@ -50,7 +50,7 @@ pub fn madvise_random(_map: &mut memmap2::MmapMut) {}
fn mmap(file: &std::fs::File, len: usize) -> Result<memmap2::MmapMut> {
#[cfg(not(test))]
const RESERVE_ADDRESS_SPACE: usize = 1024 * 1024 * 1024; // 1 Gb
// Use different value for tests to work around docker limits on the test machine.
// Use a different value for tests to work around docker limits on the test machine.
#[cfg(test)]
const RESERVE_ADDRESS_SPACE: usize = 64 * 1024 * 1024; // 64 Mb

@@ -92,7 +92,7 @@ impl TableFile {
} else {
capacity = len / entry_size as u64;
}
let mut map = mmap(&file, len as usize)?;
let map = mmap(&file, len as usize)?;
Some((map, file))
} else {
None
119 changes: 45 additions & 74 deletions src/index.rs
@@ -27,9 +27,12 @@ const META_SIZE: usize = 16 * 1024; // Contains header and column stats
const ENTRY_BITS: u8 = 64;
pub const ENTRY_BYTES: usize = ENTRY_BITS as usize / 8;

const EMPTY_CHUNK: Chunk = [0u8; CHUNK_LEN];
const EMPTY_CHUNK: Chunk = Chunk([0u8; CHUNK_LEN]);
const EMPTY_ENTRIES: [Entry; CHUNK_ENTRIES] = [Entry::empty(); CHUNK_ENTRIES];

pub type Chunk = [u8; CHUNK_LEN];
#[repr(C, align(8))]
#[derive(PartialEq, Eq, Clone, Debug)]
pub struct Chunk(pub [u8; CHUNK_LEN]);

#[allow(clippy::assertions_on_constants)]
const _: () = assert!(META_SIZE >= HEADER_SIZE + stats::TOTAL_SIZE);
@@ -78,7 +81,7 @@ impl Entry {
self.0
}

fn empty() -> Self {
const fn empty() -> Self {
Entry(0)
}

@@ -246,9 +249,9 @@ impl IndexTable {
Ok(())
}

fn chunk_at(index: u64, map: &memmap2::MmapMut) -> Result<&[u8; CHUNK_LEN]> {
fn chunk_at(index: u64, map: &memmap2::MmapMut) -> Result<&Chunk> {
let offset = META_SIZE + index as usize * CHUNK_LEN;
let ptr = unsafe { &*(map[offset..offset + CHUNK_LEN].as_ptr() as *const [u8; CHUNK_LEN]) };
let ptr = unsafe { &*(map[offset..offset + CHUNK_LEN].as_ptr() as *const Chunk) };
Ok(try_io!(Ok(ptr)))
}

@@ -260,33 +263,18 @@ impl IndexTable {
Ok(try_io!(Ok(ptr)))
}
#[cfg(target_arch = "x86_64")]
fn find_entry(
&self,
key_prefix: u64,
sub_index: usize,
chunk: &[u8; CHUNK_LEN],
) -> (Entry, usize) {
fn find_entry(&self, key_prefix: u64, sub_index: usize, chunk: &Chunk) -> (Entry, usize) {
self.find_entry_sse2(key_prefix, sub_index, chunk)
}

#[cfg(not(target_arch = "x86_64"))]
fn find_entry(
&self,
key_prefix: u64,
sub_index: usize,
chunk: &[u8; CHUNK_LEN],
) -> (Entry, usize) {
fn find_entry(&self, key_prefix: u64, sub_index: usize, chunk: &Chunk) -> (Entry, usize) {
self.find_entry_base(key_prefix, sub_index, chunk)
}

#[cfg(target_arch = "x86_64")]
fn find_entry_sse2(
&self,
key_prefix: u64,
sub_index: usize,
chunk: &[u8; CHUNK_LEN],
) -> (Entry, usize) {
assert!(chunk.len() >= CHUNK_ENTRIES * 8); // Bound checking (not done by SIMD instructions)
fn find_entry_sse2(&self, key_prefix: u64, sub_index: usize, chunk: &Chunk) -> (Entry, usize) {
assert!(chunk.0.len() >= CHUNK_ENTRIES * 8); // Bound checking (not done by SIMD instructions)
const _: () = assert!(
CHUNK_ENTRIES % 4 == 0,
"We assume here we got buffer with a number of elements that is a multiple of 4"
@@ -308,11 +296,11 @@ impl IndexTable {
// Then we remove the address by shifting such that the partial key is in the low
// part
let first_two = _mm_shuffle_epi32::<0b11011000>(_mm_srl_epi64(
_mm_loadu_si128(chunk[i * 8..].as_ptr() as *const __m128i),
_mm_loadu_si128(chunk.0[i * 8..].as_ptr() as *const __m128i),
shift_mask,
));
let last_two = _mm_shuffle_epi32::<0b11011000>(_mm_srl_epi64(
_mm_loadu_si128(chunk[(i + 2) * 8..].as_ptr() as *const __m128i),
_mm_loadu_si128(chunk.0[(i + 2) * 8..].as_ptr() as *const __m128i),
shift_mask,
));
// We set into current the input low parts
@@ -329,12 +317,7 @@ impl IndexTable {
(Entry::empty(), 0)
}

fn find_entry_base(
&self,
key_prefix: u64,
sub_index: usize,
chunk: &[u8; CHUNK_LEN],
) -> (Entry, usize) {
fn find_entry_base(&self, key_prefix: u64, sub_index: usize, chunk: &Chunk) -> (Entry, usize) {
let partial_key = Entry::extract_key(key_prefix, self.id.index_bits());
for i in sub_index..CHUNK_ENTRIES {
let entry = Self::read_entry(chunk, i);
@@ -378,18 +361,16 @@ impl IndexTable {
}

pub fn entries(&self, chunk_index: u64, log: &impl LogQuery) -> Result<[Entry; CHUNK_ENTRIES]> {
let mut chunk = [0; CHUNK_LEN];
if let Some(entry) =
log.with_index(self.id, chunk_index, |chunk| Self::transmute_chunk(*chunk))
log.with_index(self.id, chunk_index, |chunk| *Self::transmute_chunk(chunk))
{
return Ok(entry)
}
if let Some(map) = &*self.map.read() {
let source = Self::chunk_at(chunk_index, map)?;
chunk.copy_from_slice(source);
return Ok(Self::transmute_chunk(chunk))
let chunk = Self::chunk_at(chunk_index, map)?;
return Ok(*Self::transmute_chunk(chunk))
}
Ok(Self::transmute_chunk(EMPTY_CHUNK))
Ok(EMPTY_ENTRIES)
}

pub fn sorted_entries(&self) -> Result<Vec<Entry>> {
@@ -415,24 +396,18 @@ impl IndexTable {
}

#[inline(always)]
fn transmute_chunk(chunk: [u8; CHUNK_LEN]) -> [Entry; CHUNK_ENTRIES] {
let mut result: [Entry; CHUNK_ENTRIES] = unsafe { std::mem::transmute(chunk) };
if !cfg!(target_endian = "little") {
for item in result.iter_mut() {
*item = Entry::from_u64(u64::from_le(item.0));
}
}
result
fn transmute_chunk(chunk: &Chunk) -> &[Entry; CHUNK_ENTRIES] {
unsafe { std::mem::transmute(chunk) }
}

#[inline(always)]
fn write_entry(entry: &Entry, at: usize, chunk: &mut [u8; CHUNK_LEN]) {
chunk[at * 8..at * 8 + 8].copy_from_slice(&entry.as_u64().to_le_bytes());
fn write_entry(entry: &Entry, at: usize, chunk: &mut Chunk) {
chunk.0[at * 8..at * 8 + 8].copy_from_slice(&entry.as_u64().to_le_bytes());
}

#[inline(always)]
fn read_entry(chunk: &[u8; CHUNK_LEN], at: usize) -> Entry {
Entry::from_u64(u64::from_le_bytes(chunk[at * 8..at * 8 + 8].try_into().unwrap()))
fn read_entry(chunk: &Chunk, at: usize) -> Entry {
Entry::from_u64(u64::from_le_bytes(chunk.0[at * 8..at * 8 + 8].try_into().unwrap()))
}

#[inline(always)]
@@ -444,7 +419,7 @@ impl IndexTable {
&self,
key_prefix: u64,
address: Address,
source: &[u8],
mut chunk: Chunk,
sub_index: Option<usize>,
log: &mut LogWriter,
) -> Result<PlanOutcome> {
@@ -454,8 +429,6 @@ impl IndexTable {
log::warn!(target: "parity-db", "{}: Address space overflow at {}: {}", self.id, chunk_index, address);
return Ok(PlanOutcome::NeedReindex)
}
let mut chunk = [0; CHUNK_LEN];
chunk.copy_from_slice(source);
let partial_key = Entry::extract_key(key_prefix, self.id.index_bits());
let new_entry = Entry::new(address, partial_key, self.id.index_bits());
if let Some(i) = sub_index {
@@ -466,15 +439,15 @@ impl IndexTable {
);
Self::write_entry(&new_entry, i, &mut chunk);
log::trace!(target: "parity-db", "{}: Replaced at {}.{}: {}", self.id, chunk_index, i, new_entry.address(self.id.index_bits()));
log.insert_index(self.id, chunk_index, i as u8, &chunk);
log.insert_index(self.id, chunk_index, i as u8, chunk);
return Ok(PlanOutcome::Written)
}
for i in 0..CHUNK_ENTRIES {
let entry = Self::read_entry(&chunk, i);
if entry.is_empty() {
Self::write_entry(&new_entry, i, &mut chunk);
log::trace!(target: "parity-db", "{}: Inserted at {}.{}: {}", self.id, chunk_index, i, new_entry.address(self.id.index_bits()));
log.insert_index(self.id, chunk_index, i as u8, &chunk);
log.insert_index(self.id, chunk_index, i as u8, chunk);
return Ok(PlanOutcome::Written)
}
}
@@ -493,28 +466,26 @@ impl IndexTable {
let key_prefix = TableKey::index_from_partial(key);
let chunk_index = self.chunk_index(key_prefix);

if let Some(chunk) = log.with_index(self.id, chunk_index, |chunk| *chunk) {
return self.plan_insert_chunk(key_prefix, address, &chunk, sub_index, log)
if let Some(chunk) = log.with_index(self.id, chunk_index, |chunk| chunk.clone()) {
return self.plan_insert_chunk(key_prefix, address, chunk, sub_index, log)
}

if let Some(map) = &*self.map.read() {
let chunk = Self::chunk_at(chunk_index, map)?;
let chunk = Self::chunk_at(chunk_index, map)?.clone();
return self.plan_insert_chunk(key_prefix, address, chunk, sub_index, log)
}

let chunk = &EMPTY_CHUNK;
let chunk = EMPTY_CHUNK.clone();
self.plan_insert_chunk(key_prefix, address, chunk, sub_index, log)
}

fn plan_remove_chunk(
&self,
key_prefix: u64,
source: &[u8],
mut chunk: Chunk,
sub_index: usize,
log: &mut LogWriter,
) -> Result<PlanOutcome> {
let mut chunk = [0; CHUNK_LEN];
chunk.copy_from_slice(source);
let chunk_index = self.chunk_index(key_prefix);
let partial_key = Entry::extract_key(key_prefix, self.id.index_bits());

@@ -523,7 +494,7 @@ impl IndexTable {
if !entry.is_empty() && entry.partial_key(self.id.index_bits()) == partial_key {
let new_entry = Entry::empty();
Self::write_entry(&new_entry, i, &mut chunk);
log.insert_index(self.id, chunk_index, i as u8, &chunk);
log.insert_index(self.id, chunk_index, i as u8, chunk);
log::trace!(target: "parity-db", "{}: Removed at {}.{}", self.id, chunk_index, i);
return Ok(PlanOutcome::Written)
}
@@ -541,12 +512,12 @@ impl IndexTable {

let chunk_index = self.chunk_index(key_prefix);

if let Some(chunk) = log.with_index(self.id, chunk_index, |chunk| *chunk) {
return self.plan_remove_chunk(key_prefix, &chunk, sub_index, log)
if let Some(chunk) = log.with_index(self.id, chunk_index, |chunk| chunk.clone()) {
return self.plan_remove_chunk(key_prefix, chunk, sub_index, log)
}

if let Some(map) = &*self.map.read() {
let chunk = Self::chunk_at(chunk_index, map)?;
let chunk = Self::chunk_at(chunk_index, map)?.clone();
return self.plan_remove_chunk(key_prefix, chunk, sub_index, log)
}

@@ -649,7 +620,7 @@ mod test {

#[test]
fn test_entries() {
let mut chunk = IndexTable::transmute_chunk(EMPTY_CHUNK);
let mut chunk = IndexTable::transmute_chunk(&EMPTY_CHUNK).clone();
let mut chunk2 = EMPTY_CHUNK;
for (i, chunk) in chunk.iter_mut().enumerate().take(CHUNK_ENTRIES) {
use std::{
@@ -664,7 +635,7 @@
*chunk = entry;
}

assert!(IndexTable::transmute_chunk(chunk2) == chunk);
assert!(IndexTable::transmute_chunk(&chunk2) == &chunk);
}

#[test]
@@ -679,9 +650,9 @@

let data_address = Address::from_u64((1 << index_bits) - 1);

let mut chunk = [0; CHUNK_ENTRIES * 8];
let mut chunk = Chunk([0; CHUNK_ENTRIES * 8]);
for (i, partial_key) in partial_keys.iter().enumerate() {
chunk[i * 8..(i + 1) * 8].copy_from_slice(
chunk.0[i * 8..(i + 1) * 8].copy_from_slice(
&Entry::new(data_address, *partial_key, index_bits).as_u64().to_le_bytes(),
);
}
@@ -705,7 +676,7 @@
fn test_find_any_entry() {
let table =
IndexTable { id: TableId(18), map: RwLock::new(None), path: Default::default() };
let mut chunk = [0u8; CHUNK_LEN];
let mut chunk = Chunk([0u8; CHUNK_LEN]);
let mut entries = [Entry::empty(); CHUNK_ENTRIES];
let mut keys = [0u64; CHUNK_ENTRIES];
let mut rng = rand::prelude::SmallRng::from_seed(Default::default());
@@ -742,7 +713,7 @@
fn test_find_entry_same_value() {
let table =
IndexTable { id: TableId(18), map: RwLock::new(None), path: Default::default() };
let mut chunk = [0u8; CHUNK_LEN];
let mut chunk = Chunk([0u8; CHUNK_LEN]);
let key = 0x4242424242424242;
let partial_key = Entry::extract_key(key, 18);
let entry = Entry::new(Address::new(0, 0), partial_key, 18);
@@ -765,7 +736,7 @@
fn test_find_entry_zero_pk() {
let table =
IndexTable { id: TableId(16), map: RwLock::new(None), path: Default::default() };
let mut chunk = [0u8; CHUNK_LEN];
let mut chunk = Chunk([0u8; CHUNK_LEN]);
let zero_key = 0x0000000000000000;
let entry = Entry::new(Address::new(1, 1), zero_key, 16);

@@ -790,7 +761,7 @@
) {
let table =
IndexTable { id: TableId(18), map: RwLock::new(None), path: Default::default() };
let mut chunk = [0u8; CHUNK_LEN];
let mut chunk = Chunk([0u8; CHUNK_LEN]);
let mut keys = [0u64; CHUNK_ENTRIES];
let mut rng = rand::prelude::SmallRng::from_seed(Default::default());
for i in 0..CHUNK_ENTRIES {
3 changes: 3 additions & 0 deletions src/lib.rs
@@ -35,3 +35,6 @@ pub type Key = [u8; KEY_SIZE];

#[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64")))]
compile_error!("parity-db only supports x86_64 and aarch64");

#[cfg(not(target_endian = "little"))]
compile_error!("parity-db only supports little-endian platforms");

Collaborator: Makes sense (keeping support would indeed require a CI test).

@arkpar (Member, Author) on Oct 6, 2023:
Yeah, it is not tested properly. The platform restriction of x86_64 and aarch64 limits it to little endian anyway.
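
For context on this exchange: the rework in src/index.rs drops the per-entry endianness conversion from `transmute_chunk` and instead reinterprets the chunk bytes in place, which is only correct on little-endian targets and only if the buffer is 8-byte aligned — hence the `#[repr(C, align(8))]` `Chunk` wrapper and the new `compile_error!`. Below is a minimal standalone sketch of that reasoning, not code from this PR; the constants (64 entries of 8 bytes) are illustrative rather than the crate's actual values.

```rust
// Sketch of why the chunk buffer must be aligned and little-endian for a
// zero-copy reinterpretation. Constants are illustrative.
const ENTRIES: usize = 64;
const LEN: usize = ENTRIES * 8;

#[cfg(not(target_endian = "little"))]
compile_error!("this sketch, like parity-db after this PR, assumes a little-endian target");

#[repr(C, align(8))] // guarantees the byte buffer starts on an 8-byte boundary
struct Chunk([u8; LEN]);

fn entries(chunk: &Chunk) -> &[u64; ENTRIES] {
    // Sound because Chunk is 8-byte aligned and exactly ENTRIES * 8 bytes long;
    // on a little-endian target no per-entry u64::from_le conversion is needed.
    unsafe { &*(chunk.0.as_ptr() as *const [u64; ENTRIES]) }
}

fn main() {
    let mut chunk = Chunk([0u8; LEN]);
    // Store one little-endian entry and read it back through the typed view.
    chunk.0[..8].copy_from_slice(&42u64.to_le_bytes());
    assert_eq!(entries(&chunk)[0], 42);
    println!("chunk at {:p}, first entry = {}", chunk.0.as_ptr(), entries(&chunk)[0]);
}
```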

8 changes: 4 additions & 4 deletions src/log.rs
@@ -357,7 +357,7 @@ impl LogChange {
while mask != 0 {
let i = mask.trailing_zeros();
mask &= !(1 << i);
write(&chunk[i as usize * ENTRY_BYTES..(i as usize + 1) * ENTRY_BYTES])?;
write(&chunk.0[i as usize * ENTRY_BYTES..(i as usize + 1) * ENTRY_BYTES])?;
}
}
}
@@ -405,13 +405,13 @@ impl<'a> LogWriter<'a> {
self.log.record_id
}

pub fn insert_index(&mut self, table: IndexTableId, index: u64, sub: u8, data: &IndexChunk) {
pub fn insert_index(&mut self, table: IndexTableId, index: u64, sub: u8, data: IndexChunk) {
match self.log.local_index.entry(table).or_default().map.entry(index) {
std::collections::hash_map::Entry::Occupied(mut entry) => {
*entry.get_mut() = (self.log.record_id, entry.get().1 | (1 << sub), *data);
*entry.get_mut() = (self.log.record_id, entry.get().1 | (1 << sub), data);
},
std::collections::hash_map::Entry::Vacant(entry) => {
entry.insert((self.log.record_id, 1 << sub, *data));
entry.insert((self.log.record_id, 1 << sub, data));
},
}
}