Skip to content

Commit

Permalink
Coerce types on read
Browse files Browse the repository at this point in the history
`COPY FROM parquet` is too strict when matching Postgres tupledesc schema to the schema from parquet file.
e.g. `INT32` type in the parquet schema cannot be read into a Postgres column with `int64` type.
We can avoid this situation by adding a `is_coercible(from_type, to_type)` check while matching the expected schema
from the parquet file.

With that we can coerce as shown below from parquet source type to Postgres destination types:
- INT16 => {int32, int64}
- INT32 => {int64}
- UINT16 => {int16, int32, int64}
- UINT32 => {int32, int64}
- UINT64 => {int64}
- FLOAT32 => {double}

As we use arrow as intermediate format, it might be the case that `LargeUtf8` or `LargeBinary` types are used by the external writer instead of `Utf8` and `Binary`.
That is why we also need to support below coercions for arrow source types:
- `Utf8 | LargeUtf8` => {text}
- `Binary | LargeBinary` => {bytea}

Closes #67.
  • Loading branch information
aykut-bozkurt committed Nov 11, 2024
1 parent 518a5ac commit d0f1554
Show file tree
Hide file tree
Showing 11 changed files with 703 additions and 178 deletions.
481 changes: 313 additions & 168 deletions src/arrow_parquet/arrow_to_pg.rs

Large diffs are not rendered by default.

27 changes: 26 additions & 1 deletion src/arrow_parquet/arrow_to_pg/bytea.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use arrow::array::{Array, BinaryArray};
use arrow::array::{Array, BinaryArray, LargeBinaryArray};

use super::{ArrowArrayToPgType, ArrowToPgAttributeContext};

Expand All @@ -13,6 +13,16 @@ impl ArrowArrayToPgType<Vec<u8>> for BinaryArray {
}
}

impl ArrowArrayToPgType<Vec<u8>> for LargeBinaryArray {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<u8>> {
if self.is_null(0) {
None
} else {
Some(self.value(0).to_vec())
}
}
}

// Bytea[]
impl ArrowArrayToPgType<Vec<Option<Vec<u8>>>> for BinaryArray {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<Vec<u8>>>> {
Expand All @@ -28,3 +38,18 @@ impl ArrowArrayToPgType<Vec<Option<Vec<u8>>>> for BinaryArray {
Some(vals)
}
}

impl ArrowArrayToPgType<Vec<Option<Vec<u8>>>> for LargeBinaryArray {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<Vec<u8>>>> {
let mut vals = vec![];
for val in self.iter() {
if let Some(val) = val {
vals.push(Some(val.to_vec()));
} else {
vals.push(None);
}
}

Some(vals)
}
}
28 changes: 27 additions & 1 deletion src/arrow_parquet/arrow_to_pg/char.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use arrow::array::{Array, StringArray};
use arrow::array::{Array, LargeStringArray, StringArray};

use super::{ArrowArrayToPgType, ArrowToPgAttributeContext};

Expand All @@ -15,6 +15,18 @@ impl ArrowArrayToPgType<i8> for StringArray {
}
}

impl ArrowArrayToPgType<i8> for LargeStringArray {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<i8> {
if self.is_null(0) {
None
} else {
let val = self.value(0);
let val: i8 = val.chars().next().expect("unexpected ascii char") as i8;
Some(val)
}
}
}

// Char[]
impl ArrowArrayToPgType<Vec<Option<i8>>> for StringArray {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<i8>>> {
Expand All @@ -29,3 +41,17 @@ impl ArrowArrayToPgType<Vec<Option<i8>>> for StringArray {
Some(vals)
}
}

impl ArrowArrayToPgType<Vec<Option<i8>>> for LargeStringArray {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<i8>>> {
let mut vals = vec![];
for val in self.iter() {
let val = val.map(|val| {
let val: i8 = val.chars().next().expect("unexpected ascii char") as i8;
val
});
vals.push(val);
}
Some(vals)
}
}
28 changes: 27 additions & 1 deletion src/arrow_parquet/arrow_to_pg/fallback_to_text.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use arrow::array::{Array, StringArray};
use arrow::array::{Array, LargeStringArray, StringArray};

use crate::type_compat::fallback_to_text::FallbackToText;

Expand All @@ -17,6 +17,18 @@ impl ArrowArrayToPgType<FallbackToText> for StringArray {
}
}

impl ArrowArrayToPgType<FallbackToText> for LargeStringArray {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<FallbackToText> {
if self.is_null(0) {
None
} else {
let text_repr = self.value(0).to_string();
let val = FallbackToText(text_repr);
Some(val)
}
}
}

// Text[] representation of any type
impl ArrowArrayToPgType<Vec<Option<FallbackToText>>> for StringArray {
fn to_pg_type(
Expand All @@ -31,3 +43,17 @@ impl ArrowArrayToPgType<Vec<Option<FallbackToText>>> for StringArray {
Some(vals)
}
}

impl ArrowArrayToPgType<Vec<Option<FallbackToText>>> for LargeStringArray {
fn to_pg_type(
self,
_context: &ArrowToPgAttributeContext,
) -> Option<Vec<Option<FallbackToText>>> {
let mut vals = vec![];
for val in self.iter() {
let val = val.map(|val| FallbackToText(val.to_string()));
vals.push(val);
}
Some(vals)
}
}
21 changes: 21 additions & 0 deletions src/arrow_parquet/arrow_to_pg/float4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,17 @@ impl ArrowArrayToPgType<f32> for Float32Array {
}
}

