Commit 2fcc148

zhiqiang-hhhh committed Oct 10, 2024
1 parent c0749bc
Showing 6 changed files with 134 additions and 86 deletions.
11 changes: 1 addition & 10 deletions be/src/pipeline/exec/scan_operator.cpp
@@ -997,16 +997,7 @@ Status ScanLocalState<Derived>::_start_scanners(
auto& p = _parent->cast<typename Derived::Parent>();
_scanner_ctx = vectorized::ScannerContext::create_shared(
state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(),
state()->scan_queue_mem_limit(), _scan_dependency,
// NOTE: This logic makes the _max_thread_num of ScannerContext C (num of cores) * 2.
// For a query with C/2 instances and M scan nodes, the scan tasks of this query will be C/2 * M * C*2,
// i.e. C*C*M at most.
// 1. If data distribution is ignored, we use 1 instance to scan.
// 2. Else, if this operator is not a file scan operator, we use config::doris_scanner_thread_pool_thread_num scanners to scan.
// 3. Else, file scanners consume much more memory, so we use config::doris_scanner_thread_pool_thread_num / query_parallel_instance_num scanners to scan.
p.ignore_data_distribution() || !p.is_file_scan_operator()
? 1
: state()->query_parallel_instance_num());
_scan_dependency, p.ignore_data_distribution());
return Status::OK();
}

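To make the arithmetic in the NOTE removed above (and echoed in the scanner_context.cpp comments below) concrete, here is a minimal standalone sketch. The figures are the ones from the comments (64 cores, 32 parallel instances, two scan nodes); the variable names are illustrative, not part of the patch:

```cpp
#include <cstdint>
#include <iostream>

int main() {
    // Illustrative figures from the old comments, not read from a live config.
    const int32_t cores = 64;
    const int32_t pool_threads = cores * 2;        // doris_scanner_thread_pool_thread_num = 128
    const int32_t parallel_instances = cores / 2;  // 32 instances of the scan operator
    const int32_t scan_nodes = 2;                  // M scan nodes in the query

    // Old per-instance limit for file scans: pool size / parallel instances.
    const int32_t per_instance = pool_threads / parallel_instances; // 128 / 32 = 4

    // Scanner tasks submittable at a time across the whole query.
    std::cout << per_instance * parallel_instances * scan_nodes << "\n"; // prints 256
    return 0;
}
```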
172 changes: 115 additions & 57 deletions be/src/vec/exec/scan/scanner_context.cpp
@@ -23,16 +23,19 @@

#include <mutex>
#include <ostream>
#include <tuple>
#include <utility>

#include "common/config.h"
#include "common/status.h"
#include "olap/tablet.h"
#include "pipeline/exec/scan_operator.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "util/uid_util.h"
#include "vec/core/block.h"
#include "vec/exec/scan/scanner_scheduler.h"
#include "vec/exec/scan/vscan_node.h"

namespace doris::vectorized {
@@ -43,8 +46,7 @@ ScannerContext::ScannerContext(
RuntimeState* state, pipeline::ScanLocalStateBase* local_state,
const TupleDescriptor* output_tuple_desc, const RowDescriptor* output_row_descriptor,
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners, int64_t limit_,
int64_t max_bytes_in_blocks_queue, std::shared_ptr<pipeline::Dependency> dependency,
const int num_parallel_instances)
std::shared_ptr<pipeline::Dependency> dependency, bool ignore_data_distribution)
: HasTaskExecutionCtx(state),
_state(state),
_local_state(local_state),
@@ -54,53 +56,140 @@
_output_row_descriptor(output_row_descriptor),
_batch_size(state->batch_size()),
limit(limit_),
_max_bytes_in_queue(std::max(max_bytes_in_blocks_queue, (int64_t)1024) *
num_parallel_instances),
_scanner_scheduler(state->exec_env()->scanner_scheduler()),
_scanner_scheduler_global(state->exec_env()->scanner_scheduler()),
_all_scanners(scanners.begin(), scanners.end()),
_num_parallel_instances(num_parallel_instances) {
_ignore_data_distribution(ignore_data_distribution) {
DCHECK(_output_row_descriptor == nullptr ||
_output_row_descriptor->tuple_descriptors().size() == 1);
_query_id = _state->get_query_ctx()->query_id();
ctx_id = UniqueId::gen_uid().to_string();
_scanners.enqueue_bulk(scanners.begin(), scanners.size());
if (limit < 0) {
limit = -1;
}
MAX_SCALE_UP_RATIO = _state->scanner_scale_up_ratio();
_query_thread_context = {_query_id, _state->query_mem_tracker(),
_state->get_query_ctx()->workload_group()};
_dependency = dependency;

DorisMetrics::instance()->scanner_ctx_cnt->increment(1);
}

// After init() is called, _parent should not be accessed.
Status ScannerContext::init() {
_scanner_profile = _local_state->_scanner_profile;
_scanner_sched_counter = _local_state->_scanner_sched_counter;
_newly_create_free_blocks_num = _local_state->_newly_create_free_blocks_num;
_scanner_wait_batch_timer = _local_state->_scanner_wait_batch_timer;
_scanner_ctx_sched_time = _local_state->_scanner_ctx_sched_time;
_scale_up_scanners_counter = _local_state->_scale_up_scanners_counter;

#ifndef BE_TEST
// 3. get thread token
if (!_state->get_query_ctx()) {
return Status::InternalError("Query context of {} is not set",
print_id(_state->query_id()));
}

thread_token = _state->get_query_ctx()->get_token();

if (_state->get_query_ctx()->get_scan_scheduler()) {
_should_reset_thread_name = false;
}

#endif
_local_state->_runtime_profile->add_info_string("UseSpecificThreadToken",
thread_token == nullptr ? "False" : "True");

const int num_parallel_instances = _state->query_parallel_instance_num();

// _max_bytes_in_queue controls the maximum memory that can be used by a single scan instance.
// scan_queue_mem_limit on FE is 100MB by default; on the backend we make sure its actual value
// is at least 10MB.
_max_bytes_in_queue = std::max(_state->scan_queue_mem_limit(), (int64_t)1024 * 1024 * 10);

// Provide more memory for wide tables: scale the limit up by one multiple for every 300 columns.
_max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1;
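// For example (illustrative, not part of this change): with the default 100 MB
// scan_queue_mem_limit and a 900-slot tuple, _max_bytes_in_queue becomes
// 100 MB * (900 / 300 + 1) = 400 MB.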
if (scanners.empty()) {

// TODO: Where is the proper position to place this code?
if (_all_scanners.empty()) {
_is_finished = true;
_set_scanner_done();
}
_scanners.enqueue_bulk(scanners.begin(), scanners.size());
if (limit < 0) {
limit = -1;

// This is a stop-gap implementation.
// The logic is kept only to guard against a potential performance issue.
bool submit_many_scan_tasks_for_potential_performance_issue = true;
auto scanner = _all_scanners.front().lock();
DCHECK(scanner != nullptr);
// A query could have remote scan tasks and local scan tasks at the same time,
// so we need to compute _scanner_scheduler in each scan operator instead of in the query context.
SimplifiedScanScheduler* simple_scan_scheduler = _state->get_query_ctx()->get_scan_scheduler();
SimplifiedScanScheduler* remote_scan_task_scheduler =
_state->get_query_ctx()->get_remote_scan_scheduler();
if (scanner->_scanner->get_storage_type() == TabletStorageType::STORAGE_TYPE_LOCAL) {
// scan_scheduler could be empty if the query does not have a workload group.
if (simple_scan_scheduler) {
_scanner_scheduler = simple_scan_scheduler;
} else {
_scanner_scheduler = _scanner_scheduler_global->get_local_scan_thread_pool();
}
} else {
// remote_scan_task_scheduler could be empty if the query does not have a workload group.
if (remote_scan_task_scheduler) {
_scanner_scheduler = remote_scan_task_scheduler;
} else {
_scanner_scheduler = _scanner_scheduler_global->get_remote_scan_thread_pool();
}
}
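// Resolution summary (illustrative recap, not part of this change):
//   local tablet  + workload group    -> workload group scan scheduler
//   local tablet  + no workload group -> global local scan thread pool
//   remote tablet + workload group    -> workload group remote scan scheduler
//   remote tablet + no workload group -> global remote scan thread pool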
MAX_SCALE_UP_RATIO = _state->scanner_scale_up_ratio();

if (_scanner_scheduler->get_queue_size() * 2 > config::doris_scanner_thread_pool_queue_size) {
submit_many_scan_tasks_for_potential_performance_issue = false;
}

// _max_thread_num controls how many scanners of this ScanOperator can be submitted to the scheduler at a time.
// The overall target of our system is to make full utilization of the resources.
// At the same time, we don't want too many tasks queued by the scheduler; that makes the query
// wait too long, and existing tasks cannot be scheduled in time.
// First of all, we try to make sure the _max_thread_num of a ScanNode of a query on a single backend is less than
// config::doris_scanner_thread_pool_thread_num.
// At the same time, we don't want too many tasks queued by the scheduler; that is not necessary.
// So, first of all, we try to make sure the _max_thread_num of a ScanNode of a query on a single backend is less than
// 2 * config::doris_scanner_thread_pool_thread_num, so that we can keep all IO threads busy.
// For example, on a 64-core machine, the default value of config::doris_scanner_thread_pool_thread_num will be 64 * 2 = 128,
// and the num_parallel_instances of this scan operator will be 64 / 2 = 32.
// For a query that has two scan nodes, the _max_thread_num of each scan node instance will be 128 / 32 = 4.
// We have 32 instances of this scan operator, so for the ScanNode, 4 * 32 = 128 scanner tasks can be submitted at a time.
// Remember that we have two ScanNodes in this query, so the total number of scanner tasks that can be submitted at a time is 128 * 2 = 256.
_max_thread_num =
_state->num_scanner_threads() > 0
? _state->num_scanner_threads()
: config::doris_scanner_thread_pool_thread_num / num_parallel_instances;
// For a query that has one scan node, the _max_thread_num of each scan node instance will be 4 * 128 / 32 = 16.
// We have 32 instances of this scan operator, so for the ScanNode, 16 * 32 = 512 scanner tasks can be submitted at a time.
_max_thread_num = _state->num_scanner_threads() > 0 ? _state->num_scanner_threads() : 0;

if (_max_thread_num == 0) {
// NOTE: When ignore_data_distribution is true, the parallelism
// of the scan operator is regarded as 1 (though it may actually not be).
// That makes the number of scan tasks that can be submitted to the
// scheduler very large. This logic is kept from the older implementation.
if (submit_many_scan_tasks_for_potential_performance_issue || _ignore_data_distribution) {
_max_thread_num = config::doris_scanner_thread_pool_thread_num / 1;
} else {
_max_thread_num =
4 * (config::doris_scanner_thread_pool_thread_num / num_parallel_instances);
// In some rare cases, the user may set num_parallel_instances to 1 by hand so that many queries
// can be executed in parallel. We need to make sure _max_thread_num does not exceed config::doris_scanner_thread_pool_thread_num.
_max_thread_num =
std::min(_max_thread_num, config::doris_scanner_thread_pool_thread_num);
}
}
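// Worked example (illustrative, not part of this change): with a 128-thread
// scanner pool, 32 parallel instances, an uncongested queue, and data
// distribution honored, _max_thread_num = 4 * (128 / 32) = 16, and the
// std::min above keeps it no larger than 128.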

_max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num;
// In some situations there are not many big tablets involved, so we can reduce the thread number.
_max_thread_num = std::min(_max_thread_num, (int32_t)scanners.size());
// NOTE: when _all_scanners.size() is zero, _max_thread_num will be 0.
_max_thread_num = std::min(_max_thread_num, (int32_t)_all_scanners.size());

// 1. Calculate max concurrency.
// For `select * from table limit 10;`, we should just use one thread.
if (_local_state->should_run_serial()) {
_max_thread_num = 1;
}

// When the user does not specify scan_thread_num, we can try to downgrade _max_thread_num,
// because we found that for a table with 5k columns, the column readers may occupy too much memory.
// You can refer to https://github.com/apache/doris/issues/35340 for details.
int32_t max_column_reader_num = state->query_options().max_column_reader_num;
int32_t max_column_reader_num = _state->query_options().max_column_reader_num;
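// Illustrative example (the exact computation sits in the collapsed lines of
// this hunk): for a 5000-column table with max_column_reader_num = 20000, the
// concurrency would be downgraded to roughly 20000 / 5000 = 4 scanners,
// assuming the reader budget is divided by the number of scanned columns.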
if (_max_thread_num != 1 && max_column_reader_num > 0) {
int32_t scan_column_num = _output_tuple_desc->slots().size();
int32_t current_column_num = scan_column_num * _max_thread_num;
@@ -110,46 +199,15 @@
if (new_max_thread_num < _max_thread_num) {
int32_t origin_max_thread_num = _max_thread_num;
_max_thread_num = new_max_thread_num;
LOG(INFO) << "downgrade query:" << print_id(state->query_id())
LOG(INFO) << "downgrade query:" << print_id(_state->query_id())
<< " scan's max_thread_num from " << origin_max_thread_num << " to "
<< _max_thread_num << ",column num: " << scan_column_num
<< ", max_column_reader_num: " << max_column_reader_num;
}
}
}

_query_thread_context = {_query_id, _state->query_mem_tracker(),
_state->get_query_ctx()->workload_group()};
_dependency = dependency;

DorisMetrics::instance()->scanner_ctx_cnt->increment(1);
}

// After init() is called, _parent should not be accessed.
Status ScannerContext::init() {
_scanner_profile = _local_state->_scanner_profile;
_scanner_sched_counter = _local_state->_scanner_sched_counter;
_newly_create_free_blocks_num = _local_state->_newly_create_free_blocks_num;
_scanner_wait_batch_timer = _local_state->_scanner_wait_batch_timer;
_scanner_ctx_sched_time = _local_state->_scanner_ctx_sched_time;
_scale_up_scanners_counter = _local_state->_scale_up_scanners_counter;
_scanner_memory_used_counter = _local_state->_memory_used_counter;

#ifndef BE_TEST
// 3. get thread token
if (_state->get_query_ctx()) {
thread_token = _state->get_query_ctx()->get_token();
_simple_scan_scheduler = _state->get_query_ctx()->get_scan_scheduler();
if (_simple_scan_scheduler) {
_should_reset_thread_name = false;
}
_remote_scan_task_scheduler = _state->get_query_ctx()->get_remote_scan_scheduler();
}
#endif

COUNTER_SET(_local_state->_max_scanner_thread_num, (int64_t)_max_thread_num);
_local_state->_runtime_profile->add_info_string("UseSpecificThreadToken",
thread_token == nullptr ? "False" : "True");

// Submit `_max_thread_num` running scanners to `ScannerScheduler`.
// When a running scanner finishes, it will submit one of the remaining scanners.
@@ -205,7 +263,7 @@ bool ScannerContext::empty_in_queue(int id) {
Status ScannerContext::submit_scan_task(std::shared_ptr<ScanTask> scan_task) {
_scanner_sched_counter->update(1);
_num_scheduled_scanners++;
return _scanner_scheduler->submit(shared_from_this(), scan_task);
return _scanner_scheduler_global->submit(shared_from_this(), scan_task);
}

void ScannerContext::append_block_to_queue(std::shared_ptr<ScanTask> scan_task) {
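Taken together, the scanner_context.cpp changes move scheduler selection to a single point in init(); submit() then reuses the cached pointer. A minimal, self-contained sketch of the resolution rule, with toy types standing in for QueryContext and SimplifiedScanScheduler (illustrative assumptions, not the real Doris API):

```cpp
#include <iostream>

// Toy model: the context resolves its scheduler once, submit() reads it back.
struct Scheduler { const char* name; };

struct QueryCtx {
    Scheduler* wg_local = nullptr;   // workload-group local scan scheduler (may be null)
    Scheduler* wg_remote = nullptr;  // workload-group remote scan scheduler (may be null)
};

struct GlobalPools {
    Scheduler local{"global_local_pool"};
    Scheduler remote{"global_remote_pool"};
};

// Mirrors the if/else in ScannerContext::init(): prefer the workload-group
// scheduler, otherwise fall back to the global pool for the storage type.
Scheduler* resolve(bool is_local_storage, QueryCtx& q, GlobalPools& g) {
    if (is_local_storage) {
        return q.wg_local != nullptr ? q.wg_local : &g.local;
    }
    return q.wg_remote != nullptr ? q.wg_remote : &g.remote;
}

int main() {
    QueryCtx q; // no workload group configured
    GlobalPools pools;
    std::cout << resolve(true, q, pools)->name << "\n";  // global_local_pool
    std::cout << resolve(false, q, pools)->name << "\n"; // global_remote_pool
    return 0;
}
```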
18 changes: 7 additions & 11 deletions be/src/vec/exec/scan/scanner_context.h
@@ -107,9 +107,8 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
const TupleDescriptor* output_tuple_desc,
const RowDescriptor* output_row_descriptor,
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners,
int64_t limit_, int64_t max_bytes_in_blocks_queue,
std::shared_ptr<pipeline::Dependency> dependency,
const int num_parallel_instances);
int64_t limit_, std::shared_ptr<pipeline::Dependency> dependency,
bool ignore_data_distribution);

~ScannerContext() override {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker);
@@ -162,9 +161,7 @@

bool empty_in_queue(int id);

SimplifiedScanScheduler* get_simple_scan_scheduler() { return _simple_scan_scheduler; }

SimplifiedScanScheduler* get_remote_scan_scheduler() { return _remote_scan_task_scheduler; }
SimplifiedScanScheduler* get_scan_scheduler() { return _scanner_scheduler; }

void stop_scanners(RuntimeState* state);

@@ -212,17 +209,15 @@
int64_t limit;

int32_t _max_thread_num = 0;
int64_t _max_bytes_in_queue;
doris::vectorized::ScannerScheduler* _scanner_scheduler;
SimplifiedScanScheduler* _simple_scan_scheduler = nullptr;
SimplifiedScanScheduler* _remote_scan_task_scheduler = nullptr;
int64_t _max_bytes_in_queue = 0;
doris::vectorized::ScannerScheduler* _scanner_scheduler_global = nullptr;
SimplifiedScanScheduler* _scanner_scheduler = nullptr;
moodycamel::ConcurrentQueue<std::weak_ptr<ScannerDelegate>> _scanners;
int32_t _num_scheduled_scanners = 0;
int32_t _num_finished_scanners = 0;
int32_t _num_running_scanners = 0;
// Weak pointers to _scanners, used in the stop function
std::vector<std::weak_ptr<ScannerDelegate>> _all_scanners;
const int _num_parallel_instances;
std::shared_ptr<RuntimeProfile> _scanner_profile;
RuntimeProfile::Counter* _scanner_sched_counter = nullptr;
// This counter refers to scan operator's local state
@@ -233,6 +228,7 @@
RuntimeProfile::Counter* _scale_up_scanners_counter = nullptr;
QueryThreadContext _query_thread_context;
std::shared_ptr<pipeline::Dependency> _dependency = nullptr;
bool _ignore_data_distribution = false;

// for scaling up the running scanners
size_t _estimated_block_size = 0;
8 changes: 1 addition & 7 deletions be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -170,13 +170,7 @@ Status ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
scanner_delegate->_scanner->start_wait_worker_timer();
TabletStorageType type = scanner_delegate->_scanner->get_storage_type();
auto sumbit_task = [&]() {
bool is_local = type == TabletStorageType::STORAGE_TYPE_LOCAL;
SimplifiedScanScheduler* scan_sched =
is_local ? ctx->get_simple_scan_scheduler() : ctx->get_remote_scan_scheduler();
if (!scan_sched) { // query without workload group
scan_sched =
is_local ? _local_scan_thread_pool.get() : _remote_scan_thread_pool.get();
}
SimplifiedScanScheduler* scan_sched = ctx->get_scan_scheduler();
auto work_func = [scanner_ref = scan_task, ctx]() {
DorisMetrics::instance()->scanner_task_queued->increment(-1);
DorisMetrics::instance()->scanner_task_running->increment(1);
6 changes: 6 additions & 0 deletions be/src/vec/exec/scan/scanner_scheduler.h
@@ -72,6 +72,12 @@ class ScannerScheduler {

static int get_remote_scan_thread_queue_size();

SimplifiedScanScheduler* get_local_scan_thread_pool() { return _local_scan_thread_pool.get(); }

SimplifiedScanScheduler* get_remote_scan_thread_pool() {
return _remote_scan_thread_pool.get();
}

private:
static void _scanner_scan(std::shared_ptr<ScannerContext> ctx,
std::shared_ptr<ScanTask> scan_task);
5 changes: 4 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -696,7 +696,10 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = EXEC_MEM_LIMIT)
public long maxExecMemByte = 2147483648L;

@VariableMgr.VarAttr(name = SCAN_QUEUE_MEM_LIMIT)
@VariableMgr.VarAttr(name = SCAN_QUEUE_MEM_LIMIT,
description = {"每个 Scan Instance 的 block queue 能够保存多少字节的 block",
"How many bytes of block can be saved in the block queue of each Scan Instance"})
// ~100 MB (2147483648 / 20 = 107374182 bytes)
public long maxScanQueueMemByte = 2147483648L / 20;

@VariableMgr.VarAttr(name = NUM_SCANNER_THREADS, needForward = true, description = {
Expand Down
