Binary / row helpers #6096

Open · wants to merge 7 commits into base: master

170 changes: 168 additions & 2 deletions arrow-row/src/lib.rs
@@ -132,7 +132,7 @@ use std::sync::Arc;
use arrow_array::cast::*;
use arrow_array::types::ArrowDictionaryKeyType;
use arrow_array::*;
use arrow_buffer::ArrowNativeType;
use arrow_buffer::{ArrowNativeType, Buffer, OffsetBuffer, ScalarBuffer};
use arrow_data::ArrayDataBuilder;
use arrow_schema::*;
use variable::{decode_binary_view, decode_string_view};
@@ -738,6 +738,42 @@ impl RowConverter {
}
}

/// Create a new [Rows] instance from the given binary data.
@alamb (Contributor), Jul 23, 2024:

Can you please add a doc example showing how to do this?

I think trying to give a basic example of converting rows, then to/from binary, will not only serve as good documentation, it will also make sure all the required APIs are pub (for example, I think RowParser needs to be pub).

Author:

Sure, done.

///
/// ```
Contributor:

I think it may also be worth adding a doc comment here about when this API will panic (when the data passed in was invalid or empty).

/// # use std::sync::Arc;
/// # use std::collections::HashSet;
/// # use arrow_array::cast::AsArray;
/// # use arrow_array::StringArray;
/// # use arrow_row::{OwnedRow, Row, RowConverter, RowParser, SortField};
/// # use arrow_schema::DataType;
/// #
/// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
/// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
/// let rows = converter.convert_columns(&[Arc::new(array)]).unwrap();
///
/// // We can convert rows into binary format and back in batch.
/// let values: Vec<OwnedRow> = rows.iter().map(|r| r.owned()).collect();
/// let binary = rows.try_into_binary().expect("small");
Member:

I got a little confused by .expect("small"). What does "small" mean in this context? Why not just .unwrap()?
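For what it's worth, a hypothetical rewording of the doc example above that spells out the assumption (the message text here is illustrative, not from this PR):

```rust
// try_into_binary only fails when the encoded rows buffer exceeds
// i32::MAX bytes, so "small" here means "under ~2GiB of row data".
let binary = rows
    .try_into_binary()
    .expect("rows buffer is smaller than i32::MAX bytes");
```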

/// let converted = converter.from_binary(binary.clone());
/// assert!(converted.iter().eq(values.iter().map(|r| r.row())));
/// ```
pub fn from_binary(&self, array: BinaryArray) -> Rows {
Contributor:

I wonder if this function needs to be marked unsafe -- I am worried that someone inserts invalid data into Rows here (e.g. modifies the bytes so they read as invalid UTF-8).

However, I see that there is already a way to convert between Rows and [u8] and then from [u8] to Rows (e.g. RowParser::parser).

Author:

Yeah, that's my understanding! This would definitely be unsafe without the validate_utf8 below... but with it, I believe this has the same safety properties as the existing public API.
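A minimal sketch of that equivalence, modeled on the test_invalid_utf8_array test added below: bytes encoded for a Binary column are reinterpreted as Utf8 rows through the safe API, and the failure surfaces as a panic during decoding rather than as an invalid StringArray.

```rust
use std::sync::Arc;
use arrow_array::{ArrayRef, BinaryArray};
use arrow_row::{RowConverter, SortField};
use arrow_schema::DataType;

// Encode a non-UTF-8 byte sequence as Binary rows...
let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
let array: ArrayRef = Arc::new(BinaryArray::from_iter_values([&[0xFF]]));
let rows = converter.convert_columns(&[array]).unwrap();
let bytes = rows.try_into_binary().unwrap();

// ...then feed those bytes to a Utf8 converter via the safe from_binary API.
let utf8_converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
let parsed = utf8_converter.from_binary(bytes);
// Because validate_utf8 is set, decoding panics with
// "Encountered non UTF-8 data" instead of yielding an invalid StringArray.
utf8_converter.convert_rows(parsed.iter()).unwrap(); // panics here
```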

assert_eq!(
array.null_count(),
0,
"can't construct Rows instance from array with nulls"
);
Rows {
buffer: array.values().to_vec(),
offsets: array.offsets().iter().map(|&i| i.as_usize()).collect(),
config: RowConfig {
fields: Arc::clone(&self.fields),
Member:

More for my curiosity than anything, but why Arc::clone(&self.fields) instead of self.fields.clone()?
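Not from this PR, but a small self-contained illustration of why the two spellings are interchangeable, and why some codebases prefer the explicit one:

```rust
use std::sync::Arc;

let fields: Arc<str> = Arc::from("sort fields");
// Both calls only bump the reference count; neither copies the data.
// Arc::clone makes the cheap pointer clone explicit at the call site,
// so it cannot be mistaken for a deep clone of the underlying value.
let a = Arc::clone(&fields);
let b = fields.clone();
assert!(Arc::ptr_eq(&a, &b));
```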

validate_utf8: true,
},
}
}

/// Convert raw bytes into [`ArrayRef`]
///
/// # Safety
@@ -878,6 +914,50 @@ impl Rows {
+ self.buffer.len()
+ self.offsets.len() * std::mem::size_of::<usize>()
}

/// Create a [BinaryArray] from the [Rows] data without reallocating the
/// underlying bytes.
///
/// ```
/// # use std::sync::Arc;
/// # use std::collections::HashSet;
/// # use arrow_array::cast::AsArray;
/// # use arrow_array::StringArray;
/// # use arrow_row::{OwnedRow, Row, RowConverter, RowParser, SortField};
/// # use arrow_schema::DataType;
/// #
/// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
/// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
/// let rows = converter.convert_columns(&[Arc::new(array)]).unwrap();
///
/// // We can convert rows into binary format and back.
/// let values: Vec<OwnedRow> = rows.iter().map(|r| r.owned()).collect();
/// let binary = rows.try_into_binary().expect("small");
/// let parser = converter.parser();
/// let parsed: Vec<OwnedRow> =
/// binary.iter().flatten().map(|b| parser.parse(b).owned()).collect();
/// assert_eq!(values, parsed);
/// ```
Contributor:

Maybe it is worth commenting here when this will return an error (i.e. when the data is more than 2GB)?
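A sketch of the resulting error contract as it stands in this diff (types as in the doc example above): the only failure mode is an encoded buffer longer than i32::MAX bytes, so callers can match on it explicitly.

```rust
use std::sync::Arc;
use arrow_array::{Array, ArrayRef, Int32Array};
use arrow_row::{RowConverter, SortField};
use arrow_schema::{ArrowError, DataType};

let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
let col: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
let rows = converter.convert_columns(&[col]).unwrap();

// Err is returned only when the rows buffer exceeds i32::MAX (~2GiB);
// small inputs like this always succeed.
match rows.try_into_binary() {
    Ok(binary) => assert_eq!(binary.len(), 3),
    Err(ArrowError::InvalidArgumentError(msg)) => panic!("too large: {msg}"),
    Err(e) => panic!("unexpected error: {e}"),
}
```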

pub fn try_into_binary(self) -> Result<BinaryArray, ArrowError> {
if self.buffer.len() > i32::MAX as usize {
return Err(ArrowError::InvalidArgumentError(format!(
"{}-byte rows buffer too long to convert into a i32-indexed BinaryArray",
self.buffer.len()
)));
}
// We've checked that the buffer length fits in an i32; so all offsets into that buffer should fit as well.
let offsets_scalar = ScalarBuffer::from_iter(self.offsets.into_iter().map(i32::usize_as));
Contributor:

I think this needs to check that the offsets don't overflow an i32 -- this should be a try_into, I think, and the method should be like fn try_into_binary(self) -> Result<BinaryArray>.

Author:

My belief is that this is guaranteed by the assert above (which asserts that the len is not larger than i32::MAX) and the existing offset invariant (which guarantees that all offsets are valid indices into the binary data). So a more expensive O(n) check seemed redundant.

I'll go ahead and turn that assert into a Result::Err; let me know what you think about the other side of it!

Member:

I agree with @bkirwi's logic here. If we assume that self.offsets is well formed with respect to self.buffer, then we shouldn't need to check the individual offsets.
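A hypothetical helper (not part of this PR) expressing the invariant being relied on: offsets are monotonically increasing indices into the buffer, so once the buffer length fits in an i32, every offset does too, and an O(1) check suffices.

```rust
// If `offsets` are monotonically increasing indices into a buffer of
// length `buffer_len`, checking the buffer length (equivalently, the
// final offset) covers every offset without an O(n) scan.
fn offsets_fit_in_i32(offsets: &[usize], buffer_len: usize) -> bool {
    buffer_len <= i32::MAX as usize
        && offsets.last().map_or(true, |&last| last <= buffer_len)
}

assert!(offsets_fit_in_i32(&[0, 4, 9], 9));
assert!(!offsets_fit_in_i32(&[0], usize::MAX));
```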

// SAFETY: offsets buffer is nonempty, monotonically increasing, and all represent valid indexes into buffer.
let array = unsafe {
BinaryArray::new_unchecked(
OffsetBuffer::new_unchecked(offsets_scalar),
Buffer::from_vec(self.buffer),
None,
)
};
Ok(array)
}
}

impl<'a> IntoIterator for &'a Rows {
@@ -961,6 +1041,11 @@ impl<'a> Row<'a> {
config: self.config.clone(),
}
}

/// The row's bytes, with the lifetime of the underlying data.
pub fn data(&self) -> &'a [u8] {
self.data
}
}

// Manually derive these as we don't wish to include `fields`
@@ -1838,6 +1923,69 @@ mod tests {
converter.convert_rows(std::iter::once(utf8_row)).unwrap();
}

#[test]
#[should_panic(expected = "Encountered non UTF-8 data")]
fn test_invalid_utf8_array() {
let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
let rows = converter.convert_columns(&[array]).unwrap();
let binary_rows = rows.try_into_binary().expect("known-small rows");

let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
let parsed = converter.from_binary(binary_rows);

converter.convert_rows(parsed.iter()).unwrap();
}

#[test]
#[should_panic(expected = "index out of bounds")]
fn test_invalid_empty() {
let binary_row: &[u8] = &[];

let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
let parser = converter.parser();
let utf8_row = parser.parse(binary_row.as_ref());

converter.convert_rows(std::iter::once(utf8_row)).unwrap();
}

#[test]
#[should_panic(expected = "index out of bounds")]
fn test_invalid_empty_array() {
let row: &[u8] = &[];
let binary_rows = BinaryArray::from(vec![row]);

let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
let parsed = converter.from_binary(binary_rows);

converter.convert_rows(parsed.iter()).unwrap();
}

#[test]
#[should_panic(expected = "index out of bounds")]
fn test_invalid_truncated() {
let binary_row: &[u8] = &[0x02];

let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
let parser = converter.parser();
let utf8_row = parser.parse(binary_row.as_ref());

converter.convert_rows(std::iter::once(utf8_row)).unwrap();
}

#[test]
#[should_panic(expected = "index out of bounds")]
fn test_invalid_truncated_array() {
let row: &[u8] = &[0x02];
let binary_rows = BinaryArray::from(vec![row]);

let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
let parsed = converter.from_binary(binary_rows);

converter.convert_rows(parsed.iter()).unwrap();
}

#[test]
#[should_panic(expected = "rows were not produced by this RowConverter")]
fn test_different_converter() {
@@ -2284,7 +2432,7 @@ mod tests {

let comparator = LexicographicalComparator::try_new(&sort_columns).unwrap();

let columns = options
let columns: Vec<SortField> = options
.into_iter()
.zip(&arrays)
.map(|(o, a)| SortField::new_with_options(a.data_type().clone(), o))
Expand Down Expand Up @@ -2317,6 +2465,24 @@ mod tests {
actual.to_data().validate_full().unwrap();
dictionary_eq(actual, expected)
}

// Check that we can round-trip rows through the binary representation
let rows = rows.try_into_binary().expect("reasonable size");
let parser = converter.parser();
let back = converter
.convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes"))))
.unwrap();
for (actual, expected) in back.iter().zip(&arrays) {
actual.to_data().validate_full().unwrap();
dictionary_eq(actual, expected)
}

let rows = converter.from_binary(rows);
let back = converter.convert_rows(&rows).unwrap();
for (actual, expected) in back.iter().zip(&arrays) {
actual.to_data().validate_full().unwrap();
dictionary_eq(actual, expected)
}
Contributor:

Could you also add some negative tests (like creating a BinaryArray from some random bytes, trying to convert that back to an array, and making sure it panics)?

Author:

Sure! I notice there's just one existing test for the parser, for utf8 data; I've matched that and added a couple more tests for interesting cases.

(This seems like a great API surface to fuzz... but it's challenging to write a real fuzzer for, since panics are expected and miri is disabled for our existing fuzzer. May be interesting future work!)

}
}
