Merge branch 'master' into var-perf
eldenmoon authored May 8, 2024
2 parents e1cf319 + 6f4738c commit 25640de
Showing 2,042 changed files with 232,285 additions and 32,938 deletions.
2 changes: 1 addition & 1 deletion .asf.yaml
@@ -110,8 +110,8 @@ github:
- shuke987
- wm1581066
- KassieZ
- gavinchou
- yujun777
- gavinchou

notifications:
pullrequests_status: [email protected]
2 changes: 2 additions & 0 deletions .clang-tidy
@@ -23,6 +23,8 @@ Checks: |
-readability-inconsistent-declaration-parameter-name,
-readability-isolate-declaration,
-readability-named-parameter,
-readability-avoid-const-params-in-decls,
-readability-convert-member-functions-to-static,
portability-simd-intrinsics,
performance-type-promotion-in-math-fn,
performance-faster-string-find,
3 changes: 1 addition & 2 deletions be/CMakeLists.txt
@@ -282,8 +282,7 @@ if (COMPILER_CLANG)
-Wunused-member-function
-Wunused-macros
-Wconversion)
add_compile_options(-Wno-vla-extension
-Wno-gnu-statement-expression
add_compile_options( -Wno-gnu-statement-expression
-Wno-implicit-float-conversion
-Wno-implicit-int-conversion
-Wno-sign-conversion
8 changes: 7 additions & 1 deletion be/src/agent/agent_server.cpp
@@ -177,6 +177,12 @@ void AgentServer::start_workers(StorageEngine& engine, ExecEnv* exec_env) {
_workers[TTaskType::GC_BINLOG] = std::make_unique<TaskWorkerPool>(
"GC_BINLOG", 1, [&engine](auto&& task) { return gc_binlog_callback(engine, task); });

_workers[TTaskType::CLEAN_TRASH] = std::make_unique<TaskWorkerPool>(
"CLEAN_TRASH", 1, [&engine](auto&& task) {return clean_trash_callback(engine, task); });

_workers[TTaskType::UPDATE_VISIBLE_VERSION] = std::make_unique<TaskWorkerPool>(
"UPDATE_VISIBLE_VERSION", 1, [&engine](auto&& task) { return visible_version_callback(engine, task); });

_report_workers.push_back(std::make_unique<ReportWorker>(
"REPORT_TASK", _master_info, config::report_task_interval_seconds, [&master_info = _master_info] { report_task_callback(master_info); }));

@@ -200,7 +206,7 @@ void AgentServer::cloud_start_workers(CloudStorageEngine& engine, ExecEnv* exec_

_workers[TTaskType::CALCULATE_DELETE_BITMAP] = std::make_unique<TaskWorkerPool>(
"CALC_DBM_TASK", config::calc_delete_bitmap_worker_count,
[&engine](auto&& task) { return calc_delete_bimtap_callback(engine, task); });
[&engine](auto&& task) { return calc_delete_bitmap_callback(engine, task); });

_report_workers.push_back(std::make_unique<ReportWorker>(
"REPORT_TASK", _master_info, config::report_task_interval_seconds,
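Note on the agent_server.cpp change: the two new registrations follow the file's existing pattern, where each TTaskType is bound to a small TaskWorkerPool whose callback captures the storage engine. A rough sketch of that dispatch shape only — TaskType, Task, and the handlers below are illustrative stand-ins, not the actual Doris classes:

    #include <functional>
    #include <iostream>
    #include <map>
    #include <string>

    // Illustrative stand-ins; the real types live under be/src/agent/.
    enum class TaskType { CLEAN_TRASH, UPDATE_VISIBLE_VERSION, GC_BINLOG };
    struct Task {
        TaskType type;
        std::string payload;
    };

    int main() {
        // Each task type gets its own handler; in Doris the lambda captures the
        // storage engine, here it just prints.
        std::map<TaskType, std::function<void(const Task&)>> workers;
        workers[TaskType::CLEAN_TRASH] = [](const Task& t) {
            std::cout << "clean trash: " << t.payload << "\n";
        };
        workers[TaskType::UPDATE_VISIBLE_VERSION] = [](const Task& t) {
            std::cout << "update visible version: " << t.payload << "\n";
        };

        // Dispatch: look up the worker registered for the incoming task type.
        Task incoming{TaskType::CLEAN_TRASH, "data_dir_0"};
        if (auto it = workers.find(incoming.type); it != workers.end()) {
            it->second(incoming);
        }
        return 0;
    }

The worker count of 1 in both new registrations means these maintenance tasks run serialized on a single thread, which is presumably sufficient for their low frequency.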
5 changes: 4 additions & 1 deletion be/src/agent/be_exec_version_manager.h
@@ -70,8 +70,11 @@ class BeExecVersionManager {
* f. shrink some function's nullable mode.
* g. do local merge of remote runtime filter
* h. "now": ALWAYS_NOT_NULLABLE -> DEPEND_ON_ARGUMENTS
*
* 5: start from doris 2.1.4
* a. change the impl of percentile
*/
constexpr inline int BeExecVersionManager::max_be_exec_version = 4;
constexpr inline int BeExecVersionManager::max_be_exec_version = 5;
constexpr inline int BeExecVersionManager::min_be_exec_version = 0;

/// functional
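Note on be_exec_version_manager.h: raising max_be_exec_version to 5 is how the BE advertises that it understands the new percentile implementation while still accepting plans built for older versions. A minimal sketch of how such a version gate is typically consulted — percentile_impl_name and the hard-coded negotiated value are hypothetical; only the two constants come from the header above:

    #include <iostream>
    #include <string>

    // Constants mirroring the header above.
    constexpr int min_be_exec_version = 0;
    constexpr int max_be_exec_version = 5;

    // Hypothetical dispatch: pick an aggregate implementation by the exec version
    // negotiated for the query, so mixed-version clusters stay compatible.
    std::string percentile_impl_name(int be_exec_version) {
        if (be_exec_version >= 5) {
            return "percentile_v2";     // the reworked implementation (version 5+)
        }
        return "percentile_legacy";     // behavior kept for versions 0-4
    }

    int main() {
        int negotiated = 5; // would normally come from the FE per query
        if (negotiated < min_be_exec_version || negotiated > max_be_exec_version) {
            std::cerr << "unsupported be_exec_version: " << negotiated << "\n";
            return 1;
        }
        std::cout << "using " << percentile_impl_name(negotiated) << "\n";
        return 0;
    }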
2 changes: 2 additions & 0 deletions be/src/agent/heartbeat_server.cpp
@@ -34,6 +34,7 @@
#include "runtime/heartbeat_flags.h"
#include "service/backend_options.h"
#include "util/debug_util.h"
#include "util/mem_info.h"
#include "util/network_util.h"
#include "util/thrift_server.h"
#include "util/time.h"
@@ -88,6 +89,7 @@ void HeartbeatServer::heartbeat(THeartbeatResult& heartbeat_result,
get_fragment_executing_count());
heartbeat_result.backend_info.__set_fragment_last_active_time(
get_fragment_last_active_time());
heartbeat_result.backend_info.__set_be_mem(MemInfo::physical_mem());
}
watch.stop();
if (watch.elapsed_time() > 1000L * 1000L * 1000L) {
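Note on heartbeat_server.cpp: the heartbeat response now also carries the BE's physical memory (MemInfo::physical_mem()), and the surrounding handler already warns when processing takes longer than one second (the 1000L * 1000L * 1000L literal is nanoseconds). A small illustrative sketch of that populate-and-time pattern using only the standard library; BackendInfo and physical_mem_bytes are stand-ins, not the Thrift-generated types:

    #include <chrono>
    #include <cstdint>
    #include <iostream>

    // Illustrative stand-in for the Thrift-generated backend info struct.
    struct BackendInfo {
        int64_t be_mem = 0;
        void set_be_mem(int64_t v) { be_mem = v; }
    };

    // Placeholder; the real value comes from MemInfo::physical_mem().
    int64_t physical_mem_bytes() { return 16LL * 1024 * 1024 * 1024; }

    int main() {
        auto start = std::chrono::steady_clock::now();

        BackendInfo info;
        info.set_be_mem(physical_mem_bytes()); // same idea as __set_be_mem(...)

        auto elapsed = std::chrono::steady_clock::now() - start;
        if (elapsed > std::chrono::seconds(1)) {
            std::cout << "heartbeat handling took more than one second\n";
        }
        std::cout << "reported be_mem=" << info.be_mem << " bytes\n";
        return 0;
    }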
48 changes: 42 additions & 6 deletions be/src/agent/task_worker_pool.cpp
@@ -79,6 +79,7 @@
#include "runtime/fragment_mgr.h"
#include "runtime/snapshot_loader.h"
#include "service/backend_options.h"
#include "util/debug_points.h"
#include "util/doris_metrics.h"
#include "util/mem_info.h"
#include "util/random.h"
@@ -184,11 +185,17 @@ void alter_tablet(StorageEngine& engine, const TAgentTaskRequest& agent_task_req
fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}",
std::to_string(agent_task_req.alter_tablet_req_v2.base_tablet_id),
std::to_string(agent_task_req.alter_tablet_req_v2.new_tablet_id),
config::memory_limitation_per_thread_for_schema_change_bytes));
engine.memory_limitation_bytes_per_thread_for_schema_change()));
SCOPED_ATTACH_TASK(mem_tracker);
DorisMetrics::instance()->create_rollup_requests_total->increment(1);
Status res = Status::OK();
try {
LOG_INFO("start {}", process_name)
.tag("signature", agent_task_req.signature)
.tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
.tag("new_tablet_id", new_tablet_id)
.tag("mem_limit",
engine.memory_limitation_bytes_per_thread_for_schema_change());
DCHECK(agent_task_req.alter_tablet_req_v2.__isset.job_id);
SchemaChangeJob job(engine, agent_task_req.alter_tablet_req_v2,
std::to_string(agent_task_req.alter_tablet_req_v2.job_id));
@@ -253,11 +260,17 @@ void alter_cloud_tablet(CloudStorageEngine& engine, const TAgentTaskRequest& age
fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}",
std::to_string(agent_task_req.alter_tablet_req_v2.base_tablet_id),
std::to_string(agent_task_req.alter_tablet_req_v2.new_tablet_id),
config::memory_limitation_per_thread_for_schema_change_bytes));
engine.memory_limitation_bytes_per_thread_for_schema_change()));
SCOPED_ATTACH_TASK(mem_tracker);
DorisMetrics::instance()->create_rollup_requests_total->increment(1);
Status res = Status::OK();
try {
LOG_INFO("start {}", process_name)
.tag("signature", agent_task_req.signature)
.tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
.tag("new_tablet_id", new_tablet_id)
.tag("mem_limit",
engine.memory_limitation_bytes_per_thread_for_schema_change());
DCHECK(agent_task_req.alter_tablet_req_v2.__isset.job_id);
CloudSchemaChangeJob job(engine,
std::to_string(agent_task_req.alter_tablet_req_v2.job_id),
@@ -425,6 +438,7 @@ bvar::Adder<uint64_t> ALTER_count("task", "ALTER_TABLE");
bvar::Adder<uint64_t> CLONE_count("task", "CLONE");
bvar::Adder<uint64_t> STORAGE_MEDIUM_MIGRATE_count("task", "STORAGE_MEDIUM_MIGRATE");
bvar::Adder<uint64_t> GC_BINLOG_count("task", "GC_BINLOG");
bvar::Adder<uint64_t> UPDATE_VISIBLE_VERSION_count("task", "UPDATE_VISIBLE_VERSION");

void add_task_count(const TAgentTaskRequest& task, int n) {
// clang-format off
@@ -451,6 +465,7 @@ void add_task_count(const TAgentTaskRequest& task, int n) {
ADD_TASK_COUNT(CLONE)
ADD_TASK_COUNT(STORAGE_MEDIUM_MIGRATE)
ADD_TASK_COUNT(GC_BINLOG)
ADD_TASK_COUNT(UPDATE_VISIBLE_VERSION)
#undef ADD_TASK_COUNT
case TTaskType::REALTIME_PUSH:
case TTaskType::PUSH:
@@ -521,7 +536,7 @@ Status TaskWorkerPool::submit_task(const TAgentTaskRequest& task) {
}

PriorTaskWorkerPool::PriorTaskWorkerPool(
std::string_view name, int normal_worker_count, int high_prior_worker_conut,
std::string_view name, int normal_worker_count, int high_prior_worker_count,
std::function<void(const TAgentTaskRequest& task)> callback)
: _callback(std::move(callback)) {
auto st = ThreadPoolBuilder(fmt::format("TaskWP_.{}", name))
@@ -534,8 +549,8 @@ PriorTaskWorkerPool::PriorTaskWorkerPool(
CHECK(st.ok()) << name << ": " << st;

st = ThreadPoolBuilder(fmt::format("HighPriorPool.{}", name))
.set_min_threads(high_prior_worker_conut)
.set_max_threads(high_prior_worker_conut)
.set_min_threads(high_prior_worker_count)
.set_max_threads(high_prior_worker_count)
.build(&_high_prior_pool);
CHECK(st.ok()) << name << ": " << st;

@@ -1076,6 +1091,11 @@ void report_tablet_callback(StorageEngine& engine, const TMasterInfo& master_inf
DorisMetrics::instance()->report_all_tablets_requests_skip->increment(1);
return;
}

std::map<int64_t, int64_t> partitions_version;
engine.tablet_manager()->get_partitions_visible_version(&partitions_version);
request.__set_partitions_version(std::move(partitions_version));

int64_t max_compaction_score =
std::max(DorisMetrics::instance()->tablet_cumulative_max_compaction_score->value(),
DorisMetrics::instance()->tablet_base_max_compaction_score->value());
@@ -1364,6 +1384,7 @@ void update_s3_resource(const TStorageResource& param, io::RemoteFileSystemSPtr
.region = param.s3_storage_param.region,
.ak = param.s3_storage_param.ak,
.sk = param.s3_storage_param.sk,
.token = param.s3_storage_param.token,
.max_connections = param.s3_storage_param.max_conn,
.request_timeout_ms = param.s3_storage_param.request_timeout_ms,
.connect_timeout_ms = param.s3_storage_param.conn_timeout_ms,
@@ -1383,6 +1404,7 @@ void update_s3_resource(const TStorageResource& param, io::RemoteFileSystemSPtr
S3ClientConf conf {
.ak = param.s3_storage_param.ak,
.sk = param.s3_storage_param.sk,
.token = param.s3_storage_param.token,
};
st = client->reset(conf);
fs = std::move(existed_fs);
@@ -1924,6 +1946,12 @@ void gc_binlog_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
engine.gc_binlogs(gc_tablet_infos);
}

void visible_version_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
const TVisibleVersionReq& visible_version_req = req.visible_version_req;
engine.tablet_manager()->update_partitions_visible_version(
visible_version_req.partition_version);
}

void clone_callback(StorageEngine& engine, const TMasterInfo& master_info,
const TAgentTaskRequest& req) {
const auto& clone_req = req.clone_req;
@@ -1997,7 +2025,7 @@ void storage_medium_migrate_callback(StorageEngine& engine, const TAgentTaskRequ
remove_task_info(req.task_type, req.signature);
}

void calc_delete_bimtap_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) {
void calc_delete_bitmap_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) {
std::vector<TTabletId> error_tablet_ids;
std::vector<TTabletId> succ_tablet_ids;
Status status;
@@ -2029,4 +2057,12 @@ void calc_delete_bimtap_callback(CloudStorageEngine& engine, const TAgentTaskReq
remove_task_info(req.task_type, req.signature);
}

void clean_trash_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
LOG(INFO) << "clean trash start";
DBUG_EXECUTE_IF("clean_trash_callback_sleep", { sleep(100); })
static_cast<void>(engine.start_trash_sweep(nullptr, true));
static_cast<void>(engine.notify_listener("REPORT_DISK_STATE"));
LOG(INFO) << "clean trash finish";
}

} // namespace doris
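Note on task_worker_pool.cpp: two of the additions cooperate — visible_version_callback stores the partition visible versions pushed by the FE, and report_tablet_callback later copies them into the tablet report via get_partitions_visible_version. A hedged sketch of that store/snapshot shape with a mutex-guarded map; PartitionVersionTracker is a hypothetical stand-in for the real TabletManager state:

    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <mutex>

    // Hypothetical tracker standing in for TabletManager's partition-version state.
    class PartitionVersionTracker {
    public:
        // Called from the UPDATE_VISIBLE_VERSION task: overwrite known versions.
        void update(const std::map<int64_t, int64_t>& versions) {
            std::lock_guard<std::mutex> lock(_mutex);
            for (const auto& [partition_id, version] : versions) {
                _versions[partition_id] = version;
            }
        }

        // Called when building the tablet report: copy out a snapshot.
        void get(std::map<int64_t, int64_t>* out) const {
            std::lock_guard<std::mutex> lock(_mutex);
            *out = _versions;
        }

    private:
        mutable std::mutex _mutex;
        std::map<int64_t, int64_t> _versions;
    };

    int main() {
        PartitionVersionTracker tracker;
        tracker.update({{1001, 7}, {1002, 3}});

        std::map<int64_t, int64_t> snapshot;
        tracker.get(&snapshot);
        for (const auto& [pid, ver] : snapshot) {
            std::cout << "partition " << pid << " visible version " << ver << "\n";
        }
        return 0;
    }

The clean_trash_callback addition also uses the DBUG_EXECUTE_IF hook from util/debug_points.h, which appears to let tests inject the sleep at that spot only when the named debug point is enabled.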
8 changes: 6 additions & 2 deletions be/src/agent/task_worker_pool.h
@@ -79,7 +79,7 @@ class PublishVersionWorkerPool final : public TaskWorkerPool {

class PriorTaskWorkerPool final : public TaskWorkerPoolIf {
public:
PriorTaskWorkerPool(std::string_view name, int normal_worker_count, int high_prior_worker_conut,
PriorTaskWorkerPool(std::string_view name, int normal_worker_count, int high_prior_worker_count,
std::function<void(const TAgentTaskRequest& task)> callback);

~PriorTaskWorkerPool() override;
@@ -174,6 +174,10 @@ void storage_medium_migrate_callback(StorageEngine& engine, const TAgentTaskRequ

void gc_binlog_callback(StorageEngine& engine, const TAgentTaskRequest& req);

void clean_trash_callback(StorageEngine& engine, const TAgentTaskRequest& req);

void visible_version_callback(StorageEngine& engine, const TAgentTaskRequest& req);

void report_task_callback(const TMasterInfo& master_info);

void report_disk_callback(StorageEngine& engine, const TMasterInfo& master_info);
@@ -182,6 +186,6 @@ void report_disk_callback(CloudStorageEngine& engine, const TMasterInfo& master_

void report_tablet_callback(StorageEngine& engine, const TMasterInfo& master_info);

void calc_delete_bimtap_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req);
void calc_delete_bitmap_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req);

} // namespace doris
6 changes: 4 additions & 2 deletions be/src/agent/topic_subscriber.cpp
@@ -40,12 +40,14 @@ void TopicSubscriber::handle_topic_info(const TPublishTopicRequest& topic_reques
// eg, update workload info may delay other listener, then we need add a thread here
// to handle_topic_info asynchronous
std::shared_lock lock(_listener_mtx);
LOG(INFO) << "begin handle topic info";
LOG(INFO) << "[topic_publish]begin handle topic info";
for (auto& listener_pair : _registered_listeners) {
if (topic_request.topic_map.find(listener_pair.first) != topic_request.topic_map.end()) {
LOG(INFO) << "[topic_publish]begin handle topic " << listener_pair.first
<< ", size=" << topic_request.topic_map.at(listener_pair.first).size();
listener_pair.second->handle_topic_info(
topic_request.topic_map.at(listener_pair.first));
LOG(INFO) << "handle topic " << listener_pair.first << " successfully";
LOG(INFO) << "[topic_publish]finish handle topic " << listener_pair.first;
}
}
}
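Note on topic_subscriber.cpp: the reworked logging sits inside TopicSubscriber's dispatch loop, which walks the registered listeners under a shared lock and hands each one its slice of the published topic map. A small illustrative sketch of that registry shape (generic Listener and handle types, not the Doris TopicListener interface):

    #include <iostream>
    #include <map>
    #include <memory>
    #include <shared_mutex>
    #include <string>
    #include <vector>

    // Generic stand-in for a topic listener.
    struct Listener {
        virtual ~Listener() = default;
        virtual void handle(const std::vector<std::string>& infos) = 0;
    };

    struct PrintingListener : Listener {
        void handle(const std::vector<std::string>& infos) override {
            std::cout << "handling " << infos.size() << " entries\n";
        }
    };

    class TopicSubscriberSketch {
    public:
        void register_listener(const std::string& topic, std::unique_ptr<Listener> l) {
            std::unique_lock lock(_mtx); // exclusive: mutating the registry
            _listeners[topic] = std::move(l);
        }

        void handle_topic_info(const std::map<std::string, std::vector<std::string>>& topics) {
            std::shared_lock lock(_mtx); // shared: read-only walk over the registry
            for (auto& [topic, listener] : _listeners) {
                auto it = topics.find(topic);
                if (it != topics.end()) {
                    listener->handle(it->second);
                }
            }
        }

    private:
        std::shared_mutex _mtx;
        std::map<std::string, std::unique_ptr<Listener>> _listeners;
    };

    int main() {
        TopicSubscriberSketch sub;
        sub.register_listener("WORKLOAD_GROUP", std::make_unique<PrintingListener>());
        sub.handle_topic_info({{"WORKLOAD_GROUP", {"wg1", "wg2"}}});
        return 0;
    }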
29 changes: 22 additions & 7 deletions be/src/agent/workload_group_listener.cpp
@@ -26,21 +26,27 @@ namespace doris {

void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topic_info_list) {
std::set<uint64_t> current_wg_ids;
bool is_set_workload_group_info = false;
int list_size = topic_info_list.size();
for (const TopicInfo& topic_info : topic_info_list) {
if (!topic_info.__isset.workload_group_info) {
continue;
}
is_set_workload_group_info = true;

// 1 parse topicinfo to group info
// 1 parse topic info to group info
WorkloadGroupInfo workload_group_info;
Status ret = WorkloadGroupInfo::parse_topic_info(topic_info.workload_group_info,
&workload_group_info);
// it means FE has this wg, but may parse failed, so we should not delete it.
if (workload_group_info.id != 0) {
current_wg_ids.insert(workload_group_info.id);
}
if (!ret.ok()) {
LOG(INFO) << "parse topic info failed, tg_id=" << workload_group_info.id
<< ", reason:" << ret.to_string();
LOG(INFO) << "[topic_publish_wg]parse topic info failed, tg_id="
<< workload_group_info.id << ", reason:" << ret.to_string();
continue;
}
current_wg_ids.insert(workload_group_info.id);

// 2 update workload group
auto tg =
@@ -53,16 +59,25 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi
// 4 create and update task scheduler
tg->upsert_task_scheduler(&workload_group_info, _exec_env);

LOG(INFO) << "update workload group finish, tg info=" << tg->debug_string()
<< ", enable_cpu_hard_limit="
LOG(INFO) << "[topic_publish_wg]update workload group finish, tg info="
<< tg->debug_string() << ", enable_cpu_hard_limit="
<< (_exec_env->workload_group_mgr()->enable_cpu_hard_limit() ? "true" : "false")
<< ", cgroup cpu_shares=" << workload_group_info.cgroup_cpu_shares
<< ", cgroup cpu_hard_limit=" << workload_group_info.cgroup_cpu_hard_limit
<< ", enable_cgroup_cpu_soft_limit="
<< (config::enable_cgroup_cpu_soft_limit ? "true" : "false")
<< ", cgroup home path=" << config::doris_cgroup_cpu_path;
<< ", cgroup home path=" << config::doris_cgroup_cpu_path
<< ", list size=" << list_size;
}

// NOTE(wb) when is_set_workload_group_info=false, it means FE send a empty workload group list
// this should not happens, because FE should has at least one normal group.
// just log it if that happens
if (!is_set_workload_group_info) {
LOG(INFO) << "[topic_publish_wg]unexpected error happens, no workload group info is "
"set, list size="
<< list_size;
}
_exec_env->workload_group_mgr()->delete_workload_group_by_ids(current_wg_ids);
}
} // namespace doris
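Note on workload_group_listener.cpp: the listener reconciles local workload groups against the FE's published list. Every id seen in the payload is recorded in current_wg_ids — after this change even when parsing the body fails, so a group the FE still knows about is not deleted just because one publish was malformed — and everything not in that set is dropped at the end via delete_workload_group_by_ids. A minimal sketch of that reconcile step with plain containers (hypothetical names, not WorkloadGroupMgr):

    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <set>
    #include <string>

    int main() {
        // Groups currently known to this BE.
        std::map<uint64_t, std::string> local_groups = {
                {1, "normal"}, {2, "etl"}, {3, "adhoc"}};

        // Ids seen in the latest publish from the FE (group 3 is gone).
        std::set<uint64_t> current_wg_ids = {1, 2};

        // Delete every local group whose id the FE no longer publishes.
        for (auto it = local_groups.begin(); it != local_groups.end();) {
            if (current_wg_ids.count(it->first) == 0) {
                std::cout << "dropping workload group " << it->first << "\n";
                it = local_groups.erase(it);
            } else {
                ++it;
            }
        }
        return 0;
    }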