pass benchmark tool
no lambda

check

init
ByteYue committed Jul 31, 2024
1 parent 0ad9600 commit 397818a
Showing 7 changed files with 58 additions and 99 deletions.
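In short: the S3 write path no longer carves buffers out of a fixed, pre-reserved arena (s3_write_buffer_whole_size) handed out by a blocking, timeout-guarded allocate(). Each S3FileBuffer now owns its own config::s3_write_buffer_size-sized allocation, created on demand in its constructor; allocation failure surfaces as a Status, and the number of live buffers is exported through the s3_file_buffer_allocated bvar counter.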
6 changes: 0 additions & 6 deletions be/src/common/config.cpp
@@ -1052,10 +1052,6 @@ DEFINE_mInt32(tablet_path_check_batch_size, "1000");
 DEFINE_mInt64(row_column_page_size, "4096");
 // it must be larger than or equal to 5MB
 DEFINE_mInt32(s3_write_buffer_size, "5242880");
-// the size of the whole s3 buffer pool, which indicates the s3 file writer
-// can at most buffer 50MB data. And the num of multi part upload task is
-// s3_write_buffer_whole_size / s3_write_buffer_size
-DEFINE_mInt32(s3_write_buffer_whole_size, "524288000");
 DEFINE_mInt64(file_cache_max_file_reader_cache_size, "1000000");
 
 //disable shrink memory by default
@@ -1147,8 +1143,6 @@ DEFINE_mInt32(buffered_reader_read_timeout_ms, "20000");
 
 DEFINE_Bool(enable_snapshot_action, "false");
 
-DEFINE_mInt32(s3_writer_buffer_allocation_timeout_second, "60");
-
 DEFINE_mBool(enable_column_type_check, "true");
 
 // Tolerance for the number of partition id 0 in rowset, default 0
7 changes: 0 additions & 7 deletions be/src/common/config.h
@@ -1082,10 +1082,6 @@ DECLARE_mInt32(tablet_path_check_batch_size);
 DECLARE_mInt64(row_column_page_size);
 // it must be larger than or equal to 5MB
 DECLARE_mInt32(s3_write_buffer_size);
-// the size of the whole s3 buffer pool, which indicates the s3 file writer
-// can at most buffer 50MB data. And the num of multi part upload task is
-// s3_write_buffer_whole_size / s3_write_buffer_size
-DECLARE_mInt32(s3_write_buffer_whole_size);
 // the max number of cached file handle for block segment
 DECLARE_mInt64(file_cache_max_file_reader_cache_size);
 //enable shrink memory
@@ -1197,9 +1193,6 @@ DECLARE_mInt32(buffered_reader_read_timeout_ms);
 // whether to enable /api/snapshot api
 DECLARE_Bool(enable_snapshot_action);
 
-// The timeout config for S3 write buffer allocation
-DECLARE_mInt32(s3_writer_buffer_allocation_timeout_second);
-
 DECLARE_mBool(enable_column_type_check);
 
 // Tolerance for the number of partition id 0 in rowset, default 0
2 changes: 1 addition & 1 deletion be/src/io/fs/benchmark/fs_benchmark_tool.cpp
@@ -120,7 +120,7 @@ int main(int argc, char** argv) {
                     .set_max_threads(num_cores)
                     .build(&buffered_reader_prefetch_thread_pool);
     doris::io::S3FileBufferPool* s3_buffer_pool = doris::io::S3FileBufferPool::GetInstance();
-    s3_buffer_pool->init(524288000, 5242880, buffered_reader_prefetch_thread_pool.get());
+    s3_buffer_pool->init(buffered_reader_prefetch_thread_pool.get());
 
     try {
         doris::io::MultiBenchmark multi_bm(FLAGS_fs_type, FLAGS_operation, std::stoi(FLAGS_threads),
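Note that the benchmark tool no longer hard-codes the pool geometry (a 524288000-byte arena split into 5242880-byte slices); after this change each buffer sizes itself from config::s3_write_buffer_size, so init() only needs the thread pool.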
103 changes: 48 additions & 55 deletions be/src/io/fs/s3_file_write_bufferpool.cpp
@@ -17,6 +17,7 @@
 
 #include "s3_file_write_bufferpool.h"
 
+#include <boost/core/noncopyable.hpp>
 #include <chrono>
 #include <cstring>
 
@@ -29,13 +29,48 @@
 
 namespace doris {
 namespace io {
+
+bvar::Adder<uint64_t> s3_file_buffer_allocated("s3_file_buffer_allocated");
+
+template <typename Allocator = Allocator<false, false, false>>
+struct Memory : boost::noncopyable, Allocator {
+    Memory() = default;
+    explicit Memory(size_t size) : _size(size) {
+        alloc(size);
+        s3_file_buffer_allocated << 1;
+    }
+    ~Memory() {
+        dealloc();
+        s3_file_buffer_allocated << -1;
+    }
+    void alloc(size_t size) { _data = static_cast<char*>(Allocator::alloc(size, 0)); }
+    void dealloc() {
+        if (_data == nullptr) {
+            return;
+        }
+        Allocator::free(_data, _size);
+        _data = nullptr;
+    }
+    size_t _size;
+    char* _data;
+};
+
+struct S3FileBuffer::PartData {
+    Memory<> _memory;
+    PartData() : _memory(config::s3_write_buffer_size) {}
+    ~PartData() = default;
+};
+
+S3FileBuffer::S3FileBuffer(ThreadPool* pool)
+        : _inner_data(std::make_unique<S3FileBuffer::PartData>()), _thread_pool(pool) {}
+
+S3FileBuffer::~S3FileBuffer() = default;
+
 void S3FileBuffer::on_finished() {
-    if (_buf.empty()) {
+    if (nullptr == _inner_data) {
         return;
     }
     reset();
-    S3FileBufferPool::GetInstance()->reclaim(_buf);
-    _buf.clear();
 }
 
 // when there is memory preserved, directly write data to buf
@@ -45,74 +81,31 @@ Status S3FileBuffer::append_data(const Slice& data) {
     Defer defer {[&] { _size += data.get_size(); }};
     while (true) {
         // if buf is not empty, it means there is memory preserved for this buf
-        if (!_buf.empty()) {
-            memcpy(_buf.data + _size, data.get_data(), data.get_size());
+        if (_inner_data != nullptr) {
+            memcpy(_inner_data->_memory._data + _size, data.get_data(), data.get_size());
             break;
         } else {
-            // wait allocate buffer pool
-            auto tmp = S3FileBufferPool::GetInstance()->allocate(true);
-            if (tmp->get_size() == 0) {
-                return Status::InternalError("Failed to allocate s3 writer buffer for {} seconds",
-                                             config::s3_writer_buffer_allocation_timeout_second);
-            }
-            rob_buffer(tmp);
+            return Status::InternalError("Failed to allocate s3 writer buffer");
         }
     }
     return Status::OK();
 }
 
 void S3FileBuffer::submit() {
-    if (LIKELY(!_buf.empty())) {
-        _stream_ptr = std::make_shared<StringViewStream>(_buf.data, _size);
+    if (LIKELY(nullptr != _inner_data)) {
+        _stream_ptr = std::make_shared<StringViewStream>(_inner_data->_memory._data, _size);
     }
 
     _thread_pool->submit_func([buf = this->shared_from_this()]() { buf->_on_upload(); });
 }
 
-void S3FileBufferPool::init(int32_t s3_write_buffer_whole_size, int32_t s3_write_buffer_size,
-                            doris::ThreadPool* thread_pool) {
-    // the nums could be one configuration
-    size_t buf_num = s3_write_buffer_whole_size / s3_write_buffer_size;
-    DCHECK((s3_write_buffer_size >= 5 * 1024 * 1024) &&
-           (s3_write_buffer_whole_size > s3_write_buffer_size));
-    LOG_INFO("S3 file buffer pool with {} buffers", buf_num);
-    _whole_mem_buffer = std::make_unique<char[]>(s3_write_buffer_whole_size);
-    for (size_t i = 0; i < buf_num; i++) {
-        Slice s {_whole_mem_buffer.get() + i * s3_write_buffer_size,
-                 static_cast<size_t>(s3_write_buffer_size)};
-        _free_raw_buffers.emplace_back(s);
-    }
+void S3FileBufferPool::init(doris::ThreadPool* thread_pool) {
     _thread_pool = thread_pool;
 }
 
-std::shared_ptr<S3FileBuffer> S3FileBufferPool::allocate(bool reserve) {
-    std::shared_ptr<S3FileBuffer> buf = std::make_shared<S3FileBuffer>(_thread_pool);
-    int64_t timeout = config::s3_writer_buffer_allocation_timeout_second;
-    // if need reserve then we must ensure return buf with memory preserved
-    if (reserve) {
-        {
-            std::unique_lock<std::mutex> lck {_lock};
-            _cv.wait_for(lck, std::chrono::seconds(timeout),
-                         [this]() { return !_free_raw_buffers.empty(); });
-            if (!_free_raw_buffers.empty()) {
-                buf->reserve_buffer(_free_raw_buffers.front());
-                _free_raw_buffers.pop_front();
-            }
-        }
-        return buf;
-    }
-    // try to get one memory reserved buffer
-    {
-        std::unique_lock<std::mutex> lck {_lock};
-        if (!_free_raw_buffers.empty()) {
-            buf->reserve_buffer(_free_raw_buffers.front());
-            _free_raw_buffers.pop_front();
-        }
-    }
-    // if there is no free buffer and no need to reserve memory, we could return one empty buffer
-    // if the buf has no memory reserved, it would try to write the data to file cache first
-    // or it would try to rob buffer from other S3FileBuffer
-    return buf;
+Status S3FileBufferPool::allocate(std::shared_ptr<S3FileBuffer>* buf) {
+    RETURN_IF_CATCH_EXCEPTION(*buf = std::make_shared<S3FileBuffer>(_thread_pool));
+    return Status::OK();
 }
 } // namespace io
 } // namespace doris
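The Memory<> helper above ties the allocation, its release, and the metric update into one RAII object, and because the allocation happens inside the S3FileBuffer constructor, a throwing allocation is converted into a non-OK Status by RETURN_IF_CATCH_EXCEPTION in allocate(). A minimal self-contained sketch of that RAII-plus-counter pattern, with plain new[]/delete[] and a std::atomic standing in for Doris's Allocator and bvar::Adder (those stand-ins are assumptions, not the commit's actual types):

#include <atomic>
#include <cstddef>

// Stand-in for bvar::Adder<uint64_t>: counts live buffers.
static std::atomic<long> g_live_buffers{0};

// RAII memory block: acquisition and metric increment in the constructor,
// release and metric decrement in the destructor, so accounting cannot leak.
// If new[] throws, neither the counter nor the destructor runs, keeping the
// count consistent.
struct MemorySketch {
    explicit MemorySketch(size_t size) : _size(size), _data(new char[size]) {
        g_live_buffers.fetch_add(1, std::memory_order_relaxed);
    }
    ~MemorySketch() {
        delete[] _data;
        g_live_buffers.fetch_sub(1, std::memory_order_relaxed);
    }
    MemorySketch(const MemorySketch&) = delete;            // noncopyable,
    MemorySketch& operator=(const MemorySketch&) = delete; // like boost::noncopyable
    size_t _size;
    char* _data;
};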
31 changes: 6 additions & 25 deletions be/src/io/fs/s3_file_write_bufferpool.h
@@ -38,17 +38,8 @@ namespace io {
 struct S3FileBuffer : public std::enable_shared_from_this<S3FileBuffer> {
     using Callback = std::function<void()>;
 
-    S3FileBuffer(ThreadPool* pool) { _thread_pool = pool; }
-    ~S3FileBuffer() = default;
-
-    void rob_buffer(std::shared_ptr<S3FileBuffer>& other) {
-        _buf = other->_buf;
-        // we should clear other's memory buffer in case it would be reclaimed twice
-        // when calling on_finished
-        other->_buf.clear();
-    }
-
-    void reserve_buffer(Slice s) { _buf = s; }
+    S3FileBuffer(ThreadPool* pool);
+    ~S3FileBuffer();
 
     // append data into the memory buffer inside
     // or into the file cache if the buffer has no memory buffer
@@ -109,7 +100,8 @@ struct S3FileBuffer : public std::enable_shared_from_this<S3FileBuffer> {
     size_t _size {0};
     std::shared_ptr<std::iostream> _stream_ptr;
     // only served as one reserved buffer
-    Slice _buf;
+    struct PartData;
+    std::unique_ptr<PartData> _inner_data;
     size_t _append_offset {0};
     // not owned
     ThreadPool* _thread_pool = nullptr;
@@ -122,27 +114,16 @@ class S3FileBufferPool {
 
     // should be called one and only once
     // at startup
-    void init(int32_t s3_write_buffer_whole_size, int32_t s3_write_buffer_size,
-              doris::ThreadPool* thread_pool);
+    void init(doris::ThreadPool* thread_pool);
 
     static S3FileBufferPool* GetInstance() {
        static S3FileBufferPool _pool;
        return &_pool;
     }
 
-    void reclaim(Slice buf) {
-        std::unique_lock<std::mutex> lck {_lock};
-        _free_raw_buffers.emplace_front(buf);
-        _cv.notify_all();
-    }
-
-    std::shared_ptr<S3FileBuffer> allocate(bool reserve = false);
+    Status allocate(std::shared_ptr<S3FileBuffer>* buf);
 
 private:
-    std::mutex _lock;
-    std::condition_variable _cv;
-    std::unique_ptr<char[]> _whole_mem_buffer;
-    std::list<Slice> _free_raw_buffers;
     // not owned
     ThreadPool* _thread_pool = nullptr;
 };
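With allocate() now returning a Status through an out-parameter, callers must propagate failure instead of receiving a possibly empty buffer; s3_file_writer.cpp below adopts exactly this. A small sketch of the new calling convention (write_part is a hypothetical helper, not part of the commit):

Status write_part(const Slice& data) {
    std::shared_ptr<S3FileBuffer> buf;
    // A failed allocation (e.g. an exception thrown while reserving the
    // buffer's memory) now comes back as a non-OK Status.
    RETURN_IF_ERROR(S3FileBufferPool::GetInstance()->allocate(&buf));
    // On success the buffer already owns config::s3_write_buffer_size bytes.
    RETURN_IF_ERROR(buf->append_data(data));
    buf->submit(); // hands the upload off to the pool's thread pool
    return Status::OK();
}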
4 changes: 2 additions & 2 deletions be/src/io/fs/s3_file_writer.cpp
@@ -202,7 +202,7 @@ Status S3FileWriter::close() {
 
     if (_bytes_appended == 0 && _create_empty_file) {
         // No data written, but need to create an empty file
-        _pending_buf = S3FileBufferPool::GetInstance()->allocate();
+        RETURN_IF_ERROR(S3FileBufferPool::GetInstance()->allocate(&_pending_buf));
         // if there is no upload id, we need to create a new one
         _pending_buf->set_upload_remote_callback(
                 [this, buf = _pending_buf]() { _put_object(*buf); });
@@ -238,7 +238,7 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
         return _st;
     }
     if (!_pending_buf) {
-        _pending_buf = S3FileBufferPool::GetInstance()->allocate();
+        RETURN_IF_ERROR(S3FileBufferPool::GetInstance()->allocate(&_pending_buf));
         // capture part num by value along with the value of the shared ptr
         _pending_buf->set_upload_remote_callback(
                 [part_num = _cur_part_num, this, cur_buf = _pending_buf]() {
4 changes: 1 addition & 3 deletions be/src/service/doris_main.cpp
@@ -468,9 +468,7 @@ int main(int argc, char** argv) {
 
     // init s3 write buffer pool
     doris::io::S3FileBufferPool* s3_buffer_pool = doris::io::S3FileBufferPool::GetInstance();
-    s3_buffer_pool->init(doris::config::s3_write_buffer_whole_size,
-                         doris::config::s3_write_buffer_size,
-                         exec_env->buffered_reader_prefetch_thread_pool());
+    s3_buffer_pool->init(exec_env->buffered_reader_prefetch_thread_pool());
 
     // init and open storage engine
     doris::EngineOptions options;
