Skip to content

Commit

Permalink
Merge branch 'branch-2.0' into branch-2.0-fix-colocate-test
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaokang authored Dec 22, 2023
2 parents b11dde3 + 4b445d9 commit d90f171
Show file tree
Hide file tree
Showing 405 changed files with 20,715 additions and 561 deletions.
2 changes: 1 addition & 1 deletion be/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ set(BUILD_SHARED_LIBS OFF)

option(ENABLE_CLANG_COVERAGE "coverage option" OFF)
if (ENABLE_CLANG_COVERAGE AND ENABLE_CLANG_COVERAGE STREQUAL ON AND COMPILER_CLANG)
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fprofile-instr-generate -fcoverage-mapping")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fprofile-instr-generate -fcoverage-mapping -DLLVM_PROFILE")
endif ()

if (${MAKE_TEST} STREQUAL "ON")
Expand Down
1 change: 0 additions & 1 deletion be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1565,7 +1565,6 @@ void PublishVersionTaskPool::_publish_version_worker_thread_callback() {
.tag("retry_time", retry_time)
.error(status);
++retry_time;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
if (status.is<PUBLISH_VERSION_NOT_CONTINUOUS>() && !is_task_timeout) {
Expand Down
4 changes: 4 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1104,8 +1104,12 @@ DEFINE_Int32(ingest_binlog_work_pool_size, "-1");
// Download binlog rate limit, unit is KB/s, 0 means no limit
DEFINE_Int32(download_binlog_rate_limit_kbs, "0");

DEFINE_mInt32(buffered_reader_read_timeout_ms, "20000");

DEFINE_Bool(enable_snapshot_action, "false");

DEFINE_mInt32(s3_writer_buffer_allocation_timeout_second, "60");

// clang-format off
#ifdef BE_TEST
// test s3
Expand Down
5 changes: 5 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1160,9 +1160,14 @@ DECLARE_Int32(ingest_binlog_work_pool_size);
// Download binlog rate limit, unit is KB/s
DECLARE_Int32(download_binlog_rate_limit_kbs);

DECLARE_mInt32(buffered_reader_read_timeout_ms);

// whether to enable /api/snapshot api
DECLARE_Bool(enable_snapshot_action);

// The timeout config for S3 write buffer allocation
DECLARE_mInt32(s3_writer_buffer_allocation_timeout_second);

#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
Expand Down
14 changes: 7 additions & 7 deletions be/src/exec/es/es_scroll_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,8 @@ Status get_date_value_int(const rapidjson::Value& col, PrimitiveType type, bool
if (ok) {
// The local time zone can change by session variable `time_zone`
// We should use the user specified time zone, not the actual system local time zone.
success = dt_val.from_unixtime(std::chrono::system_clock::to_time_t(tp), time_zone);
success = true;
dt_val.from_unixtime(std::chrono::system_clock::to_time_t(tp), time_zone);
}
} else if (str_length == 19) {
// YYYY-MM-DDTHH:MM:SS
Expand All @@ -231,8 +232,8 @@ Status get_date_value_int(const rapidjson::Value& col, PrimitiveType type, bool
const bool ok =
cctz::parse("%Y-%m-%dT%H:%M:%S", str_date, cctz::utc_time_zone(), &tp);
if (ok) {
success = dt_val.from_unixtime(std::chrono::system_clock::to_time_t(tp),
time_zone);
success = true;
dt_val.from_unixtime(std::chrono::system_clock::to_time_t(tp), time_zone);
}
} else {
// YYYY-MM-DD HH:MM:SS
Expand All @@ -243,7 +244,8 @@ Status get_date_value_int(const rapidjson::Value& col, PrimitiveType type, bool
// string long like "1677895728000"
int64_t time_long = std::atol(str_date.c_str());
if (time_long > 0) {
success = dt_val.from_unixtime(time_long / 1000, time_zone);
success = true;
dt_val.from_unixtime(time_long / 1000, time_zone);
}
} else {
// YYYY-MM-DD or others
Expand All @@ -255,9 +257,7 @@ Status get_date_value_int(const rapidjson::Value& col, PrimitiveType type, bool
}

} else {
if (!dt_val.from_unixtime(col.GetInt64() / 1000, time_zone)) {
RETURN_ERROR_IF_CAST_FORMAT_ERROR(col, type);
}
dt_val.from_unixtime(col.GetInt64() / 1000, time_zone);
}
if constexpr (is_datetime_v1) {
if (type == TYPE_DATE) {
Expand Down
4 changes: 4 additions & 0 deletions be/src/exec/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,10 @@ class ColumnValueRange {
condition.__set_condition_op("match_all");
} else if (value.first == MatchType::MATCH_PHRASE) {
condition.__set_condition_op("match_phrase");
} else if (value.first == MatchType::MATCH_PHRASE_PREFIX) {
condition.__set_condition_op("match_phrase_prefix");
} else if (value.first == MatchType::MATCH_REGEXP) {
condition.__set_condition_op("match_regexp");
} else if (value.first == MatchType::MATCH_ELEMENT_EQ) {
condition.__set_condition_op("match_element_eq");
} else if (value.first == MatchType::MATCH_ELEMENT_LT) {
Expand Down
17 changes: 16 additions & 1 deletion be/src/exec/olap_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ enum class MatchType {
MATCH_ELEMENT_GT = 5,
MATCH_ELEMENT_LE = 6,
MATCH_ELEMENT_GE = 7,
MATCH_PHRASE_PREFIX = 8,
MATCH_REGEXP = 9,
};

inline MatchType to_match_type(TExprOpcode::type type) {
Expand All @@ -183,6 +185,12 @@ inline MatchType to_match_type(TExprOpcode::type type) {
case TExprOpcode::type::MATCH_PHRASE:
return MatchType::MATCH_PHRASE;
break;
case TExprOpcode::type::MATCH_PHRASE_PREFIX:
return MatchType::MATCH_PHRASE_PREFIX;
break;
case TExprOpcode::type::MATCH_REGEXP:
return MatchType::MATCH_REGEXP;
break;
case TExprOpcode::type::MATCH_ELEMENT_EQ:
return MatchType::MATCH_ELEMENT_EQ;
break;
Expand Down Expand Up @@ -212,6 +220,10 @@ inline MatchType to_match_type(const std::string& condition_op) {
return MatchType::MATCH_ALL;
} else if (condition_op.compare("match_phrase") == 0) {
return MatchType::MATCH_PHRASE;
} else if (condition_op.compare("match_phrase_prefix") == 0) {
return MatchType::MATCH_PHRASE_PREFIX;
} else if (condition_op.compare("match_regexp") == 0) {
return MatchType::MATCH_REGEXP;
} else if (condition_op.compare("match_element_eq") == 0) {
return MatchType::MATCH_ELEMENT_EQ;
} else if (condition_op.compare("match_element_lt") == 0) {
Expand All @@ -229,6 +241,8 @@ inline MatchType to_match_type(const std::string& condition_op) {
inline bool is_match_condition(const std::string& op) {
if (0 == strcasecmp(op.c_str(), "match_any") || 0 == strcasecmp(op.c_str(), "match_all") ||
0 == strcasecmp(op.c_str(), "match_phrase") ||
0 == strcasecmp(op.c_str(), "match_phrase_prefix") ||
0 == strcasecmp(op.c_str(), "match_regexp") ||
0 == strcasecmp(op.c_str(), "match_element_eq") ||
0 == strcasecmp(op.c_str(), "match_element_lt") ||
0 == strcasecmp(op.c_str(), "match_element_gt") ||
Expand All @@ -241,7 +255,8 @@ inline bool is_match_condition(const std::string& op) {

inline bool is_match_operator(const TExprOpcode::type& op_type) {
return TExprOpcode::MATCH_ANY == op_type || TExprOpcode::MATCH_ALL == op_type ||
TExprOpcode::MATCH_PHRASE == op_type || TExprOpcode::MATCH_ELEMENT_EQ == op_type ||
TExprOpcode::MATCH_PHRASE == op_type || TExprOpcode::MATCH_PHRASE_PREFIX == op_type ||
TExprOpcode::MATCH_REGEXP == op_type || TExprOpcode::MATCH_ELEMENT_EQ == op_type ||
TExprOpcode::MATCH_ELEMENT_LT == op_type || TExprOpcode::MATCH_ELEMENT_GT == op_type ||
TExprOpcode::MATCH_ELEMENT_LE == op_type || TExprOpcode::MATCH_ELEMENT_GE == op_type;
}
Expand Down
37 changes: 19 additions & 18 deletions be/src/io/fs/buffered_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -387,20 +387,15 @@ Status MergeRangeFileReader::_fill_box(int range_index, size_t start_offset, siz
return Status::OK();
}

// the condition variable would wait at most 10 seconds
// otherwise it would quit the procedure and treat it
// as one time out error status and would make the load
// task failed
constexpr static int WAIT_TIME_OUT_MS = 10000;

// there exists occasions where the buffer is already closed but
// some prior tasks are still queued in thread pool, so we have to check whether
// the buffer is closed each time the condition variable is notified.
void PrefetchBuffer::reset_offset(size_t offset) {
{
std::unique_lock lck {_lock};
if (!_prefetched.wait_for(lck, std::chrono::milliseconds(WAIT_TIME_OUT_MS),
[this]() { return _buffer_status != BufferStatus::PENDING; })) {
if (!_prefetched.wait_for(
lck, std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
[this]() { return _buffer_status != BufferStatus::PENDING; })) {
_prefetch_status = Status::TimedOut("time out when reset prefetch buffer");
return;
}
Expand All @@ -427,10 +422,12 @@ void PrefetchBuffer::reset_offset(size_t offset) {
void PrefetchBuffer::prefetch_buffer() {
{
std::unique_lock lck {_lock};
if (!_prefetched.wait_for(lck, std::chrono::milliseconds(WAIT_TIME_OUT_MS), [this]() {
return _buffer_status == BufferStatus::RESET ||
_buffer_status == BufferStatus::CLOSED;
})) {
if (!_prefetched.wait_for(
lck, std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
[this]() {
return _buffer_status == BufferStatus::RESET ||
_buffer_status == BufferStatus::CLOSED;
})) {
_prefetch_status = Status::TimedOut("time out when invoking prefetch buffer");
return;
}
Expand Down Expand Up @@ -470,7 +467,8 @@ void PrefetchBuffer::prefetch_buffer() {
_statis.prefetch_request_io += 1;
_statis.prefetch_request_bytes += _len;
std::unique_lock lck {_lock};
if (!_prefetched.wait_for(lck, std::chrono::milliseconds(WAIT_TIME_OUT_MS),
if (!_prefetched.wait_for(lck,
std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
[this]() { return _buffer_status == BufferStatus::PENDING; })) {
_prefetch_status = Status::TimedOut("time out when invoking prefetch buffer");
return;
Expand Down Expand Up @@ -555,10 +553,12 @@ Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len,
{
std::unique_lock lck {_lock};
// buffer must be prefetched or it's closed
if (!_prefetched.wait_for(lck, std::chrono::milliseconds(WAIT_TIME_OUT_MS), [this]() {
return _buffer_status == BufferStatus::PREFETCHED ||
_buffer_status == BufferStatus::CLOSED;
})) {
if (!_prefetched.wait_for(
lck, std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
[this]() {
return _buffer_status == BufferStatus::PREFETCHED ||
_buffer_status == BufferStatus::CLOSED;
})) {
_prefetch_status = Status::TimedOut("time out when read prefetch buffer");
return _prefetch_status;
}
Expand Down Expand Up @@ -594,7 +594,8 @@ Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len,
void PrefetchBuffer::close() {
std::unique_lock lck {_lock};
// in case _reader still tries to write to the buf after we close the buffer
if (!_prefetched.wait_for(lck, std::chrono::milliseconds(WAIT_TIME_OUT_MS),
if (!_prefetched.wait_for(lck,
std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
[this]() { return _buffer_status != BufferStatus::PENDING; })) {
_prefetch_status = Status::TimedOut("time out when close prefetch buffer");
return;
Expand Down
18 changes: 14 additions & 4 deletions be/src/io/fs/s3_file_write_bufferpool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "s3_file_write_bufferpool.h"

#include <chrono>
#include <cstring>

#include "common/config.h"
Expand All @@ -40,7 +41,7 @@ void S3FileBuffer::on_finished() {
// when there is memory preserved, directly write data to buf
// TODO:(AlexYue): write to file cache otherwise, then we'll wait for free buffer
// and to rob it
void S3FileBuffer::append_data(const Slice& data) {
Status S3FileBuffer::append_data(const Slice& data) {
Defer defer {[&] { _size += data.get_size(); }};
while (true) {
// if buf is not empty, it means there is memory preserved for this buf
Expand All @@ -50,9 +51,14 @@ void S3FileBuffer::append_data(const Slice& data) {
} else {
// wait allocate buffer pool
auto tmp = S3FileBufferPool::GetInstance()->allocate(true);
if (tmp->get_size() == 0) {
return Status::InternalError("Failed to allocate s3 writer buffer for {} seconds",
config::s3_writer_buffer_allocation_timeout_second);
}
rob_buffer(tmp);
}
}
return Status::OK();
}

void S3FileBuffer::submit() {
Expand Down Expand Up @@ -81,13 +87,17 @@ void S3FileBufferPool::init(int32_t s3_write_buffer_whole_size, int32_t s3_write

std::shared_ptr<S3FileBuffer> S3FileBufferPool::allocate(bool reserve) {
std::shared_ptr<S3FileBuffer> buf = std::make_shared<S3FileBuffer>(_thread_pool);
int64_t timeout = config::s3_writer_buffer_allocation_timeout_second;
// if need reserve then we must ensure return buf with memory preserved
if (reserve) {
{
std::unique_lock<std::mutex> lck {_lock};
_cv.wait(lck, [this]() { return !_free_raw_buffers.empty(); });
buf->reserve_buffer(_free_raw_buffers.front());
_free_raw_buffers.pop_front();
_cv.wait_for(lck, std::chrono::seconds(timeout),
[this]() { return !_free_raw_buffers.empty(); });
if (!_free_raw_buffers.empty()) {
buf->reserve_buffer(_free_raw_buffers.front());
_free_raw_buffers.pop_front();
}
}
return buf;
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/io/fs/s3_file_write_bufferpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ struct S3FileBuffer : public std::enable_shared_from_this<S3FileBuffer> {

// append data into the memory buffer inside
// or into the file cache if the buffer has no memory buffer
void append_data(const Slice& data);
Status append_data(const Slice& data);
// upload to S3 and file cache in async threadpool
void submit();
// set the callback to upload to S3 file
Expand Down
3 changes: 2 additions & 1 deletion be/src/io/fs/s3_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,8 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {

// if the buffer has memory buf inside, the data would be written into memory first then S3 then file cache
// it would be written to cache then S3 if the buffer doesn't have memory preserved
_pending_buf->append_data(Slice {data[i].get_data() + pos, data_size_to_append});
RETURN_IF_ERROR(_pending_buf->append_data(
Slice {data[i].get_data() + pos, data_size_to_append}));

// if it's the last part, it could be less than 5MB, or it must
// satisfy that the size is larger than or euqal to 5MB
Expand Down
8 changes: 7 additions & 1 deletion be/src/olap/match_predicate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ InvertedIndexQueryType MatchPredicate::_to_inverted_index_query_type(MatchType m
case MatchType::MATCH_PHRASE:
ret = InvertedIndexQueryType::MATCH_PHRASE_QUERY;
break;
case MatchType::MATCH_PHRASE_PREFIX:
ret = InvertedIndexQueryType::MATCH_PHRASE_PREFIX_QUERY;
break;
case MatchType::MATCH_REGEXP:
ret = InvertedIndexQueryType::MATCH_REGEXP_QUERY;
break;
case MatchType::MATCH_ELEMENT_EQ:
ret = InvertedIndexQueryType::EQUAL_QUERY;
break;
Expand All @@ -117,7 +123,7 @@ InvertedIndexQueryType MatchPredicate::_to_inverted_index_query_type(MatchType m
}

bool MatchPredicate::_skip_evaluate(InvertedIndexIterator* iterator) const {
if (_match_type == MatchType::MATCH_PHRASE &&
if ((_match_type == MatchType::MATCH_PHRASE || _match_type == MatchType::MATCH_PHRASE_PREFIX) &&
iterator->get_inverted_index_reader_type() == InvertedIndexReaderType::FULLTEXT &&
get_parser_phrase_support_string_from_properties(iterator->get_index_properties()) ==
INVERTED_INDEX_PARSER_PHRASE_SUPPORT_NO) {
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ Status TabletReader::_capture_rs_readers(const ReaderParams& read_params) {
_reader_context.remaining_conjunct_roots = read_params.remaining_conjunct_roots;
_reader_context.common_expr_ctxs_push_down = read_params.common_expr_ctxs_push_down;
_reader_context.output_columns = &read_params.output_columns;
_reader_context.push_down_agg_type_opt = read_params.push_down_agg_type_opt;

return Status::OK();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ ConjunctionQuery::~ConjunctionQuery() {
}

void ConjunctionQuery::add(const std::wstring& field_name, const std::vector<std::string>& terms) {
if (terms.size() < 1) {
_CLTHROWA(CL_ERR_IllegalArgument, "ConjunctionQuery::add: terms.size() < 1");
if (terms.empty()) {
_CLTHROWA(CL_ERR_IllegalArgument, "ConjunctionQuery::add: terms empty");
}

std::vector<TermIterator> iterators;
for (auto& term : terms) {
for (const auto& term : terms) {
std::wstring ws_term = StringUtil::string_to_wstring(term);
Term* t = _CLNEW Term(field_name.c_str(), ws_term.c_str());
_terms.push_back(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,25 @@ namespace doris {
DisjunctionQuery::DisjunctionQuery(IndexReader* reader) : _reader(reader) {}

DisjunctionQuery::~DisjunctionQuery() {
for (auto& term : _terms) {
if (term) {
_CLDELETE(term);
}
}
for (auto& term_doc : _term_docs) {
if (term_doc) {
_CLDELETE(term_doc);
}
}
for (auto& term : _terms) {
if (term) {
_CLDELETE(term);
}
}
}

void DisjunctionQuery::add(const std::wstring& field_name, const std::vector<std::string>& terms) {
if (terms.size() < 1) {
_CLTHROWA(CL_ERR_IllegalArgument, "ConjunctionQuery::add: terms.size() < 1");
if (terms.empty()) {
_CLTHROWA(CL_ERR_IllegalArgument, "DisjunctionQuery::add: terms empty");
}

for (auto& term : terms) {
for (const auto& term : terms) {
std::wstring ws_term = StringUtil::string_to_wstring(term);
_wsterms.emplace_back(&ws_term);
Term* t = _CLNEW Term(field_name.c_str(), ws_term.c_str());
_terms.push_back(t);
TermDocs* term_doc = _reader->termDocs(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ class DisjunctionQuery {

private:
IndexReader* _reader = nullptr;
std::vector<std::wstring*> _wsterms;
std::vector<Term*> _terms;
std::vector<TermDocs*> _term_docs;
std::vector<TermIterator> _term_iterators;
Expand Down
Loading

0 comments on commit d90f171

Please sign in to comment.