Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PipelineX](improvement) Prepare tasks in parallel #32789

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1299,6 +1299,7 @@ void IRuntimeFilter::signal() {
}

void IRuntimeFilter::set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer> timer) {
std::unique_lock lock(_inner_mutex);
_filter_timer.push_back(timer);
}

Expand Down
8 changes: 5 additions & 3 deletions be/src/pipeline/pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,10 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
void set_children(std::shared_ptr<Pipeline> child) { _children.push_back(child); }
void set_children(std::vector<std::shared_ptr<Pipeline>> children) { _children = children; }

void incr_created_tasks() { _num_tasks_created++; }
bool need_to_create_task() const { return _num_tasks > _num_tasks_created; }
int created_task_idx() {
auto idx = _num_tasks_created.fetch_add(1);
return _num_tasks > idx ? idx : -1;
}
void set_num_tasks(int num_tasks) {
_num_tasks = num_tasks;
for (auto& op : operatorXs) {
Expand Down Expand Up @@ -243,7 +245,7 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
// How many tasks should be created ?
int _num_tasks = 1;
// How many tasks are already created?
int _num_tasks_created = 0;
std::atomic<int> _num_tasks_created = 0;
};

} // namespace doris::pipeline
4 changes: 2 additions & 2 deletions be/src/pipeline/pipeline_fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class PipelineFragmentContext : public TaskExecutionContext {

Status prepare(const doris::TPipelineFragmentParams& request, size_t idx);

virtual Status prepare(const doris::TPipelineFragmentParams& request) {
virtual Status prepare(const doris::TPipelineFragmentParams& request, ThreadPool* thread_pool) {
return Status::InternalError("Pipeline fragment context do not implement prepare");
}

Expand Down Expand Up @@ -167,7 +167,7 @@ class PipelineFragmentContext : public TaskExecutionContext {
int _closed_tasks = 0;
// After prepared, `_total_tasks` is equal to the size of `_tasks`.
// When submit fail, `_total_tasks` is equal to the number of tasks submitted.
int _total_tasks = 0;
std::atomic<int> _total_tasks = 0;

int32_t _next_operator_builder_id = 10000;

Expand Down
115 changes: 80 additions & 35 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,13 @@ PipelineXFragmentContext::~PipelineXFragmentContext() {
auto st = _query_ctx->exec_status();
_tasks.clear();
if (!_task_runtime_states.empty()) {
for (auto& runtime_state : _task_runtime_states) {
_call_back(runtime_state.get(), &st);
runtime_state.reset();
for (auto& runtime_states : _task_runtime_states) {
for (auto& runtime_state : runtime_states) {
if (runtime_state) {
_call_back(runtime_state.get(), &st);
runtime_state.reset();
}
}
}
}
_runtime_state.reset();
Expand Down Expand Up @@ -164,7 +168,8 @@ void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
}
}

Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& request) {
Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& request,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function 'prepare' exceeds recommended size/complexity thresholds [readability-function-size]

Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& request,
                                 ^
Additional context

be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp:164: 107 lines including whitespace and comments (threshold 80)

Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& request,
                                 ^

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function 'prepare' exceeds recommended size/complexity thresholds [readability-function-size]

Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& request,
                                 ^
Additional context

be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp:166: 107 lines including whitespace and comments (threshold 80)

Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& request,
                                 ^

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function 'prepare' exceeds recommended size/complexity thresholds [readability-function-size]

Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& request,
                                 ^
Additional context

be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp:168: 106 lines including whitespace and comments (threshold 80)

Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& request,
                                 ^

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function 'prepare' exceeds recommended size/complexity thresholds [readability-function-size]

Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& request,
                                 ^
Additional context

be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp:168: 108 lines including whitespace and comments (threshold 80)

Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& request,
                                 ^

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function 'prepare' exceeds recommended size/complexity thresholds [readability-function-size]

Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& request,
                                 ^
Additional context

be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp:170: 105 lines including whitespace and comments (threshold 80)

Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& request,
                                 ^

ThreadPool* thread_pool) {
if (_prepared) {
return Status::InternalError("Already prepared");
}
Expand All @@ -189,7 +194,6 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
_runtime_state = RuntimeState::create_unique(request.query_id, request.fragment_id,
request.query_options, _query_ctx->query_globals,
_exec_env, _query_ctx.get());

SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
if (request.__isset.backend_id) {
_runtime_state->set_backend_id(request.backend_id);
Expand Down Expand Up @@ -264,7 +268,7 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
}

// 5. Build pipeline tasks and initialize local state.
RETURN_IF_ERROR(_build_pipeline_tasks(request));
RETURN_IF_ERROR(_build_pipeline_x_tasks(request, thread_pool));

_init_next_report_time();

Expand Down Expand Up @@ -486,11 +490,17 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
return Status::OK();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function '_build_pipeline_x_tasks' exceeds recommended size/complexity thresholds [readability-function-size]

Status PipelineXFragmentContext::_build_pipeline_x_tasks(
                                 ^
Additional context

be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp:491: 216 lines including whitespace and comments (threshold 80)

Status PipelineXFragmentContext::_build_pipeline_x_tasks(
                                 ^


Status PipelineXFragmentContext::_build_pipeline_tasks(
const doris::TPipelineFragmentParams& request) {
Status PipelineXFragmentContext::_build_pipeline_x_tasks(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function '_build_pipeline_x_tasks' exceeds recommended size/complexity thresholds [readability-function-size]

Status PipelineXFragmentContext::_build_pipeline_x_tasks(
                                 ^
Additional context

be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp:482: 220 lines including whitespace and comments (threshold 80)

Status PipelineXFragmentContext::_build_pipeline_x_tasks(
                                 ^

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function '_build_pipeline_x_tasks' exceeds recommended size/complexity thresholds [readability-function-size]

Status PipelineXFragmentContext::_build_pipeline_x_tasks(
                                 ^
Additional context

be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp:484: 216 lines including whitespace and comments (threshold 80)

Status PipelineXFragmentContext::_build_pipeline_x_tasks(
                                 ^

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function '_build_pipeline_x_tasks' exceeds recommended size/complexity thresholds [readability-function-size]

Status PipelineXFragmentContext::_build_pipeline_x_tasks(
                                 ^
Additional context

be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp:485: 216 lines including whitespace and comments (threshold 80)

Status PipelineXFragmentContext::_build_pipeline_x_tasks(
                                 ^

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function '_build_pipeline_x_tasks' exceeds recommended size/complexity thresholds [readability-function-size]

Status PipelineXFragmentContext::_build_pipeline_x_tasks(
                                 ^
Additional context

be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp:493: 216 lines including whitespace and comments (threshold 80)

Status PipelineXFragmentContext::_build_pipeline_x_tasks(
                                 ^

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function '_build_pipeline_x_tasks' exceeds recommended size/complexity thresholds [readability-function-size]

Status PipelineXFragmentContext::_build_pipeline_x_tasks(
                                 ^
Additional context

be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp:493: 217 lines including whitespace and comments (threshold 80)

Status PipelineXFragmentContext::_build_pipeline_x_tasks(
                                 ^

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function '_build_pipeline_x_tasks' exceeds recommended size/complexity thresholds [readability-function-size]

Status PipelineXFragmentContext::_build_pipeline_x_tasks(
                                 ^
Additional context

be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp:493: 215 lines including whitespace and comments (threshold 80)

Status PipelineXFragmentContext::_build_pipeline_x_tasks(
                                 ^

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function '_build_pipeline_x_tasks' exceeds recommended size/complexity thresholds [readability-function-size]

Status PipelineXFragmentContext::_build_pipeline_x_tasks(
                                 ^
Additional context

be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp:492: 222 lines including whitespace and comments (threshold 80)

Status PipelineXFragmentContext::_build_pipeline_x_tasks(
                                 ^

const doris::TPipelineFragmentParams& request, ThreadPool* thread_pool) {
_total_tasks = 0;
int target_size = request.local_params.size();
_tasks.resize(target_size);
_fragment_instance_ids.resize(target_size);
_runtime_filter_states.resize(target_size);
_task_runtime_states.resize(_pipelines.size());
for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
_task_runtime_states[pip_idx].resize(_pipelines[pip_idx]->num_tasks());
}
auto& pipeline_id_to_profile = _runtime_state->pipeline_id_to_profile();
DCHECK(pipeline_id_to_profile.empty());
pipeline_id_to_profile.resize(_pipelines.size());
Expand All @@ -503,10 +513,10 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
}
}

for (size_t i = 0; i < target_size; i++) {
auto pre_and_submit = [&](int i, PipelineFragmentContext* ctx) {
const auto& local_params = request.local_params[i];
auto fragment_instance_id = local_params.fragment_instance_id;
_fragment_instance_ids.push_back(fragment_instance_id);
_fragment_instance_ids[i] = fragment_instance_id;
std::unique_ptr<RuntimeFilterMgr> runtime_filter_mgr;
auto init_runtime_state = [&](std::unique_ptr<RuntimeState>& runtime_state) {
runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
Expand Down Expand Up @@ -563,7 +573,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(

filterparams->runtime_filter_mgr = runtime_filter_mgr.get();

_runtime_filter_states.push_back(std::move(filterparams));
_runtime_filter_states[i] = std::move(filterparams);
std::map<PipelineId, PipelineXTask*> pipeline_id_to_task;
auto get_local_exchange_state = [&](PipelinePtr pipeline)
-> std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>,
Expand All @@ -583,32 +593,31 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
}
return le_state_map;
};
auto get_task_runtime_state = [&](int task_id) -> RuntimeState* {
DCHECK(_task_runtime_states[task_id]);
return _task_runtime_states[task_id].get();
};
for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
auto& pipeline = _pipelines[pip_idx];
if (pipeline->need_to_create_task()) {
auto idx = pipeline->created_task_idx();
if (idx >= 0) {
auto cur_task_id = _total_tasks++;
DCHECK(_task_runtime_states[pip_idx][idx] == nullptr)
<< print_id(_task_runtime_states[pip_idx][idx]->fragment_instance_id())
<< " " << pipeline->debug_string();
// build task runtime state
_task_runtime_states.push_back(RuntimeState::create_unique(
_task_runtime_states[pip_idx][idx] = RuntimeState::create_unique(
this, local_params.fragment_instance_id, request.query_id,
request.fragment_id, request.query_options, _query_ctx->query_globals,
_exec_env, _query_ctx.get()));
auto& task_runtime_state = _task_runtime_states.back();
_exec_env, _query_ctx.get());
auto& task_runtime_state = _task_runtime_states[pip_idx][idx];
init_runtime_state(task_runtime_state);
auto cur_task_id = _total_tasks++;
task_runtime_state->set_task_id(cur_task_id);
task_runtime_state->set_task_num(pipeline->num_tasks());
auto task = std::make_unique<PipelineXTask>(
pipeline, cur_task_id, get_task_runtime_state(cur_task_id), this,
pipeline_id_to_profile[pip_idx].get(), get_local_exchange_state(pipeline),
i);
auto task = std::make_unique<PipelineXTask>(pipeline, cur_task_id,
task_runtime_state.get(), ctx,
pipeline_id_to_profile[pip_idx].get(),
get_local_exchange_state(pipeline), i);
pipeline_id_to_task.insert({pipeline->id(), task.get()});
_tasks[i].emplace_back(std::move(task));
}
}

/**
* Build DAG for pipeline tasks.
* For example, we have
Expand Down Expand Up @@ -668,6 +677,36 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
std::lock_guard<std::mutex> l(_state_map_lock);
_runtime_filter_mgr_map[fragment_instance_id] = std::move(runtime_filter_mgr);
}
return Status::OK();
};
if (target_size > 1) {
Status prepare_status[target_size];
std::mutex m;
std::condition_variable cv;
int prepare_done = 0;
for (size_t i = 0; i < target_size; i++) {
RETURN_IF_ERROR(thread_pool->submit_func([&, i]() {
SCOPED_ATTACH_TASK_WITH_ID(_query_ctx->query_mem_tracker, _query_id);
prepare_status[i] = pre_and_submit(i, this);
std::unique_lock<std::mutex> lock(m);
prepare_done++;
if (prepare_done == target_size) {
cv.notify_one();
}
}));
}
std::unique_lock<std::mutex> lock(m);
if (prepare_done != target_size) {
cv.wait(lock);

for (size_t i = 0; i < target_size; i++) {
if (!prepare_status[i].ok()) {
return prepare_status[i];
}
}
}
} else {
RETURN_IF_ERROR(pre_and_submit(0, this));
}
_pipeline_parent_map.clear();
_dag.clear();
Expand Down Expand Up @@ -1418,8 +1457,12 @@ Status PipelineXFragmentContext::send_report(bool done) {

std::vector<RuntimeState*> runtime_states;

for (auto& task_state : _task_runtime_states) {
runtime_states.push_back(task_state.get());
for (auto& task_states : _task_runtime_states) {
for (auto& task_state : task_states) {
if (task_state) {
runtime_states.push_back(task_state.get());
}
}
}

ReportStatusRequest req {true,
Expand Down Expand Up @@ -1450,7 +1493,7 @@ PipelineXFragmentContext::collect_realtime_profile_x() const {
print_id(this->_query_id));

// we do not have mutex to protect pipeline_id_to_profile
// so we need to make sure this funciton is invoked after fragment context
// so we need to make sure this function is invoked after fragment context
// has already been prepared.
if (!this->_prepared) {
std::string msg =
Expand Down Expand Up @@ -1483,15 +1526,17 @@ PipelineXFragmentContext::collect_realtime_load_channel_profile_x() const {
return nullptr;
}

for (auto& runtime_state : _task_runtime_states) {
if (runtime_state->runtime_profile() == nullptr) {
continue;
}
for (auto& runtime_states : _task_runtime_states) {
for (auto& runtime_state : runtime_states) {
if (runtime_state == nullptr || runtime_state->runtime_profile() == nullptr) {
continue;
}

auto tmp_load_channel_profile = std::make_shared<TRuntimeProfileTree>();
auto tmp_load_channel_profile = std::make_shared<TRuntimeProfileTree>();

runtime_state->runtime_profile()->to_thrift(tmp_load_channel_profile.get());
this->_runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
runtime_state->runtime_profile()->to_thrift(tmp_load_channel_profile.get());
this->_runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
}
}

auto load_channel_profile = std::make_shared<TRuntimeProfileTree>();
Expand Down
7 changes: 4 additions & 3 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class PipelineXFragmentContext : public PipelineFragmentContext {
// bool is_canceled() const { return _runtime_state->is_cancelled(); }

// Prepare global information including global states and the unique operator tree shared by all pipeline tasks.
Status prepare(const doris::TPipelineFragmentParams& request) override;
Status prepare(const doris::TPipelineFragmentParams& request, ThreadPool* thread_pool) override;

Status submit() override;

Expand All @@ -119,7 +119,8 @@ class PipelineXFragmentContext : public PipelineFragmentContext {

private:
void _close_fragment_instance() override;
Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request) override;
Status _build_pipeline_x_tasks(const doris::TPipelineFragmentParams& request,
ThreadPool* thread_pool);
Status _add_local_exchange(int pip_idx, int idx, int node_id, ObjectPool* pool,
PipelinePtr cur_pipe, DataDistribution data_distribution,
bool* do_local_exchange, int num_buckets,
Expand Down Expand Up @@ -233,7 +234,7 @@ class PipelineXFragmentContext : public PipelineFragmentContext {

std::vector<TUniqueId> _fragment_instance_ids;
// Local runtime states for each task
std::vector<std::unique_ptr<RuntimeState>> _task_runtime_states;
std::vector<std::vector<std::unique_ptr<RuntimeState>>> _task_runtime_states;

std::vector<std::unique_ptr<RuntimeFilterParamsContext>> _runtime_filter_states;

Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/pipeline_x/pipeline_x_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ PipelineXTask::PipelineXTask(
if (shared_state) {
_sink_shared_state = shared_state;
}
pipeline->incr_created_tasks();
}

Status PipelineXTask::prepare(const TPipelineInstanceParams& local_params, const TDataSink& tsink,
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -851,7 +851,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
this, std::placeholders::_1, std::placeholders::_2));
{
SCOPED_RAW_TIMER(&duration_ns);
auto prepare_st = context->prepare(params);
auto prepare_st = context->prepare(params, _thread_pool.get());
if (!prepare_st.ok()) {
context->close_if_prepare_failed(prepare_st);
query_ctx->set_execution_dependency_ready();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -925,7 +925,7 @@ public class SessionVariable implements Serializable, Writable {

@VariableMgr.VarAttr(name = IGNORE_STORAGE_DATA_DISTRIBUTION, fuzzy = false,
varType = VariableAnnotation.EXPERIMENTAL, needForward = true)
private boolean ignoreStorageDataDistribution = true;
private boolean ignoreStorageDataDistribution = false;

@VariableMgr.VarAttr(
name = ENABLE_LOCAL_SHUFFLE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL,
Expand Down
Loading