Skip to content

Commit

Permalink
refactor code
Browse files Browse the repository at this point in the history
  • Loading branch information
wangfenjin committed Jul 19, 2023
1 parent d31d78b commit a470462
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 80 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ vtab = []
vtab-loadable = ["vtab", "duckdb-loadable-macros"]
vtab-excel = ["vtab", "calamine"]
vtab-arrow = ["vtab", "num"]
vtab-full = ["vtab-excel", "vtab-arrow"]
appender-arrow = ["vtab-arrow"]
vtab-full = ["vtab-excel", "vtab-arrow", "appender-arrow"]
extensions-full = ["httpfs", "json", "parquet", "vtab-full"]
buildtime_bindgen = ["libduckdb-sys/buildtime_bindgen"]
modern-full = ["chrono", "serde_json", "url", "r2d2", "uuid", "polars"]
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ See to [Contributing.md](CONTRIBUTING.md)

### Checklist

- Run `cargo fmt` to ensure your Rust code is correctly formatted.
- Run `cargo +nightly fmt` to ensure your Rust code is correctly formatted.
- Run `cargo clippy --fix --allow-dirty --all-targets --workspace --all-features -- -D warnings` to fix all clippy issues.
- Ensure `cargo test --all-targets --workspace --features "modern-full extensions-full"` reports no failures.

Expand Down
82 changes: 82 additions & 0 deletions src/appender/arrow.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use super::{ffi, Appender, Result};
use crate::{
error::result_from_duckdb_appender,
vtab::{record_batch_to_duckdb_data_chunk, to_duckdb_logical_type, DataChunk, LogicalType},
Error,
};
use arrow::record_batch::RecordBatch;
use ffi::duckdb_append_data_chunk;

impl Appender<'_> {
/// Append one record_batch
///
/// ## Example
///
/// ```rust,no_run
/// # use duckdb::{Connection, Result, params};
/// use arrow::record_batch::RecordBatch;
/// fn insert_record_batch(conn: &Connection,record_batch:RecordBatch) -> Result<()> {
/// let mut app = conn.appender("foo")?;
/// app.append_record_batch(record_batch)?;
/// Ok(())
/// }
/// ```
///
/// # Failure
///
/// Will return `Err` if append column count not the same with the table schema
#[inline]
pub fn append_record_batch(&mut self, record_batch: RecordBatch) -> Result<()> {
let schema = record_batch.schema();
let mut logical_type: Vec<LogicalType> = vec![];
for field in schema.fields() {
let logical_t = to_duckdb_logical_type(field.data_type())
.map_err(|_op| Error::ArrowTypeToDuckdbType(field.to_string(), field.data_type().clone()))?;
logical_type.push(logical_t);
}

let mut data_chunk = DataChunk::new(&logical_type);
record_batch_to_duckdb_data_chunk(&record_batch, &mut data_chunk).map_err(|_op| Error::AppendError)?;

let rc = unsafe { duckdb_append_data_chunk(self.app, data_chunk.get_ptr()) };
result_from_duckdb_appender(rc, self.app)
}
}

#[cfg(test)]
mod test {
    use crate::{Connection, Result};
    use arrow::{
        array::{Int8Array, StringArray},
        datatypes::{DataType, Field, Schema},
        record_batch::RecordBatch,
    };
    use std::sync::Arc;

