From 2e638f11bc0940219c22c0f5ebf4b9565d9a1b97 Mon Sep 17 00:00:00 2001 From: Swoorup Joshi Date: Thu, 18 Apr 2024 03:08:10 +1000 Subject: [PATCH] Added struct type support in arrow feature (#279) --- src/vtab/arrow.rs | 161 +++++++++++++++++++++++++++------------------- 1 file changed, 95 insertions(+), 66 deletions(-) diff --git a/src/vtab/arrow.rs b/src/vtab/arrow.rs index e0c7a137..9e6e85be 100644 --- a/src/vtab/arrow.rs +++ b/src/vtab/arrow.rs @@ -1,13 +1,13 @@ use super::{ vector::{FlatVector, ListVector, Vector}, - BindInfo, DataChunk, Free, FunctionInfo, InitInfo, LogicalType, LogicalTypeId, VTab, + BindInfo, DataChunk, Free, FunctionInfo, InitInfo, LogicalType, LogicalTypeId, StructVector, VTab, }; use crate::vtab::vector::Inserter; use arrow::array::{ - as_boolean_array, as_large_list_array, as_list_array, as_primitive_array, as_string_array, Array, ArrayData, - AsArray, BooleanArray, Decimal128Array, FixedSizeListArray, GenericListArray, OffsetSizeTrait, PrimitiveArray, - StringArray, StructArray, + as_boolean_array, as_large_list_array, as_list_array, as_primitive_array, as_string_array, as_struct_array, Array, + ArrayData, AsArray, BooleanArray, Decimal128Array, FixedSizeListArray, GenericListArray, OffsetSizeTrait, + PrimitiveArray, StringArray, StructArray, }; use arrow::{ @@ -181,15 +181,12 @@ pub fn to_duckdb_logical_type(data_type: &DataType) -> Result Result { fixed_size_list_array_to_vector(as_fixed_size_list_array(col.as_ref()), &mut chunk.list_vector(i)); } - // DataType::Struct(_) => { - // let struct_array = as_struct_array(col.as_ref()); - // let mut struct_vector = chunk.struct_vector(i); - // struct_array_to_vector(struct_array, &mut struct_vector); - // } + DataType::Struct(_) => { + let struct_array = as_struct_array(col.as_ref()); + let mut struct_vector = chunk.struct_vector(i); + struct_array_to_vector(struct_array, &mut struct_vector); + } _ => { - println!( + unimplemented!( "column {} is not supported yet, please file an issue https://github.com/wangfenjin/duckdb-rs", batch.schema().field(i) ); - todo!() } } } @@ -458,46 +455,42 @@ fn as_fixed_size_list_array(arr: &dyn Array) -> &FixedSizeListArray { arr.as_any().downcast_ref::().unwrap() } -// fn struct_array_to_vector(array: &StructArray, out: &mut StructVector) { -// for i in 0..array.num_columns() { -// let column = array.column(i); -// match column.data_type() { -// dt if dt.is_primitive() || matches!(dt, DataType::Boolean) => { -// primitive_array_to_vector(column, &mut out.child(i)); -// } -// DataType::Utf8 => { -// string_array_to_vector(as_string_array(column.as_ref()), &mut out.child(i)); -// } -// DataType::List(_) => { -// list_array_to_vector( -// as_list_array(column.as_ref()), -// &mut out.list_vector_child(i), -// ); -// } -// DataType::LargeList(_) => { -// list_array_to_vector( -// as_large_list_array(column.as_ref()), -// &mut out.list_vector_child(i), -// ); -// } -// DataType::FixedSizeList(_, _) => { -// fixed_size_list_array_to_vector( -// as_fixed_size_list_array(column.as_ref()), -// &mut out.list_vector_child(i), -// ); -// } -// DataType::Struct(_) => { -// let struct_array = as_struct_array(column.as_ref()); -// let mut struct_vector = out.struct_vector_child(i); -// struct_array_to_vector(struct_array, &mut struct_vector); -// } -// _ => { -// println!("Unsupported data type: {}, please file an issue https://github.com/wangfenjin/duckdb-rs", column.data_type()); -// todo!() -// } -// } -// } -// } +fn struct_array_to_vector(array: &StructArray, out: &mut StructVector) { + for i in 0..array.num_columns() { + let column = array.column(i); + match column.data_type() { + dt if dt.is_primitive() || matches!(dt, DataType::Boolean) => { + primitive_array_to_vector(column, &mut out.child(i)); + } + DataType::Utf8 => { + string_array_to_vector(as_string_array(column.as_ref()), &mut out.child(i)); + } + DataType::List(_) => { + list_array_to_vector(as_list_array(column.as_ref()), &mut out.list_vector_child(i)); + } + DataType::LargeList(_) => { + list_array_to_vector(as_large_list_array(column.as_ref()), &mut out.list_vector_child(i)); + } + DataType::FixedSizeList(_, _) => { + fixed_size_list_array_to_vector( + as_fixed_size_list_array(column.as_ref()), + &mut out.list_vector_child(i), + ); + } + DataType::Struct(_) => { + let struct_array = as_struct_array(column.as_ref()); + let mut struct_vector = out.struct_vector_child(i); + struct_array_to_vector(struct_array, &mut struct_vector); + } + _ => { + unimplemented!( + "Unsupported data type: {}, please file an issue https://github.com/wangfenjin/duckdb-rs", + column.data_type() + ); + } + } + } +} /// Pass RecordBatch to duckdb. /// @@ -538,11 +531,11 @@ mod test { use crate::{Connection, Result}; use arrow::{ array::{ - Array, AsArray, Date32Array, Date64Array, Float64Array, Int32Array, PrimitiveArray, StringArray, - Time32SecondArray, Time64MicrosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray, - TimestampNanosecondArray, TimestampSecondArray, + Array, ArrayRef, AsArray, Date32Array, Date64Array, Float64Array, Int32Array, PrimitiveArray, StringArray, + StructArray, Time32SecondArray, Time64MicrosecondArray, TimestampMicrosecondArray, + TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, }, - datatypes::{ArrowPrimitiveType, DataType, Field, Schema}, + datatypes::{ArrowPrimitiveType, DataType, Field, Fields, Schema}, record_batch::RecordBatch, }; use std::{error::Error, sync::Arc}; @@ -588,6 +581,42 @@ mod test { Ok(()) } + #[test] + fn test_append_struct() -> Result<(), Box> { + let db = Connection::open_in_memory()?; + db.execute_batch("CREATE TABLE t1 (s STRUCT(v VARCHAR, i INTEGER))")?; + { + let struct_array = StructArray::from(vec![ + ( + Arc::new(Field::new("v", DataType::Utf8, true)), + Arc::new(StringArray::from(vec![Some("foo"), Some("bar")])) as ArrayRef, + ), + ( + Arc::new(Field::new("i", DataType::Int32, true)), + Arc::new(Int32Array::from(vec![Some(1), Some(2)])) as ArrayRef, + ), + ]); + + let schema = Schema::new(vec![Field::new( + "s", + DataType::Struct(Fields::from(vec![ + Field::new("v", DataType::Utf8, true), + Field::new("i", DataType::Int32, true), + ])), + true, + )]); + + let record_batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(struct_array)])?; + let mut app = db.appender("t1")?; + app.append_record_batch(record_batch)?; + } + let mut stmt = db.prepare("SELECT s FROM t1")?; + let rbs: Vec = stmt.query_arrow([])?.collect(); + assert_eq!(rbs.iter().map(|op| op.num_rows()).sum::(), 2); + + Ok(()) + } + fn check_rust_primitive_array_roundtrip( input_array: PrimitiveArray, expected_array: PrimitiveArray,