Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use NullBuffer in ArrayData (#3775) #3778

Merged
merged 6 commits into from
Mar 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions arrow-arith/src/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ where
.map(|i| unsafe { array.value_unchecked(i) })
.reduce(|acc, item| if cmp(&acc, &item) { item } else { acc })
} else {
let null_buffer = array.data_ref().null_buffer().unwrap();
let iter = BitIndexIterator::new(null_buffer, array.offset(), array.len());
let nulls = array.data().nulls().unwrap();
let iter = BitIndexIterator::new(nulls.validity(), nulls.offset(), nulls.len());
unsafe {
let idx = iter.reduce(|acc_idx, idx| {
let acc = array.value_unchecked(acc_idx);
Expand Down Expand Up @@ -288,20 +288,20 @@ where

let data: &[T::Native] = array.values();

match array.data().null_buffer() {
match array.data().nulls() {
None => {
let sum = data.iter().fold(T::default_value(), |accumulator, value| {
accumulator.add_wrapping(*value)
});

Some(sum)
}
Some(buffer) => {
Some(nulls) => {
let mut sum = T::default_value();
let data_chunks = data.chunks_exact(64);
let remainder = data_chunks.remainder();

let bit_chunks = buffer.bit_chunks(array.offset(), array.len());
let bit_chunks = nulls.inner().bit_chunks();
data_chunks
.zip(bit_chunks.iter())
.for_each(|(chunk, mask)| {
Expand Down Expand Up @@ -347,7 +347,7 @@ where

let data: &[T::Native] = array.values();

match array.data().null_buffer() {
match array.data().nulls() {
None => {
let sum = data
.iter()
Expand All @@ -357,14 +357,14 @@ where

Ok(Some(sum))
}
Some(buffer) => {
Some(nulls) => {
let mut sum = T::default_value();

try_for_each_valid_idx(
array.len(),
array.offset(),
null_count,
Some(buffer.as_slice()),
nulls.len(),
nulls.offset(),
nulls.null_count(),
Some(nulls.validity()),
|idx| {
unsafe { sum = sum.add_checked(array.value_unchecked(idx))? };
Ok::<_, ArrowError>(())
Expand Down Expand Up @@ -665,7 +665,7 @@ mod simd {
let mut chunk_acc = A::init_accumulator_chunk();
let mut rem_acc = A::init_accumulator_scalar();

match array.data().null_buffer() {
match array.data().nulls() {
None => {
let data_chunks = data.chunks_exact(64);
let remainder = data_chunks.remainder();
Expand All @@ -681,12 +681,12 @@ mod simd {
A::accumulate_scalar(&mut rem_acc, *value);
});
}
Some(buffer) => {
Some(nulls) => {
// process data in chunks of 64 elements since we also get 64 bits of validity information at a time
let data_chunks = data.chunks_exact(64);
let remainder = data_chunks.remainder();

let bit_chunks = buffer.bit_chunks(array.offset(), array.len());
let bit_chunks = nulls.inner().bit_chunks();
let remainder_bits = bit_chunks.remainder_bits();

data_chunks.zip(bit_chunks).for_each(|(chunk, mut mask)| {
Expand Down
11 changes: 7 additions & 4 deletions arrow-arith/src/arithmetic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1572,6 +1572,7 @@ mod tests {
use arrow_array::builder::{
BooleanBufferBuilder, BufferBuilder, PrimitiveDictionaryBuilder,
};
use arrow_buffer::buffer::{BooleanBuffer, NullBuffer};
use arrow_buffer::i256;
use arrow_data::ArrayDataBuilder;
use chrono::NaiveDate;
Expand Down Expand Up @@ -3057,15 +3058,19 @@ mod tests {
// `count_set_bits_offset` takes len in bits as parameter.
assert_eq!(null_buffer.count_set_bits_offset(0, 13), 0);

let nulls = BooleanBuffer::new(null_buffer, 0, 13);
assert_eq!(nulls.count_set_bits(), 0);
let nulls = NullBuffer::new(nulls);
assert_eq!(nulls.null_count(), 13);

let mut data_buffer_builder = BufferBuilder::<i32>::new(13);
data_buffer_builder.append_slice(&[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]);
let data_buffer = data_buffer_builder.finish();

let arg1: Int32Array = ArrayDataBuilder::new(DataType::Int32)
.len(13)
.null_count(13)
.nulls(Some(nulls))
.buffers(vec![data_buffer])
.null_bit_buffer(Some(null_buffer))
.build()
.unwrap()
.into();
Expand All @@ -3078,9 +3083,7 @@ mod tests {

let arg2: Int32Array = ArrayDataBuilder::new(DataType::Int32)
.len(13)
.null_count(0)
.buffers(vec![data_buffer])
.null_bit_buffer(None)
.build()
.unwrap()
.into();
Expand Down
14 changes: 3 additions & 11 deletions arrow-arith/src/arity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use arrow_array::builder::BufferBuilder;
use arrow_array::iterator::ArrayIter;
use arrow_array::*;
use arrow_buffer::buffer::{BooleanBuffer, NullBuffer};
use arrow_buffer::{Buffer, MutableBuffer};
use arrow_data::bit_iterator::try_for_each_valid_idx;
use arrow_data::bit_mask::combine_option_bitmap;
Expand Down Expand Up @@ -276,10 +277,7 @@ where
let len = a.len();

let null_buffer = combine_option_bitmap(&[a.data(), b.data()], len);
let null_count = null_buffer
.as_ref()
.map(|x| len - x.count_set_bits_offset(0, len))
.unwrap_or_default();
let nulls = null_buffer.map(|b| NullBuffer::new(BooleanBuffer::new(b, 0, len)));

let mut builder = a.into_builder()?;

Expand All @@ -289,13 +287,7 @@ where
.zip(b.values())
.for_each(|(l, r)| *l = op(*l, *r));

let array_builder = builder
.finish()
.data()
.clone()
.into_builder()
.null_bit_buffer(null_buffer)
.null_count(null_count);
let array_builder = builder.finish().into_data().into_builder().nulls(nulls);

let array_data = unsafe { array_builder.build_unchecked() };
Ok(Ok(PrimitiveArray::<T>::from(array_data)))
Expand Down
Loading