Skip to content

Commit

Permalink
[improvement](spill) fuzzy spill and improve config
Browse files Browse the repository at this point in the history
  • Loading branch information
jacktengg committed Apr 9, 2024
1 parent 29b7860 commit d3ad912
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 70 deletions.
5 changes: 2 additions & 3 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1160,13 +1160,12 @@ DEFINE_mDouble(high_disk_avail_level_diff_usages, "0.15");
DEFINE_Int32(partition_disk_index_lru_size, "10000");
// limit the storage space that query spill files can use
DEFINE_String(spill_storage_root_path, "${DORIS_HOME}/storage");
DEFINE_mInt64(spill_storage_limit, "10737418240"); // 10G
DEFINE_mInt32(spill_gc_interval_ms, "2000"); // 2s
DEFINE_mInt32(spill_storage_usage_percent, "20"); // 20%
DEFINE_mInt32(spill_gc_interval_ms, "2000"); // 2s
DEFINE_Int32(spill_io_thread_pool_per_disk_thread_num, "2");
DEFINE_Int32(spill_io_thread_pool_queue_size, "1024");
DEFINE_Int32(spill_async_task_thread_pool_thread_num, "2");
DEFINE_Int32(spill_async_task_thread_pool_queue_size, "1024");
DEFINE_mInt32(spill_mem_warning_water_mark_multiplier, "2");

DEFINE_mBool(check_segment_when_build_rowset_meta, "false");

Expand Down
3 changes: 1 addition & 2 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1242,13 +1242,12 @@ DECLARE_mDouble(high_disk_avail_level_diff_usages);
// create tablet in partition random robin idx lru size, default 10000
DECLARE_Int32(partition_disk_index_lru_size);
DECLARE_String(spill_storage_root_path);
DECLARE_mInt64(spill_storage_limit);
DECLARE_mInt32(spill_storage_usage_percent);
DECLARE_mInt32(spill_gc_interval_ms);
DECLARE_Int32(spill_io_thread_pool_per_disk_thread_num);
DECLARE_Int32(spill_io_thread_pool_queue_size);
DECLARE_Int32(spill_async_task_thread_pool_thread_num);
DECLARE_Int32(spill_async_task_thread_pool_queue_size);
DECLARE_mInt32(spill_mem_warning_water_mark_multiplier);

DECLARE_mBool(check_segment_when_build_rowset_meta);

Expand Down
15 changes: 14 additions & 1 deletion be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,19 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
if (ready()) {
return Status::OK();
}
std::unordered_map<std::string, std::unique_ptr<vectorized::SpillDataDir>> spill_store_map;
for (const auto& spill_path : spill_store_paths) {
bool shared_with_storage_path = false;
for (const auto& storage_path : store_paths) {
if (spill_path.path == storage_path.path) {
shared_with_storage_path = true;
}
}
spill_store_map.emplace(spill_path.path,
std::make_unique<vectorized::SpillDataDir>(
spill_path.path, shared_with_storage_path,
spill_path.capacity_bytes, spill_path.storage_medium));
}
init_doris_metrics(store_paths);
_store_paths = store_paths;
_tmp_file_dirs = std::make_unique<segment_v2::TmpFileDirs>(_store_paths);
Expand Down Expand Up @@ -246,7 +259,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
_wal_manager = WalManager::create_shared(this, config::group_commit_wal_path);
_dns_cache = new DNSCache();
_write_cooldown_meta_executors = std::make_unique<WriteCooldownMetaExecutors>();
_spill_stream_mgr = new vectorized::SpillStreamManager(spill_store_paths);
_spill_stream_mgr = new vectorized::SpillStreamManager(std::move(spill_store_map));
_backend_client_cache->init_metrics("backend");
_frontend_client_cache->init_metrics("frontend");
_broker_client_cache->init_metrics("broker");
Expand Down
113 changes: 83 additions & 30 deletions be/src/vec/spill/spill_stream_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,22 @@
#include <random>
#include <string>

