Skip to content

Commit

Permalink
[chore](config) Add config to control BufferedReader and S3FileWriter…
Browse files Browse the repository at this point in the history
…'s thread pool's min max nums (#33974)
  • Loading branch information
ByteYue authored Apr 25, 2024
1 parent 6b9f132 commit 3034ac3
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 12 deletions.
14 changes: 11 additions & 3 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1025,9 +1025,8 @@ DEFINE_mInt32(tablet_path_check_batch_size, "1000");
DEFINE_mInt64(row_column_page_size, "4096");
// Size of the S3 write buffer; it must be larger than or equal to 5MB.
// The default 5242880 equals 5 * 1024 * 1024, so the unit is presumably bytes.
DEFINE_mInt64(s3_write_buffer_size, "5242880");
// NOTE(review): the unit is not stated here; presumably seconds -- confirm against the readers of this config.
DEFINE_mInt32(s3_task_check_interval, "60");
// The timeout config for S3 buffer allocation
// NOTE(review): the unit is presumably seconds -- confirm.
DEFINE_mInt32(s3_writer_buffer_allocation_timeout, "300");
// Log interval (in seconds, per the config name) when doing s3 upload task
DEFINE_mInt32(s3_file_writer_log_interval_second, "60");
DEFINE_mInt64(file_cache_max_file_reader_cache_size, "1000000");
DEFINE_mInt64(hdfs_write_batch_buffer_size_mb, "4"); // 4MB

Expand Down Expand Up @@ -1217,6 +1216,15 @@ DEFINE_mBool(enable_injection_point, "false");

DEFINE_mBool(ignore_schema_change_check, "false");

// Thread counts for BufferedReaderPrefetchThreadPool and S3FileUploadThreadPool.
// ExecEnv::_init passes each min/max pair through get_num_threads(), which treats
// 0 as "use the number of CPU cores" and may lower the values on machines with few cores.
// NOTE(review): these use DEFINE_Int64 rather than DEFINE_mInt64 and are only read in
// ExecEnv::_init as far as this change shows -- presumably they take effect only at startup; confirm.

// The min thread num for BufferedReaderPrefetchThreadPool (default 16)
DEFINE_Int64(num_buffered_reader_prefetch_thread_pool_min_thread, "16");
// The max thread num for BufferedReaderPrefetchThreadPool (default 64)
DEFINE_Int64(num_buffered_reader_prefetch_thread_pool_max_thread, "64");
// The min thread num for S3FileUploadThreadPool (default 16)
DEFINE_Int64(num_s3_file_upload_thread_pool_min_thread, "16");
// The max thread num for S3FileUploadThreadPool (default 64)
DEFINE_Int64(num_s3_file_upload_thread_pool_max_thread, "64");

// clang-format off
#ifdef BE_TEST
// test s3
Expand Down
14 changes: 11 additions & 3 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1070,9 +1070,8 @@ DECLARE_mInt32(tablet_path_check_batch_size);
DECLARE_mInt64(row_column_page_size);
// Size of the S3 write buffer; it must be larger than or equal to 5MB
DECLARE_mInt64(s3_write_buffer_size);
// NOTE(review): the unit is not stated here; presumably seconds -- confirm against the readers of this config.
DECLARE_mInt32(s3_task_check_interval);
// The timeout config for S3 buffer allocation
DECLARE_mInt32(s3_writer_buffer_allocation_timeout);
// Log interval (in seconds, per the config name) when doing s3 upload task
DECLARE_mInt32(s3_file_writer_log_interval_second);
// The max number of cached file handles for block segments
DECLARE_mInt64(file_cache_max_file_reader_cache_size);
// HDFS write batch buffer size (in MB, per the config name)
DECLARE_mInt64(hdfs_write_batch_buffer_size_mb);
Expand Down Expand Up @@ -1296,6 +1295,15 @@ DECLARE_mBool(enable_injection_point);

DECLARE_mBool(ignore_schema_change_check);

// Thread counts for BufferedReaderPrefetchThreadPool and S3FileUploadThreadPool.
// ExecEnv::_init passes each min/max pair through get_num_threads(), which treats
// 0 as "use the number of CPU cores" and may lower the values on machines with few cores.

// The min thread num for BufferedReaderPrefetchThreadPool
DECLARE_Int64(num_buffered_reader_prefetch_thread_pool_min_thread);
// The max thread num for BufferedReaderPrefetchThreadPool
DECLARE_Int64(num_buffered_reader_prefetch_thread_pool_max_thread);
// The min thread num for S3FileUploadThreadPool
DECLARE_Int64(num_s3_file_upload_thread_pool_min_thread);
// The max thread num for S3FileUploadThreadPool
DECLARE_Int64(num_s3_file_upload_thread_pool_max_thread);

#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
Expand Down
2 changes: 1 addition & 1 deletion be/src/io/cache/block_file_cache_downloader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ struct DownloadTaskExecutor {
LOG_WARNING("").error(st);
}
}
auto timeout_duration = config::s3_task_check_interval;
auto timeout_duration = config::s3_file_writer_log_interval_second;
timespec current_time;
// We don't need high accuracy here, so we use time(nullptr)
// since it's the fastest way to get current time(second)
Expand Down
2 changes: 1 addition & 1 deletion be/src/io/fs/s3_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ Status S3FileWriter::_create_multi_upload_request() {
}

