Skip to content

Commit

Permalink
[Improvement](inverted index) enable bkd index reader cache and refac…
Browse files Browse the repository at this point in the history
…tor inverted index searcher cache
  • Loading branch information
airborne12 committed Nov 21, 2023
1 parent 6fb57c2 commit 0a1908f
Show file tree
Hide file tree
Showing 6 changed files with 382 additions and 239 deletions.
150 changes: 135 additions & 15 deletions be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,21 @@

#include <CLucene/debug/mem.h>
#include <CLucene/search/IndexSearcher.h>
#include <CLucene/util/bkd/bkd_reader.h>
// IWYU pragma: no_include <bthread/errno.h>
#include <errno.h> // IWYU pragma: keep
#include <string.h>
#include <sys/resource.h>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <iostream>
#include <optional>

#include "common/logging.h"
#include "olap/olap_common.h"
#include "olap/rowset/segment_v2/inverted_index_compound_directory.h"
#include "olap/rowset/segment_v2/inverted_index_compound_reader.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
#include "runtime/exec_env.h"
#include "runtime/thread_context.h"
#include "util/defer_op.h"
Expand All @@ -39,19 +42,78 @@
namespace doris {
namespace segment_v2 {

IndexSearcherPtr InvertedIndexSearcherCache::build_index_searcher(const io::FileSystemSPtr& fs,
const std::string& index_dir,
const std::string& file_name) {
Status FulltextIndexSearcherBuilder::build(const io::FileSystemSPtr& fs,
const std::string& index_dir,
const std::string& file_name,
OptionalIndexSearcherPtr& output_searcher) {
DorisCompoundReader* directory =
new DorisCompoundReader(DorisCompoundDirectory::getDirectory(fs, index_dir.c_str()),
file_name.c_str(), config::inverted_index_read_buffer_size);
auto closeDirectory = true;
auto index_searcher =
std::make_shared<lucene::search::IndexSearcher>(directory, closeDirectory);
if (!index_searcher) {
_CLDECDELETE(directory)
output_searcher = std::nullopt;
return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
"FulltextIndexSearcherBuilder build index_searcher error.");
}
// NOTE: need to cl_refcount-- here, so that directory will be deleted when
// index_searcher is destroyed
_CLDECDELETE(directory)
return index_searcher;
output_searcher = index_searcher;
return Status::OK();
}

Status BKDIndexSearcherBuilder::build(const io::FileSystemSPtr& fs, const std::string& index_dir,
const std::string& file_name,
OptionalIndexSearcherPtr& output_searcher) {
try {
auto compound_reader = std::make_unique<DorisCompoundReader>(
DorisCompoundDirectory::getDirectory(fs, index_dir.c_str()), file_name.c_str(),
config::inverted_index_read_buffer_size);

if (!compound_reader) {
LOG(ERROR) << "compound reader is null when get directory for:" << index_dir << "/"
<< file_name;
output_searcher = std::nullopt;
return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
"compound reader is null");
}
CLuceneError err;
std::unique_ptr<lucene::store::IndexInput> data_in;
std::unique_ptr<lucene::store::IndexInput> meta_in;
std::unique_ptr<lucene::store::IndexInput> index_in;

if (!compound_reader->openInput(
InvertedIndexDescriptor::get_temporary_bkd_index_data_file_name().c_str(),
data_in, err) ||
!compound_reader->openInput(
InvertedIndexDescriptor::get_temporary_bkd_index_meta_file_name().c_str(),
meta_in, err) ||
!compound_reader->openInput(
InvertedIndexDescriptor::get_temporary_bkd_index_file_name().c_str(), index_in,
err)) {
// Consider logging the error or handling it more comprehensively
LOG(ERROR) << "open bkd index input error: {}" << err.what();
output_searcher = std::nullopt;
return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
"open bkd index input error");
}
auto bkd_reader = std::make_shared<lucene::util::bkd::bkd_reader>(data_in.release());
if (0 == bkd_reader->read_meta(meta_in.get())) {
VLOG_NOTICE << "bkd index file is empty:" << compound_reader->toString();
output_searcher = std::nullopt;
return Status::EndOfFile("bkd index file is empty");
}

bkd_reader->read_index(index_in.get());
output_searcher = IndexSearcherPtr {bkd_reader};
return Status::OK();
} catch (const CLuceneError& e) {
return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
"BKDIndexSearcherBuilder build error: {}", e.what());
}
}