impl ArrowArrayToPgType<f64> for Float32Array {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<f64> {
if self.is_null(0) {
None
} else {
let val = self.value(0) as _;
Some(val)
}
}
}

// Float4[]
impl ArrowArrayToPgType<Vec<Option<f32>>> for Float32Array {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<f32>>> {
Expand All @@ -24,3 +35,13 @@ impl ArrowArrayToPgType<Vec<Option<f32>>> for Float32Array {
Some(vals)
}
}

impl ArrowArrayToPgType<Vec<Option<f64>>> for Float32Array {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<f64>>> {
let mut vals = vec![];
for val in self.iter() {
vals.push(val.map(|val| val as _));
}
Some(vals)
}
}
27 changes: 26 additions & 1 deletion src/arrow_parquet/arrow_to_pg/geometry.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use arrow::array::{Array, BinaryArray};
use arrow::array::{Array, BinaryArray, LargeBinaryArray};

use crate::type_compat::geometry::Geometry;

Expand All @@ -15,6 +15,16 @@ impl ArrowArrayToPgType<Geometry> for BinaryArray {
}
}

impl ArrowArrayToPgType<Geometry> for LargeBinaryArray {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Geometry> {
if self.is_null(0) {
None
} else {
Some(self.value(0).to_vec().into())
}
}
}

// Geometry[]
impl ArrowArrayToPgType<Vec<Option<Geometry>>> for BinaryArray {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<Geometry>>> {
Expand All @@ -30,3 +40,18 @@ impl ArrowArrayToPgType<Vec<Option<Geometry>>> for BinaryArray {
Some(vals)
}
}

impl ArrowArrayToPgType<Vec<Option<Geometry>>> for LargeBinaryArray {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<Geometry>>> {
let mut vals = vec![];
for val in self.iter() {
if let Some(val) = val {
vals.push(Some(val.to_vec().into()));
} else {
vals.push(None);
}
}

Some(vals)
}
}
107 changes: 106 additions & 1 deletion src/arrow_parquet/arrow_to_pg/int2.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use arrow::array::{Array, Int16Array};
use arrow::array::{Array, Int16Array, UInt16Array};

use super::{ArrowArrayToPgType, ArrowToPgAttributeContext};

Expand All @@ -14,6 +14,61 @@ impl ArrowArrayToPgType<i16> for Int16Array {
}
}

impl ArrowArrayToPgType<i32> for Int16Array {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<i32> {
if self.is_null(0) {
None
} else {
let val = self.value(0) as _;
Some(val)
}
}
}

impl ArrowArrayToPgType<i64> for Int16Array {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<i64> {
if self.is_null(0) {
None
} else {
let val = self.value(0) as _;
Some(val)
}
}
}

impl ArrowArrayToPgType<i16> for UInt16Array {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<i16> {
if self.is_null(0) {
None
} else {
let val = self.value(0) as _;
Some(val)
}
}
}

impl ArrowArrayToPgType<i32> for UInt16Array {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<i32> {
if self.is_null(0) {
None
} else {
let val = self.value(0) as _;
Some(val)
}
}
}

impl ArrowArrayToPgType<i64> for UInt16Array {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<i64> {
if self.is_null(0) {
None
} else {
let val = self.value(0) as _;
Some(val)
}
}
}

// Int2[]
impl ArrowArrayToPgType<Vec<Option<i16>>> for Int16Array {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<i16>>> {
Expand All @@ -24,3 +79,53 @@ impl ArrowArrayToPgType<Vec<Option<i16>>> for Int16Array {
Some(vals)
}
}

impl ArrowArrayToPgType<Vec<Option<i32>>> for Int16Array {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<i32>>> {
let mut vals = vec![];
for val in self.iter() {
vals.push(val.map(|val| val as _));
}
Some(vals)
}
}

impl ArrowArrayToPgType<Vec<Option<i64>>> for Int16Array {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<i64>>> {
let mut vals = vec![];
for val in self.iter() {
vals.push(val.map(|val| val as _));
}
Some(vals)
}
}

impl ArrowArrayToPgType<Vec<Option<i16>>> for UInt16Array {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<i16>>> {
let mut vals = vec![];
for val in self.iter() {
vals.push(val.map(|val| val as _));
}
Some(vals)
}
}

impl ArrowArrayToPgType<Vec<Option<i32>>> for UInt16Array {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<i32>>> {
let mut vals = vec![];
for val in self.iter() {
vals.push(val.map(|val| val as _));
}
Some(vals)
}
}

impl ArrowArrayToPgType<Vec<Option<i64>>> for UInt16Array {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<i64>>> {
let mut vals = vec![];
for val in self.iter() {
vals.push(val.map(|val| val as _));
}
Some(vals)
}
}
Loading

0 comments on commit d0f1554

Please sign in to comment.