    /// Appends a five-row record batch (two `Int8` columns and one nullable
    /// `Utf8` column) to an in-memory table and checks that all five rows can
    /// be read back.
    #[test]
    fn test_append_record_batch() -> Result<()> {
        let db = Connection::open_in_memory()?;
        db.execute_batch("CREATE TABLE foo(id TINYINT not null,area TINYINT not null,name Varchar)")?;
        {
            let id_array = Int8Array::from(vec![1, 2, 3, 4, 5]);
            let area_array = Int8Array::from(vec![11, 22, 33, 44, 55]);
            let name_array = StringArray::from(vec![Some("11"), None, None, Some("44"), None]);
            // Field names mirror the table's columns. The third field was
            // previously a second "area", which left the schema with
            // duplicate names and did not describe `name_array`.
            let schema = Schema::new(vec![
                Field::new("id", DataType::Int8, true),
                Field::new("area", DataType::Int8, true),
                Field::new("name", DataType::Utf8, true),
            ]);
            let record_batch = RecordBatch::try_new(
                Arc::new(schema),
                vec![Arc::new(id_array), Arc::new(area_array), Arc::new(name_array)],
            )
            .unwrap();
            // The appender lives only inside this block, so it is dropped
            // before the table is queried below.
            let mut app = db.appender("foo")?;
            app.append_record_batch(record_batch)?;
        }
        let mut stmt = db.prepare("SELECT id, area,name FROM foo")?;
        let rbs: Vec<RecordBatch> = stmt.query_arrow([])?.collect();
        // The result may arrive as several batches; count rows across all of them.
        assert_eq!(rbs.iter().map(|op| op.num_rows()).sum::<usize>(), 5);
        Ok(())
    }
}
80 changes: 3 additions & 77 deletions src/appender.rs → src/appender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,15 @@ use crate::{
Error,
};

#[cfg(feature = "vtab-arrow,vtab")]
use arrow::record_batch::RecordBatch;
#[cfg(feature = "vtab-arrow,vtab")]
use ffi::duckdb_append_data_chunk;

#[cfg(feature = "vtab-arrow,vtab")]
use vtab::{record_batch_to_duckdb_data_chunk, to_duckdb_logical_type, DataChunk, LogicalType};

/// Appender for fast data import.
///
/// Borrows the [`Connection`] it was created from for the lifetime `'conn`,
/// so the appender cannot outlive that borrow.
pub struct Appender<'conn> {
// Connection this appender is tied to.
conn: &'conn Connection,
// Raw DuckDB appender handle passed to the `ffi` appender functions.
app: ffi::duckdb_appender,
}

#[cfg(feature = "appender-arrow")]
mod arrow;

impl Appender<'_> {
/// Append multiple rows from Iterator
///
Expand Down Expand Up @@ -75,41 +70,6 @@ impl Appender<'_> {
result_from_duckdb_appender(rc, self.app)
}

/// Appends one Arrow `RecordBatch` through this appender.
///
/// The batch's schema is mapped field by field to DuckDB logical types, the
/// batch is converted into a `DataChunk` with that layout, and the chunk is
/// handed to `duckdb_append_data_chunk`.
///
/// ## Example
///
/// ```rust,no_run
/// # use duckdb::{Connection, Result, params};
/// use arrow::record_batch::RecordBatch;
/// fn insert_record_batch(conn: &Connection,record_batch:RecordBatch) -> Result<()> {
/// let mut app = conn.appender("foo")?;
/// app.append_record_batch(record_batch)?;
/// Ok(())
/// }
/// ```
///
/// # Errors
///
/// Returns `Error::ArrowTypeToDuckdbType` if a field's Arrow type cannot be
/// mapped to a DuckDB logical type, `Error::AppendError` if the batch cannot
/// be converted into a data chunk, and an error from
/// `result_from_duckdb_appender` if DuckDB rejects the chunk, e.g. when the
/// appended column count differs from the table schema.
// NOTE(review): `feature = "vtab-arrow,vtab"` names a single feature whose name
// contains a comma. Cargo feature names cannot contain commas, so this method
// is never compiled. Requiring both features would be spelled
// `all(feature = "vtab-arrow", feature = "vtab")`.
#[inline]
#[cfg(feature = "vtab-arrow,vtab")]
pub fn append_record_batch(&mut self, record_batch: RecordBatch) -> Result<()> {
let schema = record_batch.schema();
// One DuckDB logical type per Arrow field; the first failure aborts the append.
let mut logical_type: Vec<LogicalType> = vec![];
for field in schema.fields() {
let logical_t = to_duckdb_logical_type(field.data_type())
.map_err(|_op| Error::ArrowTypeToDuckdbType(field.to_string(), field.data_type().clone()))?;
logical_type.push(logical_t);
}

// The underlying conversion error is discarded and reported as `AppendError`.
let mut data_chunk = DataChunk::new(&logical_type);
record_batch_to_duckdb_data_chunk(&record_batch, &mut data_chunk).map_err(|_op| Error::AppendError)?;

// `data_chunk` is still alive here, so the pointer passed to DuckDB is valid for the call.
let rc = unsafe { duckdb_append_data_chunk(self.app, data_chunk.get_ptr()) };
result_from_duckdb_appender(rc, self.app)
}

