Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[opt](scanner) optimize the number of threads of scanners #28640

Merged
merged 1 commit into from
Dec 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "common/status.h"
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
#include "util/cpu_info.h"

namespace doris {
namespace config {
Expand Down Expand Up @@ -235,7 +236,14 @@ DEFINE_Bool(doris_enable_scanner_thread_pool_per_disk, "true");
DEFINE_mInt64(doris_blocking_priority_queue_wait_timeout_ms, "500");
// number of scanner thread pool size for olap table
// and the min thread num of remote scanner thread pool
DEFINE_Int32(doris_scanner_thread_pool_thread_num, "48");
DEFINE_Int32(doris_scanner_thread_pool_thread_num, "-1");
DEFINE_Validator(doris_scanner_thread_pool_thread_num, [](const int config) -> bool {
if (config == -1) {
CpuInfo::init();
doris_scanner_thread_pool_thread_num = std::max(48, CpuInfo::num_cores() * 4);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

std::max ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If doris_scanner_thread_pool_thread_num is not set (i.e. left at -1), std::max ensures the computed value is at least 48, so doris_scanner_thread_pool_thread_num >= 48.

}
return true;
});
DEFINE_Int32(doris_max_remote_scanner_thread_pool_thread_num, "-1");
// number of olap scanner thread pool queue size
DEFINE_Int32(doris_scanner_thread_pool_queue_size, "102400");
Expand Down Expand Up @@ -883,6 +891,8 @@ DEFINE_mInt32(parquet_rowgroup_max_buffer_mb, "128");
// Max buffer size for parquet chunk column
DEFINE_mInt32(parquet_column_max_buffer_mb, "8");
DEFINE_mDouble(max_amplified_read_ratio, "0.8");
DEFINE_mInt32(merged_oss_min_io_size, "1048576");
DEFINE_mInt32(merged_hdfs_min_io_size, "8192");

// OrcReader
DEFINE_mInt32(orc_natural_read_size_mb, "8");
Expand Down
6 changes: 5 additions & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ DECLARE_Bool(doris_enable_scanner_thread_pool_per_disk);
DECLARE_mInt64(doris_blocking_priority_queue_wait_timeout_ms);
// number of scanner thread pool size for olap table
// and the min thread num of remote scanner thread pool
DECLARE_Int32(doris_scanner_thread_pool_thread_num);
DECLARE_mInt32(doris_scanner_thread_pool_thread_num);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest not removing the old config, for compatibility.
Some users may have already set these configs, so we have to make sure they keep the same values after an upgrade.

// max number of remote scanner thread pool size
// if equal to -1, value is std::max(512, CpuInfo::num_cores() * 10)
DECLARE_Int32(doris_max_remote_scanner_thread_pool_thread_num);
Expand Down Expand Up @@ -940,6 +940,10 @@ DECLARE_mInt32(parquet_rowgroup_max_buffer_mb);
DECLARE_mInt32(parquet_column_max_buffer_mb);
// Merge small IO, the max amplified read ratio
DECLARE_mDouble(max_amplified_read_ratio);
// Equivalent min size of each IO that can reach the maximum storage speed limit
// 1MB for oss, 8KB for hdfs
DECLARE_mInt32(merged_oss_min_io_size);
DECLARE_mInt32(merged_hdfs_min_io_size);

// OrcReader
DECLARE_mInt32(orc_natural_read_size_mb);
Expand Down
7 changes: 3 additions & 4 deletions be/src/io/fs/buffered_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,6 @@ class MergeRangeFileReader : public io::FileReader {
static constexpr size_t READ_SLICE_SIZE = 8 * 1024 * 1024; // 8MB
static constexpr size_t BOX_SIZE = 1 * 1024 * 1024; // 1MB
static constexpr size_t SMALL_IO = 2 * 1024 * 1024; // 2MB
static constexpr size_t HDFS_MIN_IO_SIZE = 4 * 1024; // 4KB
static constexpr size_t OSS_MIN_IO_SIZE = 512 * 1024; // 512KB
static constexpr size_t NUM_BOX = TOTAL_BUFFER_SIZE / BOX_SIZE; // 128

MergeRangeFileReader(RuntimeProfile* profile, io::FileReaderSPtr reader,
Expand All @@ -146,8 +144,9 @@ class MergeRangeFileReader : public io::FileReader {
_is_oss = typeid_cast<io::S3FileReader*>(_reader.get()) != nullptr;
_max_amplified_ratio = config::max_amplified_read_ratio;
// Equivalent min size of each IO that can reach the maximum storage speed limit:
// 512KB for oss, 4KB for hdfs
_equivalent_io_size = _is_oss ? OSS_MIN_IO_SIZE : HDFS_MIN_IO_SIZE;
// 1MB for oss, 8KB for hdfs
_equivalent_io_size =
_is_oss ? config::merged_oss_min_io_size : config::merged_hdfs_min_io_size;
for (const PrefetchRange& range : _random_access_ranges) {
_statistics.apply_bytes += range.end_offset - range.start_offset;
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ Status ScannerScheduler::init(ExecEnv* env) {
_remote_thread_pool_max_size = config::doris_max_remote_scanner_thread_pool_thread_num != -1
? config::doris_max_remote_scanner_thread_pool_thread_num
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

std::max(512, CpuInfo::num_cores() * 10)
This value is for object storage, so think twice about it.

: std::max(512, CpuInfo::num_cores() * 10);
_remote_thread_pool_max_size =
std::max(_remote_thread_pool_max_size, config::doris_scanner_thread_pool_thread_num);
_remote_scan_thread_pool = std::make_unique<PriorityThreadPool>(
_remote_thread_pool_max_size, config::doris_remote_scanner_thread_pool_queue_size,
"RemoteScanThreadPool");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ public class HudiJniScanner extends JniScanner {
private static final ScheduledExecutorService cleanResolverService = Executors.newScheduledThreadPool(1);

static {
int numThreads = Math.max(Runtime.getRuntime().availableProcessors() * 2 + 1, 4);
if (numThreads > 32) {
int numThreads = Math.max(Runtime.getRuntime().availableProcessors() * 2, 4);
if (numThreads > 48) {
numThreads = Runtime.getRuntime().availableProcessors();
}
avroReadPool = Executors.newFixedThreadPool(numThreads,
Expand Down
Loading