feature: support fastlanes bitpacking #2886
base: main
Conversation
ACTION NEEDED: The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification. For details on the error, please inspect the "PR Title Check" action.
benchmark results: using fastlanes:

machine info:

machine net memory write throughput (single thread write):

machine net memory write throughput test code:

```rust
use tokio::runtime::Runtime;
use tokio::io::AsyncWriteExt;
use std::time::Instant;

async fn write_to_memory(size: usize) -> f64 {
    let mut buffer = Vec::with_capacity(size);
    let data = vec![0u8; size];
    let start = Instant::now();
    buffer.write_all(&data).await.unwrap();
    let duration = start.elapsed();
    let bytes_written = size as f64;
    let seconds = duration.as_secs_f64();
    bytes_written / seconds
}

fn main() {
    let size = 1024 * 1024 * 1024 * 16;
    let rt = Runtime::new().unwrap();
    let throughput = rt.block_on(write_to_memory(size));
    // Convert throughput from bytes/sec to MB/sec
    let throughput_mb_per_sec = throughput / (1024.0 * 1024.0);
    println!("Throughput: {:.2} MB/sec", throughput_mb_per_sec);
}
```
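For comparison, a multi-threaded version of the same measurement (a sketch written for this discussion, not code from the PR; the thread count and per-thread buffer size are arbitrary) shows the aggregate write throughput several cores can reach:

```rust
use std::thread;
use std::time::Instant;

fn main() {
    let threads = 4;
    let per_thread: usize = 64 * 1024 * 1024; // 64 MiB per thread
    let start = Instant::now();
    let handles: Vec<_> = (0..threads)
        .map(|i| {
            thread::spawn(move || {
                let mut buf = vec![0u8; per_thread];
                // Touch every byte so the writes actually hit memory.
                for b in buf.iter_mut() {
                    *b = i as u8;
                }
                // Keep the buffer observable so the writes are not optimized out.
                std::hint::black_box(&buf);
                buf.len()
            })
        })
        .collect();
    let total: usize = handles.into_iter().map(|h| h.join().unwrap()).sum();
    let secs = start.elapsed().as_secs_f64();
    println!(
        "Aggregate throughput: {:.2} MB/sec",
        total as f64 / secs / (1024.0 * 1024.0)
    );
}
```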
I poked around a bit and learned something I didn't know before: to stress memory write throughput we need multi-threading. But here I kill the chance to use multiple threads, because I spawn many futures like this:

```rust
async move {
    let bytes = bytes.await?;
    let decompressed_output = bitpacked_for_non_neg_decode(
        compressed_bit_width,
        uncompressed_bits_per_value,
        &bytes,
        &bytes_idx_to_range_indices,
        num_rows,
    );
    Ok(Box::new(BitpackedForNonNegPageDecoder {
        uncompressed_bits_per_value,
        decompressed_buf: decompressed_output,
    }) as Box<dyn PrimitivePageDecoder>)
}
.boxed()
```

in this function:

```rust
fn bitpacked_for_non_neg_decode(
    compressed_bit_width: u64,
    uncompressed_bits_per_value: u64,
    data: &Vec<Bytes>,
    bytes_idx_to_range_indices: &Vec<Vec<std::ops::Range<u64>>>,
    num_rows: u64,
) -> Vec<u8> {
    match uncompressed_bits_per_value {
        8 => {
            let mut decompressed: Vec<u8> = Vec::with_capacity(num_rows as usize);
            let packed_chunk_size: usize = 1024 * compressed_bit_width as usize / 8;
            let mut decompress_chunk_buf = vec![0_u8; 1024];
            for (i, bytes) in data.iter().enumerate() {
                let mut j = 0;
                let mut ranges_idx = 0;
                let mut curr_range_start = bytes_idx_to_range_indices[i][0].start;
                while j * packed_chunk_size < bytes.len() {
                    let chunk: &[u8] = &bytes[j * packed_chunk_size..][..packed_chunk_size];
                    unsafe {
                        BitPacking::unchecked_unpack(
                            compressed_bit_width as usize,
                            chunk,
                            &mut decompress_chunk_buf[..1024],
                        );
                    }
                    loop {
                        if curr_range_start + 1024 < bytes_idx_to_range_indices[i][ranges_idx].end {
                            let this_part_len = 1024 - curr_range_start % 1024;
                            decompressed.extend_from_slice(
                                &decompress_chunk_buf[curr_range_start as usize % 1024..],
                            );
                            curr_range_start += this_part_len;
                            break;
                        } else {
                            let this_part_len =
                                bytes_idx_to_range_indices[i][ranges_idx].end - curr_range_start;
                            decompressed.extend_from_slice(
                                &decompress_chunk_buf[curr_range_start as usize % 1024..]
                                    [..this_part_len as usize],
                            );
                            ranges_idx += 1;
                            if ranges_idx == bytes_idx_to_range_indices[i].len() {
                                break;
                            }
                            curr_range_start = bytes_idx_to_range_indices[i][ranges_idx].start;
                        }
                    }
                    j += 1;
                }
            }
            decompressed
        }
        _ => panic!("Unsupported data type"),
    }
}
```

I can do something like this:

```rust
while j * packed_chunk_size * 34 < bytes.len() {
    // decode first chunk
    // decode 32 chunks in parallel
    // decode the last chunk
}
```

To decode 32 chunks in parallel, I found it is okay to do something like this in Rust:

```rust
use std::thread;

fn main() {
    let len = 100;
    // Pre-allocate a Vec<u8> with a specific length, initialized to zero
    let mut data = vec![0u8; len];
    // Note: the pointer must come from `as_mut_ptr`; writing through a pointer
    // derived from `as_ptr` (a *const u8) would be undefined behavior.
    let ptr = data.as_mut_ptr() as usize;
    let mut handles = vec![];
    // Spawn 10 threads, each writing to a different section of the Vec
    for i in 0..10 {
        let handle = thread::spawn(move || {
            let start = i * 10;
            let end = start + 10;
            // Each thread writes to its own non-overlapping section
            for j in start..end {
                unsafe {
                    // Use the raw pointer to modify the Vec directly
                    (ptr as *mut u8).add(j).write((j - start) as u8);
                }
            }
        });
        handles.push(handle);
    }
    // Join all threads
    for handle in handles {
        handle.join().unwrap();
    }
    // Print the modified Vec
    println!("{:?}", data);
}
```
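For what it's worth, the same non-overlapping-writes pattern can be expressed without `unsafe` by handing each thread a disjoint `&mut` chunk via `std::thread::scope` (Rust 1.63+). This is a sketch for comparison, not code from the PR:

```rust
use std::thread;

fn main() {
    let mut data = vec![0u8; 100];
    // Split the Vec into disjoint mutable chunks; each scoped thread owns one
    // chunk, so the borrow checker proves the writes cannot overlap.
    thread::scope(|s| {
        for chunk in data.chunks_mut(10) {
            s.spawn(move || {
                for (j, byte) in chunk.iter_mut().enumerate() {
                    *byte = j as u8;
                }
            });
        }
    });
    assert_eq!(&data[..10], &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
    println!("{:?}", &data[..10]);
}
```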
cargo clippy is unhappy with this:
should I just suppress it?
Some initial comments
rust/lance-encoding/src/buffer.rs
```rust
pub fn reinterpret_to_rust_native<T>(&mut self) -> Result<&[T]>
where
    T: Copy, // Ensure `T` can be copied (as needed for safely reinterpreting bytes)
{
    let buffer = self.borrow_and_clone();
    let buffer = buffer.into_buffer();

    // Get the raw byte slice from the buffer.
    let byte_slice = buffer.as_slice();

    // Safety check - ensure that the byte slice length is a multiple of `T`.
    if byte_slice.len() % std::mem::size_of::<T>() != 0 {
        return Err(Error::Internal {
            message: "Buffer size is not a multiple of the target type size".to_string(),
            location: location!(),
        });
    }

    // Reinterpret the byte slice as a slice of `T`.
    let typed_slice = unsafe {
        std::slice::from_raw_parts(
            byte_slice.as_ptr() as *const T,
            byte_slice.len() / std::mem::size_of::<T>(),
        )
    };

    Ok(typed_slice)
}
```
How is this different from `borrow_to_typed_slice`?
yeah, this is actually the same, I should delete this function
fixed
```protobuf
// Items are bitpacked in a buffer
message BitpackedForNonNeg {
  // the number of bits used for a value in the buffer
  uint64 compressed_bits_per_value = 1;

  // the number of bits of the uncompressed value. e.g. for a u32, this will be 32
  uint64 uncompressed_bits_per_value = 2;

  // The items in the list
  Buffer buffer = 3;
}
```
What do you think `BitpackedWithNeg` will look like?
I plan to make it a cascading encoding: a `BTreeMap` from row-number index to real value for the few very wide (bit-width) values (the negative values), followed by bitpacking. For arrays that have too many negative values (for example, 50 percent), I think we should not use bitpacking on them at all.
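The split described above could be sketched like this (a hypothetical illustration of the idea, not code from this PR; `split_exceptions` and its threshold parameter are made up for the example):

```rust
use std::collections::BTreeMap;

// Separate the few wide/negative values (stored as row-index -> value
// exceptions in a BTreeMap) from the narrow non-negative values that
// would go on to the bitpacking stage.
fn split_exceptions(values: &[i32], max_narrow: i32) -> (BTreeMap<usize, i32>, Vec<u32>) {
    let mut exceptions = BTreeMap::new();
    let mut narrow = Vec::with_capacity(values.len());
    for (row, &v) in values.iter().enumerate() {
        if v < 0 || v > max_narrow {
            exceptions.insert(row, v);
            narrow.push(0); // placeholder keeps row alignment in the packed stream
        } else {
            narrow.push(v as u32);
        }
    }
    (exceptions, narrow)
}

fn main() {
    let (exc, narrow) = split_exceptions(&[3, -7, 5, 1_000_000], 255);
    assert_eq!(exc.get(&1), Some(&-7));
    assert_eq!(exc.get(&3), Some(&1_000_000));
    assert_eq!(narrow, vec![3, 0, 5, 0]);
    println!("exceptions: {exc:?}, narrow: {narrow:?}");
}
```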
```toml
fastlanes = "0.1.5"
bytemuck = "=1.18.0"
```
Are these being used?
yeah, they are used in `bitpacking_fastlanes.rs`
```rust
// Compute the compressed_bit_width for a given array of unsigned integers
// the vortex approach is better, they compute all statistics before encoding
```
Maybe just say "todo: compute all statistics before encoding"?
fixed
```rust
    .as_any()
    .downcast_ref::<PrimitiveArray<UInt8Type>>()
    .unwrap();
//println!("primitive_array: {:?}", primitive_array);
```
```rust
//println!("primitive_array: {:?}", primitive_array);
```
fixed
```rust
}

impl ArrayEncoder for BitpackedForNonNegArrayEncoder {
    fn encode(
```
Definitely a lot of repetition in this method, but we can try to simplify later if it is not easy to collapse these cases (e.g. if it requires a macro, let's tackle it later).
yeah, I just need a way to map, say, both `DataType::UInt8` and `DataType::Int8` to `u8`; then I should know how to simplify it
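One option (a hypothetical sketch, not the PR's code; the `DataType` enum here is a tiny stand-in for the Arrow one) is a macro that dispatches both the signed and unsigned variant of each width to a single unsigned native type, so the encode logic is written once per width:

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum DataType {
    UInt8,
    Int8,
    UInt16,
    Int16,
}

// Bind a type alias for the matching unsigned primitive, then evaluate the
// caller-supplied expression with that alias in scope.
macro_rules! with_unsigned_type {
    ($dt:expr, |$t:ident| $body:expr) => {
        match $dt {
            DataType::UInt8 | DataType::Int8 => {
                type $t = u8;
                $body
            }
            DataType::UInt16 | DataType::Int16 => {
                type $t = u16;
                $body
            }
        }
    };
}

fn lane_width_bits(dt: DataType) -> usize {
    with_unsigned_type!(dt, |T| std::mem::size_of::<T>() * 8)
}

fn main() {
    assert_eq!(lane_width_bits(DataType::Int8), 8);
    assert_eq!(lane_width_bits(DataType::UInt16), 16);
    println!("ok");
}
```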
fixed
rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs
```rust
// I did an extra copy here, not sure how to avoid it and whether it's safe to avoid it
let mut output = Vec::with_capacity(num_rows as usize);
output.extend_from_slice(
    &self.decompressed_buf[rows_to_skip as usize..][..num_rows as usize],
);
```
It should be safe to avoid this copy if I understand what you are doing here. We should be able to slice a `LanceBuffer`.
yeah, I don't know if it's safe for the consumer of `decode` to get a `Borrowed` `LanceBuffer`; I don't know whether they might modify it.
Ah, ok. Returning a borrowed buffer is always preferable to making a copy just to return an owned buffer.
In the first case the consumer might or might not have to make a copy. If they have to make one, then it's no more expensive for them to make the copy themselves.
In the second case we guarantee a copy happens, even if the consumer is able to work with a borrowed buffer.
Consumers should never assume they have an owned buffer. If they need an owned buffer they should use `LanceBuffer::into_owned`.
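The same borrowed-vs-owned trade-off can be illustrated with std's `Cow`, which expresses the distinction `LanceBuffer`'s variants make (an illustrative sketch, not Lance code; `slice_rows` is a made-up name):

```rust
use std::borrow::Cow;

// The producer returns a borrowed view; only a consumer that truly needs
// ownership pays for the copy, by calling into_owned itself.
fn slice_rows(buf: &[u8], skip: usize, len: usize) -> Cow<'_, [u8]> {
    Cow::Borrowed(&buf[skip..skip + len]) // no copy on the producer side
}

fn main() {
    let decompressed = vec![1u8, 2, 3, 4, 5, 6];
    let view = slice_rows(&decompressed, 2, 3);
    assert_eq!(&*view, &[3, 4, 5]);
    // A consumer that needs an owned buffer makes the copy at its end:
    let owned: Vec<u8> = view.into_owned();
    assert_eq!(owned, vec![3, 4, 5]);
    println!("ok");
}
```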
gotcha, will modify accordingly.
fixed
```rust
let values: Vec<i16> = vec![0; 10000];
let array = Int16Array::from(values);
let arr = Arc::new(array) as ArrayRef;
check_round_trip_encoding_of_data(vec![arr], &TestCases::default(), HashMap::new()).await;

let values: Vec<i16> = vec![88; 10000];
let array = Int16Array::from(values);
let arr = Arc::new(array) as ArrayRef;
check_round_trip_encoding_of_data(vec![arr], &TestCases::default(), HashMap::new()).await;

let values: Vec<i16> = vec![300; 100];
let array = Int16Array::from(values);
let arr = Arc::new(array) as ArrayRef;
check_round_trip_encoding_of_data(vec![arr], &TestCases::default(), HashMap::new()).await;

let values: Vec<i16> = vec![800; 100];
let array = Int16Array::from(values);
let arr = Arc::new(array) as ArrayRef;
check_round_trip_encoding_of_data(vec![arr], &TestCases::default(), HashMap::new()).await;

let arr = lance_datagen::gen()
    .anon_col(lance_datagen::array::rand_type(&DataType::Int16))
    .into_batch_rows(RowCount::from(1))
    .unwrap()
    .column(0)
    .clone();
check_round_trip_encoding_of_data(vec![arr], &TestCases::default(), HashMap::new()).await;

let arr = lance_datagen::gen()
    .anon_col(lance_datagen::array::rand_type(&DataType::Int16))
    .into_batch_rows(RowCount::from(20))
    .unwrap()
    .column(0)
    .clone();
check_round_trip_encoding_of_data(vec![arr], &TestCases::default(), HashMap::new()).await;

let arr = lance_datagen::gen()
    .anon_col(lance_datagen::array::rand_type(&DataType::Int16))
    .into_batch_rows(RowCount::from(50))
    .unwrap()
    .column(0)
    .clone();
check_round_trip_encoding_of_data(vec![arr], &TestCases::default(), HashMap::new()).await;

let arr = lance_datagen::gen()
    .anon_col(lance_datagen::array::rand_type(&DataType::Int16))
    .into_batch_rows(RowCount::from(100))
    .unwrap()
    .column(0)
    .clone();
check_round_trip_encoding_of_data(vec![arr], &TestCases::default(), HashMap::new()).await;

let arr = lance_datagen::gen()
    .anon_col(lance_datagen::array::rand_type(&DataType::Int16))
    .into_batch_rows(RowCount::from(1000))
    .unwrap()
    .column(0)
    .clone();
check_round_trip_encoding_of_data(vec![arr], &TestCases::default(), HashMap::new()).await;

let arr = lance_datagen::gen()
    .anon_col(lance_datagen::array::rand_type(&DataType::Int16))
    .into_batch_rows(RowCount::from(1024))
    .unwrap()
    .column(0)
    .clone();
check_round_trip_encoding_of_data(vec![arr], &TestCases::default(), HashMap::new()).await;

let arr = lance_datagen::gen()
    .anon_col(lance_datagen::array::rand_type(&DataType::Int16))
    .into_batch_rows(RowCount::from(2000))
    .unwrap()
    .column(0)
    .clone();
check_round_trip_encoding_of_data(vec![arr], &TestCases::default(), HashMap::new()).await;

let arr = lance_datagen::gen()
    .anon_col(lance_datagen::array::rand_type(&DataType::Int16))
    .into_batch_rows(RowCount::from(3000))
    .unwrap()
    .column(0)
    .clone();
check_round_trip_encoding_of_data(vec![arr], &TestCases::default(), HashMap::new()).await;
*/
```
Clean this up?
fixed
rust/lance-encoding/src/encoder.rs
```rust
DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => {
    let compressed_bit_width = compute_compressed_bit_width_for_non_neg(arrays);
    Ok(Box::new(BitpackedForNonNegArrayEncoder::new(
        compressed_bit_width as usize,
        data_type.clone(),
    )))
}

// for signed integers, I intend to make it a cascaded encoding: a sparse array for the negative values and very wide (bit-width) values,
// then a bitpacked array for the narrow (bit-width) values; I need `BitpackedForNeg` to be merged first
DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => {
    let compressed_bit_width = compute_compressed_bit_width_for_non_neg(arrays);
    Ok(Box::new(BitpackedForNonNegArrayEncoder::new(
        compressed_bit_width as usize,
        data_type.clone(),
    )))
}
```
Let's make sure we only use the bitpacking encoder if the version is 2.1.
fixed
benchmark results on my mac (2023 M2 Pro, 16GiB of memory):

to reproduce:

```diff
 for data_type in PRIMITIVE_TYPES {
     let data = lance_datagen::gen()
         .anon_col(lance_datagen::array::rand_type(data_type))
-        .into_batch_rows(lance_datagen::RowCount::from(1024 * 1024))
+        .into_batch_rows(lance_datagen::RowCount::from(512 * 1024 * 1024))
         .unwrap();
```

then run