From 9ff8bd2e9c995760b740800addd42eddfa8e25df Mon Sep 17 00:00:00 2001 From: abmdocrt Date: Wed, 27 Dec 2023 11:51:32 +0800 Subject: [PATCH] [Enhancement](Wal)Support dynamic wal space limit (#27726) --- be/src/common/config.cpp | 7 +- be/src/common/config.h | 5 +- be/src/http/action/http_stream.cpp | 55 +++-- be/src/http/action/stream_load.cpp | 64 +++-- be/src/http/utils.cpp | 7 +- be/src/io/fs/local_file_system.cpp | 13 + be/src/io/fs/local_file_system.h | 1 + be/src/olap/wal_dirs_info.cpp | 224 ++++++++++++++++++ be/src/olap/wal_dirs_info.h | 85 +++++++ be/src/olap/wal_manager.cpp | 165 +++++++++++-- be/src/olap/wal_manager.h | 37 ++- be/src/olap/wal_writer.cpp | 28 +-- be/src/olap/wal_writer.h | 10 +- be/src/runtime/exec_env.h | 1 + be/src/runtime/group_commit_mgr.cpp | 92 ++++++- be/src/runtime/group_commit_mgr.h | 24 +- be/src/vec/sink/group_commit_block_sink.cpp | 30 ++- be/src/vec/sink/group_commit_block_sink.h | 2 +- be/src/vec/sink/writer/vwal_writer.cpp | 10 +- be/src/vec/sink/writer/vwal_writer.h | 3 +- be/test/http/stream_load_test.cpp | 13 +- be/test/olap/wal_manager_test.cpp | 101 +++++++- be/test/olap/wal_reader_writer_test.cpp | 5 +- be/test/vec/exec/vtablet_sink_test.cpp | 2 +- be/test/vec/exec/vwal_scanner_test.cpp | 7 +- ..._group_commit_and_wal_back_pressure.groovy | 6 +- .../test_group_commit_wal_limit.groovy | 76 +----- 27 files changed, 846 insertions(+), 227 deletions(-) create mode 100644 be/src/olap/wal_dirs_info.cpp create mode 100644 be/src/olap/wal_dirs_info.h diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 9a886ff19c9207..358db99e592b24 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1099,7 +1099,7 @@ DEFINE_Int32(grace_shutdown_wait_seconds, "120"); DEFINE_Int16(bitmap_serialize_version, "1"); // group commit insert config -DEFINE_String(group_commit_wal_path, "./wal"); +DEFINE_String(group_commit_wal_path, ""); DEFINE_Int32(group_commit_replay_wal_retry_num, "10"); DEFINE_Int32(group_commit_replay_wal_retry_interval_seconds, "5"); DEFINE_Int32(group_commit_relay_wal_threads, "10"); @@ -1128,8 +1128,9 @@ DEFINE_String(default_tzfiles_path, "${DORIS_HOME}/zoneinfo"); // Max size(bytes) of group commit queues, used for mem back pressure, defult 64M. DEFINE_Int32(group_commit_max_queue_size, "67108864"); -// Max size(bytes) of wal disk using, used for disk space back pressure, default 64M. -DEFINE_Int32(wal_max_disk_size, "67108864"); +// Max size(bytes) or percentage(%) of wal disk usage, used for disk space back pressure, default 10% of the disk available space. +// group_commit_wal_max_disk_limit=1024 or group_commit_wal_max_disk_limit=10% can be automatically identified. +DEFINE_String(group_commit_wal_max_disk_limit, "10%"); // Ingest binlog work pool size, -1 is disable, 0 is hardware concurrency DEFINE_Int32(ingest_binlog_work_pool_size, "-1"); diff --git a/be/src/common/config.h b/be/src/common/config.h index bb341002902ef7..a41a3d06141182 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1203,8 +1203,9 @@ DECLARE_String(default_tzfiles_path); // Max size(bytes) of group commit queues, used for mem back pressure. DECLARE_Int32(group_commit_max_queue_size); -// Max size(bytes) of wal disk using, used for disk space back pressure. -DECLARE_Int32(wal_max_disk_size); +// Max size(bytes) or percentage(%) of wal disk usage, used for disk space back pressure, default 10% of the disk available space. +// group_commit_wal_max_disk_limit=1024 or group_commit_wal_max_disk_limit=10% can be automatically identified. +DECLARE_mString(group_commit_wal_max_disk_limit); // Ingest binlog work pool size DECLARE_Int32(ingest_binlog_work_pool_size); diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp index 8c75f6be7402ac..18256237a81107 100644 --- a/be/src/http/action/http_stream.cpp +++ b/be/src/http/action/http_stream.cpp @@ -46,6 +46,7 @@ #include "http/utils.h" #include "io/fs/stream_load_pipe.h" #include "olap/storage_engine.h" +#include "olap/wal_manager.h" #include "runtime/client_cache.h" #include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" @@ -169,31 +170,38 @@ int HttpStreamAction::on_header(HttpRequest* req) { Status st = Status::OK(); std::string group_commit_mode = req->header(HTTP_GROUP_COMMIT); - if (iequal(group_commit_mode, "off_mode")) { - group_commit_mode = ""; - } if (!group_commit_mode.empty() && !iequal(group_commit_mode, "sync_mode") && !iequal(group_commit_mode, "async_mode") && !iequal(group_commit_mode, "off_mode")) { st = Status::InternalError("group_commit can only be [async_mode, sync_mode, off_mode]"); - if (iequal(group_commit_mode, "off_mode")) { - group_commit_mode = ""; - } + } else if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode")) { + // off_mode and empty + group_commit_mode = "off_mode"; + ctx->group_commit = false; + } else { + // sync_mode and async_mode + ctx->group_commit = true; } ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true"; auto temp_partitions = !req->header(HTTP_TEMP_PARTITIONS).empty(); auto partitions = !req->header(HTTP_PARTITIONS).empty(); if (!temp_partitions && !partitions && !ctx->two_phase_commit && (!group_commit_mode.empty() || config::wait_internal_group_commit_finish)) { - if (config::wait_internal_group_commit_finish) { + if (iequal(group_commit_mode, "async_mode") || config::wait_internal_group_commit_finish) { ctx->group_commit = true; - } else { - ctx->group_commit = load_size_smaller_than_wal_limit(req); - if (!ctx->group_commit) { - LOG(WARNING) << "The data size for this http load(" - << req->header(HttpHeaders::CONTENT_LENGTH) - << " Bytes) exceeds the WAL (Write-Ahead Log) limit (" - << config::wal_max_disk_size * 0.8 - << " Bytes). Please set this load to \"group commit\"=false."; + group_commit_mode = load_size_smaller_than_wal_limit(req) ? "async_mode" : "sync_mode"; + if (iequal(group_commit_mode, "sync_mode")) { + size_t max_available_size = + ExecEnv::GetInstance()->wal_mgr()->get_max_available_size(); + LOG(INFO) << "When enable group commit, the data size can't be too large. The data " + "size " + "for this http load(" + << (req->header(HttpHeaders::CONTENT_LENGTH).empty() + ? 0 + : std::stol(req->header(HttpHeaders::CONTENT_LENGTH))) + << " Bytes) exceeds the WAL (Write-Ahead Log) limit (" + << max_available_size + << " Bytes). So we set this load to \"group commit\"=sync_mode " + "automatically."; st = Status::Error("Http load size too large."); } } @@ -358,6 +366,23 @@ Status HttpStreamAction::_process_put(HttpRequest* http_req, ctx->txn_id = ctx->put_result.params.txn_conf.txn_id; ctx->label = ctx->put_result.params.import_label; ctx->put_result.params.__set_wal_id(ctx->wal_id); + if (http_req->header(HTTP_GROUP_COMMIT) == "async mode") { + if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) { + size_t content_length = 0; + content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH)); + if (ctx->format == TFileFormatType::FORMAT_CSV_GZ || + ctx->format == TFileFormatType::FORMAT_CSV_LZO || + ctx->format == TFileFormatType::FORMAT_CSV_BZ2 || + ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME || + ctx->format == TFileFormatType::FORMAT_CSV_LZOP || + ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK || + ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) { + content_length *= 3; + } + RETURN_IF_ERROR(ExecEnv::GetInstance()->group_commit_mgr()->update_load_info( + ctx->id.to_thrift(), content_length)); + } + } return _exec_env->stream_load_executor()->execute_plan_fragment(ctx); } diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 04c29a53027823..eddf31856a03d5 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -32,6 +32,7 @@ #include #include +#include #include #include #include @@ -189,15 +190,16 @@ int StreamLoadAction::on_header(HttpRequest* req) { ctx->label = req->header(HTTP_LABEL_KEY); Status st = Status::OK(); std::string group_commit_mode = req->header(HTTP_GROUP_COMMIT); - if (iequal(group_commit_mode, "off_mode")) { - group_commit_mode = ""; - } if (!group_commit_mode.empty() && !iequal(group_commit_mode, "sync_mode") && !iequal(group_commit_mode, "async_mode") && !iequal(group_commit_mode, "off_mode")) { st = Status::InternalError("group_commit can only be [async_mode, sync_mode, off_mode]"); - if (iequal(group_commit_mode, "off_mode")) { - group_commit_mode = ""; - } + } else if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode")) { + // off_mode and empty + group_commit_mode = "off_mode"; + ctx->group_commit = false; + } else { + // sync_mode and async_mode + ctx->group_commit = true; } auto partial_columns = !req->header(HTTP_PARTIAL_COLUMNS).empty() && iequal(req->header(HTTP_PARTIAL_COLUMNS), "true"); @@ -206,19 +208,26 @@ int StreamLoadAction::on_header(HttpRequest* req) { auto partitions = !req->header(HTTP_PARTITIONS).empty(); if (!partial_columns && !partitions && !temp_partitions && !ctx->two_phase_commit && (!group_commit_mode.empty() || config::wait_internal_group_commit_finish)) { - if (!group_commit_mode.empty() && !ctx->label.empty()) { + if (!config::wait_internal_group_commit_finish && ctx->group_commit && + !ctx->label.empty()) { st = Status::InternalError("label and group_commit can't be set at the same time"); } - if (config::wait_internal_group_commit_finish) { + if (iequal(group_commit_mode, "async_mode") || config::wait_internal_group_commit_finish) { ctx->group_commit = true; - } else { - ctx->group_commit = load_size_smaller_than_wal_limit(req); - if (!ctx->group_commit) { - LOG(WARNING) << "The data size for this stream load(" - << req->header(HttpHeaders::CONTENT_LENGTH) - << " Bytes) exceeds the WAL (Write-Ahead Log) limit (" - << config::wal_max_disk_size * 0.8 - << " Bytes). Please set this load to \"group commit\"=false."; + group_commit_mode = load_size_smaller_than_wal_limit(req) ? "async_mode" : "sync_mode"; + if (iequal(group_commit_mode, "sync_mode")) { + size_t max_available_size = + ExecEnv::GetInstance()->wal_mgr()->get_max_available_size(); + LOG(INFO) << "When enable group commit, the data size can't be too large. The data " + "size " + "for this stream load(" + << (req->header(HttpHeaders::CONTENT_LENGTH).empty() + ? 0 + : std::stol(req->header(HttpHeaders::CONTENT_LENGTH))) + << " Bytes) exceeds the WAL (Write-Ahead Log) limit (" + << max_available_size + << " Bytes). So we set this load to \"group commit\"=sync_mode " + "automatically."; st = Status::Error("Stream load size too large."); } } @@ -640,11 +649,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, request.__set_memtable_on_sink_node(value); } if (ctx->group_commit) { - if (!http_req->header(HTTP_GROUP_COMMIT).empty()) { - request.__set_group_commit_mode(http_req->header(HTTP_GROUP_COMMIT)); - } else { - request.__set_group_commit_mode("sync_mode"); - } + request.__set_group_commit_mode(http_req->header(HTTP_GROUP_COMMIT)); } #ifndef BE_TEST @@ -665,6 +670,23 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status << ctx->brief(); return plan_status; } + if (http_req->header(HTTP_GROUP_COMMIT) == "async_mode") { + if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) { + size_t content_length = 0; + content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH)); + if (ctx->format == TFileFormatType::FORMAT_CSV_GZ || + ctx->format == TFileFormatType::FORMAT_CSV_LZO || + ctx->format == TFileFormatType::FORMAT_CSV_BZ2 || + ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME || + ctx->format == TFileFormatType::FORMAT_CSV_LZOP || + ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK || + ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) { + content_length *= 3; + } + RETURN_IF_ERROR(ExecEnv::GetInstance()->group_commit_mgr()->update_load_info( + ctx->id.to_thrift(), content_length)); + } + } VLOG_NOTICE << "params is " << apache::thrift::ThriftDebugString(ctx->put_result.params); // if we not use streaming, we must download total content before we begin diff --git a/be/src/http/utils.cpp b/be/src/http/utils.cpp index 0d66887663f849..9fbf4c79300dcd 100644 --- a/be/src/http/utils.cpp +++ b/be/src/http/utils.cpp @@ -22,6 +22,7 @@ #include #include +#include #include #include #include @@ -38,6 +39,8 @@ #include "http/http_status.h" #include "io/fs/file_system.h" #include "io/fs/local_file_system.h" +#include "olap/wal_manager.h" +#include "runtime/exec_env.h" #include "util/path_util.h" #include "util/url_coding.h" @@ -199,8 +202,8 @@ bool load_size_smaller_than_wal_limit(HttpRequest* req) { // these blocks within the limited space. So we need to set group_commit = false to avoid dead lock. if (!req->header(HttpHeaders::CONTENT_LENGTH).empty()) { size_t body_bytes = std::stol(req->header(HttpHeaders::CONTENT_LENGTH)); - // TODO(Yukang): change it to WalManager::wal_limit - return (body_bytes <= config::wal_max_disk_size * 0.8) && (body_bytes != 0); + size_t max_available_size = ExecEnv::GetInstance()->wal_mgr()->get_max_available_size(); + return (body_bytes != 0 && body_bytes < 0.8 * max_available_size); } else { return false; } diff --git a/be/src/io/fs/local_file_system.cpp b/be/src/io/fs/local_file_system.cpp index 83e1554fe6b48f..a71708ac72c824 100644 --- a/be/src/io/fs/local_file_system.cpp +++ b/be/src/io/fs/local_file_system.cpp @@ -172,6 +172,19 @@ Status LocalFileSystem::file_size_impl(const Path& file, int64_t* file_size) con return Status::OK(); } +Status LocalFileSystem::directory_size(const Path& dir_path, size_t* dir_size) { + *dir_size = 0; + if (std::filesystem::exists(dir_path) && std::filesystem::is_directory(dir_path)) { + for (const auto& entry : std::filesystem::recursive_directory_iterator(dir_path)) { + if (std::filesystem::is_regular_file(entry)) { + *dir_size += std::filesystem::file_size(entry); + } + } + return Status::OK(); + } + return Status::IOError("faile to get dir size {}", dir_path.native()); +} + Status LocalFileSystem::list_impl(const Path& dir, bool only_file, std::vector* files, bool* exists) { RETURN_IF_ERROR(exists_impl(dir, exists)); diff --git a/be/src/io/fs/local_file_system.h b/be/src/io/fs/local_file_system.h index 53df0b8e06bf44..4f7e5107fb3eb6 100644 --- a/be/src/io/fs/local_file_system.h +++ b/be/src/io/fs/local_file_system.h @@ -78,6 +78,7 @@ class LocalFileSystem final : public FileSystem { // "safe" means the path will be concat with the path prefix config::user_files_secure_path, // so that it can not list any files outside the config::user_files_secure_path Status safe_glob(const std::string& path, std::vector* res); + Status directory_size(const Path& dir_path, size_t* dir_size); protected: Status create_file_impl(const Path& file, FileWriterPtr* writer, diff --git a/be/src/olap/wal_dirs_info.cpp b/be/src/olap/wal_dirs_info.cpp new file mode 100644 index 00000000000000..340d896a8c60db --- /dev/null +++ b/be/src/olap/wal_dirs_info.cpp @@ -0,0 +1,224 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/wal_dirs_info.h" + +#include +#include +#include + +#include "common/config.h" +#include "common/status.h" +#include "io/fs/local_file_system.h" +#include "util/parse_util.h" + +namespace doris { + +std::string WalDirInfo::get_wal_dir() { + return _wal_dir; +} + +size_t WalDirInfo::get_limit() { + std::shared_lock rlock(_lock); + return _limit; +} + +size_t WalDirInfo::get_used() { + std::shared_lock rlock(_lock); + return _used; +} + +size_t WalDirInfo::get_pre_allocated() { + std::shared_lock rlock(_lock); + return _pre_allocated; +} + +Status WalDirInfo::set_limit(size_t limit) { + std::unique_lock wlock(_lock); + _limit = limit; + return Status::OK(); +} + +Status WalDirInfo::set_used(size_t used) { + std::unique_lock wlock(_lock); + _used = used; + return Status::OK(); +} + +Status WalDirInfo::set_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated) { + std::unique_lock wlock(_lock); + if (is_add_pre_allocated) { + _pre_allocated += pre_allocated; + } else { + _pre_allocated -= pre_allocated; + } + return Status::OK(); +} + +size_t WalDirInfo::available() { + std::unique_lock wlock(_lock); + int64_t available = _limit - _used - _pre_allocated; + return available > 0 ? available : 0; +} + +Status WalDirInfo::update_wal_dir_limit(size_t limit) { + if (limit != static_cast(-1)) { + RETURN_IF_ERROR(set_limit(limit)); + } else { + size_t available_bytes; + size_t disk_capacity_bytes; + RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info( + _wal_dir, &disk_capacity_bytes, &available_bytes)); + bool is_percent = true; + int64_t wal_disk_limit = ParseUtil::parse_mem_spec(config::group_commit_wal_max_disk_limit, + -1, available_bytes, &is_percent); + if (wal_disk_limit <= 0) { + return Status::InternalError("Disk full! Please check your disk usage!"); + } + size_t wal_dir_size = 0; + RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(_wal_dir, &wal_dir_size)); + RETURN_IF_ERROR(set_limit(wal_disk_limit)); + } + return Status::OK(); +} + +Status WalDirInfo::update_wal_dir_used(size_t used) { + if (used != static_cast(-1)) { + RETURN_IF_ERROR(set_used(used)); + } else { + size_t wal_dir_size = 0; + RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(_wal_dir, &wal_dir_size)); + RETURN_IF_ERROR(set_used(wal_dir_size)); + } + return Status::OK(); +} + +Status WalDirInfo::update_wal_dir_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated) { + RETURN_IF_ERROR(set_pre_allocated(pre_allocated, is_add_pre_allocated)); + return Status::OK(); +} + +Status WalDirsInfo::add(const std::string& wal_dir, size_t limit, size_t used, + size_t pre_allocated) { + for (const auto& it : _wal_dirs_info_vec) { + if (it->get_wal_dir() == wal_dir) { +#ifdef BE_TEST + return Status::OK(); +#endif + return Status::InternalError("wal dir {} exists!", wal_dir); + } + } + std::unique_lock wlock(_lock); + _wal_dirs_info_vec.emplace_back( + std::make_shared(wal_dir, limit, used, pre_allocated)); + return Status::OK(); +} + +Status WalDirsInfo::clear() { + std::unique_lock wlock(_lock); + _wal_dirs_info_vec.clear(); + return Status::OK(); +} + +std::string WalDirsInfo::get_available_random_wal_dir() { + if (_wal_dirs_info_vec.size() == 1) { + return (*_wal_dirs_info_vec.begin())->get_wal_dir(); + } else { + std::vector available_wal_dirs; + for (const auto& wal_dir_info : _wal_dirs_info_vec) { + if (wal_dir_info->available() > wal_dir_info->get_limit() * 0.2) { + available_wal_dirs.emplace_back(wal_dir_info->get_wal_dir()); + } + } + if (available_wal_dirs.empty()) { + return (*std::min_element(_wal_dirs_info_vec.begin(), _wal_dirs_info_vec.end(), + [](const auto& info1, const auto& info2) { + return info1->available() < info2->available(); + })) + ->get_wal_dir(); + } else { + return (*std::next(_wal_dirs_info_vec.begin(), rand() % _wal_dirs_info_vec.size())) + ->get_wal_dir(); + } + } +} + +size_t WalDirsInfo::get_max_available_size() { + return _wal_dirs_info_vec.size() == 1 + ? (*_wal_dirs_info_vec.begin())->available() + : (*std::max_element(_wal_dirs_info_vec.begin(), _wal_dirs_info_vec.end(), + [](const auto& info1, const auto& info2) { + return info1->available() < info2->available(); + })) + ->available(); +} + +Status WalDirsInfo::update_wal_dir_limit(std::string wal_dir, size_t limit) { + for (const auto& wal_dir_info : _wal_dirs_info_vec) { + if (wal_dir_info->get_wal_dir() == wal_dir) { + return wal_dir_info->update_wal_dir_limit(limit); + } + } + return Status::InternalError("Can not find wal dir in wal disks info."); +} + +Status WalDirsInfo::update_all_wal_dir_limit() { + for (const auto& wal_dir_info : _wal_dirs_info_vec) { + RETURN_IF_ERROR(wal_dir_info->update_wal_dir_limit(-1)); + } + return Status::OK(); +} + +Status WalDirsInfo::update_wal_dir_used(std::string wal_dir, size_t used) { + for (const auto& wal_dir_info : _wal_dirs_info_vec) { + if (wal_dir_info->get_wal_dir() == wal_dir) { + return wal_dir_info->update_wal_dir_used(used); + } + } + return Status::InternalError("Can not find wal dir in wal disks info."); +} + +Status WalDirsInfo::update_all_wal_dir_used() { + for (const auto& wal_dir_info : _wal_dirs_info_vec) { + RETURN_IF_ERROR(wal_dir_info->update_wal_dir_used(-1)); + } + return Status::OK(); +} + +Status WalDirsInfo::update_wal_dir_pre_allocated(std::string wal_dir, size_t pre_allocated, + bool is_add_pre_allocated) { + for (const auto& wal_dir_info : _wal_dirs_info_vec) { + if (wal_dir_info->get_wal_dir() == wal_dir) { + return wal_dir_info->update_wal_dir_pre_allocated(pre_allocated, is_add_pre_allocated); + } + } + return Status::InternalError("Can not find wal dir in wal disks info."); +} + +Status WalDirsInfo::get_wal_dir_available_size(const std::string& wal_dir, + size_t* available_bytes) { + std::shared_lock l(_lock); + for (const auto& wal_dir_info : _wal_dirs_info_vec) { + if (wal_dir_info->get_wal_dir() == wal_dir) { + *available_bytes = wal_dir_info->available(); + return Status::OK(); + } + } + return Status::InternalError("can not find wal dir!"); +} + +} // namespace doris \ No newline at end of file diff --git a/be/src/olap/wal_dirs_info.h b/be/src/olap/wal_dirs_info.h new file mode 100644 index 00000000000000..048cd8f9564362 --- /dev/null +++ b/be/src/olap/wal_dirs_info.h @@ -0,0 +1,85 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "common/factory_creator.h" +#include "common/status.h" + +namespace doris { +class WalDirInfo { + ENABLE_FACTORY_CREATOR(WalDirInfo); + +public: + WalDirInfo(std::string wal_dir, size_t limit, size_t used, size_t pre_allocated) + : _wal_dir(std::move(wal_dir)), + _limit(limit), + _used(used), + _pre_allocated(pre_allocated) {} + std::string get_wal_dir(); + size_t get_limit(); + size_t get_used(); + size_t get_pre_allocated(); + Status set_limit(size_t limit); + Status set_used(size_t used); + Status set_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated); + size_t available(); + Status update_wal_dir_limit(size_t limit = -1); + Status update_wal_dir_used(size_t used = -1); + Status update_wal_dir_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated = true); + +private: + std::string _wal_dir; + size_t _limit; + size_t _used; + size_t _pre_allocated; + std::shared_mutex _lock; +}; + +class WalDirsInfo { + ENABLE_FACTORY_CREATOR(WalDirsInfo); + +public: + WalDirsInfo() = default; + ~WalDirsInfo() = default; + Status add(const std::string& wal_dir, size_t limit, size_t used, size_t pre_allocated); + Status clear(); + std::string get_available_random_wal_dir(); + size_t get_max_available_size(); + Status update_wal_dir_limit(std::string wal_dir, size_t limit = -1); + Status update_all_wal_dir_limit(); + Status update_wal_dir_used(std::string wal_dir, size_t used = -1); + Status update_all_wal_dir_used(); + Status update_wal_dir_pre_allocated(std::string wal_dir, size_t pre_allocated, + bool is_add_pre_allocated); + Status get_wal_dir_available_size(const std::string& wal_dir, size_t* available_bytes); + +private: + std::vector> _wal_dirs_info_vec; + std::shared_mutex _lock; +}; + +} // namespace doris \ No newline at end of file diff --git a/be/src/olap/wal_manager.cpp b/be/src/olap/wal_manager.cpp index 238f88ebd00707..c62996da19b415 100644 --- a/be/src/olap/wal_manager.cpp +++ b/be/src/olap/wal_manager.cpp @@ -17,6 +17,7 @@ #include "olap/wal_manager.h" +#include #include #include @@ -24,14 +25,24 @@ #include #include #include +#include +#include +#include +#include #include +#include +#include "common/config.h" +#include "common/status.h" #include "io/fs/local_file_system.h" +#include "olap/wal_dirs_info.h" #include "olap/wal_writer.h" #include "runtime/client_cache.h" +#include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" #include "runtime/plan_fragment_executor.h" #include "runtime/stream_load/stream_load_context.h" +#include "util/parse_util.h" #include "util/path_util.h" #include "util/thrift_rpc_helper.h" #include "vec/exec/format/wal/wal_reader.h" @@ -40,12 +51,11 @@ namespace doris { WalManager::WalManager(ExecEnv* exec_env, const std::string& wal_dir_list) : _exec_env(exec_env), _stop_background_threads_latch(1), _stop(false) { doris::vectorized::WalReader::string_split(wal_dir_list, ";", _wal_dirs); - _all_wal_disk_bytes = std::make_shared(0); - _cv = std::make_shared(); static_cast(ThreadPoolBuilder("GroupCommitReplayWalThreadPool") .set_min_threads(1) .set_max_threads(config::group_commit_relay_wal_threads) .build(&_thread_pool)); + _wal_dirs_info = WalDirsInfo::create_unique(); } WalManager::~WalManager() { @@ -60,12 +70,46 @@ void WalManager::stop() { if (_replay_thread) { _replay_thread->join(); } + if (_update_wal_dirs_info_thread) { + _update_wal_dirs_info_thread->join(); + } _thread_pool->shutdown(); LOG(INFO) << "WalManager is stopped"; } } Status WalManager::init() { + RETURN_IF_ERROR(_init_wal_dirs_conf()); + RETURN_IF_ERROR(_init_wal_dirs()); + RETURN_IF_ERROR(_init_wal_dirs_info()); + return Thread::create( + "WalMgr", "replay_wal", [this]() { static_cast(this->replay()); }, + &_replay_thread); +} + +Status WalManager::_init_wal_dirs_conf() { + std::vector tmp_dirs; + if (_wal_dirs.empty()) { + // default case. + for (const StorePath& path : ExecEnv::GetInstance()->store_paths()) { + tmp_dirs.emplace_back(path.path + "/wal"); + } + } else { + // user config must be absolute path. + for (const std::string& wal_dir : _wal_dirs) { + if (std::filesystem::path(wal_dir).is_absolute()) { + tmp_dirs.emplace_back(wal_dir); + } else { + return Status::InternalError( + "BE config group_commit_replay_wal_dir has to be absolute path!"); + } + } + } + _wal_dirs = tmp_dirs; + return Status::OK(); +} + +Status WalManager::_init_wal_dirs() { bool exists = false; for (auto wal_dir : _wal_dirs) { std::string tmp_dir = wal_dir + "/tmp"; @@ -80,9 +124,44 @@ Status WalManager::init() { } RETURN_IF_ERROR(scan_wals(wal_dir)); } + return Status::OK(); +} + +Status WalManager::_init_wal_dirs_info() { + for (const std::string& wal_dir : _wal_dirs) { + size_t available_bytes; +#ifndef BE_TEST + size_t disk_capacity_bytes; + RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info(wal_dir, &disk_capacity_bytes, + &available_bytes)); +#else + available_bytes = wal_limit_test_bytes; +#endif + bool is_percent = true; + int64_t wal_disk_limit = ParseUtil::parse_mem_spec(config::group_commit_wal_max_disk_limit, + -1, available_bytes, &is_percent); + if (wal_disk_limit < 0) { + return Status::InternalError( + "group_commit_wal_max_disk_limit config is wrong, please check your config!"); + } + // if there are some wal files in wal dir, we need to add it to wal disk limit. + size_t wal_dir_size = 0; +#ifndef BE_TEST + RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(wal_dir, &wal_dir_size)); +#endif + if (is_percent) { + wal_disk_limit += wal_dir_size; + } + RETURN_IF_ERROR(_wal_dirs_info->add(wal_dir, wal_disk_limit, wal_dir_size, 0)); + +#ifdef BE_TEST + wal_limit_test_bytes = wal_disk_limit; +#endif + } return Thread::create( - "WalMgr", "replay_wal", [this]() { static_cast(this->replay()); }, - &_replay_thread); + "WalMgr", "update_wal_dir_info", + [this]() { static_cast(this->_update_wal_dir_info_thread()); }, + &_update_wal_dirs_info_thread); } void WalManager::add_wal_status_queue(int64_t table_id, int64_t wal_id, WAL_STATUS wal_status) { @@ -158,9 +237,8 @@ void WalManager::print_wal_status_queue() { } Status WalManager::add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, - const std::string& label) { - std::string base_path = - _wal_dirs.size() == 1 ? _wal_dirs[0] : _wal_dirs[rand() % _wal_dirs.size()]; + const std::string& label, std::string& base_path) { + base_path = _wal_dirs_info->get_available_random_wal_dir(); std::stringstream ss; ss << base_path << "/" << std::to_string(db_id) << "/" << std::to_string(table_id) << "/" << std::to_string(wal_id) << "_" << label; @@ -209,7 +287,7 @@ Status WalManager::create_wal_writer(int64_t wal_id, std::shared_ptr& RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(base_path)); } LOG(INFO) << "create wal " << wal_path; - wal_writer = std::make_shared(wal_path, _all_wal_disk_bytes, _cv); + wal_writer = std::make_shared(wal_path); RETURN_IF_ERROR(wal_writer->init()); { std::lock_guard wrlock(_wal_lock); @@ -250,7 +328,7 @@ Status WalManager::scan_wals(const std::string& wal_path) { << ", st=" << st.to_string(); return st; } - if (wals.size() == 0) { + if (wals.empty()) { continue; } std::vector res; @@ -331,6 +409,12 @@ Status WalManager::add_recover_wal(int64_t db_id, int64_t table_id, std::vector< table_ptr = it->second; } table_ptr->add_wals(wals); +#ifndef BE_TEST + for (auto wal : wals) { + RETURN_IF_ERROR(update_wal_dir_limit(_get_base_wal_path(wal))); + RETURN_IF_ERROR(update_wal_dir_used(_get_base_wal_path(wal))); + } +#endif return Status::OK(); } @@ -344,31 +428,20 @@ size_t WalManager::get_wal_table_size(int64_t table_id) { } } -Status WalManager::delete_wal(int64_t wal_id) { +Status WalManager::delete_wal(int64_t wal_id, size_t block_queue_pre_allocated) { + std::string wal_path; { std::lock_guard wrlock(_wal_lock); - if (_wal_id_to_writer_map.find(wal_id) != _wal_id_to_writer_map.end()) { - _all_wal_disk_bytes->fetch_sub(_wal_id_to_writer_map[wal_id]->disk_bytes(), - std::memory_order_relaxed); - _cv->notify_one(); - std::string wal_path = _wal_path_map[wal_id]; - LOG(INFO) << "wal delete file=" << wal_path << ", this file disk usage is " - << _wal_id_to_writer_map[wal_id]->disk_bytes() - << " ,after deleting it, all wals disk usage is " - << _all_wal_disk_bytes->load(std::memory_order_relaxed); - _wal_id_to_writer_map.erase(wal_id); - } - if (_wal_id_to_writer_map.empty()) { - CHECK_EQ(_all_wal_disk_bytes->load(std::memory_order_relaxed), 0); - } auto it = _wal_path_map.find(wal_id); if (it != _wal_path_map.end()) { - std::string wal_path = it->second; + wal_path = it->second; RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(wal_path)); LOG(INFO) << "delete file=" << wal_path; _wal_path_map.erase(wal_id); } } + RETURN_IF_ERROR(update_wal_dir_pre_allocated(_get_base_wal_path(wal_path), + block_queue_pre_allocated, false)); return Status::OK(); } @@ -409,4 +482,46 @@ Status WalManager::get_wal_column_index(int64_t wal_id, std::vector& col return Status::OK(); } +size_t WalManager::get_max_available_size() { + return _wal_dirs_info->get_max_available_size(); +} + +Status WalManager::update_wal_dir_limit(const std::string& wal_dir, size_t limit) { + return _wal_dirs_info->update_wal_dir_limit(wal_dir, limit); +} + +Status WalManager::update_wal_dir_used(const std::string& wal_dir, size_t used) { + return _wal_dirs_info->update_wal_dir_used(wal_dir, used); +} + +Status WalManager::update_wal_dir_pre_allocated(const std::string& wal_dir, size_t pre_allocated, + bool is_add_pre_allocated) { + return _wal_dirs_info->update_wal_dir_pre_allocated(wal_dir, pre_allocated, + is_add_pre_allocated); +} + +Status WalManager::_update_wal_dir_info_thread() { + while (!_stop.load()) { + static_cast(_wal_dirs_info->update_all_wal_dir_limit()); + static_cast(_wal_dirs_info->update_all_wal_dir_used()); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + return Status::OK(); +} + +Status WalManager::get_wal_dir_available_size(const std::string& wal_dir, size_t* available_bytes) { + return _wal_dirs_info->get_wal_dir_available_size(wal_dir, available_bytes); +} + +std::string WalManager::_get_base_wal_path(const std::string& wal_path_str) { + io::Path wal_path = wal_path_str; + for (int i = 0; i < 3; ++i) { + if (!wal_path.has_parent_path()) { + return ""; + } + wal_path = wal_path.parent_path(); + } + return wal_path.string(); +} + } // namespace doris \ No newline at end of file diff --git a/be/src/olap/wal_manager.h b/be/src/olap/wal_manager.h index 83847beabf0b85..8d330a64bb9fc8 100644 --- a/be/src/olap/wal_manager.h +++ b/be/src/olap/wal_manager.h @@ -16,15 +16,24 @@ // under the License. #pragma once + #include +#include #include +#include +#include #include +#include +#include +#include #include "common/config.h" #include "gen_cpp/FrontendService.h" #include "gen_cpp/FrontendService_types.h" #include "gen_cpp/HeartbeatService_types.h" +#include "gutil/ref_counted.h" +#include "olap/wal_dirs_info.h" #include "olap/wal_reader.h" #include "olap/wal_table.h" #include "olap/wal_writer.h" @@ -47,7 +56,7 @@ class WalManager { public: WalManager(ExecEnv* exec_env, const std::string& wal_dir); ~WalManager(); - Status delete_wal(int64_t wal_id); + Status delete_wal(int64_t wal_id, size_t block_queue_pre_allocated = 0); Status init(); Status scan_wals(const std::string& wal_path); Status replay(); @@ -56,7 +65,8 @@ class WalManager { Status scan(); size_t get_wal_table_size(int64_t table_id); Status add_recover_wal(int64_t db_id, int64_t table_id, std::vector wals); - Status add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, const std::string& label); + Status add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, const std::string& label, + std::string& base_path); Status get_wal_path(int64_t wal_id, std::string& wal_path); Status get_wal_status_queue_size(const PGetWalQueueSizeRequest* request, PGetWalQueueSizeResponse* response); @@ -70,10 +80,30 @@ class WalManager { void erase_wal_column_index(int64_t wal_id); Status get_wal_column_index(int64_t wal_id, std::vector& column_index); + Status update_wal_dir_limit(const std::string& wal_dir, size_t limit = -1); + Status update_wal_dir_used(const std::string& wal_dir, size_t used = -1); + Status update_wal_dir_pre_allocated(const std::string& wal_dir, size_t pre_allocated, + bool is_add_pre_allocated); + Status get_wal_dir_available_size(const std::string& wal_dir, size_t* available_bytes); + size_t get_max_available_size(); + +private: + Status _init_wal_dirs_conf(); + Status _init_wal_dirs(); + Status _init_wal_dirs_info(); + std::string _get_base_wal_path(const std::string& wal_path_str); + const std::string& _get_available_random_wal_dir(); + Status _update_wal_dir_info_thread(); + +public: + // used for be ut + size_t wal_limit_test_bytes; + private: ExecEnv* _exec_env = nullptr; std::shared_mutex _lock; scoped_refptr _replay_thread; + scoped_refptr _update_wal_dirs_info_thread; CountDownLatch _stop_background_threads_latch; std::map> _table_map; std::vector _wal_dirs; @@ -81,12 +111,11 @@ class WalManager { std::shared_mutex _wal_status_lock; std::unordered_map _wal_path_map; std::unordered_map> _wal_id_to_writer_map; - std::shared_ptr _all_wal_disk_bytes; std::unordered_map> _wal_status_queues; std::atomic _stop; std::shared_mutex _wal_column_id_map_lock; std::unordered_map&> _wal_column_id_map; - std::shared_ptr _cv; std::unique_ptr _thread_pool; + std::unique_ptr _wal_dirs_info; }; } // namespace doris \ No newline at end of file diff --git a/be/src/olap/wal_writer.cpp b/be/src/olap/wal_writer.cpp index 9d3da90d887386..beae6ec80b6c23 100644 --- a/be/src/olap/wal_writer.cpp +++ b/be/src/olap/wal_writer.cpp @@ -26,6 +26,7 @@ #include "io/fs/local_file_system.h" #include "io/fs/path.h" #include "olap/storage_engine.h" +#include "olap/wal_manager.h" #include "util/crc32c.h" namespace doris { @@ -33,14 +34,7 @@ namespace doris { const char* k_wal_magic = "WAL1"; const uint32_t k_wal_magic_length = 4; -WalWriter::WalWriter(const std::string& file_name, - const std::shared_ptr& all_wal_disk_bytes, - const std::shared_ptr& cv) - : cv(cv), - _file_name(file_name), - _disk_bytes(0), - _all_wal_disk_bytes(all_wal_disk_bytes), - _is_first_append_blocks(true) {} +WalWriter::WalWriter(const std::string& file_name) : _file_name(file_name) {} WalWriter::~WalWriter() {} @@ -58,22 +52,6 @@ Status WalWriter::finalize() { } Status WalWriter::append_blocks(const PBlockArray& blocks) { - { - if (_is_first_append_blocks) { - _is_first_append_blocks = false; - std::unique_lock l(_mutex); - while (_all_wal_disk_bytes->load(std::memory_order_relaxed) > - config::wal_max_disk_size) { - LOG(INFO) << "First time to append blocks to wal file " << _file_name - << ". Currently, all wal disk space usage is " - << _all_wal_disk_bytes->load(std::memory_order_relaxed) - << ", larger than the maximum limit " << config::wal_max_disk_size - << ", so we need to wait. When any other load finished, that wal will be " - "removed, the space used by that wal will be free."; - cv->wait_for(l, std::chrono::milliseconds(WalWriter::MAX_WAL_WRITE_WAIT_TIME)); - } - } - } size_t total_size = 0; for (const auto& block : blocks) { total_size += LENGTH_SIZE + block->ByteSizeLong() + CHECKSUM_SIZE; @@ -99,8 +77,6 @@ Status WalWriter::append_blocks(const PBlockArray& blocks) { "failed to write block to wal expected= " + std::to_string(total_size) + ",actually=" + std::to_string(offset)); } - _disk_bytes.fetch_add(total_size, std::memory_order_relaxed); - _all_wal_disk_bytes->fetch_add(total_size, std::memory_order_relaxed); return Status::OK(); } diff --git a/be/src/olap/wal_writer.h b/be/src/olap/wal_writer.h index 88ff4659769b1c..ea8bba4f021e54 100644 --- a/be/src/olap/wal_writer.h +++ b/be/src/olap/wal_writer.h @@ -34,16 +34,13 @@ extern const uint32_t k_wal_magic_length; class WalWriter { public: - explicit WalWriter(const std::string& file_name, - const std::shared_ptr& all_wal_disk_bytes, - const std::shared_ptr& cv); + explicit WalWriter(const std::string& file_name); ~WalWriter(); Status init(); Status finalize(); Status append_blocks(const PBlockArray& blocks); - size_t disk_bytes() const { return _disk_bytes.load(std::memory_order_relaxed); }; Status append_header(uint32_t version, std::string col_ids); std::string file_name() { return _file_name; }; @@ -51,17 +48,12 @@ class WalWriter { public: static const int64_t LENGTH_SIZE = 8; static const int64_t CHECKSUM_SIZE = 4; - std::shared_ptr cv; static const int64_t VERSION_SIZE = 4; private: static constexpr size_t MAX_WAL_WRITE_WAIT_TIME = 1000; std::string _file_name; io::FileWriterPtr _file_writer; - std::atomic_size_t _disk_bytes; - std::shared_ptr _all_wal_disk_bytes; - std::mutex _mutex; - bool _is_first_append_blocks; }; } // namespace doris \ No newline at end of file diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 4f4325a79c7bda..4c1060891f5ffb 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -229,6 +229,7 @@ class ExecEnv { void set_routine_load_task_executor(RoutineLoadTaskExecutor* r) { this->_routine_load_task_executor = r; } + void set_wal_mgr(std::shared_ptr wm) { this->_wal_manager = wm; } #endif stream_load::LoadStreamStubPool* load_stream_stub_pool() { diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 201a290a6e8878..28df650f13c7b8 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -21,14 +21,22 @@ #include #include +#include +#include +#include +#include +#include +#include #include "client_cache.h" #include "common/config.h" +#include "common/status.h" #include "olap/wal_manager.h" #include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" #include "runtime/runtime_state.h" #include "util/thrift_rpc_helper.h" +#include "vec/core/block.h" namespace doris { @@ -46,7 +54,6 @@ Status LoadBlockQueue::add_block(std::shared_ptr block, bool RETURN_IF_ERROR(_v_wal_writer->write_wal(block.get())); } _all_block_queues_bytes->fetch_add(block->bytes(), std::memory_order_relaxed); - _single_block_queue_bytes->fetch_add(block->bytes(), std::memory_order_relaxed); } _get_cond.notify_all(); return Status::OK(); @@ -68,7 +75,6 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block* } while (!runtime_state->is_cancelled() && status.ok() && _block_queue.empty() && (!need_commit || (need_commit && !_load_ids.empty()))) { - CHECK_EQ(_single_block_queue_bytes->load(), 0); auto left_milliseconds = _group_commit_interval_ms; auto duration = std::chrono::duration_cast( std::chrono::steady_clock::now() - _start_time) @@ -106,10 +112,8 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block* *find_block = true; _block_queue.pop_front(); _all_block_queues_bytes->fetch_sub(block->bytes(), std::memory_order_relaxed); - _single_block_queue_bytes->fetch_sub(block->bytes(), std::memory_order_relaxed); } if (_block_queue.empty() && need_commit && _load_ids.empty()) { - CHECK_EQ(_single_block_queue_bytes->load(), 0); *eos = true; } else { *eos = false; @@ -150,7 +154,6 @@ void LoadBlockQueue::_cancel_without_lock(const Status& st) { { auto& future_block = _block_queue.front(); _all_block_queues_bytes->fetch_sub(future_block->bytes(), std::memory_order_relaxed); - _single_block_queue_bytes->fetch_sub(future_block->bytes(), std::memory_order_relaxed); } _block_queue.pop_front(); } @@ -333,11 +336,12 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ 10000L); result_status = Status::create(result.status); } + std::shared_ptr load_block_queue; { std::lock_guard l(_lock); auto it = _load_block_queues.find(instance_id); if (it != _load_block_queues.end()) { - auto& load_block_queue = it->second; + load_block_queue = it->second; if (!status.ok()) { load_block_queue->cancel(status); } @@ -357,7 +361,8 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ // result_status: commit txn result if (status.ok() && st.ok() && (result_status.ok() || result_status.is())) { - RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(txn_id)); + RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal( + txn_id, load_block_queue->block_queue_pre_allocated.load())); RETURN_IF_ERROR(_exec_env->wal_mgr()->erase_wal_status_queue(table_id, txn_id)); } else { std::string wal_path; @@ -443,8 +448,9 @@ Status GroupCommitMgr::get_first_block_load_queue(int64_t db_id, int64_t table_i } group_commit_table = _table_map[table_id]; } - return group_commit_table->get_first_block_load_queue(table_id, base_schema_version, load_id, - load_block_queue, be_exe_version); + RETURN_IF_ERROR(group_commit_table->get_first_block_load_queue( + table_id, base_schema_version, load_id, load_block_queue, be_exe_version)); + return Status::OK(); } Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId& instance_id, @@ -461,11 +467,14 @@ Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId& i } return group_commit_table->get_load_block_queue(instance_id, load_block_queue); } + Status LoadBlockQueue::create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id, const std::string& import_label, WalManager* wal_manager, std::vector& slot_desc, int be_exe_version) { + RETURN_IF_ERROR(ExecEnv::GetInstance()->wal_mgr()->add_wal_path(db_id, tb_id, wal_id, + import_label, wal_base_path)); _v_wal_writer = std::make_shared( - db_id, tb_id, wal_id, import_label, wal_manager, slot_desc, be_exe_version); + tb_id, wal_id, import_label, wal_manager, slot_desc, be_exe_version); return _v_wal_writer->init(); } @@ -475,4 +484,67 @@ Status LoadBlockQueue::close_wal() { } return Status::OK(); } + +bool LoadBlockQueue::has_enough_wal_disk_space( + const std::vector>& blocks, const TUniqueId& load_id, + bool is_blocks_contain_all_load_data) { + size_t blocks_size = 0; + for (auto block : blocks) { + blocks_size += block->bytes(); + } + size_t content_length = 0; + Status st = ExecEnv::GetInstance()->group_commit_mgr()->get_load_info(load_id, &content_length); + if (st.ok()) { + RETURN_IF_ERROR(ExecEnv::GetInstance()->group_commit_mgr()->remove_load_info(load_id)); + } else { + return Status::InternalError("can not find load id."); + } + size_t pre_allocated = is_blocks_contain_all_load_data + ? blocks_size + : (blocks_size > content_length ? blocks_size : content_length); + auto* wal_mgr = ExecEnv::GetInstance()->wal_mgr(); + size_t available_bytes = 0; + { + st = wal_mgr->get_wal_dir_available_size(wal_base_path, &available_bytes); + if (!st.ok()) { + LOG(WARNING) << "get wal disk available size filed!"; + } + } + if (pre_allocated < available_bytes) { + st = wal_mgr->update_wal_dir_pre_allocated(wal_base_path, pre_allocated, true); + if (!st.ok()) { + LOG(WARNING) << "update wal dir pre_allocated failed, reason: " << st.to_string(); + } + block_queue_pre_allocated.fetch_add(pre_allocated); + return true; + } else { + return false; + } +} + +Status GroupCommitMgr::update_load_info(TUniqueId load_id, size_t content_length) { + std::unique_lock l(_load_info_lock); + if (_load_id_to_content_length_map.find(load_id) == _load_id_to_content_length_map.end()) { + _load_id_to_content_length_map.insert(std::make_pair(load_id, content_length)); + } + return Status::OK(); +} + +Status GroupCommitMgr::get_load_info(TUniqueId load_id, size_t* content_length) { + std::shared_lock l(_load_info_lock); + if (_load_id_to_content_length_map.find(load_id) != _load_id_to_content_length_map.end()) { + *content_length = _load_id_to_content_length_map[load_id]; + return Status::OK(); + } + return Status::InternalError("can not find load id!"); +} + +Status GroupCommitMgr::remove_load_info(TUniqueId load_id) { + std::unique_lock l(_load_info_lock); + if (_load_id_to_content_length_map.find(load_id) == _load_id_to_content_length_map.end()) { + return Status::InternalError("can not remove load id!"); + } + _load_id_to_content_length_map.erase(load_id); + return Status::OK(); +} } // namespace doris diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h index 6f625eda6aec04..35afcc46249b7a 100644 --- a/be/src/runtime/group_commit_mgr.h +++ b/be/src/runtime/group_commit_mgr.h @@ -20,9 +20,17 @@ #include #include +#include +#include #include +#include +#include +#include +#include #include "common/status.h" +#include "olap/wal_manager.h" +#include "runtime/exec_env.h" #include "util/threadpool.h" #include "vec/core/block.h" #include "vec/sink/writer/vwal_writer.h" @@ -45,9 +53,7 @@ class LoadBlockQueue { wait_internal_group_commit_finish(wait_internal_group_commit_finish), _start_time(std::chrono::steady_clock::now()), _all_block_queues_bytes(all_block_queues_bytes), - _group_commit_interval_ms(group_commit_interval_ms) { - _single_block_queue_bytes = std::make_shared(0); - }; + _group_commit_interval_ms(group_commit_interval_ms) {}; Status add_block(std::shared_ptr block, bool write_wal); Status get_block(RuntimeState* runtime_state, vectorized::Block* block, bool* find_block, @@ -59,6 +65,8 @@ class LoadBlockQueue { WalManager* wal_manager, std::vector& slot_desc, int be_exe_version); Status close_wal(); + bool has_enough_wal_disk_space(const std::vector>& blocks, + const TUniqueId& load_id, bool is_blocks_contain_all_load_data); static constexpr size_t MAX_BLOCK_QUEUE_ADD_WAIT_TIME = 1000; UniqueId load_instance_id; @@ -71,6 +79,8 @@ class LoadBlockQueue { bool process_finish = false; std::condition_variable internal_group_commit_finish_cv; Status status = Status::OK(); + std::string wal_base_path; + std::atomic_size_t block_queue_pre_allocated = 0; private: void _cancel_without_lock(const Status& st); @@ -84,8 +94,6 @@ class LoadBlockQueue { // memory consumption of all tables' load block queues, used for back pressure. std::shared_ptr _all_block_queues_bytes; - // memory consumption of one load block queue, used for correctness check. - std::shared_ptr _single_block_queue_bytes; // group commit interval in ms, can be changed by 'ALTER TABLE my_table SET ("group_commit_interval_ms"="1000");' int64_t _group_commit_interval_ms; std::shared_ptr _v_wal_writer; @@ -145,6 +153,10 @@ class GroupCommitMgr { const UniqueId& load_id, std::shared_ptr& load_block_queue, int be_exe_version); + Status update_load_info(TUniqueId load_id, size_t content_length); + Status get_load_info(TUniqueId load_id, size_t* content_length); + Status remove_load_info(TUniqueId load_id); + std::condition_variable cv; private: ExecEnv* _exec_env = nullptr; @@ -155,6 +167,8 @@ class GroupCommitMgr { std::unique_ptr _thread_pool; // memory consumption of all tables' load block queues, used for back pressure. std::shared_ptr _all_block_queues_bytes; + std::shared_mutex _load_info_lock; + std::unordered_map _load_id_to_content_length_map; }; } // namespace doris \ No newline at end of file diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp index d4ede45868fe5d..3e38f9c42fc456 100644 --- a/be/src/vec/sink/group_commit_block_sink.cpp +++ b/be/src/vec/sink/group_commit_block_sink.cpp @@ -17,6 +17,14 @@ #include "vec/sink/group_commit_block_sink.h" +#include + +#include +#include +#include + +#include "common/exception.h" +#include "runtime/exec_env.h" #include "runtime/group_commit_mgr.h" #include "runtime/runtime_state.h" #include "util/doris_metrics.h" @@ -61,6 +69,8 @@ Status GroupCommitBlockSink::init(const TDataSink& t_sink) { Status GroupCommitBlockSink::prepare(RuntimeState* state) { RETURN_IF_ERROR(DataSink::prepare(state)); + RETURN_IF_ERROR( + ExecEnv::GetInstance()->group_commit_mgr()->update_load_info(_load_id.to_thrift(), 0)); _state = state; // profile must add to state's object pool @@ -107,7 +117,7 @@ Status GroupCommitBlockSink::close(RuntimeState* state, Status close_status) { (double)state->num_rows_load_filtered() / num_selected_rows > _max_filter_ratio) { return Status::DataQualityError("too many filtered rows"); } - RETURN_IF_ERROR(_add_blocks()); + RETURN_IF_ERROR(_add_blocks(_group_commit_mode != TGroupCommitMode::SYNC_MODE, true)); } if (_load_block_queue) { _load_block_queue->remove_load_id(_load_id); @@ -213,7 +223,7 @@ Status GroupCommitBlockSink::_add_block(RuntimeState* state, _blocks.emplace_back(output_block); } else { if (!_is_block_appended) { - RETURN_IF_ERROR(_add_blocks()); + RETURN_IF_ERROR(_add_blocks(_group_commit_mode != TGroupCommitMode::SYNC_MODE, false)); } RETURN_IF_ERROR(_load_block_queue->add_block( output_block, _group_commit_mode != TGroupCommitMode::SYNC_MODE)); @@ -221,7 +231,7 @@ Status GroupCommitBlockSink::_add_block(RuntimeState* state, return Status::OK(); } -Status GroupCommitBlockSink::_add_blocks() { +Status GroupCommitBlockSink::_add_blocks(bool write_wal, bool is_blocks_contain_all_load_data) { DCHECK(_is_block_appended == false); TUniqueId load_id; load_id.__set_hi(_load_id.hi); @@ -231,6 +241,20 @@ Status GroupCommitBlockSink::_add_blocks() { RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue( _db_id, _table_id, _base_schema_version, load_id, _load_block_queue, _state->be_exec_version())); + if (write_wal) { + _group_commit_mode = _load_block_queue->has_enough_wal_disk_space( + _blocks, load_id, is_blocks_contain_all_load_data) + ? TGroupCommitMode::ASYNC_MODE + : TGroupCommitMode::SYNC_MODE; + if (_group_commit_mode == TGroupCommitMode::SYNC_MODE) { + LOG(INFO) + << "Load label " << _load_block_queue->label + << " will not write wal because wal disk space usage reachs max limit."; + } else { + LOG(INFO) << "Load label " << _load_block_queue->label << " will write wal to " + << _load_block_queue->wal_base_path << "."; + } + } _state->set_import_label(_load_block_queue->label); _state->set_wal_id(_load_block_queue->txn_id); } else { diff --git a/be/src/vec/sink/group_commit_block_sink.h b/be/src/vec/sink/group_commit_block_sink.h index d7c8ed70d6dafc..c0971f4801a5ac 100644 --- a/be/src/vec/sink/group_commit_block_sink.h +++ b/be/src/vec/sink/group_commit_block_sink.h @@ -47,7 +47,7 @@ class GroupCommitBlockSink : public DataSink { private: Status _add_block(RuntimeState* state, std::shared_ptr block); - Status _add_blocks(); + Status _add_blocks(bool write_wal, bool is_blocks_contain_all_load_data); vectorized::VExprContextSPtrs _output_vexpr_ctxs; diff --git a/be/src/vec/sink/writer/vwal_writer.cpp b/be/src/vec/sink/writer/vwal_writer.cpp index d929207e9a9b3d..2dc945a2a2f5b9 100644 --- a/be/src/vec/sink/writer/vwal_writer.cpp +++ b/be/src/vec/sink/writer/vwal_writer.cpp @@ -42,11 +42,10 @@ namespace doris { namespace vectorized { -VWalWriter::VWalWriter(int64_t db_id, int64_t tb_id, int64_t wal_id, - const std::string& import_label, WalManager* wal_manager, - std::vector& slot_desc, int be_exe_version) - : _db_id(db_id), - _tb_id(tb_id), +VWalWriter::VWalWriter(int64_t tb_id, int64_t wal_id, const std::string& import_label, + WalManager* wal_manager, std::vector& slot_desc, + int be_exe_version) + : _tb_id(tb_id), _wal_id(wal_id), _label(import_label), _wal_manager(wal_manager), @@ -56,7 +55,6 @@ VWalWriter::VWalWriter(int64_t db_id, int64_t tb_id, int64_t wal_id, VWalWriter::~VWalWriter() {} Status VWalWriter::init() { - RETURN_IF_ERROR(_wal_manager->add_wal_path(_db_id, _tb_id, _wal_id, _label)); RETURN_IF_ERROR(_wal_manager->create_wal_writer(_wal_id, _wal_writer)); _wal_manager->add_wal_status_queue(_tb_id, _wal_id, WalManager::WAL_STATUS::CREATE); std::stringstream ss; diff --git a/be/src/vec/sink/writer/vwal_writer.h b/be/src/vec/sink/writer/vwal_writer.h index 17c9dc979a1c47..324409e9d469a1 100644 --- a/be/src/vec/sink/writer/vwal_writer.h +++ b/be/src/vec/sink/writer/vwal_writer.h @@ -83,7 +83,7 @@ namespace vectorized { class VWalWriter { public: - VWalWriter(int64_t db_id, int64_t tb_id, int64_t wal_id, const std::string& import_label, + VWalWriter(int64_t tb_id, int64_t wal_id, const std::string& import_label, WalManager* wal_manager, std::vector& slot_desc, int be_exe_version); ~VWalWriter(); @@ -92,7 +92,6 @@ class VWalWriter { Status close(); private: - int64_t _db_id; int64_t _tb_id; int64_t _wal_id; uint32_t _version = 0; diff --git a/be/test/http/stream_load_test.cpp b/be/test/http/stream_load_test.cpp index 75588f4190b1ef..74dcf9f1f3ff67 100644 --- a/be/test/http/stream_load_test.cpp +++ b/be/test/http/stream_load_test.cpp @@ -19,6 +19,8 @@ #include +#include + #include "common/config.h" #include "event2/buffer.h" #include "event2/event.h" @@ -34,6 +36,7 @@ #include "http/http_request.h" #include "http/utils.h" #include "olap/wal_manager.h" +#include "runtime/exec_env.h" namespace doris { @@ -51,10 +54,14 @@ void http_request_done_cb(struct evhttp_request* req, void* arg) { TEST_F(StreamLoadTest, TestHeader) { // 1G - config::wal_max_disk_size = 1073741824; + auto wal_mgr = WalManager::create_shared(ExecEnv::GetInstance(), config::group_commit_wal_path); + static_cast(wal_mgr->_wal_dirs_info->add("test_path_1", 1000, 0, 0)); + static_cast(wal_mgr->_wal_dirs_info->add("test_path_2", 10000, 0, 0)); + static_cast(wal_mgr->_wal_dirs_info->add("test_path_3", 100000, 0, 0)); + ExecEnv::GetInstance()->set_wal_mgr(wal_mgr); // 1. empty info { - auto evhttp_req = evhttp_request_new(nullptr, nullptr); + auto* evhttp_req = evhttp_request_new(nullptr, nullptr); HttpRequest req(evhttp_req); EXPECT_EQ(load_size_smaller_than_wal_limit(&req), false); evhttp_request_free(evhttp_req); @@ -86,7 +93,7 @@ TEST_F(StreamLoadTest, TestHeader) { evhttp_req->uri = uri; evhttp_req->uri_elems = evhttp_uri_parse(uri); evhttp_add_header(evhttp_req->input_headers, HTTP_GROUP_COMMIT.c_str(), "true"); - evhttp_add_header(evhttp_req->input_headers, HttpHeaders::CONTENT_LENGTH, "1000"); + evhttp_add_header(evhttp_req->input_headers, HttpHeaders::CONTENT_LENGTH, "20000"); HttpRequest req(evhttp_req); req.init_from_evhttp(); EXPECT_EQ(load_size_smaller_than_wal_limit(&req), true); diff --git a/be/test/olap/wal_manager_test.cpp b/be/test/olap/wal_manager_test.cpp index 17fb3e3e6f43c7..93d8636eb225ec 100644 --- a/be/test/olap/wal_manager_test.cpp +++ b/be/test/olap/wal_manager_test.cpp @@ -18,6 +18,7 @@ #include +#include #include #include #include @@ -27,6 +28,7 @@ #include "gen_cpp/HeartbeatService_types.h" #include "gen_cpp/internal_service.pb.h" #include "io/fs/local_file_system.h" +#include "olap/options.h" #include "runtime/decimalv2_value.h" #include "runtime/exec_env.h" #include "runtime/result_queue_mgr.h" @@ -46,8 +48,7 @@ extern Status k_stream_load_plan_status; extern std::string k_request_line; ExecEnv* _env = nullptr; -std::string wal_dir = "./wal_test"; -std::string tmp_dir = "./wal_test/tmp"; +std::string wal_dir = std::string(getenv("DORIS_HOME")) + "/wal_test"; class WalManagerTest : public testing::Test { public: @@ -63,6 +64,7 @@ class WalManagerTest : public testing::Test { _env->_internal_client_cache = new BrpcClientCache(); _env->_function_client_cache = new BrpcClientCache(); _env->_stream_load_executor = StreamLoadExecutor::create_shared(_env); + _env->_store_paths = {StorePath(std::filesystem::current_path(), 0)}; _env->_wal_manager = WalManager::create_shared(_env, wal_dir); k_stream_load_begin_result = TLoadTxnBeginResult(); k_stream_load_plan_status = Status::OK(); @@ -78,16 +80,14 @@ class WalManagerTest : public testing::Test { void prepare() { static_cast(io::global_local_filesystem()->create_directory(wal_dir)); } void createWal(const std::string& wal_path) { - std::shared_ptr _all_wal_disk_bytes = - std::make_shared(0); - std::shared_ptr cv = std::make_shared(); - auto wal_writer = WalWriter(wal_path, _all_wal_disk_bytes, cv); + auto wal_writer = WalWriter(wal_path); static_cast(wal_writer.init()); static_cast(wal_writer.finalize()); } }; TEST_F(WalManagerTest, recovery_normal) { + _env->wal_mgr()->wal_limit_test_bytes = 1099511627776; k_request_line = "{\"Status\": \"Success\", \"Message\": \"Test\"}"; std::string db_id = "1"; @@ -123,4 +123,93 @@ TEST_F(WalManagerTest, recovery_normal) { ASSERT_TRUE(!std::filesystem::exists(wal_200)); ASSERT_TRUE(!std::filesystem::exists(wal_201)); } + +TEST_F(WalManagerTest, TestDynamicWalSpaceLimt) { + auto wal_mgr = WalManager::create_shared(_env, config::group_commit_wal_path); + static_cast(wal_mgr->init()); + _env->set_wal_mgr(wal_mgr); + + // 1T + size_t available_bytes = 1099511627776; + size_t wal_limit_bytes; + + _env->wal_mgr()->wal_limit_test_bytes = available_bytes; + config::group_commit_wal_max_disk_limit = "0%"; + EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(), Status::OK()); + EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, 0); + + _env->wal_mgr()->wal_limit_test_bytes = available_bytes; + config::group_commit_wal_max_disk_limit = "5%"; + EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(), Status::OK()); + wal_limit_bytes = available_bytes * 0.05; + EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, wal_limit_bytes); + + _env->wal_mgr()->wal_limit_test_bytes = available_bytes; + config::group_commit_wal_max_disk_limit = "50%"; + EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(), Status::OK()); + wal_limit_bytes = available_bytes * 0.5; + EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, wal_limit_bytes); + + _env->wal_mgr()->wal_limit_test_bytes = available_bytes; + config::group_commit_wal_max_disk_limit = "200%"; + EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(), Status::OK()); + wal_limit_bytes = available_bytes * 2; + EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, wal_limit_bytes); + + _env->wal_mgr()->wal_limit_test_bytes = available_bytes; + config::group_commit_wal_max_disk_limit = "-10%"; + EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(), Status::InternalError("")); + EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, available_bytes); + + _env->wal_mgr()->wal_limit_test_bytes = available_bytes; + config::group_commit_wal_max_disk_limit = "0"; + EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(), Status::OK()); + EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, 0); + + // 1M + _env->wal_mgr()->wal_limit_test_bytes = available_bytes; + config::group_commit_wal_max_disk_limit = "1048576"; + EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(), Status::OK()); + EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, 1048576); + + // 1G + _env->wal_mgr()->wal_limit_test_bytes = available_bytes; + config::group_commit_wal_max_disk_limit = "1073741824"; + EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(), Status::OK()); + EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, 1073741824); + + // 100G + _env->wal_mgr()->wal_limit_test_bytes = available_bytes; + config::group_commit_wal_max_disk_limit = "107374182400"; + EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(), Status::OK()); + EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, 107374182400); + + // 1M + _env->wal_mgr()->wal_limit_test_bytes = available_bytes; + config::group_commit_wal_max_disk_limit = "1M"; + EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(), Status::OK()); + EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, 1048576); + + // 1G + _env->wal_mgr()->wal_limit_test_bytes = available_bytes; + config::group_commit_wal_max_disk_limit = "1G"; + EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(), Status::OK()); + EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, 1073741824); + + // 100G + _env->wal_mgr()->wal_limit_test_bytes = available_bytes; + config::group_commit_wal_max_disk_limit = "100G"; + EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(), Status::OK()); + EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, 107374182400); + + _env->wal_mgr()->wal_limit_test_bytes = available_bytes; + config::group_commit_wal_max_disk_limit = "-1024"; + EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(), Status::InternalError("")); + EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, available_bytes); + + _env->wal_mgr()->wal_limit_test_bytes = available_bytes; + config::group_commit_wal_max_disk_limit = "-1M"; + EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(), Status::InternalError("")); + EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, available_bytes); +} } // namespace doris \ No newline at end of file diff --git a/be/test/olap/wal_reader_writer_test.cpp b/be/test/olap/wal_reader_writer_test.cpp index 1d1102f350fa09..d24db7286802c7 100644 --- a/be/test/olap/wal_reader_writer_test.cpp +++ b/be/test/olap/wal_reader_writer_test.cpp @@ -90,10 +90,7 @@ void generate_block(PBlock& pblock, int row_index) { TEST_F(WalReaderWriterTest, TestWriteAndRead1) { std::string file_name = _s_test_data_path + "/abcd123.txt"; - std::shared_ptr _all_wal_disk_bytes = - std::make_shared(0); - std::shared_ptr cv = std::make_shared(); - auto wal_writer = WalWriter(file_name, _all_wal_disk_bytes, cv); + auto wal_writer = WalWriter(file_name); static_cast(wal_writer.init()); size_t file_len = 0; int64_t file_size = -1; diff --git a/be/test/vec/exec/vtablet_sink_test.cpp b/be/test/vec/exec/vtablet_sink_test.cpp index 81c2bdc4b072e3..f0b8e8c96433ef 100644 --- a/be/test/vec/exec/vtablet_sink_test.cpp +++ b/be/test/vec/exec/vtablet_sink_test.cpp @@ -496,7 +496,7 @@ class VOlapTableSinkTest : public testing::Test { private: ExecEnv* _env = nullptr; brpc::Server* _server = nullptr; - std::string wal_dir = "./wal_test"; + std::string wal_dir = std::string(getenv("DORIS_HOME")) + "/wal_test"; }; TEST_F(VOlapTableSinkTest, normal) { diff --git a/be/test/vec/exec/vwal_scanner_test.cpp b/be/test/vec/exec/vwal_scanner_test.cpp index a239741a5cb341..ac066aceb98955 100644 --- a/be/test/vec/exec/vwal_scanner_test.cpp +++ b/be/test/vec/exec/vwal_scanner_test.cpp @@ -57,7 +57,7 @@ class VWalScannerTest : public testing::Test { void init_desc_table(); ExecEnv* _env = nullptr; - std::string wal_dir = "./wal_test"; + std::string wal_dir = std::string(getenv("DORIS_HOME")) + "/wal_test"; int64_t db_id = 1; int64_t tb_id = 2; int64_t txn_id = 789; @@ -195,6 +195,7 @@ void VWalScannerTest::init_desc_table() { } void VWalScannerTest::init() { + config::group_commit_wal_max_disk_limit = "100M"; init_desc_table(); static_cast(io::global_local_filesystem()->create_directory( wal_dir + "/" + std::to_string(db_id) + "/" + std::to_string(tb_id))); @@ -215,7 +216,9 @@ void VWalScannerTest::init() { _env = ExecEnv::GetInstance(); _env->_wal_manager = WalManager::create_shared(_env, wal_dir); - auto st = _env->_wal_manager->add_wal_path(db_id, tb_id, txn_id, label); + std::string base_path; + auto st = _env->_wal_manager->_init_wal_dirs_info(); + st = _env->_wal_manager->add_wal_path(db_id, tb_id, txn_id, label, base_path); } TEST_F(VWalScannerTest, normal) { diff --git a/regression-test/suites/load_p0/stream_load/test_group_commit_and_wal_back_pressure.groovy b/regression-test/suites/load_p0/stream_load/test_group_commit_and_wal_back_pressure.groovy index f5cb97fbae8aef..9a081aa2d59792 100644 --- a/regression-test/suites/load_p0/stream_load/test_group_commit_and_wal_back_pressure.groovy +++ b/regression-test/suites/load_p0/stream_load/test_group_commit_and_wal_back_pressure.groovy @@ -70,7 +70,7 @@ suite("test_group_commit_and_wal_back_pressure", "p2") { unset 'label' file 'test_group_commit_and_wal_back_pressure.csv.gz' - time 100000 + time 600000 } }) } @@ -88,7 +88,7 @@ suite("test_group_commit_and_wal_back_pressure", "p2") { unset 'label' file 'test_group_commit_and_wal_back_pressure.csv.gz' - time 100000 + time 600000 } }) } @@ -106,7 +106,7 @@ suite("test_group_commit_and_wal_back_pressure", "p2") { unset 'label' file 'test_group_commit_and_wal_back_pressure.csv.gz' - time 100000 + time 600000 } }) } diff --git a/regression-test/suites/load_p0/stream_load/test_group_commit_wal_limit.groovy b/regression-test/suites/load_p0/stream_load/test_group_commit_wal_limit.groovy index 24b66de04b86c6..7361faf72df16a 100644 --- a/regression-test/suites/load_p0/stream_load/test_group_commit_wal_limit.groovy +++ b/regression-test/suites/load_p0/stream_load/test_group_commit_wal_limit.groovy @@ -41,7 +41,7 @@ suite("test_group_commit_wal_limit") { // normal case StringBuilder strBuilder = new StringBuilder() strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword) - strBuilder.append(" -H \"group_commit:true\" -H \"column_separator:,\" " ) + strBuilder.append(" -H \"group_commit:async_mode\" -H \"column_separator:,\" " ) strBuilder.append(" -H \"compress_type:gz\" -H \"format:csv\" " ) strBuilder.append(" -T " + context.config.dataPath + "/load_p0/stream_load/test_group_commit_wal_limit.csv.gz") strBuilder.append(" http://" + context.config.feHttpAddress + "/api/${db}/${tableName}/_stream_load") @@ -55,49 +55,13 @@ suite("test_group_commit_wal_limit") { logger.info("out is " + out ) assertTrue(out.contains('group_commit')) - // chunked data case - strBuilder = new StringBuilder() - strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword) - strBuilder.append(" -H \"group_commit:true\" -H \"column_separator:,\" " ) - strBuilder.append(" -H \"compress_type:gz\" -H \"format:csv\" " ) - strBuilder.append(" -H \"Content-Length:0\" " ) - strBuilder.append(" -T " + context.config.dataPath + "/load_p0/stream_load/test_group_commit_wal_limit.csv.gz") - strBuilder.append(" http://" + context.config.feHttpAddress + "/api/${db}/${tableName}/_stream_load") - - command = strBuilder.toString() - logger.info("command is " + command) - process = ['bash','-c',command].execute() - code = process.waitFor() - assertEquals(code, 0) - out = process.text - logger.info("out is " + out ) - assertTrue(out.contains('Stream load size too large')) - - // too lagre data case 1TB - strBuilder = new StringBuilder() - strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword) - strBuilder.append(" -H \"group_commit:true\" -H \"column_separator:,\" " ) - strBuilder.append(" -H \"compress_type:gz\" -H \"format:csv\" " ) - strBuilder.append(" -H \"Content-Length:1099511627776\" " ) - strBuilder.append(" -T " + context.config.dataPath + "/load_p0/stream_load/test_group_commit_wal_limit.csv.gz") - strBuilder.append(" http://" + context.config.feHttpAddress + "/api/${db}/${tableName}/_stream_load") - - command = strBuilder.toString() - logger.info("command is " + command) - process = ['bash','-c',command].execute() - code = process.waitFor() - assertEquals(code, 0) - out = process.text - logger.info("out is " + out ) - assertTrue(out.contains('Stream load size too large')) - // httpload // normal case strBuilder = new StringBuilder() strBuilder.append("curl -v --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword) String sql = " -H \"sql:insert into " + db + "." + tableName + " (k,v) select c1, c2 from http_stream(\\\"format\\\" = \\\"csv\\\", \\\"column_separator\\\" = \\\",\\\", \\\"compress_type\\\" = \\\"gz\\\" ) \" " strBuilder.append(sql) - strBuilder.append(" -H \"group_commit:true\"") + strBuilder.append(" -H \"group_commit:async_mode\"") strBuilder.append(" -T " + context.config.dataPath + "/load_p0/stream_load/test_group_commit_wal_limit.csv.gz") strBuilder.append(" http://" + context.config.feHttpAddress + "/api/_http_stream") @@ -109,40 +73,4 @@ suite("test_group_commit_wal_limit") { out = process.text logger.info("out is " + out ) assertTrue(out.contains('group_commit')) - - // chunked data case - strBuilder = new StringBuilder() - strBuilder.append("curl -v --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword) - sql = " -H \"sql:insert into " + db + "." + tableName + " (k,v) select c1, c2 from http_stream(\\\"format\\\" = \\\"csv\\\", \\\"column_separator\\\" = \\\",\\\", \\\"compress_type\\\" = \\\"gz\\\" ) \" " - strBuilder.append(sql) - strBuilder.append(" -H \"group_commit:true\" -H \"Content-Length:0\"") - strBuilder.append(" -T " + context.config.dataPath + "/load_p0/stream_load/test_group_commit_wal_limit.csv.gz") - strBuilder.append(" http://" + context.config.feHttpAddress + "/api/_http_stream") - - command = strBuilder.toString() - logger.info("command is " + command) - process = ['bash','-c',command].execute() - code = process.waitFor() - assertEquals(code, 0) - out = process.text - logger.info("out is " + out ) - assertTrue(out.contains('Http load size too large')) - - // too lagre data case 1TB - strBuilder = new StringBuilder() - strBuilder.append("curl -v --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword) - sql = " -H \"sql:insert into " + db + "." + tableName + " (k,v) select c1, c2 from http_stream(\\\"format\\\" = \\\"csv\\\", \\\"column_separator\\\" = \\\",\\\", \\\"compress_type\\\" = \\\"gz\\\" ) \" " - strBuilder.append(sql) - strBuilder.append(" -H \"group_commit:true\" -H \"Content-Length:1099511627776\"") - strBuilder.append(" -T " + context.config.dataPath + "/load_p0/stream_load/test_group_commit_wal_limit.csv.gz") - strBuilder.append(" http://" + context.config.feHttpAddress + "/api/_http_stream") - - command = strBuilder.toString() - logger.info("command is " + command) - process = ['bash','-c',command].execute() - code = process.waitFor() - assertEquals(code, 0) - out = process.text - logger.info("out is " + out ) - assertTrue(out.contains('Http load size too large')) }