From 59665799d6775d0bba6616f79116609602fba527 Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Wed, 22 Nov 2023 10:54:03 +0800 Subject: [PATCH] [VL] Add configuration for generating 4k window size gzip parquet file (#3792) --- .../velox/VeloxParquetWriterInjects.scala | 3 +++ cpp/core/config/GlutenConfig.h | 3 +++ .../operators/writer/VeloxParquetDatasource.cc | 15 +++++++++++++-- .../scala/io/glutenproject/GlutenConfig.scala | 1 + 4 files changed, 20 insertions(+), 2 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxParquetWriterInjects.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxParquetWriterInjects.scala index 0fdc225008b4..c758b62d066f 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxParquetWriterInjects.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxParquetWriterInjects.scala @@ -39,6 +39,9 @@ class VeloxParquetWriterInjects extends VeloxFormatWriterInjects { GlutenConfig.PARQUET_BLOCK_ROWS, GlutenConfig.getConf.columnarParquetWriteBlockRows.toString) sparkOptions.put(GlutenConfig.PARQUET_BLOCK_ROWS, blockRows) + options + .get(GlutenConfig.PARQUET_GZIP_WINDOW_SIZE) + .foreach(sparkOptions.put(GlutenConfig.PARQUET_GZIP_WINDOW_SIZE, _)) sparkOptions.asJava } diff --git a/cpp/core/config/GlutenConfig.h b/cpp/core/config/GlutenConfig.h index 32fd958c0d7a..c96dc2844c70 100644 --- a/cpp/core/config/GlutenConfig.h +++ b/cpp/core/config/GlutenConfig.h @@ -46,6 +46,9 @@ const std::string kParquetBlockSize = "parquet.block.size"; const std::string kParquetBlockRows = "parquet.block.rows"; +const std::string kParquetGzipWindowSize = "parquet.gzip.windowSize"; +const std::string kGzipWindowSize4k = "4096"; + const std::string kParquetCompressionCodec = "spark.sql.parquet.compression.codec"; const std::string kUGIUserName = "spark.gluten.ugi.username"; diff --git a/cpp/velox/operators/writer/VeloxParquetDatasource.cc b/cpp/velox/operators/writer/VeloxParquetDatasource.cc index e5e3da871b3f..cd004f55065d 100644 --- a/cpp/velox/operators/writer/VeloxParquetDatasource.cc +++ b/cpp/velox/operators/writer/VeloxParquetDatasource.cc @@ -38,6 +38,10 @@ using namespace facebook::velox::filesystems; namespace gluten { +namespace { +const int32_t kGzipWindowBits4k = 12; +} + void VeloxParquetDatasource::init(const std::unordered_map& sparkConfs) { if (strncmp(filePath_.c_str(), "file:", 5) == 0) { auto path = filePath_.substr(5); @@ -73,6 +77,7 @@ void VeloxParquetDatasource::init(const std::unordered_map(stoi(sparkConfs.find(kParquetBlockRows)->second)); } + velox::parquet::WriterOptions writeOption; auto compressionCodec = CompressionKind::CompressionKind_SNAPPY; if (sparkConfs.find(kParquetCompressionCodec) != sparkConfs.end()) { auto compressionCodecStr = sparkConfs.find(kParquetCompressionCodec)->second; @@ -81,6 +86,14 @@ void VeloxParquetDatasource::init(const std::unordered_mapsecond; + if (parquetGzipWindowSizeStr == kGzipWindowSize4k) { + auto codecOptions = std::make_shared(); + codecOptions->window_bits = kGzipWindowBits4k; + writeOption.codecOptions = std::move(codecOptions); + } + } } else if (boost::iequals(compressionCodecStr, "lzo")) { compressionCodec = CompressionKind::CompressionKind_LZO; } else if (boost::iequals(compressionCodecStr, "brotli")) { @@ -96,8 +109,6 @@ void VeloxParquetDatasource::init(const std::unordered_map( diff --git a/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala b/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala index cf98f67b1ef1..0922ac4088e2 100644 --- a/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala +++ b/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala @@ -310,6 +310,7 @@ object GlutenConfig { val SPARK_SQL_PARQUET_COMPRESSION_CODEC: String = "spark.sql.parquet.compression.codec" val PARQUET_BLOCK_SIZE: String = "parquet.block.size" val PARQUET_BLOCK_ROWS: String = "parquet.block.rows" + val PARQUET_GZIP_WINDOW_SIZE: String = "parquet.gzip.windowSize" // Hadoop config val HADOOP_PREFIX = "spark.hadoop."