From 13a69a28d7a8ec310313c0c17a917f60e769c8ca Mon Sep 17 00:00:00 2001 From: zhengyu Date: Thu, 28 Dec 2023 14:32:29 +0800 Subject: [PATCH] [enhencement](segcompaction) cancel inflight segcompaction tasks faster when load finish (#28901) [Goal] When building the rowset writer, avoid waiting for inflight segcompaction, to eliminate long-tail latency for load. [Current situation] 1. The segcompaction of a rowset is executed serially. During the build phase, we need to wait for the completion of the inflight segcompaction task. 2. If the rowset writer finishes writing and starts building meta, then segments that have not been compacted will not be submitted to the segcompaction worker. We simply ignore them to accelerate the build process. 3. But this is not enough. If a segcompaction task has already been submitted to the worker thread pool, we set a cancelled flag on the worker, so that the task does nothing when it is executed and completes ASAP. 4. But this is still not enough. Although the latency of the segcompaction task has been shortened by the aforementioned method, tasks may still be queuing in the thread pool. [Solution] We could enlarge the worker thread pool to avoid queuing congestion, but this is not the best solution. Segcompaction should be best-effort work and should not use too much CPU and memory. So we adopted the strategy of unbinding build and segcompaction, specifically: 1. For a segcompaction task that is already performing compaction operations, we should not interrupt it, otherwise it may cause file corruption. 2. 
For those tasks still queued, we no longer care about their results (because these tasks will know they are cancelled and will not perform any actual operations), so we just ignore them and continue with the subsequent rowset build process Signed-off-by: freemandealer --- be/src/olap/olap_server.cpp | 15 +++++++++----- be/src/olap/rowset/beta_rowset_writer.cpp | 24 +++++++++++------------ be/src/olap/rowset/beta_rowset_writer.h | 4 ++-- be/src/olap/rowset/segcompaction.cpp | 15 +++++++++++--- be/src/olap/rowset/segcompaction.h | 6 ++++-- be/src/olap/storage_engine.h | 7 ++++--- 6 files changed, 44 insertions(+), 27 deletions(-) diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 29aa4eb05fa099c..9ee24dee19ab62a 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -996,17 +996,22 @@ Status StorageEngine::submit_compaction_task(TabletSharedPtr tablet, CompactionT return _submit_compaction_task(tablet, compaction_type, force); } -Status StorageEngine::_handle_seg_compaction(SegcompactionWorker* worker, - SegCompactionCandidatesSharedPtr segments) { +Status StorageEngine::_handle_seg_compaction(std::shared_ptr worker, + SegCompactionCandidatesSharedPtr segments, + uint64_t submission_time) { + // note: be aware that worker->_writer maybe released when the task is cancelled + uint64_t exec_queue_time = GetCurrentTimeMicros() - submission_time; + LOG(INFO) << "segcompaction thread pool queue time(ms): " << exec_queue_time / 1000; worker->compact_segments(segments); // return OK here. 
error will be reported via BetaRowsetWriter::_segcompaction_status return Status::OK(); } -Status StorageEngine::submit_seg_compaction_task(SegcompactionWorker* worker, +Status StorageEngine::submit_seg_compaction_task(std::shared_ptr worker, SegCompactionCandidatesSharedPtr segments) { - return _seg_compaction_thread_pool->submit_func( - std::bind(&StorageEngine::_handle_seg_compaction, this, worker, segments)); + uint64_t submission_time = GetCurrentTimeMicros(); + return _seg_compaction_thread_pool->submit_func(std::bind( + &StorageEngine::_handle_seg_compaction, this, worker, segments, submission_time)); } Status StorageEngine::process_index_change_task(const TAlterInvertedIndexReq& request) { diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 6850ce0f43c6bd3..ce30a03c8005505 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -105,7 +105,7 @@ BaseBetaRowsetWriter::BaseBetaRowsetWriter() _total_index_size(0) {} BetaRowsetWriter::BetaRowsetWriter(StorageEngine& engine) - : _engine(engine), _segcompaction_worker(std::make_unique(this)) {} + : _engine(engine), _segcompaction_worker(std::make_shared(this)) {} BaseBetaRowsetWriter::~BaseBetaRowsetWriter() { // TODO(lingbin): Should wrapper exception logic, no need to know file ops directly. 
@@ -381,14 +381,9 @@ Status BetaRowsetWriter::_rename_compacted_indices(int64_t begin, int64_t end, u return Status::OK(); } +// return true if there isn't any flying segcompaction, otherwise return false bool BetaRowsetWriter::_check_and_set_is_doing_segcompaction() { - std::lock_guard l(_is_doing_segcompaction_lock); - if (!_is_doing_segcompaction) { - _is_doing_segcompaction = true; - return true; - } else { - return false; - } + return !_is_doing_segcompaction.exchange(true); } Status BetaRowsetWriter::_segcompaction_if_necessary() { @@ -410,7 +405,7 @@ Status BetaRowsetWriter::_segcompaction_if_necessary() { LOG(INFO) << "submit segcompaction task, tablet_id:" << _context.tablet_id << " rowset_id:" << _context.rowset_id << " segment num:" << _num_segment << ", segcompacted_point:" << _segcompacted_point; - status = _engine.submit_seg_compaction_task(_segcompaction_worker.get(), segments); + status = _engine.submit_seg_compaction_task(_segcompaction_worker, segments); if (status.ok()) { return status; } @@ -546,9 +541,14 @@ Status BetaRowsetWriter::_close_file_writers() { // if _segment_start_id is not zero, that means it's a transient rowset writer for // MoW partial update, don't need to do segment compaction. 
if (_segment_start_id == 0) { - _segcompaction_worker->cancel(); - RETURN_NOT_OK_STATUS_WITH_WARN(_wait_flying_segcompaction(), - "segcompaction failed when build new rowset"); + if (_segcompaction_worker->cancel()) { + std::lock_guard lk(_is_doing_segcompaction_lock); + _is_doing_segcompaction = false; + _segcompacting_cond.notify_all(); + } else { + RETURN_NOT_OK_STATUS_WITH_WARN(_wait_flying_segcompaction(), + "segcompaction failed when build new rowset"); + } RETURN_NOT_OK_STATUS_WITH_WARN(_segcompaction_rename_last_segments(), "rename last segments failed when build new rowset"); if (_segcompaction_worker->get_file_writer()) { diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h index 347994e243e7a47..0bd96f282918122 100644 --- a/be/src/olap/rowset/beta_rowset_writer.h +++ b/be/src/olap/rowset/beta_rowset_writer.h @@ -225,11 +225,11 @@ class BetaRowsetWriter : public BaseBetaRowsetWriter { // already been segment compacted std::atomic _num_segcompacted {0}; // index for segment compaction - std::unique_ptr _segcompaction_worker; + std::shared_ptr _segcompaction_worker; // ensure only one inflight segcompaction task for each rowset std::atomic _is_doing_segcompaction {false}; - // enforce compare-and-swap on _is_doing_segcompaction + // enforce condition variable on _is_doing_segcompaction std::mutex _is_doing_segcompaction_lock; std::condition_variable _segcompacting_cond; diff --git a/be/src/olap/rowset/segcompaction.cpp b/be/src/olap/rowset/segcompaction.cpp index be92557da237d66..ad3e78a6081964d 100644 --- a/be/src/olap/rowset/segcompaction.cpp +++ b/be/src/olap/rowset/segcompaction.cpp @@ -302,10 +302,12 @@ Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt void SegcompactionWorker::compact_segments(SegCompactionCandidatesSharedPtr segments) { Status status = Status::OK(); - if (_cancelled) { - LOG(INFO) << "segcompaction worker is cancelled, skipping segcompaction task"; - } else { + 
if (_is_compacting_state_mutable.exchange(false)) { status = _do_compact_segments(segments); + } else { + // note: be aware that _writer maybe released when the task is cancelled + LOG(INFO) << "segcompaction worker is cancelled, skipping segcompaction task"; + return; } if (!status.ok()) { int16_t errcode = status.code(); @@ -330,6 +332,13 @@ void SegcompactionWorker::compact_segments(SegCompactionCandidatesSharedPtr segm _writer->_is_doing_segcompaction = false; _writer->_segcompacting_cond.notify_all(); } + _is_compacting_state_mutable = true; +} + +bool SegcompactionWorker::cancel() { + // return true if the task is canncellable (actual compaction is not started) + // return false when the task is not cancellable (it is in the middle of segcompaction) + return _is_compacting_state_mutable.exchange(false); } } // namespace doris diff --git a/be/src/olap/rowset/segcompaction.h b/be/src/olap/rowset/segcompaction.h index e2b5812ad8c32e6..5aef89992d30b82 100644 --- a/be/src/olap/rowset/segcompaction.h +++ b/be/src/olap/rowset/segcompaction.h @@ -54,7 +54,7 @@ class SegcompactionWorker { io::FileWriterPtr& get_file_writer() { return _file_writer; } // set the cancel flag, tasks already started will not be cancelled. 
- void cancel() { _cancelled = true; } + bool cancel(); private: Status _create_segment_writer_for_segcompaction( @@ -77,6 +77,8 @@ class SegcompactionWorker { // Currently cloud storage engine doesn't need segcompaction BetaRowsetWriter* _writer = nullptr; io::FileWriterPtr _file_writer; - std::atomic _cancelled = false; + + // the state is not mutable when 1)actual compaction operation started or 2) cancelled + std::atomic _is_compacting_state_mutable = true; }; } // namespace doris diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 77d3efeaaf8e69a..750e7a4ca21cab8 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -196,7 +196,7 @@ class StorageEngine { Status submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type, bool force); - Status submit_seg_compaction_task(SegcompactionWorker* worker, + Status submit_seg_compaction_task(std::shared_ptr worker, SegCompactionCandidatesSharedPtr segments); std::unique_ptr& tablet_publish_txn_thread_pool() { @@ -313,8 +313,9 @@ class StorageEngine { void _remove_unused_remote_files_callback(); void _cold_data_compaction_producer_callback(); - Status _handle_seg_compaction(SegcompactionWorker* worker, - SegCompactionCandidatesSharedPtr segments); + Status _handle_seg_compaction(std::shared_ptr worker, + SegCompactionCandidatesSharedPtr segments, + uint64_t submission_time); Status _handle_index_change(IndexBuilderSharedPtr index_builder);