Commit 86f722d

[optimize](invert index) optimize inverted index metadata into file cache
zzzxl1993 committed Dec 14, 2023
1 parent 9d5f314 commit 86f722d
Showing 13 changed files with 114 additions and 31 deletions.
1 change: 1 addition & 0 deletions be/src/common/config.cpp
@@ -991,6 +991,7 @@ DEFINE_mInt32(inverted_index_cache_stale_sweep_time_sec, "600");
DEFINE_String(inverted_index_searcher_cache_limit, "10%");
// set `true` to enable insert searcher into cache when write inverted index data
DEFINE_Bool(enable_write_index_searcher_cache, "true");
DEFINE_Bool(enable_cooldown_preload_index_reader, "false");
DEFINE_Bool(enable_inverted_index_cache_check_timestamp, "true");
DEFINE_Int32(inverted_index_fd_number_limit_percent, "40"); // 40%

2 changes: 2 additions & 0 deletions be/src/common/config.h
@@ -1027,6 +1027,8 @@ DECLARE_mInt32(inverted_index_cache_stale_sweep_time_sec);
DECLARE_String(inverted_index_searcher_cache_limit);
// set `true` to enable insert searcher into cache when write inverted index data
DECLARE_Bool(enable_write_index_searcher_cache);
// Pre-load the inverted index reader into the file cache during cooldown.
DECLARE_Bool(enable_cooldown_preload_index_reader);
DECLARE_Bool(enable_inverted_index_cache_check_timestamp);
DECLARE_Int32(inverted_index_fd_number_limit_percent); // 50%

38 changes: 37 additions & 1 deletion be/src/olap/rowset/beta_rowset.cpp
@@ -346,7 +346,8 @@ Status BetaRowset::copy_files_to(const std::string& dir, const RowsetId& new_row
return Status::OK();
}

Status BetaRowset::upload_to(io::RemoteFileSystem* dest_fs, const RowsetId& new_rowset_id) {
Status BetaRowset::upload_to(const io::RemoteFileSystemSPtr& dest_fs,
const RowsetId& new_rowset_id) {
DCHECK(is_local());
if (num_segments() < 1) {
return Status::OK();
@@ -355,6 +356,8 @@ Status BetaRowset::upload_to(io::RemoteFileSystem* dest_fs, const RowsetId& new_
local_paths.reserve(num_segments());
std::vector<io::Path> dest_paths;
dest_paths.reserve(num_segments());
std::vector<io::Path> idx_remote_paths;
idx_remote_paths.reserve(num_segments());
for (int i = 0; i < num_segments(); ++i) {
// Note: Here we use relative path for remote.
auto remote_seg_path = remote_segment_path(_rowset_meta->tablet_id(), new_rowset_id, i);
@@ -373,11 +376,44 @@ Status BetaRowset::upload_to(io::RemoteFileSystem* dest_fs, const RowsetId& new_
index_meta->index_id());
dest_paths.push_back(remote_inverted_index_file);
local_paths.push_back(local_inverted_index_file);
idx_remote_paths.push_back(remote_inverted_index_file);
}
}
}
auto st = dest_fs->batch_upload(local_paths, dest_paths);
if (st.ok()) {
// Pre-write the metadata of the inverted index into the file cache
if (config::enable_cooldown_preload_index_reader) {
if (dest_fs->type() == io::FileSystemType::S3 && config::enable_file_cache) {
auto start = std::chrono::steady_clock::now();
for (auto& path : idx_remote_paths) {
std::shared_ptr<io::FileReader> file_reader = nullptr;
if (!dest_fs->open_file(path, &file_reader).ok()) {
continue;
}

auto& url = file_reader->path().native();
size_t pos = url.rfind('/');
if (pos != std::string::npos) {
auto idx_path = url.substr(0, pos);
auto idx_name = url.substr(pos + 1);

try {
InvertedIndexSearcherCache::build_index_searcher(dest_fs, idx_path,
idx_name);
} catch (CLuceneError& err) {
LOG(WARNING) << "opening index reader clucene err: " << err.what();
} catch (...) {
LOG(WARNING) << "opening index reader other err";
}
}
}
auto duration =
std::chrono::duration<float>(std::chrono::steady_clock::now() - start);
LOG(INFO) << "cooldown upload open invert index duration: " << duration.count();
}
}

DorisMetrics::instance()->upload_rowset_count->increment(1);
DorisMetrics::instance()->upload_total_byte->increment(data_disk_size());
} else {
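The preload loop above derives the index directory and file name by splitting the remote path at the last '/', then hands both to InvertedIndexSearcherCache::build_index_searcher. Below is a minimal standalone sketch of just that splitting step, using only std::string; the helper name split_index_path and the sample path are illustrative and not part of this commit.

#include <iostream>
#include <string>
#include <utility>

// Split a remote inverted-index path such as "data/10001/020000000_0_1.idx"
// into {directory, file name}; returns an empty directory when no '/' is present.
std::pair<std::string, std::string> split_index_path(const std::string& url) {
    size_t pos = url.rfind('/');
    if (pos == std::string::npos) {
        return {"", url};
    }
    return {url.substr(0, pos), url.substr(pos + 1)};
}

int main() {
    auto [idx_path, idx_name] = split_index_path("data/10001/020000000_0_1.idx");
    std::cout << idx_path << " | " << idx_name << '\n';  // prints: data/10001 | 020000000_0_1.idx
    return 0;
}
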
3 changes: 2 additions & 1 deletion be/src/olap/rowset/beta_rowset.h
@@ -76,7 +76,8 @@ class BetaRowset final : public Rowset {

Status copy_files_to(const std::string& dir, const RowsetId& new_rowset_id) override;

Status upload_to(io::RemoteFileSystem* dest_fs, const RowsetId& new_rowset_id) override;
Status upload_to(const io::RemoteFileSystemSPtr& dest_fs,
const RowsetId& new_rowset_id) override;

// only applicable to alpha rowset, no op here
Status remove_old_files(std::vector<std::string>* files_to_remove) override {
3 changes: 2 additions & 1 deletion be/src/olap/rowset/rowset.h
@@ -211,7 +211,8 @@ class Rowset : public std::enable_shared_from_this<Rowset> {
// copy all files to `dir`
virtual Status copy_files_to(const std::string& dir, const RowsetId& new_rowset_id) = 0;

virtual Status upload_to(io::RemoteFileSystem* dest_fs, const RowsetId& new_rowset_id) {
virtual Status upload_to(const io::RemoteFileSystemSPtr& dest_fs,
const RowsetId& new_rowset_id) {
return Status::OK();
}

18 changes: 13 additions & 5 deletions be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
@@ -43,12 +43,20 @@ InvertedIndexSearcherCache* InvertedIndexSearcherCache::_s_instance = nullptr;
IndexSearcherPtr InvertedIndexSearcherCache::build_index_searcher(const io::FileSystemSPtr& fs,
const std::string& index_dir,
const std::string& file_name) {
DorisCompoundReader* directory =
new DorisCompoundReader(DorisCompoundDirectory::getDirectory(fs, index_dir.c_str()),
file_name.c_str(), config::inverted_index_read_buffer_size);
bool open_idx_file_cache = true;
DorisCompoundReader* directory = new DorisCompoundReader(
DorisCompoundDirectory::getDirectory(fs, index_dir.c_str()), file_name.c_str(),
config::inverted_index_read_buffer_size, open_idx_file_cache);

auto closeDirectory = true;
auto index_searcher =
std::make_shared<lucene::search::IndexSearcher>(directory, closeDirectory);
auto reader = lucene::index::IndexReader::open(
directory, config::inverted_index_read_buffer_size, closeDirectory);

bool close_reader = true;
auto index_searcher = std::make_shared<lucene::search::IndexSearcher>(reader, close_reader);

directory->getDorisIndexInput()->setIdxFileCache(false);

// NOTE: need to cl_refcount-- here, so that directory will be deleted when
// index_searcher is destroyed
_CLDECDELETE(directory)
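The change above opens the compound reader with the index-file cache switched on (open_idx_file_cache = true) and switches it off again on the underlying input once the searcher has been built, so only the open-time metadata reads are pulled into the file cache. A standalone sketch of that enable-then-restore pattern follows, expressed as a scoped guard; the Reader type and ScopedIdxCache guard are illustrative stand-ins, whereas in the commit the flag lives on the IndexInput returned by DorisCompoundReader::getDorisIndexInput().

#include <iostream>

// Illustrative stand-in for the object that owns the index-file-cache flag.
struct Reader {
    bool idx_file_cache = false;
    void set_idx_file_cache(bool on) { idx_file_cache = on; }
};

// Enable the flag for the duration of a scope and always restore it afterwards.
class ScopedIdxCache {
public:
    explicit ScopedIdxCache(Reader& r) : _r(r) { _r.set_idx_file_cache(true); }
    ~ScopedIdxCache() { _r.set_idx_file_cache(false); }

private:
    Reader& _r;
};

int main() {
    Reader reader;
    {
        ScopedIdxCache guard(reader);
        std::cout << "building searcher, cache enabled: " << reader.idx_file_cache << '\n';  // 1
    }
    std::cout << "after build, cache enabled: " << reader.idx_file_cache << '\n';  // 0
    return 0;
}
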
50 changes: 33 additions & 17 deletions be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp
@@ -97,6 +97,22 @@ CL_NS(store)::Directory* DorisCompoundFileWriter::getDirectory() {
return directory;
}

void DorisCompoundFileWriter::sort_files(std::vector<FileInfo>& file_infos) {
auto file_priority = [](const std::string& filename) {
if (filename.find("segments") != std::string::npos) return 1;
if (filename.find("fnm") != std::string::npos) return 2;
if (filename.find("tii") != std::string::npos) return 3;
return 4; // Other files
};

std::sort(file_infos.begin(), file_infos.end(), [&](const FileInfo& a, const FileInfo& b) {
int32_t priority_a = file_priority(a.filename);
int32_t priority_b = file_priority(b.filename);
if (priority_a != priority_b) return priority_a < priority_b;
return a.filesize < b.filesize;
});
}

void DorisCompoundFileWriter::writeCompoundFile() {
// list files in current dir
std::vector<std::string> files;
@@ -106,15 +122,15 @@ void DorisCompoundFileWriter::writeCompoundFile() {
if (it != files.end()) {
files.erase(it);
}
// sort file list by file length
std::vector<std::pair<std::string, int64_t>> sorted_files;

std::vector<FileInfo> sorted_files;
for (auto file : files) {
sorted_files.push_back(std::make_pair(
file, ((DorisCompoundDirectory*)directory)->fileLength(file.c_str())));
FileInfo file_info;
file_info.filename = file;
file_info.filesize = ((DorisCompoundDirectory*)directory)->fileLength(file.c_str());
sorted_files.emplace_back(std::move(file_info));
}
std::sort(sorted_files.begin(), sorted_files.end(),
[](const std::pair<std::string, int64_t>& a,
const std::pair<std::string, int64_t>& b) { return (a.second < b.second); });
sort_files(sorted_files);

int32_t file_count = sorted_files.size();

@@ -138,12 +154,12 @@
const int64_t buffer_length = 16384;
uint8_t ram_buffer[buffer_length];
for (auto file : sorted_files) {
ram_output->writeString(file.first); // file name
ram_output->writeLong(0); // data offset
ram_output->writeLong(file.second); // file length
header_file_length += file.second;
ram_output->writeString(file.filename); // file name
ram_output->writeLong(0); // data offset
ram_output->writeLong(file.filesize); // file length
header_file_length += file.filesize;
if (header_file_length <= MAX_HEADER_DATA_SIZE) {
copyFile(file.first.c_str(), ram_output.get(), ram_buffer, buffer_length);
copyFile(file.filename.c_str(), ram_output.get(), ram_buffer, buffer_length);
header_file_count++;
}
}
@@ -167,27 +183,27 @@
uint8_t header_buffer[buffer_length];
for (int i = 0; i < sorted_files.size(); ++i) {
auto file = sorted_files[i];
output->writeString(file.first); // FileName
output->writeString(file.filename); // FileName
// DataOffset
if (i < header_file_count) {
// file data write in header, so we set its offset to -1.
output->writeLong(-1);
} else {
output->writeLong(data_offset);
}
output->writeLong(file.second); // FileLength
output->writeLong(file.filesize); // FileLength
if (i < header_file_count) {
// append data
copyFile(file.first.c_str(), output.get(), header_buffer, buffer_length);
copyFile(file.filename.c_str(), output.get(), header_buffer, buffer_length);
} else {
data_offset += file.second;
data_offset += file.filesize;
}
}
// write rest files' data
uint8_t data_buffer[buffer_length];
for (int i = header_file_count; i < sorted_files.size(); ++i) {
auto file = sorted_files[i];
copyFile(file.first.c_str(), output.get(), data_buffer, buffer_length);
copyFile(file.filename.c_str(), output.get(), data_buffer, buffer_length);
}
out_dir->close();
// NOTE: need to decrease ref count, but not to delete here,
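The new sort_files priority (segments* first, then *.fnm, then *.tii, then everything else by size) feeds writeCompoundFile, which only embeds files into the header region while the running total stays under MAX_HEADER_DATA_SIZE, so this ordering decides which small index-metadata files land in the header. Below is a standalone, runnable sketch of the same comparator on a plain struct; the sample file names are typical Lucene index files chosen for illustration and are not taken from the commit.

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

struct FileInfo {
    std::string filename;
    int64_t filesize;
};

// Same ranking as DorisCompoundFileWriter::sort_files: index metadata first.
static int file_priority(const std::string& filename) {
    if (filename.find("segments") != std::string::npos) return 1;
    if (filename.find("fnm") != std::string::npos) return 2;
    if (filename.find("tii") != std::string::npos) return 3;
    return 4;  // other files
}

int main() {
    std::vector<FileInfo> files = {{"_0.tis", 4096}, {"_0.fnm", 128},
                                   {"segments_2", 64}, {"_0.tii", 256}, {"_0.frq", 512}};
    std::sort(files.begin(), files.end(), [](const FileInfo& a, const FileInfo& b) {
        int pa = file_priority(a.filename);
        int pb = file_priority(b.filename);
        if (pa != pb) return pa < pb;
        return a.filesize < b.filesize;  // ties broken by size, smallest first
    });
    for (const auto& f : files) {
        std::cout << f.filename << '\n';  // segments_2, _0.fnm, _0.tii, _0.frq, _0.tis
    }
    return 0;
}
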
be/src/olap/rowset/segment_v2/inverted_index_compound_directory.h
@@ -55,7 +55,15 @@ class DorisCompoundFileWriter : LUCENE_BASE {
int64_t bufferLength);

private:
CL_NS(store)::Directory* directory;
class FileInfo {
public:
std::string filename;
int32_t filesize;
};

void sort_files(std::vector<FileInfo>& file_infos);

CL_NS(store)::Directory* directory = nullptr;
};

class CLUCENE_EXPORT DorisCompoundDirectory : public lucene::store::Directory {
@@ -143,6 +151,7 @@ class DorisCompoundDirectory::FSIndexInput : public lucene::store::BufferedIndex
this->_pos = 0;
this->_handle = handle;
this->_io_ctx.reader_type = ReaderType::READER_QUERY;
this->_io_ctx.read_segment_index = false;
}

protected:
@@ -161,6 +170,8 @@
const char* getObjectName() const override { return getClassName(); }
static const char* getClassName() { return "FSIndexInput"; }

void setIdxFileCache(bool index) override { _io_ctx.read_segment_index = index; }

doris::Mutex _this_lock;

protected:
Expand Down
be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
@@ -124,7 +124,7 @@ CSIndexInput::CSIndexInput(const CSIndexInput& clone) : BufferedIndexInput(clone
void CSIndexInput::close() {}

DorisCompoundReader::DorisCompoundReader(lucene::store::Directory* d, const char* name,
int32_t read_buffer_size)
int32_t read_buffer_size, bool open_idx_file_cache)
: readBufferSize(read_buffer_size),
dir(d),
ram_dir(new lucene::store::RAMDirectory()),
Expand All @@ -140,6 +140,7 @@ DorisCompoundReader::DorisCompoundReader(lucene::store::Directory* d, const char
.c_str());
}
stream = dir->openInput(name, readBufferSize);
stream->setIdxFileCache(open_idx_file_cache);

int32_t count = stream->readVInt();
ReaderFileEntry* entry = nullptr;
Expand Down Expand Up @@ -327,5 +328,9 @@ std::string DorisCompoundReader::toString() const {
std::string(file_name);
}

CL_NS(store)::IndexInput* DorisCompoundReader::getDorisIndexInput() {
return stream;
}

} // namespace segment_v2
} // namespace doris
be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h
@@ -73,7 +73,8 @@ class CLUCENE_EXPORT DorisCompoundReader : public lucene::store::Directory {

public:
DorisCompoundReader(lucene::store::Directory* dir, const char* name,
int32_t _readBufferSize = CL_NS(store)::BufferedIndexInput::BUFFER_SIZE);
int32_t _readBufferSize = CL_NS(store)::BufferedIndexInput::BUFFER_SIZE,
bool open_idx_file_cache = false);
~DorisCompoundReader() override;
void copyFile(const char* file, int64_t file_length, uint8_t* buffer, int64_t buffer_length);
bool list(std::vector<std::string>* names) const override;
@@ -93,6 +94,7 @@ class CLUCENE_EXPORT DorisCompoundReader : public lucene::store::Directory {
std::string getFileName() { return file_name; }
static const char* getClassName();
const char* getObjectName() const override;
CL_NS(store)::IndexInput* getDorisIndexInput();
};

} // namespace segment_v2
2 changes: 1 addition & 1 deletion be/src/olap/tablet.cpp
@@ -2151,7 +2151,7 @@ Status Tablet::_cooldown_data() {
}
}};
auto start = std::chrono::steady_clock::now();
if (st = old_rowset->upload_to(dest_fs.get(), new_rowset_id); !st.ok()) {
if (st = old_rowset->upload_to(dest_fs, new_rowset_id); !st.ok()) {
return st;
}

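Switching upload_to from a raw RemoteFileSystem* to a const RemoteFileSystemSPtr& fits the new preload path, which passes dest_fs on to open_file and InvertedIndexSearcherCache::build_index_searcher as a shared filesystem pointer that may outlive the upload call. The standalone sketch below illustrates why shared ownership is the safer signature in that situation; FileSystem, upload_to and g_retained are illustrative stand-ins, not Doris APIs.

#include <iostream>
#include <memory>

struct FileSystem {
    ~FileSystem() { std::cout << "filesystem destroyed\n"; }
};

// Stand-in for a component (e.g. a cached index searcher) that keeps the
// filesystem alive after the upload call returns.
std::shared_ptr<FileSystem> g_retained;

void upload_to(const std::shared_ptr<FileSystem>& fs) {
    g_retained = fs;  // shared ownership: safe even after the caller lets go
}

int main() {
    auto fs = std::make_shared<FileSystem>();
    upload_to(fs);
    fs.reset();                                      // caller drops its reference
    std::cout << "caller released its reference\n";  // filesystem still alive here
    g_retained.reset();                              // "filesystem destroyed" prints only now
    return 0;
}
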
2 changes: 1 addition & 1 deletion build.sh
@@ -302,7 +302,7 @@ update_submodule() {
}

update_submodule "be/src/apache-orc" "apache-orc" "https://github.com/apache/doris-thirdparty/archive/refs/heads/orc.tar.gz"
update_submodule "be/src/clucene" "clucene" "https://github.com/apache/doris-thirdparty/archive/refs/heads/clucene.tar.gz"
update_submodule "be/src/clucene" "clucene" "https://github.com/apache/doris-thirdparty/archive/refs/heads/clucene-2.0.tar.gz"

if [[ "${CLEAN}" -eq 1 && "${BUILD_BE}" -eq 0 && "${BUILD_FE}" -eq 0 && "${BUILD_SPARK_DPP}" -eq 0 ]]; then
clean_gensrc
