Skip to content

Commit

Permalink
[opt](scanner) optimize the number of threads of scanners (apache#28640)
Browse files Browse the repository at this point in the history
1. Remove `doris_max_remote_scanner_thread_pool_thread_num`, use `doris_scanner_thread_pool_thread_num` only.
2. Set the default value `doris_scanner_thread_pool_thread_num` as `std::max(48, CpuInfo::num_cores() * 4)`
  • Loading branch information
AshinGau authored and HappenLee committed Jan 12, 2024
1 parent 6e0984d commit 3e653fd
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 8 deletions.
12 changes: 11 additions & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "common/status.h"
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
#include "util/cpu_info.h"

namespace doris::config {

Expand Down Expand Up @@ -228,7 +229,14 @@ DEFINE_Bool(doris_enable_scanner_thread_pool_per_disk, "true");
DEFINE_mInt64(doris_blocking_priority_queue_wait_timeout_ms, "500");
// number of scanner thread pool size for olap table
// and the min thread num of remote scanner thread pool
DEFINE_Int32(doris_scanner_thread_pool_thread_num, "48");
DEFINE_Int32(doris_scanner_thread_pool_thread_num, "-1");
DEFINE_Validator(doris_scanner_thread_pool_thread_num, [](const int config) -> bool {
if (config == -1) {
CpuInfo::init();
doris_scanner_thread_pool_thread_num = std::max(48, CpuInfo::num_cores() * 4);
}
return true;
});
DEFINE_Int32(doris_max_remote_scanner_thread_pool_thread_num, "-1");
// number of olap scanner thread pool queue size
DEFINE_Int32(doris_scanner_thread_pool_queue_size, "102400");
Expand Down Expand Up @@ -864,6 +872,8 @@ DEFINE_mInt32(parquet_rowgroup_max_buffer_mb, "128");
// Max buffer size for parquet chunk column
DEFINE_mInt32(parquet_column_max_buffer_mb, "8");
DEFINE_mDouble(max_amplified_read_ratio, "0.8");
DEFINE_mInt32(merged_oss_min_io_size, "1048576");
DEFINE_mInt32(merged_hdfs_min_io_size, "8192");

// OrcReader
DEFINE_mInt32(orc_natural_read_size_mb, "8");
Expand Down
6 changes: 5 additions & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ DECLARE_Bool(doris_enable_scanner_thread_pool_per_disk);
DECLARE_mInt64(doris_blocking_priority_queue_wait_timeout_ms);
// number of scanner thread pool size for olap table
// and the min thread num of remote scanner thread pool
DECLARE_Int32(doris_scanner_thread_pool_thread_num);
DECLARE_mInt32(doris_scanner_thread_pool_thread_num);
// max number of remote scanner thread pool size
// if equal to -1, value is std::max(512, CpuInfo::num_cores() * 10)
DECLARE_Int32(doris_max_remote_scanner_thread_pool_thread_num);
Expand Down Expand Up @@ -921,6 +921,10 @@ DECLARE_mInt32(parquet_rowgroup_max_buffer_mb);
DECLARE_mInt32(parquet_column_max_buffer_mb);
// Merge small IO, the max amplified read ratio
DECLARE_mDouble(max_amplified_read_ratio);
// Equivalent min size of each IO that can reach the maximum storage speed limit
// 1MB for oss, 8KB for hdfs
DECLARE_mInt32(merged_oss_min_io_size);
DECLARE_mInt32(merged_hdfs_min_io_size);

// OrcReader
DECLARE_mInt32(orc_natural_read_size_mb);
Expand Down
7 changes: 3 additions & 4 deletions be/src/io/fs/buffered_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,6 @@ class MergeRangeFileReader : public io::FileReader {
static constexpr size_t READ_SLICE_SIZE = 8 * 1024 * 1024; // 8MB
static constexpr size_t BOX_SIZE = 1 * 1024 * 1024; // 1MB
static constexpr size_t SMALL_IO = 2 * 1024 * 1024; // 2MB
static constexpr size_t HDFS_MIN_IO_SIZE = 4 * 1024; // 4KB
static constexpr size_t OSS_MIN_IO_SIZE = 512 * 1024; // 512KB
static constexpr size_t NUM_BOX = TOTAL_BUFFER_SIZE / BOX_SIZE; // 128

MergeRangeFileReader(RuntimeProfile* profile, io::FileReaderSPtr reader,
Expand All @@ -146,8 +144,9 @@ class MergeRangeFileReader : public io::FileReader {
_is_oss = typeid_cast<io::S3FileReader*>(_reader.get()) != nullptr;
_max_amplified_ratio = config::max_amplified_read_ratio;
// Equivalent min size of each IO that can reach the maximum storage speed limit:
// 512KB for oss, 4KB for hdfs
_equivalent_io_size = _is_oss ? OSS_MIN_IO_SIZE : HDFS_MIN_IO_SIZE;
// 1MB for oss, 8KB for hdfs
_equivalent_io_size =
_is_oss ? config::merged_oss_min_io_size : config::merged_hdfs_min_io_size;
for (const PrefetchRange& range : _random_access_ranges) {
_statistics.apply_bytes += range.end_offset - range.start_offset;
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ Status ScannerScheduler::init(ExecEnv* env) {
_remote_thread_pool_max_size = config::doris_max_remote_scanner_thread_pool_thread_num != -1
? config::doris_max_remote_scanner_thread_pool_thread_num
: std::max(512, CpuInfo::num_cores() * 10);
_remote_thread_pool_max_size =
std::max(_remote_thread_pool_max_size, config::doris_scanner_thread_pool_thread_num);
_remote_scan_thread_pool = std::make_unique<PriorityThreadPool>(
_remote_thread_pool_max_size, config::doris_remote_scanner_thread_pool_queue_size,
"RemoteScanThreadPool");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ public class HudiJniScanner extends JniScanner {
private static final ScheduledExecutorService cleanResolverService = Executors.newScheduledThreadPool(1);

static {
int numThreads = Math.max(Runtime.getRuntime().availableProcessors() * 2 + 1, 4);
if (numThreads > 32) {
int numThreads = Math.max(Runtime.getRuntime().availableProcessors() * 2, 4);
if (numThreads > 48) {
numThreads = Runtime.getRuntime().availableProcessors();
}
avroReadPool = Executors.newFixedThreadPool(numThreads,
Expand Down

0 comments on commit 3e653fd

Please sign in to comment.