Commit 9ff8bd2

[Enhancement](Wal)Support dynamic wal space limit (#27726)

Yukang-Lian authored Dec 27, 2023
1 parent 8e35861 commit 9ff8bd2
Showing 27 changed files with 846 additions and 227 deletions.
7 changes: 4 additions & 3 deletions be/src/common/config.cpp
@@ -1099,7 +1099,7 @@ DEFINE_Int32(grace_shutdown_wait_seconds, "120");
DEFINE_Int16(bitmap_serialize_version, "1");

// group commit insert config
DEFINE_String(group_commit_wal_path, "./wal");
DEFINE_String(group_commit_wal_path, "");
DEFINE_Int32(group_commit_replay_wal_retry_num, "10");
DEFINE_Int32(group_commit_replay_wal_retry_interval_seconds, "5");
DEFINE_Int32(group_commit_relay_wal_threads, "10");
@@ -1128,8 +1128,9 @@ DEFINE_String(default_tzfiles_path, "${DORIS_HOME}/zoneinfo");
// Max size(bytes) of group commit queues, used for mem back pressure, default 64M.
DEFINE_Int32(group_commit_max_queue_size, "67108864");

// Max size(bytes) of wal disk using, used for disk space back pressure, default 64M.
DEFINE_Int32(wal_max_disk_size, "67108864");
// Max size (bytes) or percentage (%) of WAL disk usage, used for disk space back pressure, default 10% of the disk's available space.
// Both group_commit_wal_max_disk_limit=1024 and group_commit_wal_max_disk_limit=10% are recognized automatically.
DEFINE_String(group_commit_wal_max_disk_limit, "10%");

// Ingest binlog work pool size, -1 is disable, 0 is hardware concurrency
DEFINE_Int32(ingest_binlog_work_pool_size, "-1");
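The limit is now a string so one knob can express either unit. Below is a minimal sketch of how such a value can be resolved into a byte budget, assuming a hypothetical helper name and a std::filesystem::space probe; the committed WalManager logic is not shown in this excerpt.

```cpp
#include <cstddef>
#include <filesystem>
#include <stdexcept>
#include <string>

// Hypothetical helper, not from the commit: resolve the configured limit into
// bytes. "10%" is taken as a share of the currently available bytes on the
// WAL disk; a bare number such as "1024" is an absolute byte count.
size_t resolve_wal_disk_limit(const std::string& limit, const std::string& wal_dir) {
    if (!limit.empty() && limit.back() == '%') {
        size_t pct = std::stoul(limit.substr(0, limit.size() - 1));
        if (pct == 0 || pct > 100) {
            throw std::invalid_argument("wal disk limit percentage out of range");
        }
        std::filesystem::space_info si = std::filesystem::space(wal_dir);
        return si.available / 100 * pct; // divide first to avoid overflow
    }
    return std::stoul(limit); // absolute size in bytes
}
```

With the default of 10%, the budget tracks the disk rather than a fixed 64M, which is what makes the limit dynamic.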
5 changes: 3 additions & 2 deletions be/src/common/config.h
@@ -1203,8 +1203,9 @@ DECLARE_String(default_tzfiles_path);
// Max size(bytes) of group commit queues, used for mem back pressure.
DECLARE_Int32(group_commit_max_queue_size);

// Max size(bytes) of wal disk using, used for disk space back pressure.
DECLARE_Int32(wal_max_disk_size);
// Max size (bytes) or percentage (%) of WAL disk usage, used for disk space back pressure, default 10% of the disk's available space.
// Both group_commit_wal_max_disk_limit=1024 and group_commit_wal_max_disk_limit=10% are recognized automatically.
DECLARE_mString(group_commit_wal_max_disk_limit);

// Ingest binlog work pool size
DECLARE_Int32(ingest_binlog_work_pool_size);
55 changes: 40 additions & 15 deletions be/src/http/action/http_stream.cpp
@@ -46,6 +46,7 @@
#include "http/utils.h"
#include "io/fs/stream_load_pipe.h"
#include "olap/storage_engine.h"
#include "olap/wal_manager.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
@@ -169,31 +170,38 @@ int HttpStreamAction::on_header(HttpRequest* req) {

Status st = Status::OK();
std::string group_commit_mode = req->header(HTTP_GROUP_COMMIT);
if (iequal(group_commit_mode, "off_mode")) {
group_commit_mode = "";
}
if (!group_commit_mode.empty() && !iequal(group_commit_mode, "sync_mode") &&
!iequal(group_commit_mode, "async_mode") && !iequal(group_commit_mode, "off_mode")) {
st = Status::InternalError("group_commit can only be [async_mode, sync_mode, off_mode]");
if (iequal(group_commit_mode, "off_mode")) {
group_commit_mode = "";
}
} else if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode")) {
// off_mode and empty
group_commit_mode = "off_mode";
ctx->group_commit = false;
} else {
// sync_mode and async_mode
ctx->group_commit = true;
}
ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true";
auto temp_partitions = !req->header(HTTP_TEMP_PARTITIONS).empty();
auto partitions = !req->header(HTTP_PARTITIONS).empty();
if (!temp_partitions && !partitions && !ctx->two_phase_commit &&
(!group_commit_mode.empty() || config::wait_internal_group_commit_finish)) {
if (config::wait_internal_group_commit_finish) {
if (iequal(group_commit_mode, "async_mode") || config::wait_internal_group_commit_finish) {
ctx->group_commit = true;
} else {
ctx->group_commit = load_size_smaller_than_wal_limit(req);
if (!ctx->group_commit) {
LOG(WARNING) << "The data size for this http load("
<< req->header(HttpHeaders::CONTENT_LENGTH)
<< " Bytes) exceeds the WAL (Write-Ahead Log) limit ("
<< config::wal_max_disk_size * 0.8
<< " Bytes). Please set this load to \"group commit\"=false.";
group_commit_mode = load_size_smaller_than_wal_limit(req) ? "async_mode" : "sync_mode";
if (iequal(group_commit_mode, "sync_mode")) {
size_t max_available_size =
ExecEnv::GetInstance()->wal_mgr()->get_max_available_size();
LOG(INFO) << "When enable group commit, the data size can't be too large. The data "
"size "
"for this http load("
<< (req->header(HttpHeaders::CONTENT_LENGTH).empty()
? 0
: std::stol(req->header(HttpHeaders::CONTENT_LENGTH)))
<< " Bytes) exceeds the WAL (Write-Ahead Log) limit ("
<< max_available_size
<< " Bytes). So we set this load to \"group commit\"=sync_mode "
"automatically.";
st = Status::Error<EXCEEDED_LIMIT>("Http load size too large.");
}
}
@@ -358,6 +366,23 @@ Status HttpStreamAction::_process_put(HttpRequest* http_req,
ctx->txn_id = ctx->put_result.params.txn_conf.txn_id;
ctx->label = ctx->put_result.params.import_label;
ctx->put_result.params.__set_wal_id(ctx->wal_id);
if (http_req->header(HTTP_GROUP_COMMIT) == "async mode") {
if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
size_t content_length = 0;
content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
ctx->format == TFileFormatType::FORMAT_CSV_LZO ||
ctx->format == TFileFormatType::FORMAT_CSV_BZ2 ||
ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME ||
ctx->format == TFileFormatType::FORMAT_CSV_LZOP ||
ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK ||
ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
content_length *= 3;
}
RETURN_IF_ERROR(ExecEnv::GetInstance()->group_commit_mgr()->update_load_info(
ctx->id.to_thrift(), content_length));
}
}

return _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
}
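Condensed, the fallback added to on_header() works like the sketch below (names assumed; the committed code additionally honors config::wait_internal_group_commit_finish and attaches an EXCEEDED_LIMIT status on the oversized branch):

```cpp
#include <cstddef>
#include <string>

// Sketch: when a load is not already pinned to async_mode, its declared
// Content-Length decides the mode. Loads that fit the WAL budget are upgraded
// to async_mode; oversized loads are tagged sync_mode and then rejected.
std::string pick_group_commit_mode(size_t content_length, size_t max_available_size) {
    bool fits = content_length != 0 &&
                content_length < 0.8 * static_cast<double>(max_available_size);
    return fits ? "async_mode" : "sync_mode";
}
```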
64 changes: 43 additions & 21 deletions be/src/http/action/stream_load.cpp
@@ -32,6 +32,7 @@
#include <thrift/protocol/TDebugProtocol.h>
#include <time.h>

#include <algorithm>
#include <future>
#include <map>
#include <sstream>
@@ -189,15 +190,16 @@ int StreamLoadAction::on_header(HttpRequest* req) {
ctx->label = req->header(HTTP_LABEL_KEY);
Status st = Status::OK();
std::string group_commit_mode = req->header(HTTP_GROUP_COMMIT);
if (iequal(group_commit_mode, "off_mode")) {
group_commit_mode = "";
}
if (!group_commit_mode.empty() && !iequal(group_commit_mode, "sync_mode") &&
!iequal(group_commit_mode, "async_mode") && !iequal(group_commit_mode, "off_mode")) {
st = Status::InternalError("group_commit can only be [async_mode, sync_mode, off_mode]");
if (iequal(group_commit_mode, "off_mode")) {
group_commit_mode = "";
}
} else if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode")) {
// off_mode and empty
group_commit_mode = "off_mode";
ctx->group_commit = false;
} else {
// sync_mode and async_mode
ctx->group_commit = true;
}
auto partial_columns = !req->header(HTTP_PARTIAL_COLUMNS).empty() &&
iequal(req->header(HTTP_PARTIAL_COLUMNS), "true");
@@ -206,19 +208,26 @@ int StreamLoadAction::on_header(HttpRequest* req) {
auto partitions = !req->header(HTTP_PARTITIONS).empty();
if (!partial_columns && !partitions && !temp_partitions && !ctx->two_phase_commit &&
(!group_commit_mode.empty() || config::wait_internal_group_commit_finish)) {
if (!group_commit_mode.empty() && !ctx->label.empty()) {
if (!config::wait_internal_group_commit_finish && ctx->group_commit &&
!ctx->label.empty()) {
st = Status::InternalError("label and group_commit can't be set at the same time");
}
if (config::wait_internal_group_commit_finish) {
if (iequal(group_commit_mode, "async_mode") || config::wait_internal_group_commit_finish) {
ctx->group_commit = true;
} else {
ctx->group_commit = load_size_smaller_than_wal_limit(req);
if (!ctx->group_commit) {
LOG(WARNING) << "The data size for this stream load("
<< req->header(HttpHeaders::CONTENT_LENGTH)
<< " Bytes) exceeds the WAL (Write-Ahead Log) limit ("
<< config::wal_max_disk_size * 0.8
<< " Bytes). Please set this load to \"group commit\"=false.";
group_commit_mode = load_size_smaller_than_wal_limit(req) ? "async_mode" : "sync_mode";
if (iequal(group_commit_mode, "sync_mode")) {
size_t max_available_size =
ExecEnv::GetInstance()->wal_mgr()->get_max_available_size();
LOG(INFO) << "When enable group commit, the data size can't be too large. The data "
"size "
"for this stream load("
<< (req->header(HttpHeaders::CONTENT_LENGTH).empty()
? 0
: std::stol(req->header(HttpHeaders::CONTENT_LENGTH)))
<< " Bytes) exceeds the WAL (Write-Ahead Log) limit ("
<< max_available_size
<< " Bytes). So we set this load to \"group commit\"=sync_mode "
"automatically.";
st = Status::Error<EXCEEDED_LIMIT>("Stream load size too large.");
}
}
@@ -640,11 +649,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
request.__set_memtable_on_sink_node(value);
}
if (ctx->group_commit) {
if (!http_req->header(HTTP_GROUP_COMMIT).empty()) {
request.__set_group_commit_mode(http_req->header(HTTP_GROUP_COMMIT));
} else {
request.__set_group_commit_mode("sync_mode");
}
request.__set_group_commit_mode(http_req->header(HTTP_GROUP_COMMIT));
}

#ifndef BE_TEST
@@ -665,6 +670,23 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status << ctx->brief();
return plan_status;
}
if (http_req->header(HTTP_GROUP_COMMIT) == "async_mode") {
if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
size_t content_length = 0;
content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
ctx->format == TFileFormatType::FORMAT_CSV_LZO ||
ctx->format == TFileFormatType::FORMAT_CSV_BZ2 ||
ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME ||
ctx->format == TFileFormatType::FORMAT_CSV_LZOP ||
ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK ||
ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
content_length *= 3;
}
RETURN_IF_ERROR(ExecEnv::GetInstance()->group_commit_mgr()->update_load_info(
ctx->id.to_thrift(), content_length));
}
}

VLOG_NOTICE << "params is " << apache::thrift::ThriftDebugString(ctx->put_result.params);
// if we not use streaming, we must download total content before we begin
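Both _process_put() blocks register the load's expected footprint with the group commit manager before execution; compressed CSV bodies are padded by a 3x decompression heuristic. A sketch of the estimate (helper name assumed):

```cpp
#include <cstddef>

// Sketch of the estimate fed to update_load_info(): compressed CSV bodies are
// assumed to expand roughly 3x once decompressed, so the WAL reservation is
// padded up front. Plain bodies reserve their Content-Length as-is.
size_t estimated_wal_footprint(size_t content_length, bool compressed_csv) {
    return compressed_csv ? content_length * 3 : content_length;
}
// e.g. a 64 MB gzip body reserves 192 MB of WAL budget.
```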
7 changes: 5 additions & 2 deletions be/src/http/utils.cpp
@@ -22,6 +22,7 @@
#include <sys/stat.h>
#include <unistd.h>

#include <algorithm>
#include <memory>
#include <ostream>
#include <vector>
@@ -38,6 +39,8 @@
#include "http/http_status.h"
#include "io/fs/file_system.h"
#include "io/fs/local_file_system.h"
#include "olap/wal_manager.h"
#include "runtime/exec_env.h"
#include "util/path_util.h"
#include "util/url_coding.h"

@@ -199,8 +202,8 @@ bool load_size_smaller_than_wal_limit(HttpRequest* req) {
// these blocks within the limited space. So we need to set group_commit = false to avoid deadlock.
if (!req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
size_t body_bytes = std::stol(req->header(HttpHeaders::CONTENT_LENGTH));
// TODO(Yukang): change it to WalManager::wal_limit
return (body_bytes <= config::wal_max_disk_size * 0.8) && (body_bytes != 0);
size_t max_available_size = ExecEnv::GetInstance()->wal_mgr()->get_max_available_size();
return (body_bytes != 0 && body_bytes < 0.8 * max_available_size);
} else {
return false;
}
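load_size_smaller_than_wal_limit() now asks WalManager for the live budget instead of reading a static config, keeping 20% head room. A self-contained sketch of the rule with assumed numbers:

```cpp
#include <cstddef>
#include <cstdio>

// Mirror of the check above: a load qualifies only while it stays strictly
// below 80% of the space WalManager reports as available.
static bool fits_wal_budget(size_t body_bytes, size_t max_available_size) {
    return body_bytes != 0 && body_bytes < 0.8 * static_cast<double>(max_available_size);
}

int main() {
    const size_t budget = 10ULL << 30;                        // assume 10 GiB available
    std::printf("%d\n", fits_wal_budget(1ULL << 30, budget)); // 1: 1 GiB fits
    std::printf("%d\n", fits_wal_budget(9ULL << 30, budget)); // 0: above the 8 GiB head room
    std::printf("%d\n", fits_wal_budget(0, budget));          // 0: unknown size never fits
}
```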
13 changes: 13 additions & 0 deletions be/src/io/fs/local_file_system.cpp
@@ -172,6 +172,19 @@ Status LocalFileSystem::file_size_impl(const Path& file, int64_t* file_size) const
return Status::OK();
}

Status LocalFileSystem::directory_size(const Path& dir_path, size_t* dir_size) {
*dir_size = 0;
if (std::filesystem::exists(dir_path) && std::filesystem::is_directory(dir_path)) {
for (const auto& entry : std::filesystem::recursive_directory_iterator(dir_path)) {
if (std::filesystem::is_regular_file(entry)) {
*dir_size += std::filesystem::file_size(entry);
}
}
return Status::OK();
}
return Status::IOError("faile to get dir size {}", dir_path.native());
}

Status LocalFileSystem::list_impl(const Path& dir, bool only_file, std::vector<FileInfo>* files,
bool* exists) {
RETURN_IF_ERROR(exists_impl(dir, exists));
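A caveat on directory_size(): it calls the throwing std::filesystem overloads, so an unreadable entry raises an exception rather than an IOError. The variant below is my assumption of an exception-free alternative using std::error_code, not the committed code (iterator increment can still throw; a full version would use the increment(ec) overload):

```cpp
#include <cstddef>
#include <filesystem>
#include <system_error>

// Exception-free sketch of the recursive size walk; returns false on any
// filesystem error instead of throwing.
bool directory_size_nothrow(const std::filesystem::path& dir, size_t* dir_size) {
    *dir_size = 0;
    std::error_code ec;
    auto it = std::filesystem::recursive_directory_iterator(dir, ec);
    if (ec) {
        return false;
    }
    for (const auto& entry : it) {
        if (entry.is_regular_file(ec) && !ec) {
            auto sz = entry.file_size(ec);
            if (ec) {
                return false;
            }
            *dir_size += sz;
        }
    }
    return true;
}
```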
1 change: 1 addition & 0 deletions be/src/io/fs/local_file_system.h
@@ -78,6 +78,7 @@ class LocalFileSystem final : public FileSystem {
// "safe" means the path will be concat with the path prefix config::user_files_secure_path,
// so that it can not list any files outside the config::user_files_secure_path
Status safe_glob(const std::string& path, std::vector<FileInfo>* res);
Status directory_size(const Path& dir_path, size_t* dir_size);

protected:
Status create_file_impl(const Path& file, FileWriterPtr* writer,