Skip to content

Commit

Permalink
Merge branch 'main' into prerelease
Browse files Browse the repository at this point in the history
  • Loading branch information
wangxiaoying committed Jul 5, 2023
2 parents 8d41ef6 + 47df7f4 commit 569978d
Show file tree
Hide file tree
Showing 20 changed files with 328 additions and 33 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion connectorx-cpp/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "connectorx-cpp"
version = "0.3.2-alpha.6"
version = "0.3.2-alpha.7"
edition = "2021"
license = "MIT"

Expand Down
4 changes: 2 additions & 2 deletions connectorx-python/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion connectorx-python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
authors = ["Weiyuan Wu <[email protected]>"]
edition = "2018"
name = "connectorx-python"
version = "0.3.2-alpha.6"
version = "0.3.2-alpha.7"
license = "MIT"
readme = "README.md"

Expand Down
10 changes: 8 additions & 2 deletions connectorx-python/connectorx/tests/test_arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def test_arrow2(postgres_url: str) -> None:


def test_arrow2_type(postgres_url: str) -> None:
query = "SELECT test_date, test_timestamp, test_timestamptz, test_int16, test_int64, test_float32, test_numeric, test_bpchar, test_char, test_varchar, test_uuid, test_time, test_bytea, test_json, test_jsonb, test_f4array, test_f8array, test_narray, test_i2array, test_i4array, test_i8array, test_enum, test_ltree FROM test_types"
query = "SELECT test_date, test_timestamp, test_timestamptz, test_int16, test_int64, test_float32, test_numeric, test_bpchar, test_char, test_varchar, test_uuid, test_time, test_bytea, test_json, test_jsonb, test_f4array, test_f8array, test_narray, test_i2array, test_i4array, test_i8array, test_enum, test_ltree, test_name FROM test_types"
df = read_sql(postgres_url, query, return_type="arrow2")
df = df.to_pandas(date_as_object=False)
df.sort_values(by="test_int16", inplace=True, ignore_index=True)
Expand Down Expand Up @@ -181,7 +181,13 @@ def test_arrow2_type(postgres_url: str) -> None:
"test_enum": pd.Series(
["happy", "very happy", "ecstatic", None], dtype="object"
),
"test_ltree": pd.Series(["A.B.C.D", "A.B.E", "A", None], dtype="object"),
"test_ltree": pd.Series(
["A.B.C.D", "A.B.E", "A", None], dtype="object"
),
"test_name": pd.Series(
["0", "21", "someName", "101203203-1212323-22131235"]
)

},
)
assert_frame_equal(df, expected, check_names=True)
31 changes: 27 additions & 4 deletions connectorx-python/connectorx/tests/test_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ def test_postgres_with_index_col(postgres_url: str) -> None:


