From 1dd5e8ca2091ed48c4bf52cee7d5b345bc5f3d3e Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Fri, 22 Dec 2023 16:37:33 +0800 Subject: [PATCH] [improve](load) limit delta writer flush task parallelism --- be/src/common/config.cpp | 2 ++ be/src/common/config.h | 2 ++ be/src/olap/delta_writer.cpp | 4 ++++ be/src/olap/delta_writer_v2.cpp | 5 +++++ 4 files changed, 13 insertions(+) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 0ce0f63ba4e602..6ba74ec4db1c21 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -592,6 +592,8 @@ DEFINE_mDouble(memtable_insert_memory_ratio, "1.4"); DEFINE_mInt64(write_buffer_size, "209715200"); // max buffer size used in memtable for the aggregated table, default 400MB DEFINE_mInt64(write_buffer_size_for_agg, "419430400"); +// max parallel flush task per memtable writer +DEFINE_mInt32(memtable_flush_running_count_limit, "5"); DEFINE_Int32(load_process_max_memory_limit_percent, "50"); // 50% diff --git a/be/src/common/config.h b/be/src/common/config.h index 49cdaba03517c6..12714f4b809f73 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -651,6 +651,8 @@ DECLARE_mDouble(memtable_insert_memory_ratio); DECLARE_mInt64(write_buffer_size); // max buffer size used in memtable for the aggregated table, default 400MB DECLARE_mInt64(write_buffer_size_for_agg); +// max parallel flush task per memtable writer +DECLARE_mInt32(memtable_flush_running_count_limit); DECLARE_Int32(load_process_max_memory_limit_percent); // 50% diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index dc5973bf04d4d0..1c2f24ae6d94d3 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -126,6 +126,10 @@ Status BaseDeltaWriter::write(const vectorized::Block* block, const std::vector< if (!_is_init && !_is_cancelled) { RETURN_IF_ERROR(init()); } + while (_memtable_writer->get_flush_token_stats().flush_running_count >= + config::memtable_flush_running_count_limit) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } return _memtable_writer->write(block, row_idxs, is_append); } Status BaseDeltaWriter::wait_flush() { diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp index 672fe4476c9f2f..cfe059f1890c89 100644 --- a/be/src/olap/delta_writer_v2.cpp +++ b/be/src/olap/delta_writer_v2.cpp @@ -37,6 +37,7 @@ #include "gutil/strings/numbers.h" #include "io/fs/file_writer.h" // IWYU pragma: keep #include "olap/data_dir.h" +#include "olap/memtable_flush_executor.h" #include "olap/olap_define.h" #include "olap/rowset/beta_rowset.h" #include "olap/rowset/beta_rowset_writer_v2.h" @@ -152,6 +153,10 @@ Status DeltaWriterV2::write(const vectorized::Block* block, const std::vectorget_flush_token_stats().flush_running_count >= + config::memtable_flush_running_count_limit) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } SCOPED_RAW_TIMER(&_write_memtable_time); return _memtable_writer->write(block, row_idxs, is_append); }