Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improvement] add a lower bound for bytes in scanner queue #28912

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ DEFINE_mInt32(doris_scanner_queue_size, "1024");
DEFINE_mInt32(doris_scanner_row_num, "16384");
// single read execute fragment row bytes
DEFINE_mInt32(doris_scanner_row_bytes, "10485760");
// lower bound, in bytes, for the data a scanner context may keep queued
// (default 67108864 = 64 MiB); ScannerContext::init() raises its free block
// capacity to at least ceil(this value / estimated block bytes)
DEFINE_mInt32(min_bytes_in_scanner_queue, "67108864");
// number of max scan keys
DEFINE_mInt32(doris_max_scan_key_num, "48");
// the max number of push down values of a single column.
Expand Down
1 change: 1 addition & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ DECLARE_mInt32(doris_scanner_queue_size);
DECLARE_mInt32(doris_scanner_row_num);
// single read execute fragment row bytes
DECLARE_mInt32(doris_scanner_row_bytes);
// lower bound, in bytes, for the data a scanner context may keep queued;
// ScannerContext::init() raises its free block capacity to at least
// ceil(this value / estimated block bytes)
DECLARE_mInt32(min_bytes_in_scanner_queue);
// number of max scan keys
DECLARE_mInt32(doris_max_scan_key_num);
// the max number of push down values of a single column.
Expand Down
7 changes: 7 additions & 0 deletions be/src/io/fs/stream_load_pipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@
#include "runtime/exec_env.h"
#include "runtime/thread_context.h"
#include "util/bit_util.h"
#include "util/bvar_helper.h"

namespace doris {
namespace io {
struct IOContext;

static bvar::Status<int64_t> g_bytes_in_stream_load_pipe("doris_bytes_in_stream_load_pipe", 0);

StreamLoadPipe::StreamLoadPipe(size_t max_buffered_bytes, size_t min_chunk_size,
int64_t total_length, bool use_proto)
: _buffered_bytes(0),
Expand Down Expand Up @@ -82,6 +85,7 @@ Status StreamLoadPipe::read_at_impl(size_t /*offset*/, Slice result, size_t* byt
_buffered_bytes -= buf->limit;
_put_cond.notify_one();
}
g_bytes_in_stream_load_pipe.set_value(_buffered_bytes + _proto_buffered_bytes);
}
DCHECK(*bytes_read == bytes_req)
<< "*bytes_read=" << *bytes_read << ", bytes_req=" << bytes_req;
Expand Down Expand Up @@ -191,6 +195,7 @@ Status StreamLoadPipe::_read_next_buffer(std::unique_ptr<uint8_t[]>* data, size_
row_ptr.release();
}
_put_cond.notify_one();
g_bytes_in_stream_load_pipe.set_value(_buffered_bytes + _proto_buffered_bytes);
return Status::OK();
}

Expand Down Expand Up @@ -220,6 +225,8 @@ Status StreamLoadPipe::_append(const ByteBufferPtr& buf, size_t proto_byte_size)
}
}
_get_cond.notify_one();
g_bytes_in_stream_load_pipe.set_value(_buffered_bytes + _proto_buffered_bytes);

return Status::OK();
}

Expand Down
1 change: 1 addition & 0 deletions be/src/util/bvar_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

#pragma once
#include <bvar/bvar.h>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: 'bvar/bvar.h' file not found [clang-diagnostic-error]

#include <bvar/bvar.h>
         ^

#include <bvar/latency_recorder.h>

#include "defer_op.h"
Expand Down
29 changes: 24 additions & 5 deletions be/src/vec/exec/scan/scanner_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "runtime/exec_env.h"
#include "runtime/query_context.h"
#include "runtime/runtime_state.h"
#include "util/bvar_helper.h"
#include "util/pretty_printer.h"
#include "util/uid_util.h"
#include "vec/core/block.h"
Expand All @@ -45,6 +46,9 @@ namespace doris::vectorized {

using namespace std::chrono_literals;

static bvar::Status<int64_t> g_bytes_in_scanner_queue("doris_bytes_in_scanner_queue", 0);
static bvar::Status<int64_t> g_num_running_scanners("doris_num_running_scanners", 0);

ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* output_tuple_desc,
const std::list<std::shared_ptr<ScannerDelegate>>& scanners,
int64_t limit_, int64_t max_bytes_in_blocks_queue,
Expand Down Expand Up @@ -169,6 +173,9 @@ Status ScannerContext::init() {
_free_blocks_capacity = _max_thread_num * _block_per_scanner;
auto block = get_free_block();
_estimated_block_bytes = std::max(block->allocated_bytes(), (size_t)16);
int min_blocks = (config::min_bytes_in_scanner_queue + _estimated_block_bytes - 1) /
_estimated_block_bytes;
_free_blocks_capacity = std::max(_free_blocks_capacity, min_blocks);
return_free_block(std::move(block));

#ifndef BE_TEST
Expand Down Expand Up @@ -232,19 +239,27 @@ void ScannerContext::return_free_block(std::unique_ptr<vectorized::Block> block)
}

// Validates `blocks` and moves every one of them onto `_blocks_queue`.
//
// - A schema-validation failure is recorded through set_status_on_error();
//   the blocks are still enqueued afterwards (there is no early return).
// - `blocks` is empty when this function returns.
// - The number of bytes added and the resulting queue total are captured
//   while `_transfer_lock` is held. Re-reading `_cur_bytes_in_queue` after the
//   lock is released would let a concurrent update of the counter skew both
//   the memory-usage delta and the published gauge value.
void ScannerContext::append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& blocks) {
    // Validation runs before _transfer_lock is taken, so the error is reported
    // with need_lock = true.
    // NOTE(review): presumably set_status_on_error() then takes _transfer_lock
    // itself -- confirm against its definition.
    for (auto& b : blocks) {
        auto st = validate_block_schema(b.get());
        if (!st.ok()) {
            set_status_on_error(st, true);
        }
    }

    int64_t added_bytes = 0;
    int64_t bytes_in_queue = 0;
    {
        std::lock_guard l(_transfer_lock);
        for (auto& b : blocks) {
            added_bytes += static_cast<int64_t>(b->allocated_bytes());
            _blocks_queue.push_back(std::move(b));
        }
        _cur_bytes_in_queue += added_bytes;
        // Snapshot the total under the lock for the gauge update below.
        bytes_in_queue = _cur_bytes_in_queue;
    }
    // Every element has been moved from; drop the empty shells.
    blocks.clear();
    _blocks_queue_added_cv.notify_one();
    _queued_blocks_memory_usage->add(added_bytes);
    g_bytes_in_scanner_queue.set_value(bytes_in_queue);
}

