[enhancement](segcompaction) cancel inflight segcompaction tasks faster when load finish (apache#28901)

[Goal]
When building the rowset writer, avoid waiting for inflight segcompaction
to eliminate long-tail latency for load.

[Current situation]
1. The segcompaction of a rowset is executed serially. During the build phase,
we need to wait for the completion of the inflight segcompaction task.

2. If the rowset writer finishes writing and starts building meta, then segments
that have not been compacted will not be submitted to segcompaction worker.
We simply ignore them to accelerate the build process.

3. But this is not enough. If a segcompaction task has already been submitted to
the worker thread pool, we set a cancelled flag on the worker so that the task
does nothing when it executes and completes ASAP.

4. But this is still not enough. Although the latency of the segcompaction task
has been shortened by the aforementioned method, tasks may still be queuing in the
thread pool.
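
The queuing delay described in point 4 can be made visible by stamping each task at submission and comparing against the clock when it starts executing. The following is a minimal sketch under stated assumptions, not the Doris implementation: `TaskQueue`, `submit`, `run_all`, and `pending` are hypothetical names, and the queue is drained by hand in place of a real thread pool.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <queue>
#include <utility>

// Hypothetical stand-in for a worker thread pool. Tasks are drained manually
// so the sketch stays deterministic; a real pool would run them on threads.
class TaskQueue {
public:
    using Clock = std::chrono::steady_clock;

    // Capture the submission time and hand the task its own queue time
    // (in microseconds) when it finally starts executing.
    void submit(std::function<void(int64_t)> task) {
        assert(task && "task must be callable");
        const Clock::time_point submitted = Clock::now();
        _tasks.push([task = std::move(task), submitted]() {
            const auto waited = std::chrono::duration_cast<std::chrono::microseconds>(
                    Clock::now() - submitted);
            task(waited.count());
        });
    }

    // Run every queued task in FIFO order on the calling thread.
    void run_all() {
        while (!_tasks.empty()) {
            std::function<void()> next = std::move(_tasks.front());
            _tasks.pop();
            next();
        }
    }

    std::size_t pending() const { return _tasks.size(); }

private:
    std::queue<std::function<void()>> _tasks;
};
```

A monotonic clock is used here so the measured wait can never be negative; whether the clock behind `GetCurrentTimeMicros()` has the same property is not stated in this commit.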

[Solution]
We could enlarge the worker thread pool to avoid queuing congestion, but this is
not the best solution.
Segcompaction should be best-effort work and should not use too much CPU or
memory. So we adopted the strategy of unbinding build from segcompaction,
specifically:

1. For a segcompaction task that is already performing compaction operations, we
should not interrupt it; otherwise it may cause file corruption.

2. For tasks still queued, we no longer care about their results (because
these tasks will see that they are cancelled and will not perform any actual operations),
so we just ignore them and continue with the subsequent rowset build process.
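
The two cases above come down to a single atomic flag that both sides try to claim: whichever side flips it first decides the outcome. The following is a minimal sketch, assuming a hypothetical `Worker` class that models only the flag (the names `Worker`, `run_task`, and `compactions_done` are illustrative; only the `exchange(false)` pattern mirrors the diff):

```cpp
#include <atomic>
#include <cassert>

// Hypothetical model of the cancellation handshake. It only tracks the flag;
// the real worker also owns file writers and reports status to the rowset writer.
class Worker {
public:
    // Invoked by the thread pool when a queued task reaches the front.
    // Returns true if the (placeholder) compaction actually ran.
    bool run_task() {
        // Try to claim the flag. If cancel() claimed it first, do nothing.
        if (!_state_mutable.exchange(false)) {
            return false;
        }
        ++_compactions_done;   // placeholder for the real compaction work
        _state_mutable = true; // release the flag for the next task or cancel()
        return true;
    }

    // Invoked by the build path. Returns true when no compaction is running,
    // so the caller can proceed without waiting. Returns false when a
    // compaction is in progress (or the worker was already cancelled).
    bool cancel() { return _state_mutable.exchange(false); }

    int compactions_done() const { return _compactions_done; }

private:
    std::atomic<bool> _state_mutable {true};
    int _compactions_done = 0;
};

// Usage: a task cancelled while still queued becomes a no-op.
inline void demo_cancel_while_queued() {
    Worker w;
    const bool cancelled = w.cancel(); // nothing running, build may proceed
    const bool ran = w.run_task();     // queued task sees the cancellation
    assert(cancelled && !ran);
}
```

Because `exchange` reads and writes the flag in one atomic step, the builder and the worker thread cannot both believe they own the state, which is what lets the build path skip waiting only when it is safe to do so.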

Signed-off-by: freemandealer <[email protected]>
freemandealer authored and HappenLee committed Jan 12, 2024
1 parent f184e78 commit 13a69a2
Showing 6 changed files with 44 additions and 27 deletions.
15 changes: 10 additions & 5 deletions be/src/olap/olap_server.cpp
@@ -996,17 +996,22 @@ Status StorageEngine::submit_compaction_task(TabletSharedPtr tablet, CompactionT
     return _submit_compaction_task(tablet, compaction_type, force);
 }

-Status StorageEngine::_handle_seg_compaction(SegcompactionWorker* worker,
-                                             SegCompactionCandidatesSharedPtr segments) {
+Status StorageEngine::_handle_seg_compaction(std::shared_ptr<SegcompactionWorker> worker,
+                                             SegCompactionCandidatesSharedPtr segments,
+                                             uint64_t submission_time) {
+    // note: be aware that worker->_writer maybe released when the task is cancelled
+    uint64_t exec_queue_time = GetCurrentTimeMicros() - submission_time;
+    LOG(INFO) << "segcompaction thread pool queue time(ms): " << exec_queue_time / 1000;
     worker->compact_segments(segments);
     // return OK here. error will be reported via BetaRowsetWriter::_segcompaction_status
     return Status::OK();
 }

-Status StorageEngine::submit_seg_compaction_task(SegcompactionWorker* worker,
+Status StorageEngine::submit_seg_compaction_task(std::shared_ptr<SegcompactionWorker> worker,
                                                  SegCompactionCandidatesSharedPtr segments) {
-    return _seg_compaction_thread_pool->submit_func(
-            std::bind<void>(&StorageEngine::_handle_seg_compaction, this, worker, segments));
+    uint64_t submission_time = GetCurrentTimeMicros();
+    return _seg_compaction_thread_pool->submit_func(std::bind<void>(
+            &StorageEngine::_handle_seg_compaction, this, worker, segments, submission_time));
 }

 Status StorageEngine::process_index_change_task(const TAlterInvertedIndexReq& request) {
24 changes: 12 additions & 12 deletions be/src/olap/rowset/beta_rowset_writer.cpp
@@ -105,7 +105,7 @@ BaseBetaRowsetWriter::BaseBetaRowsetWriter()
           _total_index_size(0) {}

 BetaRowsetWriter::BetaRowsetWriter(StorageEngine& engine)
-        : _engine(engine), _segcompaction_worker(std::make_unique<SegcompactionWorker>(this)) {}
+        : _engine(engine), _segcompaction_worker(std::make_shared<SegcompactionWorker>(this)) {}

 BaseBetaRowsetWriter::~BaseBetaRowsetWriter() {
     // TODO(lingbin): Should wrapper exception logic, no need to know file ops directly.
@@ -381,14 +381,9 @@ Status BetaRowsetWriter::_rename_compacted_indices(int64_t begin, int64_t end, u
     return Status::OK();
 }

+// return true if there isn't any flying segcompaction, otherwise return false
 bool BetaRowsetWriter::_check_and_set_is_doing_segcompaction() {
-    std::lock_guard<std::mutex> l(_is_doing_segcompaction_lock);
-    if (!_is_doing_segcompaction) {
-        _is_doing_segcompaction = true;
-        return true;
-    } else {
-        return false;
-    }
+    return !_is_doing_segcompaction.exchange(true);
 }

 Status BetaRowsetWriter::_segcompaction_if_necessary() {
@@ -410,7 +405,7 @@ Status BetaRowsetWriter::_segcompaction_if_necessary() {
             LOG(INFO) << "submit segcompaction task, tablet_id:" << _context.tablet_id
                       << " rowset_id:" << _context.rowset_id << " segment num:" << _num_segment
                       << ", segcompacted_point:" << _segcompacted_point;
-            status = _engine.submit_seg_compaction_task(_segcompaction_worker.get(), segments);
+            status = _engine.submit_seg_compaction_task(_segcompaction_worker, segments);
             if (status.ok()) {
                 return status;
             }
@@ -546,9 +541,14 @@ Status BetaRowsetWriter::_close_file_writers() {
     // if _segment_start_id is not zero, that means it's a transient rowset writer for
     // MoW partial update, don't need to do segment compaction.
     if (_segment_start_id == 0) {
-        _segcompaction_worker->cancel();
-        RETURN_NOT_OK_STATUS_WITH_WARN(_wait_flying_segcompaction(),
-                                       "segcompaction failed when build new rowset");
+        if (_segcompaction_worker->cancel()) {
+            std::lock_guard lk(_is_doing_segcompaction_lock);
+            _is_doing_segcompaction = false;
+            _segcompacting_cond.notify_all();
+        } else {
+            RETURN_NOT_OK_STATUS_WITH_WARN(_wait_flying_segcompaction(),
+                                           "segcompaction failed when build new rowset");
+        }
         RETURN_NOT_OK_STATUS_WITH_WARN(_segcompaction_rename_last_segments(),
                                        "rename last segments failed when build new rowset");
         if (_segcompaction_worker->get_file_writer()) {
4 changes: 2 additions & 2 deletions be/src/olap/rowset/beta_rowset_writer.h
@@ -225,11 +225,11 @@ class BetaRowsetWriter : public BaseBetaRowsetWriter {
     // already been segment compacted
     std::atomic<int32_t> _num_segcompacted {0}; // index for segment compaction

-    std::unique_ptr<SegcompactionWorker> _segcompaction_worker;
+    std::shared_ptr<SegcompactionWorker> _segcompaction_worker;

     // ensure only one inflight segcompaction task for each rowset
     std::atomic<bool> _is_doing_segcompaction {false};
-    // enforce compare-and-swap on _is_doing_segcompaction
+    // enforce condition variable on _is_doing_segcompaction
     std::mutex _is_doing_segcompaction_lock;
     std::condition_variable _segcompacting_cond;

15 changes: 12 additions & 3 deletions be/src/olap/rowset/segcompaction.cpp
@@ -302,10 +302,12 @@ Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt

 void SegcompactionWorker::compact_segments(SegCompactionCandidatesSharedPtr segments) {
     Status status = Status::OK();
-    if (_cancelled) {
-        LOG(INFO) << "segcompaction worker is cancelled, skipping segcompaction task";
-    } else {
+    if (_is_compacting_state_mutable.exchange(false)) {
         status = _do_compact_segments(segments);
+    } else {
+        // note: be aware that _writer maybe released when the task is cancelled
+        LOG(INFO) << "segcompaction worker is cancelled, skipping segcompaction task";
+        return;
     }
     if (!status.ok()) {
         int16_t errcode = status.code();
@@ -330,6 +332,13 @@ void SegcompactionWorker::compact_segments(SegCompactionCandidatesSharedPtr segm
         _writer->_is_doing_segcompaction = false;
         _writer->_segcompacting_cond.notify_all();
     }
+    _is_compacting_state_mutable = true;
 }

+bool SegcompactionWorker::cancel() {
+    // return true if the task is canncellable (actual compaction is not started)
+    // return false when the task is not cancellable (it is in the middle of segcompaction)
+    return _is_compacting_state_mutable.exchange(false);
+}
+
 } // namespace doris
6 changes: 4 additions & 2 deletions be/src/olap/rowset/segcompaction.h
@@ -54,7 +54,7 @@ class SegcompactionWorker {
     io::FileWriterPtr& get_file_writer() { return _file_writer; }

     // set the cancel flag, tasks already started will not be cancelled.
-    void cancel() { _cancelled = true; }
+    bool cancel();

 private:
     Status _create_segment_writer_for_segcompaction(
@@ -77,6 +77,8 @@ class SegcompactionWorker {
     // Currently cloud storage engine doesn't need segcompaction
     BetaRowsetWriter* _writer = nullptr;
     io::FileWriterPtr _file_writer;
-    std::atomic<bool> _cancelled = false;
+
+    // the state is not mutable when 1)actual compaction operation started or 2) cancelled
+    std::atomic<bool> _is_compacting_state_mutable = true;
 };
 } // namespace doris
7 changes: 4 additions & 3 deletions be/src/olap/storage_engine.h
@@ -196,7 +196,7 @@ class StorageEngine {

     Status submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type,
                                   bool force);
-    Status submit_seg_compaction_task(SegcompactionWorker* worker,
+    Status submit_seg_compaction_task(std::shared_ptr<SegcompactionWorker> worker,
                                       SegCompactionCandidatesSharedPtr segments);

     std::unique_ptr<ThreadPool>& tablet_publish_txn_thread_pool() {
@@ -313,8 +313,9 @@ class StorageEngine {
     void _remove_unused_remote_files_callback();
     void _cold_data_compaction_producer_callback();

-    Status _handle_seg_compaction(SegcompactionWorker* worker,
-                                  SegCompactionCandidatesSharedPtr segments);
+    Status _handle_seg_compaction(std::shared_ptr<SegcompactionWorker> worker,
+                                  SegCompactionCandidatesSharedPtr segments,
+                                  uint64_t submission_time);

     Status _handle_index_change(IndexBuilderSharedPtr index_builder);
