diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 9df1a184f609ef..de1458c240dff3 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1025,9 +1025,8 @@ DEFINE_mInt32(tablet_path_check_batch_size, "1000");
 
 DEFINE_mInt64(row_column_page_size, "4096");
 // it must be larger than or equal to 5MB
 DEFINE_mInt64(s3_write_buffer_size, "5242880");
-DEFINE_mInt32(s3_task_check_interval, "60");
-// The timeout config for S3 buffer allocation
-DEFINE_mInt32(s3_writer_buffer_allocation_timeout, "300");
+// Log interval when doing s3 upload task
+DEFINE_mInt32(s3_file_writer_log_interval_second, "60");
 DEFINE_mInt64(file_cache_max_file_reader_cache_size, "1000000");
 DEFINE_mInt64(hdfs_write_batch_buffer_size_mb, "4"); // 4MB
@@ -1217,6 +1216,15 @@ DEFINE_mBool(enable_injection_point, "false");
 
 DEFINE_mBool(ignore_schema_change_check, "false");
 
+// The min thread num for BufferedReaderPrefetchThreadPool
+DEFINE_Int64(num_buffered_reader_prefetch_thread_pool_min_thread, "16");
+// The max thread num for BufferedReaderPrefetchThreadPool
+DEFINE_Int64(num_buffered_reader_prefetch_thread_pool_max_thread, "64");
+// The min thread num for S3FileUploadThreadPool
+DEFINE_Int64(num_s3_file_upload_thread_pool_min_thread, "16");
+// The max thread num for S3FileUploadThreadPool
+DEFINE_Int64(num_s3_file_upload_thread_pool_max_thread, "64");
+
 // clang-format off
 #ifdef BE_TEST
 // test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 81910dd2553fad..4139d76b6bcb7a 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1070,9 +1070,8 @@ DECLARE_mInt32(tablet_path_check_batch_size);
 DECLARE_mInt64(row_column_page_size);
 // it must be larger than or equal to 5MB
 DECLARE_mInt64(s3_write_buffer_size);
-DECLARE_mInt32(s3_task_check_interval);
-// The timeout config for S3 buffer allocation
-DECLARE_mInt32(s3_writer_buffer_allocation_timeout);
+// Log interval when doing s3 upload task
+DECLARE_mInt32(s3_file_writer_log_interval_second);
 // the max number of cached file handle for block segemnt
 DECLARE_mInt64(file_cache_max_file_reader_cache_size);
 DECLARE_mInt64(hdfs_write_batch_buffer_size_mb);
@@ -1296,6 +1295,15 @@ DECLARE_mBool(enable_injection_point);
 
 DECLARE_mBool(ignore_schema_change_check);
 
+// The min thread num for BufferedReaderPrefetchThreadPool
+DECLARE_Int64(num_buffered_reader_prefetch_thread_pool_min_thread);
+// The max thread num for BufferedReaderPrefetchThreadPool
+DECLARE_Int64(num_buffered_reader_prefetch_thread_pool_max_thread);
+// The min thread num for S3FileUploadThreadPool
+DECLARE_Int64(num_s3_file_upload_thread_pool_min_thread);
+// The max thread num for S3FileUploadThreadPool
+DECLARE_Int64(num_s3_file_upload_thread_pool_max_thread);
+
 #ifdef BE_TEST
 // test s3
 DECLARE_String(test_s3_resource);
diff --git a/be/src/io/cache/block_file_cache_downloader.cpp b/be/src/io/cache/block_file_cache_downloader.cpp
index 30fb3a8633891c..283605f23bed91 100644
--- a/be/src/io/cache/block_file_cache_downloader.cpp
+++ b/be/src/io/cache/block_file_cache_downloader.cpp
@@ -184,7 +184,7 @@ struct DownloadTaskExecutor {
             LOG_WARNING("").error(st);
         }
     }
-    auto timeout_duration = config::s3_task_check_interval;
+    auto timeout_duration = config::s3_file_writer_log_interval_second;
     timespec current_time;
     // We don't need high accuracy here, so we use time(nullptr)
    // since it's the fastest way to get current time(second)
diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index 9df1ac847af365..84487f496ac1e9 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -135,7 +135,7 @@ Status S3FileWriter::_create_multi_upload_request() {
 }
 
 void S3FileWriter::_wait_until_finish(std::string_view task_name) {
-    auto timeout_duration = config::s3_writer_buffer_allocation_timeout;
+    auto timeout_duration = config::s3_file_writer_log_interval_second;
     auto msg = fmt::format(
             "{} multipart upload already takes {} seconds, bucket={}, key={}, upload_id={}",
             task_name, timeout_duration, _bucket, _path.native(), _upload_id);
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 5cbb5829ee0ac1..5a7e39cf158c41 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -142,6 +142,17 @@ static void init_doris_metrics(const std::vector<StorePath>& store_paths) {
     DorisMetrics::instance()->initialize(init_system_metrics, disk_devices, network_interfaces);
 }
 
+// Used to calculate the num of min thread and max thread based on the passed config
+static std::pair<size_t, size_t> get_num_threads(size_t min_num, size_t max_num) {
+    auto num_cores = doris::CpuInfo::num_cores();
+    min_num = (min_num == 0) ? num_cores : min_num;
+    max_num = (max_num == 0) ? num_cores : max_num;
+    auto factor = max_num / min_num;
+    min_num = std::min(num_cores * factor, min_num);
+    max_num = std::min(min_num * factor, max_num);
+    return {min_num, max_num};
+}
+
 Status ExecEnv::init(ExecEnv* env, const std::vector<StorePath>& store_paths,
                      const std::vector<StorePath>& spill_store_paths,
                      const std::set<std::string>& broken_paths) {
@@ -184,9 +195,12 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
                               .set_max_queue_size(config::send_batch_thread_pool_queue_size)
                               .build(&_send_batch_thread_pool));
 
+    auto [buffered_reader_min_threads, buffered_reader_max_threads] =
+            get_num_threads(config::num_buffered_reader_prefetch_thread_pool_min_thread,
+                            config::num_buffered_reader_prefetch_thread_pool_max_thread);
     static_cast<void>(ThreadPoolBuilder("BufferedReaderPrefetchThreadPool")
-                              .set_min_threads(16)
-                              .set_max_threads(64)
+                              .set_min_threads(buffered_reader_min_threads)
+                              .set_max_threads(buffered_reader_max_threads)
                               .build(&_buffered_reader_prefetch_thread_pool));
 
     static_cast<void>(ThreadPoolBuilder("SendTableStatsThreadPool")
@@ -199,9 +213,12 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
                               .set_max_threads(16)
                               .build(&_s3_downloader_download_poller_thread_pool));
 
+    auto [s3_file_upload_min_threads, s3_file_upload_max_threads] =
+            get_num_threads(config::num_s3_file_upload_thread_pool_min_thread,
+                            config::num_s3_file_upload_thread_pool_max_thread);
     static_cast<void>(ThreadPoolBuilder("S3FileUploadThreadPool")
-                              .set_min_threads(16)
-                              .set_max_threads(64)
+                              .set_min_threads(s3_file_upload_min_threads)
+                              .set_max_threads(s3_file_upload_max_threads)
                               .build(&_s3_file_upload_thread_pool));
 
     // min num equal to fragment pool's min num
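
Not part of the patch: the sketch below isolates the get_num_threads() sizing logic from exec_env_init.cpp so the arithmetic can be checked by hand. It assumes a hypothetical 8-core machine in place of doris::CpuInfo::num_cores(); with the default configs (min 16, max 64) the factor is 64 / 16 = 4 and the pools keep their previous 16/64 sizing, while setting both configs to 0 falls back to the core count.

// Standalone sketch, not part of the diff: mirrors the patch's get_num_threads()
// so its behaviour can be observed in isolation.
#include <algorithm>
#include <cstddef>
#include <iostream>
#include <utility>

static std::pair<size_t, size_t> get_num_threads(size_t min_num, size_t max_num) {
    const size_t num_cores = 8; // assumption: stand-in for doris::CpuInfo::num_cores()
    min_num = (min_num == 0) ? num_cores : min_num;
    max_num = (max_num == 0) ? num_cores : max_num;
    auto factor = max_num / min_num;                 // defaults: 64 / 16 = 4
    min_num = std::min(num_cores * factor, min_num); // defaults: min(8 * 4, 16) = 16
    max_num = std::min(min_num * factor, max_num);   // defaults: min(16 * 4, 64) = 64
    return {min_num, max_num};
}

int main() {
    // Default config values -> pools keep the former hard-coded 16/64 sizing.
    auto [def_min, def_max] = get_num_threads(16, 64);
    std::cout << "defaults: " << def_min << "/" << def_max << "\n"; // 16/64

    // Both configs set to 0 -> fall back to the (assumed) core count.
    auto [auto_min, auto_max] = get_num_threads(0, 0);
    std::cout << "zeroes:   " << auto_min << "/" << auto_max << "\n"; // 8/8
    return 0;
}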