Skip to content

Commit

Permalink
[enhance](PrefetchReader) Make the prefetch timeout one config (#27371)…
Browse files Browse the repository at this point in the history
… (#27530)
  • Loading branch information
ByteYue authored Dec 22, 2023
1 parent 615a6e1 commit 2bc39d5
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 18 deletions.
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1104,6 +1104,8 @@ DEFINE_Int32(ingest_binlog_work_pool_size, "-1");
// Download binlog rate limit, unit is KB/s, 0 means no limit
DEFINE_Int32(download_binlog_rate_limit_kbs, "0");

DEFINE_mInt32(buffered_reader_read_timeout_ms, "20000");

DEFINE_Bool(enable_snapshot_action, "false");

// clang-format off
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1160,6 +1160,8 @@ DECLARE_Int32(ingest_binlog_work_pool_size);
// Download binlog rate limit, unit is KB/s
DECLARE_Int32(download_binlog_rate_limit_kbs);

DECLARE_mInt32(buffered_reader_read_timeout_ms);

// whether to enable /api/snapshot api
DECLARE_Bool(enable_snapshot_action);

Expand Down
37 changes: 19 additions & 18 deletions be/src/io/fs/buffered_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -387,20 +387,15 @@ Status MergeRangeFileReader::_fill_box(int range_index, size_t start_offset, siz
return Status::OK();
}

// the condition variable would wait at most 10 seconds
// otherwise it would quit the procedure and treat it
// as one time out error status and would make the load
// task failed
constexpr static int WAIT_TIME_OUT_MS = 10000;

// there exists occasions where the buffer is already closed but
// some prior tasks are still queued in thread pool, so we have to check whether
// the buffer is closed each time the condition variable is notified.
void PrefetchBuffer::reset_offset(size_t offset) {
{
std::unique_lock lck {_lock};
if (!_prefetched.wait_for(lck, std::chrono::milliseconds(WAIT_TIME_OUT_MS),
[this]() { return _buffer_status != BufferStatus::PENDING; })) {
if (!_prefetched.wait_for(
lck, std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
[this]() { return _buffer_status != BufferStatus::PENDING; })) {
_prefetch_status = Status::TimedOut("time out when reset prefetch buffer");
return;
}
Expand All @@ -427,10 +422,12 @@ void PrefetchBuffer::reset_offset(size_t offset) {
void PrefetchBuffer::prefetch_buffer() {
{
std::unique_lock lck {_lock};
if (!_prefetched.wait_for(lck, std::chrono::milliseconds(WAIT_TIME_OUT_MS), [this]() {
return _buffer_status == BufferStatus::RESET ||
_buffer_status == BufferStatus::CLOSED;
})) {
if (!_prefetched.wait_for(
lck, std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
[this]() {
return _buffer_status == BufferStatus::RESET ||
_buffer_status == BufferStatus::CLOSED;
})) {
_prefetch_status = Status::TimedOut("time out when invoking prefetch buffer");
return;
}
Expand Down Expand Up @@ -470,7 +467,8 @@ void PrefetchBuffer::prefetch_buffer() {
_statis.prefetch_request_io += 1;
_statis.prefetch_request_bytes += _len;
std::unique_lock lck {_lock};
if (!_prefetched.wait_for(lck, std::chrono::milliseconds(WAIT_TIME_OUT_MS),
if (!_prefetched.wait_for(lck,
std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
[this]() { return _buffer_status == BufferStatus::PENDING; })) {
_prefetch_status = Status::TimedOut("time out when invoking prefetch buffer");
return;
Expand Down Expand Up @@ -555,10 +553,12 @@ Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len,
{
std::unique_lock lck {_lock};
// buffer must be prefetched or it's closed
if (!_prefetched.wait_for(lck, std::chrono::milliseconds(WAIT_TIME_OUT_MS), [this]() {
return _buffer_status == BufferStatus::PREFETCHED ||
_buffer_status == BufferStatus::CLOSED;
})) {
if (!_prefetched.wait_for(
lck, std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
[this]() {
return _buffer_status == BufferStatus::PREFETCHED ||
_buffer_status == BufferStatus::CLOSED;
})) {
_prefetch_status = Status::TimedOut("time out when read prefetch buffer");
return _prefetch_status;
}
Expand Down Expand Up @@ -594,7 +594,8 @@ Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len,
void PrefetchBuffer::close() {
std::unique_lock lck {_lock};
// in case _reader still tries to write to the buf after we close the buffer
if (!_prefetched.wait_for(lck, std::chrono::milliseconds(WAIT_TIME_OUT_MS),
if (!_prefetched.wait_for(lck,
std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
[this]() { return _buffer_status != BufferStatus::PENDING; })) {
_prefetch_status = Status::TimedOut("time out when close prefetch buffer");
return;
Expand Down

0 comments on commit 2bc39d5

Please sign in to comment.