diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index f1d7635882edc2..e373125624f50d 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1052,10 +1052,6 @@ DEFINE_mInt32(tablet_path_check_batch_size, "1000"); DEFINE_mInt64(row_column_page_size, "4096"); // it must be larger than or equal to 5MB DEFINE_mInt32(s3_write_buffer_size, "5242880"); -// the size of the whole s3 buffer pool, which indicates the s3 file writer -// can at most buffer 50MB data. And the num of multi part upload task is -// s3_write_buffer_whole_size / s3_write_buffer_size -DEFINE_mInt32(s3_write_buffer_whole_size, "524288000"); DEFINE_mInt64(file_cache_max_file_reader_cache_size, "1000000"); //disable shrink memory by default @@ -1147,8 +1143,6 @@ DEFINE_mInt32(buffered_reader_read_timeout_ms, "20000"); DEFINE_Bool(enable_snapshot_action, "false"); -DEFINE_mInt32(s3_writer_buffer_allocation_timeout_second, "60"); - DEFINE_mBool(enable_column_type_check, "true"); // Tolerance for the number of partition id 0 in rowset, default 0 diff --git a/be/src/common/config.h b/be/src/common/config.h index ccddc70d7e691e..a5420df10ded9d 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1082,10 +1082,6 @@ DECLARE_mInt32(tablet_path_check_batch_size); DECLARE_mInt64(row_column_page_size); // it must be larger than or equal to 5MB DECLARE_mInt32(s3_write_buffer_size); -// the size of the whole s3 buffer pool, which indicates the s3 file writer -// can at most buffer 50MB data. And the num of multi part upload task is -// s3_write_buffer_whole_size / s3_write_buffer_size -DECLARE_mInt32(s3_write_buffer_whole_size); // the max number of cached file handle for block segemnt DECLARE_mInt64(file_cache_max_file_reader_cache_size); //enable shrink memory @@ -1197,9 +1193,6 @@ DECLARE_mInt32(buffered_reader_read_timeout_ms); // whether to enable /api/snapshot api DECLARE_Bool(enable_snapshot_action); -// The timeout config for S3 write buffer allocation -DECLARE_mInt32(s3_writer_buffer_allocation_timeout_second); - DECLARE_mBool(enable_column_type_check); // Tolerance for the number of partition id 0 in rowset, default 0 diff --git a/be/src/io/fs/benchmark/fs_benchmark_tool.cpp b/be/src/io/fs/benchmark/fs_benchmark_tool.cpp index 77dee409d16efb..5d83bfeeef583f 100644 --- a/be/src/io/fs/benchmark/fs_benchmark_tool.cpp +++ b/be/src/io/fs/benchmark/fs_benchmark_tool.cpp @@ -120,7 +120,7 @@ int main(int argc, char** argv) { .set_max_threads(num_cores) .build(&buffered_reader_prefetch_thread_pool); doris::io::S3FileBufferPool* s3_buffer_pool = doris::io::S3FileBufferPool::GetInstance(); - s3_buffer_pool->init(524288000, 5242880, buffered_reader_prefetch_thread_pool.get()); + s3_buffer_pool->init(buffered_reader_prefetch_thread_pool.get()); try { doris::io::MultiBenchmark multi_bm(FLAGS_fs_type, FLAGS_operation, std::stoi(FLAGS_threads), diff --git a/be/src/io/fs/s3_file_write_bufferpool.cpp b/be/src/io/fs/s3_file_write_bufferpool.cpp index 5c5aa662316778..52ea3e9e36549a 100644 --- a/be/src/io/fs/s3_file_write_bufferpool.cpp +++ b/be/src/io/fs/s3_file_write_bufferpool.cpp @@ -17,6 +17,7 @@ #include "s3_file_write_bufferpool.h" +#include #include #include @@ -29,13 +30,48 @@ namespace doris { namespace io { + +bvar::Adder s3_file_buffer_allocated("s3_file_buffer_allocated"); + +template > +struct Memory : boost::noncopyable, Allocator { + Memory() = default; + explicit Memory(size_t size) : _size(size) { + alloc(size); + s3_file_buffer_allocated << 1; + } + ~Memory() { + dealloc(); + s3_file_buffer_allocated << -1; + } + void alloc(size_t size) { _data = static_cast(Allocator::alloc(size, 0)); } + void dealloc() { + if (_data == nullptr) { + return; + } + Allocator::free(_data, _size); + _data = nullptr; + } + size_t _size; + char* _data; +}; + +struct S3FileBuffer::PartData { + Memory<> _memory; + PartData() : _memory(config::s3_write_buffer_size) {} + ~PartData() = default; +}; + +S3FileBuffer::S3FileBuffer(ThreadPool* pool) + : _inner_data(std::make_unique()), _thread_pool(pool) {} + +S3FileBuffer::~S3FileBuffer() = default; + void S3FileBuffer::on_finished() { - if (_buf.empty()) { + if (nullptr == _inner_data) { return; } reset(); - S3FileBufferPool::GetInstance()->reclaim(_buf); - _buf.clear(); } // when there is memory preserved, directly write data to buf @@ -45,74 +81,31 @@ Status S3FileBuffer::append_data(const Slice& data) { Defer defer {[&] { _size += data.get_size(); }}; while (true) { // if buf is not empty, it means there is memory preserved for this buf - if (!_buf.empty()) { - memcpy(_buf.data + _size, data.get_data(), data.get_size()); + if (_inner_data != nullptr) { + memcpy(_inner_data->_memory._data + _size, data.get_data(), data.get_size()); break; } else { - // wait allocate buffer pool - auto tmp = S3FileBufferPool::GetInstance()->allocate(true); - if (tmp->get_size() == 0) { - return Status::InternalError("Failed to allocate s3 writer buffer for {} seconds", - config::s3_writer_buffer_allocation_timeout_second); - } - rob_buffer(tmp); + return Status::InternalError("Failed to allocate s3 writer buffer"); } } return Status::OK(); } void S3FileBuffer::submit() { - if (LIKELY(!_buf.empty())) { - _stream_ptr = std::make_shared(_buf.data, _size); + if (LIKELY(nullptr != _inner_data)) { + _stream_ptr = std::make_shared(_inner_data->_memory._data, _size); } _thread_pool->submit_func([buf = this->shared_from_this()]() { buf->_on_upload(); }); } -void S3FileBufferPool::init(int32_t s3_write_buffer_whole_size, int32_t s3_write_buffer_size, - doris::ThreadPool* thread_pool) { - // the nums could be one configuration - size_t buf_num = s3_write_buffer_whole_size / s3_write_buffer_size; - DCHECK((s3_write_buffer_size >= 5 * 1024 * 1024) && - (s3_write_buffer_whole_size > s3_write_buffer_size)); - LOG_INFO("S3 file buffer pool with {} buffers", buf_num); - _whole_mem_buffer = std::make_unique(s3_write_buffer_whole_size); - for (size_t i = 0; i < buf_num; i++) { - Slice s {_whole_mem_buffer.get() + i * s3_write_buffer_size, - static_cast(s3_write_buffer_size)}; - _free_raw_buffers.emplace_back(s); - } +void S3FileBufferPool::init(doris::ThreadPool* thread_pool) { _thread_pool = thread_pool; } -std::shared_ptr S3FileBufferPool::allocate(bool reserve) { - std::shared_ptr buf = std::make_shared(_thread_pool); - int64_t timeout = config::s3_writer_buffer_allocation_timeout_second; - // if need reserve then we must ensure return buf with memory preserved - if (reserve) { - { - std::unique_lock lck {_lock}; - _cv.wait_for(lck, std::chrono::seconds(timeout), - [this]() { return !_free_raw_buffers.empty(); }); - if (!_free_raw_buffers.empty()) { - buf->reserve_buffer(_free_raw_buffers.front()); - _free_raw_buffers.pop_front(); - } - } - return buf; - } - // try to get one memory reserved buffer - { - std::unique_lock lck {_lock}; - if (!_free_raw_buffers.empty()) { - buf->reserve_buffer(_free_raw_buffers.front()); - _free_raw_buffers.pop_front(); - } - } - // if there is no free buffer and no need to reserve memory, we could return one empty buffer - // if the buf has no memory reserved, it would try to write the data to file cache first - // or it would try to rob buffer from other S3FileBuffer - return buf; +Status S3FileBufferPool::allocate(std::shared_ptr* buf) { + RETURN_IF_CATCH_EXCEPTION(*buf = std::make_shared(_thread_pool)); + return Status::OK(); } } // namespace io } // namespace doris diff --git a/be/src/io/fs/s3_file_write_bufferpool.h b/be/src/io/fs/s3_file_write_bufferpool.h index b4d3f322904e11..8a946776bad47e 100644 --- a/be/src/io/fs/s3_file_write_bufferpool.h +++ b/be/src/io/fs/s3_file_write_bufferpool.h @@ -38,17 +38,8 @@ namespace io { struct S3FileBuffer : public std::enable_shared_from_this { using Callback = std::function; - S3FileBuffer(ThreadPool* pool) { _thread_pool = pool; } - ~S3FileBuffer() = default; - - void rob_buffer(std::shared_ptr& other) { - _buf = other->_buf; - // we should clear other's memory buffer in case it woule be reclaimed twice - // when calling on_finished - other->_buf.clear(); - } - - void reserve_buffer(Slice s) { _buf = s; } + S3FileBuffer(ThreadPool* pool); + ~S3FileBuffer(); // append data into the memory buffer inside // or into the file cache if the buffer has no memory buffer @@ -109,7 +100,8 @@ struct S3FileBuffer : public std::enable_shared_from_this { size_t _size {0}; std::shared_ptr _stream_ptr; // only served as one reserved buffer - Slice _buf; + struct PartData; + std::unique_ptr _inner_data; size_t _append_offset {0}; // not owned ThreadPool* _thread_pool = nullptr; @@ -122,27 +114,16 @@ class S3FileBufferPool { // should be called one and only once // at startup - void init(int32_t s3_write_buffer_whole_size, int32_t s3_write_buffer_size, - doris::ThreadPool* thread_pool); + void init(doris::ThreadPool* thread_pool); static S3FileBufferPool* GetInstance() { static S3FileBufferPool _pool; return &_pool; } - void reclaim(Slice buf) { - std::unique_lock lck {_lock}; - _free_raw_buffers.emplace_front(buf); - _cv.notify_all(); - } - - std::shared_ptr allocate(bool reserve = false); + Status allocate(std::shared_ptr* buf); private: - std::mutex _lock; - std::condition_variable _cv; - std::unique_ptr _whole_mem_buffer; - std::list _free_raw_buffers; // not owned ThreadPool* _thread_pool = nullptr; }; diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp index 1679385165de6b..17a86049fe0720 100644 --- a/be/src/io/fs/s3_file_writer.cpp +++ b/be/src/io/fs/s3_file_writer.cpp @@ -202,7 +202,7 @@ Status S3FileWriter::close() { if (_bytes_appended == 0 && _create_empty_file) { // No data written, but need to create an empty file - _pending_buf = S3FileBufferPool::GetInstance()->allocate(); + RETURN_IF_ERROR(S3FileBufferPool::GetInstance()->allocate(&_pending_buf)); // if there is no upload id, we need to create a new one _pending_buf->set_upload_remote_callback( [this, buf = _pending_buf]() { _put_object(*buf); }); @@ -238,7 +238,7 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) { return _st; } if (!_pending_buf) { - _pending_buf = S3FileBufferPool::GetInstance()->allocate(); + RETURN_IF_ERROR(S3FileBufferPool::GetInstance()->allocate(&_pending_buf)); // capture part num by value along with the value of the shared ptr _pending_buf->set_upload_remote_callback( [part_num = _cur_part_num, this, cur_buf = _pending_buf]() { diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index 19259b5e3da9c2..fff1c41f4be15a 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -468,9 +468,7 @@ int main(int argc, char** argv) { // init s3 write buffer pool doris::io::S3FileBufferPool* s3_buffer_pool = doris::io::S3FileBufferPool::GetInstance(); - s3_buffer_pool->init(doris::config::s3_write_buffer_whole_size, - doris::config::s3_write_buffer_size, - exec_env->buffered_reader_prefetch_thread_pool()); + s3_buffer_pool->init(exec_env->buffered_reader_prefetch_thread_pool()); // init and open storage engine doris::EngineOptions options;