Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[refactor](move-memtable) remove IndexStream in LoadStream #35143

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 62 additions & 83 deletions be/src/runtime/load_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
int64_t time_ms = timer.elapsed_time() / 1000 / 1000;
g_load_stream_flush_wait_ms << time_ms;
g_load_stream_flush_running_threads << 1;
if (time_ms >= 100) {
LOG(INFO) << "waited " << time_ms << "ms for flush token task limit" << *this;
}
return flush_token->submit_func(flush_func);
}

Expand Down Expand Up @@ -260,71 +263,6 @@ Status TabletStream::close() {
return st;
}

IndexStream::IndexStream(PUniqueId load_id, int64_t id, int64_t txn_id,
std::shared_ptr<OlapTableSchemaParam> schema,
LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile)
: _id(id),
_load_id(load_id),
_txn_id(txn_id),
_schema(schema),
_load_stream_mgr(load_stream_mgr) {
_profile = profile->create_child(fmt::format("IndexStream {}", id), true, true);
_append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
_close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
}

Status IndexStream::append_data(const PStreamHeader& header, butil::IOBuf* data) {
SCOPED_TIMER(_append_data_timer);
int64_t tablet_id = header.tablet_id();
TabletStreamSharedPtr tablet_stream;
{
std::lock_guard lock_guard(_lock);
auto it = _tablet_streams_map.find(tablet_id);
if (it == _tablet_streams_map.end()) {
RETURN_IF_ERROR(_init_tablet_stream(tablet_stream, tablet_id, header.partition_id()));
} else {
tablet_stream = it->second;
}
}

return tablet_stream->append_data(header, data);
}

Status IndexStream::_init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id,
int64_t partition_id) {
tablet_stream = std::make_shared<TabletStream>(_load_id, tablet_id, _txn_id, _load_stream_mgr,
_profile);
_tablet_streams_map[tablet_id] = tablet_stream;
RETURN_IF_ERROR(tablet_stream->init(_schema, _id, partition_id));
return Status::OK();
}

Status IndexStream::close(const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablets) {
std::lock_guard lock_guard(_lock);
SCOPED_TIMER(_close_wait_timer);
// open all need commit tablets
for (const auto& tablet : tablets_to_commit) {
TabletStreamSharedPtr tablet_stream;
auto it = _tablet_streams_map.find(tablet.tablet_id());
if (it == _tablet_streams_map.end() && _id == tablet.index_id()) {
RETURN_IF_ERROR(
_init_tablet_stream(tablet_stream, tablet.tablet_id(), tablet.partition_id()));
}
}

for (auto& [_, tablet_stream] : _tablet_streams_map) {
auto st = tablet_stream->close();
if (st.ok()) {
success_tablet_ids->push_back(tablet_stream->id());
} else {
LOG(INFO) << "close tablet stream " << *tablet_stream << ", status=" << st;
failed_tablets->emplace_back(tablet_stream->id(), st);
}
}
return Status::OK();
}

// TODO: Profile is temporary disabled, because:
// 1. It's not being processed by the upstream for now
// 2. There are some problems in _profile->to_thrift()
Expand Down Expand Up @@ -365,15 +303,13 @@ Status LoadStream::init(const POpenLoadStreamRequest* request) {
_schema = std::make_shared<OlapTableSchemaParam>();
RETURN_IF_ERROR(_schema->init(request->schema()));
for (auto& index : request->schema().indexes()) {
_index_streams_map[index.id()] = std::make_shared<IndexStream>(
_load_id, index.id(), _txn_id, _schema, _load_stream_mgr, _profile.get());
_indexes.insert(index.id());
}
LOG(INFO) << "succeed to init load stream " << *this;
return Status::OK();
}

Status LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablets) {
bool LoadStream::_mark_close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit) {
std::lock_guard<bthread::Mutex> lock_guard(_lock);
SCOPED_TIMER(_close_wait_timer);

Expand All @@ -389,17 +325,38 @@ Status LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_t
_tablets_to_commit.insert(_tablets_to_commit.end(), tablets_to_commit.begin(),
tablets_to_commit.end());

if (_close_load_cnt < _total_streams) {
// do not return commit info if there is remaining streams.
return Status::OK();
// return is_last
return _close_load_cnt == _total_streams;
}

Status LoadStream::_do_close(std::vector<int64_t>* success_tablet_ids,
FailedTablets* failed_tablets) {
MonotonicStopWatch timer;
timer.start();
// open all tablets need to commit
for (const auto& tablet : _tablets_to_commit) {
TabletStreamSharedPtr tablet_stream;
auto it = _tablet_streams_map.find(tablet.tablet_id());
if (it == _tablet_streams_map.end()) {
RETURN_IF_ERROR(_init_tablet_stream(tablet_stream, tablet.tablet_id(),
tablet.index_id(), tablet.partition_id()));
}
}

for (auto& [_, index_stream] : _index_streams_map) {
RETURN_IF_ERROR(
index_stream->close(_tablets_to_commit, success_tablet_ids, failed_tablets));
for (auto& [_, tablet_stream] : _tablet_streams_map) {
auto st = tablet_stream->close();
if (st.ok()) {
success_tablet_ids->push_back(tablet_stream->id());
} else {
LOG(INFO) << "close tablet stream " << *tablet_stream << ", status=" << st;
failed_tablets->emplace_back(tablet_stream->id(), st);
}
}
timer.stop();
int64_t time_ms = timer.elapsed_time() / 1000 / 1000;
LOG(INFO) << "close load " << *this << ", success_tablet_num=" << success_tablet_ids->size()
<< ", failed_tablet_num=" << failed_tablets->size();
<< ", failed_tablet_num=" << failed_tablets->size() << ", close_time=" << time_ms
<< "ms";
return Status::OK();
}

Expand Down Expand Up @@ -497,21 +454,39 @@ void LoadStream::_parse_header(butil::IOBuf* const message, PStreamHeader& hdr)
VLOG_DEBUG << "header parse result: " << hdr.DebugString();
}

Status LoadStream::_init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id,
int64_t index_id, int64_t partition_id) {
tablet_stream = std::make_shared<TabletStream>(_load_id, tablet_id, _txn_id, _load_stream_mgr,
_profile.get());
_tablet_streams_map[tablet_id] = tablet_stream;
RETURN_IF_ERROR(tablet_stream->init(_schema, index_id, partition_id));
return Status::OK();
}

