Skip to content

Commit

Permalink
Merge branch 'branch-2.1' into fix-2.1-0522
Browse files Browse the repository at this point in the history
  • Loading branch information
cambyzju authored May 22, 2024
2 parents 50b1af4 + 05cedfc commit b69a48f
Show file tree
Hide file tree
Showing 449 changed files with 16,850 additions and 4,585 deletions.
1 change: 1 addition & 0 deletions be/src/agent/be_exec_version_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,6 @@ constexpr inline int BeExecVersionManager::min_be_exec_version = 0;
/// functional
constexpr inline int BITMAP_SERDE = 3;
constexpr inline int USE_NEW_SERDE = 4; // release on DORIS version 2.1
constexpr inline int OLD_WAL_SERDE = 3; // use to solve compatibility issues, see pr #32299

} // namespace doris
3 changes: 2 additions & 1 deletion be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
#include "olap/txn_manager.h"
#include "olap/utils.h"
#include "runtime/exec_env.h"
#include "runtime/memory/global_memory_arbitrator.h"
#include "runtime/snapshot_loader.h"
#include "service/backend_options.h"
#include "util/debug_points.h"
Expand Down Expand Up @@ -1563,7 +1564,7 @@ void PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest&
.error(status);
} else {
if (!config::disable_auto_compaction &&
!MemInfo::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
!GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
for (auto [tablet_id, _] : succ_tablets) {
TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(tablet_id);
if (tablet != nullptr) {
Expand Down
3 changes: 2 additions & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ DEFINE_Validator(doris_scanner_thread_pool_thread_num, [](const int config) -> b
}
return true;
});
DEFINE_Int32(remote_split_source_batch_size, "1024");
DEFINE_Int32(doris_max_remote_scanner_thread_pool_thread_num, "-1");
// number of olap scanner thread pool queue size
DEFINE_Int32(doris_scanner_thread_pool_queue_size, "102400");
Expand Down Expand Up @@ -601,7 +602,7 @@ DEFINE_mInt32(result_buffer_cancelled_interval_time, "300");
DEFINE_mInt32(priority_queue_remaining_tasks_increased_frequency, "512");

// sync tablet_meta when modifying meta
DEFINE_mBool(sync_tablet_meta, "true");
DEFINE_mBool(sync_tablet_meta, "false");

// default thrift rpc timeout ms
DEFINE_mInt32(thrift_rpc_timeout_ms, "60000");
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,8 @@ DECLARE_mInt64(doris_blocking_priority_queue_wait_timeout_ms);
// number of scanner thread pool size for olap table
// and the min thread num of remote scanner thread pool
DECLARE_mInt32(doris_scanner_thread_pool_thread_num);
// number of batch size to fetch the remote split source
DECLARE_mInt32(remote_split_source_batch_size);
// max number of remote scanner thread pool size
// if equal to -1, value is std::max(512, CpuInfo::num_cores() * 10)
DECLARE_Int32(doris_max_remote_scanner_thread_pool_thread_num);
Expand Down
25 changes: 14 additions & 11 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "runtime/memory/global_memory_arbitrator.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/runtime_query_statistics_mgr.h"
Expand Down Expand Up @@ -191,7 +192,7 @@ void Daemon::memory_maintenance_thread() {
// Refresh process memory metrics.
doris::PerfCounters::refresh_proc_status();
doris::MemInfo::refresh_proc_meminfo();
doris::MemInfo::refresh_proc_mem_no_allocator_cache();
doris::GlobalMemoryArbitrator::refresh_vm_rss_sub_allocator_cache();

// Update and print memory stat when the memory changes by 256M.
if (abs(last_print_proc_mem - PerfCounters::get_vm_rss()) > 268435456) {
Expand All @@ -212,7 +213,7 @@ void Daemon::memory_maintenance_thread() {

ExecEnv::GetInstance()->brpc_iobuf_block_memory_tracker()->set_consumption(
butil::IOBuf::block_memory());
LOG(INFO) << MemTrackerLimiter::
LOG(INFO) << doris::GlobalMemoryArbitrator::
process_mem_log_str(); // print mem log when memory state by 256M
}
}
Expand All @@ -229,33 +230,35 @@ void Daemon::memory_gc_thread() {
continue;
}
auto sys_mem_available = doris::MemInfo::sys_mem_available();
auto proc_mem_no_allocator_cache = doris::MemInfo::proc_mem_no_allocator_cache();
auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage();

// GC excess memory for resource groups that not enable overcommit
auto tg_free_mem = doris::MemInfo::tg_not_enable_overcommit_group_gc();
auto tg_free_mem = doris::MemInfo::tg_disable_overcommit_group_gc();
sys_mem_available += tg_free_mem;
proc_mem_no_allocator_cache -= tg_free_mem;
process_memory_usage -= tg_free_mem;

if (memory_full_gc_sleep_time_ms <= 0 &&
(sys_mem_available < doris::MemInfo::sys_mem_available_low_water_mark() ||
proc_mem_no_allocator_cache >= doris::MemInfo::mem_limit())) {
process_memory_usage >= doris::MemInfo::mem_limit())) {
// No longer full gc and minor gc during sleep.
memory_full_gc_sleep_time_ms = memory_gc_sleep_time_ms;
memory_minor_gc_sleep_time_ms = memory_gc_sleep_time_ms;
LOG(INFO) << fmt::format("[MemoryGC] start full GC, {}.",
MemTrackerLimiter::process_limit_exceeded_errmsg_str());
LOG(INFO) << fmt::format(
"[MemoryGC] start full GC, {}.",
doris::GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str());
doris::MemTrackerLimiter::print_log_process_usage();
if (doris::MemInfo::process_full_gc()) {
// If there is not enough memory to be gc, the process memory usage will not be printed in the next continuous gc.
doris::MemTrackerLimiter::enable_print_log_process_usage();
}
} else if (memory_minor_gc_sleep_time_ms <= 0 &&
(sys_mem_available < doris::MemInfo::sys_mem_available_warning_water_mark() ||
proc_mem_no_allocator_cache >= doris::MemInfo::soft_mem_limit())) {
process_memory_usage >= doris::MemInfo::soft_mem_limit())) {
// No minor gc during sleep, but full gc is possible.
memory_minor_gc_sleep_time_ms = memory_gc_sleep_time_ms;
LOG(INFO) << fmt::format("[MemoryGC] start minor GC, {}.",
MemTrackerLimiter::process_soft_limit_exceeded_errmsg_str());
LOG(INFO) << fmt::format(
"[MemoryGC] start minor GC, {}.",
doris::GlobalMemoryArbitrator::process_soft_limit_exceeded_errmsg_str());
doris::MemTrackerLimiter::print_log_process_usage();
if (doris::MemInfo::process_minor_gc()) {
doris::MemTrackerLimiter::enable_print_log_process_usage();
Expand Down
51 changes: 51 additions & 0 deletions be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,50 @@ class [[nodiscard]] Status {
}
};

// There are many thread using status to indicate the cancel state, one thread may update it and
// the other thread will read it. Status is not thread safe, for example, if one thread is update it
// and another thread is call to_string method, it may core, because the _err_msg is an unique ptr and
// it is deconstructed during copy method.
// And also we could not use lock, because we need get status frequently to check if it is cancelled.
// The defaule value is ok.
class AtomicStatus {
public:
AtomicStatus() : error_st_(Status::OK()) {}

bool ok() const { return error_code_.load() == 0; }

bool update(const Status& new_status) {
// If new status is normal, or the old status is abnormal, then not need update
if (new_status.ok() || error_code_.load() != 0) {
return false;
}
int16_t expected_error_code = 0;
if (error_code_.compare_exchange_strong(expected_error_code, new_status.code(),
std::memory_order_acq_rel)) {
// lock here for read status, to avoid core during return error_st_
std::lock_guard l(mutex_);
error_st_ = new_status;
return true;
} else {
return false;
}
}

// will copy a new status object to avoid concurrency
Status status() {
std::lock_guard l(mutex_);
return error_st_;
}

private:
std::atomic_int16_t error_code_ = 0;
Status error_st_;
std::mutex mutex_;

AtomicStatus(const AtomicStatus&) = delete;
void operator=(const AtomicStatus&) = delete;
};

inline std::ostream& operator<<(std::ostream& ostr, const Status& status) {
ostr << '[' << status.code_as_string() << ']';
ostr << status.msg();
Expand Down Expand Up @@ -575,6 +619,13 @@ inline std::string Status::to_string_no_stack() const {
} \
} while (false)

#define PROPAGATE_FALSE(stmt) \
do { \
if (UNLIKELY(!static_cast<bool>(stmt))) { \
return false; \
} \
} while (false)

#define THROW_IF_ERROR(stmt) \
do { \
Status _status_ = (stmt); \
Expand Down
27 changes: 22 additions & 5 deletions be/src/exec/decompressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,10 @@ Status SnappyBlockDecompressor::decompress(uint8_t* input, size_t input_len,
// See:
// https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc
while (remaining_input_size > 0) {
if (remaining_input_size < 4) {
*more_input_bytes = 4 - remaining_input_size;
break;
}
// Read uncompressed size
uint32_t uncompressed_block_len = Decompressor::_read_int32(src);
int64_t remaining_output_len = output_max_len - uncompressed_total_len;
Expand All @@ -566,12 +570,24 @@ Status SnappyBlockDecompressor::decompress(uint8_t* input, size_t input_len,
break;
}

if (uncompressed_block_len == 0) {
remaining_input_size -= sizeof(uint32_t);
break;
}

if (remaining_input_size <= 2 * sizeof(uint32_t)) {
// The remaining input size should be larger then <uncompressed size><compressed size><compressed data>
// +1 means we need at least 1 bytes of compressed data.
*more_input_bytes = 2 * sizeof(uint32_t) + 1 - remaining_input_size;
break;
}

// Read compressed size
size_t tmp_src_size = remaining_input_size - sizeof(uint32_t);
size_t tmp_remaining_size = remaining_input_size - 2 * sizeof(uint32_t);
size_t compressed_len = _read_int32(src + sizeof(uint32_t));
if (compressed_len == 0 || compressed_len > tmp_src_size) {
if (compressed_len > tmp_remaining_size) {
// Need more input data
*more_input_bytes = compressed_len - tmp_src_size;
*more_input_bytes = compressed_len - tmp_remaining_size;
break;
}

Expand All @@ -590,8 +606,9 @@ Status SnappyBlockDecompressor::decompress(uint8_t* input, size_t input_len,
// Decompress
if (!snappy::RawUncompress(reinterpret_cast<const char*>(src), compressed_len,
reinterpret_cast<char*>(output))) {
return Status::InternalError("snappy block decompress failed. uncompressed_len: {}",
uncompressed_block_len);
return Status::InternalError(
"snappy block decompress failed. uncompressed_len: {}, compressed_len: {}",
uncompressed_block_len, compressed_len);
}

output += uncompressed_block_len;
Expand Down
5 changes: 3 additions & 2 deletions be/src/exec/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -702,14 +702,15 @@ bool ColumnValueRange<primitive_type>::convert_to_close_range(
bool is_empty = false;

if (!is_begin_include()) {
if (_low_value == TYPE_MIN) {
if (_low_value == TYPE_MAX) {
is_empty = true;
} else {
++_low_value;
}
}

if (!is_end_include()) {
if (_high_value == TYPE_MAX) {
if (_high_value == TYPE_MIN) {
is_empty = true;
} else {
--_high_value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ Status SchemaActiveQueriesScanner::get_next_block(vectorized::Block* block, bool

int current_batch_rows = std::min(_block_rows_limit, _total_rows - _row_idx);
vectorized::MutableBlock mblock = vectorized::MutableBlock::build_mutable_block(block);
mblock.add_rows(_active_query_block.get(), _row_idx, current_batch_rows);
RETURN_IF_ERROR(mblock.add_rows(_active_query_block.get(), _row_idx, current_batch_rows));
_row_idx += current_batch_rows;

*eos = _row_idx == _total_rows;
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ Status SchemaBackendActiveTasksScanner::get_next_block(vectorized::Block* block,

int current_batch_rows = std::min(_block_rows_limit, _total_rows - _row_idx);
vectorized::MutableBlock mblock = vectorized::MutableBlock::build_mutable_block(block);
mblock.add_rows(_task_stats_block.get(), _row_idx, current_batch_rows);
RETURN_IF_ERROR(mblock.add_rows(_task_stats_block.get(), _row_idx, current_batch_rows));
_row_idx += current_batch_rows;

*eos = _row_idx == _total_rows;
Expand Down
27 changes: 23 additions & 4 deletions be/src/exec/schema_scanner/schema_columns_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,9 @@ Status SchemaColumnsScanner::_fill_block_impl(vectorized::Block* block) {
{
int64_t srcs[columns_num];
for (int i = 0; i < columns_num; ++i) {
if (_desc_result.columns[i].columnDesc.__isset.columnPrecision) {
int data_type = _desc_result.columns[i].columnDesc.columnType;
if (_desc_result.columns[i].columnDesc.__isset.columnPrecision &&
data_type != TPrimitiveType::DATETIMEV2) {
srcs[i] = _desc_result.columns[i].columnDesc.columnPrecision;
datas[i] = srcs + i;
} else {
Expand All @@ -525,7 +527,9 @@ Status SchemaColumnsScanner::_fill_block_impl(vectorized::Block* block) {
{
int64_t srcs[columns_num];
for (int i = 0; i < columns_num; ++i) {
if (_desc_result.columns[i].columnDesc.__isset.columnScale) {
int data_type = _desc_result.columns[i].columnDesc.columnType;
if (_desc_result.columns[i].columnDesc.__isset.columnScale &&
data_type != TPrimitiveType::DATETIMEV2) {
srcs[i] = _desc_result.columns[i].columnDesc.columnScale;
datas[i] = srcs + i;
} else {
Expand All @@ -535,7 +539,20 @@ Status SchemaColumnsScanner::_fill_block_impl(vectorized::Block* block) {
RETURN_IF_ERROR(fill_dest_column_for_range(block, 11, datas));
}
// DATETIME_PRECISION
{ RETURN_IF_ERROR(fill_dest_column_for_range(block, 12, null_datas)); }
{
std::vector<int64_t> srcs(columns_num);
for (int i = 0; i < columns_num; ++i) {
int data_type = _desc_result.columns[i].columnDesc.columnType;
if (_desc_result.columns[i].columnDesc.__isset.columnScale &&
data_type == TPrimitiveType::DATETIMEV2) {
srcs[i] = _desc_result.columns[i].columnDesc.columnScale;
datas[i] = srcs.data() + i;
} else {
datas[i] = nullptr;
}
}
RETURN_IF_ERROR(fill_dest_column_for_range(block, 12, datas));
}
// CHARACTER_SET_NAME
{ RETURN_IF_ERROR(fill_dest_column_for_range(block, 13, null_datas)); }
// COLLATION_NAME
Expand Down Expand Up @@ -605,7 +622,9 @@ Status SchemaColumnsScanner::_fill_block_impl(vectorized::Block* block) {
{
int64_t srcs[columns_num];
for (int i = 0; i < columns_num; ++i) {
if (_desc_result.columns[i].columnDesc.__isset.columnScale) {
int data_type = _desc_result.columns[i].columnDesc.columnType;
if (_desc_result.columns[i].columnDesc.__isset.columnScale &&
data_type != TPrimitiveType::DATETIMEV2) {
srcs[i] = _desc_result.columns[i].columnDesc.columnScale;
datas[i] = srcs + i;
} else {
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_routine_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ Status SchemaRoutinesScanner::get_next_block(vectorized::Block* block, bool* eos

int current_batch_rows = std::min(_block_rows_limit, _total_rows - _row_idx);
vectorized::MutableBlock mblock = vectorized::MutableBlock::build_mutable_block(block);
mblock.add_rows(_routines_block.get(), _row_idx, current_batch_rows);
RETURN_IF_ERROR(mblock.add_rows(_routines_block.get(), _row_idx, current_batch_rows));
_row_idx += current_batch_rows;

*eos = _row_idx == _total_rows;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ Status SchemaWorkloadGroupsScanner::get_next_block(vectorized::Block* block, boo

int current_batch_rows = std::min(_block_rows_limit, _total_rows - _row_idx);
vectorized::MutableBlock mblock = vectorized::MutableBlock::build_mutable_block(block);
mblock.add_rows(_workload_groups_block.get(), _row_idx, current_batch_rows);
RETURN_IF_ERROR(mblock.add_rows(_workload_groups_block.get(), _row_idx, current_batch_rows));
_row_idx += current_batch_rows;

*eos = _row_idx == _total_rows;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ Status SchemaWorkloadSchedulePolicyScanner::get_next_block(vectorized::Block* bl

int current_batch_rows = std::min(_block_rows_limit, _total_rows - _row_idx);
vectorized::MutableBlock mblock = vectorized::MutableBlock::build_mutable_block(block);
mblock.add_rows(_block.get(), _row_idx, current_batch_rows);
RETURN_IF_ERROR(mblock.add_rows(_block.get(), _row_idx, current_batch_rows));
_row_idx += current_batch_rows;

*eos = _row_idx == _total_rows;
Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/tablet_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
_is_strict_mode = pschema.is_strict_mode();
if (_is_partial_update) {
_auto_increment_column = pschema.auto_increment_column();
_auto_increment_column_unique_id = pschema.auto_increment_column_unique_id();
}
_timestamp_ms = pschema.timestamp_ms();
_timezone = pschema.timezone();
Expand Down Expand Up @@ -186,6 +187,7 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) {
}
if (_is_partial_update) {
_auto_increment_column = tschema.auto_increment_column;
_auto_increment_column_unique_id = tschema.auto_increment_column_unique_id;
}

for (const auto& tcolumn : tschema.partial_update_input_columns) {
Expand Down Expand Up @@ -258,6 +260,7 @@ void OlapTableSchemaParam::to_protobuf(POlapTableSchemaParam* pschema) const {
pschema->set_partial_update(_is_partial_update);
pschema->set_is_strict_mode(_is_strict_mode);
pschema->set_auto_increment_column(_auto_increment_column);
pschema->set_auto_increment_column_unique_id(_auto_increment_column_unique_id);
pschema->set_timestamp_ms(_timestamp_ms);
pschema->set_timezone(_timezone);
for (auto col : _partial_update_input_columns) {
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/tablet_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class OlapTableSchemaParam {
return _partial_update_input_columns;
}
std::string auto_increment_coulumn() const { return _auto_increment_column; }
int32_t auto_increment_column_unique_id() const { return _auto_increment_column_unique_id; }
void set_timestamp_ms(int64_t timestamp_ms) { _timestamp_ms = timestamp_ms; }
int64_t timestamp_ms() const { return _timestamp_ms; }
void set_timezone(std::string timezone) { _timezone = timezone; }
Expand All @@ -113,6 +114,7 @@ class OlapTableSchemaParam {
std::set<std::string> _partial_update_input_columns;
bool _is_strict_mode = false;
std::string _auto_increment_column;
int32_t _auto_increment_column_unique_id;
int64_t _timestamp_ms = 0;
std::string _timezone;
};
Expand Down
Loading

0 comments on commit b69a48f

Please sign in to comment.