Skip to content

Commit

Permalink
add compression v2 API and lz4_frame/lz4_raw/lz4_hadoop codec
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Apr 19, 2024
1 parent f8fde93 commit 59519ff
Show file tree
Hide file tree
Showing 10 changed files with 1,561 additions and 22 deletions.
27 changes: 11 additions & 16 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -415,23 +415,18 @@ endif()
set_source(fmt)
resolve_dependency(fmt 9.0.0)

if(${VELOX_BUILD_MINIMAL_WITH_DWIO} OR ${VELOX_ENABLE_HIVE_CONNECTOR})
# DWIO needs all sorts of stream compression libraries.
#
# TODO: make these optional and pluggable.
find_package(ZLIB REQUIRED)
find_package(lz4 REQUIRED)
find_package(lzo2 REQUIRED)
find_package(zstd REQUIRED)
find_package(Snappy REQUIRED)
if(NOT TARGET zstd::zstd)
if(TARGET zstd::libzstd_static)
set(ZSTD_TYPE static)
else()
set(ZSTD_TYPE shared)
endif()
add_library(zstd::zstd ALIAS zstd::libzstd_${ZSTD_TYPE})
find_package(ZLIB REQUIRED)
find_package(lz4 REQUIRED)
find_package(lzo2 REQUIRED)
find_package(zstd REQUIRED)
find_package(Snappy REQUIRED)
if(NOT TARGET zstd::zstd)
if(TARGET zstd::libzstd_static)
set(ZSTD_TYPE static)
else()
set(ZSTD_TYPE shared)
endif()
add_library(zstd::zstd ALIAS zstd::libzstd_${ZSTD_TYPE})
endif()

set_source(re2)
Expand Down
6 changes: 4 additions & 2 deletions velox/common/compression/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)
endif()

add_library(velox_common_compression Compression.cpp LzoDecompressor.cpp)
add_library(
velox_common_compression Compression.cpp LzoDecompressor.cpp
Lz4Compression.cpp HadoopCompressionFormat.cpp)
target_link_libraries(
velox_common_compression
PUBLIC Folly::folly
PUBLIC Folly::folly lz4::lz4
PRIVATE velox_exception)
121 changes: 121 additions & 0 deletions velox/common/compression/Compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "velox/common/compression/Compression.h"
#include "velox/common/base/Exceptions.h"
#include "velox/common/compression/Lz4Compression.h"

#include <folly/Conv.h>

Expand Down Expand Up @@ -98,4 +99,124 @@ CompressionKind stringToCompressionKind(const std::string& kind) {
VELOX_UNSUPPORTED("Not support compression kind {}", kind);
}
}

void Codec::init() {}

bool Codec::supportsGetUncompressedLength(CompressionKind kind) {
switch (kind) {
default:
return false;
}
}

bool Codec::supportsStreamingCompression(CompressionKind kind) {
switch (kind) {
case CompressionKind::CompressionKind_LZ4:
return true;
default:
return false;
}
}

bool Codec::supportsCompressFixedLength(CompressionKind kind) {
switch (kind) {
default:
return false;
}
}

int32_t Codec::maximumCompressionLevel(CompressionKind kind) {
auto codec = Codec::create(kind);
return codec->maximumCompressionLevel();
}

int32_t Codec::minimumCompressionLevel(CompressionKind kind) {
auto codec = Codec::create(kind);
return codec->minimumCompressionLevel();
}

int32_t Codec::defaultCompressionLevel(CompressionKind kind) {
auto codec = Codec::create(kind);
return codec->defaultCompressionLevel();
}

std::unique_ptr<Codec> Codec::create(
CompressionKind kind,
const CodecOptions& codecOptions) {
if (!isAvailable(kind)) {
auto name = compressionKindToString(kind);
if (folly::StringPiece({name}).startsWith("unknown")) {
VELOX_UNSUPPORTED("Unrecognized codec '{}'", name);
}
VELOX_UNSUPPORTED("Support for codec '{}' not implemented.", name);
}

auto compressionLevel = codecOptions.compressionLevel;
std::unique_ptr<Codec> codec;
switch (kind) {
case CompressionKind::CompressionKind_LZ4:
if (auto options = dynamic_cast<const Lz4CodecOptions*>(&codecOptions)) {
switch (options->type) {
case Lz4CodecOptions::kLz4Frame:
codec = makeLz4FrameCodec(compressionLevel);
break;
case Lz4CodecOptions::kLz4Raw:
codec = makeLz4RawCodec(compressionLevel);
break;
case Lz4CodecOptions::kLz4Hadoop:
codec = makeLz4HadoopCodec();
break;
}
}
// By default, create LZ4 Frame codec.
codec = makeLz4FrameCodec(compressionLevel);
break;
default:
break;
}

if (codec == nullptr) {
VELOX_UNSUPPORTED(
"{} codec not implemented", compressionKindToString(kind));
}

codec->init();

return codec;
}

