Skip to content

Commit

Permalink
[refactor](move-memtable) remove IndexStream in LoadStream
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijchen committed May 21, 2024
1 parent ff6492f commit abea602
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 119 deletions.
145 changes: 62 additions & 83 deletions be/src/runtime/load_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
int64_t time_ms = timer.elapsed_time() / 1000 / 1000;
g_load_stream_flush_wait_ms << time_ms;
g_load_stream_flush_running_threads << 1;
if (time_ms >= 100) {
LOG(INFO) << "waited " << time_ms << "ms for flush token task limit" << *this;
}
return flush_token->submit_func(flush_func);
}

Expand Down Expand Up @@ -260,71 +263,6 @@ Status TabletStream::close() {
return st;
}

// Stores the identifiers, schema and manager handed in by the caller, then
// creates a child profile named "IndexStream <id>" under `profile` and
// registers the two timers used by append_data() and close().
IndexStream::IndexStream(PUniqueId load_id, int64_t id, int64_t txn_id,
                         std::shared_ptr<OlapTableSchemaParam> schema,
                         LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile)
        : _id(id), _load_id(load_id), _txn_id(txn_id), _schema(schema),
          _load_stream_mgr(load_stream_mgr) {
    const auto child_profile_name = fmt::format("IndexStream {}", id);
    _profile = profile->create_child(child_profile_name, true, true);

    // Timers are registered on the child profile created just above.
    _append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
    _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
}

// Forwards `data` to the TabletStream for header.tablet_id().
//
// The stream is looked up in _tablet_streams_map under _lock; when no entry
// exists, a new one is created through _init_tablet_stream() using the
// partition id from the header. An initialization error is returned directly.
// The actual append happens after the lock has been released.
Status IndexStream::append_data(const PStreamHeader& header, butil::IOBuf* data) {
    SCOPED_TIMER(_append_data_timer);
    const int64_t target_tablet_id = header.tablet_id();
    TabletStreamSharedPtr target;
    {
        std::lock_guard guard(_lock);
        if (auto found = _tablet_streams_map.find(target_tablet_id);
            found != _tablet_streams_map.end()) {
            target = found->second;
        } else {
            RETURN_IF_ERROR(_init_tablet_stream(target, target_tablet_id, header.partition_id()));
        }
    }

    return target->append_data(header, data);
}

// Creates a TabletStream for `tablet_id`, hands it back through
// `tablet_stream`, and records it in _tablet_streams_map.
//
// The stream is registered in the map before init() runs, so the entry stays
// in the map even when init() fails and its error is returned.
Status IndexStream::_init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id,
                                        int64_t partition_id) {
    tablet_stream = std::make_shared<TabletStream>(_load_id, tablet_id, _txn_id,
                                                   _load_stream_mgr, _profile);
    _tablet_streams_map.insert_or_assign(tablet_id, tablet_stream);

    RETURN_IF_ERROR(tablet_stream->init(_schema, _id, partition_id));
    return Status::OK();
}

// Closes every tablet stream owned by this index and reports the outcome.
//
// Under _lock, a stream is first created for each entry of `tablets_to_commit`
// that belongs to this index (index_id == _id) and has no stream yet; a
// failure to create one aborts the call with that error. Afterwards each
// stream in _tablet_streams_map is closed: ids of streams that closed
// successfully are appended to `success_tablet_ids`, the others are appended
// to `failed_tablets` together with their status. Per-stream close failures do
// not change the returned status.
Status IndexStream::close(const std::vector<PTabletID>& tablets_to_commit,
                          std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablets) {
    std::lock_guard guard(_lock);
    SCOPED_TIMER(_close_wait_timer);

    // Make sure every tablet of this index that must be committed has a stream.
    for (const auto& to_commit : tablets_to_commit) {
        if (to_commit.index_id() != _id) {
            continue;
        }
        if (_tablet_streams_map.find(to_commit.tablet_id()) != _tablet_streams_map.end()) {
            continue;
        }
        TabletStreamSharedPtr opened;
        RETURN_IF_ERROR(
                _init_tablet_stream(opened, to_commit.tablet_id(), to_commit.partition_id()));
    }

    for (auto& entry : _tablet_streams_map) {
        auto& stream = entry.second;
        auto close_status = stream->close();
        if (!close_status.ok()) {
            LOG(INFO) << "close tablet stream " << *stream << ", status=" << close_status;
            failed_tablets->emplace_back(stream->id(), close_status);
            continue;
        }
        success_tablet_ids->push_back(stream->id());
    }
    return Status::OK();
}

