From 738ab8c0fd914686eff229e6a737d0a4ec7fee6b Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Tue, 19 Dec 2023 18:02:30 +0800 Subject: [PATCH] 1 --- be/src/common/daemon.cpp | 19 +++++++++++++++++++ be/src/common/daemon.h | 7 +++++++ be/src/util/mem_info.cpp | 9 +++++---- 3 files changed, 31 insertions(+), 4 deletions(-) diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index 3879ef9ff8b2e89..fb30b3d1e091b98 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -63,6 +63,8 @@ namespace doris { +CountDownLatch Daemon::_je_purge_dirty_pages_thread_latch {1}; + void Daemon::tcmalloc_gc_thread() { // TODO All cache GC wish to be supported #if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) && \ @@ -352,6 +354,18 @@ void Daemon::block_spill_gc_thread() { } } +void Daemon::je_purge_dirty_pages_thread() const { + _je_purge_dirty_pages_thread_latch.reset(1); + do { + _je_purge_dirty_pages_thread_latch.wait(); + if (_is_stopped) { + break; + } + doris::MemInfo::je_purge_all_arena_dirty_pages(); + _je_purge_dirty_pages_thread_latch.reset(1); + } while (true); +} + void Daemon::start() { Status st; st = Thread::create( @@ -381,6 +395,9 @@ void Daemon::start() { st = Thread::create( "Daemon", "block_spill_gc_thread", [this]() { this->block_spill_gc_thread(); }, &_threads.emplace_back()); + st = Thread::create( + "Daemon", "je_purge_dirty_pages_thread", + [this]() { this->je_purge_dirty_pages_thread(); }, &_threads.emplace_back()); CHECK(st.ok()) << st; } @@ -390,7 +407,9 @@ void Daemon::stop() { LOG(INFO) << "Doris daemon stop returned since no bg threads latch."; return; } + _is_stopped = true; _stop_background_threads_latch.count_down(); + _je_purge_dirty_pages_thread_latch.count_down(); for (auto&& t : _threads) { if (t) { t->join(); diff --git a/be/src/common/daemon.h b/be/src/common/daemon.h index 139584ba93f9c83..fbfe8a2dfc8cae3 100644 --- a/be/src/common/daemon.h +++ b/be/src/common/daemon.h @@ -36,6 +36,10 @@ class Daemon { // Stop background threads void stop(); + static void count_down_je_purge_dirty_pages_thread_latch() { + _je_purge_dirty_pages_thread_latch.count_down(); + } + private: void tcmalloc_gc_thread(); void memory_maintenance_thread(); @@ -43,8 +47,11 @@ class Daemon { void memtable_memory_limiter_tracker_refresh_thread(); void calculate_metrics_thread(); void block_spill_gc_thread(); + void je_purge_dirty_pages_thread() const; CountDownLatch _stop_background_threads_latch; + static CountDownLatch _je_purge_dirty_pages_thread_latch; + bool _is_stopped {}; std::vector> _threads; }; } // namespace doris diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp index 416ae1ae200b3c3..2fc46093beddfa2 100644 --- a/be/src/util/mem_info.cpp +++ b/be/src/util/mem_info.cpp @@ -37,6 +37,7 @@ #include #include "common/config.h" +#include "common/daemon.h" #include "common/status.h" #include "gutil/strings/split.h" #include "runtime/exec_env.h" @@ -129,7 +130,7 @@ bool MemInfo::process_minor_gc() { std::string pre_sys_mem_available = MemInfo::sys_mem_available_str(); Defer defer {[&]() { - je_purge_all_arena_dirty_pages(); + Daemon::count_down_je_purge_dirty_pages_thread_latch(); std::stringstream ss; profile->pretty_print(&ss); LOG(INFO) << fmt::format( @@ -139,7 +140,7 @@ bool MemInfo::process_minor_gc() { }}; freed_mem += CacheManager::instance()->for_each_cache_prune_stale(profile.get()); - je_purge_all_arena_dirty_pages(); + Daemon::count_down_je_purge_dirty_pages_thread_latch(); if (freed_mem > _s_process_minor_gc_size) { return true; } @@ -180,7 +181,7 @@ bool MemInfo::process_full_gc() { std::string pre_sys_mem_available = MemInfo::sys_mem_available_str(); Defer defer {[&]() { - je_purge_all_arena_dirty_pages(); + Daemon::count_down_je_purge_dirty_pages_thread_latch(); std::stringstream ss; profile->pretty_print(&ss); LOG(INFO) << fmt::format( @@ -190,7 +191,7 @@ bool MemInfo::process_full_gc() { }}; freed_mem += CacheManager::instance()->for_each_cache_prune_all(profile.get()); - je_purge_all_arena_dirty_pages(); + Daemon::count_down_je_purge_dirty_pages_thread_latch(); if (freed_mem > _s_process_full_gc_size) { return true; }