-
Notifications
You must be signed in to change notification settings - Fork 745
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Binary / row helpers #6096
base: master
Are you sure you want to change the base?
Binary / row helpers #6096
Changes from all commits
ad3d611
24a622a
980ec14
6c2a980
3a646e5
c3b6b36
a941b94
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -132,7 +132,7 @@ use std::sync::Arc; | |
use arrow_array::cast::*; | ||
use arrow_array::types::ArrowDictionaryKeyType; | ||
use arrow_array::*; | ||
use arrow_buffer::ArrowNativeType; | ||
use arrow_buffer::{ArrowNativeType, Buffer, OffsetBuffer, ScalarBuffer}; | ||
use arrow_data::ArrayDataBuilder; | ||
use arrow_schema::*; | ||
use variable::{decode_binary_view, decode_string_view}; | ||
|
@@ -738,6 +738,42 @@ impl RowConverter { | |
} | ||
} | ||
|
||
/// Create a new [Rows] instance from the given binary data. | ||
/// | ||
/// ``` | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think it may also be worth adding a doc comment here about when this API will panic (when the data passed in was invalid or empty) |
||
/// # use std::sync::Arc; | ||
/// # use std::collections::HashSet; | ||
/// # use arrow_array::cast::AsArray; | ||
/// # use arrow_array::StringArray; | ||
/// # use arrow_row::{OwnedRow, Row, RowConverter, RowParser, SortField}; | ||
/// # use arrow_schema::DataType; | ||
/// # | ||
/// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap(); | ||
/// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]); | ||
/// let rows = converter.convert_columns(&[Arc::new(array)]).unwrap(); | ||
/// | ||
/// // We can convert rows into binary format and back in batch. | ||
/// let values: Vec<OwnedRow> = rows.iter().map(|r| r.owned()).collect(); | ||
/// let binary = rows.try_into_binary().expect("small"); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I got a little confused by |
||
/// let converted = converter.from_binary(binary.clone()); | ||
/// assert!(converted.iter().eq(values.iter().map(|r| r.row()))); | ||
/// ``` | ||
pub fn from_binary(&self, array: BinaryArray) -> Rows { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I wonder if this function needs to be marked However, I see that there is already a way to convert between There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yeah, that's my understanding! This would definitely be unsafe without the |
||
assert_eq!( | ||
array.null_count(), | ||
0, | ||
"can't construct Rows instance from array with nulls" | ||
); | ||
Rows { | ||
buffer: array.values().to_vec(), | ||
offsets: array.offsets().iter().map(|&i| i.as_usize()).collect(), | ||
config: RowConfig { | ||
fields: Arc::clone(&self.fields), | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. More for my curiosity than anything but why |
||
validate_utf8: true, | ||
}, | ||
} | ||
} | ||
|
||
/// Convert raw bytes into [`ArrayRef`] | ||
/// | ||
/// # Safety | ||
|
@@ -878,6 +914,50 @@ impl Rows { | |
+ self.buffer.len() | ||
+ self.offsets.len() * std::mem::size_of::<usize>() | ||
} | ||
|
||
/// Create a [BinaryArray] from the [Rows] data without reallocating the | ||
/// underlying bytes. | ||
/// | ||
/// | ||
/// ``` | ||
/// # use std::sync::Arc; | ||
/// # use std::collections::HashSet; | ||
/// # use arrow_array::cast::AsArray; | ||
/// # use arrow_array::StringArray; | ||
/// # use arrow_row::{OwnedRow, Row, RowConverter, RowParser, SortField}; | ||
/// # use arrow_schema::DataType; | ||
/// # | ||
/// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap(); | ||
/// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]); | ||
/// let rows = converter.convert_columns(&[Arc::new(array)]).unwrap(); | ||
/// | ||
/// // We can convert rows into binary format and back. | ||
/// let values: Vec<OwnedRow> = rows.iter().map(|r| r.owned()).collect(); | ||
/// let binary = rows.try_into_binary().expect("small"); | ||
/// let parser = converter.parser(); | ||
/// let parsed: Vec<OwnedRow> = | ||
/// binary.iter().flatten().map(|b| parser.parse(b).owned()).collect(); | ||
/// assert_eq!(values, parsed); | ||
/// ``` | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Maybe it is worth commenting here when this will return an error (aka when the data is more than 2GB)? |
||
pub fn try_into_binary(self) -> Result<BinaryArray, ArrowError> { | ||
if self.buffer.len() > i32::MAX as usize { | ||
return Err(ArrowError::InvalidArgumentError(format!( | ||
"{}-byte rows buffer too long to convert into a i32-indexed BinaryArray", | ||
self.buffer.len() | ||
))); | ||
} | ||
// We've checked that the buffer length fits in an i32; so all offsets into that buffer should fit as well. | ||
let offsets_scalar = ScalarBuffer::from_iter(self.offsets.into_iter().map(i32::usize_as)); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think this needs to check that the offsets don't overflow an i32 - this should be a There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. My belief is that this is guaranteed by the assert above (which asserts that the len is not larger than I'll go ahead and turn that assert into a There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I agree with @bkirwi 's logic here. If we assume that |
||
// SAFETY: offsets buffer is nonempty, monotonically increasing, and all represent valid indexes into buffer. | ||
let array = unsafe { | ||
BinaryArray::new_unchecked( | ||
OffsetBuffer::new_unchecked(offsets_scalar), | ||
Buffer::from_vec(self.buffer), | ||
None, | ||
) | ||
}; | ||
Ok(array) | ||
} | ||
} | ||
|
||
impl<'a> IntoIterator for &'a Rows { | ||
|
@@ -961,6 +1041,11 @@ impl<'a> Row<'a> { | |
config: self.config.clone(), | ||
} | ||
} | ||
|
||
/// The row's bytes, with the lifetime of the underlying data. | ||
pub fn data(&self) -> &'a [u8] { | ||
self.data | ||
} | ||
} | ||
|
||
// Manually derive these as don't wish to include `fields` | ||
|
@@ -1838,6 +1923,69 @@ mod tests { | |
converter.convert_rows(std::iter::once(utf8_row)).unwrap(); | ||
} | ||
|
||
#[test] | ||
#[should_panic(expected = "Encountered non UTF-8 data")] | ||
fn test_invalid_utf8_array() { | ||
let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap(); | ||
let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _; | ||
let rows = converter.convert_columns(&[array]).unwrap(); | ||
let binary_rows = rows.try_into_binary().expect("known-small rows"); | ||
|
||
let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap(); | ||
let parsed = converter.from_binary(binary_rows); | ||
|
||
converter.convert_rows(parsed.iter()).unwrap(); | ||
} | ||
|
||
|
||
#[test] | ||
#[should_panic(expected = "index out of bounds")] | ||
fn test_invalid_empty() { | ||
let binary_row: &[u8] = &[]; | ||
|
||
let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap(); | ||
let parser = converter.parser(); | ||
let utf8_row = parser.parse(binary_row.as_ref()); | ||
|
||
converter.convert_rows(std::iter::once(utf8_row)).unwrap(); | ||
} | ||
|
||
#[test] | ||
#[should_panic(expected = "index out of bounds")] | ||
fn test_invalid_empty_array() { | ||
let row: &[u8] = &[]; | ||
let binary_rows = BinaryArray::from(vec![row]); | ||
|
||
let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap(); | ||
let parsed = converter.from_binary(binary_rows); | ||
|
||
converter.convert_rows(parsed.iter()).unwrap(); | ||
} | ||
|
||
#[test] | ||
#[should_panic(expected = "index out of bounds")] | ||
fn test_invalid_truncated() { | ||
let binary_row: &[u8] = &[0x02]; | ||
|
||
let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap(); | ||
let parser = converter.parser(); | ||
let utf8_row = parser.parse(binary_row.as_ref()); | ||
|
||
converter.convert_rows(std::iter::once(utf8_row)).unwrap(); | ||
} | ||
|
||
#[test] | ||
#[should_panic(expected = "index out of bounds")] | ||
fn test_invalid_truncated_array() { | ||
let row: &[u8] = &[0x02]; | ||
let binary_rows = BinaryArray::from(vec![row]); | ||
|
||
let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap(); | ||
let parsed = converter.from_binary(binary_rows); | ||
|
||
converter.convert_rows(parsed.iter()).unwrap(); | ||
} | ||
|
||
#[test] | ||
#[should_panic(expected = "rows were not produced by this RowConverter")] | ||
fn test_different_converter() { | ||
|
@@ -2284,7 +2432,7 @@ mod tests { | |
|
||
let comparator = LexicographicalComparator::try_new(&sort_columns).unwrap(); | ||
|
||
let columns = options | ||
let columns: Vec<SortField> = options | ||
.into_iter() | ||
.zip(&arrays) | ||
.map(|(o, a)| SortField::new_with_options(a.data_type().clone(), o)) | ||
|
@@ -2317,6 +2465,24 @@ mod tests { | |
actual.to_data().validate_full().unwrap(); | ||
dictionary_eq(actual, expected) | ||
} | ||
|
||
// Check that we can convert | ||
let rows = rows.try_into_binary().expect("reasonable size"); | ||
let parser = converter.parser(); | ||
let back = converter | ||
.convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes")))) | ||
.unwrap(); | ||
for (actual, expected) in back.iter().zip(&arrays) { | ||
actual.to_data().validate_full().unwrap(); | ||
dictionary_eq(actual, expected) | ||
} | ||
|
||
let rows = converter.from_binary(rows); | ||
let back = converter.convert_rows(&rows).unwrap(); | ||
for (actual, expected) in back.iter().zip(&arrays) { | ||
actual.to_data().validate_full().unwrap(); | ||
dictionary_eq(actual, expected) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Could you also add some negative tests (like create a BinaryArray from some random bytes and try to convert that back to an array and make sure it panics)? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Sure! I notice there's just one existing test for the parser, for utf8 data; I've matched that and added a couple more tests for interesting cases. (This seems like great API surface to fuzz... but it's challenging to write a real fuzzer for, since panics are expected and miri is disabled for our existing fuzzer. May be interesting future work!) |
||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please add a doc example showing how to do this?
I think trying to give a basic example of converting rows, then to/from binary, will not only serve as good documentation it will make sure all the required APIs are pub (for example, I think
RowParser
needs to be pub...) There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, done.