Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[refactor](load) use memtable writer in memtable memory limiter #22780

Merged
merged 2 commits into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 0 additions & 13 deletions be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,11 +223,6 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>
}
return _memtable_writer.write(block, row_idxs, is_append);
}

Status DeltaWriter::flush_memtable_and_wait(bool need_wait) {
return _memtable_writer.flush_memtable_and_wait(need_wait);
}

Status DeltaWriter::wait_flush() {
return _memtable_writer.wait_flush();
}
Expand Down Expand Up @@ -385,14 +380,6 @@ int64_t DeltaWriter::mem_consumption(MemType mem) {
return _memtable_writer.mem_consumption(mem);
}

int64_t DeltaWriter::active_memtable_mem_consumption() {
return _memtable_writer.active_memtable_mem_consumption();
}

int64_t DeltaWriter::partition_id() const {
return _req.partition_id;
}

void DeltaWriter::_build_current_tablet_schema(int64_t index_id,
const OlapTableSchemaParam* table_schema_param,
const TabletSchema& ori_tablet_schema) {
Expand Down
18 changes: 6 additions & 12 deletions be/src/olap/delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,24 +98,16 @@ class DeltaWriter {
Status cancel();
Status cancel_with_status(const Status& st);

// submit current memtable to flush queue, and wait all memtables in flush queue
// to be flushed.
// This is currently for reducing mem consumption of this delta writer.
// If need_wait is true, it will wait for all memtable in flush queue to be flushed.
// Otherwise, it will just put memtables to the flush queue and return.
Status flush_memtable_and_wait(bool need_wait);

int64_t partition_id() const;

int64_t mem_consumption(MemType mem);
int64_t active_memtable_mem_consumption();

// Wait all memtable in flush queue to be flushed
Status wait_flush();

int64_t tablet_id() { return _tablet->tablet_id(); }
int64_t partition_id() const { return _req.partition_id; }

int64_t txn_id() { return _req.txn_id; }
int64_t tablet_id() const { return _req.tablet_id; }

int64_t txn_id() const { return _req.txn_id; }

void finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succeed);

Expand All @@ -126,6 +118,8 @@ class DeltaWriter {
// For UT
DeleteBitmapPtr get_delete_bitmap() { return _delete_bitmap; }

MemTableWriter* memtable_writer() { return &_memtable_writer; }

private:
DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, RuntimeProfile* profile,
const UniqueId& load_id);
Expand Down
16 changes: 8 additions & 8 deletions be/src/olap/memtable_memory_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#include "olap/memtable_memory_limiter.h"

#include "common/config.h"
#include "olap/delta_writer.h"
#include "olap/memtable_writer.h"
#include "util/doris_metrics.h"
#include "util/mem_info.h"
#include "util/metrics.h"
Expand Down Expand Up @@ -61,12 +61,12 @@ Status MemTableMemoryLimiter::init(int64_t process_mem_limit) {
return Status::OK();
}

void MemTableMemoryLimiter::register_writer(DeltaWriter* writer) {
void MemTableMemoryLimiter::register_writer(MemTableWriter* writer) {
std::lock_guard<std::mutex> l(_lock);
_writers.insert(writer);
}

void MemTableMemoryLimiter::deregister_writer(DeltaWriter* writer) {
void MemTableMemoryLimiter::deregister_writer(MemTableWriter* writer) {
std::lock_guard<std::mutex> l(_lock);
_writers.erase(writer);
}
Expand Down Expand Up @@ -130,8 +130,8 @@ void MemTableMemoryLimiter::handle_memtable_flush() {
if (!st.ok()) {
auto err_msg = fmt::format(
"tablet writer failed to reduce mem consumption by flushing memtable, "
"tablet_id={}, txn_id={}, err={}",
writer->tablet_id(), writer->txn_id(), st.to_string());
"tablet_id={}, err={}",
writer->tablet_id(), st.to_string());
LOG(WARNING) << err_msg;
writer->cancel_with_status(st);
}
Expand All @@ -150,7 +150,7 @@ void MemTableMemoryLimiter::handle_memtable_flush() {

std::ostringstream oss;
oss << "reducing memory of " << writers_to_reduce_mem.size()
<< " delta writers (total mem: "
<< " memtable writers (total mem: "
<< PrettyPrinter::print_bytes(mem_consumption_in_picked_writer)
<< ", max mem: " << PrettyPrinter::print_bytes(writers_to_reduce_mem.front().mem_size)
<< ", min mem:" << PrettyPrinter::print_bytes(writers_to_reduce_mem.back().mem_size)
Expand Down Expand Up @@ -188,8 +188,8 @@ void MemTableMemoryLimiter::handle_memtable_flush() {
if (!st.ok()) {
auto err_msg = fmt::format(
"tablet writer failed to reduce mem consumption by flushing memtable, "
"tablet_id={}, txn_id={}, err={}",
item.writer->tablet_id(), item.writer->txn_id(), st.to_string());
"tablet_id={}, err={}",
item.writer->tablet_id(), st.to_string());
LOG(WARNING) << err_msg;
item.writer->cancel_with_status(st);
}
Expand Down
10 changes: 5 additions & 5 deletions be/src/olap/memtable_memory_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
#include "util/countdown_latch.h"

namespace doris {
class DeltaWriter;
class MemTableWriter;
struct WriterMemItem {
DeltaWriter* writer;
MemTableWriter* writer;
int64_t mem_size;
};
class MemTableMemoryLimiter {
Expand All @@ -40,9 +40,9 @@ class MemTableMemoryLimiter {
// If yes, it will flush memtable to try to reduce memory consumption.
void handle_memtable_flush();

void register_writer(DeltaWriter* writer);
void register_writer(MemTableWriter* writer);

void deregister_writer(DeltaWriter* writer);
void deregister_writer(MemTableWriter* writer);

void refresh_mem_tracker() {
std::lock_guard<std::mutex> l(_lock);
Expand All @@ -66,6 +66,6 @@ class MemTableMemoryLimiter {
int64_t _load_soft_mem_limit = -1;
bool _soft_reduce_mem_in_progress = false;

std::unordered_set<DeltaWriter*> _writers;
std::unordered_set<MemTableWriter*> _writers;
};
} // namespace doris
5 changes: 1 addition & 4 deletions be/src/runtime/load_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,9 @@ class LoadChannel {
// close will reset deltawriter memtable and should deregister writer before it.
{
std::lock_guard<SpinLock> l(_tablets_channels_lock);
auto memtable_memory_limiter = ExecEnv::GetInstance()->memtable_memory_limiter();
auto tablet_channel_it = _tablets_channels.find(index_id);
if (tablet_channel_it != _tablets_channels.end()) {
for (auto& writer_it : tablet_channel_it->second->get_tablet_writers()) {
memtable_memory_limiter->deregister_writer(writer_it.second);
}
tablet_channel_it->second->deregister_memtable_memory_limiter();
}
}

Expand Down
3 changes: 1 addition & 2 deletions be/src/runtime/load_channel_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ LoadChannelMgr::~LoadChannelMgr() {
}

Status LoadChannelMgr::init(int64_t process_mem_limit) {
_memtable_memory_limiter = ExecEnv::GetInstance()->memtable_memory_limiter();
_last_success_channel = new_lru_cache("LastestSuccessChannelCache", 1024);
RETURN_IF_ERROR(_start_bg_worker());
return Status::OK();
Expand Down Expand Up @@ -159,7 +158,7 @@ Status LoadChannelMgr::add_batch(const PTabletWriterAddBlockRequest& request,
// If this is a high priority load task, do not handle this.
// because this may block for a while, which may lead to rpc timeout.
SCOPED_TIMER(channel->get_handle_mem_limit_timer());
_memtable_memory_limiter->handle_memtable_flush();
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush();
}

// 3. add batch to load channel
Expand Down
14 changes: 6 additions & 8 deletions be/src/runtime/load_channel_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
#include <stdint.h>

#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <utility>

// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/status.h"
Expand Down Expand Up @@ -70,18 +72,14 @@ class LoadChannelMgr {
Status _start_bg_worker();

void _register_channel_all_writers(std::shared_ptr<doris::LoadChannel> channel) {
for (auto& tablet_channel_it : channel->get_tablets_channels()) {
for (auto& writer_it : tablet_channel_it.second->get_tablet_writers()) {
_memtable_memory_limiter->register_writer(writer_it.second);
}
for (auto& [_, tablet_channel] : channel->get_tablets_channels()) {
tablet_channel->register_memtable_memory_limiter();
}
}

void _deregister_channel_all_writers(std::shared_ptr<doris::LoadChannel> channel) {
for (auto& tablet_channel_it : channel->get_tablets_channels()) {
for (auto& writer_it : tablet_channel_it.second->get_tablet_writers()) {
_memtable_memory_limiter->deregister_writer(writer_it.second);
}
for (auto& [_, tablet_channel] : channel->get_tablets_channels()) {
tablet_channel->deregister_memtable_memory_limiter();
}
}

Expand Down
26 changes: 24 additions & 2 deletions be/src/runtime/tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ TabletsChannel::TabletsChannel(const TabletsChannelKey& key, const UniqueId& loa

TabletsChannel::~TabletsChannel() {
_s_tablet_writer_count -= _tablet_writers.size();
auto memtable_memory_limiter = ExecEnv::GetInstance()->memtable_memory_limiter();
for (auto& it : _tablet_writers) {
auto memtable_memory_limiter = ExecEnv::GetInstance()->memtable_memory_limiter();
memtable_memory_limiter->deregister_writer(it.second);
memtable_memory_limiter->deregister_writer(it.second->memtable_writer());
delete it.second;
}
delete _schema;
Expand Down Expand Up @@ -497,4 +497,26 @@ bool TabletsChannel::_is_broken_tablet(int64_t tablet_id) {
std::shared_lock<std::shared_mutex> rlock(_broken_tablets_lock);
return _broken_tablets.find(tablet_id) != _broken_tablets.end();
}

void TabletsChannel::register_memtable_memory_limiter() {
auto memtable_memory_limiter = ExecEnv::GetInstance()->memtable_memory_limiter();
_memtable_writers_foreach([memtable_memory_limiter](MemTableWriter* writer) {
memtable_memory_limiter->register_writer(writer);
});
}

void TabletsChannel::deregister_memtable_memory_limiter() {
auto memtable_memory_limiter = ExecEnv::GetInstance()->memtable_memory_limiter();
_memtable_writers_foreach([memtable_memory_limiter](MemTableWriter* writer) {
memtable_memory_limiter->deregister_writer(writer);
});
}

void TabletsChannel::_memtable_writers_foreach(std::function<void(MemTableWriter*)> fn) {
std::lock_guard<SpinLock> l(_tablet_writers_lock);
for (auto& [_, delta_writer] : _tablet_writers) {
fn(delta_writer->memtable_writer());
}
}

} // namespace doris
9 changes: 5 additions & 4 deletions be/src/runtime/tablets_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ struct TabletsChannelKey {
std::ostream& operator<<(std::ostream& os, const TabletsChannelKey& key);

class DeltaWriter;
class MemTableWriter;
class OlapTableSchemaParam;
class LoadChannel;

Expand Down Expand Up @@ -112,10 +113,9 @@ class TabletsChannel {

void refresh_profile();

std::unordered_map<int64_t, DeltaWriter*> get_tablet_writers() {
std::lock_guard<SpinLock> l(_tablet_writers_lock);
return _tablet_writers;
}
void register_memtable_memory_limiter();

void deregister_memtable_memory_limiter();

private:
template <typename Request>
Expand All @@ -135,6 +135,7 @@ class TabletsChannel {
int64_t tablet_id, Status error);
bool _is_broken_tablet(int64_t tablet_id);
void _init_profile(RuntimeProfile* profile);
void _memtable_writers_foreach(std::function<void(MemTableWriter*)> fn);

// id of this load channel
TabletsChannelKey _key;
Expand Down
7 changes: 4 additions & 3 deletions be/test/olap/memtable_memory_limiter_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,15 +165,16 @@ TEST_F(MemTableMemoryLimiterTest, handle_memtable_flush_test) {
}
std::mutex lock;
_mgr->init(100);
auto memtable_writer = delta_writer->memtable_writer();
{
std::lock_guard<std::mutex> l(lock);
_mgr->register_writer(delta_writer);
_mgr->register_writer(memtable_writer);
}
_mgr->handle_memtable_flush();
CHECK_EQ(0, delta_writer->active_memtable_mem_consumption());
CHECK_EQ(0, memtable_writer->active_memtable_mem_consumption());
{
std::lock_guard<std::mutex> l(lock);
_mgr->deregister_writer(delta_writer);
_mgr->deregister_writer(memtable_writer);
}

res = delta_writer->close();
Expand Down
Loading