Skip to content

Commit

Permalink
[memtracker](accuracy) should not account resuable buffer to query me…
Browse files Browse the repository at this point in the history
…mtracker (#33933)


Co-authored-by: yiguolei <[email protected]>
  • Loading branch information
yiguolei and Doris-Extras authored Apr 21, 2024
1 parent 2668755 commit af589c0
Showing 1 changed file with 65 additions and 53 deletions.
118 changes: 65 additions & 53 deletions be/src/util/block_compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,20 @@ class Lz4BlockCompression : public BlockCompressionCodec {
ENABLE_FACTORY_CREATOR(Context);

public:
Context() : ctx(nullptr) {}
Context() : ctx(nullptr) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
buffer = std::make_unique<faststring>();
}
LZ4_stream_t* ctx;
faststring buffer;
std::unique_ptr<faststring> buffer;
~Context() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
if (ctx) {
LZ4_freeStream(ctx);
}
buffer.reset();
}
};

Expand All @@ -118,8 +125,6 @@ class Lz4BlockCompression : public BlockCompressionCodec {
}

Status compress(const Slice& input, faststring* output) override {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
if (input.size > INT_MAX) {
return Status::InvalidArgument(
"LZ4 not support those case(input.size>INT_MAX), maybe you should change "
Expand All @@ -144,8 +149,14 @@ class Lz4BlockCompression : public BlockCompressionCodec {
compressed_buf.size = max_len;
} else {
// reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
context->buffer.resize(max_len);
compressed_buf.data = reinterpret_cast<char*>(context->buffer.data());
{
// context->buffer is resuable between queries, should accouting to
// global tracker.
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
context->buffer->resize(max_len);
}
compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
compressed_buf.size = max_len;
}

Expand All @@ -165,8 +176,6 @@ class Lz4BlockCompression : public BlockCompressionCodec {
}

Status decompress(const Slice& input, Slice* output) override {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
auto decompressed_len =
LZ4_decompress_safe(input.data, output->data, input.size, output->size);
if (decompressed_len < 0) {
Expand Down Expand Up @@ -218,8 +227,6 @@ class HadoopLz4BlockCompression : public Lz4BlockCompression {
return &s_instance;
}
Status decompress(const Slice& input, Slice* output) override {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
RETURN_IF_ERROR(Decompressor::create_decompressor(CompressType::LZ4BLOCK, &_decompressor));
size_t input_bytes_read = 0;
size_t decompressed_len = 0;
Expand All @@ -245,13 +252,20 @@ class Lz4fBlockCompression : public BlockCompressionCodec {
ENABLE_FACTORY_CREATOR(CContext);

public:
CContext() : ctx(nullptr) {}
CContext() : ctx(nullptr) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
buffer = std::make_unique<faststring>();
}
LZ4F_compressionContext_t ctx;
faststring buffer;
std::unique_ptr<faststring> buffer;
~CContext() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
if (ctx) {
LZ4F_freeCompressionContext(ctx);
}
buffer.reset();
}
};
class DContext {
Expand Down Expand Up @@ -301,8 +315,6 @@ class Lz4fBlockCompression : public BlockCompressionCodec {
private:
Status _compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
faststring* output) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
std::unique_ptr<CContext> context;
RETURN_IF_ERROR(_acquire_compression_ctx(context));
bool compress_failed = false;
Expand All @@ -319,9 +331,13 @@ class Lz4fBlockCompression : public BlockCompressionCodec {
compressed_buf.data = reinterpret_cast<char*>(output->data());
compressed_buf.size = max_len;
} else {
// reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
context->buffer.resize(max_len);
compressed_buf.data = reinterpret_cast<char*>(context->buffer.data());
{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
// reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
context->buffer->resize(max_len);
}
compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
compressed_buf.size = max_len;
}

Expand Down Expand Up @@ -361,8 +377,6 @@ class Lz4fBlockCompression : public BlockCompressionCodec {
}

Status _decompress(const Slice& input, Slice* output) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
bool decompress_failed = false;
std::unique_ptr<DContext> context;
RETURN_IF_ERROR(_acquire_decompression_ctx(context));
Expand Down Expand Up @@ -472,13 +486,20 @@ class Lz4HCBlockCompression : public BlockCompressionCodec {
ENABLE_FACTORY_CREATOR(Context);

public:
Context() : ctx(nullptr) {}
Context() : ctx(nullptr) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
buffer = std::make_unique<faststring>();
}
LZ4_streamHC_t* ctx;
faststring buffer;
std::unique_ptr<faststring> buffer;
~Context() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
if (ctx) {
LZ4_freeStreamHC(ctx);
}
buffer.reset();
}
};

Expand All @@ -494,8 +515,6 @@ class Lz4HCBlockCompression : public BlockCompressionCodec {
}

Status compress(const Slice& input, faststring* output) override {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
std::unique_ptr<Context> context;
RETURN_IF_ERROR(_acquire_compression_ctx(context));
bool compress_failed = false;
Expand All @@ -512,9 +531,13 @@ class Lz4HCBlockCompression : public BlockCompressionCodec {
compressed_buf.data = reinterpret_cast<char*>(output->data());
compressed_buf.size = max_len;
} else {
// reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
context->buffer.resize(max_len);
compressed_buf.data = reinterpret_cast<char*>(context->buffer.data());
{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
// reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
context->buffer->resize(max_len);
}
compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
compressed_buf.size = max_len;
}

Expand All @@ -533,8 +556,6 @@ class Lz4HCBlockCompression : public BlockCompressionCodec {
}

Status decompress(const Slice& input, Slice* output) override {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
auto decompressed_len =
LZ4_decompress_safe(input.data, output->data, input.size, output->size);
if (decompressed_len < 0) {
Expand Down Expand Up @@ -654,8 +675,6 @@ class SnappyBlockCompression : public BlockCompressionCodec {
~SnappyBlockCompression() override {}

Status compress(const Slice& input, faststring* output) override {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
size_t max_len = max_compressed_len(input.size);
output->resize(max_len);
Slice s(*output);
Expand All @@ -666,8 +685,6 @@ class SnappyBlockCompression : public BlockCompressionCodec {
}

Status decompress(const Slice& input, Slice* output) override {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
if (!snappy::RawUncompress(input.data, input.size, output->data)) {
return Status::InvalidArgument("Fail to do Snappy decompress");
}
Expand Down Expand Up @@ -699,8 +716,6 @@ class ZlibBlockCompression : public BlockCompressionCodec {
~ZlibBlockCompression() {}

Status compress(const Slice& input, faststring* output) override {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
size_t max_len = max_compressed_len(input.size);
output->resize(max_len);
Slice s(*output);
Expand All @@ -715,8 +730,6 @@ class ZlibBlockCompression : public BlockCompressionCodec {

Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
faststring* output) override {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
size_t max_len = max_compressed_len(uncompressed_size);
output->resize(max_len);

Expand Down Expand Up @@ -757,8 +770,6 @@ class ZlibBlockCompression : public BlockCompressionCodec {
}

Status decompress(const Slice& input, Slice* output) override {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
size_t input_size = input.size;
auto zres =
::uncompress2((Bytef*)output->data, &output->size, (Bytef*)input.data, &input_size);
Expand All @@ -781,13 +792,20 @@ class ZstdBlockCompression : public BlockCompressionCodec {
ENABLE_FACTORY_CREATOR(CContext);

public:
CContext() : ctx(nullptr) {}
CContext() : ctx(nullptr) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
buffer = std::make_unique<faststring>();
}
ZSTD_CCtx* ctx;
faststring buffer;
std::unique_ptr<faststring> buffer;
~CContext() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
if (ctx) {
ZSTD_freeCCtx(ctx);
}
buffer.reset();
}
};
class DContext {
Expand Down Expand Up @@ -826,8 +844,6 @@ class ZstdBlockCompression : public BlockCompressionCodec {
// https://github.com/facebook/zstd/blob/dev/examples/streaming_compression.c
Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
faststring* output) override {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
std::unique_ptr<CContext> context;
RETURN_IF_ERROR(_acquire_compression_ctx(context));
bool compress_failed = false;
Expand All @@ -845,9 +861,13 @@ class ZstdBlockCompression : public BlockCompressionCodec {
compressed_buf.data = reinterpret_cast<char*>(output->data());
compressed_buf.size = max_len;
} else {
// reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
context->buffer.resize(max_len);
compressed_buf.data = reinterpret_cast<char*>(context->buffer.data());
{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
// reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
context->buffer->resize(max_len);
}
compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
compressed_buf.size = max_len;
}

Expand Down Expand Up @@ -904,8 +924,6 @@ class ZstdBlockCompression : public BlockCompressionCodec {
}

Status decompress(const Slice& input, Slice* output) override {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
std::unique_ptr<DContext> context;
bool decompress_failed = false;
RETURN_IF_ERROR(_acquire_decompression_ctx(context));
Expand Down Expand Up @@ -1001,8 +1019,6 @@ class GzipBlockCompression : public ZlibBlockCompression {
~GzipBlockCompression() override = default;

Status decompress(const Slice& input, Slice* output) override {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
z_stream z_strm = {};
z_strm.zalloc = Z_NULL;
z_strm.zfree = Z_NULL;
Expand Down Expand Up @@ -1084,8 +1100,6 @@ class GzipBlockCompressionByLibdeflate final : public GzipBlockCompression {
~GzipBlockCompressionByLibdeflate() override = default;

Status decompress(const Slice& input, Slice* output) override {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
if (input.empty()) {
output->size = 0;
return Status::OK();
Expand Down Expand Up @@ -1118,8 +1132,6 @@ class LzoBlockCompression final : public BlockCompressionCodec {
}
size_t max_compressed_len(size_t len) override { return 0; };
Status decompress(const Slice& input, Slice* output) override {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
auto* input_ptr = input.data;
auto remain_input_size = input.size;
auto* output_ptr = output->data;
Expand Down

0 comments on commit af589c0

Please sign in to comment.