#[inline]
pub(crate) fn bind_parameters<P>(&mut self, params: P) -> Result<()>
where
Expand Down Expand Up @@ -278,40 +238,6 @@ mod test {
Ok(())
}

// Appends a five-row record batch to an in-memory table and checks that five
// rows can be read back.
// NOTE(review): `feature = "vtab-arrow,vtab"` is one feature name containing a
// comma, which Cargo cannot define, so this test is never compiled or run.
#[test]
#[cfg(feature = "vtab-arrow,vtab")]
fn test_append_record_batch() -> Result<()> {
use arrow::{
array::{Int8Array, StringArray},
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
};
use std::sync::Arc;
let db = Connection::open_in_memory()?;
db.execute_batch("CREATE TABLE foo(id TINYINT not null,area TINYINT not null,name Varchar)")?;
{
let id_array = Int8Array::from(vec![1, 2, 3, 4, 5]);
let area_array = Int8Array::from(vec![11, 22, 33, 44, 55]);
let name_array = StringArray::from(vec![Some("11"), None, None, Some("44"), None]);
// NOTE(review): the third field repeats the name "area" although it carries
// `name_array` and targets the `name` column; this looks like a copy-paste slip.
let schema = Schema::new(vec![
Field::new("id", DataType::Int8, true),
Field::new("area", DataType::Int8, true),
Field::new("area", DataType::Utf8, true),
]);
let record_batch = RecordBatch::try_new(
Arc::new(schema),
vec![Arc::new(id_array), Arc::new(area_array), Arc::new(name_array)],
)
.unwrap();
// The appender is scoped to this block and dropped before the query below.
let mut app = db.appender("foo")?;
app.append_record_batch(record_batch)?;
}
let mut stmt = db.prepare("SELECT id, area,name FROM foo")?;
let rbs: Vec<RecordBatch> = stmt.query_arrow([])?.collect();
// Rows may come back split over several batches, so sum across all of them.
assert_eq!(rbs.iter().map(|op| op.num_rows()).sum::<usize>(), 5);
Ok(())
}

#[test]
fn test_append_timestamp() -> Result<()> {
use std::time::Duration;
Expand Down
3 changes: 2 additions & 1 deletion src/vtab/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ impl VTab for ArrowVTab {
}
}

/// Convert arrow DataType to duckdb type id
pub fn to_duckdb_type_id(data_type: &DataType) -> Result<LogicalTypeId, Box<dyn std::error::Error>> {
use LogicalTypeId::*;

Expand Down Expand Up @@ -160,6 +161,7 @@ pub fn to_duckdb_type_id(data_type: &DataType) -> Result<LogicalTypeId, Box<dyn
Ok(type_id)
}

/// Convert arrow DataType to duckdb logical type
pub fn to_duckdb_logical_type(data_type: &DataType) -> Result<LogicalType, Box<dyn std::error::Error>> {
if data_type.is_primitive()
|| matches!(
Expand Down Expand Up @@ -490,7 +492,6 @@ mod test {
use std::{error::Error, sync::Arc};

#[test]
#[ignore = "close"]
fn test_vtab_arrow() -> Result<(), Box<dyn Error>> {
let db = Connection::open_in_memory()?;
db.register_table_function::<ArrowVTab>("arrow")?;
Expand Down

0 comments on commit a470462

Please sign in to comment.