void S3FileWriter::_wait_until_finish(std::string_view task_name) {
auto timeout_duration = config::s3_writer_buffer_allocation_timeout;
auto timeout_duration = config::s3_file_writer_log_interval_second;
auto msg = fmt::format(
"{} multipart upload already takes {} seconds, bucket={}, key={}, upload_id={}",
task_name, timeout_duration, _bucket, _path.native(), _upload_id);
Expand Down
25 changes: 21 additions & 4 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,17 @@ static void init_doris_metrics(const std::vector<StorePath>& store_paths) {
DorisMetrics::instance()->initialize(init_system_metrics, disk_devices, network_interfaces);
}

// Calculates the {min, max} thread counts for a thread pool from the passed config values,
// scaling them down on machines with few CPU cores.
//
// min_num: configured minimum thread count; 0 means "use the number of CPU cores".
// max_num: configured maximum thread count; 0 means "use the number of CPU cores".
// Returns {min, max} with 1 <= min <= max, where neither value exceeds its
// (defaulted) configured counterpart.
//
// The integer ratio max/min ("factor") is kept where possible: min is capped at
// num_cores * factor, and max is capped at min * factor.
static pair<size_t, size_t> get_num_threads(size_t min_num, size_t max_num) {
    // Never let a non-positive core count become the default value: it would end up
    // as a zero divisor (or a zero-sized pool) below.
    const auto detected_cores = doris::CpuInfo::num_cores();
    const size_t num_cores = detected_cores > 0 ? static_cast<size_t>(detected_cores) : 1;

    min_num = (min_num == 0) ? num_cores : min_num;
    max_num = (max_num == 0) ? num_cores : max_num;

    // If max is configured below min, the integer ratio below would truncate to 0 and
    // both results would collapse to 0. Treat max as the hard cap and lower min to it.
    min_num = std::min(min_num, max_num);

    const size_t factor = max_num / min_num; // >= 1 because max_num >= min_num >= 1
    min_num = std::min(num_cores * factor, min_num);
    max_num = std::min(min_num * factor, max_num);
    return {min_num, max_num};
}

Status ExecEnv::init(ExecEnv* env, const std::vector<StorePath>& store_paths,
const std::vector<StorePath>& spill_store_paths,
const std::set<std::string>& broken_paths) {
Expand Down Expand Up @@ -184,9 +195,12 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
.set_max_queue_size(config::send_batch_thread_pool_queue_size)
.build(&_send_batch_thread_pool));

auto [buffered_reader_min_threads, buffered_reader_max_threads] =
get_num_threads(config::num_buffered_reader_prefetch_thread_pool_min_thread,
config::num_buffered_reader_prefetch_thread_pool_max_thread);
static_cast<void>(ThreadPoolBuilder("BufferedReaderPrefetchThreadPool")
.set_min_threads(16)
.set_max_threads(64)
.set_min_threads(buffered_reader_min_threads)
.set_max_threads(buffered_reader_max_threads)
.build(&_buffered_reader_prefetch_thread_pool));

static_cast<void>(ThreadPoolBuilder("SendTableStatsThreadPool")
Expand All @@ -199,9 +213,12 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
.set_max_threads(16)
.build(&_s3_downloader_download_poller_thread_pool));

auto [s3_file_upload_min_threads, s3_file_upload_max_threads] =
get_num_threads(config::num_s3_file_upload_thread_pool_min_thread,
config::num_s3_file_upload_thread_pool_max_thread);
static_cast<void>(ThreadPoolBuilder("S3FileUploadThreadPool")
.set_min_threads(16)
.set_max_threads(64)
.set_min_threads(s3_file_upload_min_threads)
.set_max_threads(s3_file_upload_max_threads)
.build(&_s3_file_upload_thread_pool));

// min num equal to fragment pool's min num
Expand Down

0 comments on commit 3034ac3

Please sign in to comment.