-
Notifications
You must be signed in to change notification settings - Fork 3.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[PipelineX](improvement) Prepare tasks in parallel #32789
Changes from all commits
8ba05ba
dbf9ad8
ee9a155
6047d6d
1f7840c
e7c9d21
e3fcc68
ac58270
a018575
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -118,9 +118,13 @@ PipelineXFragmentContext::~PipelineXFragmentContext() { | |
auto st = _query_ctx->exec_status(); | ||
_tasks.clear(); | ||
if (!_task_runtime_states.empty()) { | ||
for (auto& runtime_state : _task_runtime_states) { | ||
_call_back(runtime_state.get(), &st); | ||
runtime_state.reset(); | ||
for (auto& runtime_states : _task_runtime_states) { | ||
for (auto& runtime_state : runtime_states) { | ||
if (runtime_state) { | ||
_call_back(runtime_state.get(), &st); | ||
runtime_state.reset(); | ||
} | ||
} | ||
} | ||
} | ||
_runtime_state.reset(); | ||
|
@@ -164,7 +168,8 @@ void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason, | |
} | ||
} | ||
|
||
Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& request) { | ||
Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& request, | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. warning: function 'prepare' exceeds recommended size/complexity thresholds [readability-function-size] Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& request,
^ Additional context: be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp:166: 107 lines including whitespace and comments (threshold 80) Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& request,
^ There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. warning: function 'prepare' exceeds recommended size/complexity thresholds [readability-function-size] Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& request,
^ Additional context: be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp:168: 106 lines including whitespace and comments (threshold 80) Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& request,
^ There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. warning: function 'prepare' exceeds recommended size/complexity thresholds [readability-function-size] Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& request,
^ Additional context: be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp:168: 108 lines including whitespace and comments (threshold 80) Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& request,
^ There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. warning: function 'prepare' exceeds recommended size/complexity thresholds [readability-function-size] Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& request,
^ Additional context: be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp:170: 105 lines including whitespace and comments (threshold 80) Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& request,
^ |
||
ThreadPool* thread_pool) { | ||
if (_prepared) { | ||
return Status::InternalError("Already prepared"); | ||
} | ||
|
@@ -189,7 +194,6 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r | |
_runtime_state = RuntimeState::create_unique(request.query_id, request.fragment_id, | ||
request.query_options, _query_ctx->query_globals, | ||
_exec_env, _query_ctx.get()); | ||
|
||
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker()); | ||
if (request.__isset.backend_id) { | ||
_runtime_state->set_backend_id(request.backend_id); | ||
|
@@ -264,7 +268,7 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r | |
} | ||
|
||
// 5. Build pipeline tasks and initialize local state. | ||
RETURN_IF_ERROR(_build_pipeline_tasks(request)); | ||
RETURN_IF_ERROR(_build_pipeline_x_tasks(request, thread_pool)); | ||
|
||
_init_next_report_time(); | ||
|
||
|
@@ -486,11 +490,17 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData | |
return Status::OK(); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. warning: function '_build_pipeline_x_tasks' exceeds recommended size/complexity thresholds [readability-function-size] Status PipelineXFragmentContext::_build_pipeline_x_tasks(
^ Additional context: be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp:491: 216 lines including whitespace and comments (threshold 80) Status PipelineXFragmentContext::_build_pipeline_x_tasks(
^ |
||
|
||
Status PipelineXFragmentContext::_build_pipeline_tasks( | ||
const doris::TPipelineFragmentParams& request) { | ||
Status PipelineXFragmentContext::_build_pipeline_x_tasks( | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. warning: function '_build_pipeline_x_tasks' exceeds recommended size/complexity thresholds [readability-function-size] Status PipelineXFragmentContext::_build_pipeline_x_tasks(
^ Additional context: be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp:482: 220 lines including whitespace and comments (threshold 80) Status PipelineXFragmentContext::_build_pipeline_x_tasks(
^ There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. warning: function '_build_pipeline_x_tasks' exceeds recommended size/complexity thresholds [readability-function-size] Status PipelineXFragmentContext::_build_pipeline_x_tasks(
^ Additional context: be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp:484: 216 lines including whitespace and comments (threshold 80) Status PipelineXFragmentContext::_build_pipeline_x_tasks(
^ There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. warning: function '_build_pipeline_x_tasks' exceeds recommended size/complexity thresholds [readability-function-size] Status PipelineXFragmentContext::_build_pipeline_x_tasks(
^ Additional context: be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp:485: 216 lines including whitespace and comments (threshold 80) Status PipelineXFragmentContext::_build_pipeline_x_tasks(
^ There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. warning: function '_build_pipeline_x_tasks' exceeds recommended size/complexity thresholds [readability-function-size] Status PipelineXFragmentContext::_build_pipeline_x_tasks(
^ Additional context: be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp:493: 216 lines including whitespace and comments (threshold 80) Status PipelineXFragmentContext::_build_pipeline_x_tasks(
^ There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. warning: function '_build_pipeline_x_tasks' exceeds recommended size/complexity thresholds [readability-function-size] Status PipelineXFragmentContext::_build_pipeline_x_tasks(
^ Additional context: be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp:493: 217 lines including whitespace and comments (threshold 80) Status PipelineXFragmentContext::_build_pipeline_x_tasks(
^ There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. warning: function '_build_pipeline_x_tasks' exceeds recommended size/complexity thresholds [readability-function-size] Status PipelineXFragmentContext::_build_pipeline_x_tasks(
^ Additional context: be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp:493: 215 lines including whitespace and comments (threshold 80) Status PipelineXFragmentContext::_build_pipeline_x_tasks(
^ There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. warning: function '_build_pipeline_x_tasks' exceeds recommended size/complexity thresholds [readability-function-size] Status PipelineXFragmentContext::_build_pipeline_x_tasks(
^ Additional context: be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp:492: 222 lines including whitespace and comments (threshold 80) Status PipelineXFragmentContext::_build_pipeline_x_tasks(
^ |
||
const doris::TPipelineFragmentParams& request, ThreadPool* thread_pool) { | ||
_total_tasks = 0; | ||
int target_size = request.local_params.size(); | ||
_tasks.resize(target_size); | ||
_fragment_instance_ids.resize(target_size); | ||
_runtime_filter_states.resize(target_size); | ||
_task_runtime_states.resize(_pipelines.size()); | ||
for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) { | ||
_task_runtime_states[pip_idx].resize(_pipelines[pip_idx]->num_tasks()); | ||
} | ||
auto& pipeline_id_to_profile = _runtime_state->pipeline_id_to_profile(); | ||
DCHECK(pipeline_id_to_profile.empty()); | ||
pipeline_id_to_profile.resize(_pipelines.size()); | ||
|
@@ -503,10 +513,10 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( | |
} | ||
} | ||
|
||
for (size_t i = 0; i < target_size; i++) { | ||
auto pre_and_submit = [&](int i, PipelineFragmentContext* ctx) { | ||
const auto& local_params = request.local_params[i]; | ||
auto fragment_instance_id = local_params.fragment_instance_id; | ||
_fragment_instance_ids.push_back(fragment_instance_id); | ||
_fragment_instance_ids[i] = fragment_instance_id; | ||
std::unique_ptr<RuntimeFilterMgr> runtime_filter_mgr; | ||
auto init_runtime_state = [&](std::unique_ptr<RuntimeState>& runtime_state) { | ||
runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker); | ||
|
@@ -563,7 +573,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( | |
|
||
filterparams->runtime_filter_mgr = runtime_filter_mgr.get(); | ||
|
||
_runtime_filter_states.push_back(std::move(filterparams)); | ||
_runtime_filter_states[i] = std::move(filterparams); | ||
std::map<PipelineId, PipelineXTask*> pipeline_id_to_task; | ||
auto get_local_exchange_state = [&](PipelinePtr pipeline) | ||
-> std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, | ||
|
@@ -583,32 +593,31 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( | |
} | ||
return le_state_map; | ||
}; | ||
auto get_task_runtime_state = [&](int task_id) -> RuntimeState* { | ||
DCHECK(_task_runtime_states[task_id]); | ||
return _task_runtime_states[task_id].get(); | ||
}; | ||
for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) { | ||
auto& pipeline = _pipelines[pip_idx]; | ||
if (pipeline->need_to_create_task()) { | ||
auto idx = pipeline->created_task_idx(); | ||
if (idx >= 0) { | ||
auto cur_task_id = _total_tasks++; | ||
DCHECK(_task_runtime_states[pip_idx][idx] == nullptr) | ||
<< print_id(_task_runtime_states[pip_idx][idx]->fragment_instance_id()) | ||
<< " " << pipeline->debug_string(); | ||
// build task runtime state | ||
_task_runtime_states.push_back(RuntimeState::create_unique( | ||
_task_runtime_states[pip_idx][idx] = RuntimeState::create_unique( | ||
this, local_params.fragment_instance_id, request.query_id, | ||
request.fragment_id, request.query_options, _query_ctx->query_globals, | ||
_exec_env, _query_ctx.get())); | ||
auto& task_runtime_state = _task_runtime_states.back(); | ||
_exec_env, _query_ctx.get()); | ||
auto& task_runtime_state = _task_runtime_states[pip_idx][idx]; | ||
init_runtime_state(task_runtime_state); | ||
auto cur_task_id = _total_tasks++; | ||
task_runtime_state->set_task_id(cur_task_id); | ||
task_runtime_state->set_task_num(pipeline->num_tasks()); | ||
auto task = std::make_unique<PipelineXTask>( | ||
pipeline, cur_task_id, get_task_runtime_state(cur_task_id), this, | ||
pipeline_id_to_profile[pip_idx].get(), get_local_exchange_state(pipeline), | ||
i); | ||
auto task = std::make_unique<PipelineXTask>(pipeline, cur_task_id, | ||
task_runtime_state.get(), ctx, | ||
pipeline_id_to_profile[pip_idx].get(), | ||
get_local_exchange_state(pipeline), i); | ||
pipeline_id_to_task.insert({pipeline->id(), task.get()}); | ||
_tasks[i].emplace_back(std::move(task)); | ||
} | ||
} | ||
|
||
/** | ||
* Build DAG for pipeline tasks. | ||
* For example, we have | ||
|
@@ -668,6 +677,36 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( | |
std::lock_guard<std::mutex> l(_state_map_lock); | ||
_runtime_filter_mgr_map[fragment_instance_id] = std::move(runtime_filter_mgr); | ||
} | ||
return Status::OK(); | ||
}; | ||
if (target_size > 1) { | ||
Status prepare_status[target_size]; | ||
std::mutex m; | ||
std::condition_variable cv; | ||
int prepare_done = 0; | ||
for (size_t i = 0; i < target_size; i++) { | ||
RETURN_IF_ERROR(thread_pool->submit_func([&, i]() { | ||
SCOPED_ATTACH_TASK_WITH_ID(_query_ctx->query_mem_tracker, _query_id); | ||
prepare_status[i] = pre_and_submit(i, this); | ||
std::unique_lock<std::mutex> lock(m); | ||
prepare_done++; | ||
if (prepare_done == target_size) { | ||
cv.notify_one(); | ||
} | ||
})); | ||
} | ||
std::unique_lock<std::mutex> lock(m); | ||
if (prepare_done != target_size) { | ||
cv.wait(lock); | ||
|
||
for (size_t i = 0; i < target_size; i++) { | ||
if (!prepare_status[i].ok()) { | ||
return prepare_status[i]; | ||
} | ||
} | ||
} | ||
} else { | ||
RETURN_IF_ERROR(pre_and_submit(0, this)); | ||
} | ||
_pipeline_parent_map.clear(); | ||
_dag.clear(); | ||
|
@@ -1418,8 +1457,12 @@ Status PipelineXFragmentContext::send_report(bool done) { | |
|
||
std::vector<RuntimeState*> runtime_states; | ||
|
||
for (auto& task_state : _task_runtime_states) { | ||
runtime_states.push_back(task_state.get()); | ||
for (auto& task_states : _task_runtime_states) { | ||
for (auto& task_state : task_states) { | ||
if (task_state) { | ||
runtime_states.push_back(task_state.get()); | ||
} | ||
} | ||
} | ||
|
||
ReportStatusRequest req {true, | ||
|
@@ -1450,7 +1493,7 @@ PipelineXFragmentContext::collect_realtime_profile_x() const { | |
print_id(this->_query_id)); | ||
|
||
// we do not have mutex to protect pipeline_id_to_profile | ||
// so we need to make sure this funciton is invoked after fragment context | ||
// so we need to make sure this function is invoked after fragment context | ||
// has already been prepared. | ||
if (!this->_prepared) { | ||
std::string msg = | ||
|
@@ -1483,15 +1526,17 @@ PipelineXFragmentContext::collect_realtime_load_channel_profile_x() const { | |
return nullptr; | ||
} | ||
|
||
for (auto& runtime_state : _task_runtime_states) { | ||
if (runtime_state->runtime_profile() == nullptr) { | ||
continue; | ||
} | ||
for (auto& runtime_states : _task_runtime_states) { | ||
for (auto& runtime_state : runtime_states) { | ||
if (runtime_state == nullptr || runtime_state->runtime_profile() == nullptr) { | ||
continue; | ||
} | ||
|
||
auto tmp_load_channel_profile = std::make_shared<TRuntimeProfileTree>(); | ||
auto tmp_load_channel_profile = std::make_shared<TRuntimeProfileTree>(); | ||
|
||
runtime_state->runtime_profile()->to_thrift(tmp_load_channel_profile.get()); | ||
this->_runtime_state->load_channel_profile()->update(*tmp_load_channel_profile); | ||
runtime_state->runtime_profile()->to_thrift(tmp_load_channel_profile.get()); | ||
this->_runtime_state->load_channel_profile()->update(*tmp_load_channel_profile); | ||
} | ||
} | ||
|
||
auto load_channel_profile = std::make_shared<TRuntimeProfileTree>(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warning: function 'prepare' exceeds recommended size/complexity thresholds [readability-function-size]
Additional context
be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp:164: 107 lines including whitespace and comments (threshold 80)