Commit

[GLUTEN-7675][VL] Support parquet write with complex data type (e.g. MAP, ARRAY) (apache#7676)
weixiuli authored Nov 7, 2024
1 parent 427aeb2 commit f6a9665
Showing 9 changed files with 339 additions and 17 deletions.
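
For orientation, a minimal example (adapted from the tests added in this commit; the table name is made up) of the write pattern this change targets: a Parquet table with ARRAY and MAP columns no longer fails the Velox backend's type validation for native writes.

spark.sql("CREATE TABLE t_complex (a INT, b ARRAY<STRING>, c MAP<STRING, STRING>) USING PARQUET")
spark.sql("INSERT OVERWRITE t_complex VALUES (1, ARRAY('x', 'y'), MAP('key1', 'value1'))")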
@@ -232,15 +232,26 @@ object VeloxBackendSettings extends BackendSettingsApi {

     // Validate if all types are supported.
     def validateDataTypes(): Option[String] = {
-      val unsupportedTypes = fields.flatMap {
-        field =>
-          field.dataType match {
-            case _: StructType => Some("StructType")
-            case _: ArrayType => Some("ArrayType")
-            case _: MapType => Some("MapType")
-            case _: YearMonthIntervalType => Some("YearMonthIntervalType")
-            case _ => None
-          }
-      }
+      val unsupportedTypes = format match {
+        case _: ParquetFileFormat =>
+          fields.flatMap {
+            case StructField(_, _: YearMonthIntervalType, _, _) =>
+              Some("YearMonthIntervalType")
+            case StructField(_, _: StructType, _, _) =>
+              Some("StructType")
+            case _ => None
+          }
+        case _ =>
+          fields.flatMap {
+            field =>
+              field.dataType match {
+                case _: StructType => Some("StructType")
+                case _: ArrayType => Some("ArrayType")
+                case _: MapType => Some("MapType")
+                case _: YearMonthIntervalType => Some("YearMonthIntervalType")
+                case _ => None
+              }
+          }
+      }
       if (unsupportedTypes.nonEmpty) {
         Some(unsupportedTypes.mkString("Found unsupported type:", ",", ""))
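
A standalone sketch (plain Spark types API, not Gluten code) of the format-aware behaviour introduced above: for Parquet, only StructType and YearMonthIntervalType fields are still reported as unsupported, so ARRAY and MAP columns pass validation, while other formats keep the original, stricter list.

import org.apache.spark.sql.types._

// Mirrors the Parquet branch of validateDataTypes above.
def unsupportedForParquet(fields: Seq[StructField]): Seq[String] =
  fields.flatMap {
    case StructField(_, _: YearMonthIntervalType, _, _) => Some("YearMonthIntervalType")
    case StructField(_, _: StructType, _, _) => Some("StructType")
    case _ => None
  }

val schema = Seq(
  StructField("a", IntegerType),
  StructField("b", ArrayType(StringType)),
  StructField("c", MapType(StringType, StringType)))

assert(unsupportedForParquet(schema).isEmpty) // ARRAY and MAP no longer block the native write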
@@ -21,6 +21,7 @@ import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.expression.ConverterUtils
 import org.apache.gluten.extension.ValidationResult
 import org.apache.gluten.metrics.MetricsUpdater
+import org.apache.gluten.planner.plan.GlutenPlanModel.GroupLeafExec
 import org.apache.gluten.substrait.`type`.ColumnTypeNode
 import org.apache.gluten.substrait.SubstraitContext
 import org.apache.gluten.substrait.extensions.ExtensionBuilder
@@ -29,12 +30,15 @@ import org.apache.gluten.utils.SubstraitUtil

 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Literal}
+import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{ArrayType, MapType}
 import org.apache.spark.sql.types.MetadataBuilder

 import io.substrait.proto.NamedStruct
@@ -127,6 +131,37 @@ case class WriteFilesExecTransformer(

   override protected def doValidateInternal(): ValidationResult = {
     val finalChildOutput = getFinalChildOutput
+
+    def isConstantComplexType(e: Expression): Boolean = {
+      e match {
+        case Literal(_, _: ArrayType | _: MapType) => true
+        case _ => e.children.exists(isConstantComplexType)
+      }
+    }
+
+    lazy val hasConstantComplexType = child match {
+      case t: ProjectExecTransformer =>
+        t.projectList.exists(isConstantComplexType)
+      case p: ProjectExec =>
+        p.projectList.exists(isConstantComplexType)
+      case g: GroupLeafExec => // support the RAS planner
+        g.metadata
+          .logicalLink()
+          .plan
+          .collectFirst {
+            case p: Project if p.projectList.exists(isConstantComplexType) => true
+          }
+          .isDefined
+      case _ => false
+    }
+    // TODO: Velox does not yet support native parquet writes of complex data types
+    // produced from constants.
+    if (fileFormat.isInstanceOf[ParquetFileFormat] && hasConstantComplexType) {
+      return ValidationResult.failed(
+        "Unsupported native parquet write: " +
+          "complex data type with constant")
+    }
+
     val validationResult =
       BackendsApiManager.getSettings.supportWriteFilesExec(
         fileFormat,
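
A hypothetical repro (table and column names made up) of the case the new doValidateInternal check still guards against: constant ARRAY/MAP literals in the write's child projection are rejected for native Parquet writes, so a query like the one below is expected to fall back to vanilla Spark's writer.

spark.sql("CREATE TABLE t_const (a INT, b ARRAY<INT>, c MAP<STRING, STRING>) USING PARQUET")
spark.sql("INSERT INTO t_const SELECT 1, ARRAY(1, 2, 3), MAP('k', 'v')")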
@@ -16,6 +16,27 @@
  */
 package org.apache.spark.sql.execution.datasources.parquet

-import org.apache.spark.sql.GlutenSQLTestsBaseTrait
+import org.apache.spark.sql.{GlutenSQLTestsBaseTrait, Row}

-class GlutenParquetFieldIdIOSuite extends ParquetFieldIdIOSuite with GlutenSQLTestsBaseTrait {}
+class GlutenParquetFieldIdIOSuite extends ParquetFieldIdIOSuite with GlutenSQLTestsBaseTrait {
+  testGluten("Parquet writer with ARRAY and MAP") {
+    spark.sql("""
+                |CREATE TABLE T1 (
+                | a INT,
+                | b ARRAY<STRING>,
+                | c MAP<STRING,STRING>
+                |)
+                |USING PARQUET
+                |""".stripMargin)
+
+    spark.sql("""
+                | INSERT OVERWRITE T1 VALUES
+                | (1, ARRAY(1, 2, 3), MAP("key1","value1"))
+                |""".stripMargin)
+
+    checkAnswer(
+      spark.sql("SELECT * FROM T1"),
+      Row(1, Array("1", "2", "3"), Map("key1" -> "value1")) :: Nil
+    )
+  }
+}
@@ -963,6 +963,13 @@ class VeloxTestSettings extends BackendTestSettings {
     // Extra ColumnarToRow is needed to transform vanilla columnar data to gluten columnar data.
     .exclude("SPARK-37369: Avoid redundant ColumnarToRow transition on InMemoryTableScan")
   enableSuite[GlutenFileSourceCharVarcharTestSuite]
+    .exclude("length check for input string values: nested in array")
+    .exclude("length check for input string values: nested in array")
+    .exclude("length check for input string values: nested in map key")
+    .exclude("length check for input string values: nested in map value")
+    .exclude("length check for input string values: nested in both map key and value")
+    .exclude("length check for input string values: nested in array of struct")
+    .exclude("length check for input string values: nested in array of array")
   enableSuite[GlutenDSV2CharVarcharTestSuite]
   enableSuite[GlutenColumnExpressionSuite]
   // Velox raise_error('errMsg') throws a velox_user_error exception with the message 'errMsg'.
@@ -16,8 +16,107 @@
  */
 package org.apache.spark.sql

+import org.apache.spark.SparkException
 class GlutenFileSourceCharVarcharTestSuite
   extends FileSourceCharVarcharTestSuite
-  with GlutenSQLTestsTrait {}
+  with GlutenSQLTestsTrait {
+  private def testTableWrite(f: String => Unit): Unit = {
+    withTable("t")(f("char"))
+    withTable("t")(f("varchar"))
+  }
+
+  private val ERROR_MESSAGE =
+    "Exceeds char/varchar type length limitation: 5"
+
+  testGluten("length check for input string values: nested in struct") {
+    testTableWrite {
+      typeName =>
+        sql(s"CREATE TABLE t(c STRUCT<c: $typeName(5)>) USING $format")
+        sql("INSERT INTO t SELECT struct(null)")
+        checkAnswer(spark.table("t"), Row(Row(null)))
+        val e = intercept[RuntimeException] {
+          sql("INSERT INTO t SELECT struct('123456')")
+        }
+        assert(e.getMessage.contains(ERROR_MESSAGE))
+    }
+  }
+
+  testGluten("length check for input string values: nested in array") {
+    testTableWrite {
+      typeName =>
+        sql(s"CREATE TABLE t(c ARRAY<$typeName(5)>) USING $format")
+        sql("INSERT INTO t VALUES (array(null))")
+        checkAnswer(spark.table("t"), Row(Seq(null)))
+        val e = intercept[SparkException] {
+          sql("INSERT INTO t VALUES (array('a', '123456'))")
+        }
+        assert(e.getMessage.contains(ERROR_MESSAGE))
+    }
+  }
+
+  testGluten("length check for input string values: nested in map key") {
+    testTableWrite {
+      typeName =>
+        sql(s"CREATE TABLE t(c MAP<$typeName(5), STRING>) USING $format")
+        val e = intercept[SparkException](sql("INSERT INTO t VALUES (map('123456', 'a'))"))
+        assert(e.getMessage.contains(ERROR_MESSAGE))
+    }
+  }
+
+  testGluten("length check for input string values: nested in map value") {
+    testTableWrite {
+      typeName =>
+        sql(s"CREATE TABLE t(c MAP<STRING, $typeName(5)>) USING $format")
+        sql("INSERT INTO t VALUES (map('a', null))")
+        checkAnswer(spark.table("t"), Row(Map("a" -> null)))
+        val e = intercept[SparkException](sql("INSERT INTO t VALUES (map('a', '123456'))"))
+        assert(e.getMessage.contains(ERROR_MESSAGE))
+    }
+  }
+
+  testGluten("length check for input string values: nested in both map key and value") {
+    testTableWrite {
+      typeName =>
+        sql(s"CREATE TABLE t(c MAP<$typeName(5), $typeName(5)>) USING $format")
+        val e1 = intercept[SparkException](sql("INSERT INTO t VALUES (map('123456', 'a'))"))
+        assert(e1.getMessage.contains(ERROR_MESSAGE))
+        val e2 = intercept[SparkException](sql("INSERT INTO t VALUES (map('a', '123456'))"))
+        assert(e2.getMessage.contains(ERROR_MESSAGE))
+    }
+  }
+
+  testGluten("length check for input string values: nested in struct of array") {
+    testTableWrite {
+      typeName =>
+        sql(s"CREATE TABLE t(c STRUCT<c: ARRAY<$typeName(5)>>) USING $format")
+        sql("INSERT INTO t SELECT struct(array(null))")
+        checkAnswer(spark.table("t"), Row(Row(Seq(null))))
+        val e = intercept[SparkException](sql("INSERT INTO t SELECT struct(array('123456'))"))
+        assert(e.getMessage.contains(ERROR_MESSAGE))
+    }
+  }
+
+  testGluten("length check for input string values: nested in array of struct") {
+    testTableWrite {
+      typeName =>
+        sql(s"CREATE TABLE t(c ARRAY<STRUCT<c: $typeName(5)>>) USING $format")
+        sql("INSERT INTO t VALUES (array(struct(null)))")
+        checkAnswer(spark.table("t"), Row(Seq(Row(null))))
+        val e = intercept[SparkException](sql("INSERT INTO t VALUES (array(struct('123456')))"))
+        assert(e.getMessage.contains(ERROR_MESSAGE))
+    }
+  }
+
+  testGluten("length check for input string values: nested in array of array") {
+    testTableWrite {
+      typeName =>
+        sql(s"CREATE TABLE t(c ARRAY<ARRAY<$typeName(5)>>) USING $format")
+        sql("INSERT INTO t VALUES (array(array(null)))")
+        checkAnswer(spark.table("t"), Row(Seq(Seq(null))))
+        val e = intercept[SparkException](sql("INSERT INTO t VALUES (array(array('123456')))"))
+        assert(e.getMessage.contains(ERROR_MESSAGE))
+    }
+  }
+}

 class GlutenDSV2CharVarcharTestSuite extends DSV2CharVarcharTestSuite with GlutenSQLTestsTrait {}
@@ -16,6 +16,27 @@
  */
 package org.apache.spark.sql.execution.datasources.parquet

-import org.apache.spark.sql.GlutenSQLTestsBaseTrait
+import org.apache.spark.sql.{GlutenSQLTestsBaseTrait, Row}

-class GlutenParquetFieldIdIOSuite extends ParquetFieldIdIOSuite with GlutenSQLTestsBaseTrait {}
+class GlutenParquetFieldIdIOSuite extends ParquetFieldIdIOSuite with GlutenSQLTestsBaseTrait {
+  testGluten("Parquet writer with ARRAY and MAP") {
+    spark.sql("""
+                |CREATE TABLE T1 (
+                | a INT,
+                | b ARRAY<STRING>,
+                | c MAP<STRING,STRING>
+                |)
+                |USING PARQUET
+                |""".stripMargin)
+
+    spark.sql("""
+                | INSERT OVERWRITE T1 VALUES
+                | (1, ARRAY(1, 2, 3), MAP("key1","value1"))
+                |""".stripMargin)
+
+    checkAnswer(
+      spark.sql("SELECT * FROM T1"),
+      Row(1, Array("1", "2", "3"), Map("key1" -> "value1")) :: Nil
+    )
+  }
+}
@@ -983,6 +983,13 @@ class VeloxTestSettings extends BackendTestSettings {
     // Extra ColumnarToRow is needed to transform vanilla columnar data to gluten columnar data.
     .exclude("SPARK-37369: Avoid redundant ColumnarToRow transition on InMemoryTableScan")
   enableSuite[GlutenFileSourceCharVarcharTestSuite]
+    .exclude("length check for input string values: nested in array")
+    .exclude("length check for input string values: nested in array")
+    .exclude("length check for input string values: nested in map key")
+    .exclude("length check for input string values: nested in map value")
+    .exclude("length check for input string values: nested in both map key and value")
+    .exclude("length check for input string values: nested in array of struct")
+    .exclude("length check for input string values: nested in array of array")
   enableSuite[GlutenDSV2CharVarcharTestSuite]
   enableSuite[GlutenColumnExpressionSuite]
   // Velox raise_error('errMsg') throws a velox_user_error exception with the message 'errMsg'.