From 3b5d9ed4dd08fa4666b9b50b7c0feccd96160d65 Mon Sep 17 00:00:00 2001
From: SteNicholas
Date: Tue, 6 Aug 2024 16:59:10 +0800
Subject: [PATCH] [GLUTEN-6724][CH] Shuffle writer supports compression level
 configuration for CompressionCodecFactory

---
 .../gluten/vectorized/BlockOutputStream.java       |  9 ++++++++-
 .../vectorized/CHShuffleSplitterJniWrapper.java    |  6 ++++++
 .../vectorized/CHColumnarBatchSerializer.scala     | 13 ++++++++++---
 .../spark/shuffle/CHColumnarShuffleWriter.scala    | 10 ++++++++--
 .../spark/sql/execution/utils/CHExecUtil.scala     |  6 ++++--
 .../benchmarks/CHStorageJoinBenchmark.scala        |  3 ++-
 cpp-ch/local-engine/Shuffle/PartitionWriter.cpp    | 15 ++++++++++-----
 cpp-ch/local-engine/Shuffle/ShuffleWriter.cpp      |  5 +++--
 cpp-ch/local-engine/Shuffle/ShuffleWriter.h        |  2 +-
 cpp-ch/local-engine/local_engine_jni.cpp           |  8 ++++++--
 .../CHCelebornColumnarBatchSerializer.scala        | 11 ++++++++---
 .../shuffle/CHCelebornColumnarShuffleWriter.scala  |  1 +
 .../shuffle/CelebornColumnarShuffleWriter.scala    |  5 ++++-
 .../writer/VeloxUniffleColumnarShuffleWriter.java  |  8 ++++++--
 14 files changed, 77 insertions(+), 25 deletions(-)

diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BlockOutputStream.java b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BlockOutputStream.java
index 40e2c2c56b77..e209010b2f85 100644
--- a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BlockOutputStream.java
+++ b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BlockOutputStream.java
@@ -38,6 +38,7 @@ public BlockOutputStream(
       SQLMetric dataSize,
       boolean compressionEnable,
       String defaultCompressionCodec,
+      int defaultCompressionLevel,
       int bufferSize) {
     OutputStream unwrapOutputStream =
         CHShuffleWriteStreamFactory.unwrapSparkCompressionOutputStream(
@@ -50,7 +51,12 @@ public BlockOutputStream(
     }
     this.instance =
         nativeCreate(
-            this.outputStream, buffer, defaultCompressionCodec, compressionEnable, bufferSize);
+            this.outputStream,
+            buffer,
+            defaultCompressionCodec,
+            defaultCompressionLevel,
+            compressionEnable,
+            bufferSize);
     this.dataSize = dataSize;
   }
 
@@ -58,6 +64,7 @@ private native long nativeCreate(
       OutputStream outputStream,
       byte[] buffer,
       String defaultCompressionCodec,
+      int defaultCompressionLevel,
       boolean compressionEnable,
       int bufferSize);
 
diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHShuffleSplitterJniWrapper.java b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHShuffleSplitterJniWrapper.java
index 864cc4eb70ac..7bc4f5dac6b8 100644
--- a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHShuffleSplitterJniWrapper.java
+++ b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHShuffleSplitterJniWrapper.java
@@ -27,6 +27,7 @@ public long make(
       long mapId,
       int bufferSize,
       String codec,
+      int level,
       String dataFile,
       String localDirs,
       int subDirsPerLocalDir,
@@ -43,6 +44,7 @@ public long make(
         mapId,
         bufferSize,
         codec,
+        level,
         dataFile,
         localDirs,
         subDirsPerLocalDir,
@@ -58,6 +60,7 @@ public long makeForRSS(
       long mapId,
       int bufferSize,
       String codec,
+      int level,
       long spillThreshold,
       String hashAlgorithm,
       Object pusher,
@@ -71,6 +74,7 @@ public long makeForRSS(
         mapId,
         bufferSize,
         codec,
+        level,
         spillThreshold,
         hashAlgorithm,
         pusher,
@@ -86,6 +90,7 @@ public native long nativeMake(
       long mapId,
       int bufferSize,
       String codec,
+      int level,
       String dataFile,
       String localDirs,
       int subDirsPerLocalDir,
@@ -103,6 +108,7 @@ public native long nativeMakeForRSS(
       long mapId,
       int bufferSize,
       String codec,
+      int level,
       long spillThreshold,
       String hashAlgorithm,
       Object pusher,
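Note on the new `level` parameter threaded through the wrappers above: wherever this patch has no explicit level to pass, it uses `Int.MinValue` as an "unset" sentinel (see the `CHExecUtil` and `CHStorageJoinBenchmark` changes below). A minimal Scala sketch of that convention — the object and helper names here are illustrative, not part of the patch:

    // Illustrative sketch only: models the "unset compression level" convention
    // used by this patch. Int.MinValue means "no explicit level was configured",
    // letting the native side fall back to the codec's own default.
    object CompressionLevelSentinel {
      val Unset: Int = Int.MinValue

      def toNativeLevel(level: Option[Int]): Int = level.getOrElse(Unset)
    }

    // toNativeLevel(None) == Int.MinValue; toNativeLevel(Some(3)) == 3
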
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/CHColumnarBatchSerializer.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/CHColumnarBatchSerializer.scala
index f640bfd2d7f1..66f8cc3f56f0 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/CHColumnarBatchSerializer.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/CHColumnarBatchSerializer.scala
@@ -54,8 +54,14 @@ private class CHColumnarBatchSerializerInstance(
   extends SerializerInstance
   with Logging {
 
-  private lazy val compressionCodec =
-    GlutenShuffleUtils.getCompressionCodec(SparkEnv.get.conf).toUpperCase(Locale.ROOT)
+  private lazy val conf = SparkEnv.get.conf
+  private lazy val compressionCodec = GlutenShuffleUtils.getCompressionCodec(conf)
+  private lazy val customizedCompressCodec = compressionCodec.toUpperCase(Locale.ROOT)
+  private lazy val compressionLevel =
+    GlutenShuffleUtils.getCompressionLevel(
+      conf,
+      compressionCodec,
+      GlutenConfig.getConf.columnarShuffleCodecBackend.orNull)
 
   override def deserializeStream(in: InputStream): DeserializationStream = {
     new DeserializationStream {
@@ -136,7 +142,8 @@ private class CHColumnarBatchSerializerInstance(
           writeBuffer,
           dataSize,
           CHBackendSettings.useCustomizedShuffleCodec,
-          compressionCodec,
+          customizedCompressCodec,
+          compressionLevel,
           CHBackendSettings.customizeBufferSize
         )
 
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
index db9bba5f170a..1b9bb61b3b09 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
@@ -51,8 +51,13 @@ class CHColumnarShuffleWriter[K, V](
     .mkString(",")
   private val subDirsPerLocalDir = blockManager.diskBlockManager.subDirsPerLocalDir
   private val splitSize = GlutenConfig.getConf.maxBatchSize
-  private val customizedCompressCodec =
-    GlutenShuffleUtils.getCompressionCodec(conf).toUpperCase(Locale.ROOT)
+  private val compressionCodec = GlutenShuffleUtils.getCompressionCodec(conf)
+  private val customizedCompressCodec = compressionCodec.toUpperCase(Locale.ROOT)
+  private val compressionLevel =
+    GlutenShuffleUtils.getCompressionLevel(
+      conf,
+      compressionCodec,
+      GlutenConfig.getConf.columnarShuffleCodecBackend.orNull)
   private val maxSortBufferSize = GlutenConfig.getConf.chColumnarMaxSortBufferSize
   private val forceMemorySortShuffle = GlutenConfig.getConf.chColumnarForceMemorySortShuffle
   private val spillThreshold = GlutenConfig.getConf.chColumnarShuffleSpillThreshold
@@ -99,6 +104,7 @@ class CHColumnarShuffleWriter[K, V](
         mapId,
         splitSize,
         customizedCompressCodec,
+        compressionLevel,
         dataTmp.getAbsolutePath,
         localDirs,
         subDirsPerLocalDir,
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala
index 17eb0ed0b037..a3b141640f54 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala
@@ -59,14 +59,16 @@ object CHExecUtil extends Logging {
       dataSize: SQLMetric,
       iter: Iterator[ColumnarBatch],
       compressionCodec: Option[String] = Some("lz4"),
+      compressionLevel: Option[Int] = Some(Int.MinValue),
       bufferSize: Int = 4 << 10): Iterator[(Int, Array[Byte])] = {
     var count = 0
     val bos = new ByteArrayOutputStream()
     val buffer = new Array[Byte](bufferSize) // 4K
+    val level = compressionLevel.getOrElse(Int.MinValue)
     val blockOutputStream =
       compressionCodec
-        .map(new BlockOutputStream(bos, buffer, dataSize, true, _, bufferSize))
-        .getOrElse(new BlockOutputStream(bos, buffer, dataSize, false, "", bufferSize))
+        .map(new BlockOutputStream(bos, buffer, dataSize, true, _, level, bufferSize))
+        .getOrElse(new BlockOutputStream(bos, buffer, dataSize, false, "", level, bufferSize))
     while (iter.hasNext) {
       val batch = iter.next()
       count += batch.numRows
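Both the serializer and the writer above now resolve the level once, from the same Spark conf, and pass the optional codec backend (`GlutenConfig.getConf.columnarShuffleCodecBackend.orNull`) instead of `null`. A rough Scala sketch of the resolution this code relies on — the real logic lives in `GlutenShuffleUtils.getCompressionLevel`; the zstd key is the standard Spark one, and the fallback behavior shown here is an assumption:

    import java.util.Locale
    import org.apache.spark.SparkConf

    // Sketch, not the real implementation: resolve a compression level for the
    // chosen codec. "codecBackend" is the optional accelerator backend (for
    // example "qat" or "iaa"); null means the plain software codec.
    object CompressionLevelResolution {
      def resolveCompressionLevel(conf: SparkConf, codec: String, codecBackend: String): Int =
        codec.toLowerCase(Locale.ROOT) match {
          // spark.io.compression.zstd.level is the standard Spark knob (default 1).
          case "zstd" if codecBackend == null =>
            conf.getInt("spark.io.compression.zstd.level", 1)
          case _ =>
            Int.MinValue // no explicit level; the codec keeps its built-in default
        }
    }
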
diff --git a/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHStorageJoinBenchmark.scala b/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHStorageJoinBenchmark.scala
index 194eccc50878..910a3a4f871a 100644
--- a/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHStorageJoinBenchmark.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHStorageJoinBenchmark.scala
@@ -191,7 +191,8 @@ object CHStorageJoinBenchmark extends SqlBasedBenchmark with CHSqlBasedBenchmark
       batch =>
         val bos = new ByteArrayOutputStream()
         val buffer = new Array[Byte](4 << 10) // 4K
-        val dout = new BlockOutputStream(bos, buffer, dataSize, true, "lz4", buffer.length)
+        val dout =
+          new BlockOutputStream(bos, buffer, dataSize, true, "lz4", Int.MinValue, buffer.length)
         dout.write(batch)
         dout.flush()
         dout.close()
diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
index a2ef0888aeff..19aee8a5eec9 100644
--- a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
+++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
@@ -139,7 +139,8 @@ size_t LocalPartitionWriter::evictPartitions()
 {
     auto file = getNextSpillFile();
     WriteBufferFromFile output(file, shuffle_writer->options.io_buffer_size);
-    auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), {});
+    auto family = boost::to_upper_copy(shuffle_writer->options.compress_method);
+    auto codec = DB::CompressionCodecFactory::instance().get(family, family == "LZ4" ? std::nullopt : std::optional(shuffle_writer->options.compress_level));
     CompressedWriteBuffer compressed_output(output, codec, shuffle_writer->options.io_buffer_size);
     NativeWriter writer(compressed_output, shuffle_writer->output_header);
 
@@ -200,7 +201,8 @@ String Spillable::getNextSpillFile()
 
 std::vector<UInt64> Spillable::mergeSpills(CachedShuffleWriter * shuffle_writer, WriteBuffer & data_file, ExtraData extra_data)
 {
-    auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), {});
+    auto family = boost::to_upper_copy(shuffle_writer->options.compress_method);
+    auto codec = DB::CompressionCodecFactory::instance().get(family, family == "LZ4" ? std::nullopt : std::optional(shuffle_writer->options.compress_level));
     CompressedWriteBuffer compressed_output(data_file, codec, shuffle_writer->options.io_buffer_size);
     NativeWriter writer(compressed_output, shuffle_writer->output_header);
 
@@ -352,7 +354,8 @@ size_t MemorySortLocalPartitionWriter::evictPartitions()
         return;
     auto file = getNextSpillFile();
     WriteBufferFromFile output(file, shuffle_writer->options.io_buffer_size);
-    auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), {});
+    auto family = boost::to_upper_copy(shuffle_writer->options.compress_method);
+    auto codec = DB::CompressionCodecFactory::instance().get(family, family == "LZ4" ? std::nullopt : std::optional(shuffle_writer->options.compress_level));
     CompressedWriteBuffer compressed_output(output, codec, shuffle_writer->options.io_buffer_size);
     NativeWriter writer(compressed_output, output_header);
 
@@ -453,7 +456,8 @@ size_t MemorySortCelebornPartitionWriter::evictPartitions()
         return;
 
     WriteBufferFromOwnString output;
-    auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), {});
+    auto family = boost::to_upper_copy(shuffle_writer->options.compress_method);
+    auto codec = DB::CompressionCodecFactory::instance().get(family, family == "LZ4" ? std::nullopt : std::optional(shuffle_writer->options.compress_level));
     CompressedWriteBuffer compressed_output(output, codec, shuffle_writer->options.io_buffer_size);
     NativeWriter writer(compressed_output, shuffle_writer->output_header);
 
@@ -564,7 +568,8 @@ size_t CelebornPartitionWriter::evictSinglePartition(size_t partition_id)
         return;
 
     WriteBufferFromOwnString output;
-    auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), {});
+    auto family = boost::to_upper_copy(shuffle_writer->options.compress_method);
+    auto codec = DB::CompressionCodecFactory::instance().get(family, family == "LZ4" ? std::nullopt : std::optional(shuffle_writer->options.compress_level));
     CompressedWriteBuffer compressed_output(output, codec, shuffle_writer->options.io_buffer_size);
     NativeWriter writer(compressed_output, shuffle_writer->output_header);
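Each evict/merge path above now builds its codec with `get(family, family == "LZ4" ? std::nullopt : std::optional(...))`: the level is withheld for plain `LZ4` — presumably because that ClickHouse codec accepts no level argument — and forwarded for level-aware codecs such as `ZSTD`. The same decision, modeled in Scala for readability (names are illustrative):

    // Illustrative Scala model of the C++ guard above: return None (no level)
    // for plain LZ4, and the configured level for level-aware codec families.
    object CodecLevelGuard {
      def levelFor(family: String, level: Int): Option[Int] =
        if (family == "LZ4") None else Some(level)
    }

    // levelFor("LZ4", 3) == None; levelFor("ZSTD", 3) == Some(3)
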
diff --git a/cpp-ch/local-engine/Shuffle/ShuffleWriter.cpp b/cpp-ch/local-engine/Shuffle/ShuffleWriter.cpp
index dddf0b895fdf..c272c76865ad 100644
--- a/cpp-ch/local-engine/Shuffle/ShuffleWriter.cpp
+++ b/cpp-ch/local-engine/Shuffle/ShuffleWriter.cpp
@@ -25,13 +25,14 @@ using namespace DB;
 namespace local_engine
 {
 ShuffleWriter::ShuffleWriter(
-    jobject output_stream, jbyteArray buffer, const std::string & codecStr, bool enable_compression, size_t customize_buffer_size)
+    jobject output_stream, jbyteArray buffer, const std::string & codecStr, jint level, bool enable_compression, size_t customize_buffer_size)
 {
     compression_enable = enable_compression;
     write_buffer = std::make_unique<WriteBufferFromJavaOutputStream>(output_stream, buffer, customize_buffer_size);
     if (compression_enable)
     {
-        auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(codecStr), {});
+        auto family = boost::to_upper_copy(codecStr);
+        auto codec = DB::CompressionCodecFactory::instance().get(family, family == "LZ4" ? std::nullopt : std::optional(level));
         compressed_out = std::make_unique<CompressedWriteBuffer>(*write_buffer, codec);
     }
 }
diff --git a/cpp-ch/local-engine/Shuffle/ShuffleWriter.h b/cpp-ch/local-engine/Shuffle/ShuffleWriter.h
index 98f67d1ccadb..541e93e0347c 100644
--- a/cpp-ch/local-engine/Shuffle/ShuffleWriter.h
+++ b/cpp-ch/local-engine/Shuffle/ShuffleWriter.h
@@ -24,7 +24,7 @@ class ShuffleWriter
 {
 public:
     ShuffleWriter(
-        jobject output_stream, jbyteArray buffer, const std::string & codecStr, bool enable_compression, size_t customize_buffer_size);
+        jobject output_stream, jbyteArray buffer, const std::string & codecStr, jint level, bool enable_compression, size_t customize_buffer_size);
     virtual ~ShuffleWriter();
     void write(const DB::Block & block);
     void flush();
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp
index c4e8ec67b106..6283881afae5 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -544,6 +544,7 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_na
     jlong map_id,
     jint split_size,
     jstring codec,
+    jint compress_level,
     jstring data_file,
     jstring local_dirs,
     jint num_sub_dirs,
@@ -585,6 +586,7 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_na
         .hash_exprs = hash_exprs,
         .out_exprs = out_exprs,
         .compress_method = jstring2string(env, codec),
+        .compress_level = compress_level,
         .spill_threshold = static_cast<size_t>(spill_threshold),
         .hash_algorithm = jstring2string(env, hash_algorithm),
         .max_sort_buffer_size = static_cast<size_t>(max_sort_buffer_size),
@@ -606,6 +608,7 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_na
     jlong map_id,
     jint split_size,
     jstring codec,
+    jint compress_level,
    jlong spill_threshold,
     jstring hash_algorithm,
     jobject pusher,
@@ -637,6 +640,7 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_na
         .hash_exprs = hash_exprs,
         .out_exprs = out_exprs,
         .compress_method = jstring2string(env, codec),
+        .compress_level = compress_level,
         .spill_threshold = static_cast<size_t>(spill_threshold),
         .hash_algorithm = jstring2string(env, hash_algorithm),
         .force_memory_sort = static_cast<bool>(force_memory_sort)};
@@ -1158,11 +1162,11 @@ JNIEXPORT jint Java_org_apache_gluten_vectorized_BlockSplitIterator_nativeNextPa
 }
 
 JNIEXPORT jlong Java_org_apache_gluten_vectorized_BlockOutputStream_nativeCreate(
-    JNIEnv * env, jobject, jobject output_stream, jbyteArray buffer, jstring codec, jboolean compressed, jint customize_buffer_size)
+    JNIEnv * env, jobject, jobject output_stream, jbyteArray buffer, jstring codec, jint level, jboolean compressed, jint customize_buffer_size)
 {
     LOCAL_ENGINE_JNI_METHOD_START
     local_engine::ShuffleWriter * writer
-        = new local_engine::ShuffleWriter(output_stream, buffer, jstring2string(env, codec), compressed, customize_buffer_size);
+        = new local_engine::ShuffleWriter(output_stream, buffer, jstring2string(env, codec), level, compressed, customize_buffer_size);
     return reinterpret_cast<jlong>(writer);
     LOCAL_ENGINE_JNI_METHOD_END(env, -1)
 }
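A note on the JNI changes above: each `JNIEXPORT` signature must agree position-for-position with its Java `native` declaration, so the new level argument is inserted in the same slot — immediately after the codec — on both sides. A trivial self-contained Scala illustration of that positional invariant (the parameter names are just labels, not part of the patch):

    // Illustrative check only: the shared parameter order for nativeCreate.
    val nativeCreateParams =
      Seq("outputStream", "buffer", "codec", "level", "compressed", "bufferSize")
    assert(nativeCreateParams.indexOf("level") == nativeCreateParams.indexOf("codec") + 1)
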
diff --git a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala
index 3619855f74ed..e5691900edc5 100644
--- a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala
+++ b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala
@@ -58,8 +58,12 @@ private class CHCelebornColumnarBatchSerializerInstance(
   extends SerializerInstance
   with Logging {
 
-  private lazy val compressionCodec =
-    GlutenShuffleUtils.getCompressionCodec(SparkEnv.get.conf).toUpperCase(Locale.ROOT)
+  private lazy val conf = SparkEnv.get.conf
+  private lazy val compressionCodec = GlutenShuffleUtils.getCompressionCodec(conf)
+  private lazy val customizedCompressCodec = compressionCodec.toUpperCase(Locale.ROOT)
+  private lazy val compressionLevel =
+    GlutenShuffleUtils.getCompressionLevel(conf, compressionCodec,
+      GlutenConfig.getConf.columnarShuffleCodecBackend.orNull)
 
   override def deserializeStream(in: InputStream): DeserializationStream = {
     new DeserializationStream {
@@ -199,7 +203,8 @@ private class CHCelebornColumnarBatchSerializerInstance(
           writeBuffer,
           dataSize,
           CHBackendSettings.useCustomizedShuffleCodec,
-          compressionCodec,
+          customizedCompressCodec,
+          compressionLevel,
           CHBackendSettings.customizeBufferSize
         )
 
diff --git a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
index c7d7957c15b6..d28da685e9d6 100644
--- a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
+++ b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
@@ -106,6 +106,7 @@ class CHCelebornColumnarShuffleWriter[K, V](
       mapId,
       nativeBufferSize,
      customizedCompressCodec,
+      compressionLevel,
       GlutenConfig.getConf.chColumnarShuffleSpillThreshold,
       CHBackendSettings.shuffleHashAlgorithm,
       celebornPartitionPusher,
diff --git a/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornColumnarShuffleWriter.scala b/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornColumnarShuffleWriter.scala
index f71fadd4cd64..3f7c3586ced2 100644
--- a/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornColumnarShuffleWriter.scala
+++ b/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornColumnarShuffleWriter.scala
@@ -94,7 +94,10 @@ abstract class CelebornColumnarShuffleWriter[K, V](
   }
 
   protected val compressionLevel: Int =
-    GlutenShuffleUtils.getCompressionLevel(conf, customizedCompressionCodec, null)
+    GlutenShuffleUtils.getCompressionLevel(
+      conf,
+      customizedCompressionCodec,
+      GlutenConfig.getConf.columnarShuffleCodecBackend.orNull)
 
   protected val bufferCompressThreshold: Int =
     GlutenConfig.getConf.columnarShuffleCompressionThreshold
diff --git a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
index d2032fa48564..37dd2903ba31 100644
--- a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
+++ b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
@@ -66,7 +66,7 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> extends RssShuffleWriter
               null));
     }
-    compressionLevel = GlutenShuffleUtils.getCompressionLevel(sparkConf, compressionCodec, null);
   }
 
   @Override
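The Celeborn change above replaces the hard-coded `null` backend argument with `GlutenConfig.getConf.columnarShuffleCodecBackend.orNull`, so a configured codec backend now influences the chosen level. `orNull` is plain Scala; a self-contained illustration (the "qat" value is just an example of a configured backend):

    // columnarShuffleCodecBackend is an Option[String]; orNull turns it into the
    // nullable String that the Java-friendly getCompressionLevel signature takes.
    val configured: Option[String] = Some("qat")
    val unset: Option[String] = None

    assert(configured.orNull == "qat")
    assert(unset.orNull == null)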