Split TabletStream::close() into close_async() + close_wait() so that
IndexStream::close() can close all tablet streams in parallel.

Review notes on this revision of the patch:
  * close_async(): when the heavy work pool rejects the task, _close() never
    runs. The failure path now records the error, sets _closed and notifies
    _close_cv under _close_lock, so close_wait() cannot block forever.
  * close_wait(): waits with a predicate on _closed, so a spurious wakeup
    cannot return _status before the close has finished.
  * NOTE(review): this copy of the patch looks like it lost `<...>` template
    arguments in extraction (e.g. `std::vector> _flush_tokens`,
    `Status::Error(`, `std::vector& tablets_to_commit`). Only
    `std::atomic<bool>` could be restored from what is visible here; confirm
    the remaining lines against the upstream files before applying.
  * Hunk line counts / new-side offsets were recomputed for the added lines;
    the `index` blob ids are kept from the original patch.

diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 80cd167260c04d..203b9a4b3107df 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -66,6 +66,7 @@ TabletStream::TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id,
     _profile = profile->create_child(fmt::format("TabletStream {}", id), true, true);
     _append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
     _add_segment_timer = ADD_TIMER(_profile, "AddSegmentTime");
+    _close_timer = ADD_TIMER(_profile, "CloseTime");
     _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
 }
 
@@ -266,30 +267,25 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data
     return _status;
 }
 
-Status TabletStream::close() {
+// Runs on the heavy work pool: waits for all flush tokens, checks the segment count and
+// closes the writer. The result is left in _status for close_wait() to return.
+void TabletStream::_close() {
+    std::lock_guard lock(_close_lock);
+    if (_closed == true) {
+        return;
+    }
+    Defer notify {[&]() {
+        _closed = true;
+        _close_cv.notify_all();
+    }};
     if (!_status.ok()) {
-        return _status;
+        return;
     }
 
-    SCOPED_TIMER(_close_wait_timer);
-    bthread::Mutex mu;
-    std::unique_lock lock(mu);
-    bthread::ConditionVariable cv;
-    auto wait_func = [this, &mu, &cv] {
-        signal::set_signal_task_id(_load_id);
-        for (auto& token : _flush_tokens) {
-            token->wait();
-        }
-        std::lock_guard lock(mu);
-        cv.notify_one();
-    };
-    bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(wait_func);
-    if (ret) {
-        cv.wait(lock);
-    } else {
-        _status = Status::Error(
-                "there is not enough thread resource for close load");
-        return _status;
+    SCOPED_TIMER(_close_timer);
+    signal::set_signal_task_id(_load_id);
+    for (auto& token : _flush_tokens) {
+        token->wait();
     }
 
     DBUG_EXECUTE_IF("TabletStream.close.segment_num_mismatch", { _num_segments++; });
@@ -297,32 +293,41 @@ Status TabletStream::close() {
         _status = Status::Corruption(
                 "segment num mismatch in tablet {}, expected: {}, actual: {}, load_id: {}", _id,
                 _num_segments, _next_segid.load(), print_id(_load_id));
-        return _status;
+        return;
     }
 
     // it is necessary to check status after wait_func,
     // for create_rowset could fail during add_segment when loading to MOW table,
     // in this case, should skip close to avoid submit_calc_delete_bitmap_task which could cause coredump.
     if (!_status.ok()) {
-        return _status;
+        return;
     }
+    _status = _load_stream_writer->close();
+}
 
-    auto close_func = [this, &mu, &cv]() {
-        signal::set_signal_task_id(_load_id);
-        auto st = _load_stream_writer->close();
-        if (!st.ok() && _status.ok()) {
-            _status = st;
-        }
-        std::lock_guard lock(mu);
-        cv.notify_one();
-    };
-    ret = _load_stream_mgr->heavy_work_pool()->try_offer(close_func);
-    if (ret) {
-        cv.wait(lock);
-    } else {
+void TabletStream::close_async() {
+    if (_closing.exchange(true)) {
+        LOG(WARNING) << "skip double closing " << *this;
+        return;
+    }
+    bool ret = _load_stream_mgr->heavy_work_pool()->try_offer([this]() { _close(); });
+    if (!ret) {
+        // _close() will never run, so nobody else would set _closed or notify _close_cv.
+        // Publish the failure here, otherwise close_wait() would block forever.
+        std::lock_guard lock(_close_lock);
         _status = Status::Error(
-                "there is not enough thread resource for close load");
+                "there is not enough thread resource for close_async");
+        _closed = true;
+        _close_cv.notify_all();
     }
+}
+
+Status TabletStream::close_wait() {
+    SCOPED_TIMER(_close_wait_timer);
+    std::unique_lock lock(_close_lock);
+    // Wait on the _closed flag rather than on a bare notification: returns at once if the
+    // close already finished and ignores spurious wakeups.
+    _close_cv.wait(lock, [this] { return _closed; });
     return _status;
 }
 
@@ -392,7 +397,11 @@ void IndexStream::close(const std::vector& tablets_to_commit,
     }
 
     for (auto& [_, tablet_stream] : _tablet_streams_map) {
-        auto st = tablet_stream->close();
+        tablet_stream->close_async();
+    }
+
+    for (auto& [_, tablet_stream] : _tablet_streams_map) {
+        auto st = tablet_stream->close_wait();
         if (st.ok()) {
             success_tablet_ids->push_back(tablet_stream->id());
         } else {
diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index 3b649c688355fe..36cbc56982d234 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -54,12 +54,16 @@ class TabletStream {
     Status add_segment(const PStreamHeader& header, butil::IOBuf* data);
     void add_num_segments(int64_t num_segments) { _num_segments += num_segments; }
     void disable_num_segments_check() { _check_num_segments = false; }
-    Status close();
+    // Schedules _close() on the heavy work pool and returns without waiting for it.
+    void close_async();
+    // Blocks until the close started by close_async() is done, then returns _status.
+    Status close_wait();
     int64_t id() const { return _id; }
 
     friend std::ostream& operator<<(std::ostream& ostr, const TabletStream& tablet_stream);
 
 private:
+    void _close();
     int64_t _id;
     LoadStreamWriterSharedPtr _load_stream_writer;
     std::vector> _flush_tokens;
@@ -71,9 +75,15 @@ class TabletStream {
     Status _status;
     PUniqueId _load_id;
     int64_t _txn_id;
+    std::atomic<bool> _closing = false;
+    // Guarded by _close_lock; true once closing has finished or could not be scheduled.
+    bool _closed = false;
+    std::mutex _close_lock;
+    std::condition_variable _close_cv;
     RuntimeProfile* _profile = nullptr;
     RuntimeProfile::Counter* _append_data_timer = nullptr;
     RuntimeProfile::Counter* _add_segment_timer = nullptr;
+    RuntimeProfile::Counter* _close_timer = nullptr;
     RuntimeProfile::Counter* _close_wait_timer = nullptr;
     LoadStreamMgr* _load_stream_mgr = nullptr;
 };