def test_postgres_types_binary(postgres_url: str) -> None:
query = "SELECT test_date, test_timestamp, test_timestamptz, test_int16, test_int64, test_float32, test_numeric, test_bpchar, test_char, test_varchar, test_uuid, test_time, test_json, test_jsonb, test_bytea, test_enum, test_f4array, test_f8array, test_narray, test_i2array, test_i4array, test_i8array, test_citext, test_ltree, test_lquery, test_ltxtquery FROM test_types"
query = "SELECT test_date, test_timestamp, test_timestamptz, test_int16, test_int64, test_float32, test_numeric, test_bpchar, test_char, test_varchar, test_uuid, test_time, test_json, test_jsonb, test_bytea, test_enum, test_f4array, test_f8array, test_narray, test_boolarray, test_i2array, test_i4array, test_i8array, test_citext, test_ltree, test_lquery, test_ltxtquery FROM test_types"
df = read_sql(postgres_url, query)
expected = pd.DataFrame(
index=range(4),
Expand Down Expand Up @@ -538,6 +538,9 @@ def test_postgres_types_binary(postgres_url: str) -> None:
"test_narray": pd.Series(
[[], None, [521.34], [0.12, 333.33, 22.22]], dtype="object"
),
"test_boolarray": pd.Series(
[[True, False], [], [True], None], dtype="object"
),
"test_i2array": pd.Series(
[[-1, 0, 1], [], [-32768, 32767], None], dtype="object"
),
Expand All @@ -560,7 +563,7 @@ def test_postgres_types_binary(postgres_url: str) -> None:


def test_postgres_types_csv(postgres_url: str) -> None:
query = "SELECT test_date, test_timestamp, test_timestamptz, test_int16, test_int64, test_float32, test_numeric, test_bpchar, test_char, test_varchar, test_uuid, test_time, test_json, test_jsonb, test_bytea, test_enum::text, test_f4array, test_f8array, test_narray, test_i2array, test_i4array, test_i8array, test_citext, test_ltree FROM test_types"
query = "SELECT test_date, test_timestamp, test_timestamptz, test_int16, test_int64, test_float32, test_numeric, test_bpchar, test_char, test_varchar, test_uuid, test_time, test_json, test_jsonb, test_bytea, test_enum::text, test_f4array, test_f8array, test_narray, test_boolarray, test_i2array, test_i4array, test_i8array, test_citext, test_ltree FROM test_types"
df = read_sql(postgres_url, query, protocol="csv")
expected = pd.DataFrame(
index=range(4),
Expand Down Expand Up @@ -648,6 +651,9 @@ def test_postgres_types_csv(postgres_url: str) -> None:
"test_narray": pd.Series(
[[], None, [521.34], [0.12, 333.33, 22.22]], dtype="object"
),
"test_boolarray": pd.Series(
[[True, False], [], [True], None], dtype="object"
),
"test_i2array": pd.Series(
[[-1, 0, 1], [], [-32768, 32767], None], dtype="object"
),
Expand All @@ -666,7 +672,7 @@ def test_postgres_types_csv(postgres_url: str) -> None:


def test_postgres_types_cursor(postgres_url: str) -> None:
query = "SELECT test_date, test_timestamp, test_timestamptz, test_int16, test_int64, test_float32, test_numeric, test_bpchar, test_char, test_varchar, test_uuid, test_time, test_json, test_jsonb, test_bytea, test_enum::text, test_f4array, test_f8array, test_narray, test_i2array, test_i4array, test_i8array, test_citext, test_ltree FROM test_types"
query = "SELECT test_date, test_timestamp, test_timestamptz, test_int16, test_int64, test_float32, test_numeric, test_bpchar, test_char, test_varchar, test_uuid, test_time, test_json, test_jsonb, test_bytea, test_enum::text, test_f4array, test_f8array, test_narray, test_boolarray, test_i2array, test_i4array, test_i8array, test_citext, test_ltree FROM test_types"
df = read_sql(postgres_url, query, protocol="cursor")
expected = pd.DataFrame(
index=range(4),
Expand Down Expand Up @@ -754,6 +760,9 @@ def test_postgres_types_cursor(postgres_url: str) -> None:
"test_narray": pd.Series(
[[], None, [521.34], [0.12, 333.33, 22.22]], dtype="object"
),
"test_boolarray": pd.Series(
[[True, False], [], [True], None], dtype="object"
),
"test_i2array": pd.Series(
[[-1, 0, 1], [], [-32768, 32767], None], dtype="object"
),
Expand All @@ -772,7 +781,7 @@ def test_postgres_types_cursor(postgres_url: str) -> None:


def test_postgres_types_simple(postgres_url: str) -> None:
query = "SELECT test_date, test_timestamp, test_timestamptz, test_int16, test_int64, test_float32, test_numeric, test_bpchar, test_char, test_varchar, test_uuid, test_time, test_bytea, test_enum, test_f4array, test_f8array, test_narray, test_i2array, test_i4array, test_i8array FROM test_types"
query = "SELECT test_date, test_timestamp, test_timestamptz, test_int16, test_int64, test_float32, test_numeric, test_bpchar, test_char, test_varchar, test_uuid, test_time, test_bytea, test_enum, test_f4array, test_f8array, test_narray, test_boolarray, test_i2array, test_i4array, test_i8array FROM test_types"
df = read_sql(postgres_url, query, protocol="simple")
expected = pd.DataFrame(
index=range(4),
Expand Down Expand Up @@ -842,6 +851,9 @@ def test_postgres_types_simple(postgres_url: str) -> None:
"test_narray": pd.Series(
[[], None, [521.34], [0.12, 333.33, 22.22]], dtype="object"
),
"test_boolarray": pd.Series(
[[True, False], [], [True], None], dtype="object"
),
"test_i2array": pd.Series(
[[-1, 0, 1], [], [-32768, 32767], None], dtype="object"
),
Expand Down Expand Up @@ -1116,3 +1128,14 @@ def test_postgres_tls_fail(postgres_url_tls: str) -> None:
partition_range=(0, 2000),
partition_num=3,
)

def test_postgres_name_type(postgres_url: str) -> None:
# partition column can not have None
query = "SELECT test_name FROM test_types"
df = read_sql(postgres_url, query)
expected = pd.DataFrame(
data={
"test_name": pd.Series(["0", "21", "someName", "101203203-1212323-22131235"]),
},
)
assert_frame_equal(df, expected, check_names=True)
2 changes: 1 addition & 1 deletion connectorx-python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ license = "MIT"
maintainers = ["Weiyuan Wu <[email protected]>"]
name = "connectorx"
readme = "README.md" # Markdown files are supported
version = "0.3.2-alpha.6"
version = "0.3.2-alpha.7"

[project]
name = "connectorx" # Target file name of maturin build
Expand Down
14 changes: 14 additions & 0 deletions connectorx-python/src/pandas/destination.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ impl<'a> Destination for PandasDestination<'a> {
PandasBlockType::Float64 => {
self.allocate_array::<f64>(dt, placement)?;
}
PandasBlockType::BooleanArray => {
self.allocate_array::<super::pandas_columns::PyList>(dt, placement)?;
}
PandasBlockType::Float64Array => {
self.allocate_array::<super::pandas_columns::PyList>(dt, placement)?;
}
Expand Down Expand Up @@ -219,6 +222,17 @@ impl<'a> Destination for PandasDestination<'a> {
.collect()
}
}
PandasBlockType::BooleanArray => {
let bblock = ArrayBlock::<bool>::extract(buf)?;
let bcols = bblock.split()?;
for (&cid, bcol) in block.cids.iter().zip_eq(bcols) {
partitioned_columns[cid] = bcol
.partition(counts)
.into_iter()
.map(|c| Box::new(c) as _)
.collect()
}
}
PandasBlockType::Float64Array => {
let fblock = ArrayBlock::<f64>::extract(buf)?;
let fcols = fblock.split()?;
Expand Down
35 changes: 35 additions & 0 deletions connectorx-python/src/pandas/pandas_columns/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,33 @@ where
}
}

impl PandasColumn<Vec<bool>> for ArrayColumn<bool> {
#[throws(ConnectorXPythonError)]
fn write(&mut self, val: Vec<bool>, row: usize) {
self.lengths.push(val.len());
self.buffer.extend_from_slice(&val[..]);
self.row_idx.push(row);
self.try_flush()?;
}
}

impl PandasColumn<Option<Vec<bool>>> for ArrayColumn<bool> {
#[throws(ConnectorXPythonError)]
fn write(&mut self, val: Option<Vec<bool>>, row: usize) {
match val {
Some(v) => {
self.lengths.push(v.len());
self.buffer.extend_from_slice(&v[..]);
self.row_idx.push(row);
self.try_flush()?;
}
None => {
self.lengths.push(usize::MAX);
self.row_idx.push(row);
}
}
}
}
impl PandasColumn<Vec<f64>> for ArrayColumn<f64> {
#[throws(ConnectorXPythonError)]
fn write(&mut self, val: Vec<f64>, row: usize) {
Expand Down Expand Up @@ -150,6 +177,14 @@ impl PandasColumn<Option<Vec<i64>>> for ArrayColumn<i64> {
}
}

impl HasPandasColumn for Vec<bool> {
type PandasColumn<'a> = ArrayColumn<bool>;
}

impl HasPandasColumn for Option<Vec<bool>> {
type PandasColumn<'a> = ArrayColumn<bool>;
}

impl HasPandasColumn for Vec<f64> {
type PandasColumn<'a> = ArrayColumn<f64>;
}
Expand Down
2 changes: 2 additions & 0 deletions connectorx-python/src/pandas/transports/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ macro_rules! impl_postgres_transport {
{ Int2[i16] => I64[i64] | conversion auto }
{ Int4[i32] => I64[i64] | conversion auto }
{ Int8[i64] => I64[i64] | conversion auto }
{ BoolArray[Vec<bool>] => BoolArray[Vec<bool>] | conversion auto_vec }
{ Int2Array[Vec<i16>] => I64Array[Vec<i64>] | conversion auto_vec }
{ Int4Array[Vec<i32>] => I64Array[Vec<i64>] | conversion auto_vec }
{ Int8Array[Vec<i64>] => I64Array[Vec<i64>] | conversion auto }
Expand All @@ -44,6 +45,7 @@ macro_rules! impl_postgres_transport {
{ Text[&'r str] => Str[&'r str] | conversion auto }
{ BpChar[&'r str] => Str[&'r str] | conversion none }
{ VarChar[&'r str] => Str[&'r str] | conversion none }
{ Name[&'r str] => Str[&'r str] | conversion none }
{ Timestamp[NaiveDateTime] => DateTime[DateTime<Utc>] | conversion option }
{ TimestampTz[DateTime<Utc>] => DateTime[DateTime<Utc>] | conversion auto }
{ Date[NaiveDate] => DateTime[DateTime<Utc>] | conversion option }
Expand Down
4 changes: 4 additions & 0 deletions connectorx-python/src/pandas/typesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub enum PandasTypeSystem {
F64Array(bool),
I64Array(bool),
Bool(bool),
BoolArray(bool),
Char(bool),
Str(bool),
BoxStr(bool),
Expand All @@ -23,6 +24,7 @@ pub enum PandasBlockType {
Boolean(bool), // bool indicates nullablity
Int64(bool),
Float64,
BooleanArray,
Int64Array,
Float64Array,
String,
Expand Down Expand Up @@ -54,6 +56,7 @@ impl From<PandasTypeSystem> for PandasBlockType {
PandasTypeSystem::Bool(nullable) => PandasBlockType::Boolean(nullable),
PandasTypeSystem::I64(nullable) => PandasBlockType::Int64(nullable),
PandasTypeSystem::F64(_) => PandasBlockType::Float64,
PandasTypeSystem::BoolArray(_) => PandasBlockType::BooleanArray,
PandasTypeSystem::F64Array(_) => PandasBlockType::Float64Array,
PandasTypeSystem::I64Array(_) => PandasBlockType::Int64Array,
PandasTypeSystem::String(_)
Expand All @@ -74,6 +77,7 @@ impl_typesystem! {
{ F64Array => Vec<f64> }
{ I64Array => Vec<i64> }
{ Bool => bool }
{ BoolArray => Vec<bool> }
{ Char => char }
{ Str => &'r str }
{ BoxStr => Box<str> }
Expand Down
2 changes: 1 addition & 1 deletion connectorx/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ license = "MIT"
name = "connectorx"
readme = "../README.md"
repository = "https://github.com/sfu-db/connector-x"
version = "0.3.2-alpha.6"
version = "0.3.2-alpha.7"

[dependencies]
anyhow = "1"
Expand Down
29 changes: 18 additions & 11 deletions connectorx/src/destinations/arrow2/arrow_assoc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,12 @@ impl_arrow_assoc!(f64, ArrowDataType::Float64, MutablePrimitiveArray<f64>);
impl_arrow_assoc!(bool, ArrowDataType::Boolean, MutableBooleanArray);

macro_rules! impl_arrow_assoc_vec {
($T:ty, $AT:expr) => {
($T:ty, $PT:ty, $AT:expr) => {
impl ArrowAssoc for Vec<$T> {
type Builder = MutableListArray<i64, MutablePrimitiveArray<$T>>;
type Builder = MutableListArray<i64, $PT>;

fn builder(nrows: usize) -> Self::Builder {
MutableListArray::<i64, MutablePrimitiveArray<$T>>::with_capacity(nrows)
MutableListArray::<i64, $PT>::with_capacity(nrows)
}

#[inline]
Expand All @@ -86,10 +86,10 @@ macro_rules! impl_arrow_assoc_vec {
}

impl ArrowAssoc for Option<Vec<$T>> {
type Builder = MutableListArray<i64, MutablePrimitiveArray<$T>>;
type Builder = MutableListArray<i64, $PT>;

fn builder(nrows: usize) -> Self::Builder {
MutableListArray::<i64, MutablePrimitiveArray<$T>>::with_capacity(nrows)
MutableListArray::<i64, $PT>::with_capacity(nrows)
}

#[inline]
Expand All @@ -114,12 +114,19 @@ macro_rules! impl_arrow_assoc_vec {
};
}

impl_arrow_assoc_vec!(i32, ArrowDataType::Int32);
impl_arrow_assoc_vec!(i64, ArrowDataType::Int64);
impl_arrow_assoc_vec!(u32, ArrowDataType::UInt32);
impl_arrow_assoc_vec!(u64, ArrowDataType::UInt64);
impl_arrow_assoc_vec!(f32, ArrowDataType::Float32);
impl_arrow_assoc_vec!(f64, ArrowDataType::Float64);
macro_rules! impl_arrow_assoc_primitive_vec {
($T:ty, $AT:expr) => {
impl_arrow_assoc_vec!($T, MutablePrimitiveArray<$T>, $AT);
};
}

impl_arrow_assoc_vec!(bool, MutableBooleanArray, ArrowDataType::Boolean);
impl_arrow_assoc_primitive_vec!(i32, ArrowDataType::Int32);
impl_arrow_assoc_primitive_vec!(i64, ArrowDataType::Int64);
impl_arrow_assoc_primitive_vec!(u32, ArrowDataType::UInt32);
impl_arrow_assoc_primitive_vec!(u64, ArrowDataType::UInt64);
impl_arrow_assoc_primitive_vec!(f32, ArrowDataType::Float32);
impl_arrow_assoc_primitive_vec!(f64, ArrowDataType::Float64);

impl ArrowAssoc for &str {
type Builder = MutableUtf8Array<i64>;
Expand Down
2 changes: 2 additions & 0 deletions connectorx/src/destinations/arrow2/typesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub enum Arrow2TypeSystem {
Date64(bool),
Time64(bool),
DateTimeTz(bool),
BoolArray(bool),
Int32Array(bool),
Int64Array(bool),
UInt32Array(bool),
Expand All @@ -41,6 +42,7 @@ impl_typesystem! {
{ Date64 => NaiveDateTime }
{ Time64 => NaiveTime }
{ DateTimeTz => DateTime<Utc> }
{ BoolArray => Vec<bool> }
{ Int32Array => Vec<i32> }
{ Int64Array => Vec<i64> }
{ UInt32Array => Vec<u32> }
Expand Down
Loading

0 comments on commit 569978d

Please sign in to comment.