Various Improvements #55

Draft
wants to merge 4 commits into
base: main
1 change: 1 addition & 0 deletions Cargo.toml
@@ -11,6 +11,7 @@ license = "MIT"
[dependencies]
bytemuck = { version = "1.16.1", features = ["derive"] }
byteorder = "1.5.0"
either = { version = "1.13.0", default-features = false }
flate2 = { version = "1.0", optional = true }
lz4_flex = { version = "0.11.3", optional = true }
rayon = { version = "1.10.0", optional = true }
2 changes: 1 addition & 1 deletion benches/index-levels.rs
@@ -16,7 +16,7 @@ fn index_levels(bytes: &[u8]) {

for x in (0..NUMBER_OF_ENTRIES).step_by(1_567) {
let num = x.to_be_bytes();
cursor.move_on_key_greater_than_or_equal_to(&num).unwrap().unwrap();
cursor.move_on_key_greater_than_or_equal_to(num).unwrap().unwrap();
}
}
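
The benchmark change above drops a borrow when passing the key. The cursor method's signature is not shown in this diff; the sketch below assumes it is generic over `AsRef<[u8]>`, and `key_len` is a hypothetical stand-in used only to show that a byte array can be passed by value or by reference to such a function.

```rust
/// Hypothetical stand-in for a method that is generic over `AsRef<[u8]>`.
/// It is not part of the crate; it only mirrors the assumed parameter shape.
fn key_len<A: AsRef<[u8]>>(key: A) -> usize {
    key.as_ref().len()
}

fn main() {
    // `to_be_bytes` returns a `[u8; 4]`, which is `Copy`.
    let num = 1_567_u32.to_be_bytes();

    // Both forms compile; the borrow in the first call is not needed.
    assert_eq!(key_len(&num), 4);
    assert_eq!(key_len(num), 4);
}
```

Because the array is `Copy`, passing it by value does not prevent later use of `num`.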

2 changes: 1 addition & 1 deletion src/block_writer.rs
@@ -94,8 +94,8 @@
/// Insert a key that must be greater than the previously added one.
pub fn insert(&mut self, key: &[u8], val: &[u8]) {
debug_assert!(self.index_key_counter <= self.index_key_interval.get());
assert!(key.len() <= u32::max_value() as usize);

Check warning on line 97 in src/block_writer.rs (GitHub Actions / lint): usage of a legacy numeric method
assert!(val.len() <= u32::max_value() as usize);

Check warning on line 98 in src/block_writer.rs (GitHub Actions / lint): usage of a legacy numeric method

if self.index_key_counter == self.index_key_interval.get() {
self.index_offsets.push(self.buffer.len() as u64);
@@ -106,7 +106,7 @@
// and save the current key to become the last key.
match &mut self.last_key {
Some(last_key) => {
assert!(key > last_key, "{:?} must be greater than {:?}", key, last_key);
assert!(key > last_key.as_slice(), "{:?} must be greater than {:?}", key, last_key);
last_key.clear();
last_key.extend_from_slice(key);
}
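
The two lint warnings in this file concern `u32::max_value()`. A minimal sketch of the usual replacement, the associated constant `u32::MAX`, follows; `assert_fits_u32` is a hypothetical helper and not something this pull request adds.

```rust
/// Hypothetical helper: panics if the slice length does not fit in a `u32`.
fn assert_fits_u32(bytes: &[u8]) {
    // `u32::MAX` is the associated constant that replaces `u32::max_value()`.
    assert!(bytes.len() <= u32::MAX as usize);
}

fn main() {
    assert_fits_u32(b"first-counter");
    assert_eq!(u32::MAX, 4_294_967_295);
}
```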
9 changes: 2 additions & 7 deletions src/compression.rs
@@ -4,10 +4,11 @@ use std::str::FromStr;
use std::{fmt, io};

/// The different supported types of compression.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(u8)]
pub enum CompressionType {
/// Do not compress the blocks.
#[default]
None = 0,
/// Use the [`snap`] crate to de/compress the blocks.
///
@@ -55,12 +56,6 @@ impl FromStr for CompressionType {
}
}

impl Default for CompressionType {
fn default() -> CompressionType {
CompressionType::None
}
}

/// An invalid compression type has been read and the block can't be de/compressed.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct InvalidCompressionType;
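
The change above swaps a hand-written `impl Default` for `#[derive(Default)]` with a `#[default]` variant attribute. The sketch below shows the same pattern on a hypothetical two-variant enum named `Compression`; the name and the `Fast` variant are illustrative and do not come from the crate.

```rust
/// Hypothetical enum used only to illustrate `#[default]` on a unit variant.
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq)]
#[repr(u8)]
enum Compression {
    /// The variant returned by `Compression::default()`.
    #[default]
    None = 0,
    Fast = 1,
}

fn main() {
    assert_eq!(Compression::default(), Compression::None);
    assert_eq!(Compression::default() as u8, 0);
    assert_eq!(Compression::Fast as u8, 1);
}
```

The derive needs exactly one unit variant marked `#[default]`, so the choice stays next to the variant it names.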
53 changes: 51 additions & 2 deletions src/lib.rs
@@ -123,9 +123,9 @@
//!
//! // We can see that the sum of u32s is valid here.
//! assert_eq!(iter.next()?, Some((&b"first-counter"[..], &119_u32.to_ne_bytes()[..])));
//! assert_eq!(iter.next()?, Some((&b"second-counter"[..], &384_u32.to_ne_bytes()[..])));

Check failure on line 126 in src/lib.rs (GitHub Actions / test (beta) and GitHub Actions / miri)
the method `next` exists for struct `MergerIter<Cursor<Vec<u8>>, fn(&[u8], &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>, ...> {wrapping_sum_u32s}>`, but its trait bounds were not satisfied
//! assert_eq!(iter.next()?, None);

Check failure on line 127 in src/lib.rs (GitHub Actions / test (beta) and GitHub Actions / miri)
the method `next` exists for struct `MergerIter<Cursor<Vec<u8>>, fn(&[u8], &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>, ...> {wrapping_sum_u32s}>`, but its trait bounds were not satisfied
//! # Ok(()) }

Check failure on line 128 in src/lib.rs (GitHub Actions / test (beta) and GitHub Actions / miri)
the method `next` exists for struct `MergerIter<Cursor<Vec<u8>>, fn(&[u8], &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>, ...> {wrapping_sum_u32s}>`, but its trait bounds were not satisfied
//! ```
//!
//! ## Use the `Sorter` struct
@@ -168,15 +168,15 @@
//! // We insert multiple entries with the same key but different values
//! // in arbitrary order, the sorter will take care of merging them for us.
//! sorter.insert("first-counter", 32_u32.to_ne_bytes())?;
//! sorter.insert("first-counter", 23_u32.to_ne_bytes())?;

Check failure on line 171 in src/lib.rs (GitHub Actions / test (beta) and GitHub Actions / miri)
the method `insert` exists for struct `Sorter<fn(&[u8], &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>, TryFromSliceError> {wrapping_sum_u32s}, CursorVec>`, but its trait bounds were not satisfied
//! sorter.insert("first-counter", 64_u32.to_ne_bytes())?;

Check failure on line 172 in src/lib.rs (GitHub Actions / test (beta) and GitHub Actions / miri)
the method `insert` exists for struct `Sorter<fn(&[u8], &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>, TryFromSliceError> {wrapping_sum_u32s}, CursorVec>`, but its trait bounds were not satisfied
//!

Check failure on line 173 in src/lib.rs (GitHub Actions / test (beta) and GitHub Actions / miri)
the method `insert` exists for struct `Sorter<fn(&[u8], &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>, TryFromSliceError> {wrapping_sum_u32s}, CursorVec>`, but its trait bounds were not satisfied
//! sorter.insert("second-counter", 320_u32.to_ne_bytes())?;
//! sorter.insert("second-counter", 64_u32.to_ne_bytes())?;

Check failure on line 175 in src/lib.rs (GitHub Actions / test (beta) and GitHub Actions / miri)
the method `insert` exists for struct `Sorter<fn(&[u8], &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>, TryFromSliceError> {wrapping_sum_u32s}, CursorVec>`, but its trait bounds were not satisfied
//!

Check failure on line 176 in src/lib.rs (GitHub Actions / test (beta) and GitHub Actions / miri)
the method `insert` exists for struct `Sorter<fn(&[u8], &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>, TryFromSliceError> {wrapping_sum_u32s}, CursorVec>`, but its trait bounds were not satisfied
//! // We can iterate over the entries in key-order.
//! let mut iter = sorter.into_stream_merger_iter()?;
//!

Check failure on line 179 in src/lib.rs (GitHub Actions / test (beta) and GitHub Actions / miri)
the method `into_stream_merger_iter` exists for struct `Sorter<fn(&[u8], &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>, TryFromSliceError> {wrapping_sum_u32s}, CursorVec>`, but its trait bounds were not satisfied
//! // We can see that the sum of u32s is valid here.
//! assert_eq!(iter.next()?, Some((&b"first-counter"[..], &119_u32.to_ne_bytes()[..])));
//! assert_eq!(iter.next()?, Some((&b"second-counter"[..], &384_u32.to_ne_bytes()[..])));
@@ -187,7 +187,8 @@
#[cfg(test)]
#[macro_use]
extern crate quickcheck;

use std::borrow::Cow;
use std::convert::Infallible;
use std::mem;

mod block;
@@ -202,6 +203,8 @@
mod varint;
mod writer;

use either::Either;

pub use self::compression::CompressionType;
pub use self::error::Error;
pub use self::merger::{Merger, MergerBuilder, MergerIter};
@@ -214,10 +217,56 @@
};
pub use self::writer::{Writer, WriterBuilder};

pub type Result<T, U = Infallible> = std::result::Result<T, Error<U>>;
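
The alias above gives the error parameter a default of `Infallible`, so signatures with no user-supplied merge error can write `crate::Result<T>`. The sketch below illustrates this with a hypothetical `Error<U>` enum; the crate's real `Error` type is not shown in this diff, so its variants here are assumptions.

```rust
use std::convert::Infallible;
use std::io;

/// Hypothetical stand-in for the crate's error type (variants are assumed).
#[derive(Debug)]
pub enum Error<U = Infallible> {
    Io(io::Error),
    Merge(U),
}

/// Same shape as the alias in the diff: `U` defaults to `Infallible`.
pub type Result<T, U = Infallible> = std::result::Result<T, Error<U>>;

/// No merge function involved, so the default error parameter is enough.
fn no_merge_involved() -> Result<u32> {
    Ok(7)
}

/// A merge function with a `String` error makes the parameter explicit.
fn merge_can_fail(fail: bool) -> Result<u32, String> {
    if fail {
        Err(Error::Merge("bad value".to_string()))
    } else {
        Ok(7)
    }
}

fn main() {
    assert_eq!(no_merge_involved().unwrap(), 7);
    assert!(matches!(merge_can_fail(true), Err(Error::Merge(_))));
    assert_eq!(merge_can_fail(false).unwrap(), 7);
    // The `Io` variant is constructed here only to show the other error path.
    let io_err: Error = Error::Io(io::Error::new(io::ErrorKind::Other, "disk"));
    assert!(matches!(io_err, Error::Io(_)));
}
```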

// TODO move this elsewhere
pub trait MergeFunction {
type Error;
fn merge<'a>(
&self,
key: &[u8],
values: &[Cow<'a, [u8]>],
) -> std::result::Result<Cow<'a, [u8]>, Self::Error>;
}

impl<MF> MergeFunction for &MF
where
MF: MergeFunction,
{
type Error = MF::Error;

fn merge<'a>(
&self,
key: &[u8],
values: &[Cow<'a, [u8]>],
) -> std::result::Result<Cow<'a, [u8]>, Self::Error> {
(*self).merge(key, values)
}
}

impl<MFA, MFB> MergeFunction for Either<MFA, MFB>
where
MFA: MergeFunction,
MFB: MergeFunction<Error = MFA::Error>,
{
type Error = MFA::Error;

fn merge<'a>(
&self,
key: &[u8],
values: &[Cow<'a, [u8]>],
) -> std::result::Result<Cow<'a, [u8]>, Self::Error> {
match self {
Either::Left(mfa) => mfa.merge(key, values),
Either::Right(mfb) => mfb.merge(key, values),
}
}
}
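
The CI failures reported for the doc examples suggest that a plain `fn` item such as `wrapping_sum_u32s` does not satisfy the new `MergeFunction` bound. One possible way to provide a merge function under the new trait is a unit struct that implements it. In the sketch below the trait is copied from this diff so the example compiles on its own, while `WrappingSumU32s` is a hypothetical type and not part of the pull request.

```rust
use std::array::TryFromSliceError;
use std::borrow::Cow;
use std::convert::TryFrom;

// Copied from the diff so that the sketch is self-contained.
pub trait MergeFunction {
    type Error;
    fn merge<'a>(
        &self,
        key: &[u8],
        values: &[Cow<'a, [u8]>],
    ) -> Result<Cow<'a, [u8]>, Self::Error>;
}

/// Hypothetical merge function: wrapping sum of native-endian `u32` values.
struct WrappingSumU32s;

impl MergeFunction for WrappingSumU32s {
    type Error = TryFromSliceError;

    fn merge<'a>(
        &self,
        _key: &[u8],
        values: &[Cow<'a, [u8]>],
    ) -> Result<Cow<'a, [u8]>, Self::Error> {
        let mut sum = 0_u32;
        for value in values {
            // Deref coercion turns `&Cow<[u8]>` into `&[u8]`.
            let slice: &[u8] = value;
            // Fails with `TryFromSliceError` unless the value is exactly 4 bytes.
            let bytes = <[u8; 4]>::try_from(slice)?;
            sum = sum.wrapping_add(u32::from_ne_bytes(bytes));
        }
        Ok(Cow::Owned(sum.to_ne_bytes().to_vec()))
    }
}

fn main() {
    let values: Vec<Cow<[u8]>> = vec![
        Cow::Owned(32_u32.to_ne_bytes().to_vec()),
        Cow::Owned(23_u32.to_ne_bytes().to_vec()),
        Cow::Owned(64_u32.to_ne_bytes().to_vec()),
    ];
    let merged = WrappingSumU32s.merge(b"first-counter", &values).unwrap();
    assert_eq!(&merged[..], &119_u32.to_ne_bytes()[..]);
}
```

The values 32, 23 and 64 are the ones used in the crate documentation above, where the expected merged value is 119.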

/// Sometimes we need to use an unsafe trick to make the compiler happy.
/// You can read more about the issue [on the Rust's Github issues].
///
/// [on the Rust's Github issues]: https://github.com/rust-lang/rust/issues/47680
unsafe fn transmute_entry_to_static(key: &[u8], val: &[u8]) -> (&'static [u8], &'static [u8]) {
(mem::transmute(key), mem::transmute(val))
(mem::transmute::<&[u8], &'static [u8]>(key), mem::transmute::<&[u8], &'static [u8]>(val))
}
20 changes: 10 additions & 10 deletions src/merger.rs
@@ -4,7 +4,7 @@
use std::io;
use std::iter::once;

use crate::{Error, ReaderCursor, Writer};
use crate::{Error, MergeFunction, ReaderCursor, Writer};

/// A struct that is used to configure a [`Merger`] with the sources to merge.
pub struct MergerBuilder<R, MF> {
@@ -20,7 +20,7 @@
}

/// Add a source to merge, this function can be chained.
pub fn add(mut self, source: ReaderCursor<R>) -> Self {

Check warning on line 23 in src/merger.rs (GitHub Actions / lint): method `add` can be confused for the standard trait method `std::ops::Add::add`
self.push(source);
self
}
@@ -95,7 +95,7 @@
}

Ok(MergerIter {
merge: self.merge,
merge_function: self.merge,
heap,
current_key: Vec::new(),
merged_value: Vec::new(),
@@ -104,16 +104,16 @@
}
}

impl<R, MF, U> Merger<R, MF>
impl<R, MF> Merger<R, MF>
where
R: io::Read + io::Seek,
MF: for<'a> Fn(&[u8], &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>, U>,
MF: MergeFunction,
{
/// Consumes this [`Merger`] and streams the entries to the [`Writer`] given in parameter.
pub fn write_into_stream_writer<W: io::Write>(
self,
writer: &mut Writer<W>,
) -> Result<(), Error<U>> {
) -> crate::Result<(), MF::Error> {
let mut iter = self.into_stream_merger_iter().map_err(Error::convert_merge_error)?;
while let Some((key, val)) = iter.next()? {
writer.insert(key, val)?;
@@ -124,21 +124,21 @@

/// An iterator that yield the merged entries in key-order.
pub struct MergerIter<R, MF> {
merge: MF,
merge_function: MF,
heap: BinaryHeap<Entry<R>>,
current_key: Vec<u8>,
merged_value: Vec<u8>,
/// We keep this buffer to avoid allocating a vec every time.
tmp_entries: Vec<Entry<R>>,
}

impl<R, MF, U> MergerIter<R, MF>
impl<R, MF> MergerIter<R, MF>
where
R: io::Read + io::Seek,
MF: for<'a> Fn(&[u8], &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>, U>,
MF: MergeFunction,
{
/// Yield the entries in key-order where values have been merged when needed.
pub fn next(&mut self) -> Result<Option<(&[u8], &[u8])>, Error<U>> {
pub fn next(&mut self) -> crate::Result<Option<(&[u8], &[u8])>, MF::Error> {

Check warning on line 141 in src/merger.rs (GitHub Actions / lint): method `next` can be confused for the standard trait method `std::iter::Iterator::next`
Check warning on line 141 in src/merger.rs (GitHub Actions / lint): very complex type used. Consider factoring parts into `type` definitions
let first_entry = match self.heap.pop() {
Some(entry) => entry,
None => return Ok(None),
@@ -167,7 +167,7 @@
self.tmp_entries.iter().filter_map(|e| e.cursor.current().map(|(_, v)| v));
let values: Vec<_> = once(first_value).chain(other_values).map(Cow::Borrowed).collect();

match (self.merge)(first_key, &values) {
match self.merge_function.merge(first_key, &values) {
Ok(value) => {
self.current_key.clear();
self.current_key.extend_from_slice(first_key);
10 changes: 5 additions & 5 deletions src/reader/prefix_iter.rs
@@ -1,6 +1,6 @@
use std::io;

use crate::{Error, ReaderCursor};
use crate::ReaderCursor;

/// An iterator that is able to yield all the entries with
/// a key that starts with a given prefix.
@@ -18,7 +18,7 @@
}

/// Returns the next entry that starts with the given prefix.
pub fn next(&mut self) -> Result<Option<(&[u8], &[u8])>, Error> {
pub fn next(&mut self) -> crate::Result<Option<(&[u8], &[u8])>> {

Check warning on line 21 in src/reader/prefix_iter.rs (GitHub Actions / lint): method `next` can be confused for the standard trait method `std::iter::Iterator::next`
let entry = if self.move_on_first_prefix {
self.move_on_first_prefix = false;
self.cursor.move_on_key_greater_than_or_equal_to(&self.prefix)?
@@ -49,7 +49,7 @@
}

/// Returns the next entry that starts with the given prefix.
pub fn next(&mut self) -> Result<Option<(&[u8], &[u8])>, Error> {
pub fn next(&mut self) -> crate::Result<Option<(&[u8], &[u8])>> {

Check warning on line 52 in src/reader/prefix_iter.rs (GitHub Actions / lint): method `next` can be confused for the standard trait method `std::iter::Iterator::next`
let entry = if self.move_on_last_prefix {
self.move_on_last_prefix = false;
move_on_last_prefix(&mut self.cursor, self.prefix.clone())?
@@ -68,7 +68,7 @@
fn move_on_last_prefix<R: io::Read + io::Seek>(
cursor: &mut ReaderCursor<R>,
prefix: Vec<u8>,
) -> Result<Option<(&[u8], &[u8])>, Error> {
) -> crate::Result<Option<(&[u8], &[u8])>> {
match advance_key(prefix) {
Some(next_prefix) => match cursor.move_on_key_lower_than_or_equal_to(&next_prefix)? {
Some((k, _)) if k == next_prefix => cursor.move_on_prev(),
@@ -108,7 +108,7 @@
let mut writer = Writer::memory();
for x in (10..24000u32).step_by(3) {
let x = x.to_be_bytes();
writer.insert(&x, &x).unwrap();
writer.insert(x, x).unwrap();
}

let bytes = writer.into_inner().unwrap();
18 changes: 9 additions & 9 deletions src/reader/range_iter.rs
@@ -1,7 +1,7 @@
use std::io;
use std::ops::{Bound, RangeBounds};

use crate::{Error, ReaderCursor};
use crate::ReaderCursor;

/// An iterator that is able to yield all the entries lying in a specified range.
#[derive(Clone)]
@@ -24,7 +24,7 @@
}

/// Returns the next entry that is inside of the given range.
pub fn next(&mut self) -> Result<Option<(&[u8], &[u8])>, Error> {
pub fn next(&mut self) -> crate::Result<Option<(&[u8], &[u8])>> {

Check warning on line 27 in src/reader/range_iter.rs (GitHub Actions / lint): method `next` can be confused for the standard trait method `std::iter::Iterator::next`
let entry = if self.move_on_start {
self.move_on_start = false;
match self.range.start_bound() {
@@ -75,7 +75,7 @@
}

/// Returns the next entry that is inside of the given range.
pub fn next(&mut self) -> Result<Option<(&[u8], &[u8])>, Error> {
pub fn next(&mut self) -> crate::Result<Option<(&[u8], &[u8])>> {

Check warning on line 78 in src/reader/range_iter.rs (GitHub Actions / lint): method `next` can be confused for the standard trait method `std::iter::Iterator::next`
let entry = if self.move_on_start {
self.move_on_start = false;
match self.range.end_bound() {
@@ -116,17 +116,17 @@
fn end_contains(end: Bound<&Vec<u8>>, key: &[u8]) -> bool {
match end {
Bound::Unbounded => true,
Bound::Included(end) => key <= end,
Bound::Excluded(end) => key < end,
Bound::Included(end) => key <= end.as_slice(),
Bound::Excluded(end) => key < end.as_slice(),
}
}

/// Returns whether the provided key doesn't outbound this start bound.
fn start_contains(end: Bound<&Vec<u8>>, key: &[u8]) -> bool {
match end {
Bound::Unbounded => true,
Bound::Included(end) => key >= end,
Bound::Excluded(end) => key > end,
Bound::Included(end) => key >= end.as_slice(),
Bound::Excluded(end) => key > end.as_slice(),
}
}
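
The bound checks above now call `.as_slice()` so that both sides of each comparison are `&[u8]`. The sketch below reproduces `end_contains` in its updated form as a standalone function and exercises it; the sample keys are illustrative. Byte slices compare lexicographically, which is why big-endian encoded integers keep their numeric order.

```rust
use std::ops::Bound;

/// Standalone copy of the updated `end_contains` from the diff.
fn end_contains(end: Bound<&Vec<u8>>, key: &[u8]) -> bool {
    match end {
        Bound::Unbounded => true,
        // `.as_slice()` makes both operands `&[u8]` explicitly.
        Bound::Included(end) => key <= end.as_slice(),
        Bound::Excluded(end) => key < end.as_slice(),
    }
}

fn main() {
    // 256 encoded as a big-endian u32.
    let end: Vec<u8> = 256_u32.to_be_bytes().to_vec();
    let same = 256_u32.to_be_bytes();
    let below = 255_u32.to_be_bytes();

    assert!(end_contains(Bound::Included(&end), &same));
    assert!(!end_contains(Bound::Excluded(&end), &same));
    assert!(end_contains(Bound::Excluded(&end), &below));
    assert!(end_contains(Bound::Unbounded, &same));
}
```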

@@ -149,7 +149,7 @@
for x in (10..24000i32).step_by(3) {
nums.insert(x);
let x = x.to_be_bytes();
writer.insert(&x, &x).unwrap();
writer.insert(x, x).unwrap();
}

let bytes = writer.into_inner().unwrap();
@@ -186,7 +186,7 @@
for x in (10..24000i32).step_by(3) {
nums.insert(x);
let x = x.to_be_bytes();
writer.insert(&x, &x).unwrap();
writer.insert(x, x).unwrap();
}

let bytes = writer.into_inner().unwrap();