Skip to content

Commit

Permalink
fix some test
Browse files Browse the repository at this point in the history
  • Loading branch information
jinchengchenghh committed Apr 19, 2024
1 parent a5febdb commit 9912015
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.PermissiveMode
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.utils.SparkSchemaUtil

import org.apache.arrow.dataset.file.FileFormat

Expand All @@ -35,30 +37,41 @@ case class ArrowConvertorRule(session: SparkSession) extends Rule[LogicalPlan] {
plan.resolveOperators {
// Read path
case l @ LogicalRelation(
r @ HadoopFsRelation(_, _, _, _, _: CSVFileFormat, options),
r @ HadoopFsRelation(_, _, dataSchema, _, _: CSVFileFormat, options),
_,
_,
_) =>
val csvOptions = new CSVOptions(
options,
columnPruning = session.sessionState.conf.csvColumnPruning,
session.sessionState.conf.sessionLocalTimeZone)
if (
csvOptions.headerFlag && !csvOptions.multiLine
&& csvOptions.delimiter == ","
&& csvOptions.quote == '\"'
&& csvOptions.escape == '\\'
&& csvOptions.charset == StandardCharsets.UTF_8.name()
&& csvOptions.parseMode == PermissiveMode
&& !csvOptions.inferSchemaFlag
&& !csvOptions.timestampFormatInRead.isDefined
&& !csvOptions.timestampNTZFormatInRead.isDefined
&& csvOptions.nullValue == ""
) {
if (checkSchema(dataSchema) && checkCsvOptions(csvOptions)) {
l.copy(relation = r.copy(fileFormat = new ArrowFileFormat(FileFormat.CSV))(session))
} else l
case r =>
r
}
}

private def checkCsvOptions(csvOptions: CSVOptions): Boolean = {
csvOptions.headerFlag && !csvOptions.multiLine && csvOptions.delimiter == "," &&
csvOptions.quote == '\"' &&
csvOptions.escape == '\\' &&
csvOptions.charset == StandardCharsets.UTF_8.name() &&
csvOptions.parseMode == PermissiveMode && !csvOptions.inferSchemaFlag &&
csvOptions.timestampFormatInRead.isEmpty && csvOptions.timestampNTZFormatInRead.isEmpty &&
csvOptions.nullValue == "" &&
csvOptions.dateFormatInRead.isEmpty &&
csvOptions.emptyValueInRead == "" && csvOptions.comment == '\u0000' && !csvOptions.columnPruning
}

private def checkSchema(schema: StructType): Boolean = {
try {
SparkSchemaUtil.toArrowSchema(schema)
true
} catch {
case _: Exception =>
false
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,25 @@ class VeloxTestSettings extends BackendTestSettings {
// Exception.
.exclude("column pruning - non-readable file")
enableSuite[GlutenCSVv1Suite]
// file cars.csv include null string, Arrow not support to read
.exclude("DDL test with schema")
.exclude("save csv")
.exclude("save csv with compression codec option")
.exclude("save csv with empty fields with user defined empty values")
.exclude("save csv with quote")
.exclude("SPARK-13543 Write the output as uncompressed via option()")
// Arrow not support corrupt record
.exclude("SPARK-27873: disabling enforceSchema should not fail columnNameOfCorruptRecord")
enableSuite[GlutenCSVv2Suite]
// file cars.csv include null string, Arrow not support to read
.exclude("DDL test with schema")
.exclude("save csv")
.exclude("save csv with compression codec option")
.exclude("save csv with empty fields with user defined empty values")
.exclude("save csv with quote")
.exclude("SPARK-13543 Write the output as uncompressed via option()")
// Arrow not support corrupt record
.exclude("SPARK-27873: disabling enforceSchema should not fail columnNameOfCorruptRecord")
enableSuite[GlutenCSVLegacyTimeParserSuite]
enableSuite[GlutenJsonV1Suite]
// FIXME: Array direct selection fails
Expand Down

0 comments on commit 9912015

Please sign in to comment.