[Enhancement](Wal) Support dynamic wal space limit #27726

Merged
7 changes: 4 additions & 3 deletions be/src/common/config.cpp
@@ -1089,7 +1089,7 @@ DEFINE_Int32(grace_shutdown_wait_seconds, "120");
DEFINE_Int16(bitmap_serialize_version, "1");

// group commit insert config
DEFINE_String(group_commit_wal_path, "./wal");
DEFINE_String(group_commit_wal_path, "");
DEFINE_Int32(group_commit_replay_wal_retry_num, "10");
DEFINE_Int32(group_commit_replay_wal_retry_interval_seconds, "5");
DEFINE_Int32(group_commit_relay_wal_threads, "10");
@@ -1118,8 +1118,9 @@ DEFINE_String(default_tzfiles_path, "${DORIS_HOME}/zoneinfo");
// Max size(bytes) of group commit queues, used for mem back pressure, default 64M.
DEFINE_Int32(group_commit_max_queue_size, "67108864");

-// Max size(bytes) of wal disk using, used for disk space back pressure, default 64M.
-DEFINE_Int32(wal_max_disk_size, "67108864");
+// Max size(bytes) or percentage(%) of wal disk usage, used for disk space back pressure, default 10% of the disk's available space.
+// Either group_commit_wal_max_disk_limit=1024 or group_commit_wal_max_disk_limit=10% is recognized automatically.
+DEFINE_mString(group_commit_wal_max_disk_limit, "10%");

// Ingest binlog work pool size, -1 is disable, 0 is hardware concurrency
DEFINE_Int32(ingest_binlog_work_pool_size, "-1");
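
Review note: the limit is now a string that accepts either an absolute byte count or a percentage of the WAL disk's available space. A minimal sketch of how such a value can be parsed — the helper name and error handling are illustrative assumptions, not code from this PR:

#include <cstddef>
#include <stdexcept>
#include <string>

// Hypothetical parser: turns "1024" into 1024 bytes, or "10%" into 10% of
// the bytes currently available on the WAL disk.
size_t parse_wal_disk_limit(const std::string& value, size_t disk_available_bytes) {
    if (value.empty()) {
        throw std::invalid_argument("empty wal disk limit");
    }
    if (value.back() == '%') {
        size_t percent = std::stoul(value.substr(0, value.size() - 1));
        return disk_available_bytes * percent / 100;
    }
    return std::stoul(value); // plain byte count
}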
5 changes: 3 additions & 2 deletions be/src/common/config.h
@@ -1199,8 +1199,9 @@ DECLARE_String(default_tzfiles_path);
// Max size(bytes) of group commit queues, used for mem back pressure.
DECLARE_Int32(group_commit_max_queue_size);

-// Max size(bytes) of wal disk using, used for disk space back pressure.
-DECLARE_Int32(wal_max_disk_size);
+// Max size(bytes) or percentage(%) of wal disk usage, used for disk space back pressure, default 10% of the disk's available space.
+// Either group_commit_wal_max_disk_limit=1024 or group_commit_wal_max_disk_limit=10% is recognized automatically.
+DECLARE_mString(group_commit_wal_max_disk_limit);

// Ingest binlog work pool size
DECLARE_Int32(ingest_binlog_work_pool_size);
55 changes: 40 additions & 15 deletions be/src/http/action/http_stream.cpp
@@ -46,6 +46,7 @@
#include "http/utils.h"
#include "io/fs/stream_load_pipe.h"
#include "olap/storage_engine.h"
#include "olap/wal_manager.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
@@ -169,31 +170,38 @@ int HttpStreamAction::on_header(HttpRequest* req) {

Status st = Status::OK();
std::string group_commit_mode = req->header(HTTP_GROUP_COMMIT);
if (iequal(group_commit_mode, "off_mode")) {
group_commit_mode = "";
}
if (!group_commit_mode.empty() && !iequal(group_commit_mode, "sync_mode") &&
!iequal(group_commit_mode, "async_mode") && !iequal(group_commit_mode, "off_mode")) {
st = Status::InternalError("group_commit can only be [async_mode, sync_mode, off_mode]");
if (iequal(group_commit_mode, "off_mode")) {
group_commit_mode = "";
}
} else if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode")) {
// off_mode and empty
group_commit_mode = "off_mode";
ctx->group_commit = false;
} else {
// sync_mode and async_mode
ctx->group_commit = true;
}
ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true";
auto temp_partitions = !req->header(HTTP_TEMP_PARTITIONS).empty();
auto partitions = !req->header(HTTP_PARTITIONS).empty();
if (!temp_partitions && !partitions && !ctx->two_phase_commit &&
(!group_commit_mode.empty() || config::wait_internal_group_commit_finish)) {
if (config::wait_internal_group_commit_finish) {
if (iequal(group_commit_mode, "async_mode") || config::wait_internal_group_commit_finish) {
ctx->group_commit = true;
} else {
ctx->group_commit = load_size_smaller_than_wal_limit(req);
if (!ctx->group_commit) {
LOG(WARNING) << "The data size for this http load("
<< req->header(HttpHeaders::CONTENT_LENGTH)
<< " Bytes) exceeds the WAL (Write-Ahead Log) limit ("
<< config::wal_max_disk_size * 0.8
<< " Bytes). Please set this load to \"group commit\"=false.";
group_commit_mode = load_size_smaller_than_wal_limit(req) ? "async_mode" : "sync_mode";
if (iequal(group_commit_mode, "sync_mode")) {
size_t max_available_size =
ExecEnv::GetInstance()->wal_mgr()->get_max_available_size();
LOG(INFO) << "When enable group commit, the data size can't be too large. The data "
"size "
"for this http load("
<< (req->header(HttpHeaders::CONTENT_LENGTH).empty()
? 0
: std::stol(req->header(HttpHeaders::CONTENT_LENGTH)))
<< " Bytes) exceeds the WAL (Write-Ahead Log) limit ("
<< max_available_size
<< " Bytes). So we set this load to \"group commit\"=sync_mode "
"automatically.";
st = Status::Error<EXCEEDED_LIMIT>("Http load size too large.");
}
}
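
Review note: condensed, the new header handling reduces to one rule — an explicit async_mode request (or a forced internal group commit) is honored as-is; any other load stays in async_mode only if its announced Content-Length fits the WAL budget, and an oversized load is flagged sync_mode and rejected with EXCEEDED_LIMIT. A self-contained sketch of that rule, with illustrative names (fits_in_wal stands in for load_size_smaller_than_wal_limit):

#include <string>

// Illustrative reduction of the on_header() decision above.
std::string choose_group_commit_mode(const std::string& requested_mode,
                                     bool wait_internal_group_commit,
                                     bool fits_in_wal) {
    if (requested_mode == "async_mode" || wait_internal_group_commit) {
        return "async_mode"; // always honored; ctx->group_commit stays true
    }
    // Anything else that reaches this branch is sized against the WAL budget;
    // an oversized load becomes sync_mode and the caller raises EXCEEDED_LIMIT.
    return fits_in_wal ? "async_mode" : "sync_mode";
}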
@@ -358,6 +366,23 @@ Status HttpStreamAction::_process_put(HttpRequest* http_req,
ctx->txn_id = ctx->put_result.params.txn_conf.txn_id;
ctx->label = ctx->put_result.params.import_label;
ctx->put_result.params.__set_wal_id(ctx->wal_id);
if (http_req->header(HTTP_GROUP_COMMIT) == "async mode") {
if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
size_t content_length = 0;
content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
ctx->format == TFileFormatType::FORMAT_CSV_LZO ||
ctx->format == TFileFormatType::FORMAT_CSV_BZ2 ||
ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME ||
ctx->format == TFileFormatType::FORMAT_CSV_LZOP ||
ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK ||
ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
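// Compressed CSV: estimate the decompressed size as roughly 3x Content-Length for WAL accounting.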
content_length *= 3;
}
RETURN_IF_ERROR(ExecEnv::GetInstance()->group_commit_mgr()->update_load_info(
ctx->id.to_thrift(), content_length));
}
}

return _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
}
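
Review note: the new block reports the expected on-disk size to the group commit manager before the plan executes; compressed CSV is inflated 3x, presumably as a rough bound on the decompressed payload that will land in the WAL. The same estimate as a standalone helper (the factor comes from this diff; the enum and function name are ours):

#include <cstddef>

enum class CsvCodec { NONE, GZ, LZO, BZ2, LZ4FRAME, LZOP, LZ4BLOCK, SNAPPYBLOCK };

// Rough WAL space estimate: compressed CSV is assumed to expand ~3x, matching
// the heuristic applied to Content-Length in the diff above.
size_t estimate_wal_bytes(size_t content_length, CsvCodec codec) {
    return codec == CsvCodec::NONE ? content_length : content_length * 3;
}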
64 changes: 43 additions & 21 deletions be/src/http/action/stream_load.cpp
@@ -32,6 +32,7 @@
#include <thrift/protocol/TDebugProtocol.h>
#include <time.h>

#include <algorithm>
#include <future>
#include <map>
#include <sstream>
@@ -189,15 +190,16 @@ int StreamLoadAction::on_header(HttpRequest* req) {
ctx->label = req->header(HTTP_LABEL_KEY);
Status st = Status::OK();
std::string group_commit_mode = req->header(HTTP_GROUP_COMMIT);
if (iequal(group_commit_mode, "off_mode")) {
group_commit_mode = "";
}
if (!group_commit_mode.empty() && !iequal(group_commit_mode, "sync_mode") &&
!iequal(group_commit_mode, "async_mode") && !iequal(group_commit_mode, "off_mode")) {
st = Status::InternalError("group_commit can only be [async_mode, sync_mode, off_mode]");
if (iequal(group_commit_mode, "off_mode")) {
group_commit_mode = "";
}
} else if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode")) {
// off_mode and empty
group_commit_mode = "off_mode";
ctx->group_commit = false;
} else {
// sync_mode and async_mode
ctx->group_commit = true;
}
auto partial_columns = !req->header(HTTP_PARTIAL_COLUMNS).empty() &&
iequal(req->header(HTTP_PARTIAL_COLUMNS), "true");
@@ -206,19 +208,26 @@ int StreamLoadAction::on_header(HttpRequest* req) {
auto partitions = !req->header(HTTP_PARTITIONS).empty();
if (!partial_columns && !partitions && !temp_partitions && !ctx->two_phase_commit &&
(!group_commit_mode.empty() || config::wait_internal_group_commit_finish)) {
if (!group_commit_mode.empty() && !ctx->label.empty()) {
if (!config::wait_internal_group_commit_finish && ctx->group_commit &&
!ctx->label.empty()) {
st = Status::InternalError("label and group_commit can't be set at the same time");
}
if (config::wait_internal_group_commit_finish) {
if (iequal(group_commit_mode, "async_mode") || config::wait_internal_group_commit_finish) {
ctx->group_commit = true;
} else {
ctx->group_commit = load_size_smaller_than_wal_limit(req);
if (!ctx->group_commit) {
LOG(WARNING) << "The data size for this stream load("
<< req->header(HttpHeaders::CONTENT_LENGTH)
<< " Bytes) exceeds the WAL (Write-Ahead Log) limit ("
<< config::wal_max_disk_size * 0.8
<< " Bytes). Please set this load to \"group commit\"=false.";
group_commit_mode = load_size_smaller_than_wal_limit(req) ? "async_mode" : "sync_mode";
if (iequal(group_commit_mode, "sync_mode")) {
size_t max_available_size =
ExecEnv::GetInstance()->wal_mgr()->get_max_available_size();
LOG(INFO) << "When enable group commit, the data size can't be too large. The data "
"size "
"for this stream load("
<< (req->header(HttpHeaders::CONTENT_LENGTH).empty()
? 0
: std::stol(req->header(HttpHeaders::CONTENT_LENGTH)))
<< " Bytes) exceeds the WAL (Write-Ahead Log) limit ("
<< max_available_size
<< " Bytes). So we set this load to \"group commit\"=sync_mode "
"automatically.";
st = Status::Error<EXCEEDED_LIMIT>("Stream load size too large.");
}
}
@@ -640,11 +649,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
request.__set_memtable_on_sink_node(value);
}
if (ctx->group_commit) {
if (!http_req->header(HTTP_GROUP_COMMIT).empty()) {
request.__set_group_commit_mode(http_req->header(HTTP_GROUP_COMMIT));
} else {
request.__set_group_commit_mode("sync_mode");
}
request.__set_group_commit_mode(http_req->header(HTTP_GROUP_COMMIT));
}

#ifndef BE_TEST
@@ -665,6 +670,23 @@
LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status << ctx->brief();
return plan_status;
}
if (http_req->header(HTTP_GROUP_COMMIT) == "async_mode") {
if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
size_t content_length = 0;
content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
ctx->format == TFileFormatType::FORMAT_CSV_LZO ||
ctx->format == TFileFormatType::FORMAT_CSV_BZ2 ||
ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME ||
ctx->format == TFileFormatType::FORMAT_CSV_LZOP ||
ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK ||
ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
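// Compressed CSV: estimate the decompressed size as roughly 3x Content-Length for WAL accounting.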
content_length *= 3;
}
RETURN_IF_ERROR(ExecEnv::GetInstance()->group_commit_mgr()->update_load_info(
ctx->id.to_thrift(), content_length));
}
}

VLOG_NOTICE << "params is " << apache::thrift::ThriftDebugString(ctx->put_result.params);
// if we not use streaming, we must download total content before we begin
7 changes: 5 additions & 2 deletions be/src/http/utils.cpp
@@ -22,6 +22,7 @@
#include <sys/stat.h>
#include <unistd.h>

#include <algorithm>
#include <memory>
#include <ostream>
#include <vector>
Expand All @@ -38,6 +39,8 @@
#include "http/http_status.h"
#include "io/fs/file_system.h"
#include "io/fs/local_file_system.h"
#include "olap/wal_manager.h"
#include "runtime/exec_env.h"
#include "util/path_util.h"
#include "util/url_coding.h"

@@ -199,8 +202,8 @@ bool load_size_smaller_than_wal_limit(HttpRequest* req) {
// these blocks within the limited space. So we need to set group_commit = false to avoid dead lock.
if (!req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
size_t body_bytes = std::stol(req->header(HttpHeaders::CONTENT_LENGTH));
-// TODO(Yukang): change it to WalManager::wal_limit
-return (body_bytes <= config::wal_max_disk_size * 0.8) && (body_bytes != 0);
+size_t max_available_size = ExecEnv::GetInstance()->wal_mgr()->get_max_available_size();
+return (body_bytes != 0 && body_bytes < 0.8 * max_available_size);
} else {
return false;
}
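
Review note: the 0.8 factor keeps 20% headroom under the WAL cap. With the default 10% limit on a disk that has, say, 100 GB free, get_max_available_size() would report about 10 GB, so anything at or above 8 GB falls out of async mode. A toy check under those assumed numbers:

#include <cstddef>
#include <iostream>

int main() {
    // Assumed figures for illustration only.
    size_t max_available_size = 10ULL << 30; // 10 GiB WAL budget
    size_t body_bytes = 9ULL << 30;          // 9 GiB Content-Length

    bool fits = body_bytes != 0 && body_bytes < 0.8 * max_available_size;
    std::cout << (fits ? "async_mode" : "sync_mode") << '\n'; // prints sync_mode
    return 0;
}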
13 changes: 13 additions & 0 deletions be/src/io/fs/local_file_system.cpp
@@ -172,6 +172,19 @@ Status LocalFileSystem::file_size_impl(const Path& file, int64_t* file_size) con
return Status::OK();
}

Status LocalFileSystem::directory_size(const Path& dir_path, size_t* dir_size) {
Contributor review comment (clang-tidy): warning: method 'directory_size' can be made static [readability-convert-member-functions-to-static]

be/src/io/fs/local_file_system.h:80:
-     Status directory_size(const Path& dir_path, size_t* dir_size);
+     static Status directory_size(const Path& dir_path, size_t* dir_size);

*dir_size = 0;
if (std::filesystem::exists(dir_path) && std::filesystem::is_directory(dir_path)) {
for (const auto& entry : std::filesystem::recursive_directory_iterator(dir_path)) {
if (std::filesystem::is_regular_file(entry)) {
*dir_size += std::filesystem::file_size(entry);
}
}
return Status::OK();
}
return Status::IOError("faile to get dir size {}", dir_path.native());
}

Status LocalFileSystem::list_impl(const Path& dir, bool only_file, std::vector<FileInfo>* files,
bool* exists) {
RETURN_IF_ERROR(exists_impl(dir, exists));
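Review note: for trying the traversal outside the BE, the new helper's walk can be reproduced with plain std::filesystem. This standalone sketch mirrors the implementation above, with the IOError path reduced to returning 0:

#include <cstddef>
#include <filesystem>
#include <iostream>

namespace fs = std::filesystem;

// Standalone analogue of LocalFileSystem::directory_size: sums the sizes of
// all regular files under dir, recursively; 0 if dir is missing or not a directory.
size_t directory_size_bytes(const fs::path& dir) {
    size_t total = 0;
    if (fs::exists(dir) && fs::is_directory(dir)) {
        for (const auto& entry : fs::recursive_directory_iterator(dir)) {
            if (entry.is_regular_file()) {
                total += entry.file_size();
            }
        }
    }
    return total;
}

int main() {
    std::cout << directory_size_bytes("./wal") << " bytes\n";
    return 0;
}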
1 change: 1 addition & 0 deletions be/src/io/fs/local_file_system.h
@@ -78,6 +78,7 @@ class LocalFileSystem final : public FileSystem {
// "safe" means the path will be concat with the path prefix config::user_files_secure_path,
// so that it can not list any files outside the config::user_files_secure_path
Status safe_glob(const std::string& path, std::vector<FileInfo>* res);
Status directory_size(const Path& dir_path, size_t* dir_size);

protected:
Status create_file_impl(const Path& file, FileWriterPtr* writer,