Skip to content

Commit

Permalink
[chore](file-cache) Enable file cache for cloud mode by force (apache…
Browse files Browse the repository at this point in the history
…#41357)

## Proposed changes

The temporary local rowset writer used for external sorting relies on the file cache.
  • Loading branch information
TangSiyang2001 authored Sep 29, 2024
1 parent 421fde0 commit cf5a159
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 40 deletions.
3 changes: 2 additions & 1 deletion be/src/cloud/cloud_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "cloud/cloud_rowset_writer.h"

#include "common/status.h"
#include "io/cache/block_file_cache_factory.h"
#include "io/fs/file_system.h"
#include "olap/rowset/rowset_factory.h"
Expand All @@ -34,7 +35,7 @@ Status CloudRowsetWriter::init(const RowsetWriterContext& rowset_writer_context)
if (_context.is_local_rowset()) {
// In cloud mode, this branch implies it is an intermediate rowset for external merge sort,
// we use `global_local_filesystem` to write data to `tmp_file_dir`(see `local_segment_path`).
_context.tablet_path = io::FileCacheFactory::instance()->get_cache_path();
_context.tablet_path = io::FileCacheFactory::instance()->pick_one_cache_path();
} else {
_rowset_meta->set_remote_storage_resource(*_context.storage_resource);
}
Expand Down
5 changes: 4 additions & 1 deletion be/src/io/cache/block_file_cache_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
#pragma once

#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <vector>

#include "common/status.h"
Expand All @@ -46,7 +48,8 @@ class FileCacheFactory {

size_t try_release(const std::string& base_path);

// Returns the base path of one entry of `_caches`. Each call advances
// `_next_index` by one and uses the previous value modulo `_caches.size()`,
// so successive calls cycle through the entries in order.
//
// `_caches` must be non-empty: this is only verified by DCHECK, and the index
// computation below takes a remainder by `_caches.size()`.
//
// NOTE(review): this listing comes from a rendered diff, so both the
// `const std::string&` signature and the `std::string_view` signature appear.
// NOTE(review): the returned view refers to whatever get_base_path() yields;
// confirm that storage outlives the caller's use of the view.
const std::string& get_cache_path() {
std::string_view pick_one_cache_path() {
DCHECK(!_caches.empty());
// fetch_add returns the value held before the increment.
size_t cur_index = _next_index.fetch_add(1);
return _caches[cur_index % _caches.size()]->get_base_path();
}
Expand Down
82 changes: 44 additions & 38 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
init_file_cache_factory(cache_paths);
doris::io::BeConfDataDirReader::init_be_conf_data_dir(store_paths, spill_store_paths,
cache_paths);

_pipeline_tracer_ctx = std::make_unique<pipeline::PipelineTracerContext>(); // before query
RETURN_IF_ERROR(init_pipeline_task_scheduler());
_workload_group_manager = new WorkloadGroupMgr();
Expand Down Expand Up @@ -392,50 +391,57 @@ Status ExecEnv::init_pipeline_task_scheduler() {

// Creates the file cache instances described by config::file_cache_path.
//
// Steps visible in this block:
//   1. If config::enable_file_cache is false: log fatally and exit when
//      config::is_cloud_mode() is true, otherwise return without doing anything.
//   2. Validate file_cache_each_block_size against s3_write_buffer_size.
//   3. Parse config::file_cache_path into `cache_paths`.
//   4. For each distinct path, start a thread that calls
//      FileCacheFactory::create_file_cache, then join all threads.
//   5. Log fatally and exit if any thread reported a non-ok Status.
//
// @param cache_paths  passed to parse_conf_cache_paths and then iterated;
//                     presumably filled in by that call -- confirm against
//                     its definition.
//
// NOTE(review): this listing comes from a rendered diff with removed and added
// lines interleaved, so several statements appear twice.
void ExecEnv::init_file_cache_factory(std::vector<doris::CachePath>& cache_paths) {
// Load file cache before starting up daemon threads to make sure StorageEngine is read.
if (doris::config::enable_file_cache) {
if (config::file_cache_each_block_size > config::s3_write_buffer_size ||
config::s3_write_buffer_size % config::file_cache_each_block_size != 0) {
LOG_FATAL(
"The config file_cache_each_block_size {} must less than or equal to config "
"s3_write_buffer_size {} and config::s3_write_buffer_size % "
"config::file_cache_each_block_size must be zero",
config::file_cache_each_block_size, config::s3_write_buffer_size);
exit(-1);
}
std::unordered_set<std::string> cache_path_set;
Status rest = doris::parse_conf_cache_paths(doris::config::file_cache_path, cache_paths);
if (!rest) {
LOG(FATAL) << "parse config file cache path failed, path="
<< doris::config::file_cache_path;
// File cache disabled: cloud mode treats this as a fatal misconfiguration;
// in any other mode there is nothing to initialize, so return early.
if (!config::enable_file_cache) {
if (config::is_cloud_mode()) {
LOG(FATAL) << "Cloud mode requires to enable file cache, plz set "
"config::enable_file_cache "
"= true";
exit(-1);
}
std::vector<std::thread> file_cache_init_threads;
return;
}
// The cache block size must not exceed the S3 write buffer size and must
// divide it evenly; otherwise startup is aborted.
if (config::file_cache_each_block_size > config::s3_write_buffer_size ||
config::s3_write_buffer_size % config::file_cache_each_block_size != 0) {
LOG_FATAL(
"The config file_cache_each_block_size {} must less than or equal to config "
"s3_write_buffer_size {} and config::s3_write_buffer_size % "
"config::file_cache_each_block_size must be zero",
config::file_cache_each_block_size, config::s3_write_buffer_size);
exit(-1);
}
// Tracks paths already handed to a thread so duplicates can be skipped.
std::unordered_set<std::string> cache_path_set;
Status rest = doris::parse_conf_cache_paths(doris::config::file_cache_path, cache_paths);
if (!rest) {
LOG(FATAL) << "parse config file cache path failed, path="
<< doris::config::file_cache_path;
exit(-1);
}
std::vector<std::thread> file_cache_init_threads;

// One Status slot per started thread. std::list keeps references to existing
// elements valid across later emplace_back calls, so each thread can safely
// hold a pointer to its own slot.
std::list<doris::Status> cache_status;
// Start one initialization thread per distinct cache path; a repeated path
// is skipped with a warning.
for (auto& cache_path : cache_paths) {
if (cache_path_set.find(cache_path.path) != cache_path_set.end()) {
LOG(WARNING) << fmt::format("cache path {} is duplicate", cache_path.path);
continue;
}
std::list<doris::Status> cache_status;
for (auto& cache_path : cache_paths) {
if (cache_path_set.find(cache_path.path) != cache_path_set.end()) {
LOG(WARNING) << fmt::format("cache path {} is duplicate", cache_path.path);
continue;
}

file_cache_init_threads.emplace_back([&, status = &cache_status.emplace_back()]() {
*status = doris::io::FileCacheFactory::instance()->create_file_cache(
cache_path.path, cache_path.init_settings());
});
// `status` points at a freshly appended element of cache_status; the thread
// stores the result of create_file_cache there. `cache_path` is captured by
// reference.
file_cache_init_threads.emplace_back([&, status = &cache_status.emplace_back()]() {
*status = doris::io::FileCacheFactory::instance()->create_file_cache(
cache_path.path, cache_path.init_settings());
});

cache_path_set.emplace(cache_path.path);
}
cache_path_set.emplace(cache_path.path);
}

for (std::thread& thread : file_cache_init_threads) {
if (thread.joinable()) {
thread.join();
}
// Wait for every initialization thread to finish before inspecting results.
for (std::thread& thread : file_cache_init_threads) {
if (thread.joinable()) {
thread.join();
}
for (const auto& status : cache_status) {
if (!status.ok()) {
LOG(FATAL) << "failed to init file cache, err: " << status;
exit(-1);
}
}
// Any failed cache initialization is fatal.
for (const auto& status : cache_status) {
if (!status.ok()) {
LOG(FATAL) << "failed to init file cache, err: " << status;
exit(-1);
}
}
}
Expand Down

0 comments on commit cf5a159

Please sign in to comment.