From abea6021a783a69de1bc69ca125aa8eff596c260 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Tue, 21 May 2024 17:46:16 +0800 Subject: [PATCH] [refactor](move-memtable) remove IndexStream in LoadStream --- be/src/runtime/load_stream.cpp | 145 ++++++++++++--------------- be/src/runtime/load_stream.h | 44 ++------ be/test/runtime/load_stream_test.cpp | 2 - 3 files changed, 72 insertions(+), 119 deletions(-) diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index ccd49ec0031141..c74b24d2703d3d 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -172,6 +172,9 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data int64_t time_ms = timer.elapsed_time() / 1000 / 1000; g_load_stream_flush_wait_ms << time_ms; g_load_stream_flush_running_threads << 1; + if (time_ms >= 100) { + LOG(INFO) << "waited " << time_ms << "ms for flush token task limit" << *this; + } return flush_token->submit_func(flush_func); } @@ -260,71 +263,6 @@ Status TabletStream::close() { return st; } -IndexStream::IndexStream(PUniqueId load_id, int64_t id, int64_t txn_id, - std::shared_ptr schema, - LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile) - : _id(id), - _load_id(load_id), - _txn_id(txn_id), - _schema(schema), - _load_stream_mgr(load_stream_mgr) { - _profile = profile->create_child(fmt::format("IndexStream {}", id), true, true); - _append_data_timer = ADD_TIMER(_profile, "AppendDataTime"); - _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime"); -} - -Status IndexStream::append_data(const PStreamHeader& header, butil::IOBuf* data) { - SCOPED_TIMER(_append_data_timer); - int64_t tablet_id = header.tablet_id(); - TabletStreamSharedPtr tablet_stream; - { - std::lock_guard lock_guard(_lock); - auto it = _tablet_streams_map.find(tablet_id); - if (it == _tablet_streams_map.end()) { - RETURN_IF_ERROR(_init_tablet_stream(tablet_stream, tablet_id, header.partition_id())); - } else { - tablet_stream = it->second; - } - } - - return tablet_stream->append_data(header, data); -} - -Status IndexStream::_init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id, - int64_t partition_id) { - tablet_stream = std::make_shared(_load_id, tablet_id, _txn_id, _load_stream_mgr, - _profile); - _tablet_streams_map[tablet_id] = tablet_stream; - RETURN_IF_ERROR(tablet_stream->init(_schema, _id, partition_id)); - return Status::OK(); -} - -Status IndexStream::close(const std::vector& tablets_to_commit, - std::vector* success_tablet_ids, FailedTablets* failed_tablets) { - std::lock_guard lock_guard(_lock); - SCOPED_TIMER(_close_wait_timer); - // open all need commit tablets - for (const auto& tablet : tablets_to_commit) { - TabletStreamSharedPtr tablet_stream; - auto it = _tablet_streams_map.find(tablet.tablet_id()); - if (it == _tablet_streams_map.end() && _id == tablet.index_id()) { - RETURN_IF_ERROR( - _init_tablet_stream(tablet_stream, tablet.tablet_id(), tablet.partition_id())); - } - } - - for (auto& [_, tablet_stream] : _tablet_streams_map) { - auto st = tablet_stream->close(); - if (st.ok()) { - success_tablet_ids->push_back(tablet_stream->id()); - } else { - LOG(INFO) << "close tablet stream " << *tablet_stream << ", status=" << st; - failed_tablets->emplace_back(tablet_stream->id(), st); - } - } - return Status::OK(); -} - // TODO: Profile is temporary disabled, because: // 1. It's not being processed by the upstream for now // 2. There are some problems in _profile->to_thrift() @@ -365,15 +303,13 @@ Status LoadStream::init(const POpenLoadStreamRequest* request) { _schema = std::make_shared(); RETURN_IF_ERROR(_schema->init(request->schema())); for (auto& index : request->schema().indexes()) { - _index_streams_map[index.id()] = std::make_shared( - _load_id, index.id(), _txn_id, _schema, _load_stream_mgr, _profile.get()); + _indexes.insert(index.id()); } LOG(INFO) << "succeed to init load stream " << *this; return Status::OK(); } -Status LoadStream::close(int64_t src_id, const std::vector& tablets_to_commit, - std::vector* success_tablet_ids, FailedTablets* failed_tablets) { +bool LoadStream::_mark_close(int64_t src_id, const std::vector& tablets_to_commit) { std::lock_guard lock_guard(_lock); SCOPED_TIMER(_close_wait_timer); @@ -389,17 +325,38 @@ Status LoadStream::close(int64_t src_id, const std::vector& tablets_t _tablets_to_commit.insert(_tablets_to_commit.end(), tablets_to_commit.begin(), tablets_to_commit.end()); - if (_close_load_cnt < _total_streams) { - // do not return commit info if there is remaining streams. - return Status::OK(); + // return is_last + return _close_load_cnt == _total_streams; +} + +Status LoadStream::_do_close(std::vector* success_tablet_ids, + FailedTablets* failed_tablets) { + MonotonicStopWatch timer; + timer.start(); + // open all tablets need to commit + for (const auto& tablet : _tablets_to_commit) { + TabletStreamSharedPtr tablet_stream; + auto it = _tablet_streams_map.find(tablet.tablet_id()); + if (it == _tablet_streams_map.end()) { + RETURN_IF_ERROR(_init_tablet_stream(tablet_stream, tablet.tablet_id(), + tablet.index_id(), tablet.partition_id())); + } } - for (auto& [_, index_stream] : _index_streams_map) { - RETURN_IF_ERROR( - index_stream->close(_tablets_to_commit, success_tablet_ids, failed_tablets)); + for (auto& [_, tablet_stream] : _tablet_streams_map) { + auto st = tablet_stream->close(); + if (st.ok()) { + success_tablet_ids->push_back(tablet_stream->id()); + } else { + LOG(INFO) << "close tablet stream " << *tablet_stream << ", status=" << st; + failed_tablets->emplace_back(tablet_stream->id(), st); + } } + timer.stop(); + int64_t time_ms = timer.elapsed_time() / 1000 / 1000; LOG(INFO) << "close load " << *this << ", success_tablet_num=" << success_tablet_ids->size() - << ", failed_tablet_num=" << failed_tablets->size(); + << ", failed_tablet_num=" << failed_tablets->size() << ", close_time=" << time_ms + << "ms"; return Status::OK(); } @@ -497,21 +454,39 @@ void LoadStream::_parse_header(butil::IOBuf* const message, PStreamHeader& hdr) VLOG_DEBUG << "header parse result: " << hdr.DebugString(); } +Status LoadStream::_init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id, + int64_t index_id, int64_t partition_id) { + tablet_stream = std::make_shared(_load_id, tablet_id, _txn_id, _load_stream_mgr, + _profile.get()); + _tablet_streams_map[tablet_id] = tablet_stream; + RETURN_IF_ERROR(tablet_stream->init(_schema, index_id, partition_id)); + return Status::OK(); +} + Status LoadStream::_append_data(const PStreamHeader& header, butil::IOBuf* data) { SCOPED_TIMER(_append_data_timer); - IndexStreamSharedPtr index_stream; int64_t index_id = header.index_id(); DBUG_EXECUTE_IF("TabletStream._append_data.unknown_indexid", { index_id = UNKNOWN_ID_FOR_TEST; }); - auto it = _index_streams_map.find(index_id); - if (it == _index_streams_map.end()) { + if (!_indexes.contains(index_id)) { return Status::Error("unknown index_id {}", index_id); - } else { - index_stream = it->second; } - return index_stream->append_data(header, data); + int64_t tablet_id = header.tablet_id(); + TabletStreamSharedPtr tablet_stream; + { + std::lock_guard lock_guard(_lock); + auto it = _tablet_streams_map.find(tablet_id); + if (it == _tablet_streams_map.end()) { + RETURN_IF_ERROR( + _init_tablet_stream(tablet_stream, tablet_id, index_id, header.partition_id())); + } else { + tablet_stream = it->second; + } + } + + return tablet_stream->append_data(header, data); } int LoadStream::on_received_messages(StreamId id, butil::IOBuf* const messages[], size_t size) { @@ -586,7 +561,11 @@ void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf* std::vector success_tablet_ids; FailedTablets failed_tablets; std::vector tablets_to_commit(hdr.tablets().begin(), hdr.tablets().end()); - auto st = close(hdr.src_id(), tablets_to_commit, &success_tablet_ids, &failed_tablets); + auto st = Status::OK(); + bool is_last = _mark_close(hdr.src_id(), tablets_to_commit); + if (is_last) { + st = _do_close(&success_tablet_ids, &failed_tablets); + } _report_result(id, st, success_tablet_ids, failed_tablets, true); brpc::StreamClose(id); } break; diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h index be1cb7756a115d..731d71820d0834 100644 --- a/be/src/runtime/load_stream.h +++ b/be/src/runtime/load_stream.h @@ -76,36 +76,6 @@ class TabletStream { using TabletStreamSharedPtr = std::shared_ptr; -class IndexStream { -public: - IndexStream(PUniqueId load_id, int64_t id, int64_t txn_id, - std::shared_ptr schema, LoadStreamMgr* load_stream_mgr, - RuntimeProfile* profile); - - Status append_data(const PStreamHeader& header, butil::IOBuf* data); - - Status close(const std::vector& tablets_to_commit, - std::vector* success_tablet_ids, FailedTablets* failed_tablet_ids); - -private: - Status _init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id, - int64_t partition_id); - -private: - int64_t _id; - std::unordered_map _tablet_streams_map; - bthread::Mutex _lock; - PUniqueId _load_id; - int64_t _txn_id; - std::shared_ptr _schema; - std::unordered_map _tablet_partitions; - RuntimeProfile* _profile = nullptr; - RuntimeProfile::Counter* _append_data_timer = nullptr; - RuntimeProfile::Counter* _close_wait_timer = nullptr; - LoadStreamMgr* _load_stream_mgr = nullptr; -}; -using IndexStreamSharedPtr = std::shared_ptr; - using StreamId = brpc::StreamId; class LoadStream : public brpc::StreamInputHandler { public: @@ -119,9 +89,6 @@ class LoadStream : public brpc::StreamInputHandler { _open_streams[src_id]++; } - Status close(int64_t src_id, const std::vector& tablets_to_commit, - std::vector* success_tablet_ids, FailedTablets* failed_tablet_ids); - // callbacks called by brpc int on_received_messages(StreamId id, butil::IOBuf* const messages[], size_t size) override; void on_idle_timeout(StreamId id) override; @@ -150,9 +117,18 @@ class LoadStream : public brpc::StreamInputHandler { Status _write_stream(StreamId stream, butil::IOBuf& buf); + Status _init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id, + int64_t index_id, int64_t partition_id); + + // return is_last + bool _mark_close(int64_t src_id, const std::vector& tablets_to_commit); + + Status _do_close(std::vector* success_tablet_ids, FailedTablets* failed_tablet_ids); + private: PUniqueId _load_id; - std::unordered_map _index_streams_map; + std::unordered_set _indexes; + std::unordered_map _tablet_streams_map; int32_t _total_streams = 0; int32_t _close_load_cnt = 0; std::atomic _close_rpc_cnt = 0; diff --git a/be/test/runtime/load_stream_test.cpp b/be/test/runtime/load_stream_test.cpp index 1d9ff6b347cfcc..d601928e72bbd4 100644 --- a/be/test/runtime/load_stream_test.cpp +++ b/be/test/runtime/load_stream_test.cpp @@ -482,8 +482,6 @@ class LoadStreamMgrTest : public testing::Test { return Status::OK(); } - Status close() { return Status::OK(); } - private: brpc::StreamId _stream; brpc::Controller _cntl;