add append_record_batch
web3creator committed Jul 8, 2023
1 parent 9f35d49 commit 2078ebc
Showing 6 changed files with 86 additions and 3 deletions.
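The new API lets an `Appender` ingest an Arrow `RecordBatch` in a single call instead of binding rows one by one. Below is a minimal sketch of the intended usage, assuming the crate is built with the features this commit enables by default (notably `vtab-arrow`); the table `people` and its columns are invented for illustration.

```rust
use std::sync::Arc;

use arrow::array::{Int32Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use duckdb::{Connection, Result};

fn main() -> Result<()> {
    let conn = Connection::open_in_memory()?;
    conn.execute_batch("CREATE TABLE people(id INTEGER, name VARCHAR)")?;

    // Build a RecordBatch whose columns line up with the table schema.
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int32, true),
        Field::new("name", DataType::Utf8, true),
    ]);
    let batch = RecordBatch::try_new(
        Arc::new(schema),
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 3])),
            Arc::new(StringArray::from(vec![Some("a"), None, Some("c")])),
        ],
    )
    .expect("schema and columns match");

    // Append the whole batch at once; the appender flushes its pending data when dropped.
    let mut app = conn.appender("people")?;
    app.append_record_batch(batch)?;
    Ok(())
}
```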
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -21,7 +21,7 @@ name = "duckdb"
members = ["libduckdb-sys", "duckdb-loadable-macros"]

[features]
-default = []
+default = ["bundled","vtab","vtab-arrow"]
bundled = ["libduckdb-sys/bundled"]
httpfs = ["libduckdb-sys/httpfs", "bundled"]
json = ["libduckdb-sys/json", "bundled"]
70 changes: 69 additions & 1 deletion src/appender.rs
@@ -1,9 +1,13 @@
use arrow::record_batch::RecordBatch;
use ffi::duckdb_append_data_chunk;

use super::{ffi, AppenderParams, Connection, Result, ValueRef};
use std::{ffi::c_void, fmt, iter::IntoIterator, os::raw::c_char};

use crate::{
error::result_from_duckdb_appender,
types::{TimeUnit, ToSql, ToSqlOutput},
vtab::{record_batch_to_duckdb_data_chunk, to_duckdb_logical_type, DataChunk, LogicalType},
Error,
};

@@ -67,6 +71,40 @@ impl Appender<'_> {
result_from_duckdb_appender(rc, self.app)
}

/// Append an Arrow [`RecordBatch`] to this appender's table.
///
/// ## Example
///
/// ```rust,no_run
/// # use duckdb::{Connection, Result, params};
/// use arrow::record_batch::RecordBatch;
/// fn insert_record_batch(conn: &Connection, record_batch: RecordBatch) -> Result<()> {
/// let mut app = conn.appender("foo")?;
/// app.append_record_batch(record_batch)?;
/// Ok(())
/// }
/// ```
///
/// # Failure
///
/// Will return `Err` if the number of columns in the `RecordBatch` does not match the table schema.
#[inline]
pub fn append_record_batch(&mut self, record_batch: RecordBatch) -> Result<()> {
let schema = record_batch.schema();
let mut logical_type: Vec<LogicalType> = vec![];
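// Map each Arrow field type to the corresponding DuckDB logical type, failing on unsupported types.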
for field in schema.fields() {
let logical_t = to_duckdb_logical_type(field.data_type())
.map_err(|_op| Error::ArrowTypeToDuckdbType(field.to_string(), field.data_type().clone()))?;
logical_type.push(logical_t);
}

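// Build a DataChunk with the derived column types and copy the RecordBatch's columns into it.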
let mut data_chunk = DataChunk::new(&logical_type);
record_batch_to_duckdb_data_chunk(&record_batch, &mut data_chunk).map_err(|_op| Error::AppendError)?;

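// Hand the filled chunk to DuckDB through the C API; any appender error is surfaced via result_from_duckdb_appender.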
let rc = unsafe { duckdb_append_data_chunk(self.app, data_chunk.get_ptr()) };
result_from_duckdb_appender(rc, self.app)
}

#[inline]
pub(crate) fn bind_parameters<P>(&mut self, params: P) -> Result<()>
where
@@ -166,8 +204,13 @@ impl fmt::Debug for Appender<'_> {

#[cfg(test)]
mod test {
use arrow::{
array::{Int8Array, StringArray},
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,

};
use crate::{Connection, Result};
-use std::convert::TryFrom;
+use std::{convert::TryFrom, sync::Arc};

#[test]
fn test_append_one_row() -> Result<()> {
@@ -235,6 +278,31 @@ mod test {
Ok(())
}

#[test]
fn test_append_record_batch() -> Result<()> {
let db = Connection::open_in_memory()?;
db.execute_batch("CREATE TABLE foo(id TINYINT not null,area TINYINT not null,name Varchar)")?;
{
let id_array = Int8Array::from(vec![1, 2, 3, 4, 5]);
let area_array = Int8Array::from(vec![11, 22, 33, 44, 55]);
let name_array = StringArray::from(vec![Some("11"), None, None, Some("44"), None]);
let schema = Schema::new(vec![
Field::new("id", DataType::Int8, true),
Field::new("area", DataType::Int8, true),

Field::new("area", DataType::Utf8, true),
]);
let record_batch =
RecordBatch::try_new(Arc::new(schema), vec![Arc::new(id_array), Arc::new(area_array), Arc::new(name_array)]).unwrap();
let mut app = db.appender("foo")?;
app.append_record_batch(record_batch)?;
}
let mut stmt = db.prepare("SELECT id, area,name FROM foo")?;

let rbs: Vec<RecordBatch> = stmt.query_arrow([])?.collect();
assert_eq!(rbs.iter().map(|op| op.num_rows()).sum::<usize>(), 5);
Ok(())
}


#[test]
fn test_append_timestamp() -> Result<()> {
use std::time::Duration;
9 changes: 9 additions & 0 deletions src/error.rs
@@ -1,3 +1,5 @@
use arrow::datatypes::DataType;

use super::Result;
use crate::{
ffi,
@@ -58,6 +60,9 @@ pub enum Error {
/// Rust type.
InvalidColumnType(usize, String, Type),

/// Error when an Arrow `DataType` cannot be converted to a DuckDB logical type
ArrowTypeToDuckdbType(String, DataType),

/// Error when a query that was expected to insert one row did not insert
/// any or insert many.
StatementChangedRows(usize),
@@ -170,6 +175,9 @@ impl fmt::Display for Error {
Error::InvalidColumnType(i, ref name, ref t) => {
write!(f, "Invalid column type {t} at index: {i}, name: {name}")
}
Error::ArrowTypeToDuckdbType(ref name, ref t) => {
write!(f, "Invalid column type {t} , name: {name}")
}
Error::InvalidParameterCount(i1, n1) => {
write!(f, "Wrong number of parameters passed to query. Got {i1}, needed {n1}")
}
@@ -201,6 +209,7 @@ impl error::Error for Error {
| Error::StatementChangedRows(_)
| Error::InvalidQuery
| Error::AppendError
| Error::ArrowTypeToDuckdbType(..)
| Error::MultipleStatement => None,
Error::FromSqlConversionFailure(_, _, ref err) | Error::ToSqlConversionFailure(ref err) => Some(&**err),
}
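The new variant is public, so code that appends arbitrary Arrow data can report unsupported types separately from other failures. A small hypothetical sketch (the helper name is invented and not part of this commit):

```rust
use duckdb::Error;

// Hypothetical helper: distinguish unsupported Arrow types from other append failures.
fn describe_append_error(err: &Error) -> String {
    match err {
        Error::ArrowTypeToDuckdbType(field, data_type) => {
            format!("field {field} has Arrow type {data_type}, which has no DuckDB mapping")
        }
        other => format!("append failed: {other}"),
    }
}
```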
1 change: 1 addition & 0 deletions src/vtab/arrow.rs
@@ -491,6 +491,7 @@
use std::{error::Error, sync::Arc};

#[test]
#[ignore = "close"]
fn test_vtab_arrow() -> Result<(), Box<dyn Error>> {
let db = Connection::open_in_memory()?;
db.register_table_function::<ArrowVTab>("arrow")?;
5 changes: 5 additions & 0 deletions src/vtab/data_chunk.rs
@@ -59,6 +59,11 @@ impl DataChunk {
pub fn num_columns(&self) -> usize {
unsafe { duckdb_data_chunk_get_column_count(self.ptr) as usize }

}

/// Get the underlying `duckdb_data_chunk` pointer of this [DataChunk].
pub fn get_ptr(&self) -> duckdb_data_chunk {
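// `duckdb_data_chunk` is a raw handle, so this only copies the pointer; the DataChunk keeps ownership.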
self.ptr.clone()
}
}

impl From<duckdb_data_chunk> for DataChunk {
2 changes: 1 addition & 1 deletion src/vtab/mod.rs
@@ -14,7 +14,7 @@ mod arrow;
#[cfg(feature = "vtab-arrow")]

pub use self::arrow::{
arrow_arraydata_to_query_params, arrow_ffi_to_query_params, arrow_recordbatch_to_query_params,
-record_batch_to_duckdb_data_chunk,
+record_batch_to_duckdb_data_chunk, to_duckdb_logical_type,
};
#[cfg(feature = "vtab-excel")]
mod excel;
