Skip to content

Commit

Permalink
[fix](decompressor) impl ZstdDecompressor (#40315)
Browse files Browse the repository at this point in the history
Impl ZstdDecompressor and support read hive text table which is
compressed by zstd.
  • Loading branch information
suxiaogang223 committed Oct 10, 2024
1 parent d941524 commit fcd45fe
Show file tree
Hide file tree
Showing 4 changed files with 246 additions and 3 deletions.
50 changes: 50 additions & 0 deletions be/src/exec/decompressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ Status Decompressor::create_decompressor(CompressType type,
case CompressType::BZIP2:
decompressor->reset(new Bzip2Decompressor());
break;
case CompressType::ZSTD:
decompressor->reset(new ZstdDecompressor());
break;
case CompressType::LZ4FRAME:
decompressor->reset(new Lz4FrameDecompressor());
break;
Expand Down Expand Up @@ -86,6 +89,9 @@ Status Decompressor::create_decompressor(TFileCompressType::type type,
case TFileCompressType::BZ2:
compress_type = CompressType::BZIP2;
break;
case TFileCompressType::ZSTD:
compress_type = CompressType::ZSTD;
break;
case TFileCompressType::LZ4FRAME:
compress_type = CompressType::LZ4FRAME;
break;
Expand Down Expand Up @@ -300,6 +306,50 @@ std::string Bzip2Decompressor::debug_info() {
return ss.str();
}

ZstdDecompressor::~ZstdDecompressor() {
ZSTD_freeDStream(_zstd_strm);
}

Status ZstdDecompressor::init() {
_zstd_strm = ZSTD_createDStream();
if (!_zstd_strm) {
std::stringstream ss;
return Status::InternalError("ZSTD_dctx creation error");
}
auto ret = ZSTD_initDStream(_zstd_strm);
if (ZSTD_isError(ret)) {
return Status::InternalError("ZSTD_initDStream error: {}", ZSTD_getErrorName(ret));
}
return Status::OK();
}

Status ZstdDecompressor::decompress(uint8_t* input, size_t input_len, size_t* input_bytes_read,
uint8_t* output, size_t output_max_len,
size_t* decompressed_len, bool* stream_end,
size_t* more_input_bytes, size_t* more_output_bytes) {
// 1. set input and output
ZSTD_inBuffer inputBuffer = {input, input_len, 0};
ZSTD_outBuffer outputBuffer = {output, output_max_len, 0};

// decompress
int ret = ZSTD_decompressStream(_zstd_strm, &outputBuffer, &inputBuffer);
*input_bytes_read = inputBuffer.pos;
*decompressed_len = outputBuffer.pos;

if (ZSTD_isError(ret)) {
return Status::InternalError("Failed to zstd decompress: {}", ZSTD_getErrorName(ret));
}

*stream_end = ret == 0;
return Status::OK();
}

std::string ZstdDecompressor::debug_info() {
std::stringstream ss;
ss << "ZstdDecompressor.";
return ss.str();
}

// Lz4Frame
// Lz4 version: 1.7.5
// define LZ4F_VERSION = 100
Expand Down
32 changes: 31 additions & 1 deletion be/src/exec/decompressor.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <stddef.h>
#include <stdint.h>
#include <zlib.h>
#include <zstd.h>

#include <memory>
#include <string>
Expand All @@ -34,7 +35,17 @@

namespace doris {

enum CompressType { UNCOMPRESSED, GZIP, DEFLATE, BZIP2, LZ4FRAME, LZOP, LZ4BLOCK, SNAPPYBLOCK };
enum CompressType {
UNCOMPRESSED,
GZIP,
DEFLATE,
BZIP2,
ZSTD,
LZ4FRAME,
LZOP,
LZ4BLOCK,
SNAPPYBLOCK
};

class Decompressor {
public:
Expand Down Expand Up @@ -126,6 +137,25 @@ class Bzip2Decompressor : public Decompressor {
bz_stream _bz_strm;
};

class ZstdDecompressor : public Decompressor {
public:
~ZstdDecompressor() override;

Status decompress(uint8_t* input, size_t input_len, size_t* input_bytes_read, uint8_t* output,
size_t output_max_len, size_t* decompressed_len, bool* stream_end,
size_t* more_input_bytes, size_t* more_output_bytes) override;

std::string debug_info() override;

private:
friend class Decompressor;
ZstdDecompressor() : Decompressor(CompressType::ZSTD) {}
Status init() override;

private:
ZSTD_DStream* _zstd_strm {nullptr};
};

class Lz4FrameDecompressor : public Decompressor {
public:
~Lz4FrameDecompressor() override;
Expand Down
Loading

0 comments on commit fcd45fe

Please sign in to comment.