diff --git a/vortex-array/src/array/chunked/mod.rs b/vortex-array/src/array/chunked/mod.rs index 4f4c81e0c..7e4cd26bb 100644 --- a/vortex-array/src/array/chunked/mod.rs +++ b/vortex-array/src/array/chunked/mod.rs @@ -10,7 +10,7 @@ use vortex_error::{vortex_bail, vortex_panic, VortexExpect as _, VortexResult}; use vortex_scalar::Scalar; use crate::array::primitive::PrimitiveArray; -use crate::compute::unary::{scalar_at, subtract_scalar, SubtractScalarFn}; +use crate::compute::unary::{scalar_at, scalar_at_unchecked, subtract_scalar, SubtractScalarFn}; use crate::compute::{search_sorted, SearchSortedSide}; use crate::encoding::ids; use crate::iter::{ArrayIterator, ArrayIteratorAdapter}; @@ -72,8 +72,12 @@ impl ChunkedArray { #[inline] pub fn chunk(&self, idx: usize) -> VortexResult { - let chunk_start = usize::try_from(&scalar_at(&self.chunk_offsets(), idx)?)?; - let chunk_end = usize::try_from(&scalar_at(&self.chunk_offsets(), idx + 1)?)?; + if idx >= self.nchunks() { + vortex_bail!("chunk index {} > num chunks ({})", idx, self.nchunks()); + } + + let chunk_start = usize::try_from(&scalar_at_unchecked(&self.chunk_offsets(), idx))?; + let chunk_end = usize::try_from(&scalar_at_unchecked(&self.chunk_offsets(), idx + 1))?; // Offset the index since chunk_ends is child 0. self.as_ref() diff --git a/vortex-array/src/array/null/mod.rs b/vortex-array/src/array/null/mod.rs index 454e1bdb2..2844b65c9 100644 --- a/vortex-array/src/array/null/mod.rs +++ b/vortex-array/src/array/null/mod.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use serde::{Deserialize, Serialize}; use vortex_dtype::DType; use vortex_error::{VortexExpect as _, VortexResult}; @@ -26,7 +24,7 @@ impl NullArray { DType::Null, len, NullMetadata { len }, - Arc::new([]), + [].into(), StatsSet::nulls(len, &DType::Null), ) .vortex_expect("NullArray::new should never fail!") diff --git a/vortex-array/src/lib.rs b/vortex-array/src/lib.rs index 927ddbe6c..99fd2e585 100644 --- a/vortex-array/src/lib.rs +++ b/vortex-array/src/lib.rs @@ -120,6 +120,15 @@ impl Array { ArrayChildrenIterator::new(self.clone()) } + /// Count the number of cumulative buffers encoded by self. + pub fn cumulative_nbuffers(&self) -> usize { + self.children() + .iter() + .map(|child| child.cumulative_nbuffers()) + .sum::() + + if self.buffer().is_some() { 1 } else { 0 } + } + /// Return the buffer offsets and the total length of all buffers, assuming the given alignment. /// This includes all child buffers. pub fn all_buffer_offsets(&self, alignment: usize) -> Vec { diff --git a/vortex-array/src/view.rs b/vortex-array/src/view.rs index 660eba78c..295c5893e 100644 --- a/vortex-array/src/view.rs +++ b/vortex-array/src/view.rs @@ -6,7 +6,7 @@ use itertools::Itertools; use log::warn; use vortex_buffer::Buffer; use vortex_dtype::{DType, Nullability}; -use vortex_error::{vortex_bail, vortex_err, VortexError, VortexExpect as _, VortexResult}; +use vortex_error::{vortex_err, VortexError, VortexExpect as _, VortexResult}; use vortex_scalar::{PValue, Scalar, ScalarValue}; use crate::encoding::EncodingRef; @@ -22,8 +22,7 @@ pub struct ArrayView { len: usize, flatbuffer: Buffer, flatbuffer_loc: usize, - // TODO(ngates): create an RC'd vector that can be lazily sliced. - buffers: Vec, + buffers: Arc<[Buffer]>, ctx: Arc, // TODO(ngates): a store a Projection. A projected ArrayView contains the full fb::Array // metadata, but only the buffers from the selected columns. Therefore we need to know @@ -60,20 +59,13 @@ impl ArrayView { .lookup_encoding(array.encoding()) .ok_or_else(|| vortex_err!(InvalidSerde: "Encoding ID out of bounds"))?; - if buffers.len() != Self::cumulative_nbuffers(array) { - vortex_bail!(InvalidSerde: - "Incorrect number of buffers {}, expected {}", - buffers.len(), - Self::cumulative_nbuffers(array) - ) - } let view = Self { encoding, dtype, len, flatbuffer, flatbuffer_loc, - buffers, + buffers: buffers.into(), ctx, }; @@ -130,26 +122,13 @@ impl ArrayView { Box::leak(Box::new(OpaqueEncoding(child.encoding()))) }); - // Figure out how many buffers to skip... - // We store them depth-first. - let buffer_offset = self - .flatbuffer() - .children() - .ok_or_else(|| vortex_err!("flatbuffer children not found"))? - .iter() - .take(idx) - .map(|child| Self::cumulative_nbuffers(child)) - .sum::() - + self.has_buffer() as usize; - let buffer_count = Self::cumulative_nbuffers(child); - Ok(Self { encoding, dtype: dtype.clone(), len, flatbuffer: self.flatbuffer.clone(), flatbuffer_loc, - buffers: self.buffers[buffer_offset..][0..buffer_count].to_vec(), + buffers: self.buffers.clone(), ctx: self.ctx.clone(), }) } @@ -173,20 +152,13 @@ impl ArrayView { /// Whether the current Array makes use of a buffer pub fn has_buffer(&self) -> bool { - self.flatbuffer().has_buffer() - } - - /// The number of buffers used by the current Array and all its children. - fn cumulative_nbuffers(array: fb::Array) -> usize { - let mut nbuffers = if array.has_buffer() { 1 } else { 0 }; - for child in array.children().unwrap_or_default() { - nbuffers += Self::cumulative_nbuffers(child) - } - nbuffers + self.flatbuffer().buffer_index().is_some() } pub fn buffer(&self) -> Option<&Buffer> { - self.has_buffer().then(|| &self.buffers[0]) + self.flatbuffer() + .buffer_index() + .map(|idx| &self.buffers[idx as usize]) } pub fn statistics(&self) -> &dyn Statistics { diff --git a/vortex-flatbuffers/flatbuffers/vortex-array/array.fbs b/vortex-flatbuffers/flatbuffers/vortex-array/array.fbs index 8c7dcc174..fc32282f2 100644 --- a/vortex-flatbuffers/flatbuffers/vortex-array/array.fbs +++ b/vortex-flatbuffers/flatbuffers/vortex-array/array.fbs @@ -6,7 +6,7 @@ enum Version: uint8 { table Array { version: Version = V0; - has_buffer: bool; + buffer_index: uint64 = null; encoding: uint16; metadata: [ubyte]; stats: ArrayStats; diff --git a/vortex-flatbuffers/src/generated/array.rs b/vortex-flatbuffers/src/generated/array.rs index 8f076eaa9..5eed6a7c8 100644 --- a/vortex-flatbuffers/src/generated/array.rs +++ b/vortex-flatbuffers/src/generated/array.rs @@ -93,7 +93,7 @@ impl<'a> flatbuffers::Verifiable for Version { impl flatbuffers::SimpleToVerifyInSlice for Version {} pub enum ArrayOffset {} -#[derive(Copy, Clone, PartialEq, Eq)] +#[derive(Copy, Clone, PartialEq)] pub struct Array<'a> { pub _tab: flatbuffers::Table<'a>, @@ -109,7 +109,7 @@ impl<'a> flatbuffers::Follow<'a> for Array<'a> { impl<'a> Array<'a> { pub const VT_VERSION: flatbuffers::VOffsetT = 4; - pub const VT_HAS_BUFFER: flatbuffers::VOffsetT = 6; + pub const VT_BUFFER_INDEX: flatbuffers::VOffsetT = 6; pub const VT_ENCODING: flatbuffers::VOffsetT = 8; pub const VT_METADATA: flatbuffers::VOffsetT = 10; pub const VT_STATS: flatbuffers::VOffsetT = 12; @@ -125,11 +125,11 @@ impl<'a> Array<'a> { args: &'args ArrayArgs<'args> ) -> flatbuffers::WIPOffset> { let mut builder = ArrayBuilder::new(_fbb); + if let Some(x) = args.buffer_index { builder.add_buffer_index(x); } if let Some(x) = args.children { builder.add_children(x); } if let Some(x) = args.stats { builder.add_stats(x); } if let Some(x) = args.metadata { builder.add_metadata(x); } builder.add_encoding(args.encoding); - builder.add_has_buffer(args.has_buffer); builder.add_version(args.version); builder.finish() } @@ -143,11 +143,11 @@ impl<'a> Array<'a> { unsafe { self._tab.get::(Array::VT_VERSION, Some(Version::V0)).unwrap()} } #[inline] - pub fn has_buffer(&self) -> bool { + pub fn buffer_index(&self) -> Option { // Safety: // Created from valid Table for this object // which contains a valid value in this slot - unsafe { self._tab.get::(Array::VT_HAS_BUFFER, Some(false)).unwrap()} + unsafe { self._tab.get::(Array::VT_BUFFER_INDEX, None)} } #[inline] pub fn encoding(&self) -> u16 { @@ -187,7 +187,7 @@ impl flatbuffers::Verifiable for Array<'_> { use self::flatbuffers::Verifiable; v.visit_table(pos)? .visit_field::("version", Self::VT_VERSION, false)? - .visit_field::("has_buffer", Self::VT_HAS_BUFFER, false)? + .visit_field::("buffer_index", Self::VT_BUFFER_INDEX, false)? .visit_field::("encoding", Self::VT_ENCODING, false)? .visit_field::>>("metadata", Self::VT_METADATA, false)? .visit_field::>("stats", Self::VT_STATS, false)? @@ -198,7 +198,7 @@ impl flatbuffers::Verifiable for Array<'_> { } pub struct ArrayArgs<'a> { pub version: Version, - pub has_buffer: bool, + pub buffer_index: Option, pub encoding: u16, pub metadata: Option>>, pub stats: Option>>, @@ -209,7 +209,7 @@ impl<'a> Default for ArrayArgs<'a> { fn default() -> Self { ArrayArgs { version: Version::V0, - has_buffer: false, + buffer_index: None, encoding: 0, metadata: None, stats: None, @@ -228,8 +228,8 @@ impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> ArrayBuilder<'a, 'b, A> { self.fbb_.push_slot::(Array::VT_VERSION, version, Version::V0); } #[inline] - pub fn add_has_buffer(&mut self, has_buffer: bool) { - self.fbb_.push_slot::(Array::VT_HAS_BUFFER, has_buffer, false); + pub fn add_buffer_index(&mut self, buffer_index: u64) { + self.fbb_.push_slot_always::(Array::VT_BUFFER_INDEX, buffer_index); } #[inline] pub fn add_encoding(&mut self, encoding: u16) { @@ -266,7 +266,7 @@ impl core::fmt::Debug for Array<'_> { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { let mut ds = f.debug_struct("Array"); ds.field("version", &self.version()); - ds.field("has_buffer", &self.has_buffer()); + ds.field("buffer_index", &self.buffer_index()); ds.field("encoding", &self.encoding()); ds.field("metadata", &self.metadata()); ds.field("stats", &self.stats()); @@ -275,7 +275,7 @@ impl core::fmt::Debug for Array<'_> { } } pub enum ArrayStatsOffset {} -#[derive(Copy, Clone, PartialEq, Eq)] +#[derive(Copy, Clone, PartialEq)] pub struct ArrayStats<'a> { pub _tab: flatbuffers::Table<'a>, diff --git a/vortex-flatbuffers/src/generated/dtype.rs b/vortex-flatbuffers/src/generated/dtype.rs index 7c2c24b92..b86459ec7 100644 --- a/vortex-flatbuffers/src/generated/dtype.rs +++ b/vortex-flatbuffers/src/generated/dtype.rs @@ -250,7 +250,7 @@ impl flatbuffers::SimpleToVerifyInSlice for Type {} pub struct TypeUnionTableOffset {} pub enum NullOffset {} -#[derive(Copy, Clone, PartialEq, Eq)] +#[derive(Copy, Clone, PartialEq)] pub struct Null<'a> { pub _tab: flatbuffers::Table<'a>, @@ -329,7 +329,7 @@ impl core::fmt::Debug for Null<'_> { } } pub enum BoolOffset {} -#[derive(Copy, Clone, PartialEq, Eq)] +#[derive(Copy, Clone, PartialEq)] pub struct Bool<'a> { pub _tab: flatbuffers::Table<'a>, @@ -540,7 +540,7 @@ impl core::fmt::Debug for Primitive<'_> { } } pub enum DecimalOffset {} -#[derive(Copy, Clone, PartialEq, Eq)] +#[derive(Copy, Clone, PartialEq)] pub struct Decimal<'a> { pub _tab: flatbuffers::Table<'a>, @@ -673,7 +673,7 @@ impl core::fmt::Debug for Decimal<'_> { } } pub enum Utf8Offset {} -#[derive(Copy, Clone, PartialEq, Eq)] +#[derive(Copy, Clone, PartialEq)] pub struct Utf8<'a> { pub _tab: flatbuffers::Table<'a>, @@ -770,7 +770,7 @@ impl core::fmt::Debug for Utf8<'_> { } } pub enum BinaryOffset {} -#[derive(Copy, Clone, PartialEq, Eq)] +#[derive(Copy, Clone, PartialEq)] pub struct Binary<'a> { pub _tab: flatbuffers::Table<'a>, diff --git a/vortex-flatbuffers/src/generated/message.rs b/vortex-flatbuffers/src/generated/message.rs index c70bf5a18..17c43298a 100644 --- a/vortex-flatbuffers/src/generated/message.rs +++ b/vortex-flatbuffers/src/generated/message.rs @@ -4,8 +4,8 @@ // @generated use crate::scalar::*; -use crate::array::*; use crate::dtype::*; +use crate::array::*; use core::mem; use core::cmp::Ordering; diff --git a/vortex-serde/src/layouts/write/writer.rs b/vortex-serde/src/layouts/write/writer.rs index 1b5d2d756..1b36a675e 100644 --- a/vortex-serde/src/layouts/write/writer.rs +++ b/vortex-serde/src/layouts/write/writer.rs @@ -117,7 +117,6 @@ impl LayoutWriter { async fn write_metadata_arrays(&mut self) -> VortexResult { let mut column_layouts = VecDeque::with_capacity(self.column_chunks.len()); - for mut chunk in mem::take(&mut self.column_chunks) { let len = chunk.byte_offsets.len() - 1; let mut chunks: VecDeque = chunk diff --git a/vortex-serde/src/messages.rs b/vortex-serde/src/messages.rs index 1913f0c91..95b619323 100644 --- a/vortex-serde/src/messages.rs +++ b/vortex-serde/src/messages.rs @@ -18,7 +18,7 @@ pub enum IPCMessage<'a> { pub struct IPCSchema<'a>(pub &'a DType); pub struct IPCBatch<'a>(pub &'a Array); -pub struct IPCArray<'a>(pub &'a Array); +pub struct IPCArray<'a>(pub &'a Array, usize); pub struct IPCPage<'a>(pub &'a Buffer); pub struct IPCDType(pub DType); @@ -87,13 +87,14 @@ impl<'a> WriteFlatBuffer for IPCBatch<'a> { fbb: &mut FlatBufferBuilder<'fb>, ) -> WIPOffset> { let array_data = self.0; - let array = Some(IPCArray(array_data).write_flatbuffer(fbb)); + let array = Some(IPCArray(array_data, 0).write_flatbuffer(fbb)); let length = array_data.len() as u64; // Walk the ColumnData depth-first to compute the buffer offsets. let mut buffers = vec![]; let mut offset = 0; + for array_data in array_data.depth_first_traversal() { if let Some(buffer) = array_data.buffer() { let aligned_size = (buffer.len() + (ALIGNMENT - 1)) & !(ALIGNMENT - 1); @@ -146,10 +147,21 @@ impl<'a> WriteFlatBuffer for IPCArray<'a> { ), }; + // Assign buffer indices for all child arrays. + // The second tuple element holds the buffer_index for this Array subtree. If this array + // has a buffer, that is its buffer index. If it does not, that buffer index belongs + // to one of the children. + let child_buffer_offset = self.1 + if self.0.buffer().is_some() { 1 } else { 0 }; + let children = column_data .children() .iter() - .map(|child| IPCArray(child).write_flatbuffer(fbb)) + .scan(child_buffer_offset, |buffer_offset, child| { + // Update the number of buffers required. + let msg = IPCArray(child, *buffer_offset).write_flatbuffer(fbb); + *buffer_offset += child.cumulative_nbuffers(); + Some(msg) + }) .collect_vec(); let children = Some(fbb.create_vector(&children)); @@ -159,7 +171,7 @@ impl<'a> WriteFlatBuffer for IPCArray<'a> { fbb, &fba::ArrayArgs { version: Default::default(), - has_buffer: column_data.buffer().is_some(), + buffer_index: self.0.buffer().is_some().then_some(self.1 as u64), encoding, metadata, stats,