[enhance](S3) Use allocator for s3 buffer's allocation #38519

Draft: wants to merge 1 commit into base: branch-2.0
6 changes: 0 additions & 6 deletions be/src/common/config.cpp
@@ -1052,10 +1052,6 @@ DEFINE_mInt32(tablet_path_check_batch_size, "1000");
DEFINE_mInt64(row_column_page_size, "4096");
// it must be larger than or equal to 5MB
DEFINE_mInt32(s3_write_buffer_size, "5242880");
// the size of the whole s3 buffer pool, which indicates the s3 file writer
// can at most buffer 50MB data. And the num of multi part upload task is
// s3_write_buffer_whole_size / s3_write_buffer_size
DEFINE_mInt32(s3_write_buffer_whole_size, "524288000");
DEFINE_mInt64(file_cache_max_file_reader_cache_size, "1000000");

//disable shrink memory by default
@@ -1147,8 +1143,6 @@ DEFINE_mInt32(buffered_reader_read_timeout_ms, "20000");

DEFINE_Bool(enable_snapshot_action, "false");

DEFINE_mInt32(s3_writer_buffer_allocation_timeout_second, "60");

DEFINE_mBool(enable_column_type_check, "true");

// Tolerance for the number of partition id 0 in rowset, default 0
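For context, the removed settings sized a fixed, preallocated pool: the number of slots was s3_write_buffer_whole_size / s3_write_buffer_size, which also bounded the number of concurrent multipart-upload tasks. A minimal sketch of that arithmetic, using only the default values shown in the diff above:

    #include <cstdint>
    #include <iostream>

    int main() {
        // Defaults removed by this PR (values taken from the diff above).
        const int64_t s3_write_buffer_size = 5242880;         // 5 MiB per part buffer
        const int64_t s3_write_buffer_whole_size = 524288000; // 500 MiB for the whole pool
        // Slot count of the old fixed pool, i.e. the maximum number of
        // multipart upload parts that could be buffered at once:
        std::cout << s3_write_buffer_whole_size / s3_write_buffer_size << "\n"; // prints 100
        return 0;
    }

With this PR, each part buffer is allocated on demand through the allocator, so this fixed ceiling goes away.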
7 changes: 0 additions & 7 deletions be/src/common/config.h
@@ -1082,10 +1082,6 @@ DECLARE_mInt32(tablet_path_check_batch_size);
DECLARE_mInt64(row_column_page_size);
// it must be larger than or equal to 5MB
DECLARE_mInt32(s3_write_buffer_size);
// the size of the whole s3 buffer pool, which indicates the s3 file writer
// can at most buffer 50MB data. And the num of multi part upload task is
// s3_write_buffer_whole_size / s3_write_buffer_size
DECLARE_mInt32(s3_write_buffer_whole_size);
// the max number of cached file handle for block segment
DECLARE_mInt64(file_cache_max_file_reader_cache_size);
//enable shrink memory
@@ -1197,9 +1193,6 @@ DECLARE_mInt32(buffered_reader_read_timeout_ms);
// whether to enable /api/snapshot api
DECLARE_Bool(enable_snapshot_action);

// The timeout config for S3 write buffer allocation
DECLARE_mInt32(s3_writer_buffer_allocation_timeout_second);

DECLARE_mBool(enable_column_type_check);

// Tolerance for the number of partition id 0 in rowset, default 0
2 changes: 1 addition & 1 deletion be/src/io/fs/benchmark/fs_benchmark_tool.cpp
@@ -120,7 +120,7 @@ int main(int argc, char** argv) {
.set_max_threads(num_cores)
.build(&buffered_reader_prefetch_thread_pool);
doris::io::S3FileBufferPool* s3_buffer_pool = doris::io::S3FileBufferPool::GetInstance();
s3_buffer_pool->init(524288000, 5242880, buffered_reader_prefetch_thread_pool.get());
s3_buffer_pool->init(buffered_reader_prefetch_thread_pool.get());

try {
doris::io::MultiBenchmark multi_bm(FLAGS_fs_type, FLAGS_operation, std::stoi(FLAGS_threads),
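Since the pool no longer preallocates a memory block, init() only needs the thread pool that runs the uploads. A sketch of the call-site change, with the same values and variable names as the benchmark tool above:

    // Before: whole-pool size and per-buffer size were passed in and
    // carved into fixed slots up front.
    // s3_buffer_pool->init(524288000, 5242880, buffered_reader_prefetch_thread_pool.get());

    // After: buffers are allocated lazily, one per in-flight upload part.
    s3_buffer_pool->init(buffered_reader_prefetch_thread_pool.get());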
103 changes: 48 additions & 55 deletions be/src/io/fs/s3_file_write_bufferpool.cpp
@@ -17,6 +17,7 @@

#include "s3_file_write_bufferpool.h"

#include <boost/core/noncopyable.hpp>
#include <chrono>
#include <cstring>

@@ -29,13 +30,48 @@

namespace doris {
namespace io {

bvar::Adder<uint64_t> s3_file_buffer_allocated("s3_file_buffer_allocated");

template <typename Allocator = Allocator<false, false, false>>
struct Memory : boost::noncopyable, Allocator {
Memory() = default;
explicit Memory(size_t size) : _size(size) {
alloc(size);
s3_file_buffer_allocated << 1;
}
~Memory() {
dealloc();
s3_file_buffer_allocated << -1;
}
void alloc(size_t size) { _data = static_cast<char*>(Allocator::alloc(size, 0)); }
void dealloc() {
if (_data == nullptr) {
return;
}
Allocator::free(_data, _size);
_data = nullptr;
}
size_t _size;
char* _data;
};

struct S3FileBuffer::PartData {
Memory<> _memory;
PartData() : _memory(config::s3_write_buffer_size) {}
~PartData() = default;
};

S3FileBuffer::S3FileBuffer(ThreadPool* pool)
: _inner_data(std::make_unique<S3FileBuffer::PartData>()), _thread_pool(pool) {}

S3FileBuffer::~S3FileBuffer() = default;

void S3FileBuffer::on_finished() {
if (_buf.empty()) {
if (nullptr == _inner_data) {
return;
}
reset();
S3FileBufferPool::GetInstance()->reclaim(_buf);
_buf.clear();
}

// when there is memory preserved, directly write data to buf
@@ -45,74 +81,31 @@ Status S3FileBuffer::append_data(const Slice& data) {
Defer defer {[&] { _size += data.get_size(); }};
while (true) {
// if buf is not empty, it means there is memory preserved for this buf
if (!_buf.empty()) {
memcpy(_buf.data + _size, data.get_data(), data.get_size());
if (_inner_data != nullptr) {
memcpy(_inner_data->_memory._data + _size, data.get_data(), data.get_size());
break;
} else {
// wait allocate buffer pool
auto tmp = S3FileBufferPool::GetInstance()->allocate(true);
if (tmp->get_size() == 0) {
return Status::InternalError("Failed to allocate s3 writer buffer for {} seconds",
config::s3_writer_buffer_allocation_timeout_second);
}
rob_buffer(tmp);
return Status::InternalError("Failed to allocate s3 writer buffer");
}
}
return Status::OK();
}

void S3FileBuffer::submit() {
if (LIKELY(!_buf.empty())) {
_stream_ptr = std::make_shared<StringViewStream>(_buf.data, _size);
if (LIKELY(nullptr != _inner_data)) {
_stream_ptr = std::make_shared<StringViewStream>(_inner_data->_memory._data, _size);
}

_thread_pool->submit_func([buf = this->shared_from_this()]() { buf->_on_upload(); });
}

void S3FileBufferPool::init(int32_t s3_write_buffer_whole_size, int32_t s3_write_buffer_size,
doris::ThreadPool* thread_pool) {
// the nums could be one configuration
size_t buf_num = s3_write_buffer_whole_size / s3_write_buffer_size;
DCHECK((s3_write_buffer_size >= 5 * 1024 * 1024) &&
(s3_write_buffer_whole_size > s3_write_buffer_size));
LOG_INFO("S3 file buffer pool with {} buffers", buf_num);
_whole_mem_buffer = std::make_unique<char[]>(s3_write_buffer_whole_size);
for (size_t i = 0; i < buf_num; i++) {
Slice s {_whole_mem_buffer.get() + i * s3_write_buffer_size,
static_cast<size_t>(s3_write_buffer_size)};
_free_raw_buffers.emplace_back(s);
}
void S3FileBufferPool::init(doris::ThreadPool* thread_pool) {
_thread_pool = thread_pool;
}

std::shared_ptr<S3FileBuffer> S3FileBufferPool::allocate(bool reserve) {
std::shared_ptr<S3FileBuffer> buf = std::make_shared<S3FileBuffer>(_thread_pool);
int64_t timeout = config::s3_writer_buffer_allocation_timeout_second;
// if need reserve then we must ensure return buf with memory preserved
if (reserve) {
{
std::unique_lock<std::mutex> lck {_lock};
_cv.wait_for(lck, std::chrono::seconds(timeout),
[this]() { return !_free_raw_buffers.empty(); });
if (!_free_raw_buffers.empty()) {
buf->reserve_buffer(_free_raw_buffers.front());
_free_raw_buffers.pop_front();
}
}
return buf;
}
// try to get one memory reserved buffer
{
std::unique_lock<std::mutex> lck {_lock};
if (!_free_raw_buffers.empty()) {
buf->reserve_buffer(_free_raw_buffers.front());
_free_raw_buffers.pop_front();
}
}
// if there is no free buffer and no need to reserve memory, we could return one empty buffer
// if the buf has no memory reserved, it would try to write the data to file cache first
// or it would try to rob buffer from other S3FileBuffer
return buf;
Status S3FileBufferPool::allocate(std::shared_ptr<S3FileBuffer>* buf) {
RETURN_IF_CATCH_EXCEPTION(*buf = std::make_shared<S3FileBuffer>(_thread_pool));
return Status::OK();
}
} // namespace io
} // namespace doris
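The new PartData member gives every S3FileBuffer its own RAII-managed allocation instead of a Slice carved out of one shared block, so buffers no longer need to be robbed from other writers or reclaimed into a free list. A standalone sketch of the same ownership pattern, with plain new[]/delete[] standing in for Doris's Allocator (the real template also participates in memory tracking; PartMemory is a hypothetical name used only here):

    #include <cstddef>

    // Stand-in for the Memory<Allocator> template above: owns one heap
    // block sized at construction and frees it on destruction.
    struct PartMemory {
        explicit PartMemory(size_t size) : _size(size), _data(new char[size]) {}
        ~PartMemory() { delete[] _data; }
        PartMemory(const PartMemory&) = delete; // non-copyable, as with boost::noncopyable
        PartMemory& operator=(const PartMemory&) = delete;
        size_t _size;
        char* _data;
    };

Because the allocator may throw under memory pressure, S3FileBufferPool::allocate() now wraps construction in RETURN_IF_CATCH_EXCEPTION and reports failure as a Status rather than handing back a buffer with no memory reserved.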
31 changes: 6 additions & 25 deletions be/src/io/fs/s3_file_write_bufferpool.h
@@ -38,17 +38,8 @@ namespace io {
struct S3FileBuffer : public std::enable_shared_from_this<S3FileBuffer> {
using Callback = std::function<void()>;

S3FileBuffer(ThreadPool* pool) { _thread_pool = pool; }
~S3FileBuffer() = default;

void rob_buffer(std::shared_ptr<S3FileBuffer>& other) {
_buf = other->_buf;
// we should clear other's memory buffer in case it would be reclaimed twice
// when calling on_finished
other->_buf.clear();
}

void reserve_buffer(Slice s) { _buf = s; }
S3FileBuffer(ThreadPool* pool);
~S3FileBuffer();

// append data into the memory buffer inside
// or into the file cache if the buffer has no memory buffer
@@ -109,7 +100,8 @@ struct S3FileBuffer : public std::enable_shared_from_this<S3FileBuffer> {
size_t _size {0};
std::shared_ptr<std::iostream> _stream_ptr;
// only served as one reserved buffer
Slice _buf;
struct PartData;
std::unique_ptr<PartData> _inner_data;
size_t _append_offset {0};
// not owned
ThreadPool* _thread_pool = nullptr;
@@ -122,27 +114,16 @@ class S3FileBufferPool {

// should be called one and only once
// at startup
void init(int32_t s3_write_buffer_whole_size, int32_t s3_write_buffer_size,
doris::ThreadPool* thread_pool);
void init(doris::ThreadPool* thread_pool);

static S3FileBufferPool* GetInstance() {
static S3FileBufferPool _pool;
return &_pool;
}

void reclaim(Slice buf) {
std::unique_lock<std::mutex> lck {_lock};
_free_raw_buffers.emplace_front(buf);
_cv.notify_all();
}

std::shared_ptr<S3FileBuffer> allocate(bool reserve = false);
Status allocate(std::shared_ptr<S3FileBuffer>* buf);

private:
std::mutex _lock;
std::condition_variable _cv;
std::unique_ptr<char[]> _whole_mem_buffer;
std::list<Slice> _free_raw_buffers;
// not owned
ThreadPool* _thread_pool = nullptr;
};
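With the new signature, allocation failure reaches callers as a Status instead of an empty buffer that had to be probed via get_size(). A hedged sketch of the resulting calling pattern, assuming the Doris types used in this PR (the surrounding function itself is hypothetical):

    // Hypothetical caller mirroring the pattern in s3_file_writer.cpp below.
    Status upload_one_part(const Slice& data) {
        std::shared_ptr<S3FileBuffer> buf;
        // Allocation may fail under memory pressure; check the Status
        // instead of assuming a usable buffer.
        RETURN_IF_ERROR(S3FileBufferPool::GetInstance()->allocate(&buf));
        RETURN_IF_ERROR(buf->append_data(data));
        buf->submit(); // hands the part to the upload thread pool
        return Status::OK();
    }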
4 changes: 2 additions & 2 deletions be/src/io/fs/s3_file_writer.cpp
@@ -202,7 +202,7 @@ Status S3FileWriter::close() {

if (_bytes_appended == 0 && _create_empty_file) {
// No data written, but need to create an empty file
_pending_buf = S3FileBufferPool::GetInstance()->allocate();
RETURN_IF_ERROR(S3FileBufferPool::GetInstance()->allocate(&_pending_buf));
// if there is no upload id, we need to create a new one
_pending_buf->set_upload_remote_callback(
[this, buf = _pending_buf]() { _put_object(*buf); });
@@ -238,7 +238,7 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
return _st;
}
if (!_pending_buf) {
_pending_buf = S3FileBufferPool::GetInstance()->allocate();
RETURN_IF_ERROR(S3FileBufferPool::GetInstance()->allocate(&_pending_buf));
// capture part num by value along with the value of the shared ptr
_pending_buf->set_upload_remote_callback(
[part_num = _cur_part_num, this, cur_buf = _pending_buf]() {
4 changes: 1 addition & 3 deletions be/src/service/doris_main.cpp
@@ -468,9 +468,7 @@ int main(int argc, char** argv) {

// init s3 write buffer pool
doris::io::S3FileBufferPool* s3_buffer_pool = doris::io::S3FileBufferPool::GetInstance();
s3_buffer_pool->init(doris::config::s3_write_buffer_whole_size,
doris::config::s3_write_buffer_size,
exec_env->buffered_reader_prefetch_thread_pool());
s3_buffer_pool->init(exec_env->buffered_reader_prefetch_thread_pool());

// init and open storage engine
doris::EngineOptions options;