[PipelineX](improvement) Prepare tasks in parallel #40270

Merged · 3 commits · Sep 13, 2024
2 changes: 1 addition & 1 deletion be/src/common/config.cpp
@@ -508,7 +508,7 @@ DEFINE_Int32(brpc_heavy_work_pool_max_queue_size, "-1");
 DEFINE_Int32(brpc_light_work_pool_max_queue_size, "-1");
 
 // The maximum amount of data that can be processed by a stream load
-DEFINE_mInt64(streaming_load_max_mb, "10240");
+DEFINE_mInt64(streaming_load_max_mb, "102400");
 // Some data formats, such as JSON, cannot be streamed.
 // Therefore, it is necessary to limit the maximum number of
 // such data when using stream load to prevent excessive memory consumption.
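This bumps the default stream load size cap from 10 GB (10240 MB) to 100 GB (102400 MB). Because the knob is declared with `DEFINE_mInt64`, it is a mutable config: deployments can still override it in `be.conf` (for example `streaming_load_max_mb = 102400`) or adjust it on a running BE without a restart.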
1 change: 1 addition & 0 deletions be/src/exprs/runtime_filter.cpp
@@ -1339,6 +1339,7 @@ void IRuntimeFilter::signal() {
 }
 
 void IRuntimeFilter::set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer> timer) {
+    std::unique_lock lock(_inner_mutex);
     _filter_timer.push_back(timer);
 }

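With prepare work now fanned out across threads (see `_build_pipeline_x_tasks` below), `set_filter_timer` can be called concurrently for different pipeline tasks, so the timer list needs a lock. A minimal sketch of the pattern, using hypothetical stand-in names and assuming a member mutex like `_inner_mutex` guards `_filter_timer`:

```cpp
#include <memory>
#include <mutex>
#include <vector>

// Hypothetical stand-ins for IRuntimeFilter / RuntimeFilterTimer.
struct RuntimeFilterTimer {};

class FilterTimerList {
public:
    void set_filter_timer(std::shared_ptr<RuntimeFilterTimer> timer) {
        // Serialize writers: concurrent push_back on the same vector from
        // two prepare threads is a data race without this lock.
        std::unique_lock<std::mutex> lock(_inner_mutex);
        _filter_timer.push_back(std::move(timer));
    }

private:
    std::mutex _inner_mutex;
    std::vector<std::shared_ptr<RuntimeFilterTimer>> _filter_timer;
};
```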
3 changes: 1 addition & 2 deletions be/src/pipeline/pipeline.h
@@ -156,7 +156,6 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
     void set_children(std::vector<std::shared_ptr<Pipeline>> children) { _children = children; }
 
     void incr_created_tasks() { _num_tasks_created++; }
-    bool need_to_create_task() const { return _num_tasks > _num_tasks_created; }
     void set_num_tasks(int num_tasks) {
         _num_tasks = num_tasks;
         for (auto& op : operatorXs) {
@@ -243,7 +242,7 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
     // How many tasks should be created ?
     int _num_tasks = 1;
     // How many tasks are already created?
-    int _num_tasks_created = 0;
+    std::atomic<int> _num_tasks_created = 0;
 };
 
 } // namespace doris::pipeline
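`_num_tasks_created` (like `_total_tasks` in the fragment context below) becomes `std::atomic<int>` because several prepare threads may now create tasks for the same pipeline at once; `operator++` on an atomic is a single indivisible fetch-add. A small, illustrative demonstration of why a plain `int` no longer suffices:

```cpp
#include <atomic>
#include <cstdio>
#include <thread>
#include <vector>

int main() {
    std::atomic<int> num_tasks_created{0};
    std::vector<std::thread> workers;

    // Simulate several prepare threads creating tasks concurrently.
    for (int t = 0; t < 4; ++t) {
        workers.emplace_back([&] {
            for (int i = 0; i < 1000; ++i) {
                num_tasks_created++;  // atomic fetch-add; on a plain int this is a data race
            }
        });
    }
    for (auto& w : workers) {
        w.join();
    }

    // Always prints 4000; with a plain int the result could be anything.
    std::printf("%d\n", num_tasks_created.load());
    return 0;
}
```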
4 changes: 2 additions & 2 deletions be/src/pipeline/pipeline_fragment_context.h
@@ -89,7 +89,7 @@ class PipelineFragmentContext : public TaskExecutionContext {
 
     Status prepare(const doris::TPipelineFragmentParams& request, size_t idx);
 
-    virtual Status prepare(const doris::TPipelineFragmentParams& request) {
+    virtual Status prepare(const doris::TPipelineFragmentParams& request, ThreadPool* thread_pool) {
         return Status::InternalError("Pipeline fragment context do not implement prepare");
     }
 
@@ -168,7 +168,7 @@ class PipelineFragmentContext : public TaskExecutionContext {
     int _closed_tasks = 0;
     // After prepared, `_total_tasks` is equal to the size of `_tasks`.
     // When submit fail, `_total_tasks` is equal to the number of tasks submitted.
-    int _total_tasks = 0;
+    std::atomic<int> _total_tasks = 0;
 
     int32_t _next_operator_builder_id = 10000;
 
100 changes: 73 additions & 27 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -116,9 +116,13 @@ PipelineXFragmentContext::~PipelineXFragmentContext() {
     auto st = _query_ctx->exec_status();
     _tasks.clear();
     if (!_task_runtime_states.empty()) {
-        for (auto& runtime_state : _task_runtime_states) {
-            _call_back(runtime_state.get(), &st);
-            runtime_state.reset();
+        for (auto& runtime_states : _task_runtime_states) {
+            for (auto& runtime_state : runtime_states) {
+                if (runtime_state) {
+                    _call_back(runtime_state.get(), &st);
+                    runtime_state.reset();
+                }
+            }
         }
     } else {
         _call_back(nullptr, &st);
@@ -182,7 +186,8 @@ void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
     }
 }
 
-Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& request) {
+Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& request,
+                                         ThreadPool* thread_pool) {
     if (_prepared) {
         return Status::InternalError("Already prepared");
     }
@@ -210,7 +215,6 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
     _runtime_state = RuntimeState::create_unique(request.query_id, request.fragment_id,
                                                  request.query_options, _query_ctx->query_globals,
                                                  _exec_env, _query_ctx.get());
-
     SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
     if (request.__isset.backend_id) {
         _runtime_state->set_backend_id(request.backend_id);
@@ -284,7 +288,7 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
     }
 
     // 5. Build pipeline tasks and initialize local state.
-    RETURN_IF_ERROR(_build_pipeline_tasks(request));
+    RETURN_IF_ERROR(_build_pipeline_x_tasks(request, thread_pool));
 
     _init_next_report_time();
 
@@ -511,11 +515,17 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
     return Status::OK();
 }
 
-Status PipelineXFragmentContext::_build_pipeline_tasks(
-        const doris::TPipelineFragmentParams& request) {
+Status PipelineXFragmentContext::_build_pipeline_x_tasks(
+        const doris::TPipelineFragmentParams& request, ThreadPool* thread_pool) {
     _total_tasks = 0;
     int target_size = request.local_params.size();
     _tasks.resize(target_size);
+    _fragment_instance_ids.resize(target_size);
+    _runtime_filter_states.resize(target_size);
+    _task_runtime_states.resize(_pipelines.size());
+    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
+        _task_runtime_states[pip_idx].resize(_pipelines[pip_idx]->num_tasks());
+    }
     auto& pipeline_id_to_profile = _runtime_state->pipeline_id_to_profile();
     DCHECK(pipeline_id_to_profile.empty());
     pipeline_id_to_profile.resize(_pipelines.size());
@@ -528,10 +538,10 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
         }
     }
 
-    for (size_t i = 0; i < target_size; i++) {
+    auto pre_and_submit = [&](int i, PipelineFragmentContext* ctx) {
        const auto& local_params = request.local_params[i];
        auto fragment_instance_id = local_params.fragment_instance_id;
-       _fragment_instance_ids.push_back(fragment_instance_id);
+       _fragment_instance_ids[i] = fragment_instance_id;
        std::unique_ptr<RuntimeFilterMgr> runtime_filter_mgr;
        auto init_runtime_state = [&](std::unique_ptr<RuntimeState>& runtime_state) {
            runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
@@ -588,7 +598,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
 
        filterparams->runtime_filter_mgr = runtime_filter_mgr.get();
 
-       _runtime_filter_states.push_back(std::move(filterparams));
+       _runtime_filter_states[i] = std::move(filterparams);
        std::map<PipelineId, PipelineXTask*> pipeline_id_to_task;
        auto get_local_exchange_state = [&](PipelinePtr pipeline)
                -> std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>,
@@ -608,32 +618,30 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
            }
            return le_state_map;
        };
-       auto get_task_runtime_state = [&](int task_id) -> RuntimeState* {
-           DCHECK(_task_runtime_states[task_id]);
-           return _task_runtime_states[task_id].get();
-       };
        for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
            auto& pipeline = _pipelines[pip_idx];
-           if (pipeline->need_to_create_task()) {
-               auto cur_task_id = _total_tasks++;
+           if (pipeline->num_tasks() > 1 || i == 0) {
+               DCHECK(_task_runtime_states[pip_idx][i] == nullptr)
+                       << print_id(_task_runtime_states[pip_idx][i]->fragment_instance_id()) << " "
+                       << pipeline->debug_string();
                // build task runtime state
-               _task_runtime_states.push_back(RuntimeState::create_unique(
+               _task_runtime_states[pip_idx][i] = RuntimeState::create_unique(
                        this, local_params.fragment_instance_id, request.query_id,
                        request.fragment_id, request.query_options, _query_ctx->query_globals,
-                       _exec_env, _query_ctx.get()));
-               auto& task_runtime_state = _task_runtime_states.back();
+                       _exec_env, _query_ctx.get());
+               auto& task_runtime_state = _task_runtime_states[pip_idx][i];
                init_runtime_state(task_runtime_state);
+               auto cur_task_id = _total_tasks++;
                task_runtime_state->set_task_id(cur_task_id);
                task_runtime_state->set_task_num(pipeline->num_tasks());
-               auto task = std::make_unique<PipelineXTask>(
-                       pipeline, cur_task_id, get_task_runtime_state(cur_task_id), this,
-                       pipeline_id_to_profile[pip_idx].get(), get_local_exchange_state(pipeline),
-                       i);
+               auto task = std::make_unique<PipelineXTask>(pipeline, cur_task_id,
+                                                           task_runtime_state.get(), ctx,
+                                                           pipeline_id_to_profile[pip_idx].get(),
+                                                           get_local_exchange_state(pipeline), i);
                pipeline_id_to_task.insert({pipeline->id(), task.get()});
                _tasks[i].emplace_back(std::move(task));
            }
        }
 
        /**
         * Build DAG for pipeline tasks.
         * For example, we have
@@ -693,6 +701,40 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
            std::lock_guard<std::mutex> l(_state_map_lock);
            _runtime_filter_mgr_map[fragment_instance_id] = std::move(runtime_filter_mgr);
        }
+       return Status::OK();
+   };
+   if (target_size > 1 &&
+       (_runtime_state->query_options().__isset.parallel_prepare_threshold &&
+        target_size > _runtime_state->query_options().parallel_prepare_threshold)) {
+       Status prepare_status[target_size];
+       std::mutex m;
+       std::condition_variable cv;
+       int prepare_done = 0;
+       for (size_t i = 0; i < target_size; i++) {
+           RETURN_IF_ERROR(thread_pool->submit_func([&, i]() {
+               SCOPED_ATTACH_TASK(_query_ctx.get());
+               prepare_status[i] = pre_and_submit(i, this);
+               std::unique_lock<std::mutex> lock(m);
+               prepare_done++;
+               if (prepare_done == target_size) {
+                   cv.notify_one();
+               }
+           }));
+       }
+       std::unique_lock<std::mutex> lock(m);
+       if (prepare_done != target_size) {
+           cv.wait(lock);
+
+           for (size_t i = 0; i < target_size; i++) {
+               if (!prepare_status[i].ok()) {
+                   return prepare_status[i];
+               }
+           }
+       }
+   } else {
+       for (size_t i = 0; i < target_size; i++) {
+           RETURN_IF_ERROR(pre_and_submit(i, this));
+       }
+   }
    _pipeline_parent_map.clear();
    _dag.clear();
@@ -1512,8 +1554,12 @@ Status PipelineXFragmentContext::send_report(bool done) {
 
    std::vector<RuntimeState*> runtime_states;
 
-   for (auto& task_state : _task_runtime_states) {
-       runtime_states.push_back(task_state.get());
+   for (auto& task_states : _task_runtime_states) {
+       for (auto& task_state : task_states) {
+           if (task_state) {
+               runtime_states.push_back(task_state.get());
+           }
+       }
    }
    return _report_status_cb(
            {true, exec_status, runtime_states, nullptr, _runtime_state->load_channel_profile(),
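The heart of the change is above: the per-instance body is hoisted into a `pre_and_submit` lambda; when the fragment has more instances than `parallel_prepare_threshold`, each call is submitted to the `ThreadPool` handed down from `FragmentMgr` (see fragment_mgr.cpp below), and the submitting thread waits on a condition variable until the last worker bumps `prepare_done` to `target_size`, then surfaces the first failed `Status`. Below is a condensed, self-contained sketch of that fan-out/fan-in pattern, with `std::thread` standing in for Doris's `ThreadPool` and a plain struct standing in for `Status`; it is an approximation, not the production code:

```cpp
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Stand-in for doris::Status.
struct Status {
    bool ok_flag = true;
    std::string msg;
    bool ok() const { return ok_flag; }
};

// Stand-in for one fragment instance's prepare work.
Status prepare_instance(int /*instance_idx*/) {
    return Status{};
}

Status prepare_all(int target_size) {
    std::vector<Status> prepare_status(target_size);
    std::mutex m;
    std::condition_variable cv;
    int prepare_done = 0;

    // Fan out: one job per fragment instance. A real ThreadPool would be
    // reused across queries instead of spawning a thread per call.
    std::vector<std::thread> workers;
    for (int i = 0; i < target_size; ++i) {
        workers.emplace_back([&, i] {
            prepare_status[i] = prepare_instance(i);
            std::unique_lock<std::mutex> lock(m);
            if (++prepare_done == target_size) {
                cv.notify_one();  // last finisher wakes the coordinator
            }
        });
    }

    // Fan in: wait until every instance has reported completion.
    {
        std::unique_lock<std::mutex> lock(m);
        cv.wait(lock, [&] { return prepare_done == target_size; });
    }
    for (auto& w : workers) {
        w.join();
    }

    // Surface the first per-instance failure, as the loop over
    // prepare_status does in the hunk above.
    for (auto& st : prepare_status) {
        if (!st.ok()) {
            return st;
        }
    }
    return Status{};
}

int main() {
    return prepare_all(8).ok() ? 0 : 1;
}
```

Waiting for every worker before returning matters here: the jobs capture `prepare_status`, `m`, and `cv` by reference, so the coordinating stack frame must outlive all submitted tasks.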
7 changes: 4 additions & 3 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -94,7 +94,7 @@ class PipelineXFragmentContext : public PipelineFragmentContext {
     }
 
     // Prepare global information including global states and the unique operator tree shared by all pipeline tasks.
-    Status prepare(const doris::TPipelineFragmentParams& request) override;
+    Status prepare(const doris::TPipelineFragmentParams& request, ThreadPool* thread_pool) override;
 
     Status submit() override;
 
@@ -118,7 +118,8 @@ class PipelineXFragmentContext : public PipelineFragmentContext {
 
 private:
     void _close_fragment_instance() override;
-    Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request) override;
+    Status _build_pipeline_x_tasks(const doris::TPipelineFragmentParams& request,
+                                   ThreadPool* thread_pool);
     Status _add_local_exchange(int pip_idx, int idx, int node_id, ObjectPool* pool,
                                PipelinePtr cur_pipe, DataDistribution data_distribution,
                                bool* do_local_exchange, int num_buckets,
@@ -230,7 +231,7 @@ class PipelineXFragmentContext : public PipelineFragmentContext {
 
     std::vector<TUniqueId> _fragment_instance_ids;
     // Local runtime states for each task
-    std::vector<std::unique_ptr<RuntimeState>> _task_runtime_states;
+    std::vector<std::vector<std::unique_ptr<RuntimeState>>> _task_runtime_states;
 
     std::vector<std::unique_ptr<RuntimeFilterParamsContext>> _runtime_filter_states;
 
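`_task_runtime_states` turns into a `[pipeline][instance]` matrix. Pre-sizing both dimensions up front in `_build_pipeline_x_tasks` (together with `_fragment_instance_ids` and `_runtime_filter_states`) is what lets the parallel prepare threads write by index into disjoint slots instead of racing on `push_back`. Slots can legitimately stay empty, since a pipeline with `num_tasks() == 1` only materializes instance 0, which is why the destructor and `send_report` now null-check each entry. A small sketch of the access pattern, with a stub `RuntimeState`:

```cpp
#include <memory>
#include <vector>

struct RuntimeState {};  // stub

int main() {
    const size_t num_pipelines = 3, num_instances = 4;

    // Rows: pipelines; columns: fragment instances. Sized before any
    // worker runs, so each prepare thread i only ever touches column i.
    std::vector<std::vector<std::unique_ptr<RuntimeState>>> task_runtime_states(num_pipelines);
    for (auto& row : task_runtime_states) {
        row.resize(num_instances);
    }

    // The thread for instance 0 fills its own column; other slots may stay null.
    task_runtime_states[0][0] = std::make_unique<RuntimeState>();

    // Consumers must skip the empty slots, as the destructor and
    // send_report() do after this change.
    for (auto& row : task_runtime_states) {
        for (auto& state : row) {
            if (state) {
                // report / call back on state.get()
            }
        }
    }
    return 0;
}
```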
2 changes: 1 addition & 1 deletion be/src/runtime/fragment_mgr.cpp
@@ -962,7 +962,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
                                  std::placeholders::_1, std::placeholders::_2));
     {
         SCOPED_RAW_TIMER(&duration_ns);
-        auto prepare_st = context->prepare(params);
+        auto prepare_st = context->prepare(params, _thread_pool.get());
         if (!prepare_st.ok()) {
             context->close_if_prepare_failed(prepare_st);
             query_ctx->set_execution_dependency_ready();
8 changes: 5 additions & 3 deletions be/src/vec/sink/vdata_stream_sender.cpp
@@ -126,9 +126,6 @@ Status Channel<Parent>::init_stub(RuntimeState* state) {
         _is_local &= state->query_options().enable_local_exchange;
     }
     if (_is_local) {
-        WARN_IF_ERROR(_parent->state()->exec_env()->vstream_mgr()->find_recvr(
-                              _fragment_instance_id, _dest_node_id, &_local_recvr),
-                      "");
         return Status::OK();
     }
     if (_brpc_dest_addr.hostname == BackendOptions::get_localhost()) {
@@ -149,6 +146,11 @@
 
 template <typename Parent>
 Status Channel<Parent>::open(RuntimeState* state) {
+    if (_is_local) {
+        WARN_IF_ERROR(_parent->state()->exec_env()->vstream_mgr()->find_recvr(
+                              _fragment_instance_id, _dest_node_id, &_local_recvr),
+                      "");
+    }
     _be_number = state->be_number();
     _brpc_request = std::make_shared<PTransmitDataParams>();
     // initialize brpc request
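Deferring the local-receiver lookup from `init_stub()` (which runs during prepare) to `open()` dovetails with parallel prepare: instances are now prepared concurrently, so a sender's prepare may run before the destination instance has registered its receiver, whereas by `open()` every instance has finished preparing. A minimal sketch of the deferred-lookup idea, where `Receiver` and `lookup_receiver` are hypothetical stand-ins for the stream-manager API:

```cpp
#include <memory>

struct Receiver {};  // hypothetical local receiver

// Hypothetical registry lookup; returns nullptr until the peer
// instance has registered its receiver.
std::shared_ptr<Receiver> lookup_receiver() { return nullptr; }

class LocalChannel {
public:
    void init() {
        // No lookup here on purpose: under parallel prepare the peer
        // instance may not have registered its receiver yet.
    }

    void open() {
        // All instances have finished prepare by open(), so if the
        // receiver will ever exist, it exists now.
        _local_recvr = lookup_receiver();
    }

private:
    std::shared_ptr<Receiver> _local_recvr;
};

int main() {
    LocalChannel ch;
    ch.init();
    ch.open();
    return 0;
}
```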
fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -279,6 +279,8 @@ public class SessionVariable implements Serializable, Writable {
 
     public static final String AUTO_BROADCAST_JOIN_THRESHOLD = "auto_broadcast_join_threshold";
 
+    public static final String PARALLEL_PREPARE_THRESHOLD = "parallel_prepare_threshold";
+
     public static final String ENABLE_PROJECTION = "enable_projection";
 
     public static final String CHECK_OVERFLOW_FOR_DECIMAL = "check_overflow_for_decimal";
@@ -1004,7 +1006,7 @@ public class SessionVariable implements Serializable, Writable {
 
     @VariableMgr.VarAttr(name = PARALLEL_SCAN_MIN_ROWS_PER_SCANNER, fuzzy = true,
             varType = VariableAnnotation.EXPERIMENTAL, needForward = true)
-    private long parallelScanMinRowsPerScanner = 16384; // 16K
+    private long parallelScanMinRowsPerScanner = 2097152; // 2M
 
     @VariableMgr.VarAttr(name = IGNORE_STORAGE_DATA_DISTRIBUTION, fuzzy = false,
             varType = VariableAnnotation.EXPERIMENTAL, needForward = true)
@@ -1044,6 +1046,9 @@ public class SessionVariable implements Serializable, Writable {
     @VariableMgr.VarAttr(name = AUTO_BROADCAST_JOIN_THRESHOLD)
     public double autoBroadcastJoinThreshold = 0.8;
 
+    @VariableMgr.VarAttr(name = PARALLEL_PREPARE_THRESHOLD)
+    public int parallelPrepareThreshold = 32;
+
     @VariableMgr.VarAttr(name = ENABLE_COST_BASED_JOIN_REORDER)
     private boolean enableJoinReorderBasedCost = false;
 
@@ -3426,6 +3431,7 @@ public TQueryOptions toThrift() {
         tResult.setNumScannerThreads(numScannerThreads);
         tResult.setScannerScaleUpRatio(scannerScaleUpRatio);
         tResult.setMaxColumnReaderNum(maxColumnReaderNum);
+        tResult.setParallelPrepareThreshold(parallelPrepareThreshold);
 
         // TODO chenhao, reservation will be calculated by cost
         tResult.setMinReservation(0);
2 changes: 2 additions & 0 deletions gensrc/thrift/PaloInternalService.thrift
@@ -329,6 +329,8 @@ struct TQueryOptions {
     130: optional bool enable_adaptive_pipeline_task_serial_read_on_limit = true;
     131: optional i32 adaptive_pipeline_task_serial_read_on_limit = 10000;
 
+    // only in 2.1
+    999: optional i32 parallel_prepare_threshold = 0;
     // For cloud, to control if the content would be written into file cache
     1000: optional bool disable_file_cache = false
 }
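These pieces wire the knob end to end: the FE declares a `parallel_prepare_threshold` session variable (default 32) and forwards it through `TQueryOptions`, while the thrift default of 0 plus the `__isset` check on the BE means an older FE that never sets the field keeps the serial path. Prepare only goes parallel when a fragment has more local instances than the threshold, so small queries avoid the thread-pool overhead; the behavior is tunable per session, for example `set parallel_prepare_threshold = 16;`.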