[SPARK-49916][SQL] Throw appropriate Exception for type mismatch between ColumnType and data type in some rows #48397

Open · wants to merge 9 commits into base: master
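Summary: reading a JDBC array column whose stored values do not match the Catalyst type inferred from the column metadata previously escaped as a raw java.lang.ClassCastException. This change catches that case in JdbcUtils and rethrows it as a SparkSQLException carrying the new error condition _LEGACY_ERROR_TEMP_3263, which names the field position and the expected type.

A minimal repro sketch against the tables the integration suite below creates (the JDBC URL here is a placeholder, not part of this PR):

  // array_int is declared int[] but its rows hold int[][]; before this change the
  // read failed with a bare ClassCastException, after it a descriptive
  // SparkSQLException is raised instead.
  val df = spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://localhost:5432/test")  // placeholder URL
    .option("dbtable", "array_int")
    .load()
  df.collect()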
common/utils/src/main/resources/error/error-conditions.json (5 additions & 0 deletions)
@@ -8704,6 +8704,11 @@
"Doesn't support month or year interval: <interval>"
]
},
"_LEGACY_ERROR_TEMP_3263": {
RaleSapic marked this conversation as resolved.
Show resolved Hide resolved
"message": [
"Some values in field <pos> are not the <type> type"
]
},
"_LEGACY_ERROR_USER_RAISED_EXCEPTION" : {
"message" : [
"<errorMessage>"
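For reference, once the parameters are substituted, the new condition renders roughly as follows (illustrative values; IntegerType.typeName is "integer", and the helper that throws it is added later in this diff):

  // wrongDatatypeInSomeRows(2, IntegerType) produces a SparkSQLException whose
  // message reads: [_LEGACY_ERROR_TEMP_3263] Some values in field 2 are not of type integer.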
@@ -19,7 +19,7 @@ package org.apache.spark.sql.jdbc.v2

import java.sql.Connection

-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkSQLException}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
@@ -65,6 +65,143 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest {
|)
""".stripMargin
).executeUpdate()

connection.prepareStatement("CREATE TABLE array_test_table (int_array int[]," +
"float_array FLOAT8[], timestamp_array TIMESTAMP[], string_array TEXT[]," +
"datetime_array TIMESTAMPTZ[], array_of_int_arrays INT[][])").executeUpdate()

val query =
"""
INSERT INTO array_test_table (int_array, float_array, timestamp_array, string_array, datetime_array, array_of_int_arrays)
VALUES
(
ARRAY[1, 2, 3], -- Array of integers
ARRAY[1.1, 2.2, 3.3], -- Array of floats
ARRAY['2023-01-01 12:00'::timestamp, '2023-06-01 08:30'::timestamp], -- Array of timestamps
ARRAY['hello', 'world'], -- Array of strings
ARRAY['2023-10-04 12:00:00+00'::timestamptz, '2023-12-01 14:15:00+00'::timestamptz], -- Array of datetimes with time zone
ARRAY[ARRAY[1, 2]] -- Array of arrays of integers
),
(
ARRAY[10, 20, 30], -- Another set of data
ARRAY[10.5, 20.5, 30.5],
ARRAY['2022-01-01 09:15'::timestamp, '2022-03-15 07:45'::timestamp],
ARRAY['postgres', 'arrays'],
ARRAY['2022-11-22 09:00:00+00'::timestamptz, '2022-12-31 23:59:59+00'::timestamptz],
ARRAY[ARRAY[10, 20]]
);
"""
connection.prepareStatement(query).executeUpdate()

connection.prepareStatement("CREATE TABLE array_int (col int[])").executeUpdate()
connection.prepareStatement("CREATE TABLE array_bigint(col bigint[])").executeUpdate()
connection.prepareStatement("CREATE TABLE array_smallint (col smallint[])").executeUpdate()
connection.prepareStatement("CREATE TABLE array_boolean (col boolean[])").executeUpdate()
connection.prepareStatement("CREATE TABLE array_float (col real[])").executeUpdate()
connection.prepareStatement("CREATE TABLE array_double (col float8[])").executeUpdate()
connection.prepareStatement("CREATE TABLE array_text (col text[])").executeUpdate()
connection.prepareStatement("CREATE TABLE array_timestamp (col timestamp[])").executeUpdate()
connection.prepareStatement("CREATE TABLE array_timestamptz (col timestamptz[])")
.executeUpdate()

connection.prepareStatement("INSERT INTO array_int VALUES (array[array[10]])").executeUpdate()
connection.prepareStatement("INSERT INTO array_bigint VALUES (array[array[10]])")
.executeUpdate()
connection.prepareStatement("INSERT INTO array_smallint VALUES (array[array[10]])")
.executeUpdate()
connection.prepareStatement("INSERT INTO array_boolean VALUES (array[array[true]])")
.executeUpdate()
connection.prepareStatement("INSERT INTO array_float VALUES (array[array[10.5]])")
.executeUpdate()
connection.prepareStatement("INSERT INTO array_double VALUES (array[array[10.1]])")
.executeUpdate()
connection.prepareStatement("INSERT INTO array_text VALUES (array[array['helo world']])")
.executeUpdate()
connection.prepareStatement("INSERT INTO array_timestamp VALUES (" +
"array[array['2022-01-01 09:15'::timestamp]])").executeUpdate()
connection.prepareStatement("INSERT INTO array_timestamptz VALUES " +
"(array[array['2022-01-01 09:15'::timestamptz]])").executeUpdate()
}
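The single-column tables above are deliberately mismatched: each is declared with a one-dimensional array column, yet every inserted row holds a two-dimensional array (Postgres does not enforce array dimensionality on insert). A plain Scala snippet, no Spark required, sketches the cast failure this produces on read:

  // A two-dimensional array is not castable to the one-dimensional type the
  // declared column implies, so the cast throws ClassCastException.
  val nested: Object = Array(Array(Integer.valueOf(10)))  // what int[][] comes back as
  try {
    val flat = nested.asInstanceOf[Array[Integer]]  // throws ClassCastException
    println(flat.length)
  } catch {
    case _: ClassCastException => println("mismatched element type")
  }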

test("Test multi-dimensional column types") {
val df = spark.read.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "array_test_table")
.load()
df.collect()


intercept[SparkSQLException] {
val df = spark.read.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "array_int")
.load()
df.collect()
}

intercept[SparkSQLException] {
val df = spark.read.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "array_bigint")
.load()
df.collect()
}

intercept[SparkSQLException] {
val df = spark.read.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "array_smallint")
.load()
df.collect()
}

intercept[SparkSQLException] {
val df = spark.read.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "array_boolean")
.load()
df.collect()
}

intercept[SparkSQLException] {
val df = spark.read.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "array_float")
.load()
df.collect()
}

intercept[SparkSQLException] {
val df = spark.read.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "array_double")
.load()
df.collect()
}

intercept[SparkSQLException] {
val df = spark.read.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "array_timestamp")
.load()
df.collect()
}

intercept[SparkSQLException] {
val df = spark.read.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "array_timestamptz")
.load()
df.collect()
}

intercept[SparkSQLException] {
val df = spark.read.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "array_text")
.load()
df.collect()
}
}

override def testUpdateColumnType(tbl: String): Unit = {
@@ -1257,6 +1257,12 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionErrors {
"dataType" -> toSQLType(dataType)))
}

def wrongDatatypeInSomeRows(pos: Int, dataType: DataType): SparkSQLException = {
  new SparkSQLException(
    errorClass = "_LEGACY_ERROR_TEMP_3263",
    messageParameters = Map("pos" -> pos.toString, "type" -> dataType.typeName))
}
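A sketch of how a caller might handle the new error (df stands for any DataFrame read over JDBC; getErrorClass and getMessageParameters come from SparkThrowable):

  try {
    df.collect()
  } catch {
    case e: SparkSQLException if e.getErrorClass == "_LEGACY_ERROR_TEMP_3263" =>
      // getMessageParameters carries the "pos" and "type" values filled in above.
      println(s"JDBC array column mismatch: ${e.getMessage}")
  }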

def rootConverterReturnNullError(): SparkRuntimeException = {
new SparkRuntimeException(
errorClass = "INVALID_JSON_ROOT_FIELD",
@@ -585,14 +585,26 @@ object JdbcUtils extends Logging with SQLConfHelper {
arr => new GenericArrayData(elementConversion(et0)(arr))
}

+      case IntegerType => arrayConverter[Int]((i: Int) => i)
+      case FloatType => arrayConverter[Float]((f: Float) => f)
+      case DoubleType => arrayConverter[Double]((d: Double) => d)
+      case ShortType => arrayConverter[Short]((s: Short) => s)
+      case BooleanType => arrayConverter[Boolean]((b: Boolean) => b)
+      case LongType => arrayConverter[Long]((l: Long) => l)

case _ => (array: Object) => array.asInstanceOf[Array[Any]]
}

(rs: ResultSet, row: InternalRow, pos: Int) =>
-        val array = nullSafeConvert[java.sql.Array](
-          input = rs.getArray(pos + 1),
-          array => new GenericArrayData(elementConversion(et)(array.getArray)))
-        row.update(pos, array)
+        try {
+          val array = nullSafeConvert[java.sql.Array](
+            input = rs.getArray(pos + 1),
+            array => new GenericArrayData(elementConversion(et)(array.getArray())))
+          row.update(pos, array)
+        } catch {
+          case _: java.lang.ClassCastException =>
+            throw QueryExecutionErrors.wrongDatatypeInSomeRows(pos, dt)
+        }

case NullType =>
(_: ResultSet, row: InternalRow, pos: Int) => row.update(pos, null)
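The typed cases added above funnel primitive element types through arrayConverter, which is defined elsewhere in JdbcUtils and not shown in this diff. A rough sketch of the shape such a helper could have (an assumption for illustration, not the actual Spark implementation):

  // Hypothetical stand-in for JdbcUtils.arrayConverter: cast the driver-supplied
  // array to the expected element type and convert each element.
  def arrayConverterSketch[T](convert: T => Any): Object => Array[Any] =
    (array: Object) => array.asInstanceOf[Array[T]].map(convert)

Because the element-level cast only fails once the array is actually traversed, the try/catch around nullSafeConvert is what turns the resulting ClassCastException into the new, descriptive SparkSQLException.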