From 59519ff5c11e37941aa6c8a3ab080ca316885403 Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Sun, 12 Nov 2023 10:07:59 +0000 Subject: [PATCH] add compression v2 API and lz4_frame/lz4_raw/lz4_hadoop codec --- CMakeLists.txt | 27 +- velox/common/compression/CMakeLists.txt | 6 +- velox/common/compression/Compression.cpp | 121 ++++ velox/common/compression/Compression.h | 208 +++++++ .../compression/HadoopCompressionFormat.cpp | 81 +++ .../compression/HadoopCompressionFormat.h | 41 ++ velox/common/compression/Lz4Compression.cpp | 522 ++++++++++++++++++ velox/common/compression/Lz4Compression.h | 152 +++++ .../compression/tests/CompressionTest.cpp | 413 ++++++++++++++ velox/dwio/dwrf/test/TestDecompression.cpp | 12 +- 10 files changed, 1561 insertions(+), 22 deletions(-) create mode 100644 velox/common/compression/HadoopCompressionFormat.cpp create mode 100644 velox/common/compression/HadoopCompressionFormat.h create mode 100644 velox/common/compression/Lz4Compression.cpp create mode 100644 velox/common/compression/Lz4Compression.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 83feabee3ef7f..a1ce66e47a6bf 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -415,23 +415,18 @@ endif() set_source(fmt) resolve_dependency(fmt 9.0.0) -if(${VELOX_BUILD_MINIMAL_WITH_DWIO} OR ${VELOX_ENABLE_HIVE_CONNECTOR}) - # DWIO needs all sorts of stream compression libraries. - # - # TODO: make these optional and pluggable. - find_package(ZLIB REQUIRED) - find_package(lz4 REQUIRED) - find_package(lzo2 REQUIRED) - find_package(zstd REQUIRED) - find_package(Snappy REQUIRED) - if(NOT TARGET zstd::zstd) - if(TARGET zstd::libzstd_static) - set(ZSTD_TYPE static) - else() - set(ZSTD_TYPE shared) - endif() - add_library(zstd::zstd ALIAS zstd::libzstd_${ZSTD_TYPE}) +find_package(ZLIB REQUIRED) +find_package(lz4 REQUIRED) +find_package(lzo2 REQUIRED) +find_package(zstd REQUIRED) +find_package(Snappy REQUIRED) +if(NOT TARGET zstd::zstd) + if(TARGET zstd::libzstd_static) + set(ZSTD_TYPE static) + else() + set(ZSTD_TYPE shared) endif() + add_library(zstd::zstd ALIAS zstd::libzstd_${ZSTD_TYPE}) endif() set_source(re2) diff --git a/velox/common/compression/CMakeLists.txt b/velox/common/compression/CMakeLists.txt index e429485151e35..d352763e8161e 100644 --- a/velox/common/compression/CMakeLists.txt +++ b/velox/common/compression/CMakeLists.txt @@ -16,8 +16,10 @@ if(${VELOX_BUILD_TESTING}) add_subdirectory(tests) endif() -add_library(velox_common_compression Compression.cpp LzoDecompressor.cpp) +add_library( + velox_common_compression Compression.cpp LzoDecompressor.cpp + Lz4Compression.cpp HadoopCompressionFormat.cpp) target_link_libraries( velox_common_compression - PUBLIC Folly::folly + PUBLIC Folly::folly lz4::lz4 PRIVATE velox_exception) diff --git a/velox/common/compression/Compression.cpp b/velox/common/compression/Compression.cpp index e17ff941ba99f..2e75fdb425878 100644 --- a/velox/common/compression/Compression.cpp +++ b/velox/common/compression/Compression.cpp @@ -16,6 +16,7 @@ #include "velox/common/compression/Compression.h" #include "velox/common/base/Exceptions.h" +#include "velox/common/compression/Lz4Compression.h" #include @@ -98,4 +99,124 @@ CompressionKind stringToCompressionKind(const std::string& kind) { VELOX_UNSUPPORTED("Not support compression kind {}", kind); } } + +void Codec::init() {} + +bool Codec::supportsGetUncompressedLength(CompressionKind kind) { + switch (kind) { + default: + return false; + } +} + +bool Codec::supportsStreamingCompression(CompressionKind kind) { + switch (kind) { + case 
CompressionKind::CompressionKind_LZ4: + return true; + default: + return false; + } +} + +bool Codec::supportsCompressFixedLength(CompressionKind kind) { + switch (kind) { + default: + return false; + } +} + +int32_t Codec::maximumCompressionLevel(CompressionKind kind) { + auto codec = Codec::create(kind); + return codec->maximumCompressionLevel(); +} + +int32_t Codec::minimumCompressionLevel(CompressionKind kind) { + auto codec = Codec::create(kind); + return codec->minimumCompressionLevel(); +} + +int32_t Codec::defaultCompressionLevel(CompressionKind kind) { + auto codec = Codec::create(kind); + return codec->defaultCompressionLevel(); +} + +std::unique_ptr Codec::create( + CompressionKind kind, + const CodecOptions& codecOptions) { + if (!isAvailable(kind)) { + auto name = compressionKindToString(kind); + if (folly::StringPiece({name}).startsWith("unknown")) { + VELOX_UNSUPPORTED("Unrecognized codec '{}'", name); + } + VELOX_UNSUPPORTED("Support for codec '{}' not implemented.", name); + } + + auto compressionLevel = codecOptions.compressionLevel; + std::unique_ptr codec; + switch (kind) { + case CompressionKind::CompressionKind_LZ4: + if (auto options = dynamic_cast(&codecOptions)) { + switch (options->type) { + case Lz4CodecOptions::kLz4Frame: + codec = makeLz4FrameCodec(compressionLevel); + break; + case Lz4CodecOptions::kLz4Raw: + codec = makeLz4RawCodec(compressionLevel); + break; + case Lz4CodecOptions::kLz4Hadoop: + codec = makeLz4HadoopCodec(); + break; + } + } + // By default, create LZ4 Frame codec. + codec = makeLz4FrameCodec(compressionLevel); + break; + default: + break; + } + + if (codec == nullptr) { + VELOX_UNSUPPORTED( + "{} codec not implemented", compressionKindToString(kind)); + } + + codec->init(); + + return codec; +} + +std::unique_ptr Codec::create( + CompressionKind kind, + int32_t compressionLevel) { + return create(kind, CodecOptions{compressionLevel}); +} + +bool Codec::isAvailable(CompressionKind kind) { + switch (kind) { + case CompressionKind::CompressionKind_NONE: + case CompressionKind::CompressionKind_LZ4: + return true; + case CompressionKind::CompressionKind_SNAPPY: + case CompressionKind::CompressionKind_GZIP: + case CompressionKind::CompressionKind_ZLIB: + case CompressionKind::CompressionKind_ZSTD: + case CompressionKind::CompressionKind_LZO: + default: + return false; + } +} + +std::optional Codec::getUncompressedLength( + uint64_t inputLength, + const uint8_t* input) const { + return std::nullopt; +} + +uint64_t Codec::compressFixedLength( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) { + VELOX_UNSUPPORTED("'{}' doesn't support fixed-length compression", name()); +} } // namespace facebook::velox::common diff --git a/velox/common/compression/Compression.h b/velox/common/compression/Compression.h index 82658976571c9..8d4c3648612d1 100644 --- a/velox/common/compression/Compression.h +++ b/velox/common/compression/Compression.h @@ -46,6 +46,214 @@ CompressionKind stringToCompressionKind(const std::string& kind); constexpr uint64_t DEFAULT_COMPRESSION_BLOCK_SIZE = 256 * 1024; +static constexpr int32_t kUseDefaultCompressionLevel = + std::numeric_limits::min(); + +class StreamingCompressor { + public: + virtual ~StreamingCompressor() = default; + + struct CompressResult { + uint64_t bytesRead; + uint64_t bytesWritten; + bool outputTooSmall; + }; + + struct FlushResult { + uint64_t bytesWritten; + bool outputTooSmall; + }; + + struct EndResult { + uint64_t bytesWritten; + bool outputTooSmall; + }; + + /// 
Compress some input. + /// If CompressResult.outputTooSmall is true on return, then a larger output + /// buffer should be supplied. + virtual CompressResult compress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) = 0; + + /// Flush part of the compressed output. + /// If FlushResult.outputTooSmall is true on return, flush() should be called + /// again with a larger buffer. + virtual FlushResult flush(uint8_t* output, uint64_t outputLength) = 0; + + /// End compressing, doing whatever is necessary to end the stream. + /// If EndResult.outputTooSmall is true on return, end() should be called + /// again with a larger buffer. Otherwise, the StreamingCompressor should not + /// be used anymore. end() will flush the compressed output. + virtual EndResult end(uint8_t* output, uint64_t outputLength) = 0; +}; + +class StreamingDecompressor { + public: + virtual ~StreamingDecompressor() = default; + + struct DecompressResult { + uint64_t bytesRead; + uint64_t bytesWritten; + bool outputTooSmall; + }; + + /// Decompress some input. + /// If outputTooSmall is true on return, a larger output buffer needs + /// to be supplied. + virtual DecompressResult decompress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) = 0; + + /// Return whether the compressed stream is finished. + virtual bool isFinished() = 0; + + /// Reinitialize decompressor, making it ready for a new compressed stream. + virtual void reset() = 0; +}; + +struct CodecOptions { + int32_t compressionLevel; + + CodecOptions(int32_t compressionLevel = kUseDefaultCompressionLevel) + : compressionLevel(compressionLevel) {} + + virtual ~CodecOptions() = default; +}; + +class Codec { + public: + virtual ~Codec() = default; + + /// Create a kind for the given compression algorithm with CodecOptions. + static std::unique_ptr create( + CompressionKind kind, + const CodecOptions& codecOptions = CodecOptions{}); + + /// Create a kind for the given compression algorithm. + static std::unique_ptr create( + CompressionKind kind, + int32_t compressionLevel); + + /// Return true if support for indicated kind has been enabled. + static bool isAvailable(CompressionKind kind); + + /// Return true if indicated kind supports extracting uncompressed length + /// from compressed data. + static bool supportsGetUncompressedLength(CompressionKind kind); + + /// Return true if indicated kind supports one-shot compression with fixed + /// compressed length. + static bool supportsCompressFixedLength(CompressionKind kind); + + /// Return true if indicated kind supports creating streaming de/compressor. + static bool supportsStreamingCompression(CompressionKind kind); + + /// Return the smallest supported compression level for the kind. + /// Note: This function creates a temporary Codec instance. + static int32_t minimumCompressionLevel(CompressionKind kind); + + /// Return the largest supported compression level for the kind + /// Note: This function creates a temporary Codec instance. + static int32_t maximumCompressionLevel(CompressionKind kind); + + /// Return the default compression level. + /// Note: This function creates a temporary Codec instance. + static int32_t defaultCompressionLevel(CompressionKind kind); + + /// Return the smallest supported compression level. + /// If the codec doesn't support compression level, + /// `kUseDefaultCompressionLevel` will be returned. 
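+  ///
+  /// Illustrative usage (a sketch, not part of this patch; `kind` and
+  /// `requestedLevel` are assumed to be supplied by the caller): clamp a
+  /// requested level into the range reported by the static helpers above
+  /// before creating the codec:
+  ///
+  ///   auto codec = Codec::create(
+  ///       kind,
+  ///       std::clamp(
+  ///           requestedLevel,
+  ///           Codec::minimumCompressionLevel(kind),
+  ///           Codec::maximumCompressionLevel(kind)));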
+ virtual int32_t minimumCompressionLevel() const = 0; + + /// Return the largest supported compression level. + /// If the codec doesn't support compression level, + /// `kUseDefaultCompressionLevel` will be returned. + virtual int32_t maximumCompressionLevel() const = 0; + + /// Return the default compression level. + /// If the codec doesn't support compression level, + /// `kUseDefaultCompressionLevel` will be returned. + virtual int32_t defaultCompressionLevel() const = 0; + + /// One-shot decompression function. + /// `outputLength` must be correct and therefore be obtained in advance. + /// The actual decompressed length is returned. + /// Note: One-shot decompression is not always compatible with streaming + /// compression. Depending on the codec (e.g. LZ4), different formats may + /// be used. + virtual uint64_t decompress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) = 0; + + /// Performs one-shot compression. + /// `outputLength` must first have been computed using maxCompressedLength(). + /// The actual compressed length is returned. + /// Note: One-shot compression is not always compatible with streaming + /// decompression. Depending on the codec (e.g. LZ4), different formats may + /// be used. + virtual uint64_t compress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) = 0; + + /// Performs one-shot compression. + /// This function compresses data and writes the output up to the specified + /// outputLength. If outputLength is too small to hold all the compressed + /// data, the function doesn't fail. Instead, it returns the number of bytes + /// actually written to the output buffer. Any remaining data that couldn't + /// be written in this call will be written in subsequent calls to this + /// function. This is useful when fixed-length compression blocks are required + /// by the caller. + /// Note: Only Gzip and Zstd codec supports this function. + virtual uint64_t compressFixedLength( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength); + + /// Maximum compressed length of given input length. + virtual uint64_t maxCompressedLength(uint64_t inputLength) = 0; + + /// Retrieves the actual uncompressed length of data using the specified + /// compression library. + /// Note: This functionality is not universally supported by all compression + /// libraries. If not supported, `std::nullopt` will be returned. + std::optional getUncompressedLength( + uint64_t inputLength, + const uint8_t* input) const; + + /// Create a streaming compressor instance. + virtual std::shared_ptr makeStreamingCompressor() = 0; + + /// Create a streaming compressor instance. + virtual std::shared_ptr + makeStreamingDecompressor() = 0; + + /// This Codec's compression type. + virtual CompressionKind compressionKind() const = 0; + + /// The name of this Codec's compression type. + std::string name() const { + return compressionKindToString(compressionKind()); + } + + /// This Codec's compression level, if applicable. + virtual int32_t compressionLevel() const { + return kUseDefaultCompressionLevel; + } + + private: + /// Initializes the codec's resources. 
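+  /// Called by Codec::create() after the codec is constructed and before it
+  /// is returned to the caller.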
+ virtual void init(); +}; } // namespace facebook::velox::common template <> diff --git a/velox/common/compression/HadoopCompressionFormat.cpp b/velox/common/compression/HadoopCompressionFormat.cpp new file mode 100644 index 0000000000000..bda5a756c7c07 --- /dev/null +++ b/velox/common/compression/HadoopCompressionFormat.cpp @@ -0,0 +1,81 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/common/compression/HadoopCompressionFormat.h" +#include "velox/common/base/Exceptions.h" + +#include + +namespace facebook::velox::common { + +bool HadoopCompressionFormat::tryDecompressHadoop( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength, + uint64_t& actualDecompressedSize) { + // Parquet files written with the Hadoop Lz4RawCodec use their own framing. + // The input buffer can contain an arbitrary number of "frames", each + // with the following structure: + // - bytes 0..3: big-endian uint32_t representing the frame decompressed + // size + // - bytes 4..7: big-endian uint32_t representing the frame compressed size + // - bytes 8...: frame compressed data + // + // The Hadoop Lz4Codec source code can be found here: + // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc + uint64_t totalDecompressedSize = 0; + + while (inputLength >= kPrefixLength) { + const uint32_t expectedDecompressedSize = + folly::Endian::big(folly::loadUnaligned(input)); + const uint32_t expectedCompressedSize = folly::Endian::big( + folly::loadUnaligned(input + sizeof(uint32_t))); + input += kPrefixLength; + inputLength -= kPrefixLength; + + if (inputLength < expectedCompressedSize) { + // Not enough bytes for Hadoop "frame". + return false; + } + if (outputLength < expectedDecompressedSize) { + // Not enough bytes to hold advertised output => probably not Hadoop. + return false; + } + // Try decompressing and compare with expected decompressed length. + try { + auto decompressedSize = decompressInternal( + input, expectedCompressedSize, output, outputLength); + if (decompressedSize != expectedDecompressedSize) { + return false; + } + } catch (const VeloxException& e) { + return false; + } + input += expectedCompressedSize; + inputLength -= expectedCompressedSize; + output += expectedDecompressedSize; + outputLength -= expectedDecompressedSize; + totalDecompressedSize += expectedDecompressedSize; + } + + if (inputLength == 0) { + actualDecompressedSize = totalDecompressedSize; + return true; + } + return false; +} +} // namespace facebook::velox::common diff --git a/velox/common/compression/HadoopCompressionFormat.h b/velox/common/compression/HadoopCompressionFormat.h new file mode 100644 index 0000000000000..a23df0622ef93 --- /dev/null +++ b/velox/common/compression/HadoopCompressionFormat.h @@ -0,0 +1,41 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +namespace facebook::velox::common { + +class HadoopCompressionFormat { + protected: + bool tryDecompressHadoop( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength, + uint64_t& actualDecompressedSize); + + virtual uint64_t decompressInternal( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) = 0; + + // Offset starting at which page data can be read/written. + static constexpr uint64_t kPrefixLength = sizeof(uint32_t) * 2; +}; +} // namespace facebook::velox::common diff --git a/velox/common/compression/Lz4Compression.cpp b/velox/common/compression/Lz4Compression.cpp new file mode 100644 index 0000000000000..1742c71b2a912 --- /dev/null +++ b/velox/common/compression/Lz4Compression.cpp @@ -0,0 +1,522 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "velox/common/compression/Lz4Compression.h" +#include "velox/common/base/Exceptions.h" + +namespace facebook::velox::common { +namespace { + +constexpr int32_t kLz4DefaultCompressionLevel = 1; +constexpr int32_t kLz4MinCompressionLevel = 1; + +void lz4Error(const char* prefixMessage, LZ4F_errorCode_t errorCode) { + VELOX_FAIL(prefixMessage, LZ4F_getErrorName(errorCode)); +} + +LZ4F_preferences_t defaultPreferences() { + LZ4F_preferences_t prefs; + memset(&prefs, 0, sizeof(prefs)); + return prefs; +} + +LZ4F_preferences_t defaultPreferences(int compressionLevel) { + LZ4F_preferences_t prefs = defaultPreferences(); + prefs.compressionLevel = compressionLevel; + return prefs; +} +} // namespace + +class LZ4Compressor : public StreamingCompressor { + public: + explicit LZ4Compressor(int32_t compressionLevel); + + ~LZ4Compressor() override; + + void init(); + + CompressResult compress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) override; + + FlushResult flush(uint8_t* output, uint64_t outputLength) override; + + EndResult end(uint8_t* output, uint64_t outputLength) override; + + protected: + void + compressBegin(uint8_t* output, size_t& outputLen, uint64_t& bytesWritten); + + int compressionLevel_; + LZ4F_compressionContext_t ctx_{nullptr}; + LZ4F_preferences_t prefs_; + bool firstTime_; +}; + +class LZ4Decompressor : public StreamingDecompressor { + public: + LZ4Decompressor() {} + + ~LZ4Decompressor() override { + if (ctx_ != nullptr) { + LZ4F_freeDecompressionContext(ctx_); + } + } + + void init(); + + void reset() override; + + DecompressResult decompress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) override; + + bool isFinished() override; + + protected: + LZ4F_decompressionContext_t ctx_{nullptr}; + bool finished_{false}; +}; + +LZ4Compressor::LZ4Compressor(int32_t compressionLevel) + : compressionLevel_(compressionLevel) {} + +LZ4Compressor::~LZ4Compressor() { + if (ctx_ != nullptr) { + LZ4F_freeCompressionContext(ctx_); + } +} + +void LZ4Compressor::init() { + LZ4F_errorCode_t ret; + prefs_ = defaultPreferences(compressionLevel_); + firstTime_ = true; + + ret = LZ4F_createCompressionContext(&ctx_, LZ4F_VERSION); + if (LZ4F_isError(ret)) { + lz4Error("LZ4 init failed: ", ret); + } +} + +StreamingCompressor::CompressResult LZ4Compressor::compress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) { + auto inputSize = static_cast(inputLength); + auto outputSize = static_cast(outputLength); + uint64_t bytesWritten = 0; + + if (firstTime_) { + // Output too small to write LZ4F header. + if (outputLength < LZ4F_HEADER_SIZE_MAX) { + return CompressResult{0, 0, true}; + } + compressBegin(output, outputSize, bytesWritten); + } + + if (outputSize < LZ4F_compressBound(inputSize, &prefs_)) { + // Output too small to compress into. 
+ return CompressResult{0, bytesWritten, true}; + } + auto numBytesOrError = LZ4F_compressUpdate( + ctx_, output, outputSize, input, inputSize, nullptr /* options */); + if (LZ4F_isError(numBytesOrError)) { + lz4Error("LZ4 compress update failed: ", numBytesOrError); + } + bytesWritten += static_cast(numBytesOrError); + VELOX_DCHECK_LE(bytesWritten, outputSize); + return CompressResult{inputLength, bytesWritten, false}; +} + +StreamingCompressor::FlushResult LZ4Compressor::flush( + uint8_t* output, + uint64_t outputLength) { + auto outputSize = static_cast(outputLength); + uint64_t bytesWritten = 0; + + if (firstTime_) { + // Output too small to write LZ4F header. + if (outputLength < LZ4F_HEADER_SIZE_MAX) { + return FlushResult{0, true}; + } + compressBegin(output, outputSize, bytesWritten); + } + + if (outputSize < LZ4F_compressBound(0, &prefs_)) { + // Output too small to flush into. + return FlushResult{bytesWritten, true}; + } + + auto numBytesOrError = + LZ4F_flush(ctx_, output, outputSize, nullptr /* options */); + if (LZ4F_isError(numBytesOrError)) { + lz4Error("LZ4 flush failed: ", numBytesOrError); + } + bytesWritten += static_cast(numBytesOrError); + VELOX_DCHECK_LE(bytesWritten, outputLength); + return FlushResult{bytesWritten, false}; +} + +StreamingCompressor::EndResult LZ4Compressor::end( + uint8_t* output, + uint64_t outputLength) { + auto outputSize = static_cast(outputLength); + uint64_t bytesWritten = 0; + + if (firstTime_) { + // Output too small to write LZ4F header. + if (outputLength < LZ4F_HEADER_SIZE_MAX) { + return EndResult{0, true}; + } + compressBegin(output, outputSize, bytesWritten); + } + + if (outputSize < LZ4F_compressBound(0, &prefs_)) { + // Output too small to end frame into. + return EndResult{bytesWritten, true}; + } + + auto numBytesOrError = + LZ4F_compressEnd(ctx_, output, outputSize, nullptr /* options */); + if (LZ4F_isError(numBytesOrError)) { + lz4Error("LZ4 end failed: ", numBytesOrError); + } + bytesWritten += static_cast(numBytesOrError); + VELOX_DCHECK_LE(bytesWritten, outputLength); + return EndResult{bytesWritten, false}; +} + +void LZ4Compressor::compressBegin( + uint8_t* output, + size_t& outputLen, + uint64_t& bytesWritten) { + auto numBytesOrError = LZ4F_compressBegin(ctx_, output, outputLen, &prefs_); + if (LZ4F_isError(numBytesOrError)) { + lz4Error("LZ4 compress begin failed: ", numBytesOrError); + } + firstTime_ = false; + output += numBytesOrError; + outputLen -= numBytesOrError; + bytesWritten += static_cast(numBytesOrError); +} + +void common::LZ4Decompressor::init() { + finished_ = false; + auto ret = LZ4F_createDecompressionContext(&ctx_, LZ4F_VERSION); + if (LZ4F_isError(ret)) { + lz4Error("LZ4 init failed: ", ret); + } +} + +void LZ4Decompressor::reset() { +#if defined(LZ4_VERSION_NUMBER) && LZ4_VERSION_NUMBER >= 10800 + // LZ4F_resetDecompressionContext appeared in 1.8.0 + VELOX_CHECK_NOT_NULL(ctx_); + LZ4F_resetDecompressionContext(ctx_); + finished_ = false; +#else + if (ctx_ != nullptr) { + LZ4F_freeDecompressionContext(ctx_); + } + init(); +#endif +} + +StreamingDecompressor::DecompressResult LZ4Decompressor::decompress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) { + auto inputSize = static_cast(inputLength); + auto outputSize = static_cast(outputLength); + + auto ret = LZ4F_decompress( + ctx_, output, &outputSize, input, &inputSize, nullptr /* options */); + if (LZ4F_isError(ret)) { + lz4Error("LZ4 decompress failed: ", ret); + } + finished_ = (ret == 0); + return 
DecompressResult{ + static_cast(inputSize), + static_cast(outputSize), + (inputSize == 0 && outputSize == 0)}; +} + +bool LZ4Decompressor::isFinished() { + return finished_; +} + +Lz4CodecBase::Lz4CodecBase(int32_t compressionLevel) + : compressionLevel_( + compressionLevel == kUseDefaultCompressionLevel + ? kLz4DefaultCompressionLevel + : compressionLevel) {} + +int32_t Lz4CodecBase::minimumCompressionLevel() const { + return kLz4MinCompressionLevel; +} + +int32_t Lz4CodecBase::maximumCompressionLevel() const { +#if (defined(LZ4_VERSION_NUMBER) && LZ4_VERSION_NUMBER < 10800) + return 12; +#else + return LZ4F_compressionLevel_max(); +#endif +} + +int32_t Lz4CodecBase::defaultCompressionLevel() const { + return kLz4DefaultCompressionLevel; +} + +int32_t Lz4CodecBase::compressionLevel() const { + return compressionLevel_; +} + +CompressionKind Lz4CodecBase::compressionKind() const { + return CompressionKind::CompressionKind_LZ4; +} + +Lz4FrameCodec::Lz4FrameCodec(int32_t compressionLevel) + : Lz4CodecBase(compressionLevel), + prefs_(defaultPreferences(compressionLevel_)) {} + +uint64_t Lz4FrameCodec::maxCompressedLength(uint64_t inputLen) { + return static_cast( + LZ4F_compressFrameBound(static_cast(inputLen), &prefs_)); +} + +uint64_t Lz4FrameCodec::compress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) { + auto ret = LZ4F_compressFrame( + output, + static_cast(outputLength), + input, + static_cast(inputLength), + &prefs_); + if (LZ4F_isError(ret)) { + lz4Error("Lz4 compression failure: ", ret); + } + return static_cast(ret); +} + +uint64_t Lz4FrameCodec::decompress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) { + auto decompressor = makeStreamingDecompressor(); + + uint64_t bytesWritten = 0; + while (!decompressor->isFinished() && inputLength != 0) { + auto result = + decompressor->decompress(input, inputLength, output, outputLength); + input += result.bytesRead; + inputLength -= result.bytesRead; + output += result.bytesWritten; + outputLength -= result.bytesWritten; + bytesWritten += result.bytesWritten; + if (result.outputTooSmall) { + VELOX_FAIL("Lz4 decompression buffer too small."); + } + } + if (!decompressor->isFinished()) { + VELOX_FAIL("Lz4 compressed input contains less than one frame."); + } + if (inputLength != 0) { + VELOX_FAIL("Lz4 compressed input contains more than one frame."); + } + return bytesWritten; +} + +std::shared_ptr Lz4FrameCodec::makeStreamingCompressor() { + auto ptr = std::make_shared(compressionLevel_); + ptr->init(); + return ptr; +} + +std::shared_ptr +Lz4FrameCodec::makeStreamingDecompressor() { + auto ptr = std::make_shared(); + ptr->init(); + return ptr; +} + +Lz4RawCodec::Lz4RawCodec(int32_t compressionLevel) + : Lz4CodecBase(compressionLevel) {} + +uint64_t Lz4RawCodec::maxCompressedLength(uint64_t inputLength) { + return static_cast( + LZ4_compressBound(static_cast(inputLength))); +} + +uint64_t Lz4RawCodec::decompress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) { + auto decompressedSize = LZ4_decompress_safe( + reinterpret_cast(input), + reinterpret_cast(output), + static_cast(inputLength), + static_cast(outputLength)); + if (decompressedSize < 0) { + VELOX_FAIL("Corrupt Lz4 compressed data."); + } + return static_cast(decompressedSize); +} + +uint64_t Lz4RawCodec::compress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) { + uint64_t compressedSize; +#ifdef 
LZ4HC_CLEVEL_MIN + constexpr int kMinHcClevel = LZ4HC_CLEVEL_MIN; +#else // For older versions of the lz4 library. + constexpr int kMinHcClevel = 3; +#endif + if (compressionLevel_ < kMinHcClevel) { + compressedSize = LZ4_compress_default( + reinterpret_cast(input), + reinterpret_cast(output), + static_cast(inputLength), + static_cast(outputLength)); + } else { + compressedSize = LZ4_compress_HC( + reinterpret_cast(input), + reinterpret_cast(output), + static_cast(inputLength), + static_cast(outputLength), + compressionLevel_); + } + if (compressedSize == 0) { + VELOX_FAIL("Lz4 compression failure."); + } + return static_cast(compressedSize); +} + +std::shared_ptr Lz4RawCodec::makeStreamingCompressor() { + VELOX_UNSUPPORTED( + "Streaming compression unsupported with LZ4 raw format. " + "Try using LZ4 frame format instead."); +} + +std::shared_ptr +Lz4RawCodec::makeStreamingDecompressor() { + VELOX_UNSUPPORTED( + "Streaming decompression unsupported with LZ4 raw format. " + "Try using LZ4 frame format instead."); +} + +Lz4HadoopCodec::Lz4HadoopCodec() : Lz4RawCodec(kLz4DefaultCompressionLevel) {} + +uint64_t Lz4HadoopCodec::maxCompressedLength(uint64_t inputLength) { + return kPrefixLength + Lz4RawCodec::maxCompressedLength(inputLength); +} + +uint64_t Lz4HadoopCodec::compress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) { + if (outputLength < kPrefixLength) { + VELOX_FAIL("Output buffer too small for Lz4HadoopCodec compression."); + } + + uint64_t compressedSize = Lz4RawCodec::compress( + input, inputLength, output + kPrefixLength, outputLength - kPrefixLength); + + // Prepend decompressed size in bytes and compressed size in bytes + // to be compatible with Hadoop Lz4RawCodec. + const uint32_t decompressedLength = + folly::Endian::big(static_cast(inputLength)); + const uint32_t compressedLength = + folly::Endian::big(static_cast(compressedSize)); + folly::storeUnaligned(output, decompressedLength); + folly::storeUnaligned(output + sizeof(uint32_t), compressedLength); + + return kPrefixLength + compressedSize; +} + +uint64_t Lz4HadoopCodec::decompress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) { + uint64_t decompressedSize; + if (tryDecompressHadoop( + input, inputLength, output, outputLength, decompressedSize)) { + return decompressedSize; + } + // Fall back on raw LZ4 codec (for files produces by earlier versions of + // Parquet C++). + return Lz4RawCodec::decompress(input, inputLength, output, outputLength); +} + +std::shared_ptr Lz4HadoopCodec::makeStreamingCompressor() { + VELOX_UNSUPPORTED( + "Streaming compression unsupported with LZ4 Hadoop raw format. " + "Try using LZ4 frame format instead."); +} + +std::shared_ptr +Lz4HadoopCodec::makeStreamingDecompressor() { + VELOX_UNSUPPORTED( + "Streaming decompression unsupported with LZ4 Hadoop raw format. 
" + "Try using LZ4 frame format instead."); +} + +int32_t Lz4HadoopCodec::minimumCompressionLevel() const { + return kUseDefaultCompressionLevel; +} + +int32_t Lz4HadoopCodec::maximumCompressionLevel() const { + return kUseDefaultCompressionLevel; +} + +int32_t Lz4HadoopCodec::defaultCompressionLevel() const { + return kUseDefaultCompressionLevel; +} + +uint64_t Lz4HadoopCodec::decompressInternal( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) { + return Lz4RawCodec::decompress(input, inputLength, output, outputLength); +} + +std::unique_ptr makeLz4FrameCodec(int32_t compressionLevel) { + return std::make_unique(compressionLevel); +} + +std::unique_ptr makeLz4RawCodec(int32_t compressionLevel) { + return std::make_unique(compressionLevel); +} + +std::unique_ptr makeLz4HadoopCodec() { + return std::make_unique(); +} +} // namespace facebook::velox::common diff --git a/velox/common/compression/Lz4Compression.h b/velox/common/compression/Lz4Compression.h new file mode 100644 index 0000000000000..001a265fe8893 --- /dev/null +++ b/velox/common/compression/Lz4Compression.h @@ -0,0 +1,152 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include "velox/common/compression/Compression.h" +#include "velox/common/compression/HadoopCompressionFormat.h" + +namespace facebook::velox::common { + +struct Lz4CodecOptions : CodecOptions { + enum Type { kLz4Frame, kLz4Raw, kLz4Hadoop }; + + Lz4CodecOptions( + Lz4CodecOptions::Type type, + int32_t compressionLevel = kUseDefaultCompressionLevel) + : CodecOptions(compressionLevel), type(type) {} + + Lz4CodecOptions::Type type; +}; + +class Lz4CodecBase : public Codec { + public: + explicit Lz4CodecBase(int32_t compressionLevel); + + int32_t minimumCompressionLevel() const override; + + int32_t maximumCompressionLevel() const override; + + int32_t defaultCompressionLevel() const override; + + int32_t compressionLevel() const override; + + CompressionKind compressionKind() const override; + + protected: + const int compressionLevel_; +}; + +class Lz4FrameCodec : public Lz4CodecBase { + public: + explicit Lz4FrameCodec(int32_t compressionLevel); + + uint64_t maxCompressedLength(uint64_t inputLength) override; + + uint64_t compress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) override; + + uint64_t decompress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) override; + + std::shared_ptr makeStreamingCompressor() override; + + std::shared_ptr makeStreamingDecompressor() override; + + protected: + const LZ4F_preferences_t prefs_; +}; + +class Lz4RawCodec : public Lz4CodecBase { + public: + explicit Lz4RawCodec(int32_t compressionLevel); + + uint64_t maxCompressedLength(uint64_t inputLength) override; + + uint64_t compress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t 
outputLength) override; + + uint64_t decompress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) override; + + std::shared_ptr makeStreamingCompressor() override; + + std::shared_ptr makeStreamingDecompressor() override; +}; + +class Lz4HadoopCodec : public Lz4RawCodec, public HadoopCompressionFormat { + public: + Lz4HadoopCodec(); + + uint64_t maxCompressedLength(uint64_t inputLength) override; + + uint64_t compress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) override; + + uint64_t decompress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) override; + + std::shared_ptr makeStreamingCompressor() override; + + std::shared_ptr makeStreamingDecompressor() override; + + int32_t minimumCompressionLevel() const override; + + int32_t maximumCompressionLevel() const override; + + int32_t defaultCompressionLevel() const override; + + private: + uint64_t decompressInternal( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) override; +}; + +// Lz4 frame format codec. +std::unique_ptr makeLz4FrameCodec( + int32_t compressionLevel = kUseDefaultCompressionLevel); + +// Lz4 "raw" format codec. +std::unique_ptr makeLz4RawCodec( + int32_t compressionLevel = kUseDefaultCompressionLevel); + +// Lz4 "Hadoop" format codec (== Lz4 raw codec prefixed with lengths header) +std::unique_ptr makeLz4HadoopCodec(); +} // namespace facebook::velox::common diff --git a/velox/common/compression/tests/CompressionTest.cpp b/velox/common/compression/tests/CompressionTest.cpp index 0b036c55c5129..d39be5da20cd0 100644 --- a/velox/common/compression/tests/CompressionTest.cpp +++ b/velox/common/compression/tests/CompressionTest.cpp @@ -14,14 +14,274 @@ * limitations under the License. */ +#include +#include +#include +#include +#include +#include + #include #include "velox/common/base/VeloxException.h" #include "velox/common/base/tests/GTestUtils.h" #include "velox/common/compression/Compression.h" +#include "velox/common/compression/Lz4Compression.h" namespace facebook::velox::common { +namespace { +const std::shared_ptr kDefaultCodecOptions = + std::make_shared(); + +struct TestParams { + CompressionKind compressionKind; + std::shared_ptr codecOptions; + + explicit TestParams( + common::CompressionKind compressionKind, + std::shared_ptr codecOptions = kDefaultCodecOptions) + : compressionKind(compressionKind), + codecOptions(std::move(codecOptions)) {} +}; + +std::vector makeRandomData(size_t n) { + std::vector data(n); + std::default_random_engine engine(42); + std::uniform_int_distribution dist(0, 255); + std::generate(data.begin(), data.end(), [&]() { return dist(engine); }); + return data; +} + +std::vector makeCompressibleData(size_t size) { + std::string baseData = "The quick brown fox jumps over the lazy dog"; + auto repeats = static_cast(1 + size / baseData.size()); + + std::vector data(baseData.size() * repeats); + for (int i = 0; i < repeats; ++i) { + std::memcpy( + data.data() + i * baseData.size(), baseData.data(), baseData.size()); + } + data.resize(size); + return data; +} + +std::function makeRandomInputSize() { + std::default_random_engine engine(42); + std::uniform_int_distribution sizeDistribution(10, 40); + return [=]() mutable -> uint64_t { return sizeDistribution(engine); }; +} + +// Check roundtrip of one-shot compression and decompression functions. 
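+// Compresses with codec c1 and decompresses with codec c2. The two codecs may
+// use different framings as long as they are compatible (e.g. the LZ4 Hadoop
+// codec reading back LZ4 raw blocks, exercised by CodecLZ4HadoopTest below).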
+void checkCodecRoundtrip( + Codec* c1, + Codec* c2, + const std::vector& data) { + auto maxCompressedLen = + static_cast(c1->maxCompressedLength(data.size())); + std::vector compressed(maxCompressedLen); + std::vector decompressed(data.size()); + + // Compress with codec c1. + auto compressedSize = c1->compress( + data.data(), data.size(), compressed.data(), maxCompressedLen); + compressed.resize(compressedSize); + + // Decompress with codec c2. + auto decompressedSize = c2->decompress( + compressed.data(), + compressed.size(), + decompressed.data(), + decompressed.size()); + + ASSERT_EQ(data, decompressed); + ASSERT_EQ(data.size(), decompressedSize); +} + +// Use same codec for both compression and decompression. +void checkCodecRoundtrip( + const std::unique_ptr& codec, + const std::vector& data) { + checkCodecRoundtrip(codec.get(), codec.get(), data); +} + +// Compress with codec c1 and decompress with codec c2. +void checkCodecRoundtrip( + const std::unique_ptr& c1, + const std::unique_ptr& c2, + const std::vector& data) { + checkCodecRoundtrip(c1.get(), c2.get(), data); +} + +void streamingCompress( + const std::shared_ptr& compressor, + const std::vector& uncompressed, + std::vector& compressed) { + const uint8_t* input = uncompressed.data(); + uint64_t remaining = uncompressed.size(); + uint64_t compressedSize = 0; + compressed.resize(10); + bool doFlush = false; + + // Generate small random input buffer size. + auto randomInputSize = makeRandomInputSize(); + + // Continue decompressing until consuming all compressed data . + while (remaining > 0) { + // Feed a small amount each time. + auto inputLength = std::min(remaining, randomInputSize()); + auto outputLength = compressed.size() - compressedSize; + uint8_t* output = compressed.data() + compressedSize; + + // Compress once. + auto compressResult = + compressor->compress(input, inputLength, output, outputLength); + ASSERT_LE(compressResult.bytesRead, inputLength); + ASSERT_LE(compressResult.bytesWritten, outputLength); + + // Update result. + compressedSize += compressResult.bytesWritten; + input += compressResult.bytesRead; + remaining -= compressResult.bytesRead; + + // Grow compressed buffer if it's too small. + if (compressResult.outputTooSmall) { + compressed.resize(compressed.capacity() * 2); + } + + // Once every two iterations, do a flush. + if (doFlush) { + StreamingCompressor::FlushResult flushResult; + do { + outputLength = compressed.size() - compressedSize; + output = compressed.data() + compressedSize; + flushResult = compressor->flush(output, outputLength); + ASSERT_LE(flushResult.bytesWritten, outputLength); + compressedSize += flushResult.bytesWritten; + if (flushResult.outputTooSmall) { + compressed.resize(compressed.capacity() * 2); + } + } while (flushResult.outputTooSmall); + } + doFlush = !doFlush; + } + + // End the compressed stream. 
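+  // Per the StreamingCompressor contract, end() must be retried with a larger
+  // output buffer whenever it reports outputTooSmall.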
+ StreamingCompressor::EndResult endResult; + do { + int64_t outputLength = compressed.size() - compressedSize; + uint8_t* output = compressed.data() + compressedSize; + endResult = compressor->end(output, outputLength); + ASSERT_LE(endResult.bytesWritten, outputLength); + compressedSize += endResult.bytesWritten; + if (endResult.outputTooSmall) { + compressed.resize(compressed.capacity() * 2); + } + } while (endResult.outputTooSmall); + compressed.resize(compressedSize); +} + +void streamingDecompress( + const std::shared_ptr& decompressor, + const std::vector& compressed, + std::vector& decompressed) { + const uint8_t* input = compressed.data(); + uint64_t remaining = compressed.size(); + uint64_t decompressedSize = 0; + decompressed.resize(10); + + // Generate small random input buffer size. + auto ramdomInputSize = makeRandomInputSize(); + + // Continue decompressing until finishes. + while (!decompressor->isFinished()) { + // Feed a small amount each time. + auto inputLength = std::min(remaining, ramdomInputSize()); + auto outputLength = decompressed.size() - decompressedSize; + uint8_t* output = decompressed.data() + decompressedSize; + + // Decompress once. + auto result = + decompressor->decompress(input, inputLength, output, outputLength); + ASSERT_LE(result.bytesRead, inputLength); + ASSERT_LE(result.bytesWritten, outputLength); + ASSERT_TRUE( + result.outputTooSmall || result.bytesWritten > 0 || + result.bytesRead > 0) + << "Decompression not progressing anymore"; + + // Update result. + decompressedSize += result.bytesWritten; + input += result.bytesRead; + remaining -= result.bytesRead; + + // Grow decompressed buffer if it's too small. + if (result.outputTooSmall) { + decompressed.resize(decompressed.capacity() * 2); + } + } + ASSERT_TRUE(decompressor->isFinished()); + ASSERT_EQ(remaining, 0); + decompressed.resize(decompressedSize); +} + +// Check the streaming compressor against one-shot decompression. +void checkStreamingCompressor(Codec* codec, const std::vector& data) { + // Run streaming compression. + std::vector compressed; + streamingCompress(codec->makeStreamingCompressor(), data, compressed); + + // Check decompressing the compressed data. + std::vector decompressed(data.size()); + ASSERT_NO_THROW(codec->decompress( + compressed.data(), + compressed.size(), + decompressed.data(), + decompressed.size())); + ASSERT_EQ(data, decompressed); +} + +// Check the streaming decompressor against one-shot compression. +void checkStreamingDecompressor( + Codec* codec, + const std::vector& data) { + // Create compressed data. + auto maxCompressedLen = codec->maxCompressedLength(data.size()); + std::vector compressed(maxCompressedLen); + auto compressedSize = codec->compress( + data.data(), data.size(), compressed.data(), maxCompressedLen); + compressed.resize(compressedSize); + + // Run streaming decompression. + std::vector decompressed; + streamingDecompress( + codec->makeStreamingDecompressor(), compressed, decompressed); + + // Check the decompressed data. + ASSERT_EQ(data.size(), decompressed.size()); + ASSERT_EQ(data, decompressed); +} + +// Check the streaming compressor and decompressor together. 
+void checkStreamingRoundtrip( + const std::shared_ptr& compressor, + const std::shared_ptr& decompressor, + const std::vector& data) { + std::vector compressed; + streamingCompress(compressor, data, compressed); + std::vector decompressed; + streamingDecompress(decompressor, compressed, decompressed); + ASSERT_EQ(data, decompressed); +} + +void checkStreamingRoundtrip(Codec* codec, const std::vector& data) { + checkStreamingRoundtrip( + codec->makeStreamingCompressor(), + codec->makeStreamingDecompressor(), + data); +} +} // namespace + class CompressionTest : public testing::Test {}; TEST_F(CompressionTest, testCompressionNames) { @@ -31,6 +291,7 @@ TEST_F(CompressionTest, testCompressionNames) { EXPECT_EQ("lzo", compressionKindToString(CompressionKind_LZO)); EXPECT_EQ("lz4", compressionKindToString(CompressionKind_LZ4)); EXPECT_EQ("zstd", compressionKindToString(CompressionKind_ZSTD)); + EXPECT_EQ("gzip", compressionKindToString(CompressionKind_GZIP)); EXPECT_EQ( "unknown - 99", compressionKindToString(static_cast(99))); @@ -59,4 +320,156 @@ TEST_F(CompressionTest, stringToCompressionKind) { VELOX_ASSERT_THROW( stringToCompressionKind("bz2"), "Not support compression kind bz2"); } + +class CodecTest : public ::testing::TestWithParam { + protected: + static CompressionKind getCompressionKind() { + return GetParam().compressionKind; + } + + static const CodecOptions& getCodecOptions() { + return *GetParam().codecOptions; + } + + static std::unique_ptr makeCodec() { + return Codec::create(getCompressionKind(), getCodecOptions()); + } +}; + +TEST_P(CodecTest, specifyCompressionLevel) { + std::vector data = makeRandomData(2000); + const auto kind = getCompressionKind(); + if (!Codec::isAvailable(kind)) { + // Support for this codec hasn't been built. 
+ VELOX_ASSERT_THROW( + Codec::create(kind, kUseDefaultCompressionLevel), + "Support for codec '" + compressionKindToString(kind) + + "' not implemented."); + return; + } + auto compressionLevels = { + Codec::defaultCompressionLevel(kind), + Codec::minimumCompressionLevel(kind), + Codec::maximumCompressionLevel(kind)}; + for (auto compressionLevel : compressionLevels) { + auto codec = Codec::create(kind, compressionLevel); + checkCodecRoundtrip(codec, data); + } +} + +TEST_P(CodecTest, getUncompressedLength) { + auto codec = makeCodec(); + auto inputLength = 100; + auto input = makeRandomData(inputLength); + std::vector compressed(codec->maxCompressedLength(input.size())); + auto compressedLength = codec->compress( + input.data(), inputLength, compressed.data(), compressed.size()); + compressed.resize(compressedLength); + + if (Codec::supportsGetUncompressedLength(getCompressionKind())) { + ASSERT_EQ( + codec->getUncompressedLength(compressedLength, compressed.data()), + inputLength); + } else { + ASSERT_EQ( + codec->getUncompressedLength(compressedLength, compressed.data()), + std::nullopt); + } +} + +TEST_P(CodecTest, codecRoundtrip) { + auto codec = makeCodec(); + for (int dataSize : {0, 10, 10000, 100000}) { + checkCodecRoundtrip(codec, makeRandomData(dataSize)); + checkCodecRoundtrip(codec, makeCompressibleData(dataSize)); + } +} + +TEST_P(CodecTest, streamingCompressor) { + if (!Codec::supportsStreamingCompression(getCompressionKind())) { + return; + } + + for (auto dataSize : {0, 10, 10000, 100000}) { + auto codec = makeCodec(); + checkStreamingCompressor(codec.get(), makeRandomData(dataSize)); + checkStreamingCompressor(codec.get(), makeCompressibleData(dataSize)); + } +} + +TEST_P(CodecTest, streamingDecompressor) { + if (!Codec::supportsStreamingCompression(getCompressionKind())) { + return; + } + + for (auto dataSize : {0, 10, 10000, 100000}) { + auto codec = makeCodec(); + checkStreamingDecompressor(codec.get(), makeRandomData(dataSize)); + checkStreamingDecompressor(codec.get(), makeCompressibleData(dataSize)); + } +} + +TEST_P(CodecTest, streamingRoundtrip) { + if (!Codec::supportsStreamingCompression(getCompressionKind())) { + return; + } + + for (auto dataSize : {0, 10, 10000, 100000}) { + auto codec = makeCodec(); + checkStreamingRoundtrip(codec.get(), makeRandomData(dataSize)); + checkStreamingRoundtrip(codec.get(), makeCompressibleData(dataSize)); + } +} + +TEST_P(CodecTest, streamingDecompressorReuse) { + if (!Codec::supportsStreamingCompression(getCompressionKind())) { + return; + } + + auto codec = makeCodec(); + auto decompressor = codec->makeStreamingDecompressor(); + checkStreamingRoundtrip( + codec->makeStreamingCompressor(), decompressor, makeRandomData(100)); + + // StreamingDecompressor::reset() should allow reusing decompressor for a + // new stream. 
+ decompressor->reset(); + checkStreamingRoundtrip( + codec->makeStreamingCompressor(), decompressor, makeRandomData(200)); +} + +INSTANTIATE_TEST_SUITE_P( + TestLz4Frame, + CodecTest, + ::testing::Values(TestParams{ + CompressionKind::CompressionKind_LZ4, + std::make_shared(Lz4CodecOptions::kLz4Frame)})); + +INSTANTIATE_TEST_SUITE_P( + TestLz4Raw, + CodecTest, + ::testing::Values(TestParams{ + CompressionKind::CompressionKind_LZ4, + std::make_shared(Lz4CodecOptions::kLz4Raw)})); + +INSTANTIATE_TEST_SUITE_P( + TestLz4Hadoop, + CodecTest, + ::testing::Values(TestParams{ + CompressionKind::CompressionKind_LZ4, + std::make_shared(Lz4CodecOptions::kLz4Hadoop)})); + +TEST(CodecLZ4HadoopTest, compatibility) { + // LZ4 Hadoop codec should be able to read back LZ4 raw blocks. + auto c1 = Codec::create( + CompressionKind::CompressionKind_LZ4, + Lz4CodecOptions{Lz4CodecOptions::kLz4Raw}); + auto c2 = Codec::create( + CompressionKind::CompressionKind_LZ4, + Lz4CodecOptions{Lz4CodecOptions::kLz4Hadoop}); + + for (auto dataSize : {0, 10, 10000, 100000}) { + checkCodecRoundtrip(c1, c2, makeRandomData(dataSize)); + } +} } // namespace facebook::velox::common diff --git a/velox/dwio/dwrf/test/TestDecompression.cpp b/velox/dwio/dwrf/test/TestDecompression.cpp index 43033158e53ce..3d6da0455bc8e 100644 --- a/velox/dwio/dwrf/test/TestDecompression.cpp +++ b/velox/dwio/dwrf/test/TestDecompression.cpp @@ -844,8 +844,12 @@ void writeHeader(char* buffer, size_t compressedSize, bool original) { buffer[2] = static_cast(compressedSize >> 15); } -size_t -compress(char* buf, size_t size, char* output, size_t offset, Codec& codec) { +size_t compress( + char* buf, + size_t size, + char* output, + size_t offset, + folly::io::Codec& codec) { auto ioBuf = folly::IOBuf::wrapBuffer(buf, size); auto compressed = codec.compress(ioBuf.get()); auto str = compressed->moveToFbString(); @@ -870,7 +874,7 @@ class TestSeek : public ::testing::Test { kind, std::move(input), bufferSize, *pool_, "Test Decompression"); } - void runTest(Codec& codec, CompressionKind kind) { + void runTest(folly::io::Codec& codec, CompressionKind kind) { constexpr size_t inputSize = 1024; constexpr size_t outputSize = 4096; char output[outputSize]; @@ -911,7 +915,7 @@ class TestSeek : public ::testing::Test { } static void prepareTestData( - Codec& codec, + folly::io::Codec& codec, char* input1, char* input2, size_t inputSize,