Skip to content

Commit

Permalink
Merge branch 'master' into fix_fe_type
Browse files Browse the repository at this point in the history
  • Loading branch information
w41ter authored Oct 10, 2024
2 parents 0501c34 + 1e8ea55 commit e154fba
Show file tree
Hide file tree
Showing 90 changed files with 4,535 additions and 1,569 deletions.
26 changes: 20 additions & 6 deletions be/src/agent/be_exec_version_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ Status BeExecVersionManager::check_be_exec_version(int be_exec_version) {

int BeExecVersionManager::get_function_compatibility(int be_exec_version,
std::string function_name) {
if (_function_restrict_map.contains(function_name) && be_exec_version != get_newest_version()) {
throw Exception(Status::InternalError(
"function {} do not support old be exec version, maybe it's because doris are "
"doing a rolling upgrade. newest_version={}, input_be_exec_version={}",
function_name, get_newest_version(), be_exec_version));
}

auto it = _function_change_map.find(function_name);
if (it == _function_change_map.end()) {
// 0 means no compatibility issues need to be dealt with
Expand Down Expand Up @@ -82,7 +89,7 @@ void BeExecVersionManager::check_function_compatibility(int current_be_exec_vers
* 3: start from doris 2.0.0 (by some mistakes)
* a. aggregation function do not serialize bitmap to string.
* b. support window funnel mode.
* 4/5: start from doris 2.1.0
* 4: start from doris 2.1.0
* a. ignore this line, window funnel mode should be enabled from 2.0.
* b. array contains/position/countequal function return nullable in less situations.
* c. cleared old version of Version 2.
Expand All @@ -92,15 +99,22 @@ void BeExecVersionManager::check_function_compatibility(int current_be_exec_vers
* g. do local merge of remote runtime filter
* h. "now": ALWAYS_NOT_NULLABLE -> DEPEND_ON_ARGUMENTS
*
* 7: start from doris 3.0.0
* 5: start from doris 3.0.0
* a. change some agg function nullable property: PR #37215
*
* 6: start from doris 3.0.1 and 2.1.6
* a. change the impl of percentile (need fix)
* b. clear old version of version 3->4
* c. change FunctionIsIPAddressInRange from AlwaysNotNullable to DependOnArguments
* d. change some agg function nullable property: PR #37215
* e. change variant serde to fix PR #38413
* f. support const column in serialize/deserialize function: PR #41175
* d. change variant serde to fix PR #38413
*
* 7: start from doris 3.0.2
* a. window funnel logic change
* b. support const column in serialize/deserialize function: PR #41175
*/
const int BeExecVersionManager::max_be_exec_version = 7;

const int BeExecVersionManager::max_be_exec_version = 8;
const int BeExecVersionManager::min_be_exec_version = 0;
std::map<std::string, std::set<int>> BeExecVersionManager::_function_change_map {};
std::set<std::string> BeExecVersionManager::_function_restrict_map;
} // namespace doris
9 changes: 7 additions & 2 deletions be/src/agent/be_exec_version_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

namespace doris {

constexpr static int AGG_FUNCTION_NEW_WINDOW_FUNNEL = 6;
constexpr inline int BITMAP_SERDE = 3;
constexpr inline int USE_NEW_SERDE = 4; // release on DORIS version 2.1
constexpr inline int OLD_WAL_SERDE = 3; // use to solve compatibility issues, see pr #32299
Expand All @@ -34,7 +33,7 @@ constexpr inline int VARIANT_SERDE = 6; // change variant serde to fix P
constexpr inline int AGGREGATION_2_1_VERSION =
6; // some aggregation changed the data format after this version
constexpr inline int USE_CONST_SERDE =
7; // support const column in serialize/deserialize function: PR #41175
8; // support const column in serialize/deserialize function: PR #41175

class BeExecVersionManager {
public:
Expand All @@ -59,11 +58,17 @@ class BeExecVersionManager {
_function_change_map[function_name].insert(breaking_old_version);
}

// Adds `function_name` to the set of restricted functions (_function_restrict_map).
// Registering a name that is already present leaves the set unchanged.
static void registe_restrict_function_compatibility(std::string function_name) {
    _function_restrict_map.emplace(function_name);
}

private:
static const int max_be_exec_version;
static const int min_be_exec_version;
// [function name] -> [breaking change start version]
static std::map<std::string, std::set<int>> _function_change_map;
// functions in this set must be called with the newest be exec version
static std::set<std::string> _function_restrict_map;
};

} // namespace doris
12 changes: 9 additions & 3 deletions be/src/exprs/bloom_filter_func.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,20 @@ class BloomFilterFuncBase : public RuntimeFilterFuncBase {
}

Status merge(BloomFilterFuncBase* bloomfilter_func) {
DCHECK(bloomfilter_func != nullptr);
DCHECK(bloomfilter_func->_bloom_filter != nullptr);
if (bloomfilter_func == nullptr) {
return Status::InternalError("bloomfilter_func is nullptr");
}
if (bloomfilter_func->_bloom_filter == nullptr) {
return Status::InternalError("bloomfilter_func->_bloom_filter is nullptr");
}
// If `_inited` is false, there is no memory allocated in bloom filter and this is the first
// call for `merge` function. So we just reuse this bloom filter, and we don't need to
// allocate memory again.
if (!_inited) {
auto* other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func);
DCHECK(_bloom_filter == nullptr);
if (_bloom_filter == nullptr) {
return Status::InternalError("_bloom_filter is nullptr");
}
_bloom_filter = bloomfilter_func->_bloom_filter;
_bloom_filter_alloced = other_func->_bloom_filter_alloced;
_inited = true;
Expand Down
17 changes: 17 additions & 0 deletions be/src/olap/rowset/segment_v2/inverted_index/analyzer/analyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,21 @@ std::vector<std::string> InvertedIndexAnalyzer::get_analyse_result(
return analyse_result;
}

std::vector<std::string> InvertedIndexAnalyzer::get_analyse_result(
const std::string& search_str, const std::string& field_name,
InvertedIndexQueryType query_type, const std::map<std::string, std::string>& properties) {
InvertedIndexCtxSPtr inverted_index_ctx = std::make_shared<InvertedIndexCtx>(
get_inverted_index_parser_type_from_string(
get_parser_string_from_properties(properties)),
get_parser_mode_string_from_properties(properties),
get_parser_char_filter_map_from_properties(properties),
get_parser_lowercase_from_properties(properties),
get_parser_stopwords_from_properties(properties));
auto analyzer = create_analyzer(inverted_index_ctx.get());
inverted_index_ctx->analyzer = analyzer.get();
auto reader = create_reader(inverted_index_ctx->char_filter_map);
reader->init(search_str.data(), search_str.size(), true);
return get_analyse_result(reader.get(), analyzer.get(), field_name, query_type);
}

} // namespace doris::segment_v2::inverted_index
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class Analyzer;
} // namespace lucene

namespace doris::segment_v2::inverted_index {

class InvertedIndexAnalyzer {
public:
static std::unique_ptr<lucene::util::Reader> create_reader(CharFilterMap& char_filter_map);
Expand All @@ -44,5 +45,10 @@ class InvertedIndexAnalyzer {
const std::string& field_name,
InvertedIndexQueryType query_type,
bool drop_duplicates = true);

// Analyses `search_str` for `field_name` using analyzer settings (parser type, parser mode,
// char filters, lowercase, stopwords) taken from the `properties` map, and returns the
// resulting terms.
static std::vector<std::string> get_analyse_result(
const std::string& search_str, const std::string& field_name,
InvertedIndexQueryType query_type,
const std::map<std::string, std::string>& properties);
};
} // namespace doris::segment_v2::inverted_index
87 changes: 60 additions & 27 deletions be/src/olap/rowset/segment_v2/inverted_index/query/phrase_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,13 @@

#include "phrase_query.h"

#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/split.hpp>
#include <charconv>

#include "CLucene/index/Terms.h"
#include "olap/rowset/segment_v2/inverted_index/analyzer/analyzer.h"

namespace doris::segment_v2 {

template <typename Derived>
Expand Down Expand Up @@ -141,19 +146,21 @@ void PhraseQuery::add(const InvertedIndexQueryInfo& query_info) {

_slop = query_info.slop;
if (_slop == 0 || query_info.ordered) {
if (query_info.ordered) {
_additional_terms = query_info.additional_terms;
}
// Logic for no slop query and ordered phrase query
add(query_info.field_name, query_info.terms);
} else {
// Simple slop query follows the default phrase query algorithm
auto query = std::make_unique<CL_NS(search)::PhraseQuery>();
_phrase_query = std::make_unique<CL_NS(search)::PhraseQuery>();
for (const auto& term : query_info.terms) {
std::wstring ws_term = StringUtil::string_to_wstring(term);
auto* t = _CLNEW lucene::index::Term(query_info.field_name.c_str(), ws_term.c_str());
query->add(t);
_phrase_query->add(t);
_CLDECDELETE(t);
}
query->setSlop(_slop);
_matcher = std::move(query);
_phrase_query->setSlop(_slop);
}
}

Expand All @@ -173,13 +180,16 @@ void PhraseQuery::add(const std::wstring& field_name, const std::vector<std::str
}

std::vector<TermIterator> iterators;
auto ensureTermPosition = [this, &iterators, &field_name](const std::string& term) {
auto ensureTermPosition = [this, &iterators, &field_name](const std::string& term,
bool is_save_iter = true) {
std::wstring ws_term = StringUtil::string_to_wstring(term);
Term* t = _CLNEW Term(field_name.c_str(), ws_term.c_str());
_terms.push_back(t);
TermPositions* term_pos = _searcher->getReader()->termPositions(t);
_term_docs.push_back(term_pos);
iterators.emplace_back(term_pos);
if (is_save_iter) {
iterators.emplace_back(term_pos);
}
return term_pos;
};

Expand All @@ -190,16 +200,29 @@ void PhraseQuery::add(const std::wstring& field_name, const std::vector<std::str
auto* term_pos = ensureTermPosition(term);
matcher._postings.emplace_back(term_pos, i);
}
_matcher = matcher;
_matchers.emplace_back(matcher);
} else {
OrderedSloppyPhraseMatcher matcher;
for (size_t i = 0; i < terms.size(); i++) {
const auto& term = terms[i];
auto* term_pos = ensureTermPosition(term);
matcher._postings.emplace_back(term_pos, i);
{
OrderedSloppyPhraseMatcher single_matcher;
for (size_t i = 0; i < terms.size(); i++) {
const auto& term = terms[i];
auto* term_pos = ensureTermPosition(term);
single_matcher._postings.emplace_back(term_pos, i);
}
single_matcher._allowed_slop = _slop;
_matchers.emplace_back(single_matcher);
}
{
for (auto& terms : _additional_terms) {
ExactPhraseMatcher single_matcher;
for (size_t i = 0; i < terms.size(); i++) {
const auto& term = terms[i];
auto* term_pos = ensureTermPosition(term, false);
single_matcher._postings.emplace_back(term_pos, i);
}
_matchers.emplace_back(std::move(single_matcher));
}
}
matcher._allowed_slop = _slop;
_matcher = matcher;
}

std::sort(iterators.begin(), iterators.end(), [](const TermIterator& a, const TermIterator& b) {
Expand All @@ -214,9 +237,9 @@ void PhraseQuery::add(const std::wstring& field_name, const std::vector<std::str
}

void PhraseQuery::search(roaring::Roaring& roaring) {
if (std::holds_alternative<PhraseQueryPtr>(_matcher)) {
if (_phrase_query) {
_searcher->_search(
std::get<PhraseQueryPtr>(_matcher).get(),
_phrase_query.get(),
[&roaring](const int32_t docid, const float_t /*score*/) { roaring.add(docid); });
} else {
if (_lead1.isEmpty()) {
Expand Down Expand Up @@ -288,17 +311,9 @@ int32_t PhraseQuery::do_next(int32_t doc) {
}

bool PhraseQuery::matches(int32_t doc) {
return std::visit(
[&doc](auto&& m) -> bool {
using T = std::decay_t<decltype(m)>;
if constexpr (std::is_same_v<T, PhraseQueryPtr>) {
_CLTHROWA(CL_ERR_IllegalArgument,
"PhraseQueryPtr does not support matches function");
} else {
return m.matches(doc);
}
},
_matcher);
return std::ranges::all_of(_matchers, [&doc](auto&& matcher) {
return std::visit([&doc](auto&& m) -> bool { return m.matches(doc); }, matcher);
});
}

void PhraseQuery::parser_slop(std::string& query, InvertedIndexQueryInfo& query_info) {
Expand Down Expand Up @@ -343,6 +358,24 @@ void PhraseQuery::parser_slop(std::string& query, InvertedIndexQueryInfo& query_
}
}

void PhraseQuery::parser_info(std::string& query, const std::string& field_name,
InvertedIndexQueryType query_type,
const std::map<std::string, std::string>& properties,
InvertedIndexQueryInfo& query_info, bool sequential_opt) {
parser_slop(query, query_info);
query_info.terms = inverted_index::InvertedIndexAnalyzer::get_analyse_result(
query, field_name, query_type, properties);
if (sequential_opt && query_info.ordered) {
std::vector<std::string> t_querys;
boost::split(t_querys, query, boost::algorithm::is_any_of(" "));
for (auto& t_query : t_querys) {
auto terms = inverted_index::InvertedIndexAnalyzer::get_analyse_result(
t_query, field_name, query_type, properties);
query_info.additional_terms.emplace_back(std::move(terms));
}
}
}

template class PhraseMatcherBase<ExactPhraseMatcher>;
template class PhraseMatcherBase<OrderedSloppyPhraseMatcher>;

Expand Down
14 changes: 11 additions & 3 deletions be/src/olap/rowset/segment_v2/inverted_index/query/phrase_query.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

#include <variant>

#include "olap/rowset/segment_v2/inverted_index_query_type.h"

CL_NS_USE(index)
CL_NS_USE(search)

Expand Down Expand Up @@ -76,11 +78,11 @@ class OrderedSloppyPhraseMatcher : public PhraseMatcherBase<OrderedSloppyPhraseM
int32_t _match_width = -1;
};

using PhraseQueryPtr = std::unique_ptr<CL_NS(search)::PhraseQuery>;
// ExactPhraseMatcher: x match_phrase 'aaa bbb'
// PhraseQueryPtr: x match_phrase 'aaa bbb ~2', support slop
// OrderedSloppyPhraseMatcher: x match_phrase 'aaa bbb ~2+', ensuring that the words appear in the specified order.
using Matcher = std::variant<ExactPhraseMatcher, OrderedSloppyPhraseMatcher, PhraseQueryPtr>;
using PhraseQueryPtr = std::unique_ptr<CL_NS(search)::PhraseQuery>;
using Matcher = std::variant<ExactPhraseMatcher, OrderedSloppyPhraseMatcher>;

class PhraseQuery : public Query {
public:
Expand All @@ -103,6 +105,10 @@ class PhraseQuery : public Query {

public:
static void parser_slop(std::string& query, InvertedIndexQueryInfo& query_info);
static void parser_info(std::string& query, const std::string& field_name,
InvertedIndexQueryType query_type,
const std::map<std::string, std::string>& properties,
InvertedIndexQueryInfo& query_info, bool sequential_opt);

private:
std::shared_ptr<lucene::search::IndexSearcher> _searcher;
Expand All @@ -117,7 +123,9 @@ class PhraseQuery : public Query {
std::vector<TermDocs*> _term_docs;

int32_t _slop = 0;
Matcher _matcher;
std::vector<std::vector<std::string>> _additional_terms;
PhraseQueryPtr _phrase_query = nullptr;
std::vector<Matcher> _matchers;
};

} // namespace doris::segment_v2
10 changes: 10 additions & 0 deletions be/src/olap/rowset/segment_v2/inverted_index/query/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,18 @@ namespace doris::segment_v2 {
// Plain data carrier describing one inverted-index query.
struct InvertedIndexQueryInfo {
    // Name of the indexed field the query targets.
    std::wstring field_name;
    // Terms of the query.
    std::vector<std::string> terms;
    // Extra term lists, one inner vector per list.
    std::vector<std::vector<std::string>> additional_terms;
    // Allowed slop; 0 by default.
    int32_t slop = 0;
    // Whether the query is ordered; false by default.
    bool ordered = false;

    // Returns a short summary in the form
    // "<terms.size()>, <additional_terms.size()>, <slop>, <ordered>",
    // where `ordered` is rendered as 0 or 1.
    // Const-qualified because it only reads members, so it can be called through
    // const objects and const references.
    std::string to_string() const {
        std::string summary;
        summary += std::to_string(terms.size());
        summary += ", ";
        summary += std::to_string(additional_terms.size());
        summary += ", ";
        summary += std::to_string(slop);
        summary += ", ";
        summary += std::to_string(ordered);
        return summary;
    }
};

class Query {
Expand Down
Loading

0 comments on commit e154fba

Please sign in to comment.