std::unique_ptr<Codec> Codec::create(
CompressionKind kind,
int32_t compressionLevel) {
return create(kind, CodecOptions{compressionLevel});
}

bool Codec::isAvailable(CompressionKind kind) {
switch (kind) {
case CompressionKind::CompressionKind_NONE:
case CompressionKind::CompressionKind_LZ4:
return true;
case CompressionKind::CompressionKind_SNAPPY:
case CompressionKind::CompressionKind_GZIP:
case CompressionKind::CompressionKind_ZLIB:
case CompressionKind::CompressionKind_ZSTD:
case CompressionKind::CompressionKind_LZO:
default:
return false;
}
}

std::optional<uint64_t> Codec::getUncompressedLength(
uint64_t inputLength,
const uint8_t* input) const {
return std::nullopt;
}

uint64_t Codec::compressFixedLength(
const uint8_t* input,
uint64_t inputLength,
uint8_t* output,
uint64_t outputLength) {
VELOX_UNSUPPORTED("'{}' doesn't support fixed-length compression", name());
}
} // namespace facebook::velox::common
208 changes: 208 additions & 0 deletions velox/common/compression/Compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,214 @@ CompressionKind stringToCompressionKind(const std::string& kind);

constexpr uint64_t DEFAULT_COMPRESSION_BLOCK_SIZE = 256 * 1024;

static constexpr int32_t kUseDefaultCompressionLevel =
std::numeric_limits<int32_t>::min();

class StreamingCompressor {
public:
virtual ~StreamingCompressor() = default;

struct CompressResult {
uint64_t bytesRead;
uint64_t bytesWritten;
bool outputTooSmall;
};

struct FlushResult {
uint64_t bytesWritten;
bool outputTooSmall;
};

struct EndResult {
uint64_t bytesWritten;
bool outputTooSmall;
};

/// Compress some input.
/// If CompressResult.outputTooSmall is true on return, then a larger output
/// buffer should be supplied.
virtual CompressResult compress(
const uint8_t* input,
uint64_t inputLength,
uint8_t* output,
uint64_t outputLength) = 0;

/// Flush part of the compressed output.
/// If FlushResult.outputTooSmall is true on return, flush() should be called
/// again with a larger buffer.
virtual FlushResult flush(uint8_t* output, uint64_t outputLength) = 0;

/// End compressing, doing whatever is necessary to end the stream.
/// If EndResult.outputTooSmall is true on return, end() should be called
/// again with a larger buffer. Otherwise, the StreamingCompressor should not
/// be used anymore. end() will flush the compressed output.
virtual EndResult end(uint8_t* output, uint64_t outputLength) = 0;
};

class StreamingDecompressor {
public:
virtual ~StreamingDecompressor() = default;

struct DecompressResult {
uint64_t bytesRead;
uint64_t bytesWritten;
bool outputTooSmall;
};

/// Decompress some input.
/// If outputTooSmall is true on return, a larger output buffer needs
/// to be supplied.
virtual DecompressResult decompress(
const uint8_t* input,
uint64_t inputLength,
uint8_t* output,
uint64_t outputLength) = 0;

/// Return whether the compressed stream is finished.
virtual bool isFinished() = 0;

/// Reinitialize decompressor, making it ready for a new compressed stream.
virtual void reset() = 0;
};

struct CodecOptions {
int32_t compressionLevel;

CodecOptions(int32_t compressionLevel = kUseDefaultCompressionLevel)
: compressionLevel(compressionLevel) {}

virtual ~CodecOptions() = default;
};

