[opt](scanner profile) More counter for scanner #40144 #41059

Open · wants to merge 1 commit into base: branch-3.0
8 changes: 8 additions & 0 deletions be/src/pipeline/exec/scan_operator.cpp
@@ -994,6 +994,9 @@ Status ScanLocalState<Derived>::_start_scanners(
_scanner_ctx = vectorized::ScannerContext::create_shared(
state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(),
state()->scan_queue_mem_limit(), _scan_dependency,
// NOTE: This logic makes the _max_thread_num of ScannerContext C (num of cores) * 2.
// For a query with C/2 instances and M scan nodes, the number of scan tasks of this query
// will be C/2 * M * C * 2, i.e. C*C*M at most.
// 1. If data distribution is ignored, we use 1 instance to scan.
// 2. Else, if this operator is not a file scan operator, we use config::doris_scanner_thread_pool_thread_num scanners to scan.
// 3. Else, the file scanner consumes much memory, so we use config::doris_scanner_thread_pool_thread_num / query_parallel_instance_num scanners to scan.
@@ -1054,6 +1057,9 @@ Status ScanLocalState<Derived>::_init_profile() {
_memory_usage_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_scanner_profile, "MemoryUsage", 1);
_free_blocks_memory_usage =
_scanner_profile->AddHighWaterMarkCounter("FreeBlocks", TUnit::BYTES, "MemoryUsage", 1);
_scanner_peak_memory_usage =
_scanner_profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES);

_newly_create_free_blocks_num =
ADD_COUNTER(_scanner_profile, "NewlyCreateFreeBlocksNum", TUnit::UNIT);
_scale_up_scanners_counter = ADD_COUNTER(_scanner_profile, "NumScaleUpScanners", TUnit::UNIT);
@@ -1072,6 +1078,8 @@ Status ScanLocalState<Derived>::_init_profile() {

_max_scanner_thread_num = ADD_COUNTER(_runtime_profile, "MaxScannerThreadNum", TUnit::UNIT);

_peak_running_scanner =
_scanner_profile->AddHighWaterMarkCounter("PeakRunningScanner", TUnit::UNIT);
return Status::OK();
}

2 changes: 2 additions & 0 deletions be/src/pipeline/exec/scan_operator.h
@@ -109,6 +109,8 @@ class ScanLocalStateBase : public PipelineXLocalState<>, public RuntimeFilterCon
RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr;
// Max num of scanner thread
RuntimeProfile::Counter* _max_scanner_thread_num = nullptr;
RuntimeProfile::HighWaterMarkCounter* _peak_running_scanner = nullptr;
RuntimeProfile::HighWaterMarkCounter* _scanner_peak_memory_usage = nullptr;
// time of get block from scanner
RuntimeProfile::Counter* _scan_timer = nullptr;
RuntimeProfile::Counter* _scan_cpu_timer = nullptr;
49 changes: 35 additions & 14 deletions be/src/vec/exec/scan/scanner_context.cpp
@@ -73,11 +73,23 @@ ScannerContext::ScannerContext(
limit = -1;
}
MAX_SCALE_UP_RATIO = _state->scanner_scale_up_ratio();
// _max_thread_num controls how many scanners of this ScanOperator can be submitted to the scheduler at a time.
// The overall target of our system is to make full use of the resources.
// At the same time, we don't want too many tasks queued by the scheduler, since that makes the query
// wait too long and existing tasks cannot be scheduled in time.
// First of all, we try to make sure the _max_thread_num of a ScanNode of a query on a single backend is less than
// config::doris_scanner_thread_pool_thread_num.
// For example, on a 64-core machine, the default value of config::doris_scanner_thread_pool_thread_num will be 64 * 2 = 128,
// and the num_parallel_instances of this scan operator will be 64 / 2 = 32.
// For a query that has two scan nodes, the _max_thread_num of each scan node instance will be 128 / 32 = 4.
// We have 32 instances of this scan operator, so for one ScanNode, 4 * 32 = 128 scanner tasks can be submitted at a time.
// Remember that we have two ScanNodes in this query, so the total number of scanner tasks that can be submitted at a time is 128 * 2 = 256.
_max_thread_num =
_state->num_scanner_threads() > 0
? _state->num_scanner_threads()
: config::doris_scanner_thread_pool_thread_num / num_parallel_instances;
_max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num;
// In some situations, there are not many big tablets involved, so we can reduce the thread number.
_max_thread_num = std::min(_max_thread_num, (int32_t)scanners.size());
// 1. Calculate max concurrency
// For select * from table limit 10; should just use one thread.
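
To make the sizing above concrete, here is a small standalone sketch of the thread-count derivation. The helper function and its parameter names are illustrative, not part of the patch; only the formula mirrors the constructor code above.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>

// Illustrative only: mirrors the sizing logic above.
//   session_threads    ~ _state->num_scanner_threads()   (0 means "not set by the session")
//   pool_threads       ~ config::doris_scanner_thread_pool_thread_num (e.g. 64 * 2 = 128)
//   parallel_instances ~ num_parallel_instances           (e.g. 64 / 2 = 32)
int32_t max_scanner_threads(int32_t session_threads, int32_t pool_threads,
                            int32_t parallel_instances, std::size_t num_scanners) {
    int32_t n = session_threads > 0 ? session_threads
                                    : pool_threads / parallel_instances; // 128 / 32 = 4
    n = n == 0 ? 1 : n;                                      // never drop below one thread
    return std::min(n, static_cast<int32_t>(num_scanners));  // no more threads than scanners
}
// With the 64-core example: 4 threads per instance, 4 * 32 = 128 tasks per ScanNode,
// and 128 * 2 = 256 tasks for a query with two ScanNodes.
```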
@@ -116,7 +128,6 @@ Status ScannerContext::init() {
_scanner_sched_counter = _local_state->_scanner_sched_counter;
_newly_create_free_blocks_num = _local_state->_newly_create_free_blocks_num;
_scanner_wait_batch_timer = _local_state->_scanner_wait_batch_timer;
_free_blocks_memory_usage_mark = _local_state->_free_blocks_memory_usage;
_scanner_ctx_sched_time = _local_state->_scanner_ctx_sched_time;
_scale_up_scanners_counter = _local_state->_scale_up_scanners_counter;

@@ -157,9 +168,11 @@ vectorized::BlockUPtr ScannerContext::get_free_block(bool force) {
vectorized::BlockUPtr block = nullptr;
if (_free_blocks.try_dequeue(block)) {
DCHECK(block->mem_reuse());
_free_blocks_memory_usage -= block->allocated_bytes();
_free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
} else if (_free_blocks_memory_usage < _max_bytes_in_queue || force) {
_block_memory_usage -= block->allocated_bytes();
// A free block is reused, so the memory usage should be decreased
// The caller of get_free_block will increase the memory usage
update_peak_memory_usage(-block->allocated_bytes());
} else if (_block_memory_usage < _max_bytes_in_queue || force) {
_newly_create_free_blocks_num->update(1);
block = vectorized::Block::create_unique(_output_tuple_desc->slots(), 0,
true /*ignore invalid slots*/);
@@ -168,11 +181,13 @@ vectorized::BlockUPtr ScannerContext::get_free_block(bool force) {
}

void ScannerContext::return_free_block(vectorized::BlockUPtr block) {
if (block->mem_reuse() && _free_blocks_memory_usage < _max_bytes_in_queue) {
_free_blocks_memory_usage += block->allocated_bytes();
_free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
if (block->mem_reuse() && _block_memory_usage < _max_bytes_in_queue) {
size_t block_size_to_reuse = block->allocated_bytes();
_block_memory_usage += block_size_to_reuse;
block->clear_column_data();
_free_blocks.enqueue(std::move(block));
if (_free_blocks.enqueue(std::move(block))) {
update_peak_memory_usage(block_size_to_reuse);
}
}
}
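
get_free_block and return_free_block above implement a size-capped pool of reusable blocks: dequeue a pooled block if one exists, otherwise allocate a new one only while _block_memory_usage is under _max_bytes_in_queue (or when forced), and put blocks back only while the budget allows. Below is a simplified, lock-based sketch of the same idea; the patch itself uses a lock-free queue and Doris block types, so every name here is a placeholder.

```cpp
#include <atomic>
#include <cstdint>
#include <deque>
#include <memory>
#include <mutex>

struct Block {  // stand-in for vectorized::Block
    int64_t allocated_bytes() const { return bytes; }
    void clear_column_data() {}
    int64_t bytes = 0;
};

class FreeBlockPool {
public:
    explicit FreeBlockPool(int64_t max_bytes) : _max_bytes_in_queue(max_bytes) {}

    // Reuse a pooled block if one exists; otherwise allocate a new one
    // only while the pool's memory budget has headroom (or when forced).
    std::unique_ptr<Block> get_free_block(bool force) {
        {
            std::lock_guard<std::mutex> l(_lock);
            if (!_free_blocks.empty()) {
                auto block = std::move(_free_blocks.front());
                _free_blocks.pop_front();
                _block_memory_usage -= block->allocated_bytes();
                return block;
            }
        }
        if (_block_memory_usage < _max_bytes_in_queue || force) {
            return std::make_unique<Block>();
        }
        return nullptr;  // caller must handle "no block available"
    }

    // Return a block to the pool only while the budget allows it;
    // otherwise it is simply destroyed.
    void return_free_block(std::unique_ptr<Block> block) {
        if (_block_memory_usage < _max_bytes_in_queue) {
            _block_memory_usage += block->allocated_bytes();
            block->clear_column_data();
            std::lock_guard<std::mutex> l(_lock);
            _free_blocks.push_back(std::move(block));
        }
    }

private:
    const int64_t _max_bytes_in_queue;
    std::atomic<int64_t> _block_memory_usage{0};
    std::mutex _lock;
    std::deque<std::unique_ptr<Block>> _free_blocks;
};
```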

@@ -247,8 +262,8 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
if (_estimated_block_size > block_size) {
_estimated_block_size = block_size;
}
_free_blocks_memory_usage -= block_size;
_free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
_block_memory_usage -= block_size;
update_peak_memory_usage(-current_block->allocated_bytes());
// consume current block
block->swap(*current_block);
return_free_block(std::move(current_block));
@@ -275,8 +290,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
for (int i = 0; i < free_blocks_for_each; ++i) {
vectorized::BlockUPtr removed_block;
if (_free_blocks.try_dequeue(removed_block)) {
_free_blocks_memory_usage -= block->allocated_bytes();
_free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
_block_memory_usage -= block->allocated_bytes();
}
}
}
@@ -331,8 +345,7 @@ Status ScannerContext::_try_to_scale_up() {
int num_add = int(std::min(_num_running_scanners * SCALE_UP_RATIO,
_max_thread_num * MAX_SCALE_UP_RATIO - _num_running_scanners));
if (_estimated_block_size > 0) {
int most_add =
(_max_bytes_in_queue - _free_blocks_memory_usage) / _estimated_block_size;
int most_add = (_max_bytes_in_queue - _block_memory_usage) / _estimated_block_size;
num_add = std::min(num_add, most_add);
}
for (int i = 0; i < num_add; ++i) {
@@ -467,4 +480,12 @@ void ScannerContext::_set_scanner_done() {
_dependency->set_always_ready();
}

void ScannerContext::update_peak_running_scanner(int num) {
_local_state->_peak_running_scanner->add(num);
}

void ScannerContext::update_peak_memory_usage(int64_t usage) {
_local_state->_scanner_peak_memory_usage->add(usage);
}

} // namespace doris::vectorized
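
update_peak_running_scanner and update_peak_memory_usage both feed RuntimeProfile high-water-mark counters, so the profile ultimately reports the maximum observed value rather than the current one. A minimal sketch of the assumed counter semantics follows; this is not Doris's actual RuntimeProfile implementation, only an illustration of the behavior the callers rely on.

```cpp
#include <atomic>
#include <cstdint>

// Assumed semantics of a high-water-mark counter: add() may receive negative
// deltas (a scanner finishing, memory being released), while value() reports
// the maximum the running total ever reached.
class HighWaterMarkCounter {
public:
    void add(int64_t delta) {
        int64_t cur = _current.fetch_add(delta) + delta;
        int64_t peak = _peak.load();
        while (cur > peak && !_peak.compare_exchange_weak(peak, cur)) {
            // retry until the peak reflects the new maximum
        }
    }
    void set(int64_t v) {
        _current.store(v);
        add(0);  // refresh the peak against the new current value
    }
    int64_t value() const { return _peak.load(); }

private:
    std::atomic<int64_t> _current{0};
    std::atomic<int64_t> _peak{0};
};

// e.g. PeakRunningScanner: add(+1) when a scan task starts, add(-1) when it
// finishes; value() then holds the maximum number of concurrently running
// scanners observed during the query.
```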
13 changes: 7 additions & 6 deletions be/src/vec/exec/scan/scanner_context.h
@@ -122,10 +122,12 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,

vectorized::BlockUPtr get_free_block(bool force);
void return_free_block(vectorized::BlockUPtr block);
inline void inc_free_block_usage(size_t usage) {
_free_blocks_memory_usage += usage;
_free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
}
inline void inc_block_usage(size_t usage) { _block_memory_usage += usage; }

// Caller should make sure the pipeline task is still running when calling this function
void update_peak_running_scanner(int num);
// Caller should make sure the pipeline task is still running when calling this function
void update_peak_memory_usage(int64_t usage);

// Get next block from blocks queue. Called by ScanNode/ScanOperator
// Set eos to true if there is no more data to read.
@@ -223,15 +225,14 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
RuntimeProfile::Counter* _scanner_sched_counter = nullptr;
RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr;
RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr;
RuntimeProfile::HighWaterMarkCounter* _free_blocks_memory_usage_mark = nullptr;
RuntimeProfile::Counter* _scanner_ctx_sched_time = nullptr;
RuntimeProfile::Counter* _scale_up_scanners_counter = nullptr;
QueryThreadContext _query_thread_context;
std::shared_ptr<pipeline::Dependency> _dependency = nullptr;

// for scaling up the running scanners
size_t _estimated_block_size = 0;
std::atomic_long _free_blocks_memory_usage = 0;
std::atomic_long _block_memory_usage = 0;
int64_t _last_scale_up_time = 0;
int64_t _last_fetch_time = 0;
int64_t _total_wait_block_time = 0;
24 changes: 20 additions & 4 deletions be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -34,6 +34,7 @@
#include "common/status.h"
#include "olap/tablet.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "util/async_io.h" // IWYU pragma: keep
@@ -213,6 +214,9 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
return;
}

ctx->update_peak_running_scanner(1);
Defer defer([&] { ctx->update_peak_running_scanner(-1); });

std::shared_ptr<ScannerDelegate> scanner_delegate = scan_task->scanner.lock();
if (scanner_delegate == nullptr) {
return;
@@ -270,32 +274,44 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
if (free_block == nullptr) {
break;
}
// We got a newly created block or a reused block.
ctx->update_peak_memory_usage(free_block->allocated_bytes());
ctx->update_peak_memory_usage(-free_block->allocated_bytes());
status = scanner->get_block_after_projects(state, free_block.get(), &eos);
// Projection will truncate useless columns, which makes the block size change.
auto free_block_bytes = free_block->allocated_bytes();
ctx->update_peak_memory_usage(free_block_bytes);
first_read = false;
if (!status.ok()) {
LOG(WARNING) << "Scan thread read VScanner failed: " << status.to_string();
break;
}
auto free_block_bytes = free_block->allocated_bytes();
raw_bytes_read += free_block_bytes;
if (!scan_task->cached_blocks.empty() &&
scan_task->cached_blocks.back().first->rows() + free_block->rows() <=
ctx->batch_size()) {
size_t block_size = scan_task->cached_blocks.back().first->allocated_bytes();
vectorized::MutableBlock mutable_block(
scan_task->cached_blocks.back().first.get());
ctx->update_peak_memory_usage(-mutable_block.allocated_bytes());
status = mutable_block.merge(*free_block);
ctx->update_peak_memory_usage(mutable_block.allocated_bytes());
if (!status.ok()) {
LOG(WARNING) << "Block merge failed: " << status.to_string();
break;
}
scan_task->cached_blocks.back().second = mutable_block.allocated_bytes();
scan_task->cached_blocks.back().first.get()->set_columns(
std::move(mutable_block.mutable_columns()));

// Whether returning the block succeeds or not, this free_block is no longer used by this scan task.
ctx->update_peak_memory_usage(-free_block_bytes);
// If block can be reused, its memory usage will be added back.
ctx->return_free_block(std::move(free_block));
ctx->inc_free_block_usage(
scan_task->cached_blocks.back().first->allocated_bytes() - block_size);
ctx->inc_block_usage(scan_task->cached_blocks.back().first->allocated_bytes() -
block_size);
} else {
ctx->inc_free_block_usage(free_block->allocated_bytes());
ctx->inc_block_usage(free_block->allocated_bytes());
scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes);
}
} // end for while
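
The merge branch above adjusts two different counters: the peak-memory counter (via update_peak_memory_usage) and the blocks-queue budget (via inc_block_usage). Ignoring the case where the returned free_block is re-enqueued by return_free_block, the net deltas work out as in this illustrative helper; the struct and function names are hypothetical, only the arithmetic follows the calls in the diff.

```cpp
#include <cstdint>

struct MergeDeltas {
    int64_t peak_memory_delta;  // fed to update_peak_memory_usage()
    int64_t block_usage_delta;  // fed to inc_block_usage()
};

// Net effect of merging free_block into the cached tail block:
//   -old_tail_bytes   before the merge (the tail is about to be rewritten)
//   +new_tail_bytes   after the merge
//   -free_block_bytes once free_block is handed back to the pool
MergeDeltas merge_accounting(int64_t old_tail_bytes, int64_t new_tail_bytes,
                             int64_t free_block_bytes) {
    return {
            new_tail_bytes - old_tail_bytes - free_block_bytes,  // peak-memory counter
            new_tail_bytes - old_tail_bytes,                     // queue now holds a bigger tail block
    };
}
```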
1 change: 1 addition & 0 deletions be/src/vec/exec/scan/vscanner.cpp
@@ -143,6 +143,7 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) {
}

if (state->is_cancelled()) {
// TODO: Should return the specific ErrorStatus instead of just Cancelled.
return Status::Cancelled("cancelled");
}
*eof = *eof || _should_stop;
104 changes: 104 additions & 0 deletions regression-test/suites/query_profile/scanner_profile.groovy
@@ -0,0 +1,104 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import groovy.json.StringEscapeUtils


def getProfileList = {
def dst = 'http://' + context.config.feHttpAddress
def conn = new URL(dst + "/rest/v1/query_profile").openConnection()
conn.setRequestMethod("GET")
def encoding = Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" +
(context.config.feHttpPassword == null ? "" : context.config.feHttpPassword)).getBytes("UTF-8"))
conn.setRequestProperty("Authorization", "Basic ${encoding}")
return conn.getInputStream().getText()
}


def getProfile = { id ->
def dst = 'http://' + context.config.feHttpAddress
def conn = new URL(dst + "/api/profile/text/?query_id=$id").openConnection()
conn.setRequestMethod("GET")
def encoding = Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" +
(context.config.feHttpPassword == null ? "" : context.config.feHttpPassword)).getBytes("UTF-8"))
conn.setRequestProperty("Authorization", "Basic ${encoding}")
return conn.getInputStream().getText()
}

suite('scanner_profile') {
sql """
DROP TABLE IF EXISTS scanner_profile;
"""
sql """
CREATE TABLE if not exists `scanner_profile` (
`id` INT,
`name` varchar(32)
) ENGINE=OLAP
DISTRIBUTED BY HASH(`id`) BUCKETS 10
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
"""

// Insert data to table
sql """
insert into scanner_profile values
(1, "A"),(2, "B"),(3, "C"),(4, "D"),(5,"E"),(6,"F"),(7,"G"),(8,"H"),(9,"K");
"""
sql """
insert into scanner_profile values
(10, "A"),(20, "B"),(30, "C"),(40, "D"),(50,"E"),(60,"F"),(70,"G"),(80,"H"),(90,"K");
"""
sql """
insert into scanner_profile values
(101, "A"),(201, "B"),(301, "C"),(401, "D"),(501,"E"),(601,"F"),(701,"G"),(801,"H"),(901,"K");
"""
sql """
insert into scanner_profile values
(1010, "A"),(2010, "B"),(3010, "C"),(4010, "D"),(5010,"E"),(6010,"F"),(7010,"G"),(8010,"H"),(9010,"K");
"""

def uuidString = UUID.randomUUID().toString()
sql "set enable_profile=true"
// With Limit, MaxScannerThreadNum = 1
sql """
select "with_limit_1_${uuidString}", * from scanner_profile limit 10;
"""

def wholeString = getProfileList()
List profileData = new JsonSlurper().parseText(wholeString).data.rows
String queryIdWithLimit1 = "";


logger.info("{}", uuidString)

for (def profileItem in profileData) {
if (profileItem["Sql Statement"].toString().contains("with_limit_1_${uuidString}")) {
queryIdWithLimit1 = profileItem["Profile ID"].toString()
logger.info("profileItem: {}", profileItem)
}
}

logger.info("queryIdWithLimit1_${uuidString}: {}", queryIdWithLimit1)

assertTrue(queryIdWithLimit1 != "")
String profileWithLimit1 = getProfile(queryIdWithLimit1).toString()
logger.info("query profile {}", profileWithLimit1)
assertTrue(profileWithLimit1.contains("- PeakRunningScanner: 1"))
}