Skip to content

Commit

Permalink
[enhancement](scanner) add a lower bound for bytes in scanner queue (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
dataroaring authored Jan 26, 2024
1 parent 1286f34 commit 920aeac
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 6 deletions.
1 change: 1 addition & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ DEFINE_mInt32(doris_scanner_queue_size, "1024");
DEFINE_mInt32(doris_scanner_row_num, "16384");
// bytes threshold for a single scan round (default 10 MiB): the scanner scheduler keeps
// reading while the raw bytes read in the round are below this value
DEFINE_mInt32(doris_scanner_row_bytes, "10485760");
// lower bound, in bytes, for the data a scanner queue may hold (default 64 MiB);
// ScannerContext::init() divides it by the estimated block size (rounding up) and uses
// the result as a floor for its free-block capacity
DEFINE_mInt32(min_bytes_in_scanner_queue, "67108864");
// number of max scan keys
DEFINE_mInt32(doris_max_scan_key_num, "48");
// the max number of push down values of a single column.
Expand Down
1 change: 1 addition & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ DECLARE_mInt32(doris_scanner_queue_size);
DECLARE_mInt32(doris_scanner_row_num);
// bytes threshold for a single scan round: the scanner scheduler keeps reading while
// the raw bytes read in the round are below this value
DECLARE_mInt32(doris_scanner_row_bytes);
// lower bound, in bytes, for the data a scanner queue may hold; ScannerContext::init()
// turns it into a minimum number of free blocks based on the estimated block size
DECLARE_mInt32(min_bytes_in_scanner_queue);
// number of max scan keys
DECLARE_mInt32(doris_max_scan_key_num);
// the max number of push down values of a single column.
Expand Down
10 changes: 10 additions & 0 deletions be/src/vec/exec/scan/scanner_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ namespace doris::vectorized {

using namespace std::chrono_literals;

// File-local bvar gauges exposed as "doris_bytes_in_scanner_queue" and
// "doris_num_running_scanners". The visible call sites update them with set_value(),
// passing ScannerContext members (_cur_bytes_in_queue / _num_running_scanners).
// NOTE(review): there is one gauge per process but, presumably, one ScannerContext per
// scan node; if so each set_value() overwrites the previous context's value and the
// gauge shows the most recently updated context, not a total -- confirm this is intended
// (an accumulating bvar type would be needed for a process-wide sum).
static bvar::Status<int64_t> g_bytes_in_scanner_queue("doris_bytes_in_scanner_queue", 0);
static bvar::Status<int64_t> g_num_running_scanners("doris_num_running_scanners", 0);

ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* output_tuple_desc,
const RowDescriptor* output_row_descriptor,
const std::list<std::shared_ptr<ScannerDelegate>>& scanners,
Expand Down Expand Up @@ -179,6 +182,9 @@ Status ScannerContext::init() {
_free_blocks_capacity = _max_thread_num * _block_per_scanner;
auto block = get_free_block();
_estimated_block_bytes = std::max(block->allocated_bytes(), (size_t)16);
int min_blocks = (config::min_bytes_in_scanner_queue + _estimated_block_bytes - 1) /
_estimated_block_bytes;
_free_blocks_capacity = std::max(_free_blocks_capacity, min_blocks);
return_free_block(std::move(block));

#ifndef BE_TEST
Expand Down Expand Up @@ -258,6 +264,7 @@ void ScannerContext::append_blocks_to_queue(std::vector<vectorized::BlockUPtr>&
}
_blocks_queue_added_cv.notify_one();
_queued_blocks_memory_usage->add(_cur_bytes_in_queue - old_bytes_in_queue);
g_bytes_in_scanner_queue.set_value(_cur_bytes_in_queue);
}

bool ScannerContext::empty_in_queue(int id) {
Expand Down Expand Up @@ -334,6 +341,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
}
}

g_bytes_in_scanner_queue.set_value(_cur_bytes_in_queue);
if (!merge_blocks.empty()) {
vectorized::MutableBlock m(block->get());
for (auto& merge_block : merge_blocks) {
Expand Down Expand Up @@ -375,6 +383,7 @@ Status ScannerContext::validate_block_schema(Block* block) {
// Adjusts the running-scanner counter by `inc` and mirrors the new value into the
// g_num_running_scanners gauge. Both steps happen while _transfer_lock is held, so the
// gauge is written with the value this call just produced.
void ScannerContext::inc_num_running_scanners(int32_t inc) {
    std::lock_guard transfer_guard(_transfer_lock);

    // Update the counter first, then publish exactly what is stored.
    _num_running_scanners += inc;
    g_num_running_scanners.set_value(_num_running_scanners);
}

void ScannerContext::set_status_on_error(const Status& status, bool need_lock) {
Expand Down Expand Up @@ -484,6 +493,7 @@ void ScannerContext::push_back_scanner_and_reschedule(std::shared_ptr<ScannerDel
// before we call the following if() block.
{
--_num_running_scanners;
g_num_running_scanners.set_value(_num_running_scanners);
if (scanner->_scanner->need_to_close()) {
--_num_unfinished_scanners;
if (_num_unfinished_scanners == 0) {
Expand Down
8 changes: 2 additions & 6 deletions be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,6 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
// judge if we need to yield. So we record all raw data read in this round
// scan, if this exceeds the bytes threshold, we yield this thread.
std::vector<vectorized::BlockUPtr> blocks;
int64_t raw_rows_read = scanner->get_rows_read();
int64_t raw_rows_threshold = raw_rows_read + config::doris_scanner_row_num;
int64_t raw_bytes_read = 0;
int64_t raw_bytes_threshold = config::doris_scanner_row_bytes;
int num_rows_in_block = 0;
Expand All @@ -347,11 +345,10 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
// queue, it will affect query latency and query concurrency for example ssb 3.3.
auto should_do_scan = [&, batch_size = state->batch_size(),
time = state->wait_full_block_schedule_times()]() {
if (raw_bytes_read < raw_bytes_threshold && raw_rows_read < raw_rows_threshold) {
if (raw_bytes_read < raw_bytes_threshold) {
return true;
} else if (num_rows_in_block < batch_size) {
return raw_bytes_read < raw_bytes_threshold * time &&
raw_rows_read < raw_rows_threshold * time;
return raw_bytes_read < raw_bytes_threshold * time;
}
return false;
};
Expand Down Expand Up @@ -400,7 +397,6 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
blocks.push_back(std::move(block));
}
}
raw_rows_read = scanner->get_rows_read();
} // end for while

// if we failed, check status.
Expand Down

0 comments on commit 920aeac

Please sign in to comment.