class Codec {
public:
virtual ~Codec() = default;

/// Create a kind for the given compression algorithm with CodecOptions.
static std::unique_ptr<Codec> create(
CompressionKind kind,
const CodecOptions& codecOptions = CodecOptions{});

/// Create a kind for the given compression algorithm.
static std::unique_ptr<Codec> create(
CompressionKind kind,
int32_t compressionLevel);

/// Return true if support for indicated kind has been enabled.
static bool isAvailable(CompressionKind kind);

/// Return true if indicated kind supports extracting uncompressed length
/// from compressed data.
static bool supportsGetUncompressedLength(CompressionKind kind);

/// Return true if indicated kind supports one-shot compression with fixed
/// compressed length.
static bool supportsCompressFixedLength(CompressionKind kind);

/// Return true if indicated kind supports creating streaming de/compressor.
static bool supportsStreamingCompression(CompressionKind kind);

/// Return the smallest supported compression level for the kind.
/// Note: This function creates a temporary Codec instance.
static int32_t minimumCompressionLevel(CompressionKind kind);

/// Return the largest supported compression level for the kind
/// Note: This function creates a temporary Codec instance.
static int32_t maximumCompressionLevel(CompressionKind kind);

/// Return the default compression level.
/// Note: This function creates a temporary Codec instance.
static int32_t defaultCompressionLevel(CompressionKind kind);

/// Return the smallest supported compression level.
/// If the codec doesn't support compression level,
/// `kUseDefaultCompressionLevel` will be returned.
virtual int32_t minimumCompressionLevel() const = 0;

/// Return the largest supported compression level.
/// If the codec doesn't support compression level,
/// `kUseDefaultCompressionLevel` will be returned.
virtual int32_t maximumCompressionLevel() const = 0;

/// Return the default compression level.
/// If the codec doesn't support compression level,
/// `kUseDefaultCompressionLevel` will be returned.
virtual int32_t defaultCompressionLevel() const = 0;

/// One-shot decompression function.
/// `outputLength` must be correct and therefore be obtained in advance.
/// The actual decompressed length is returned.
/// Note: One-shot decompression is not always compatible with streaming
/// compression. Depending on the codec (e.g. LZ4), different formats may
/// be used.
virtual uint64_t decompress(
const uint8_t* input,
uint64_t inputLength,
uint8_t* output,
uint64_t outputLength) = 0;

/// Performs one-shot compression.
/// `outputLength` must first have been computed using maxCompressedLength().
/// The actual compressed length is returned.
/// Note: One-shot compression is not always compatible with streaming
/// decompression. Depending on the codec (e.g. LZ4), different formats may
/// be used.
virtual uint64_t compress(
const uint8_t* input,
uint64_t inputLength,
uint8_t* output,
uint64_t outputLength) = 0;

/// Performs one-shot compression.
/// This function compresses data and writes the output up to the specified
/// outputLength. If outputLength is too small to hold all the compressed
/// data, the function doesn't fail. Instead, it returns the number of bytes
/// actually written to the output buffer. Any remaining data that couldn't
/// be written in this call will be written in subsequent calls to this
/// function. This is useful when fixed-length compression blocks are required
/// by the caller.
/// Note: Only Gzip and Zstd codec supports this function.
virtual uint64_t compressFixedLength(
const uint8_t* input,
uint64_t inputLength,
uint8_t* output,
uint64_t outputLength);

/// Maximum compressed length of given input length.
virtual uint64_t maxCompressedLength(uint64_t inputLength) = 0;

/// Retrieves the actual uncompressed length of data using the specified
/// compression library.
/// Note: This functionality is not universally supported by all compression
/// libraries. If not supported, `std::nullopt` will be returned.
std::optional<uint64_t> getUncompressedLength(
uint64_t inputLength,
const uint8_t* input) const;

/// Create a streaming compressor instance.
virtual std::shared_ptr<StreamingCompressor> makeStreamingCompressor() = 0;

/// Create a streaming compressor instance.
virtual std::shared_ptr<StreamingDecompressor>
makeStreamingDecompressor() = 0;

/// This Codec's compression type.
virtual CompressionKind compressionKind() const = 0;

/// The name of this Codec's compression type.
std::string name() const {
return compressionKindToString(compressionKind());
}

/// This Codec's compression level, if applicable.
virtual int32_t compressionLevel() const {
return kUseDefaultCompressionLevel;
}

private:
/// Initializes the codec's resources.
virtual void init();
};
} // namespace facebook::velox::common

template <>
Expand Down
Loading

0 comments on commit 59519ff

Please sign in to comment.