#include "common/logging.h"
#include "io/fs/file_system.h"
#include "io/fs/local_file_system.h"
#include "olap/olap_define.h"
#include "runtime/runtime_state.h"
#include "util/pretty_printer.h"
#include "util/runtime_profile.h"
#include "util/time.h"
#include "vec/spill/spill_stream.h"

namespace doris::vectorized {

SpillStreamManager::SpillStreamManager(const std::vector<StorePath>& paths)
: _spill_store_paths(paths), _stop_background_threads_latch(1) {}
// Builds the manager from an already-populated map of spill directories.
// The map is taken by rvalue reference and moved into _spill_store_map, so the
// manager becomes the owner of every SpillDataDir in it.
// _stop_background_threads_latch is constructed with a count of 1.
SpillStreamManager::SpillStreamManager(
std::unordered_map<std::string, std::unique_ptr<vectorized::SpillDataDir>>&&
spill_store_map)
: _spill_store_map(std::move(spill_store_map)), _stop_background_threads_latch(1) {}

Status SpillStreamManager::init() {
LOG(INFO) << "init spill stream manager";
Expand All @@ -49,21 +53,21 @@ Status SpillStreamManager::init() {
spill_io_thread_count = 2;
}
int pool_idx = 0;
for (const auto& path : _spill_store_paths) {
auto gc_dir_root_dir = fmt::format("{}/{}", path.path, SPILL_GC_DIR_PREFIX);
for (const auto& [path, store] : _spill_store_map) {
auto gc_dir_root_dir = fmt::format("{}/{}", path, SPILL_GC_DIR_PREFIX);
bool exists = true;
RETURN_IF_ERROR(io::global_local_filesystem()->exists(gc_dir_root_dir, &exists));
if (!exists) {
RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(gc_dir_root_dir));
}

auto spill_dir = fmt::format("{}/{}", path.path, SPILL_DIR_PREFIX);
auto spill_dir = fmt::format("{}/{}", path, SPILL_DIR_PREFIX);
RETURN_IF_ERROR(io::global_local_filesystem()->exists(spill_dir, &exists));
if (!exists) {
RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(spill_dir));
} else {
auto suffix = ToStringFromUnixMillis(UnixMillis());
auto gc_dir = fmt::format("{}/{}/{}", path.path, SPILL_GC_DIR_PREFIX, suffix);
auto gc_dir = fmt::format("{}/{}/{}", path, SPILL_GC_DIR_PREFIX, suffix);
if (std::filesystem::exists(gc_dir)) {
LOG(WARNING) << "gc dir already exists: " << gc_dir;
}
Expand All @@ -78,15 +82,15 @@ Status SpillStreamManager::init() {
spill_data_size += dir_entry.file_size();
}
}
path_to_spill_data_size_[path.path] = spill_data_size;
store->update_usage(spill_data_size);

std::unique_ptr<ThreadPool> io_pool;
static_cast<void>(ThreadPoolBuilder(fmt::format("SpillIOThreadPool-{}", pool_idx++))
.set_min_threads(spill_io_thread_count)
.set_max_threads(spill_io_thread_count)
.set_max_queue_size(config::spill_io_thread_pool_queue_size)
.build(&io_pool));
path_to_io_thread_pool_[path.path] = std::move(io_pool);
path_to_io_thread_pool_[path] = std::move(io_pool);
}
static_cast<void>(ThreadPoolBuilder("SpillAsyncTaskThreadPool")
.set_min_threads(config::spill_async_task_thread_pool_thread_num)
Expand All @@ -106,20 +110,20 @@ void SpillStreamManager::_spill_gc_thread_callback() {
while (!_stop_background_threads_latch.wait_for(
std::chrono::milliseconds(config::spill_gc_interval_ms))) {
gc(2000);
for (auto& [path, dir] : _spill_store_map) {
static_cast<void>(dir->update_capacity());
}
}
}

Status SpillStreamManager::_init_spill_store_map() {
for (const auto& path : _spill_store_paths) {
auto store =
std::make_unique<SpillDataDir>(path.path, path.capacity_bytes, path.storage_medium);
auto st = store->init();
for (const auto& store : _spill_store_map) {
auto st = store.second->init();
if (!st.ok()) {
LOG(WARNING) << "Store load failed, status=" << st.to_string()
<< ", path=" << store->path();
<< ", path=" << store.second->path();
return st;
}
_spill_store_map.emplace(store->path(), std::move(store));
}

return Status::OK();
Expand Down Expand Up @@ -210,8 +214,8 @@ void SpillStreamManager::gc(int64_t max_file_count) {

bool exists = true;
int64_t count = 0;
for (const auto& path : _spill_store_paths) {
std::string gc_root_dir = fmt::format("{}/{}", path.path, SPILL_GC_DIR_PREFIX);
for (const auto& [path, store_dir] : _spill_store_map) {
std::string gc_root_dir = fmt::format("{}/{}", path, SPILL_GC_DIR_PREFIX);

std::error_code ec;
exists = std::filesystem::exists(gc_root_dir, ec);
Expand Down Expand Up @@ -243,7 +247,7 @@ void SpillStreamManager::gc(int64_t max_file_count) {
}

int64_t data_size = 0;
Defer defer {[&]() { update_usage(path.path, -data_size); }};
Defer defer {[&]() { store_dir->update_usage(-data_size); }};

for (const auto& file : files) {
auto abs_file_path = fmt::format("{}/{}", abs_dir, file.file_name);
Expand All @@ -257,11 +261,11 @@ void SpillStreamManager::gc(int64_t max_file_count) {
}
}

SpillDataDir::SpillDataDir(const std::string& path, int64_t capacity_bytes,
SpillDataDir::SpillDataDir(std::string path, bool shared_with_storage_path, int64_t capacity_bytes,
TStorageMedium::type storage_medium)
: _path(path),
_available_bytes(0),
_disk_capacity_bytes(0),
: _path(std::move(path)),
_shared_with_storage_path(shared_with_storage_path),
_disk_capacity_bytes(capacity_bytes),
_storage_medium(storage_medium) {}

Status SpillDataDir::init() {
Expand All @@ -271,18 +275,67 @@ Status SpillDataDir::init() {
RETURN_NOT_OK_STATUS_WITH_WARN(Status::IOError("opendir failed, path={}", _path),
"check file exist failed");
}

return update_capacity();
}
// Re-reads the capacity and available bytes of the disk behind _path and
// recomputes the spill limit for this directory, all under _mutex.
//
// The limit is a fraction of the disk capacity:
//   - shared with a storage path:
//       capacity * storage_flood_stage_usage_percent% * spill_storage_usage_percent%
//   - dedicated spill path:
//       capacity * spill_storage_usage_percent%
//
// Returns Status::OK() on success; a failure from get_space_info() is handed
// to RETURN_IF_ERROR before the limit is recomputed.
Status SpillDataDir::update_capacity() {
    std::lock_guard<std::mutex> guard(_mutex);
    RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info(_path, &_disk_capacity_bytes,
                                                                  &_available_bytes));

    // Start from the full capacity and scale it down step by step; the
    // multiplication order matches capacity * flood% * spill%.
    double limit = static_cast<double>(_disk_capacity_bytes);
    if (_shared_with_storage_path) {
        limit *= config::storage_flood_stage_usage_percent / 100.0;
    }
    limit *= config::spill_storage_usage_percent / 100.0;

    _limit_bytes = static_cast<size_t>(limit);
    return Status::OK();
}
bool SpillDataDir::reach_capacity_limit(int64_t incoming_data_size) {
double used_pct = get_usage(incoming_data_size);
int64_t left_bytes = _available_bytes - incoming_data_size;
if (used_pct >= config::storage_flood_stage_usage_percent / 100.0 &&
left_bytes <= config::storage_flood_stage_left_capacity_bytes) {
LOG(WARNING) << "reach capacity limit. used pct: " << used_pct
<< ", left bytes: " << left_bytes << ", path: " << _path;
return true;
std::lock_guard<std::mutex> l(_mutex);
if (_shared_with_storage_path) {
VLOG_DEBUG << fmt::format(
"spill data path: {}, limit: {}, used: {}, available: {}, "
"incoming "
"bytes: {}",
_path, PrettyPrinter::print_bytes(_limit_bytes),
PrettyPrinter::print_bytes(_used_bytes),
PrettyPrinter::print_bytes(_available_bytes),
PrettyPrinter::print_bytes(incoming_data_size));
int64_t left_bytes = _available_bytes - incoming_data_size;
if (_used_bytes + incoming_data_size > _limit_bytes ||
left_bytes <= config::storage_flood_stage_left_capacity_bytes) {
LOG(WARNING) << fmt::format(
"spill data reach limit, path: {}, limit: {}, used: {}, available: {}, "
"incoming "
"bytes: {}",
_path, PrettyPrinter::print_bytes(_limit_bytes),
PrettyPrinter::print_bytes(_used_bytes),
PrettyPrinter::print_bytes(_available_bytes),
PrettyPrinter::print_bytes(incoming_data_size));
return true;
}
return false;
} else {
double used_pct = _disk_capacity_bytes == 0
? 0
: (_disk_capacity_bytes - _available_bytes + incoming_data_size) /
(double)_disk_capacity_bytes;
VLOG_DEBUG << fmt::format(
"spill path: {}, capacity: {}, available: {}, used pct: {}, incoming bytes: {}",
_path, PrettyPrinter::print_bytes(_disk_capacity_bytes),
PrettyPrinter::print_bytes(_available_bytes), used_pct,
PrettyPrinter::print_bytes(incoming_data_size));
if (used_pct >= config::spill_storage_usage_percent / 100.0) {
LOG(WARNING) << fmt::format(
"spill data reach limit, path: {}, capacity: {}, available: {}, incoming "
"bytes: {}",
_path, PrettyPrinter::print_bytes(_disk_capacity_bytes),
PrettyPrinter::print_bytes(_available_bytes),
PrettyPrinter::print_bytes(incoming_data_size));
return true;
}
return false;
}
return false;
}
} // namespace doris::vectorized
67 changes: 46 additions & 21 deletions be/src/vec/spill/spill_stream_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ class RuntimeProfile;

namespace vectorized {

class SpillStreamManager;
class SpillDataDir {
public:
SpillDataDir(const std::string& path, int64_t capacity_bytes = -1,
SpillDataDir(std::string path, bool shared_with_storage_path, int64_t capacity_bytes,
TStorageMedium::type storage_medium = TStorageMedium::HDD);

Status init();
Expand All @@ -54,25 +55,61 @@ class SpillDataDir {

Status update_capacity();

double get_usage(int64_t incoming_data_size) const {
return _disk_capacity_bytes == 0
? 0
: (_disk_capacity_bytes - _available_bytes + incoming_data_size) /
(double)_disk_capacity_bytes;
// Adjusts the tracked spill usage of this directory by incoming_data_size
// bytes (pass a negative value to release space).
// Usage is only tracked for directories shared with a storage path; for any
// other directory this call does nothing.
void update_usage(int64_t incoming_data_size) {
    if (!_shared_with_storage_path) {
        return;
    }
    std::lock_guard<std::mutex> guard(_mutex);
    _used_bytes += incoming_data_size;
}

// Returns the number of bytes considered used for this directory.
// Dedicated spill path: disk capacity minus available bytes.
// Path shared with storage: the tracked _used_bytes counter.
// The read is done under _mutex.
int64_t get_used_bytes() {
    std::lock_guard<std::mutex> guard(_mutex);
    if (!_shared_with_storage_path) {
        return _disk_capacity_bytes - _available_bytes;
    }
    return _used_bytes;
}

// Returns the usage ratio this directory would have after writing
// incoming_data_size more bytes. The read is done under _mutex.
//
//   - shared with a storage path: (_used_bytes + incoming) / _limit_bytes
//   - dedicated spill path:
//       (_disk_capacity_bytes - _available_bytes + incoming) / _disk_capacity_bytes
//
// Returns 0 when the relevant denominator is 0.
double get_usage(int64_t incoming_data_size) {
    std::lock_guard<std::mutex> l(_mutex);
    if (_shared_with_storage_path) {
        // The sum must be parenthesized: without the parentheses only
        // incoming_data_size is divided by the limit and the raw byte count
        // _used_bytes is added to that quotient, which is not a ratio.
        return _limit_bytes == 0 ? 0
                                 : static_cast<double>(_used_bytes + incoming_data_size) /
                                           static_cast<double>(_limit_bytes);
    }
    return _disk_capacity_bytes == 0
                   ? 0
                   : static_cast<double>(_disk_capacity_bytes - _available_bytes +
                                         incoming_data_size) /
                             static_cast<double>(_disk_capacity_bytes);
}

// Returns the current value of _limit_bytes, read under _mutex and
// converted to int64_t.
int64_t storage_limit() {
    std::lock_guard<std::mutex> guard(_mutex);
    return static_cast<int64_t>(_limit_bytes);
}

private:
friend class SpillStreamManager;
std::string _path;

// the actual available capacity of the disk of this data dir
size_t _available_bytes;
bool _shared_with_storage_path;

// protect _disk_capacity_bytes, _available_bytes, _limit_bytes, _used_bytes
std::mutex _mutex;
// the actual capacity of the disk of this data dir
size_t _disk_capacity_bytes;
// used when _shared_with_storage_path = true
size_t _limit_bytes = 0;
// the actual available capacity of the disk of this data dir
size_t _available_bytes = 0;
int64_t _used_bytes = 0;
TStorageMedium::type _storage_medium;
};
class SpillStreamManager {
public:
SpillStreamManager(const std::vector<StorePath>& paths);
SpillStreamManager(std::unordered_map<std::string, std::unique_ptr<vectorized::SpillDataDir>>&&
spill_store_map);

Status init();

Expand All @@ -93,16 +130,6 @@ class SpillStreamManager {

void gc(int64_t max_file_count);

void update_usage(const std::string& path, int64_t incoming_data_size) {
path_to_spill_data_size_[path] += incoming_data_size;
}

static bool reach_capacity_limit(size_t size, size_t incoming_data_size) {
return size + incoming_data_size > config::spill_storage_limit;
}

int64_t spilled_data_size(const std::string& path) { return path_to_spill_data_size_[path]; }

ThreadPool* get_spill_io_thread_pool(const std::string& path) const {
const auto it = path_to_io_thread_pool_.find(path);
DCHECK(it != path_to_io_thread_pool_.end());
Expand All @@ -115,13 +142,11 @@ class SpillStreamManager {
void _spill_gc_thread_callback();
std::vector<SpillDataDir*> _get_stores_for_spill(TStorageMedium::type storage_medium);

std::vector<StorePath> _spill_store_paths;
std::unordered_map<std::string, std::unique_ptr<SpillDataDir>> _spill_store_map;

CountDownLatch _stop_background_threads_latch;
std::unique_ptr<ThreadPool> async_task_thread_pool_;
std::unordered_map<std::string, std::unique_ptr<ThreadPool>> path_to_io_thread_pool_;
std::unordered_map<std::string, std::atomic_int64_t> path_to_spill_data_size_;
scoped_refptr<Thread> _spill_gc_thread;

std::atomic_uint64_t id_ = 0;
Expand Down
Loading

0 comments on commit d3ad912

Please sign in to comment.