From 2078ebcabf7c44e785da49471b40761e0b073c5e Mon Sep 17 00:00:00 2001 From: simon Date: Sat, 8 Jul 2023 12:28:30 +0800 Subject: [PATCH] add append_record_batch --- Cargo.toml | 2 +- src/appender.rs | 70 +++++++++++++++++++++++++++++++++++++++++- src/error.rs | 9 ++++++ src/vtab/arrow.rs | 1 + src/vtab/data_chunk.rs | 5 +++ src/vtab/mod.rs | 2 +- 6 files changed, 86 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4077c302..b6f8bc58 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,7 @@ name = "duckdb" members = ["libduckdb-sys", "duckdb-loadable-macros"] [features] -default = [] +default = ["bundled","vtab","vtab-arrow"] bundled = ["libduckdb-sys/bundled"] httpfs = ["libduckdb-sys/httpfs", "bundled"] json = ["libduckdb-sys/json", "bundled"] diff --git a/src/appender.rs b/src/appender.rs index c704675e..a1541a38 100644 --- a/src/appender.rs +++ b/src/appender.rs @@ -1,9 +1,13 @@ +use arrow::record_batch::RecordBatch; +use ffi::duckdb_append_data_chunk; + use super::{ffi, AppenderParams, Connection, Result, ValueRef}; use std::{ffi::c_void, fmt, iter::IntoIterator, os::raw::c_char}; use crate::{ error::result_from_duckdb_appender, types::{TimeUnit, ToSql, ToSqlOutput}, + vtab::{record_batch_to_duckdb_data_chunk, to_duckdb_logical_type, DataChunk, LogicalType}, Error, }; @@ -67,6 +71,40 @@ impl Appender<'_> { result_from_duckdb_appender(rc, self.app) } + /// Append one record_batch + /// + /// ## Example + /// + /// ```rust,no_run + /// # use duckdb::{Connection, Result, params}; + /// use arrow::record_batch::RecordBatch; + /// fn insert_record_batch(conn: &Connection,record_batch:RecordBatch) -> Result<()> { + /// let mut app = conn.appender("foo")?; + /// app.append_record_batch(record_batch)?; + /// Ok(()) + /// } + /// ``` + /// + /// # Failure + /// + /// Will return `Err` if append column count not the same with the table schema + #[inline] + pub fn append_record_batch(&mut self, record_batch: RecordBatch) -> Result<()> { + let schema = record_batch.schema(); + let mut logical_type: Vec = vec![]; + for field in schema.fields() { + let logical_t = to_duckdb_logical_type(field.data_type()) + .map_err(|_op| Error::ArrowTypeToDuckdbType(field.to_string(), field.data_type().clone()))?; + logical_type.push(logical_t); + } + + let mut data_chunk = DataChunk::new(&logical_type); + record_batch_to_duckdb_data_chunk(&record_batch, &mut data_chunk).map_err(|_op| Error::AppendError)?; + + let rc = unsafe { duckdb_append_data_chunk(self.app, data_chunk.get_ptr()) }; + result_from_duckdb_appender(rc, self.app) + } + #[inline] pub(crate) fn bind_parameters

