Skip to content

Commit

Permalink
[opt](scanner profile) More counter for scanner (#40144)
Browse files Browse the repository at this point in the history
New profile metrics to monitor the schedule process of scanner.
```
VScanner:
    ...
    -  PeakMemoryUsage:  16.31  MB
    -  PeakRunningScanner:  1
    ...
```
In general, the value of `PeakMemoryUsage` is increased when any of the
scan tasks gets a block, and decreased when the ScanOperator consumes a block
from the block_queue.
  • Loading branch information
zhiqiang-hhhh committed Sep 23, 2024
1 parent e4dc631 commit acb4ee8
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 24 deletions.
8 changes: 8 additions & 0 deletions be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -994,6 +994,9 @@ Status ScanLocalState<Derived>::_start_scanners(
_scanner_ctx = vectorized::ScannerContext::create_shared(
state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(),
state()->scan_queue_mem_limit(), _scan_dependency,
// NOTE: This logic makes _max_thread_num of ScannerContext C (num of cores) * 2.
// For a query with C/2 instances and M scan nodes, the number of scan tasks of this query will be C/2 * M * C*2,
// i.e. C*C*M at most.
// 1. If data distribution is ignored, we use 1 instance to scan.
// 2. Else if this operator is not file scan operator, we use config::doris_scanner_thread_pool_thread_num scanners to scan.
// 3. Else, file scanner will consume much memory so we use config::doris_scanner_thread_pool_thread_num / query_parallel_instance_num scanners to scan.
Expand Down Expand Up @@ -1054,6 +1057,9 @@ Status ScanLocalState<Derived>::_init_profile() {
_memory_usage_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_scanner_profile, "MemoryUsage", 1);
_free_blocks_memory_usage =
_scanner_profile->AddHighWaterMarkCounter("FreeBlocks", TUnit::BYTES, "MemoryUsage", 1);
_scanner_peak_memory_usage =
_scanner_profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES);

_newly_create_free_blocks_num =
ADD_COUNTER(_scanner_profile, "NewlyCreateFreeBlocksNum", TUnit::UNIT);
_scale_up_scanners_counter = ADD_COUNTER(_scanner_profile, "NumScaleUpScanners", TUnit::UNIT);
Expand All @@ -1072,6 +1078,8 @@ Status ScanLocalState<Derived>::_init_profile() {

_max_scanner_thread_num = ADD_COUNTER(_runtime_profile, "MaxScannerThreadNum", TUnit::UNIT);

_peak_running_scanner =
_scanner_profile->AddHighWaterMarkCounter("PeakRunningScanner", TUnit::UNIT);
return Status::OK();
}

Expand Down
2 changes: 2 additions & 0 deletions be/src/pipeline/exec/scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ class ScanLocalStateBase : public PipelineXLocalState<>, public RuntimeFilterCon
RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr;
// Max num of scanner thread
RuntimeProfile::Counter* _max_scanner_thread_num = nullptr;
RuntimeProfile::HighWaterMarkCounter* _peak_running_scanner = nullptr;
RuntimeProfile::HighWaterMarkCounter* _scanner_peak_memory_usage = nullptr;
// time of get block from scanner
RuntimeProfile::Counter* _scan_timer = nullptr;
RuntimeProfile::Counter* _scan_cpu_timer = nullptr;
Expand Down
49 changes: 35 additions & 14 deletions be/src/vec/exec/scan/scanner_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,23 @@ ScannerContext::ScannerContext(
limit = -1;
}
MAX_SCALE_UP_RATIO = _state->scanner_scale_up_ratio();
// _max_thread_num controls how many scanners of this ScanOperator can be submitted to scheduler at a time.
// The overall target of our system is to make full utilization of the resources.
// At the same time, we don't want too many tasks queued by the scheduler; that makes the query
// wait too long, and existing tasks can not be scheduled in time.
// First of all, we try to make sure _max_thread_num of a ScanNode of a query on a single backend is less than
// config::doris_scanner_thread_pool_thread_num.
// For example, on a 64-core machine, the default value of config::doris_scanner_thread_pool_thread_num will be 64*2 =128.
// and the num_parallel_instances of this scan operator will be 64/2=32.
// For a query who has two scan nodes, the _max_thread_num of each scan node instance will be 128 / 32 = 4.
// We have 32 instances of this scan operator, so for the ScanNode, we have 4 * 32 = 128 scanner tasks can be submitted at a time.
// Remember that we have two ScanNodes in this query, so the total number of scanner tasks that can be submitted at a time is 128 * 2 = 256.
_max_thread_num =
_state->num_scanner_threads() > 0
? _state->num_scanner_threads()
: config::doris_scanner_thread_pool_thread_num / num_parallel_instances;
_max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num;
// In some situations, there are not many big tablets involved, so we can reduce the thread number.
_max_thread_num = std::min(_max_thread_num, (int32_t)scanners.size());
// 1. Calculate max concurrency
// For select * from table limit 10; should just use one thread.
Expand Down Expand Up @@ -116,7 +128,6 @@ Status ScannerContext::init() {
_scanner_sched_counter = _local_state->_scanner_sched_counter;
_newly_create_free_blocks_num = _local_state->_newly_create_free_blocks_num;
_scanner_wait_batch_timer = _local_state->_scanner_wait_batch_timer;
_free_blocks_memory_usage_mark = _local_state->_free_blocks_memory_usage;
_scanner_ctx_sched_time = _local_state->_scanner_ctx_sched_time;
_scale_up_scanners_counter = _local_state->_scale_up_scanners_counter;

Expand Down Expand Up @@ -157,9 +168,11 @@ vectorized::BlockUPtr ScannerContext::get_free_block(bool force) {
vectorized::BlockUPtr block = nullptr;
if (_free_blocks.try_dequeue(block)) {
DCHECK(block->mem_reuse());
_free_blocks_memory_usage -= block->allocated_bytes();
_free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
} else if (_free_blocks_memory_usage < _max_bytes_in_queue || force) {
_block_memory_usage -= block->allocated_bytes();
// A free block is reused, so the memory usage should be decreased
// The caller of get_free_block will increase the memory usage
update_peak_memory_usage(-block->allocated_bytes());
} else if (_block_memory_usage < _max_bytes_in_queue || force) {
_newly_create_free_blocks_num->update(1);
block = vectorized::Block::create_unique(_output_tuple_desc->slots(), 0,
true /*ignore invalid slots*/);
Expand All @@ -168,11 +181,13 @@ vectorized::BlockUPtr ScannerContext::get_free_block(bool force) {
}

void ScannerContext::return_free_block(vectorized::BlockUPtr block) {
if (block->mem_reuse() && _free_blocks_memory_usage < _max_bytes_in_queue) {
_free_blocks_memory_usage += block->allocated_bytes();
_free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
if (block->mem_reuse() && _block_memory_usage < _max_bytes_in_queue) {
size_t block_size_to_reuse = block->allocated_bytes();
_block_memory_usage += block_size_to_reuse;
block->clear_column_data();
_free_blocks.enqueue(std::move(block));
if (_free_blocks.enqueue(std::move(block))) {
update_peak_memory_usage(block_size_to_reuse);
}
}
}

Expand Down Expand Up @@ -247,8 +262,8 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
if (_estimated_block_size > block_size) {
_estimated_block_size = block_size;
}
_free_blocks_memory_usage -= block_size;
_free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
_block_memory_usage -= block_size;
update_peak_memory_usage(-current_block->allocated_bytes());
// consume current block
block->swap(*current_block);
return_free_block(std::move(current_block));
Expand All @@ -275,8 +290,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
for (int i = 0; i < free_blocks_for_each; ++i) {
vectorized::BlockUPtr removed_block;
if (_free_blocks.try_dequeue(removed_block)) {
_free_blocks_memory_usage -= block->allocated_bytes();
_free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
_block_memory_usage -= block->allocated_bytes();
}
}
}
Expand Down Expand Up @@ -331,8 +345,7 @@ Status ScannerContext::_try_to_scale_up() {
int num_add = int(std::min(_num_running_scanners * SCALE_UP_RATIO,
_max_thread_num * MAX_SCALE_UP_RATIO - _num_running_scanners));
if (_estimated_block_size > 0) {
int most_add =
(_max_bytes_in_queue - _free_blocks_memory_usage) / _estimated_block_size;
int most_add = (_max_bytes_in_queue - _block_memory_usage) / _estimated_block_size;
num_add = std::min(num_add, most_add);
}
for (int i = 0; i < num_add; ++i) {
Expand Down Expand Up @@ -467,4 +480,12 @@ void ScannerContext::_set_scanner_done() {
_dependency->set_always_ready();
}

// Adjust the high-water-mark counter of concurrently running scanners
// ("PeakRunningScanner" in the profile). Called with +1 when a scan task
// starts running and -1 when it finishes, so the counter records the peak
// number of scanners that were ever running at the same time.
// Caller should make sure the pipeline task is still running when calling
// this function, since it dereferences _local_state.
void ScannerContext::update_peak_running_scanner(int num) {
    _local_state->_peak_running_scanner->add(num);
}

// Fold a signed memory delta into the "PeakMemoryUsage" high-water-mark
// counter. Positive deltas are applied when a scan task obtains/fills a
// block, negative deltas when a block is reused or consumed from the block
// queue, so the counter's high-water mark reflects peak scanner memory usage.
// Caller should make sure the pipeline task is still running when calling
// this function, since it dereferences _local_state.
void ScannerContext::update_peak_memory_usage(int64_t usage) {
    _local_state->_scanner_peak_memory_usage->add(usage);
}

} // namespace doris::vectorized
13 changes: 7 additions & 6 deletions be/src/vec/exec/scan/scanner_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,12 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,

vectorized::BlockUPtr get_free_block(bool force);
void return_free_block(vectorized::BlockUPtr block);
inline void inc_free_block_usage(size_t usage) {
_free_blocks_memory_usage += usage;
_free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
}
inline void inc_block_usage(size_t usage) { _block_memory_usage += usage; }

// Caller should make sure the pipeline task is still running when calling this function
void update_peak_running_scanner(int num);
// Caller should make sure the pipeline task is still running when calling this function
void update_peak_memory_usage(int64_t usage);

// Get next block from blocks queue. Called by ScanNode/ScanOperator
// Set eos to true if there is no more data to read.
Expand Down Expand Up @@ -223,15 +225,14 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
RuntimeProfile::Counter* _scanner_sched_counter = nullptr;
RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr;
RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr;
RuntimeProfile::HighWaterMarkCounter* _free_blocks_memory_usage_mark = nullptr;
RuntimeProfile::Counter* _scanner_ctx_sched_time = nullptr;
RuntimeProfile::Counter* _scale_up_scanners_counter = nullptr;
QueryThreadContext _query_thread_context;
std::shared_ptr<pipeline::Dependency> _dependency = nullptr;

// for scaling up the running scanners
size_t _estimated_block_size = 0;
std::atomic_long _free_blocks_memory_usage = 0;
std::atomic_long _block_memory_usage = 0;
int64_t _last_scale_up_time = 0;
int64_t _last_fetch_time = 0;
int64_t _total_wait_block_time = 0;
Expand Down
24 changes: 20 additions & 4 deletions be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "common/status.h"
#include "olap/tablet.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "util/async_io.h" // IWYU pragma: keep
Expand Down Expand Up @@ -213,6 +214,9 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
return;
}

ctx->update_peak_running_scanner(1);
Defer defer([&] { ctx->update_peak_running_scanner(-1); });

std::shared_ptr<ScannerDelegate> scanner_delegate = scan_task->scanner.lock();
if (scanner_delegate == nullptr) {
return;
Expand Down Expand Up @@ -270,32 +274,44 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
if (free_block == nullptr) {
break;
}
// We got a new created block or a reused block.
ctx->update_peak_memory_usage(free_block->allocated_bytes());
ctx->update_peak_memory_usage(-free_block->allocated_bytes());
status = scanner->get_block_after_projects(state, free_block.get(), &eos);
// Projection will truncate useless columns, makes block size change.
auto free_block_bytes = free_block->allocated_bytes();
ctx->update_peak_memory_usage(free_block_bytes);
first_read = false;
if (!status.ok()) {
LOG(WARNING) << "Scan thread read VScanner failed: " << status.to_string();
break;
}
auto free_block_bytes = free_block->allocated_bytes();
raw_bytes_read += free_block_bytes;
if (!scan_task->cached_blocks.empty() &&
scan_task->cached_blocks.back().first->rows() + free_block->rows() <=
ctx->batch_size()) {
size_t block_size = scan_task->cached_blocks.back().first->allocated_bytes();
vectorized::MutableBlock mutable_block(
scan_task->cached_blocks.back().first.get());
ctx->update_peak_memory_usage(-mutable_block.allocated_bytes());
status = mutable_block.merge(*free_block);
ctx->update_peak_memory_usage(mutable_block.allocated_bytes());
if (!status.ok()) {
LOG(WARNING) << "Block merge failed: " << status.to_string();
break;
}
scan_task->cached_blocks.back().second = mutable_block.allocated_bytes();
scan_task->cached_blocks.back().first.get()->set_columns(
std::move(mutable_block.mutable_columns()));

// Return block succeed or not, this free_block is not used by this scan task any more.
ctx->update_peak_memory_usage(-free_block_bytes);
// If block can be reused, its memory usage will be added back.
ctx->return_free_block(std::move(free_block));
ctx->inc_free_block_usage(
scan_task->cached_blocks.back().first->allocated_bytes() - block_size);
ctx->inc_block_usage(scan_task->cached_blocks.back().first->allocated_bytes() -
block_size);
} else {
ctx->inc_free_block_usage(free_block->allocated_bytes());
ctx->inc_block_usage(free_block->allocated_bytes());
scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes);
}
} // end for while
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/scan/vscanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) {
}

if (state->is_cancelled()) {
// TODO: Should return the specific ErrorStatus instead of just Cancelled.
return Status::Cancelled("cancelled");
}
*eof = *eof || _should_stop;
Expand Down
104 changes: 104 additions & 0 deletions regression-test/suites/query_profile/scanner_profile.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import groovy.json.StringEscapeUtils


// Fetch the list of query profiles (JSON) from the FE REST API,
// authenticating with HTTP basic auth built from the test config.
def getProfileList = {
    def password = context.config.feHttpPassword == null ? "" : context.config.feHttpPassword
    def credentials = (context.config.feHttpUser + ":" + password).getBytes("UTF-8")
    def auth = Base64.getEncoder().encodeToString(credentials)
    def connection = new URL('http://' + context.config.feHttpAddress + "/rest/v1/query_profile").openConnection()
    connection.setRequestMethod("GET")
    connection.setRequestProperty("Authorization", "Basic ${auth}")
    return connection.getInputStream().getText()
}


// Fetch the text profile of a single query (looked up by query id) from
// the FE REST API, authenticating with HTTP basic auth.
def getProfile = { id ->
    def password = context.config.feHttpPassword == null ? "" : context.config.feHttpPassword
    def credentials = (context.config.feHttpUser + ":" + password).getBytes("UTF-8")
    def auth = Base64.getEncoder().encodeToString(credentials)
    def connection = new URL('http://' + context.config.feHttpAddress + "/api/profile/text/?query_id=$id").openConnection()
    connection.setRequestMethod("GET")
    connection.setRequestProperty("Authorization", "Basic ${auth}")
    return connection.getInputStream().getText()
}

// Regression test for the scanner profile counters added by this commit.
// Runs a LIMIT query (which caps MaxScannerThreadNum at 1), locates its
// profile through the FE REST API via a unique marker string, and asserts
// the PeakRunningScanner high-water-mark counter reports exactly 1.
suite('scanner_profile') {
    sql """
        DROP TABLE IF EXISTS scanner_profile;
    """
    sql """
        CREATE TABLE if not exists `scanner_profile` (
            `id` INT,
            `name` varchar(32)
        ) ENGINE=OLAP
        DISTRIBUTED BY HASH(`id`) BUCKETS 10
        PROPERTIES (
            "replication_allocation" = "tag.location.default: 1"
        );
    """

    // Insert data to table
    sql """
        insert into scanner_profile values
        (1, "A"),(2, "B"),(3, "C"),(4, "D"),(5,"E"),(6,"F"),(7,"G"),(8,"H"),(9,"K");
    """
    sql """
        insert into scanner_profile values
        (10, "A"),(20, "B"),(30, "C"),(40, "D"),(50,"E"),(60,"F"),(70,"G"),(80,"H"),(90,"K");
    """
    sql """
        insert into scanner_profile values
        (101, "A"),(201, "B"),(301, "C"),(401, "D"),(501,"E"),(601,"F"),(701,"G"),(801,"H"),(901,"K");
    """
    sql """
        insert into scanner_profile values
        (1010, "A"),(2010, "B"),(3010, "C"),(4010, "D"),(5010,"E"),(6010,"F"),(7010,"G"),(8010,"H"),(9010,"K");
    """

    // Unique marker embedded in the query text so its profile can be found
    // among all collected profiles.
    def uuidString = UUID.randomUUID().toString()
    sql "set enable_profile=true"
    // With Limit, MaxScannerThreadNum = 1
    sql """
        select "with_limit_1_${uuidString}", * from scanner_profile limit 10;
    """

    def wholeString = getProfileList()
    List profileData = new JsonSlurper().parseText(wholeString).data.rows
    String queryIdWithLimit1 = ""

    logger.info("{}", uuidString)

    for (def profileItem in profileData) {
        if (profileItem["Sql Statement"].toString().contains("with_limit_1_${uuidString}")) {
            queryIdWithLimit1 = profileItem["Profile ID"].toString()
            logger.info("profileItem: {}", profileItem)
            // The UUID marker is unique, so the first hit is the one we want.
            break
        }
    }

    logger.info("queryIdWithLimit1_${uuidString}: {}", queryIdWithLimit1)

    assertTrue(queryIdWithLimit1 != "")
    // `def String` was redundant; a plain typed declaration is idiomatic Groovy.
    String profileWithLimit1 = getProfile(queryIdWithLimit1).toString()
    logger.info("query profile {}", profileWithLimit1)
    assertTrue(profileWithLimit1.contains("- PeakRunningScanner: 1"))
}

0 comments on commit acb4ee8

Please sign in to comment.