Status LoadStream::_append_data(const PStreamHeader& header, butil::IOBuf* data) {
SCOPED_TIMER(_append_data_timer);
IndexStreamSharedPtr index_stream;

int64_t index_id = header.index_id();
DBUG_EXECUTE_IF("TabletStream._append_data.unknown_indexid",
{ index_id = UNKNOWN_ID_FOR_TEST; });
auto it = _index_streams_map.find(index_id);
if (it == _index_streams_map.end()) {
if (!_indexes.contains(index_id)) {
return Status::Error<ErrorCode::INVALID_ARGUMENT>("unknown index_id {}", index_id);
} else {
index_stream = it->second;
}

return index_stream->append_data(header, data);
int64_t tablet_id = header.tablet_id();
TabletStreamSharedPtr tablet_stream;
{
std::lock_guard lock_guard(_lock);
auto it = _tablet_streams_map.find(tablet_id);
if (it == _tablet_streams_map.end()) {
RETURN_IF_ERROR(
_init_tablet_stream(tablet_stream, tablet_id, index_id, header.partition_id()));
} else {
tablet_stream = it->second;
}
}

return tablet_stream->append_data(header, data);
}

int LoadStream::on_received_messages(StreamId id, butil::IOBuf* const messages[], size_t size) {
Expand Down Expand Up @@ -586,7 +561,11 @@ void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf*
std::vector<int64_t> success_tablet_ids;
FailedTablets failed_tablets;
std::vector<PTabletID> tablets_to_commit(hdr.tablets().begin(), hdr.tablets().end());
auto st = close(hdr.src_id(), tablets_to_commit, &success_tablet_ids, &failed_tablets);
auto st = Status::OK();
bool is_last = _mark_close(hdr.src_id(), tablets_to_commit);
if (is_last) {
st = _do_close(&success_tablet_ids, &failed_tablets);
}
_report_result(id, st, success_tablet_ids, failed_tablets, true);
brpc::StreamClose(id);
} break;
Expand Down
44 changes: 10 additions & 34 deletions be/src/runtime/load_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,36 +76,6 @@ class TabletStream {

using TabletStreamSharedPtr = std::shared_ptr<TabletStream>;

class IndexStream {
public:
IndexStream(PUniqueId load_id, int64_t id, int64_t txn_id,
std::shared_ptr<OlapTableSchemaParam> schema, LoadStreamMgr* load_stream_mgr,
RuntimeProfile* profile);

Status append_data(const PStreamHeader& header, butil::IOBuf* data);

Status close(const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablet_ids);

private:
Status _init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id,
int64_t partition_id);

private:
int64_t _id;
std::unordered_map<int64_t /*tabletid*/, TabletStreamSharedPtr> _tablet_streams_map;
bthread::Mutex _lock;
PUniqueId _load_id;
int64_t _txn_id;
std::shared_ptr<OlapTableSchemaParam> _schema;
std::unordered_map<int64_t, int64_t> _tablet_partitions;
RuntimeProfile* _profile = nullptr;
RuntimeProfile::Counter* _append_data_timer = nullptr;
RuntimeProfile::Counter* _close_wait_timer = nullptr;
LoadStreamMgr* _load_stream_mgr = nullptr;
};
using IndexStreamSharedPtr = std::shared_ptr<IndexStream>;

using StreamId = brpc::StreamId;
class LoadStream : public brpc::StreamInputHandler {
public:
Expand All @@ -119,9 +89,6 @@ class LoadStream : public brpc::StreamInputHandler {
_open_streams[src_id]++;
}

Status close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablet_ids);

// callbacks called by brpc
int on_received_messages(StreamId id, butil::IOBuf* const messages[], size_t size) override;
void on_idle_timeout(StreamId id) override;
Expand Down Expand Up @@ -150,9 +117,18 @@ class LoadStream : public brpc::StreamInputHandler {

Status _write_stream(StreamId stream, butil::IOBuf& buf);

Status _init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id,
int64_t index_id, int64_t partition_id);

// return is_last
bool _mark_close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit);

Status _do_close(std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablet_ids);

private:
PUniqueId _load_id;
std::unordered_map<int64_t, IndexStreamSharedPtr> _index_streams_map;
std::unordered_set<int64_t> _indexes;
std::unordered_map<int64_t, TabletStreamSharedPtr> _tablet_streams_map;
int32_t _total_streams = 0;
int32_t _close_load_cnt = 0;
std::atomic<int32_t> _close_rpc_cnt = 0;
Expand Down
2 changes: 0 additions & 2 deletions be/test/runtime/load_stream_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -482,8 +482,6 @@ class LoadStreamMgrTest : public testing::Test {
return Status::OK();
}

Status close() { return Status::OK(); }

private:
brpc::StreamId _stream;
brpc::Controller _cntl;
Expand Down
Loading