Skip to content

Commit

Permalink
[improve] Make the schema change memory space adaptive #34350 (#34515)
Browse files Browse the repository at this point in the history
  • Loading branch information
Lchangliang authored May 8, 2024
1 parent 0a38945 commit 778a887
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 3 deletions.
3 changes: 3 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ DEFINE_String(mem_limit, "80%");
// Soft memory limit as a fraction of hard memory limit.
DEFINE_Double(soft_mem_limit_frac, "0.9");

// Schema change memory limit as a fraction of soft memory limit.
DEFINE_Double(schema_change_mem_limit_frac, "0.6");

// Many modern allocators (for example, tcmalloc) do not do a mremap for
// realloc, even in case of large enough chunks of memory. Although this allows
// you to increase performance and reduce memory consumption during realloc.
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ DECLARE_String(mem_limit);
// Soft memory limit as a fraction of hard memory limit.
DECLARE_Double(soft_mem_limit_frac);

// Schema change memory limit as a fraction of soft memory limit.
DECLARE_Double(schema_change_mem_limit_frac);

// Many modern allocators (for example) do not do a mremap for
// realloc, even in case of large enough chunks of memory. Although this allows
// you to increase performance and reduce memory consumption during realloc.
Expand Down
5 changes: 4 additions & 1 deletion be/src/olap/schema_change.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include "olap/rowset/rowset_reader.h"
#include "olap/rowset/rowset_writer.h"
#include "olap/rowset/segment_v2/inverted_index_writer.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"
#include "olap/tablet_schema.h"
#include "runtime/descriptors.h"
Expand Down Expand Up @@ -231,7 +232,9 @@ class SchemaChangeHandler {
bool sc_sorting, bool sc_directly) {
if (sc_sorting) {
return std::make_unique<VSchemaChangeWithSorting>(
changer, config::memory_limitation_per_thread_for_schema_change_bytes);
changer, ExecEnv::GetInstance()
->storage_engine()
->memory_limitation_bytes_per_thread_for_schema_change());
}

if (sc_directly) {
Expand Down
9 changes: 9 additions & 0 deletions be/src/olap/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
#include "runtime/memory/mem_tracker.h"
#include "runtime/stream_load/stream_load_recorder.h"
#include "util/doris_metrics.h"
#include "util/mem_info.h"
#include "util/metrics.h"
#include "util/spinlock.h"
#include "util/stopwatch.hpp"
Expand Down Expand Up @@ -141,6 +142,9 @@ StorageEngine::StorageEngine(const EngineOptions& options)
});

_broken_paths = options.broken_paths;

_memory_limitation_bytes_for_schema_change =
static_cast<int64_t>(MemInfo::soft_mem_limit() * config::schema_change_mem_limit_frac);
}

StorageEngine::~StorageEngine() {
Expand Down Expand Up @@ -169,6 +173,11 @@ StorageEngine::~StorageEngine() {
_s_instance = nullptr;
}

int64_t StorageEngine::memory_limitation_bytes_per_thread_for_schema_change() const {
return std::max(_memory_limitation_bytes_for_schema_change / config::alter_tablet_worker_count,
config::memory_limitation_per_thread_for_schema_change_bytes);
}

Status StorageEngine::load_data_dirs(const std::vector<DataDir*>& data_dirs) {
std::vector<std::thread> threads;
std::vector<Status> results(data_dirs.size());
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ class StorageEngine {

std::set<string> get_broken_paths() { return _broken_paths; }

int64_t memory_limitation_bytes_per_thread_for_schema_change() const;

private:
// Instance should be inited from `static open()`
// MUST NOT be called in other circumstances.
Expand Down Expand Up @@ -479,6 +481,8 @@ class StorageEngine {

std::unique_ptr<CreateTabletIdxCache> _create_tablet_idx_lru_cache;

int64_t _memory_limitation_bytes_for_schema_change;

DISALLOW_COPY_AND_ASSIGN(StorageEngine);
};

Expand Down
9 changes: 8 additions & 1 deletion be/src/olap/task/engine_alter_tablet_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,19 @@ namespace doris {

EngineAlterTabletTask::EngineAlterTabletTask(const TAlterTabletReqV2& request)
: _alter_tablet_req(request) {
auto mem_limit = ExecEnv::GetInstance()
->storage_engine()
->memory_limitation_bytes_per_thread_for_schema_change();
_mem_tracker = std::make_shared<MemTrackerLimiter>(
MemTrackerLimiter::Type::SCHEMA_CHANGE,
fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}",
std::to_string(_alter_tablet_req.base_tablet_id),
std::to_string(_alter_tablet_req.new_tablet_id)),
config::memory_limitation_per_thread_for_schema_change_bytes);
mem_limit);
LOG_INFO("schema change")
.tag("base_tablet_id", request.base_tablet_id)
.tag("new_tablet_id", request.new_tablet_id)
.tag("mem_limit", mem_limit);
}

Status EngineAlterTabletTask::execute() {
Expand Down
5 changes: 4 additions & 1 deletion be/src/olap/task/engine_index_change_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "olap/task/engine_index_change_task.h"

#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/thread_context.h"

Expand All @@ -29,7 +30,9 @@ EngineIndexChangeTask::EngineIndexChangeTask(
MemTrackerLimiter::Type::SCHEMA_CHANGE,
fmt::format("EngineIndexChangeTask#tabletId={}",
std::to_string(_alter_inverted_index_req.tablet_id)),
config::memory_limitation_per_thread_for_schema_change_bytes);
ExecEnv::GetInstance()
->storage_engine()
->memory_limitation_bytes_per_thread_for_schema_change());
}

Status EngineIndexChangeTask::execute() {
Expand Down

0 comments on commit 778a887

Please sign in to comment.