From 04becc9cefa93ed01f82d41febd82926d5e362c0 Mon Sep 17 00:00:00 2001 From: "Ma, Rong" Date: Mon, 22 Apr 2024 14:34:21 +0800 Subject: [PATCH] use folly::Expected as return code --- velox/common/base/Status.h | 18 ++ velox/common/compression/Compression.h | 54 ++-- .../compression/HadoopCompressionFormat.cpp | 12 +- .../compression/HadoopCompressionFormat.h | 4 +- velox/common/compression/Lz4Compression.cpp | 256 +++++++++--------- velox/common/compression/Lz4Compression.h | 28 +- .../compression/tests/CompressionTest.cpp | 87 ++++-- 7 files changed, 254 insertions(+), 205 deletions(-) diff --git a/velox/common/base/Status.h b/velox/common/base/Status.h index 94537b17f38c..275301640e81 100644 --- a/velox/common/base/Status.h +++ b/velox/common/base/Status.h @@ -20,6 +20,7 @@ #include #include +#include #include #include #include @@ -476,6 +477,23 @@ void Status::moveFrom(Status& s) { VELOX_RETURN_IF(!__s.ok(), __s); \ } while (false) +/// Return with given status wrapped in folly::Unexpected if condition is met. +#define VELOX_RETURN_UNEXPECTED_IF(condition, status) \ + do { \ + if (FOLLY_UNLIKELY(condition)) { \ + return (::folly::makeUnexpected(status)); \ + } \ + } while (0) + +/// Propagate any non-successful Status wrapped in folly::Unexpected to the +/// caller. +#define VELOX_RETURN_UNEXPECTED(status) \ + do { \ + ::facebook::velox::Status __s = \ + ::facebook::velox::internal::genericToStatus(status); \ + VELOX_RETURN_IF(!__s.ok(), ::folly::makeUnexpected(__s)); \ + } while (false) + namespace internal { /// Common API for extracting Status from either Status or Result (the latter diff --git a/velox/common/compression/Compression.h b/velox/common/compression/Compression.h index 8d4c3648612d..569a889812bd 100644 --- a/velox/common/compression/Compression.h +++ b/velox/common/compression/Compression.h @@ -17,9 +17,12 @@ #pragma once #include +#include #include #include +#include "velox/common/base/Status.h" + namespace facebook::velox::common { enum CompressionKind { @@ -72,7 +75,7 @@ class StreamingCompressor { /// Compress some input. /// If CompressResult.outputTooSmall is true on return, then a larger output /// buffer should be supplied. - virtual CompressResult compress( + virtual folly::Expected compress( const uint8_t* input, uint64_t inputLength, uint8_t* output, @@ -81,13 +84,17 @@ class StreamingCompressor { /// Flush part of the compressed output. /// If FlushResult.outputTooSmall is true on return, flush() should be called /// again with a larger buffer. - virtual FlushResult flush(uint8_t* output, uint64_t outputLength) = 0; + virtual folly::Expected flush( + uint8_t* output, + uint64_t outputLength) = 0; /// End compressing, doing whatever is necessary to end the stream. /// If EndResult.outputTooSmall is true on return, end() should be called /// again with a larger buffer. Otherwise, the StreamingCompressor should not /// be used anymore. end() will flush the compressed output. - virtual EndResult end(uint8_t* output, uint64_t outputLength) = 0; + virtual folly::Expected end( + uint8_t* output, + uint64_t outputLength) = 0; }; class StreamingDecompressor { @@ -103,7 +110,7 @@ class StreamingDecompressor { /// Decompress some input. /// If outputTooSmall is true on return, a larger output buffer needs /// to be supplied. - virtual DecompressResult decompress( + virtual folly::Expected decompress( const uint8_t* input, uint64_t inputLength, uint8_t* output, @@ -113,7 +120,7 @@ class StreamingDecompressor { virtual bool isFinished() = 0; /// Reinitialize decompressor, making it ready for a new compressed stream. - virtual void reset() = 0; + virtual Status reset() = 0; }; struct CodecOptions { @@ -180,25 +187,25 @@ class Codec { /// `kUseDefaultCompressionLevel` will be returned. virtual int32_t defaultCompressionLevel() const = 0; - /// One-shot decompression function. - /// `outputLength` must be correct and therefore be obtained in advance. - /// The actual decompressed length is returned. - /// Note: One-shot decompression is not always compatible with streaming - /// compression. Depending on the codec (e.g. LZ4), different formats may + /// Performs one-shot compression. + /// `outputLength` must first have been computed using maxCompressedLength(). + /// The actual compressed length will be written to actualOutputLength. + /// Note: One-shot compression is not always compatible with streaming + /// decompression. Depending on the codec (e.g. LZ4), different formats may /// be used. - virtual uint64_t decompress( + virtual folly::Expected compress( const uint8_t* input, uint64_t inputLength, uint8_t* output, uint64_t outputLength) = 0; - /// Performs one-shot compression. - /// `outputLength` must first have been computed using maxCompressedLength(). - /// The actual compressed length is returned. - /// Note: One-shot compression is not always compatible with streaming - /// decompression. Depending on the codec (e.g. LZ4), different formats may + /// One-shot decompression function. + /// `outputLength` must be correct and therefore be obtained in advance. + /// The actual decompressed length is returned. + /// Note: One-shot decompression is not always compatible with streaming + /// compression. Depending on the codec (e.g. LZ4), different formats may /// be used. - virtual uint64_t compress( + virtual folly::Expected decompress( const uint8_t* input, uint64_t inputLength, uint8_t* output, @@ -231,11 +238,18 @@ class Codec { const uint8_t* input) const; /// Create a streaming compressor instance. - virtual std::shared_ptr makeStreamingCompressor() = 0; + virtual folly::Expected, Status> + makeStreamingCompressor() { + return folly::makeUnexpected(Status::Invalid( + "Streaming compression is unsupported with {} format.", name())); + } /// Create a streaming compressor instance. - virtual std::shared_ptr - makeStreamingDecompressor() = 0; + virtual folly::Expected, Status> + makeStreamingDecompressor() { + return folly::makeUnexpected(Status::Invalid( + "Streaming decompression is unsupported with {} format.", name())); + } /// This Codec's compression type. virtual CompressionKind compressionKind() const = 0; diff --git a/velox/common/compression/HadoopCompressionFormat.cpp b/velox/common/compression/HadoopCompressionFormat.cpp index bda5a756c7c0..9b2983c2d0cf 100644 --- a/velox/common/compression/HadoopCompressionFormat.cpp +++ b/velox/common/compression/HadoopCompressionFormat.cpp @@ -56,15 +56,13 @@ bool HadoopCompressionFormat::tryDecompressHadoop( return false; } // Try decompressing and compare with expected decompressed length. - try { - auto decompressedSize = decompressInternal( - input, expectedCompressedSize, output, outputLength); - if (decompressedSize != expectedDecompressedSize) { - return false; - } - } catch (const VeloxException& e) { + auto maybeDecompressedSize = + decompressInternal(input, expectedCompressedSize, output, outputLength); + if (maybeDecompressedSize.hasError() || + maybeDecompressedSize.value() != expectedDecompressedSize) { return false; } + input += expectedCompressedSize; inputLength -= expectedCompressedSize; output += expectedDecompressedSize; diff --git a/velox/common/compression/HadoopCompressionFormat.h b/velox/common/compression/HadoopCompressionFormat.h index a23df0622ef9..f58a118d1387 100644 --- a/velox/common/compression/HadoopCompressionFormat.h +++ b/velox/common/compression/HadoopCompressionFormat.h @@ -16,7 +16,9 @@ #pragma once +#include #include +#include "velox/common/base/Status.h" namespace facebook::velox::common { @@ -29,7 +31,7 @@ class HadoopCompressionFormat { uint64_t outputLength, uint64_t& actualDecompressedSize); - virtual uint64_t decompressInternal( + virtual folly::Expected decompressInternal( const uint8_t* input, uint64_t inputLength, uint8_t* output, diff --git a/velox/common/compression/Lz4Compression.cpp b/velox/common/compression/Lz4Compression.cpp index 1742c71b2a91..c47a6da9cd8d 100644 --- a/velox/common/compression/Lz4Compression.cpp +++ b/velox/common/compression/Lz4Compression.cpp @@ -23,8 +23,14 @@ namespace { constexpr int32_t kLz4DefaultCompressionLevel = 1; constexpr int32_t kLz4MinCompressionLevel = 1; -void lz4Error(const char* prefixMessage, LZ4F_errorCode_t errorCode) { - VELOX_FAIL(prefixMessage, LZ4F_getErrorName(errorCode)); +#if (defined(LZ4_VERSION_NUMBER) && LZ4_VERSION_NUMBER < 10800) +constexpr int32_t kLegacyLz4MaxCompressionLevel = 12; +#endif + +static inline Status lz4Error( + const char* prefixMessage, + LZ4F_errorCode_t errorCode) { + return Status::IOError(prefixMessage, LZ4F_getErrorName(errorCode)); } LZ4F_preferences_t defaultPreferences() { @@ -46,20 +52,23 @@ class LZ4Compressor : public StreamingCompressor { ~LZ4Compressor() override; - void init(); + Status init(); - CompressResult compress( + folly::Expected compress( const uint8_t* input, uint64_t inputLength, uint8_t* output, uint64_t outputLength) override; - FlushResult flush(uint8_t* output, uint64_t outputLength) override; + folly::Expected flush( + uint8_t* output, + uint64_t outputLength) override; - EndResult end(uint8_t* output, uint64_t outputLength) override; + folly::Expected end(uint8_t* output, uint64_t outputLength) + override; protected: - void + Status compressBegin(uint8_t* output, size_t& outputLen, uint64_t& bytesWritten); int compressionLevel_; @@ -78,11 +87,7 @@ class LZ4Decompressor : public StreamingDecompressor { } } - void init(); - - void reset() override; - - DecompressResult decompress( + folly::Expected decompress( const uint8_t* input, uint64_t inputLength, uint8_t* output, @@ -90,6 +95,10 @@ class LZ4Decompressor : public StreamingDecompressor { bool isFinished() override; + Status reset() override; + + Status init(); + protected: LZ4F_decompressionContext_t ctx_{nullptr}; bool finished_{false}; @@ -104,18 +113,18 @@ LZ4Compressor::~LZ4Compressor() { } } -void LZ4Compressor::init() { +Status LZ4Compressor::init() { LZ4F_errorCode_t ret; prefs_ = defaultPreferences(compressionLevel_); firstTime_ = true; ret = LZ4F_createCompressionContext(&ctx_, LZ4F_VERSION); - if (LZ4F_isError(ret)) { - lz4Error("LZ4 init failed: ", ret); - } + VELOX_RETURN_IF(LZ4F_isError(ret), lz4Error("LZ4 init failed: ", ret)); + return Status::OK(); } -StreamingCompressor::CompressResult LZ4Compressor::compress( +folly::Expected +LZ4Compressor::compress( const uint8_t* input, uint64_t inputLength, uint8_t* output, @@ -129,7 +138,7 @@ StreamingCompressor::CompressResult LZ4Compressor::compress( if (outputLength < LZ4F_HEADER_SIZE_MAX) { return CompressResult{0, 0, true}; } - compressBegin(output, outputSize, bytesWritten); + VELOX_RETURN_UNEXPECTED(compressBegin(output, outputSize, bytesWritten)); } if (outputSize < LZ4F_compressBound(inputSize, &prefs_)) { @@ -138,15 +147,15 @@ StreamingCompressor::CompressResult LZ4Compressor::compress( } auto numBytesOrError = LZ4F_compressUpdate( ctx_, output, outputSize, input, inputSize, nullptr /* options */); - if (LZ4F_isError(numBytesOrError)) { - lz4Error("LZ4 compress update failed: ", numBytesOrError); - } + VELOX_RETURN_UNEXPECTED_IF( + LZ4F_isError(numBytesOrError), + lz4Error("LZ4 compress updated failed: ", numBytesOrError)); bytesWritten += static_cast(numBytesOrError); VELOX_DCHECK_LE(bytesWritten, outputSize); return CompressResult{inputLength, bytesWritten, false}; } -StreamingCompressor::FlushResult LZ4Compressor::flush( +folly::Expected LZ4Compressor::flush( uint8_t* output, uint64_t outputLength) { auto outputSize = static_cast(outputLength); @@ -157,7 +166,7 @@ StreamingCompressor::FlushResult LZ4Compressor::flush( if (outputLength < LZ4F_HEADER_SIZE_MAX) { return FlushResult{0, true}; } - compressBegin(output, outputSize, bytesWritten); + VELOX_RETURN_UNEXPECTED(compressBegin(output, outputSize, bytesWritten)); } if (outputSize < LZ4F_compressBound(0, &prefs_)) { @@ -167,15 +176,15 @@ StreamingCompressor::FlushResult LZ4Compressor::flush( auto numBytesOrError = LZ4F_flush(ctx_, output, outputSize, nullptr /* options */); - if (LZ4F_isError(numBytesOrError)) { - lz4Error("LZ4 flush failed: ", numBytesOrError); - } + VELOX_RETURN_UNEXPECTED_IF( + LZ4F_isError(numBytesOrError), + lz4Error("LZ4 flush failed: ", numBytesOrError)); bytesWritten += static_cast(numBytesOrError); VELOX_DCHECK_LE(bytesWritten, outputLength); return FlushResult{bytesWritten, false}; } -StreamingCompressor::EndResult LZ4Compressor::end( +folly::Expected LZ4Compressor::end( uint8_t* output, uint64_t outputLength) { auto outputSize = static_cast(outputLength); @@ -186,7 +195,7 @@ StreamingCompressor::EndResult LZ4Compressor::end( if (outputLength < LZ4F_HEADER_SIZE_MAX) { return EndResult{0, true}; } - compressBegin(output, outputSize, bytesWritten); + VELOX_RETURN_UNEXPECTED(compressBegin(output, outputSize, bytesWritten)); } if (outputSize < LZ4F_compressBound(0, &prefs_)) { @@ -196,51 +205,56 @@ StreamingCompressor::EndResult LZ4Compressor::end( auto numBytesOrError = LZ4F_compressEnd(ctx_, output, outputSize, nullptr /* options */); - if (LZ4F_isError(numBytesOrError)) { - lz4Error("LZ4 end failed: ", numBytesOrError); - } + VELOX_RETURN_UNEXPECTED_IF( + LZ4F_isError(numBytesOrError), + lz4Error("LZ4 end failed: ", numBytesOrError)); bytesWritten += static_cast(numBytesOrError); VELOX_DCHECK_LE(bytesWritten, outputLength); return EndResult{bytesWritten, false}; } -void LZ4Compressor::compressBegin( +Status LZ4Compressor::compressBegin( uint8_t* output, size_t& outputLen, uint64_t& bytesWritten) { auto numBytesOrError = LZ4F_compressBegin(ctx_, output, outputLen, &prefs_); - if (LZ4F_isError(numBytesOrError)) { - lz4Error("LZ4 compress begin failed: ", numBytesOrError); - } + VELOX_RETURN_IF( + LZ4F_isError(numBytesOrError), + lz4Error("LZ4 compress begin failed: ", numBytesOrError)); firstTime_ = false; output += numBytesOrError; outputLen -= numBytesOrError; bytesWritten += static_cast(numBytesOrError); + return Status::OK(); } -void common::LZ4Decompressor::init() { +Status common::LZ4Decompressor::init() { finished_ = false; auto ret = LZ4F_createDecompressionContext(&ctx_, LZ4F_VERSION); - if (LZ4F_isError(ret)) { - lz4Error("LZ4 init failed: ", ret); - } + VELOX_RETURN_IF(LZ4F_isError(ret), lz4Error("LZ4 init failed: ", ret)); + return Status::OK(); } -void LZ4Decompressor::reset() { +Status LZ4Decompressor::reset() { #if defined(LZ4_VERSION_NUMBER) && LZ4_VERSION_NUMBER >= 10800 // LZ4F_resetDecompressionContext appeared in 1.8.0 VELOX_CHECK_NOT_NULL(ctx_); + if (ctx_ == nullptr) { + return Status::Invalid("LZ4 decompression context is null."); + } LZ4F_resetDecompressionContext(ctx_); finished_ = false; + return Status::OK(); #else if (ctx_ != nullptr) { LZ4F_freeDecompressionContext(ctx_); } - init(); + return init(); #endif } -StreamingDecompressor::DecompressResult LZ4Decompressor::decompress( +folly::Expected +LZ4Decompressor::decompress( const uint8_t* input, uint64_t inputLength, uint8_t* output, @@ -250,9 +264,8 @@ StreamingDecompressor::DecompressResult LZ4Decompressor::decompress( auto ret = LZ4F_decompress( ctx_, output, &outputSize, input, &inputSize, nullptr /* options */); - if (LZ4F_isError(ret)) { - lz4Error("LZ4 decompress failed: ", ret); - } + VELOX_RETURN_UNEXPECTED_IF( + LZ4F_isError(ret), lz4Error("LZ4 decompress failed: ", ret)); finished_ = (ret == 0); return DecompressResult{ static_cast(inputSize), @@ -276,7 +289,7 @@ int32_t Lz4CodecBase::minimumCompressionLevel() const { int32_t Lz4CodecBase::maximumCompressionLevel() const { #if (defined(LZ4_VERSION_NUMBER) && LZ4_VERSION_NUMBER < 10800) - return 12; + return kLegacyLz4MaxCompressionLevel; #else return LZ4F_compressionLevel_max(); #endif @@ -303,7 +316,7 @@ uint64_t Lz4FrameCodec::maxCompressedLength(uint64_t inputLen) { LZ4F_compressFrameBound(static_cast(inputLen), &prefs_)); } -uint64_t Lz4FrameCodec::compress( +folly::Expected Lz4FrameCodec::compress( const uint8_t* input, uint64_t inputLength, uint8_t* output, @@ -314,51 +327,54 @@ uint64_t Lz4FrameCodec::compress( input, static_cast(inputLength), &prefs_); - if (LZ4F_isError(ret)) { - lz4Error("Lz4 compression failure: ", ret); - } + VELOX_RETURN_UNEXPECTED_IF( + LZ4F_isError(ret), lz4Error("Lz4 compression failure: ", ret)); return static_cast(ret); } -uint64_t Lz4FrameCodec::decompress( +folly::Expected Lz4FrameCodec::decompress( const uint8_t* input, uint64_t inputLength, uint8_t* output, uint64_t outputLength) { - auto decompressor = makeStreamingDecompressor(); + const auto maybeDecompressor = makeStreamingDecompressor(); + VELOX_RETURN_UNEXPECTED_IF( + maybeDecompressor.hasError(), maybeDecompressor.error()); + const auto& decompressor = maybeDecompressor.value(); uint64_t bytesWritten = 0; while (!decompressor->isFinished() && inputLength != 0) { - auto result = + const auto maybeResult = decompressor->decompress(input, inputLength, output, outputLength); + VELOX_RETURN_UNEXPECTED_IF(maybeResult.hasError(), maybeResult.error()); + + const auto& result = maybeResult.value(); input += result.bytesRead; inputLength -= result.bytesRead; output += result.bytesWritten; outputLength -= result.bytesWritten; bytesWritten += result.bytesWritten; - if (result.outputTooSmall) { - VELOX_FAIL("Lz4 decompression buffer too small."); - } - } - if (!decompressor->isFinished()) { - VELOX_FAIL("Lz4 compressed input contains less than one frame."); - } - if (inputLength != 0) { - VELOX_FAIL("Lz4 compressed input contains more than one frame."); + VELOX_RETURN_UNEXPECTED_IF( + result.outputTooSmall, + Status::IOError("Lz4 decompression buffer too small.")); } + VELOX_RETURN_UNEXPECTED_IF( + !decompressor->isFinished() || inputLength != 0, + Status::IOError("Lz4 compressed input contains less than one frame.")); return bytesWritten; } -std::shared_ptr Lz4FrameCodec::makeStreamingCompressor() { +folly::Expected, Status> +Lz4FrameCodec::makeStreamingCompressor() { auto ptr = std::make_shared(compressionLevel_); - ptr->init(); + VELOX_RETURN_UNEXPECTED(ptr->init()); return ptr; } -std::shared_ptr +folly::Expected, Status> Lz4FrameCodec::makeStreamingDecompressor() { auto ptr = std::make_shared(); - ptr->init(); + VELOX_RETURN_UNEXPECTED(ptr->init()); return ptr; } @@ -370,23 +386,7 @@ uint64_t Lz4RawCodec::maxCompressedLength(uint64_t inputLength) { LZ4_compressBound(static_cast(inputLength))); } -uint64_t Lz4RawCodec::decompress( - const uint8_t* input, - uint64_t inputLength, - uint8_t* output, - uint64_t outputLength) { - auto decompressedSize = LZ4_decompress_safe( - reinterpret_cast(input), - reinterpret_cast(output), - static_cast(inputLength), - static_cast(outputLength)); - if (decompressedSize < 0) { - VELOX_FAIL("Corrupt Lz4 compressed data."); - } - return static_cast(decompressedSize); -} - -uint64_t Lz4RawCodec::compress( +folly::Expected Lz4RawCodec::compress( const uint8_t* input, uint64_t inputLength, uint8_t* output, @@ -411,23 +411,24 @@ uint64_t Lz4RawCodec::compress( static_cast(outputLength), compressionLevel_); } - if (compressedSize == 0) { - VELOX_FAIL("Lz4 compression failure."); - } + VELOX_RETURN_UNEXPECTED_IF( + compressedSize == 0, Status::IOError("Lz4 compression failure.")); return static_cast(compressedSize); } -std::shared_ptr Lz4RawCodec::makeStreamingCompressor() { - VELOX_UNSUPPORTED( - "Streaming compression unsupported with LZ4 raw format. " - "Try using LZ4 frame format instead."); -} - -std::shared_ptr -Lz4RawCodec::makeStreamingDecompressor() { - VELOX_UNSUPPORTED( - "Streaming decompression unsupported with LZ4 raw format. " - "Try using LZ4 frame format instead."); +folly::Expected Lz4RawCodec::decompress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) { + auto decompressedSize = LZ4_decompress_safe( + reinterpret_cast(input), + reinterpret_cast(output), + static_cast(inputLength), + static_cast(outputLength)); + VELOX_RETURN_UNEXPECTED_IF( + decompressedSize < 0, Status::IOError("Lz4 decompression failure.")); + return static_cast(decompressedSize); } Lz4HadoopCodec::Lz4HadoopCodec() : Lz4RawCodec(kLz4DefaultCompressionLevel) {} @@ -436,31 +437,35 @@ uint64_t Lz4HadoopCodec::maxCompressedLength(uint64_t inputLength) { return kPrefixLength + Lz4RawCodec::maxCompressedLength(inputLength); } -uint64_t Lz4HadoopCodec::compress( +folly::Expected Lz4HadoopCodec::compress( const uint8_t* input, uint64_t inputLength, uint8_t* output, uint64_t outputLength) { - if (outputLength < kPrefixLength) { - VELOX_FAIL("Output buffer too small for Lz4HadoopCodec compression."); - } - - uint64_t compressedSize = Lz4RawCodec::compress( - input, inputLength, output + kPrefixLength, outputLength - kPrefixLength); - - // Prepend decompressed size in bytes and compressed size in bytes - // to be compatible with Hadoop Lz4RawCodec. - const uint32_t decompressedLength = - folly::Endian::big(static_cast(inputLength)); - const uint32_t compressedLength = - folly::Endian::big(static_cast(compressedSize)); - folly::storeUnaligned(output, decompressedLength); - folly::storeUnaligned(output + sizeof(uint32_t), compressedLength); - - return kPrefixLength + compressedSize; -} - -uint64_t Lz4HadoopCodec::decompress( + VELOX_RETURN_UNEXPECTED_IF( + outputLength < kPrefixLength, + Status::IOError( + "Output buffer too small for Lz4HadoopCodec compression.")); + + return Lz4RawCodec::compress( + input, + inputLength, + output + kPrefixLength, + outputLength - kPrefixLength) + .then([&](const auto& compressedSize) { + // Prepend decompressed size in bytes and compressed size in bytes + // to be compatible with Hadoop Lz4RawCodec. + const uint32_t decompressedLength = + folly::Endian::big(static_cast(inputLength)); + const uint32_t compressedLength = + folly::Endian::big(static_cast(compressedSize)); + folly::storeUnaligned(output, decompressedLength); + folly::storeUnaligned(output + sizeof(uint32_t), compressedLength); + return kPrefixLength + compressedSize; + }); +} + +folly::Expected Lz4HadoopCodec::decompress( const uint8_t* input, uint64_t inputLength, uint8_t* output, @@ -475,19 +480,6 @@ uint64_t Lz4HadoopCodec::decompress( return Lz4RawCodec::decompress(input, inputLength, output, outputLength); } -std::shared_ptr Lz4HadoopCodec::makeStreamingCompressor() { - VELOX_UNSUPPORTED( - "Streaming compression unsupported with LZ4 Hadoop raw format. " - "Try using LZ4 frame format instead."); -} - -std::shared_ptr -Lz4HadoopCodec::makeStreamingDecompressor() { - VELOX_UNSUPPORTED( - "Streaming decompression unsupported with LZ4 Hadoop raw format. " - "Try using LZ4 frame format instead."); -} - int32_t Lz4HadoopCodec::minimumCompressionLevel() const { return kUseDefaultCompressionLevel; } @@ -500,7 +492,7 @@ int32_t Lz4HadoopCodec::defaultCompressionLevel() const { return kUseDefaultCompressionLevel; } -uint64_t Lz4HadoopCodec::decompressInternal( +folly::Expected Lz4HadoopCodec::decompressInternal( const uint8_t* input, uint64_t inputLength, uint8_t* output, diff --git a/velox/common/compression/Lz4Compression.h b/velox/common/compression/Lz4Compression.h index 001a265fe889..7c0328ce28d6 100644 --- a/velox/common/compression/Lz4Compression.h +++ b/velox/common/compression/Lz4Compression.h @@ -60,21 +60,23 @@ class Lz4FrameCodec : public Lz4CodecBase { uint64_t maxCompressedLength(uint64_t inputLength) override; - uint64_t compress( + folly::Expected compress( const uint8_t* input, uint64_t inputLength, uint8_t* output, uint64_t outputLength) override; - uint64_t decompress( + folly::Expected decompress( const uint8_t* input, uint64_t inputLength, uint8_t* output, uint64_t outputLength) override; - std::shared_ptr makeStreamingCompressor() override; + folly::Expected, Status> + makeStreamingCompressor() override; - std::shared_ptr makeStreamingDecompressor() override; + folly::Expected, Status> + makeStreamingDecompressor() override; protected: const LZ4F_preferences_t prefs_; @@ -86,21 +88,17 @@ class Lz4RawCodec : public Lz4CodecBase { uint64_t maxCompressedLength(uint64_t inputLength) override; - uint64_t compress( + folly::Expected compress( const uint8_t* input, uint64_t inputLength, uint8_t* output, uint64_t outputLength) override; - uint64_t decompress( + folly::Expected decompress( const uint8_t* input, uint64_t inputLength, uint8_t* output, uint64_t outputLength) override; - - std::shared_ptr makeStreamingCompressor() override; - - std::shared_ptr makeStreamingDecompressor() override; }; class Lz4HadoopCodec : public Lz4RawCodec, public HadoopCompressionFormat { @@ -109,22 +107,18 @@ class Lz4HadoopCodec : public Lz4RawCodec, public HadoopCompressionFormat { uint64_t maxCompressedLength(uint64_t inputLength) override; - uint64_t compress( + folly::Expected compress( const uint8_t* input, uint64_t inputLength, uint8_t* output, uint64_t outputLength) override; - uint64_t decompress( + folly::Expected decompress( const uint8_t* input, uint64_t inputLength, uint8_t* output, uint64_t outputLength) override; - std::shared_ptr makeStreamingCompressor() override; - - std::shared_ptr makeStreamingDecompressor() override; - int32_t minimumCompressionLevel() const override; int32_t maximumCompressionLevel() const override; @@ -132,7 +126,7 @@ class Lz4HadoopCodec : public Lz4RawCodec, public HadoopCompressionFormat { int32_t defaultCompressionLevel() const override; private: - uint64_t decompressInternal( + folly::Expected decompressInternal( const uint8_t* input, uint64_t inputLength, uint8_t* output, diff --git a/velox/common/compression/tests/CompressionTest.cpp b/velox/common/compression/tests/CompressionTest.cpp index d39be5da20cd..6ec3aa70c239 100644 --- a/velox/common/compression/tests/CompressionTest.cpp +++ b/velox/common/compression/tests/CompressionTest.cpp @@ -23,7 +23,7 @@ #include -#include "velox/common/base/VeloxException.h" +#include "velox/common/base/Exceptions.h" #include "velox/common/base/tests/GTestUtils.h" #include "velox/common/compression/Compression.h" #include "velox/common/compression/Lz4Compression.h" @@ -83,19 +83,21 @@ void checkCodecRoundtrip( std::vector decompressed(data.size()); // Compress with codec c1. - auto compressedSize = c1->compress( + auto maybeCompressedLength = c1->compress( data.data(), data.size(), compressed.data(), maxCompressedLen); - compressed.resize(compressedSize); + ASSERT_FALSE(maybeCompressedLength.hasError()); + compressed.resize(maybeCompressedLength.value()); // Decompress with codec c2. - auto decompressedSize = c2->decompress( + auto maybeDecompressedLength = c2->decompress( compressed.data(), compressed.size(), decompressed.data(), decompressed.size()); + ASSERT_FALSE(maybeDecompressedLength.hasError()); ASSERT_EQ(data, decompressed); - ASSERT_EQ(data.size(), decompressedSize); + ASSERT_EQ(data.size(), maybeDecompressedLength.value()); } // Use same codec for both compression and decompression. @@ -134,8 +136,10 @@ void streamingCompress( uint8_t* output = compressed.data() + compressedSize; // Compress once. - auto compressResult = + auto maybeCompressResult = compressor->compress(input, inputLength, output, outputLength); + ASSERT_FALSE(maybeCompressResult.hasError()); + const auto& compressResult = maybeCompressResult.value(); ASSERT_LE(compressResult.bytesRead, inputLength); ASSERT_LE(compressResult.bytesWritten, outputLength); @@ -151,33 +155,43 @@ void streamingCompress( // Once every two iterations, do a flush. if (doFlush) { - StreamingCompressor::FlushResult flushResult; + bool outputTooSmall = false; do { outputLength = compressed.size() - compressedSize; output = compressed.data() + compressedSize; - flushResult = compressor->flush(output, outputLength); + auto maybeFlushResult = compressor->flush(output, outputLength); + ASSERT_FALSE(maybeFlushResult.hasError()); + + const auto& flushResult = maybeFlushResult.value(); ASSERT_LE(flushResult.bytesWritten, outputLength); compressedSize += flushResult.bytesWritten; - if (flushResult.outputTooSmall) { + + outputTooSmall = flushResult.outputTooSmall; + if (outputTooSmall) { compressed.resize(compressed.capacity() * 2); } - } while (flushResult.outputTooSmall); + } while (outputTooSmall); } doFlush = !doFlush; } // End the compressed stream. - StreamingCompressor::EndResult endResult; + bool outputTooSmall = false; do { int64_t outputLength = compressed.size() - compressedSize; uint8_t* output = compressed.data() + compressedSize; - endResult = compressor->end(output, outputLength); + auto maybeEndResult = compressor->end(output, outputLength); + ASSERT_FALSE(maybeEndResult.hasError()); + + const auto& endResult = std::move(maybeEndResult.value()); ASSERT_LE(endResult.bytesWritten, outputLength); compressedSize += endResult.bytesWritten; - if (endResult.outputTooSmall) { + + outputTooSmall = endResult.outputTooSmall; + if (outputTooSmall) { compressed.resize(compressed.capacity() * 2); } - } while (endResult.outputTooSmall); + } while (outputTooSmall); compressed.resize(compressedSize); } @@ -201,8 +215,11 @@ void streamingDecompress( uint8_t* output = decompressed.data() + decompressedSize; // Decompress once. - auto result = + auto maybeDecompressResult = decompressor->decompress(input, inputLength, output, outputLength); + ASSERT_FALSE(maybeDecompressResult.hasError()); + + const auto& result = maybeDecompressResult.value(); ASSERT_LE(result.bytesRead, inputLength); ASSERT_LE(result.bytesWritten, outputLength); ASSERT_TRUE( @@ -225,11 +242,24 @@ void streamingDecompress( decompressed.resize(decompressedSize); } +std::shared_ptr makeStreamingCompressor(Codec* codec) { + auto maybeCompressor = codec->makeStreamingCompressor(); + VELOX_CHECK(maybeCompressor.hasValue()); + return maybeCompressor.value(); +} + +std::shared_ptr makeStreamingDecompressor(Codec* codec) { + auto maybeDecompressor = codec->makeStreamingDecompressor(); + VELOX_CHECK(maybeDecompressor.hasValue()); + return maybeDecompressor.value(); +} + // Check the streaming compressor against one-shot decompression. void checkStreamingCompressor(Codec* codec, const std::vector& data) { // Run streaming compression. std::vector compressed; - streamingCompress(codec->makeStreamingCompressor(), data, compressed); + const auto& compressor = makeStreamingCompressor(codec); + streamingCompress(compressor, data, compressed); // Check decompressing the compressed data. std::vector decompressed(data.size()); @@ -248,14 +278,15 @@ void checkStreamingDecompressor( // Create compressed data. auto maxCompressedLen = codec->maxCompressedLength(data.size()); std::vector compressed(maxCompressedLen); - auto compressedSize = codec->compress( + auto maybeCompressedLength = codec->compress( data.data(), data.size(), compressed.data(), maxCompressedLen); - compressed.resize(compressedSize); + ASSERT_FALSE(maybeCompressedLength.hasError()); + compressed.resize(maybeCompressedLength.value()); // Run streaming decompression. std::vector decompressed; - streamingDecompress( - codec->makeStreamingDecompressor(), compressed, decompressed); + const auto& decompressor = makeStreamingDecompressor(codec); + streamingDecompress(decompressor, compressed, decompressed); // Check the decompressed data. ASSERT_EQ(data.size(), decompressed.size()); @@ -276,9 +307,7 @@ void checkStreamingRoundtrip( void checkStreamingRoundtrip(Codec* codec, const std::vector& data) { checkStreamingRoundtrip( - codec->makeStreamingCompressor(), - codec->makeStreamingDecompressor(), - data); + makeStreamingCompressor(codec), makeStreamingDecompressor(codec), data); } } // namespace @@ -362,8 +391,10 @@ TEST_P(CodecTest, getUncompressedLength) { auto inputLength = 100; auto input = makeRandomData(inputLength); std::vector compressed(codec->maxCompressedLength(input.size())); - auto compressedLength = codec->compress( + auto maybeCompressedLength = codec->compress( input.data(), inputLength, compressed.data(), compressed.size()); + ASSERT_FALSE(maybeCompressedLength.hasError()); + auto compressedLength = maybeCompressedLength.value(); compressed.resize(compressedLength); if (Codec::supportsGetUncompressedLength(getCompressionKind())) { @@ -427,15 +458,15 @@ TEST_P(CodecTest, streamingDecompressorReuse) { } auto codec = makeCodec(); - auto decompressor = codec->makeStreamingDecompressor(); + const auto& decompressor = makeStreamingDecompressor(codec.get()); checkStreamingRoundtrip( - codec->makeStreamingCompressor(), decompressor, makeRandomData(100)); + makeStreamingCompressor(codec.get()), decompressor, makeRandomData(100)); // StreamingDecompressor::reset() should allow reusing decompressor for a // new stream. - decompressor->reset(); + ASSERT_TRUE(decompressor->reset().ok()); checkStreamingRoundtrip( - codec->makeStreamingCompressor(), decompressor, makeRandomData(200)); + makeStreamingCompressor(codec.get()), decompressor, makeRandomData(200)); } INSTANTIATE_TEST_SUITE_P(