bool ScannerContext::empty_in_queue(int id) {
Expand Down Expand Up @@ -327,6 +342,8 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
}
}

g_bytes_in_scanner_queue.set_value(_cur_bytes_in_queue);

if (!merge_blocks.empty()) {
vectorized::MutableBlock m(block->get());
for (auto& merge_block : merge_blocks) {
Expand Down Expand Up @@ -367,11 +384,13 @@ Status ScannerContext::validate_block_schema(Block* block) {
// Adds `inc` to this context's running-scanner count while holding
// `_transfer_lock`, then publishes the new count to the
// `doris_num_running_scanners` gauge.
// NOTE(review): the gauge is a single file-level bvar::Status, so it appears to
// hold the count of whichever context updated it last rather than a total
// across contexts -- confirm that this is intended.
void ScannerContext::inc_num_running_scanners(int32_t inc) {
    std::lock_guard guard(_transfer_lock);
    _num_running_scanners += inc;
    g_num_running_scanners.set_value(_num_running_scanners);
}

// Subtracts `scanner_dec` from this context's running-scanner count while
// holding `_transfer_lock`, then publishes the new count to the
// `doris_num_running_scanners` gauge.
// NOTE(review): the gauge is a single file-level bvar::Status, so it appears to
// hold the count of whichever context updated it last rather than a total
// across contexts -- confirm that this is intended.
void ScannerContext::dec_num_running_scanners(int32_t scanner_dec) {
    std::lock_guard guard(_transfer_lock);
    _num_running_scanners -= scanner_dec;
    g_num_running_scanners.set_value(_num_running_scanners);
}

void ScannerContext::set_status_on_error(const Status& status, bool need_lock) {
Expand Down
4 changes: 4 additions & 0 deletions be/src/vec/exec/scan/scanner_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext> {

// todo(wb) rethinking how to calculate ```_max_bytes_in_queue``` when executing shared scan
// Returns true only when all of the following hold:
//   - the context is not done(),
//   - _cur_bytes_in_queue is below half of _max_bytes_in_queue,
//   - _serving_blocks_num is below allowed_blocks_num().
// The values feeding the decision are logged at VLOG_NOTICE level first.
inline bool should_be_scheduled() const {
    VLOG_NOTICE << "cur_bytes_in_queue " << _cur_bytes_in_queue << " max_bytes_in_queue "
                << _max_bytes_in_queue << " _serving_blocks_num " << _serving_blocks_num
                << " _free_blocks_capacity " << _free_blocks_capacity
                << " estimated_block_bytes " << _estimated_block_bytes;
    return !done() && (_cur_bytes_in_queue < _max_bytes_in_queue / 2) &&
           (_serving_blocks_num < allowed_blocks_num());
}
Expand Down
8 changes: 2 additions & 6 deletions be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,8 +326,6 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
// judge if we need to yield. So we record all raw data read in this round
// scan, if this exceeds row number or bytes threshold, we yield this thread.
std::vector<vectorized::BlockUPtr> blocks;
int64_t raw_rows_read = scanner->get_rows_read();
int64_t raw_rows_threshold = raw_rows_read + config::doris_scanner_row_num;
int64_t raw_bytes_read = 0;
int64_t raw_bytes_threshold = config::doris_scanner_row_bytes;
int num_rows_in_block = 0;
Expand All @@ -341,11 +339,10 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
// queue, it will affect query latency and query concurrency for example ssb 3.3.
auto should_do_scan = [&, batch_size = state->batch_size(),
time = state->wait_full_block_schedule_times()]() {
if (raw_bytes_read < raw_bytes_threshold && raw_rows_read < raw_rows_threshold) {
if (raw_bytes_read < raw_bytes_threshold) {
return true;
} else if (num_rows_in_block < batch_size) {
return raw_bytes_read < raw_bytes_threshold * time &&
raw_rows_read < raw_rows_threshold * time;
return raw_bytes_read < raw_bytes_threshold * time;
}
return false;
};
Expand Down Expand Up @@ -392,7 +389,6 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
blocks.push_back(std::move(block));
}
}
raw_rows_read = scanner->get_rows_read();
} // end for while

// if we failed, check status.
Expand Down
Loading