// TODO: Profile is temporarily disabled, because:
// 1. It's not being processed by the upstream for now
// 2. There are some problems in _profile->to_thrift()
Expand Down Expand Up @@ -365,15 +303,13 @@ Status LoadStream::init(const POpenLoadStreamRequest* request) {
_schema = std::make_shared<OlapTableSchemaParam>();
RETURN_IF_ERROR(_schema->init(request->schema()));
for (auto& index : request->schema().indexes()) {
_index_streams_map[index.id()] = std::make_shared<IndexStream>(
_load_id, index.id(), _txn_id, _schema, _load_stream_mgr, _profile.get());
_indexes.insert(index.id());
}
LOG(INFO) << "succeed to init load stream " << *this;
return Status::OK();
}

Status LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablets) {
bool LoadStream::_mark_close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit) {
std::lock_guard<bthread::Mutex> lock_guard(_lock);
SCOPED_TIMER(_close_wait_timer);

Expand All @@ -389,17 +325,38 @@ Status LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_t
_tablets_to_commit.insert(_tablets_to_commit.end(), tablets_to_commit.begin(),
tablets_to_commit.end());

if (_close_load_cnt < _total_streams) {
// do not return commit info if there is remaining streams.
return Status::OK();
// return is_last
return _close_load_cnt == _total_streams;
}

// Closes all tablet streams of this load and reports the per-tablet outcome.
//
// For every entry of _tablets_to_commit that has no stream in
// _tablet_streams_map, a stream is created first via _init_tablet_stream(); an
// error there is returned immediately. Each stream in _tablet_streams_map is
// then closed: ids of streams that closed successfully go to
// `success_tablet_ids`, the others go to `failed_tablets` with their status.
// The elapsed time is measured with a MonotonicStopWatch and logged.
//
// NOTE(review): this text comes from a diff rendering without +/- markers. The
// loop over `_index_streams_map` and the first `<< ", failed_tablet_num="` line
// look like the removed pre-refactor lines shown next to their replacements;
// confirm against the repository before treating this body as compilable.
Status LoadStream::_do_close(std::vector<int64_t>* success_tablet_ids,
FailedTablets* failed_tablets) {
MonotonicStopWatch timer;
timer.start();
// open all tablets that need to be committed
for (const auto& tablet : _tablets_to_commit) {
TabletStreamSharedPtr tablet_stream;
auto it = _tablet_streams_map.find(tablet.tablet_id());
if (it == _tablet_streams_map.end()) {
RETURN_IF_ERROR(_init_tablet_stream(tablet_stream, tablet.tablet_id(),
tablet.index_id(), tablet.partition_id()));
}
}

// NOTE(review): presumably the old per-index close loop (removed side of the diff).
for (auto& [_, index_stream] : _index_streams_map) {
RETURN_IF_ERROR(
index_stream->close(_tablets_to_commit, success_tablet_ids, failed_tablets));
// Close each tablet stream and sort its id into the success or failure list.
for (auto& [_, tablet_stream] : _tablet_streams_map) {
auto st = tablet_stream->close();
if (st.ok()) {
success_tablet_ids->push_back(tablet_stream->id());
} else {
LOG(INFO) << "close tablet stream " << *tablet_stream << ", status=" << st;
failed_tablets->emplace_back(tablet_stream->id(), st);
}
}
timer.stop();
// presumably elapsed_time() is in nanoseconds, hence the two divisions -- TODO confirm
int64_t time_ms = timer.elapsed_time() / 1000 / 1000;
LOG(INFO) << "close load " << *this << ", success_tablet_num=" << success_tablet_ids->size()
<< ", failed_tablet_num=" << failed_tablets->size();
<< ", failed_tablet_num=" << failed_tablets->size() << ", close_time=" << time_ms
<< "ms";
return Status::OK();
}

Expand Down Expand Up @@ -497,21 +454,39 @@ void LoadStream::_parse_header(butil::IOBuf* const message, PStreamHeader& hdr)
VLOG_DEBUG << "header parse result: " << hdr.DebugString();
}

// Creates a TabletStream for `tablet_id`, returns it through `tablet_stream`,
// and stores it in _tablet_streams_map keyed by tablet id.
//
// The stream is placed in the map before init() is called with the load's
// schema, `index_id` and `partition_id`, so the entry stays in the map even
// when init() fails and its error is returned.
Status LoadStream::_init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id,
                                       int64_t index_id, int64_t partition_id) {
    auto* const parent_profile = _profile.get();
    tablet_stream = std::make_shared<TabletStream>(_load_id, tablet_id, _txn_id,
                                                   _load_stream_mgr, parent_profile);
    _tablet_streams_map.insert_or_assign(tablet_id, tablet_stream);

    RETURN_IF_ERROR(tablet_stream->init(_schema, index_id, partition_id));
    return Status::OK();
}