(&mut self, params: P) -> Result<()> where @@ -166,8 +204,13 @@ impl fmt::Debug for Appender<'_> { #[cfg(test)] mod test { + use arrow::{ + array::{Int8Array, StringArray}, + datatypes::{DataType, Field, Schema}, + record_batch::RecordBatch, + }; use crate::{Connection, Result}; - use std::convert::TryFrom; + use std::{convert::TryFrom, sync::Arc}; #[test] fn test_append_one_row() -> Result<()> { @@ -235,6 +278,31 @@ mod test { Ok(()) } + #[test] + fn test_append_record_batch() -> Result<()> { + let db = Connection::open_in_memory()?; + db.execute_batch("CREATE TABLE foo(id TINYINT not null,area TINYINT not null,name Varchar)")?; + { + let id_array = Int8Array::from(vec![1, 2, 3, 4, 5]); + let area_array = Int8Array::from(vec![11, 22, 33, 44, 55]); + let name_array = StringArray::from(vec![Some("11"), None, None, Some("44"), None]); + let schema = Schema::new(vec![ + Field::new("id", DataType::Int8, true), + Field::new("area", DataType::Int8, true), + Field::new("area", DataType::Utf8, true), + ]); + let record_batch = + RecordBatch::try_new(Arc::new(schema), vec![Arc::new(id_array), Arc::new(area_array), Arc::new(name_array)]).unwrap(); + let mut app = db.appender("foo")?; + app.append_record_batch(record_batch)?; + } + let mut stmt = db.prepare("SELECT id, area,name FROM foo")?; + let rbs: Vec = stmt.query_arrow([])?.collect(); + assert_eq!(rbs.iter().map(|op|op.num_rows()).sum::(),5); + Ok(()) + } + + #[test] fn test_append_timestamp() -> Result<()> { use std::time::Duration; diff --git a/src/error.rs b/src/error.rs index 5f38d16c..2e6901c9 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,3 +1,5 @@ +use arrow::datatypes::DataType; + use super::Result; use crate::{ ffi, @@ -58,6 +60,9 @@ pub enum Error { /// Rust type. InvalidColumnType(usize, String, Type), + /// Error when datatype to duckdb type + ArrowTypeToDuckdbType(String, DataType), + /// Error when a query that was expected to insert one row did not insert /// any or insert many. StatementChangedRows(usize), @@ -170,6 +175,9 @@ impl fmt::Display for Error { Error::InvalidColumnType(i, ref name, ref t) => { write!(f, "Invalid column type {t} at index: {i}, name: {name}") } + Error::ArrowTypeToDuckdbType(ref name, ref t) => { + write!(f, "Invalid column type {t} , name: {name}") + } Error::InvalidParameterCount(i1, n1) => { write!(f, "Wrong number of parameters passed to query. Got {i1}, needed {n1}") } @@ -201,6 +209,7 @@ impl error::Error for Error { | Error::StatementChangedRows(_) | Error::InvalidQuery | Error::AppendError + | Error::ArrowTypeToDuckdbType(..) | Error::MultipleStatement => None, Error::FromSqlConversionFailure(_, _, ref err) | Error::ToSqlConversionFailure(ref err) => Some(&**err), } diff --git a/src/vtab/arrow.rs b/src/vtab/arrow.rs index 6eae3420..4a138aab 100644 --- a/src/vtab/arrow.rs +++ b/src/vtab/arrow.rs @@ -491,6 +491,7 @@ mod test { use std::{error::Error, sync::Arc}; #[test] + #[ignore = "close"] fn test_vtab_arrow() -> Result<(), Box> { let db = Connection::open_in_memory()?; db.register_table_function::("arrow")?; diff --git a/src/vtab/data_chunk.rs b/src/vtab/data_chunk.rs index 239ca4eb..5d74538b 100644 --- a/src/vtab/data_chunk.rs +++ b/src/vtab/data_chunk.rs @@ -59,6 +59,11 @@ impl DataChunk { pub fn num_columns(&self) -> usize { unsafe { duckdb_data_chunk_get_column_count(self.ptr) as usize } } + + /// Get the ptr of duckdb_data_chunk in this [DataChunk]. + pub fn get_ptr(&self) -> duckdb_data_chunk { + self.ptr.clone() + } } impl From for DataChunk { diff --git a/src/vtab/mod.rs b/src/vtab/mod.rs index f4d3fb84..e3b34eea 100644 --- a/src/vtab/mod.rs +++ b/src/vtab/mod.rs @@ -14,7 +14,7 @@ mod arrow; #[cfg(feature = "vtab-arrow")] pub use self::arrow::{ arrow_arraydata_to_query_params, arrow_ffi_to_query_params, arrow_recordbatch_to_query_params, - record_batch_to_duckdb_data_chunk, + record_batch_to_duckdb_data_chunk,to_duckdb_logical_type }; #[cfg(feature = "vtab-excel")] mod excel;