From 8f25b5a8441e2052016d5fc56545081209528bae Mon Sep 17 00:00:00 2001
From: Wechar Yu
Date: Thu, 31 Oct 2024 11:02:58 +0800
Subject: [PATCH] [VL] Enhance write parquet with compression codec test
 (#7737)

---
 .../execution/VeloxParquetWriteSuite.scala | 38 +++++++++++--------
 1 file changed, 23 insertions(+), 15 deletions(-)

diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
index ce3b0852f6f0..4c76c753b90e 100644
--- a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
@@ -20,7 +20,11 @@ import org.apache.gluten.execution.VeloxWholeStageTransformerSuite
 import org.apache.gluten.test.FallbackUtil
 
 import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
 
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.hadoop.ParquetFileReader
+import org.apache.parquet.hadoop.util.HadoopInputFile
 import org.junit.Assert
 
 class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite {
@@ -50,17 +54,11 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite {
     }
   }
 
-  testWithSpecifiedSparkVersion("test write parquet with compression codec", Some("3.2")) {
+  test("test write parquet with compression codec") {
     // compression codec details see `VeloxParquetDatasource.cc`
     Seq("snappy", "gzip", "zstd", "lz4", "none", "uncompressed")
       .foreach {
         codec =>
-          val extension = codec match {
-            case "none" | "uncompressed" => ""
-            case "gzip" => "gz"
-            case _ => codec
-          }
-
           TPCHTableDataFrames.foreach {
             case (_, df) =>
               withTempPath {
@@ -69,15 +67,25 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite {
                     .format("parquet")
                     .option("compression", codec)
                     .save(f.getCanonicalPath)
-                  val files = f.list()
-                  assert(files.nonEmpty, extension)
-
-                  if (!isSparkVersionGE("3.4")) {
-                    assert(
-                      files.exists(_.contains(extension)),
-                      extension
-                    ) // filename changed in spark 3.4.
+                  val expectedCodec = codec match {
+                    case "none" => "uncompressed"
+                    case _ => codec
                   }
+                  val parquetFiles = f.list((_, name) => name.contains("parquet"))
+                  assert(parquetFiles.nonEmpty, expectedCodec)
+                  assert(
+                    parquetFiles.forall {
+                      file =>
+                        val path = new Path(f.getCanonicalPath, file)
+                        val in = HadoopInputFile.fromPath(path, spark.sessionState.newHadoopConf())
+                        Utils.tryWithResource(ParquetFileReader.open(in)) {
+                          reader =>
+                            val column = reader.getFooter.getBlocks.get(0).getColumns.get(0)
+                            expectedCodec.equalsIgnoreCase(column.getCodec.toString)
+                        }
+                    },
+                    expectedCodec
+                  )
 
                   val parquetDf = spark.read
                     .format("parquet")
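
Note: the footer-based codec check this patch adds can also be exercised standalone when debugging a write path. Below is a minimal sketch of the same technique; the helper name firstColumnCodec and its parameters are illustrative (not part of the patch), and a plain try/finally stands in for Spark's internal Utils.tryWithResource:

import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.spark.sql.SparkSession

// Returns the compression codec recorded for the first column chunk of the
// first row group in a Parquet file's footer, e.g. "SNAPPY" or "UNCOMPRESSED".
// `spark` is any active SparkSession; `file` is a path to one Parquet file.
def firstColumnCodec(spark: SparkSession, file: String): String = {
  val in = HadoopInputFile.fromPath(new Path(file), spark.sessionState.newHadoopConf())
  val reader = ParquetFileReader.open(in)
  try {
    reader.getFooter.getBlocks.get(0).getColumns.get(0).getCodec.toString
  } finally {
    reader.close()
  }
}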