[VL] Enhance write parquet with compression codec test (#7737)
wecharyu authored Oct 31, 2024
1 parent 40fc52a commit 8f25b5a
Showing 1 changed file with 23 additions and 15 deletions.
@@ -20,7 +20,11 @@ import org.apache.gluten.execution.VeloxWholeStageTransformerSuite
 import org.apache.gluten.test.FallbackUtil
 
 import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
 
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.hadoop.ParquetFileReader
+import org.apache.parquet.hadoop.util.HadoopInputFile
 import org.junit.Assert
 
 class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite {
@@ -50,17 +54,11 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite {
     }
   }
 
-  testWithSpecifiedSparkVersion("test write parquet with compression codec", Some("3.2")) {
+  test("test write parquet with compression codec") {
     // compression codec details see `VeloxParquetDatasource.cc`
     Seq("snappy", "gzip", "zstd", "lz4", "none", "uncompressed")
       .foreach {
         codec =>
-          val extension = codec match {
-            case "none" | "uncompressed" => ""
-            case "gzip" => "gz"
-            case _ => codec
-          }
-
           TPCHTableDataFrames.foreach {
             case (_, df) =>
               withTempPath {
@@ -69,15 +67,25 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite {
                     .format("parquet")
                     .option("compression", codec)
                     .save(f.getCanonicalPath)
-                  val files = f.list()
-                  assert(files.nonEmpty, extension)
-
-                  if (!isSparkVersionGE("3.4")) {
-                    assert(
-                      files.exists(_.contains(extension)),
-                      extension
-                    ) // filename changed in spark 3.4.
+                  val expectedCodec = codec match {
+                    case "none" => "uncompressed"
+                    case _ => codec
                   }
+                  val parquetFiles = f.list((_, name) => name.contains("parquet"))
+                  assert(parquetFiles.nonEmpty, expectedCodec)
+                  assert(
+                    parquetFiles.forall {
+                      file =>
+                        val path = new Path(f.getCanonicalPath, file)
+                        val in = HadoopInputFile.fromPath(path, spark.sessionState.newHadoopConf())
+                        Utils.tryWithResource(ParquetFileReader.open(in)) {
+                          reader =>
+                            val column = reader.getFooter.getBlocks.get(0).getColumns.get(0)
+                            expectedCodec.equalsIgnoreCase(column.getCodec.toString)
+                        }
+                    },
+                    expectedCodec
+                  )
 
                   val parquetDf = spark.read
                     .format("parquet")
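For reference, the footer check the updated test relies on can be reproduced outside the suite. The sketch below uses the same parquet-hadoop APIs (HadoopInputFile, ParquetFileReader) to read a file's footer and report the codec recorded for the first column chunk of the first row group. The CodecCheck object and its main entry point are hypothetical scaffolding, not part of this commit; it assumes hadoop-common and parquet-hadoop are on the classpath.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

// Hypothetical standalone helper; not part of the commit above.
object CodecCheck {
  // Read the parquet footer and return the codec recorded for the first
  // column chunk of the first row group, e.g. "SNAPPY" or "UNCOMPRESSED".
  def firstColumnCodec(file: String): String = {
    val in = HadoopInputFile.fromPath(new Path(file), new Configuration())
    val reader = ParquetFileReader.open(in)
    try reader.getFooter.getBlocks.get(0).getColumns.get(0).getCodec.toString
    finally reader.close()
  }

  def main(args: Array[String]): Unit =
    println(firstColumnCodec(args(0))) // pass the path to one .parquet file
}

This also explains the "none" -> "uncompressed" mapping in the test: parquet's CompressionCodecName enum has no NONE value, so a file written with compression "none" reports UNCOMPRESSED in its footer, and the test compares case-insensitively against the normalized name.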
