Skip to content

Commit

Permalink
[enhance](S3) Add timeout config for s3 buffer allocation #26125 (#2…
Browse files Browse the repository at this point in the history
  • Loading branch information
ByteYue authored Dec 22, 2023
1 parent 2bc39d5 commit 4b445d9
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 6 deletions.
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1108,6 +1108,8 @@ DEFINE_mInt32(buffered_reader_read_timeout_ms, "20000");

DEFINE_Bool(enable_snapshot_action, "false");

DEFINE_mInt32(s3_writer_buffer_allocation_timeout_second, "60");

// clang-format off
#ifdef BE_TEST
// test s3
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1165,6 +1165,9 @@ DECLARE_mInt32(buffered_reader_read_timeout_ms);
// whether to enable /api/snapshot api
DECLARE_Bool(enable_snapshot_action);

// The timeout config for S3 write buffer allocation
DECLARE_mInt32(s3_writer_buffer_allocation_timeout_second);

#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
Expand Down
18 changes: 14 additions & 4 deletions be/src/io/fs/s3_file_write_bufferpool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "s3_file_write_bufferpool.h"

#include <chrono>
#include <cstring>

#include "common/config.h"
Expand All @@ -40,7 +41,7 @@ void S3FileBuffer::on_finished() {
// when there is memory preserved, directly write data to buf
// TODO:(AlexYue): write to file cache otherwise, then we'll wait for free buffer
// and to rob it
void S3FileBuffer::append_data(const Slice& data) {
Status S3FileBuffer::append_data(const Slice& data) {
Defer defer {[&] { _size += data.get_size(); }};
while (true) {
// if buf is not empty, it means there is memory preserved for this buf
Expand All @@ -50,9 +51,14 @@ void S3FileBuffer::append_data(const Slice& data) {
} else {
// wait allocate buffer pool
auto tmp = S3FileBufferPool::GetInstance()->allocate(true);
if (tmp->get_size() == 0) {
return Status::InternalError("Failed to allocate s3 writer buffer for {} seconds",
config::s3_writer_buffer_allocation_timeout_second);
}
rob_buffer(tmp);
}
}
return Status::OK();
}

void S3FileBuffer::submit() {
Expand Down Expand Up @@ -81,13 +87,17 @@ void S3FileBufferPool::init(int32_t s3_write_buffer_whole_size, int32_t s3_write

std::shared_ptr<S3FileBuffer> S3FileBufferPool::allocate(bool reserve) {
std::shared_ptr<S3FileBuffer> buf = std::make_shared<S3FileBuffer>(_thread_pool);
int64_t timeout = config::s3_writer_buffer_allocation_timeout_second;
// if need reserve then we must ensure return buf with memory preserved
if (reserve) {
{
std::unique_lock<std::mutex> lck {_lock};
_cv.wait(lck, [this]() { return !_free_raw_buffers.empty(); });
buf->reserve_buffer(_free_raw_buffers.front());
_free_raw_buffers.pop_front();
_cv.wait_for(lck, std::chrono::seconds(timeout),
[this]() { return !_free_raw_buffers.empty(); });
if (!_free_raw_buffers.empty()) {
buf->reserve_buffer(_free_raw_buffers.front());
_free_raw_buffers.pop_front();
}
}
return buf;
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/io/fs/s3_file_write_bufferpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ struct S3FileBuffer : public std::enable_shared_from_this<S3FileBuffer> {

// append data into the memory buffer inside
// or into the file cache if the buffer has no memory buffer
void append_data(const Slice& data);
Status append_data(const Slice& data);
// upload to S3 and file cache in async threadpool
void submit();
// set the callback to upload to S3 file
Expand Down
3 changes: 2 additions & 1 deletion be/src/io/fs/s3_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,8 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {

// if the buffer has memory buf inside, the data would be written into memory first then S3 then file cache
// it would be written to cache then S3 if the buffer doesn't have memory preserved
_pending_buf->append_data(Slice {data[i].get_data() + pos, data_size_to_append});
RETURN_IF_ERROR(_pending_buf->append_data(
Slice {data[i].get_data() + pos, data_size_to_append}));

// if it's the last part, it could be less than 5MB, or it must
satisfy that the size is larger than or equal to 5MB
Expand Down

0 comments on commit 4b445d9

Please sign in to comment.