// Routes one data message to the TabletStream named by the header.
//
// The header's index id must be one of the ids recorded in _indexes, otherwise
// an INVALID_ARGUMENT error is returned. The TabletStream for
// header.tablet_id() is looked up in _tablet_streams_map under _lock and is
// created through _init_tablet_stream() when absent; the append itself runs
// after the lock scope has ended.
//
// NOTE(review): this text comes from a diff rendering without +/- markers. The
// lines using `IndexStreamSharedPtr`, `_index_streams_map` and `index_stream`
// look like the removed pre-refactor lines shown next to their replacements;
// confirm against the repository before treating this body as compilable.
Status LoadStream::_append_data(const PStreamHeader& header, butil::IOBuf* data) {
SCOPED_TIMER(_append_data_timer);
IndexStreamSharedPtr index_stream;

int64_t index_id = header.index_id();
// presumably a debug hook that swaps in an unknown index id for tests -- TODO confirm
DBUG_EXECUTE_IF("TabletStream._append_data.unknown_indexid",
{ index_id = UNKNOWN_ID_FOR_TEST; });
auto it = _index_streams_map.find(index_id);
if (it == _index_streams_map.end()) {
// Reject data for an index id that is not in _indexes.
if (!_indexes.contains(index_id)) {
return Status::Error<ErrorCode::INVALID_ARGUMENT>("unknown index_id {}", index_id);
} else {
index_stream = it->second;
}

return index_stream->append_data(header, data);
int64_t tablet_id = header.tablet_id();
TabletStreamSharedPtr tablet_stream;
{
// _lock is held only for the map lookup / creation, not for the append below.
std::lock_guard lock_guard(_lock);
auto it = _tablet_streams_map.find(tablet_id);
if (it == _tablet_streams_map.end()) {
RETURN_IF_ERROR(
_init_tablet_stream(tablet_stream, tablet_id, index_id, header.partition_id()));
} else {
tablet_stream = it->second;
}
}

return tablet_stream->append_data(header, data);
}

int LoadStream::on_received_messages(StreamId id, butil::IOBuf* const messages[], size_t size) {
Expand Down Expand Up @@ -586,7 +561,11 @@ void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf*
std::vector<int64_t> success_tablet_ids;
FailedTablets failed_tablets;
std::vector<PTabletID> tablets_to_commit(hdr.tablets().begin(), hdr.tablets().end());
auto st = close(hdr.src_id(), tablets_to_commit, &success_tablet_ids, &failed_tablets);
auto st = Status::OK();
bool is_last = _mark_close(hdr.src_id(), tablets_to_commit);
if (is_last) {
st = _do_close(&success_tablet_ids, &failed_tablets);
}
_report_result(id, st, success_tablet_ids, failed_tablets, true);
brpc::StreamClose(id);
} break;
Expand Down
44 changes: 10 additions & 34 deletions be/src/runtime/load_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,36 +76,6 @@ class TabletStream {

using TabletStreamSharedPtr = std::shared_ptr<TabletStream>;

// Holds the TabletStreams that belong to one index of a load and dispatches
// incoming data to them by tablet id.
class IndexStream {
public:
    // Stores the given identifiers, schema and manager, and creates a child
    // profile (with append/close timers) under `profile`.
    IndexStream(PUniqueId load_id, int64_t id, int64_t txn_id,
                std::shared_ptr<OlapTableSchemaParam> schema, LoadStreamMgr* load_stream_mgr,
                RuntimeProfile* profile);

    // Appends `data` to the TabletStream for header.tablet_id(), creating the
    // stream on first use. Returns the creation error or the append result.
    Status append_data(const PStreamHeader& header, butil::IOBuf* data);

    // Creates streams for the entries of `tablets_to_commit` that belong to
    // this index and have none yet, then closes every stream. Ids of streams
    // that closed successfully are appended to `success_tablet_ids`; the
    // others are appended to `failed_tablets` with their status.
    Status close(const std::vector<PTabletID>& tablets_to_commit,
                 std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablets);

private:
    // Creates a TabletStream for `tablet_id`, stores it in
    // _tablet_streams_map, and initializes it for `partition_id`.
    Status _init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id,
                               int64_t partition_id);

    // Index id; passed to TabletStream::init() and compared with
    // PTabletID::index_id() in close().
    int64_t _id;
    std::unordered_map<int64_t /*tabletid*/, TabletStreamSharedPtr> _tablet_streams_map;
    // Held while _tablet_streams_map is read or modified in append_data() and close().
    bthread::Mutex _lock;
    PUniqueId _load_id;
    int64_t _txn_id;
    std::shared_ptr<OlapTableSchemaParam> _schema;
    // Child profile created in the constructor; parent of the two timers below.
    RuntimeProfile* _profile = nullptr;
    RuntimeProfile::Counter* _append_data_timer = nullptr;
    RuntimeProfile::Counter* _close_wait_timer = nullptr;
    LoadStreamMgr* _load_stream_mgr = nullptr;
};
using IndexStreamSharedPtr = std::shared_ptr<IndexStream>;

using StreamId = brpc::StreamId;
class LoadStream : public brpc::StreamInputHandler {
public:
Expand All @@ -119,9 +89,6 @@ class LoadStream : public brpc::StreamInputHandler {
_open_streams[src_id]++;
}

Status close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablet_ids);

// callbacks called by brpc
int on_received_messages(StreamId id, butil::IOBuf* const messages[], size_t size) override;
void on_idle_timeout(StreamId id) override;
Expand Down Expand Up @@ -150,9 +117,18 @@ class LoadStream : public brpc::StreamInputHandler {

Status _write_stream(StreamId stream, butil::IOBuf& buf);

Status _init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id,
int64_t index_id, int64_t partition_id);

// return is_last
bool _mark_close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit);

Status _do_close(std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablet_ids);

private:
PUniqueId _load_id;
std::unordered_map<int64_t, IndexStreamSharedPtr> _index_streams_map;
std::unordered_set<int64_t> _indexes;
std::unordered_map<int64_t, TabletStreamSharedPtr> _tablet_streams_map;
int32_t _total_streams = 0;
int32_t _close_load_cnt = 0;
std::atomic<int32_t> _close_rpc_cnt = 0;
Expand Down
2 changes: 0 additions & 2 deletions be/test/runtime/load_stream_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -482,8 +482,6 @@ class LoadStreamMgrTest : public testing::Test {
return Status::OK();
}

Status close() { return Status::OK(); }

private:
brpc::StreamId _stream;
brpc::Controller _cntl;
Expand Down

0 comments on commit abea602

Please sign in to comment.