diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index a7bb275cf6fe4e1..1fdee450b74a17f 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -994,6 +994,9 @@ Status ScanLocalState<Derived>::_start_scanners(
     _scanner_ctx = vectorized::ScannerContext::create_shared(
             state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners,
             p.limit(), state()->scan_queue_mem_limit(), _scan_dependency,
+            // NOTE: This logic makes the _max_thread_num of ScannerContext C (num of cores) * 2.
+            // For a query with C/2 instances and M scan nodes, the number of scan tasks of this query
+            // will be C/2 * M * C*2, i.e. C*C*M at most.
             // 1. If data distribution is ignored , we use 1 instance to scan.
             // 2. Else if this operator is not file scan operator, we use config::doris_scanner_thread_pool_thread_num scanners to scan.
             // 3. Else, file scanner will consume much memory so we use config::doris_scanner_thread_pool_thread_num / query_parallel_instance_num scanners to scan.
@@ -1054,6 +1057,9 @@ Status ScanLocalState<Derived>::_init_profile() {
     _memory_usage_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_scanner_profile, "MemoryUsage", 1);
     _free_blocks_memory_usage =
             _scanner_profile->AddHighWaterMarkCounter("FreeBlocks", TUnit::BYTES, "MemoryUsage", 1);
+    _scanner_peak_memory_usage =
+            _scanner_profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES);
+
     _newly_create_free_blocks_num =
             ADD_COUNTER(_scanner_profile, "NewlyCreateFreeBlocksNum", TUnit::UNIT);
     _scale_up_scanners_counter = ADD_COUNTER(_scanner_profile, "NumScaleUpScanners", TUnit::UNIT);
@@ -1072,6 +1078,8 @@ Status ScanLocalState<Derived>::_init_profile() {
     _max_scanner_thread_num =
             ADD_COUNTER(_runtime_profile, "MaxScannerThreadNum", TUnit::UNIT);
+    _peak_running_scanner =
+            _scanner_profile->AddHighWaterMarkCounter("PeakRunningScanner", TUnit::UNIT);
     return Status::OK();
 }
diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h
index e106c5a2b513836..0d0f1683a79ac22 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -109,6 +109,8 @@ class ScanLocalStateBase : public PipelineXLocalState<>, public RuntimeFilterCon
     RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr;
     // Max num of scanner thread
     RuntimeProfile::Counter* _max_scanner_thread_num = nullptr;
+    RuntimeProfile::HighWaterMarkCounter* _peak_running_scanner = nullptr;
+    RuntimeProfile::HighWaterMarkCounter* _scanner_peak_memory_usage = nullptr;
     // time of get block from scanner
     RuntimeProfile::Counter* _scan_timer = nullptr;
     RuntimeProfile::Counter* _scan_cpu_timer = nullptr;
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index 52838d7cf4651fd..cbb3d0f572365b2 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -73,11 +73,23 @@ ScannerContext::ScannerContext(
         limit = -1;
     }
     MAX_SCALE_UP_RATIO = _state->scanner_scale_up_ratio();
+    // _max_thread_num controls how many scanners of this ScanOperator can be submitted to the scheduler at a time.
+    // The overall target of our system is to make full utilization of the resources.
+    // At the same time, we don't want too many tasks queued by the scheduler; that makes the query
+    // wait too long, and existing tasks can not be scheduled in time.
+    // First of all, we try to make sure the _max_thread_num of a ScanNode of a query on a single backend is less than
+    // config::doris_scanner_thread_pool_thread_num.
+    // For example, on a 64-core machine, the default value of config::doris_scanner_thread_pool_thread_num will be 64*2 = 128,
+    // and the num_parallel_instances of this scan operator will be 64/2 = 32.
+    // For a query that has two scan nodes, the _max_thread_num of each scan node instance will be 128 / 32 = 4.
+    // We have 32 instances of this scan operator, so for one ScanNode, 4 * 32 = 128 scanner tasks can be submitted at a time.
+    // Remember that we have two ScanNodes in this query, so the total number of scanner tasks that can be submitted at a time is 128 * 2 = 256.
     _max_thread_num = _state->num_scanner_threads() > 0
                               ? _state->num_scanner_threads()
                               : config::doris_scanner_thread_pool_thread_num / num_parallel_instances;
     _max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num;
+    // In some situations, there are not too many big tablets involved, so we can reduce the thread number.
     _max_thread_num = std::min(_max_thread_num, (int32_t)scanners.size());
     // 1. Calculate max concurrency
     // For select * from table limit 10; should just use one thread.
@@ -116,7 +128,6 @@ Status ScannerContext::init() {
     _scanner_sched_counter = _local_state->_scanner_sched_counter;
     _newly_create_free_blocks_num = _local_state->_newly_create_free_blocks_num;
     _scanner_wait_batch_timer = _local_state->_scanner_wait_batch_timer;
-    _free_blocks_memory_usage_mark = _local_state->_free_blocks_memory_usage;
     _scanner_ctx_sched_time = _local_state->_scanner_ctx_sched_time;
     _scale_up_scanners_counter = _local_state->_scale_up_scanners_counter;
 
@@ -157,9 +168,11 @@ vectorized::BlockUPtr ScannerContext::get_free_block(bool force) {
     vectorized::BlockUPtr block = nullptr;
     if (_free_blocks.try_dequeue(block)) {
         DCHECK(block->mem_reuse());
-        _free_blocks_memory_usage -= block->allocated_bytes();
-        _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
-    } else if (_free_blocks_memory_usage < _max_bytes_in_queue || force) {
+        _block_memory_usage -= block->allocated_bytes();
+        // A free block is reused, so the memory usage should be decreased.
+        // The caller of get_free_block will increase the memory usage.
+        update_peak_memory_usage(-block->allocated_bytes());
+    } else if (_block_memory_usage < _max_bytes_in_queue || force) {
         _newly_create_free_blocks_num->update(1);
         block = vectorized::Block::create_unique(_output_tuple_desc->slots(), 0,
                                                  true /*ignore invalid slots*/);
@@ -168,11 +181,13 @@ vectorized::BlockUPtr ScannerContext::get_free_block(bool force) {
 }
 
 void ScannerContext::return_free_block(vectorized::BlockUPtr block) {
-    if (block->mem_reuse() && _free_blocks_memory_usage < _max_bytes_in_queue) {
-        _free_blocks_memory_usage += block->allocated_bytes();
-        _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
+    if (block->mem_reuse() && _block_memory_usage < _max_bytes_in_queue) {
+        size_t block_size_to_reuse = block->allocated_bytes();
+        _block_memory_usage += block_size_to_reuse;
         block->clear_column_data();
-        _free_blocks.enqueue(std::move(block));
+        if (_free_blocks.enqueue(std::move(block))) {
+            update_peak_memory_usage(block_size_to_reuse);
+        }
     }
 }
 
@@ -247,8 +262,8 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
             if (_estimated_block_size > block_size) {
                 _estimated_block_size = block_size;
             }
-            _free_blocks_memory_usage -= block_size;
-            _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
+            _block_memory_usage -= block_size;
+            update_peak_memory_usage(-current_block->allocated_bytes());
             // consume current block
             block->swap(*current_block);
             return_free_block(std::move(current_block));
@@ -275,8 +290,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
             for (int i = 0; i < free_blocks_for_each; ++i) {
                 vectorized::BlockUPtr removed_block;
                 if (_free_blocks.try_dequeue(removed_block)) {
-                    _free_blocks_memory_usage -= block->allocated_bytes();
-                    _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
+                    _block_memory_usage -= block->allocated_bytes();
                 }
             }
         }
@@ -331,8 +345,7 @@ Status ScannerContext::_try_to_scale_up() {
         int num_add = int(std::min(_num_running_scanners * SCALE_UP_RATIO,
                                    _max_thread_num * MAX_SCALE_UP_RATIO - _num_running_scanners));
         if (_estimated_block_size > 0) {
-            int most_add =
-                    (_max_bytes_in_queue - _free_blocks_memory_usage) / _estimated_block_size;
+            int most_add = (_max_bytes_in_queue - _block_memory_usage) / _estimated_block_size;
             num_add = std::min(num_add, most_add);
         }
         for (int i = 0; i < num_add; ++i) {
@@ -467,4 +480,12 @@ void ScannerContext::_set_scanner_done() {
     _dependency->set_always_ready();
 }
 
+void ScannerContext::update_peak_running_scanner(int num) {
+    _local_state->_peak_running_scanner->add(num);
+}
+
+void ScannerContext::update_peak_memory_usage(int64_t usage) {
+    _local_state->_scanner_peak_memory_usage->add(usage);
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index 972ec3a6b306564..03c4e5a4f1bba7c 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -122,10 +122,12 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
     vectorized::BlockUPtr get_free_block(bool force);
     void return_free_block(vectorized::BlockUPtr block);
 
-    inline void inc_free_block_usage(size_t usage) {
-        _free_blocks_memory_usage += usage;
-        _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
-    }
+    inline void inc_block_usage(size_t usage) { _block_memory_usage += usage; }
+
+    // Caller should make sure the pipeline task is still running when calling this function
+    void update_peak_running_scanner(int num);
+    // Caller should make sure the pipeline task is still running when calling this function
+    void update_peak_memory_usage(int64_t usage);
 
     // Get next block from blocks queue. Called by ScanNode/ScanOperator
     // Set eos to true if there is no more data to read.
@@ -223,7 +225,6 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
     RuntimeProfile::Counter* _scanner_sched_counter = nullptr;
     RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr;
     RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr;
-    RuntimeProfile::HighWaterMarkCounter* _free_blocks_memory_usage_mark = nullptr;
     RuntimeProfile::Counter* _scanner_ctx_sched_time = nullptr;
     RuntimeProfile::Counter* _scale_up_scanners_counter = nullptr;
     QueryThreadContext _query_thread_context;
@@ -231,7 +232,7 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
     // for scaling up the running scanners
     size_t _estimated_block_size = 0;
 
-    std::atomic_long _free_blocks_memory_usage = 0;
+    std::atomic_long _block_memory_usage = 0;
     int64_t _last_scale_up_time = 0;
     int64_t _last_fetch_time = 0;
     int64_t _total_wait_block_time = 0;
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 7f868fba5a666e2..444ff4dbb0cd9f5 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -34,6 +34,7 @@
 #include "common/status.h"
 #include "olap/tablet.h"
 #include "runtime/exec_env.h"
+#include "runtime/memory/mem_tracker.h"
 #include "runtime/runtime_state.h"
 #include "runtime/thread_context.h"
 #include "util/async_io.h" // IWYU pragma: keep
@@ -213,6 +214,9 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
         return;
     }
+    ctx->update_peak_running_scanner(1);
+    Defer defer([&] { ctx->update_peak_running_scanner(-1); });
+
     std::shared_ptr<ScannerDelegate> scanner_delegate = scan_task->scanner.lock();
     if (scanner_delegate == nullptr) {
         return;
     }
@@ -270,13 +274,18 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
             if (free_block == nullptr) {
                 break;
             }
+            // We got a newly created block or a reused block.
+            ctx->update_peak_memory_usage(free_block->allocated_bytes());
+            ctx->update_peak_memory_usage(-free_block->allocated_bytes());
             status = scanner->get_block_after_projects(state, free_block.get(), &eos);
+            // Projection will truncate useless columns, which makes the block size change.
+            auto free_block_bytes = free_block->allocated_bytes();
+            ctx->update_peak_memory_usage(free_block_bytes);
             first_read = false;
             if (!status.ok()) {
                 LOG(WARNING) << "Scan thread read VScanner failed: " << status.to_string();
                 break;
             }
-            auto free_block_bytes = free_block->allocated_bytes();
             raw_bytes_read += free_block_bytes;
             if (!scan_task->cached_blocks.empty() &&
                 scan_task->cached_blocks.back().first->rows() + free_block->rows() <=
@@ -284,18 +293,25 @@
                 size_t block_size = scan_task->cached_blocks.back().first->allocated_bytes();
                 vectorized::MutableBlock mutable_block(
                         scan_task->cached_blocks.back().first.get());
+                ctx->update_peak_memory_usage(-mutable_block.allocated_bytes());
                 status = mutable_block.merge(*free_block);
+                ctx->update_peak_memory_usage(mutable_block.allocated_bytes());
                 if (!status.ok()) {
                     LOG(WARNING) << "Block merge failed: " << status.to_string();
                     break;
                 }
+                scan_task->cached_blocks.back().second = mutable_block.allocated_bytes();
                 scan_task->cached_blocks.back().first.get()->set_columns(
                         std::move(mutable_block.mutable_columns()));
+
+                // Whether returning the block succeeds or not, this free_block is not used by this scan task any more.
+                ctx->update_peak_memory_usage(-free_block_bytes);
+                // If the block can be reused, its memory usage will be added back.
                 ctx->return_free_block(std::move(free_block));
-                ctx->inc_free_block_usage(
-                        scan_task->cached_blocks.back().first->allocated_bytes() - block_size);
+                ctx->inc_block_usage(scan_task->cached_blocks.back().first->allocated_bytes() -
+                                     block_size);
             } else {
-                ctx->inc_free_block_usage(free_block->allocated_bytes());
+                ctx->inc_block_usage(free_block->allocated_bytes());
                 scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes);
             }
         } // end for while
diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp
index 43d791caffae026..a78f8956025cb99 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -143,6 +143,7 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) {
     }
 
     if (state->is_cancelled()) {
+        // TODO: Should return the specific ErrorStatus instead of just Cancelled.
         return Status::Cancelled("cancelled");
     }
     *eof = *eof || _should_stop;
diff --git a/regression-test/suites/query_profile/scanner_profile.groovy b/regression-test/suites/query_profile/scanner_profile.groovy
new file mode 100644
index 000000000000000..38216d211e65ea2
--- /dev/null
+++ b/regression-test/suites/query_profile/scanner_profile.groovy
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import groovy.json.JsonOutput
+import groovy.json.JsonSlurper
+import groovy.json.StringEscapeUtils
+
+
+def getProfileList = {
+    def dst = 'http://' + context.config.feHttpAddress
+    def conn = new URL(dst + "/rest/v1/query_profile").openConnection()
+    conn.setRequestMethod("GET")
+    def encoding = Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" +
+            (context.config.feHttpPassword == null ? "" : context.config.feHttpPassword)).getBytes("UTF-8"))
+    conn.setRequestProperty("Authorization", "Basic ${encoding}")
+    return conn.getInputStream().getText()
+}
+
+
+def getProfile = { id ->
+    def dst = 'http://' + context.config.feHttpAddress
+    def conn = new URL(dst + "/api/profile/text/?query_id=$id").openConnection()
+    conn.setRequestMethod("GET")
+    def encoding = Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" +
+            (context.config.feHttpPassword == null ?
"" : context.config.feHttpPassword)).getBytes("UTF-8")) + conn.setRequestProperty("Authorization", "Basic ${encoding}") + return conn.getInputStream().getText() +} + +suite('scanner_profile') { + sql """ + DROP TABLE IF EXISTS scanner_profile; + """ + sql """ + CREATE TABLE if not exists `scanner_profile` ( + `id` INT, + `name` varchar(32) + ) ENGINE=OLAP + DISTRIBUTED BY HASH(`id`) BUCKETS 10 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + // Insert data to table + sql """ + insert into scanner_profile values + (1, "A"),(2, "B"),(3, "C"),(4, "D"),(5,"E"),(6,"F"),(7,"G"),(8,"H"),(9,"K"); + """ + sql """ + insert into scanner_profile values + (10, "A"),(20, "B"),(30, "C"),(40, "D"),(50,"E"),(60,"F"),(70,"G"),(80,"H"),(90,"K"); + """ + sql """ + insert into scanner_profile values + (101, "A"),(201, "B"),(301, "C"),(401, "D"),(501,"E"),(601,"F"),(701,"G"),(801,"H"),(901,"K"); + """ + sql """ + insert into scanner_profile values + (1010, "A"),(2010, "B"),(3010, "C"),(4010, "D"),(5010,"E"),(6010,"F"),(7010,"G"),(8010,"H"),(9010,"K"); + """ + + def uuidString = UUID.randomUUID().toString() + sql "set enable_profile=true" + // With Limit, MaxScannerThreadNum = 1 + sql """ + select "with_limit_1_${uuidString}", * from scanner_profile limit 10; + """ + + def wholeString = getProfileList() + List profileData = new JsonSlurper().parseText(wholeString).data.rows + String queryIdWithLimit1 = ""; + + + logger.info("{}", uuidString) + + for (def profileItem in profileData) { + if (profileItem["Sql Statement"].toString().contains("with_limit_1_${uuidString}")) { + queryIdWithLimit1 = profileItem["Profile ID"].toString() + logger.info("profileItem: {}", profileItem) + } + } + + logger.info("queryIdWithLimit1_${uuidString}: {}", queryIdWithLimit1) + + assertTrue(queryIdWithLimit1 != "") + def String profileWithLimit1 = getProfile(queryIdWithLimit1).toString() + logger.info("query profile {}", profileWithLimit1) + assertTrue(profileWithLimit1.contains("- PeakRunningScanner: 1")) +} \ No newline at end of file