
feature: support fastlanes bitpacking #2886

Open · wants to merge 12 commits into base: main
Conversation

broccoliSpicy (Contributor)

No description provided.


ACTION NEEDED
Lance follows the Conventional Commits specification for release automation.

The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification.

For details on the error please inspect the "PR Title Check" action.


broccoliSpicy commented Sep 16, 2024

benchmark results:
before: f9f151d, with bitpacking enabled (comment out https://github.com/westonpace/lance/blob/ace3a400f973b5e3799d414493344ea7ffdd2f05/rust/lance-encoding/src/encoder.rs#L583-L586),
then use the bench_decode2 function in a1e3cdf:
(screenshot: benchmark results before fastlanes)

using fastlanes:
a1e3cdf
(screenshot: benchmark results with fastlanes, a1e3cdf)

ce0f798
(screenshot: benchmark results, ce0f798)

machine info:

  *-cache:0
       description: L1 cache
       physical id: 5
       slot: L1-Cache
       size: 1536KiB
       capacity: 1536KiB
       capabilities: synchronous internal write-back instruction
       configuration: level=1
  *-cache:1
       description: L2 cache
       physical id: 6
       slot: L2-Cache
       size: 24MiB
       capacity: 24MiB
       capabilities: synchronous internal varies unified
       configuration: level=2
  *-cache:2
       description: L3 cache
       physical id: 7
       slot: L3-Cache
       size: 35MiB
       capacity: 35MiB
       capabilities: synchronous internal varies unified
       configuration: level=3
  *-memory
       description: System Memory
       physical id: 8
       slot: System board or motherboard
       size: 64GiB
     *-bank
          description: DIMM DDR4 Static column Pseudo-static Synchronous Window DRAM 2933 MHz (0.3 ns)
          physical id: 0
          size: 64GiB
          width: 64 bits
          clock: 2933MHz (0.3ns)
➜  ~ uname -a
Linux ip-172-31-2-120 6.1.0-23-cloud-amd64 #1 SMP PREEMPT_DYNAMIC Debian 6.1.99-1 (2024-07-15) x86_64 GNU/Linux
➜  ~ cat /proc/cpuinfo | grep 'model name' | uniq

model name	: Intel(R) Xeon(R) Platinum 8259CL CPU @ 2.50GHz

machine net memory write throughput (single-thread write):

(screenshot: single-thread memory write throughput)

machine net memory write throughput test code:

use tokio::runtime::Runtime;
use tokio::io::AsyncWriteExt;
use std::time::Instant;

async fn write_to_memory(size: usize) -> f64 {
    let mut buffer = Vec::with_capacity(size);
    let data = vec![0u8; size];

    let start = Instant::now();
    buffer.write_all(&data).await.unwrap();
    let duration = start.elapsed();

    let bytes_written = size as f64;
    let seconds = duration.as_secs_f64();
    let throughput = bytes_written / seconds;

    throughput
}

fn main() {
    let size = 1024 * 1024 * 1024 * 16;
    let rt = Runtime::new().unwrap();

    let throughput = rt.block_on(write_to_memory(size));

    // Convert throughput from bytes/sec to MB/sec
    let throughput_mb_per_sec = throughput / (1024.0 * 1024.0);

    println!("Throughput: {:.2} MB/sec", throughput_mb_per_sec);
}

Run the above with `cargo run --release`.


broccoliSpicy commented Sep 17, 2024

I poked around a bit and learned something I didn't know before: to stress memory write throughput, we need multi-threading. But by spawning each decode task like this, I lose the chance to multi-thread within a single decode:

        async move {
            let bytes = bytes.await?;
            let decompressed_output = bitpacked_for_non_neg_decode(
                compressed_bit_width,
                uncompressed_bits_per_value,
                &bytes,
                &bytes_idx_to_range_indices,
                num_rows,
            );
            Ok(Box::new(BitpackedForNonNegPageDecoder {
                uncompressed_bits_per_value,
                decompressed_buf: decompressed_output,
            }) as Box<dyn PrimitivePageDecoder>)
        }
        .boxed()

in this function:

fn bitpacked_for_non_neg_decode(
    compressed_bit_width: u64,
    uncompressed_bits_per_value: u64,
    data: &Vec<Bytes>,
    bytes_idx_to_range_indices: &Vec<Vec<std::ops::Range<u64>>>,
    num_rows: u64,
) -> Vec<u8> {
    match uncompressed_bits_per_value {
        8 => {
            let mut decompressed: Vec<u8> = Vec::with_capacity(num_rows as usize);
            let packed_chunk_size: usize = 1024 * compressed_bit_width as usize / 8;
            let mut decompress_chunk_buf = vec![0_u8; 1024];
            for (i, bytes) in data.iter().enumerate() {
                let mut j = 0;
                let mut ranges_idx = 0;
                let mut curr_range_start = bytes_idx_to_range_indices[i][0].start;
                while j * packed_chunk_size < bytes.len() {
                    let chunk: &[u8] = &bytes[j * packed_chunk_size..][..packed_chunk_size];
                    unsafe {
                        BitPacking::unchecked_unpack(
                            compressed_bit_width as usize,
                            chunk,
                            &mut decompress_chunk_buf[..1024],
                        );
                    }
                    loop {
                        if curr_range_start + 1024 < bytes_idx_to_range_indices[i][ranges_idx].end {
                            let this_part_len = 1024 - curr_range_start % 1024;
                            decompressed.extend_from_slice(
                                &decompress_chunk_buf[curr_range_start as usize % 1024..],
                            );
                            curr_range_start += this_part_len;
                            break;
                        } else {
                            let this_part_len =
                                bytes_idx_to_range_indices[i][ranges_idx].end - curr_range_start;
                            decompressed.extend_from_slice(
                                &decompress_chunk_buf[curr_range_start as usize % 1024..]
                                    [..this_part_len as usize],
                            );
                            ranges_idx += 1;
                            if ranges_idx == bytes_idx_to_range_indices[i].len() {
                                break;
                            }
                            curr_range_start = bytes_idx_to_range_indices[i][ranges_idx].start;
                        }
                    }
                    j += 1;
                }
            }
            decompressed
        }
        _ => panic!("Unsupported data type"),
    }
}

I could do something like this:

                while j * packed_chunk_size * 34 < bytes.len() {
                    // decode first chunk
                    // decode 32 chunks in parallel
                    // decode the last chunk

to decode 32 chunks in parallel, I found it is okay to do something like this in Rust:

use std::thread;

fn main() {
    let len = 100;

    // Pre-allocate a Vec<u8> with a specific length, initialized to zero
    let mut data = vec![0u8; len];

    // Share the base address across threads as an integer
    // (use as_mut_ptr, since the threads will write through it)
    let ptr = data.as_mut_ptr() as usize;

    let mut handles = vec![];

    // Spawn 10 threads, each writing to a different section of the Vec
    for i in 0..10 {
        let handle = thread::spawn(move || {
            let start = i * 10;
            let end = start + 10;

            // Each thread writes to its own non-overlapping section
            for j in start..end {
                unsafe {
                    // Use the raw pointer to modify the Vec directly
                    (ptr as *mut u8).add(j).write((j - start) as u8);
                }
            }
        });
        handles.push(handle);
    }

    // Join all threads
    for handle in handles {
        handle.join().unwrap();
    }

    // Print the modified Vec
    println!("{:?}", data);
}

broccoliSpicy (Contributor Author):

cargo clippy is unhappy with this:

warning: calling `set_len()` immediately after reserving a buffer creates uninitialized values
   --> rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs:636:17
    |
636 |                 let mut output: Vec<u32> = Vec::with_capacity(num_rows as usize);
    |                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
637 |                 unsafe {
638 |                     output.set_len(num_rows as usize);
    |                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |
    = help: to override `-W clippy::all` add `#[allow(clippy::uninit_vec)]`

should I just suppress it?

@broccoliSpicy broccoliSpicy changed the title feature: support fastlanes bitpacking for uint8 type(draft, wip) feature: support fastlanes bitpacking Sep 18, 2024
westonpace (Contributor) left a comment:

Some initial comments

Comment on lines 206 to 235
pub fn reinterpret_to_rust_native<T>(&mut self) -> Result<&[T]>
where
    T: Copy, // Ensure `T` can be copied (as needed for safely reinterpreting bytes)
{
    let buffer = self.borrow_and_clone();

    let buffer = buffer.into_buffer();

    // Get the raw byte slice from the buffer.
    let byte_slice = buffer.as_slice();

    // Safety check - ensure that the byte slice length is a multiple of `T`.
    if byte_slice.len() % std::mem::size_of::<T>() != 0 {
        return Err(Error::Internal {
            message: "Buffer size is not a multiple of the target type size".to_string(),
            location: location!(),
        });
    }

    // Reinterpret the byte slice as a slice of `T`.
    let typed_slice = unsafe {
        std::slice::from_raw_parts(
            byte_slice.as_ptr() as *const T,
            byte_slice.len() / std::mem::size_of::<T>(),
        )
    };

    Ok(typed_slice)
}

westonpace (Contributor):

How is this different from borrow_to_typed_slice?

broccoliSpicy (Contributor Author):

yeah, this is actually the same, I should delete this function

broccoliSpicy (Contributor Author):

fixed

Comment on lines +193 to +203
// Items are bitpacked in a buffer
message BitpackedForNonNeg {
  // the number of bits used for a value in the buffer
  uint64 compressed_bits_per_value = 1;

  // the number of bits of the uncompressed value. e.g. for a u32, this will be 32
  uint64 uncompressed_bits_per_value = 2;

  // The items in the list
  Buffer buffer = 3;
}
westonpace (Contributor):

What do you think `BitpackedWithNeg` will look like?

broccoliSpicy (Contributor Author) commented Sep 18, 2024:

I plan to make it a cascading encoding: a BTreeMap from row-number index to real value for the few very wide (bit-width) values (the negatives), then bitpacking for the rest. For arrays that have too many negative values (for example, 50 percent), I think we should not use bitpacking at all.

Comment on lines +40 to +41
fastlanes = "0.1.5"
bytemuck = "=1.18.0"
westonpace (Contributor):

Are these being used?

broccoliSpicy (Contributor Author):

yeah, they are used in bitpack_fastlanes.rs

Comment on lines 28 to 29
// Compute the compressed_bit_width for a given array of unsigned integers
// the vortex approach is better, they compute all statistics before encoding
westonpace (Contributor):

Maybe just say "todo: compute all statistics before encoding"?

broccoliSpicy (Contributor Author):

fixed

.as_any()
.downcast_ref::<PrimitiveArray<UInt8Type>>()
.unwrap();
//println!("primitive_array: {:?}", primitive_array);
westonpace (Contributor):

Suggested change (delete this line):
//println!("primitive_array: {:?}", primitive_array);

broccoliSpicy (Contributor Author):

fixed

}

impl ArrayEncoder for BitpackedForNonNegArrayEncoder {
fn encode(
westonpace (Contributor):

Definitely a lot of repetition in this method, but we can try to simplify later if it is not easy to collapse these cases (e.g. if it requires a macro, let's tackle it later)

broccoliSpicy (Contributor Author):

yeah, I just need a way to map, say, both `DataType::UInt8` and `DataType::Int8` to `u8`; then I should know how to simplify it

broccoliSpicy (Contributor Author):

fixed

Comment on lines 602 to 606
// I did an extra copy here, not sure how to avoid it and whether it's safe to avoid it
let mut output = Vec::with_capacity(num_rows as usize);
output.extend_from_slice(
&self.decompressed_buf[rows_to_skip as usize..][..num_rows as usize],
);
westonpace (Contributor):

It should be safe to avoid this copy if I understand what you are doing here. We should be able to slice a LanceBuffer.

broccoliSpicy (Contributor Author):

yeah, I don't know if it's safe for the consumer of decode to get a borrowed LanceBuffer; I don't know whether they will modify it.

westonpace (Contributor):

Ah, ok. Returning a borrowed buffer is always preferable to making a copy just to return an owned buffer.

In the first case the consumer might or might not have to make a copy. If they have to make one, then it's no more expensive for them to make the copy themselves.

In the second case we guarantee a copy happens, even if the consumer is able to work with a borrowed buffer.

Consumers should never assume they have an owned buffer. If they need an owned buffer they should use LanceBuffer::into_owned.

broccoliSpicy (Contributor Author):

gotcha, will modify accordingly.

broccoliSpicy (Contributor Author):

fixed

Comment on lines 1465 to 1577
let values: Vec<i16> = vec![0; 10000];
let array = Int16Array::from(values);
let arr = Arc::new(array) as ArrayRef;
check_round_trip_encoding_of_data(vec![arr], &TestCases::default(), HashMap::new()).await;

let values: Vec<i16> = vec![88; 10000];
let array = Int16Array::from(values);
let arr = Arc::new(array) as ArrayRef;
check_round_trip_encoding_of_data(vec![arr], &TestCases::default(), HashMap::new()).await;

let values: Vec<i16> = vec![300; 100];
let array = Int16Array::from(values);
let arr = Arc::new(array) as ArrayRef;
check_round_trip_encoding_of_data(vec![arr], &TestCases::default(), HashMap::new()).await;

let values: Vec<i16> = vec![800; 100];
let array = Int16Array::from(values);
let arr = Arc::new(array) as ArrayRef;
check_round_trip_encoding_of_data(vec![arr], &TestCases::default(), HashMap::new()).await;

let arr = lance_datagen::gen()
    .anon_col(lance_datagen::array::rand_type(&DataType::Int16))
    .into_batch_rows(RowCount::from(1))
    .unwrap()
    .column(0)
    .clone();
check_round_trip_encoding_of_data(vec![arr], &TestCases::default(), HashMap::new()).await;

let arr = lance_datagen::gen()
    .anon_col(lance_datagen::array::rand_type(&DataType::Int16))
    .into_batch_rows(RowCount::from(20))
    .unwrap()
    .column(0)
    .clone();
check_round_trip_encoding_of_data(vec![arr], &TestCases::default(), HashMap::new()).await;

let arr = lance_datagen::gen()
    .anon_col(lance_datagen::array::rand_type(&DataType::Int16))
    .into_batch_rows(RowCount::from(50))
    .unwrap()
    .column(0)
    .clone();
check_round_trip_encoding_of_data(vec![arr], &TestCases::default(), HashMap::new()).await;

let arr = lance_datagen::gen()
    .anon_col(lance_datagen::array::rand_type(&DataType::Int16))
    .into_batch_rows(RowCount::from(100))
    .unwrap()
    .column(0)
    .clone();
check_round_trip_encoding_of_data(vec![arr], &TestCases::default(), HashMap::new()).await;

let arr = lance_datagen::gen()
    .anon_col(lance_datagen::array::rand_type(&DataType::Int16))
    .into_batch_rows(RowCount::from(1000))
    .unwrap()
    .column(0)
    .clone();
check_round_trip_encoding_of_data(vec![arr], &TestCases::default(), HashMap::new()).await;

let arr = lance_datagen::gen()
    .anon_col(lance_datagen::array::rand_type(&DataType::Int16))
    .into_batch_rows(RowCount::from(1024))
    .unwrap()
    .column(0)
    .clone();
check_round_trip_encoding_of_data(vec![arr], &TestCases::default(), HashMap::new()).await;

let arr = lance_datagen::gen()
    .anon_col(lance_datagen::array::rand_type(&DataType::Int16))
    .into_batch_rows(RowCount::from(2000))
    .unwrap()
    .column(0)
    .clone();
check_round_trip_encoding_of_data(vec![arr], &TestCases::default(), HashMap::new()).await;

let arr = lance_datagen::gen()
    .anon_col(lance_datagen::array::rand_type(&DataType::Int16))
    .into_batch_rows(RowCount::from(3000))
    .unwrap()
    .column(0)
    .clone();
check_round_trip_encoding_of_data(vec![arr], &TestCases::default(), HashMap::new()).await;
*/
westonpace (Contributor):

Clean this up?

broccoliSpicy (Contributor Author):

fixed

Comment on lines 336 to 352
DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => {
    let compressed_bit_width = compute_compressed_bit_width_for_non_neg(arrays);
    Ok(Box::new(BitpackedForNonNegArrayEncoder::new(
        compressed_bit_width as usize,
        data_type.clone(),
    )))
}

// for signed integers, I intend to make it a cascaded encoding: a sparse array for the
// negative and very wide (bit-width) values, then a bitpacked array for the narrow
// (bit-width) values; I need `BitpackedForNeg` to be merged first
DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => {
    let compressed_bit_width = compute_compressed_bit_width_for_non_neg(arrays);
    Ok(Box::new(BitpackedForNonNegArrayEncoder::new(
        compressed_bit_width as usize,
        data_type.clone(),
    )))
}
westonpace (Contributor):

Let's make sure we only use the bitpacking encoder if the version is 2.1.

broccoliSpicy (Contributor Author):

fixed

broccoliSpicy (Contributor Author):

benchmark results on my Mac (2023 M2 Pro, 16GiB of memory), at 3ad773c:
(screenshot: benchmark results)

to reproduce:
change RowCount in bench_decode to 512 * 1024 * 1024:

     for data_type in PRIMITIVE_TYPES {
         let data = lance_datagen::gen()
             .anon_col(lance_datagen::array::rand_type(data_type))
-            .into_batch_rows(lance_datagen::RowCount::from(1024 * 1024))
+            .into_batch_rows(lance_datagen::RowCount::from(512 * 1024 * 1024))
             .unwrap();

then run `rustup run nightly cargo bench --bench decoder -- decode_primitive/uint32`
