[VL] Add configuration for generating 4k window size gzip parquet file (
marin-ma authored Nov 22, 2023
1 parent b40a5f0 commit 5966579
Showing 4 changed files with 20 additions and 2 deletions.
@@ -39,6 +39,9 @@ class VeloxParquetWriterInjects extends VeloxFormatWriterInjects {
       GlutenConfig.PARQUET_BLOCK_ROWS,
       GlutenConfig.getConf.columnarParquetWriteBlockRows.toString)
     sparkOptions.put(GlutenConfig.PARQUET_BLOCK_ROWS, blockRows)
+    options
+      .get(GlutenConfig.PARQUET_GZIP_WINDOW_SIZE)
+      .foreach(sparkOptions.put(GlutenConfig.PARQUET_GZIP_WINDOW_SIZE, _))
     sparkOptions.asJava
   }
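The change above forwards an optional "parquet.gzip.windowSize" entry from the writer's options map into the Spark options handed to the native Parquet writer. A minimal usage sketch follows; it is an assumption, not part of the commit, that this map is fed from DataFrameWriter options and that a SparkSession (spark) and DataFrame (df) already exist in scope.

  // Hypothetical sketch: request a 4 KiB gzip window when writing Parquet through Gluten.
  // Assumes writer options reach VeloxParquetWriterInjects; "4096" is the only value
  // the native side acts on.
  spark.conf.set("spark.sql.parquet.compression.codec", "gzip")
  df.write
    .option("parquet.gzip.windowSize", "4096")
    .parquet("/tmp/gzip_4k_window")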

cpp/core/config/GlutenConfig.h — 3 additions & 0 deletions
@@ -46,6 +46,9 @@ const std::string kParquetBlockSize = "parquet.block.size";
 
 const std::string kParquetBlockRows = "parquet.block.rows";
 
+const std::string kParquetGzipWindowSize = "parquet.gzip.windowSize";
+const std::string kGzipWindowSize4k = "4096";
+
 const std::string kParquetCompressionCodec = "spark.sql.parquet.compression.codec";
 
 const std::string kUGIUserName = "spark.gluten.ugi.username";
cpp/velox/operators/writer/VeloxParquetDatasource.cc — 13 additions & 2 deletions
@@ -38,6 +38,10 @@ using namespace facebook::velox::filesystems;
 
 namespace gluten {
 
+namespace {
+const int32_t kGzipWindowBits4k = 12;
+}
+
 void VeloxParquetDatasource::init(const std::unordered_map<std::string, std::string>& sparkConfs) {
   if (strncmp(filePath_.c_str(), "file:", 5) == 0) {
     auto path = filePath_.substr(5);
@@ -73,6 +77,7 @@ void VeloxParquetDatasource::init(const std::unordered_map<std::string, std::string>& sparkConfs) {
   if (sparkConfs.find(kParquetBlockRows) != sparkConfs.end()) {
     maxRowGroupRows_ = static_cast<int64_t>(stoi(sparkConfs.find(kParquetBlockRows)->second));
   }
+  velox::parquet::WriterOptions writeOption;
   auto compressionCodec = CompressionKind::CompressionKind_SNAPPY;
   if (sparkConfs.find(kParquetCompressionCodec) != sparkConfs.end()) {
     auto compressionCodecStr = sparkConfs.find(kParquetCompressionCodec)->second;
@@ -81,6 +86,14 @@ void VeloxParquetDatasource::init(const std::unordered_map<std::string, std::string>& sparkConfs) {
       compressionCodec = CompressionKind::CompressionKind_SNAPPY;
     } else if (boost::iequals(compressionCodecStr, "gzip")) {
       compressionCodec = CompressionKind::CompressionKind_GZIP;
+      if (sparkConfs.find(kParquetGzipWindowSize) != sparkConfs.end()) {
+        auto parquetGzipWindowSizeStr = sparkConfs.find(kParquetGzipWindowSize)->second;
+        if (parquetGzipWindowSizeStr == kGzipWindowSize4k) {
+          auto codecOptions = std::make_shared<facebook::velox::parquet::arrow::util::GZipCodecOptions>();
+          codecOptions->window_bits = kGzipWindowBits4k;
+          writeOption.codecOptions = std::move(codecOptions);
+        }
+      }
     } else if (boost::iequals(compressionCodecStr, "lzo")) {
       compressionCodec = CompressionKind::CompressionKind_LZO;
     } else if (boost::iequals(compressionCodecStr, "brotli")) {
@@ -96,8 +109,6 @@ void VeloxParquetDatasource::init(const std::unordered_map<std::string, std::string>& sparkConfs) {
       compressionCodec = CompressionKind::CompressionKind_NONE;
     }
   }
-
-  velox::parquet::WriterOptions writeOption;
   writeOption.compression = compressionCodec;
   writeOption.flushPolicyFactory = [&]() {
     return std::make_unique<velox::parquet::LambdaFlushPolicy>(
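On the native side, kGzipWindowBits4k = 12 expresses the 4 KiB window in zlib's convention, where the window size is 2^windowBits (2^12 = 4096, matching kGzipWindowSize4k). The velox::parquet::WriterOptions declaration is also hoisted above the codec-selection branch so that codecOptions can be attached to it when the gzip path with the 4k window is taken; that move accounts for the two deleted lines near the end of the function.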
@@ -310,6 +310,7 @@ object GlutenConfig {
   val SPARK_SQL_PARQUET_COMPRESSION_CODEC: String = "spark.sql.parquet.compression.codec"
   val PARQUET_BLOCK_SIZE: String = "parquet.block.size"
   val PARQUET_BLOCK_ROWS: String = "parquet.block.rows"
+  val PARQUET_GZIP_WINDOW_SIZE: String = "parquet.gzip.windowSize"
   // Hadoop config
   val HADOOP_PREFIX = "spark.hadoop."

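The Scala constant added here mirrors kParquetGzipWindowSize in cpp/core/config/GlutenConfig.h, so the JVM and native sides agree on the key string "parquet.gzip.windowSize" used to carry the setting across the boundary.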