InvertedIndexSearcherCache* InvertedIndexSearcherCache::create_global_instance(
Expand Down Expand Up @@ -98,13 +160,18 @@ InvertedIndexSearcherCache::InvertedIndexSearcherCache(size_t capacity, uint32_t
}
}

Status InvertedIndexSearcherCache::get_index_searcher(const io::FileSystemSPtr& fs,
const std::string& index_dir,
const std::string& file_name,
InvertedIndexCacheHandle* cache_handle,
OlapReaderStatistics* stats, bool use_cache) {
Status InvertedIndexSearcherCache::get_index_searcher(
const io::FileSystemSPtr& fs, const std::string& index_dir, const std::string& file_name,
InvertedIndexCacheHandle* cache_handle, OlapReaderStatistics* stats,
InvertedIndexReaderType reader_type, bool use_cache) {
auto file_path = index_dir + "/" + file_name;

bool exists = false;
RETURN_IF_ERROR(fs->exists(file_path, &exists));
if (!exists) {
LOG(WARNING) << "inverted index: " << file_path << " not exist.";
return Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>(
"inverted index input file {} not found", file_path);
}
using namespace std::chrono;
auto start_time = steady_clock::now();
Defer cost {[&]() {
Expand All @@ -119,14 +186,40 @@ Status InvertedIndexSearcherCache::get_index_searcher(const io::FileSystemSPtr&
}

cache_handle->owned = !use_cache;
IndexSearcherPtr index_searcher = nullptr;
IndexSearcherPtr index_searcher;
std::unique_ptr<IndexSearcherBuilder> index_builder = nullptr;
auto mem_tracker =
std::unique_ptr<MemTracker>(new MemTracker("InvertedIndexSearcherCacheWithRead"));
#ifndef BE_TEST
{
SCOPED_RAW_TIMER(&stats->inverted_index_searcher_open_timer);
SCOPED_CONSUME_MEM_TRACKER(mem_tracker.get());
index_searcher = build_index_searcher(fs, index_dir, file_name);
switch (reader_type) {
case InvertedIndexReaderType::STRING_TYPE:
case InvertedIndexReaderType::FULLTEXT: {
index_builder = std::make_unique<FulltextIndexSearcherBuilder>();
break;
}
case InvertedIndexReaderType::BKD: {
index_builder = std::make_unique<BKDIndexSearcherBuilder>();
break;
}

default:
LOG(ERROR) << "InvertedIndexReaderType:" << reader_type_to_string(reader_type)
<< " is not support for InvertedIndexSearcherCache";
return Status::Error<ErrorCode::INVERTED_INDEX_NOT_SUPPORTED>(
"InvertedIndexSearcherCache do not support reader type.");
}
OptionalIndexSearcherPtr result;
RETURN_IF_ERROR(index_builder->build(fs, index_dir, file_name, result));
if (!result.has_value()) {
LOG(ERROR) << "InvertedIndexReaderType:" << reader_type_to_string(reader_type)
<< " build for InvertedIndexSearcherCache error";
return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
"InvertedIndexSearcherCache build error.");
}
index_searcher = *result;
}
#endif

Expand All @@ -144,7 +237,8 @@ Status InvertedIndexSearcherCache::get_index_searcher(const io::FileSystemSPtr&

Status InvertedIndexSearcherCache::insert(const io::FileSystemSPtr& fs,
const std::string& index_dir,
const std::string& file_name) {
const std::string& file_name,
InvertedIndexReaderType reader_type) {
auto file_path = index_dir + "/" + file_name;

using namespace std::chrono;
Expand All @@ -156,13 +250,39 @@ Status InvertedIndexSearcherCache::insert(const io::FileSystemSPtr& fs,

InvertedIndexSearcherCache::CacheKey cache_key(file_path);
IndexCacheValuePtr cache_value = std::make_unique<InvertedIndexSearcherCache::CacheValue>();
IndexSearcherPtr index_searcher = nullptr;
IndexSearcherPtr index_searcher;
std::unique_ptr<IndexSearcherBuilder> builder = nullptr;
auto mem_tracker =
std::unique_ptr<MemTracker>(new MemTracker("InvertedIndexSearcherCacheWithInsert"));
#ifndef BE_TEST
{
SCOPED_CONSUME_MEM_TRACKER(mem_tracker.get());
index_searcher = build_index_searcher(fs, index_dir, file_name);
switch (reader_type) {
case InvertedIndexReaderType::STRING_TYPE:
case InvertedIndexReaderType::FULLTEXT: {
builder = std::make_unique<FulltextIndexSearcherBuilder>();
break;
}
case InvertedIndexReaderType::BKD: {
builder = std::make_unique<BKDIndexSearcherBuilder>();
break;
}

default:
LOG(ERROR) << "InvertedIndexReaderType:" << reader_type_to_string(reader_type)
<< " is not support for InvertedIndexSearcherCache";
return Status::Error<ErrorCode::INVERTED_INDEX_NOT_SUPPORTED>(
"InvertedIndexSearcherCache do not support reader type.");
}
OptionalIndexSearcherPtr result;
RETURN_IF_ERROR(builder->build(fs, index_dir, file_name, result));
if (!result.has_value()) {
LOG(ERROR) << "InvertedIndexReaderType:" << reader_type_to_string(reader_type)
<< " build for InvertedIndexSearcherCache error";
return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
"InvertedIndexSearcherCache build error.");
}
index_searcher = *result;
}
#endif

Expand Down
44 changes: 35 additions & 9 deletions be/src/olap/rowset/segment_v2/inverted_index_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <roaring/roaring.hh>
#include <string>
#include <utility>
#include <variant>

#include "common/config.h"
#include "common/status.h"
Expand All @@ -54,16 +55,44 @@ namespace lucene {
namespace search {
class IndexSearcher;
} // namespace search
namespace util {
namespace bkd {
class bkd_reader;
}
} // namespace util
} // namespace lucene

namespace doris {
struct OlapReaderStatistics;

namespace segment_v2 {
using IndexSearcherPtr = std::shared_ptr<lucene::search::IndexSearcher>;
using FulltextIndexSearcherPtr = std::shared_ptr<lucene::search::IndexSearcher>;
using BKDIndexSearcherPtr = std::shared_ptr<lucene::util::bkd::bkd_reader>;
using IndexSearcherPtr = std::variant<FulltextIndexSearcherPtr, BKDIndexSearcherPtr>;
using OptionalIndexSearcherPtr = std::optional<IndexSearcherPtr>;

class InvertedIndexCacheHandle;

class IndexSearcherBuilder {
public:
virtual Status build(const io::FileSystemSPtr& fs, const std::string& index_dir,
const std::string& file_name,
OptionalIndexSearcherPtr& output_searcher) = 0;
virtual ~IndexSearcherBuilder() = default;
};

class FulltextIndexSearcherBuilder : public IndexSearcherBuilder {
public:
Status build(const io::FileSystemSPtr& fs, const std::string& index_dir,
const std::string& file_name, OptionalIndexSearcherPtr& output_searcher) override;
};

class BKDIndexSearcherBuilder : public IndexSearcherBuilder {
public:
Status build(const io::FileSystemSPtr& fs, const std::string& index_dir,
const std::string& file_name, OptionalIndexSearcherPtr& output_searcher) override;
};

class InvertedIndexSearcherCache : public LRUCachePolicy {
public:
// The cache key of index_searcher lru cache
Expand All @@ -73,7 +102,7 @@ class InvertedIndexSearcherCache : public LRUCachePolicy {
};

// The cache value of index_searcher lru cache.
// Holding a opened index_searcher.
// Holding an opened index_searcher.
struct CacheValue : public LRUCacheValueBase {
IndexSearcherPtr index_searcher;
};
Expand All @@ -95,19 +124,16 @@ class InvertedIndexSearcherCache : public LRUCachePolicy {
return ExecEnv::GetInstance()->get_inverted_index_searcher_cache();
}

static IndexSearcherPtr build_index_searcher(const io::FileSystemSPtr& fs,
const std::string& index_dir,
const std::string& file_name);

InvertedIndexSearcherCache(size_t capacity, uint32_t num_shards);

Status get_index_searcher(const io::FileSystemSPtr& fs, const std::string& index_dir,
const std::string& file_name, InvertedIndexCacheHandle* cache_handle,
OlapReaderStatistics* stats, bool use_cache = true);
OlapReaderStatistics* stats, InvertedIndexReaderType reader_type,
bool use_cache = true);

// function `insert` called after inverted index writer close
Status insert(const io::FileSystemSPtr& fs, const std::string& index_dir,
const std::string& file_name);
const std::string& file_name, InvertedIndexReaderType reader_type);

// function `erase` called after compaction remove segment
Status erase(const std::string& index_file_path);
Expand Down Expand Up @@ -211,7 +237,7 @@ class InvertedIndexQueryCache : public LRUCachePolicy {
key_buf.append("/");
key_buf.append(column_name);
key_buf.append("/");
auto query_type_str = InvertedIndexQueryType_toString(query_type);
auto query_type_str = query_type_to_string(query_type);
if (query_type_str.empty()) {
return "";
}
Expand Down
46 changes: 45 additions & 1 deletion be/src/olap/rowset/segment_v2/inverted_index_query_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,50 @@
namespace doris {
namespace segment_v2 {

enum class InvertedIndexReaderType {
UNKNOWN = -1,
FULLTEXT = 0,
STRING_TYPE = 1,
BKD = 2,
};

template <InvertedIndexReaderType T>
constexpr const char* InvertedIndexReaderTypeToString();

template <>
constexpr const char* InvertedIndexReaderTypeToString<InvertedIndexReaderType::UNKNOWN>() {
return "UNKNOWN";
}

template <>
constexpr const char* InvertedIndexReaderTypeToString<InvertedIndexReaderType::FULLTEXT>() {
return "FULLTEXT";
}

template <>
constexpr const char* InvertedIndexReaderTypeToString<InvertedIndexReaderType::STRING_TYPE>() {
return "STRING_TYPE";
}

template <>
constexpr const char* InvertedIndexReaderTypeToString<InvertedIndexReaderType::BKD>() {
return "BKD";
}

inline std::string reader_type_to_string(InvertedIndexReaderType query_type) {
switch (query_type) {
case InvertedIndexReaderType::UNKNOWN:
return InvertedIndexReaderTypeToString<InvertedIndexReaderType::UNKNOWN>();
case InvertedIndexReaderType::FULLTEXT:
return InvertedIndexReaderTypeToString<InvertedIndexReaderType::FULLTEXT>();
case InvertedIndexReaderType::STRING_TYPE:
return InvertedIndexReaderTypeToString<InvertedIndexReaderType::STRING_TYPE>();
case InvertedIndexReaderType::BKD:
return InvertedIndexReaderTypeToString<InvertedIndexReaderType::BKD>();
}
return ""; // Explicitly handle all cases
}

enum class InvertedIndexQueryType {
UNKNOWN_QUERY = -1,
EQUAL_QUERY = 0,
Expand All @@ -34,7 +78,7 @@ enum class InvertedIndexQueryType {
MATCH_PHRASE_QUERY = 7,
};

inline std::string InvertedIndexQueryType_toString(InvertedIndexQueryType query_type) {
inline std::string query_type_to_string(InvertedIndexQueryType query_type) {
switch (query_type) {
case InvertedIndexQueryType::UNKNOWN_QUERY: {
return "UNKNOWN";
Expand Down
Loading

0 comments on commit 0a1908f

Please sign in to comment.