diff --git a/be/src/clucene b/be/src/clucene index 6f8a21ffe15bd78..d20200ed36dda40 160000 --- a/be/src/clucene +++ b/be/src/clucene @@ -1 +1 @@ -Subproject commit 6f8a21ffe15bd78a1cd3e685067ee5c9ed071827 +Subproject commit d20200ed36dda4087489d49457a4da0c44ad4d09 diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index c44249da760084b..8fd20f020963c66 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -991,6 +991,7 @@ DEFINE_mInt32(inverted_index_cache_stale_sweep_time_sec, "600"); DEFINE_String(inverted_index_searcher_cache_limit, "10%"); // set `true` to enable insert searcher into cache when write inverted index data DEFINE_Bool(enable_write_index_searcher_cache, "true"); +DEFINE_Bool(enable_cooldown_preload_index_reader, "false"); DEFINE_Bool(enable_inverted_index_cache_check_timestamp, "true"); DEFINE_Int32(inverted_index_fd_number_limit_percent, "40"); // 40% diff --git a/be/src/common/config.h b/be/src/common/config.h index 52a2eeee2e34e6b..bf61127c7efb421 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1027,6 +1027,8 @@ DECLARE_mInt32(inverted_index_cache_stale_sweep_time_sec); DECLARE_String(inverted_index_searcher_cache_limit); // set `true` to enable insert searcher into cache when write inverted index data DECLARE_Bool(enable_write_index_searcher_cache); +// Pre-load the inverted index reader into the file cache during cooldown. +DECLARE_Bool(enable_cooldown_preload_index_reader); DECLARE_Bool(enable_inverted_index_cache_check_timestamp); DECLARE_Int32(inverted_index_fd_number_limit_percent); // 50% diff --git a/be/src/olap/rowset/beta_rowset.cpp b/be/src/olap/rowset/beta_rowset.cpp index 176ff0d21b5e898..254918aa1688d1c 100644 --- a/be/src/olap/rowset/beta_rowset.cpp +++ b/be/src/olap/rowset/beta_rowset.cpp @@ -346,7 +346,8 @@ Status BetaRowset::copy_files_to(const std::string& dir, const RowsetId& new_row return Status::OK(); } -Status BetaRowset::upload_to(io::RemoteFileSystem* dest_fs, const RowsetId& new_rowset_id) { +Status BetaRowset::upload_to(const io::RemoteFileSystemSPtr& dest_fs, + const RowsetId& new_rowset_id) { DCHECK(is_local()); if (num_segments() < 1) { return Status::OK(); @@ -355,6 +356,8 @@ Status BetaRowset::upload_to(io::RemoteFileSystem* dest_fs, const RowsetId& new_ local_paths.reserve(num_segments()); std::vector dest_paths; dest_paths.reserve(num_segments()); + std::vector idx_remote_paths; + idx_remote_paths.reserve(num_segments()); for (int i = 0; i < num_segments(); ++i) { // Note: Here we use relative path for remote. auto remote_seg_path = remote_segment_path(_rowset_meta->tablet_id(), new_rowset_id, i); @@ -373,11 +376,44 @@ Status BetaRowset::upload_to(io::RemoteFileSystem* dest_fs, const RowsetId& new_ index_meta->index_id()); dest_paths.push_back(remote_inverted_index_file); local_paths.push_back(local_inverted_index_file); + idx_remote_paths.push_back(remote_inverted_index_file); } } } auto st = dest_fs->batch_upload(local_paths, dest_paths); if (st.ok()) { + // Pre-write the metadata of the inverted index into the file cache + if (config::enable_cooldown_preload_index_reader) { + if (dest_fs->type() == io::FileSystemType::S3 && config::enable_file_cache) { + auto start = std::chrono::steady_clock::now(); + for (auto& path : idx_remote_paths) { + std::shared_ptr file_reader = nullptr; + if (!dest_fs->open_file(path, &file_reader).ok()) { + continue; + } + + auto& url = file_reader->path().native(); + size_t pos = url.rfind('/'); + if (pos != std::string::npos) { + auto idx_path = url.substr(0, pos); + auto idx_name = url.substr(pos + 1); + + try { + InvertedIndexSearcherCache::build_index_searcher(dest_fs, idx_path, + idx_name); + } catch (CLuceneError& err) { + LOG(WARNING) << "opening index reader clucene err: " << err.what(); + } catch (...) { + LOG(WARNING) << "opening index reader other err"; + } + } + } + auto duration = + std::chrono::duration(std::chrono::steady_clock::now() - start); + LOG(INFO) << "cooldown upload open invert index duration: " << duration.count(); + } + } + DorisMetrics::instance()->upload_rowset_count->increment(1); DorisMetrics::instance()->upload_total_byte->increment(data_disk_size()); } else { diff --git a/be/src/olap/rowset/beta_rowset.h b/be/src/olap/rowset/beta_rowset.h index 064b8fcb6ef7461..6efb0066ff9f85a 100644 --- a/be/src/olap/rowset/beta_rowset.h +++ b/be/src/olap/rowset/beta_rowset.h @@ -76,7 +76,8 @@ class BetaRowset final : public Rowset { Status copy_files_to(const std::string& dir, const RowsetId& new_rowset_id) override; - Status upload_to(io::RemoteFileSystem* dest_fs, const RowsetId& new_rowset_id) override; + Status upload_to(const io::RemoteFileSystemSPtr& dest_fs, + const RowsetId& new_rowset_id) override; // only applicable to alpha rowset, no op here Status remove_old_files(std::vector* files_to_remove) override { diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h index 7ac31e608e4d66b..407731d8dd2442d 100644 --- a/be/src/olap/rowset/rowset.h +++ b/be/src/olap/rowset/rowset.h @@ -211,7 +211,8 @@ class Rowset : public std::enable_shared_from_this { // copy all files to `dir` virtual Status copy_files_to(const std::string& dir, const RowsetId& new_rowset_id) = 0; - virtual Status upload_to(io::RemoteFileSystem* dest_fs, const RowsetId& new_rowset_id) { + virtual Status upload_to(const io::RemoteFileSystemSPtr& dest_fs, + const RowsetId& new_rowset_id) { return Status::OK(); } diff --git a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp index f3c68984ebbfa70..72408c66f6b2673 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp @@ -43,12 +43,20 @@ InvertedIndexSearcherCache* InvertedIndexSearcherCache::_s_instance = nullptr; IndexSearcherPtr InvertedIndexSearcherCache::build_index_searcher(const io::FileSystemSPtr& fs, const std::string& index_dir, const std::string& file_name) { - DorisCompoundReader* directory = - new DorisCompoundReader(DorisCompoundDirectory::getDirectory(fs, index_dir.c_str()), - file_name.c_str(), config::inverted_index_read_buffer_size); + bool open_idx_file_cache = true; + DorisCompoundReader* directory = new DorisCompoundReader( + DorisCompoundDirectory::getDirectory(fs, index_dir.c_str()), file_name.c_str(), + config::inverted_index_read_buffer_size, open_idx_file_cache); + auto closeDirectory = true; - auto index_searcher = - std::make_shared(directory, closeDirectory); + auto reader = lucene::index::IndexReader::open( + directory, config::inverted_index_read_buffer_size, closeDirectory); + + bool close_reader = true; + auto index_searcher = std::make_shared(reader, close_reader); + + directory->getDorisIndexInput()->setIdxFileCache(false); + // NOTE: need to cl_refcount-- here, so that directory will be deleted when // index_searcher is destroyed _CLDECDELETE(directory) diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp index de57eed6f85441a..4b9ff7d8dae63e5 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp @@ -97,6 +97,22 @@ CL_NS(store)::Directory* DorisCompoundFileWriter::getDirectory() { return directory; } +void DorisCompoundFileWriter::sort_files(std::vector& file_infos) { + auto file_priority = [](const std::string& filename) { + if (filename.find("segments") != std::string::npos) return 1; + if (filename.find("fnm") != std::string::npos) return 2; + if (filename.find("tii") != std::string::npos) return 3; + return 4; // Other files + }; + + std::sort(file_infos.begin(), file_infos.end(), [&](const FileInfo& a, const FileInfo& b) { + int32_t priority_a = file_priority(a.filename); + int32_t priority_b = file_priority(b.filename); + if (priority_a != priority_b) return priority_a < priority_b; + return a.filesize < b.filesize; + }); +} + void DorisCompoundFileWriter::writeCompoundFile() { // list files in current dir std::vector files; @@ -106,15 +122,15 @@ void DorisCompoundFileWriter::writeCompoundFile() { if (it != files.end()) { files.erase(it); } - // sort file list by file length - std::vector> sorted_files; + + std::vector sorted_files; for (auto file : files) { - sorted_files.push_back(std::make_pair( - file, ((DorisCompoundDirectory*)directory)->fileLength(file.c_str()))); + FileInfo file_info; + file_info.filename = file; + file_info.filesize = ((DorisCompoundDirectory*)directory)->fileLength(file.c_str()); + sorted_files.emplace_back(std::move(file_info)); } - std::sort(sorted_files.begin(), sorted_files.end(), - [](const std::pair& a, - const std::pair& b) { return (a.second < b.second); }); + sort_files(sorted_files); int32_t file_count = sorted_files.size(); @@ -138,12 +154,12 @@ void DorisCompoundFileWriter::writeCompoundFile() { const int64_t buffer_length = 16384; uint8_t ram_buffer[buffer_length]; for (auto file : sorted_files) { - ram_output->writeString(file.first); // file name - ram_output->writeLong(0); // data offset - ram_output->writeLong(file.second); // file length - header_file_length += file.second; + ram_output->writeString(file.filename); // file name + ram_output->writeLong(0); // data offset + ram_output->writeLong(file.filesize); // file length + header_file_length += file.filesize; if (header_file_length <= MAX_HEADER_DATA_SIZE) { - copyFile(file.first.c_str(), ram_output.get(), ram_buffer, buffer_length); + copyFile(file.filename.c_str(), ram_output.get(), ram_buffer, buffer_length); header_file_count++; } } @@ -167,7 +183,7 @@ void DorisCompoundFileWriter::writeCompoundFile() { uint8_t header_buffer[buffer_length]; for (int i = 0; i < sorted_files.size(); ++i) { auto file = sorted_files[i]; - output->writeString(file.first); // FileName + output->writeString(file.filename); // FileName // DataOffset if (i < header_file_count) { // file data write in header, so we set its offset to -1. @@ -175,19 +191,19 @@ void DorisCompoundFileWriter::writeCompoundFile() { } else { output->writeLong(data_offset); } - output->writeLong(file.second); // FileLength + output->writeLong(file.filesize); // FileLength if (i < header_file_count) { // append data - copyFile(file.first.c_str(), output.get(), header_buffer, buffer_length); + copyFile(file.filename.c_str(), output.get(), header_buffer, buffer_length); } else { - data_offset += file.second; + data_offset += file.filesize; } } // write rest files' data uint8_t data_buffer[buffer_length]; for (int i = header_file_count; i < sorted_files.size(); ++i) { auto file = sorted_files[i]; - copyFile(file.first.c_str(), output.get(), data_buffer, buffer_length); + copyFile(file.filename.c_str(), output.get(), data_buffer, buffer_length); } out_dir->close(); // NOTE: need to decrease ref count, but not to delete here, diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.h b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.h index b89c63835396fec..eb08ad98a3be45b 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.h +++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.h @@ -55,7 +55,15 @@ class DorisCompoundFileWriter : LUCENE_BASE { int64_t bufferLength); private: - CL_NS(store)::Directory* directory; + class FileInfo { + public: + std::string filename; + int32_t filesize; + }; + + void sort_files(std::vector& file_infos); + + CL_NS(store)::Directory* directory = nullptr; }; class CLUCENE_EXPORT DorisCompoundDirectory : public lucene::store::Directory { @@ -143,6 +151,7 @@ class DorisCompoundDirectory::FSIndexInput : public lucene::store::BufferedIndex this->_pos = 0; this->_handle = handle; this->_io_ctx.reader_type = ReaderType::READER_QUERY; + this->_io_ctx.read_segment_index = false; } protected: @@ -161,6 +170,8 @@ class DorisCompoundDirectory::FSIndexInput : public lucene::store::BufferedIndex const char* getObjectName() const override { return getClassName(); } static const char* getClassName() { return "FSIndexInput"; } + void setIdxFileCache(bool index) override { _io_ctx.read_segment_index = index; } + doris::Mutex _this_lock; protected: diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp index fce1009be51ed12..f9bfeb5aaad37d9 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp @@ -124,7 +124,7 @@ CSIndexInput::CSIndexInput(const CSIndexInput& clone) : BufferedIndexInput(clone void CSIndexInput::close() {} DorisCompoundReader::DorisCompoundReader(lucene::store::Directory* d, const char* name, - int32_t read_buffer_size) + int32_t read_buffer_size, bool open_idx_file_cache) : readBufferSize(read_buffer_size), dir(d), ram_dir(new lucene::store::RAMDirectory()), @@ -140,6 +140,7 @@ DorisCompoundReader::DorisCompoundReader(lucene::store::Directory* d, const char .c_str()); } stream = dir->openInput(name, readBufferSize); + stream->setIdxFileCache(open_idx_file_cache); int32_t count = stream->readVInt(); ReaderFileEntry* entry = nullptr; @@ -327,5 +328,9 @@ std::string DorisCompoundReader::toString() const { std::string(file_name); } +CL_NS(store)::IndexInput* DorisCompoundReader::getDorisIndexInput() { + return stream; +} + } // namespace segment_v2 } // namespace doris diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h index 75bf1ab633e0738..995a8a5c4639208 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h +++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h @@ -73,7 +73,8 @@ class CLUCENE_EXPORT DorisCompoundReader : public lucene::store::Directory { public: DorisCompoundReader(lucene::store::Directory* dir, const char* name, - int32_t _readBufferSize = CL_NS(store)::BufferedIndexInput::BUFFER_SIZE); + int32_t _readBufferSize = CL_NS(store)::BufferedIndexInput::BUFFER_SIZE, + bool open_idx_file_cache = false); ~DorisCompoundReader() override; void copyFile(const char* file, int64_t file_length, uint8_t* buffer, int64_t buffer_length); bool list(std::vector* names) const override; @@ -93,6 +94,7 @@ class CLUCENE_EXPORT DorisCompoundReader : public lucene::store::Directory { std::string getFileName() { return file_name; } static const char* getClassName(); const char* getObjectName() const override; + CL_NS(store)::IndexInput* getDorisIndexInput(); }; } // namespace segment_v2 diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 2cbd54633971a1c..67995c2f54ab56a 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -2151,7 +2151,7 @@ Status Tablet::_cooldown_data() { } }}; auto start = std::chrono::steady_clock::now(); - if (st = old_rowset->upload_to(dest_fs.get(), new_rowset_id); !st.ok()) { + if (st = old_rowset->upload_to(dest_fs, new_rowset_id); !st.ok()) { return st; } diff --git a/build.sh b/build.sh index a7e31fa9b3b8359..15f22b03a72f284 100755 --- a/build.sh +++ b/build.sh @@ -302,7 +302,7 @@ update_submodule() { } update_submodule "be/src/apache-orc" "apache-orc" "https://github.com/apache/doris-thirdparty/archive/refs/heads/orc.tar.gz" -update_submodule "be/src/clucene" "clucene" "https://github.com/apache/doris-thirdparty/archive/refs/heads/clucene.tar.gz" +update_submodule "be/src/clucene" "clucene" "https://github.com/apache/doris-thirdparty/archive/refs/heads/clucene-2.0.tar.gz" if [[ "${CLEAN}" -eq 1 && "${BUILD_BE}" -eq 0 && "${BUILD_FE}" -eq 0 && "${BUILD_SPARK_DPP}" -eq 0 ]]; then clean_gensrc