[PipelineX](improvement) Prepare tasks in parallel (apache#40844) (ap…
Gabriel39 authored Sep 18, 2024
1 parent a4a9e80 commit 5137841
Showing 9 changed files with 101 additions and 33 deletions.
2 changes: 1 addition & 1 deletion be/src/common/config.cpp
@@ -511,7 +511,7 @@ DEFINE_Int32(brpc_light_work_pool_max_queue_size, "-1");
DEFINE_mBool(enable_bthread_transmit_block, "true");

// The maximum amount of data that can be processed by a stream load
DEFINE_mInt64(streaming_load_max_mb, "10240");
DEFINE_mInt64(streaming_load_max_mb, "102400");
// Some data formats, such as JSON, cannot be streamed.
// Therefore, it is necessary to limit the maximum number of
// such data when using stream load to prevent excessive memory consumption.
1 change: 1 addition & 0 deletions be/src/exprs/runtime_filter.cpp
@@ -1235,6 +1235,7 @@ void IRuntimeFilter::signal() {
}

void IRuntimeFilter::set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer> timer) {
+std::unique_lock lock(_inner_mutex);
_filter_timer.push_back(timer);
}

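For context on the one-line change above: once fragment instances are prepared on a thread pool, set_filter_timer() can be invoked from several prepare threads that share the same runtime filter, so the timer list needs its mutex. A minimal, self-contained sketch of the pattern, with FilterTimerRegistry as a hypothetical stand-in for the real IRuntimeFilter:

    #include <memory>
    #include <mutex>
    #include <vector>

    struct RuntimeFilterTimer {};

    class FilterTimerRegistry {
    public:
        void set_filter_timer(std::shared_ptr<RuntimeFilterTimer> timer) {
            std::unique_lock<std::mutex> lock(_inner_mutex); // serialize concurrent registrations
            _filter_timer.push_back(std::move(timer));
        }

    private:
        std::mutex _inner_mutex;
        std::vector<std::shared_ptr<RuntimeFilterTimer>> _filter_timer;
    };

    int main() {
        FilterTimerRegistry registry;
        registry.set_filter_timer(std::make_shared<RuntimeFilterTimer>());
        return 0;
    }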
3 changes: 1 addition & 2 deletions be/src/pipeline/pipeline.h
@@ -105,7 +105,6 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
void set_children(std::vector<std::shared_ptr<Pipeline>> children) { _children = children; }

void incr_created_tasks() { _num_tasks_created++; }
-bool need_to_create_task() const { return _num_tasks > _num_tasks_created; }
void set_num_tasks(int num_tasks) {
_num_tasks = num_tasks;
for (auto& op : operatorXs) {
@@ -160,7 +159,7 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
// How many tasks should be created ?
int _num_tasks = 1;
// How many tasks are already created?
-int _num_tasks_created = 0;
+std::atomic<int> _num_tasks_created = 0;
};

} // namespace doris::pipeline
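Same reasoning for the counter: with tasks for one pipeline now created from several threads, incr_created_tasks() can race, and ++ on a plain int is undefined behavior, while std::atomic<int> makes the increment well-defined. (The removed need_to_create_task() helper is replaced at its call site by the pipeline->num_tasks() > 1 || i == 0 check visible in pipeline_fragment_context.cpp below.) A minimal sketch:

    #include <atomic>

    class TaskCounter {
    public:
        void incr_created_tasks() { _num_tasks_created++; } // atomic read-modify-write
        int created() const { return _num_tasks_created.load(); }

    private:
        std::atomic<int> _num_tasks_created = 0;
    };

    int main() {
        TaskCounter counter;
        counter.incr_created_tasks();
        return counter.created() == 1 ? 0 : 1;
    }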
98 changes: 74 additions & 24 deletions be/src/pipeline/pipeline_fragment_context.cpp
@@ -137,8 +137,10 @@ PipelineFragmentContext::~PipelineFragmentContext() {
}
}
_tasks.clear();
for (auto& runtime_state : _task_runtime_states) {
runtime_state.reset();
for (auto& runtime_states : _task_runtime_states) {
for (auto& runtime_state : runtime_states) {
runtime_state.reset();
}
}
_pipelines.clear();
_sink.reset();
@@ -229,7 +231,8 @@ PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
return pipeline;
}

Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& request) {
Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& request,
ThreadPool* thread_pool) {
if (_prepared) {
return Status::InternalError("Already prepared");
}
@@ -346,7 +349,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
{
SCOPED_TIMER(_build_tasks_timer);
// 5. Build pipeline tasks and initialize local state.
-RETURN_IF_ERROR(_build_pipeline_tasks(request));
+RETURN_IF_ERROR(_build_pipeline_tasks(request, thread_pool));
}

_init_next_report_time();
@@ -355,17 +358,23 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
return Status::OK();
}

-Status PipelineFragmentContext::_build_pipeline_tasks(
-const doris::TPipelineFragmentParams& request) {
+Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFragmentParams& request,
+ThreadPool* thread_pool) {
_total_tasks = 0;
-int target_size = request.local_params.size();
+const auto target_size = request.local_params.size();
_tasks.resize(target_size);
+_fragment_instance_ids.resize(target_size);
+_runtime_filter_states.resize(target_size);
+_task_runtime_states.resize(_pipelines.size());
+for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
+_task_runtime_states[pip_idx].resize(_pipelines[pip_idx]->num_tasks());
+}
auto pipeline_id_to_profile = _runtime_state->build_pipeline_profile(_pipelines.size());

for (size_t i = 0; i < target_size; i++) {
auto pre_and_submit = [&](int i, PipelineFragmentContext* ctx) {
const auto& local_params = request.local_params[i];
auto fragment_instance_id = local_params.fragment_instance_id;
-_fragment_instance_ids.push_back(fragment_instance_id);
+_fragment_instance_ids[i] = fragment_instance_id;
std::unique_ptr<RuntimeFilterMgr> runtime_filter_mgr;
auto init_runtime_state = [&](std::unique_ptr<RuntimeState>& runtime_state) {
runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
@@ -424,7 +433,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(

filterparams->runtime_filter_mgr = runtime_filter_mgr.get();

-_runtime_filter_states.push_back(std::move(filterparams));
+_runtime_filter_states[i] = std::move(filterparams);
std::map<PipelineId, PipelineTask*> pipeline_id_to_task;
auto get_local_exchange_state = [&](PipelinePtr pipeline)
-> std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>,
@@ -447,13 +456,15 @@

for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
auto& pipeline = _pipelines[pip_idx];
if (pipeline->need_to_create_task()) {
// build task runtime state
_task_runtime_states.push_back(RuntimeState::create_unique(
if (pipeline->num_tasks() > 1 || i == 0) {
DCHECK(_task_runtime_states[pip_idx][i] == nullptr)
<< print_id(_task_runtime_states[pip_idx][i]->fragment_instance_id()) << " "
<< pipeline->debug_string();
_task_runtime_states[pip_idx][i] = RuntimeState::create_unique(
this, local_params.fragment_instance_id, request.query_id,
request.fragment_id, request.query_options, _query_ctx->query_globals,
-_exec_env, _query_ctx.get()));
-auto& task_runtime_state = _task_runtime_states.back();
+_exec_env, _query_ctx.get());
+auto& task_runtime_state = _task_runtime_states[pip_idx][i];
init_runtime_state(task_runtime_state);
auto cur_task_id = _total_tasks++;
task_runtime_state->set_task_id(cur_task_id);
@@ -527,6 +538,39 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
std::lock_guard<std::mutex> l(_state_map_lock);
_runtime_filter_mgr_map[fragment_instance_id] = std::move(runtime_filter_mgr);
}
+return Status::OK();
+};
+if (target_size > 1 &&
+(_runtime_state->query_options().__isset.parallel_prepare_threshold &&
+target_size > _runtime_state->query_options().parallel_prepare_threshold)) {
+std::vector<Status> prepare_status(target_size);
+std::mutex m;
+std::condition_variable cv;
+int prepare_done = 0;
+for (size_t i = 0; i < target_size; i++) {
+RETURN_IF_ERROR(thread_pool->submit_func([&, i]() {
+SCOPED_ATTACH_TASK(_query_ctx.get());
+prepare_status[i] = pre_and_submit(i, this);
+std::unique_lock<std::mutex> lock(m);
+prepare_done++;
+if (prepare_done == target_size) {
+cv.notify_one();
+}
+}));
+}
+std::unique_lock<std::mutex> lock(m);
+if (prepare_done != target_size) {
+cv.wait(lock);
+for (size_t i = 0; i < target_size; i++) {
+if (!prepare_status[i].ok()) {
+return prepare_status[i];
+}
+}
+}
+} else {
+for (size_t i = 0; i < target_size; i++) {
+RETURN_IF_ERROR(pre_and_submit(i, this));
+}
+}
_pipeline_parent_map.clear();
_dag.clear();
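The hunk above is the heart of the commit: per-instance preparation is wrapped in the pre_and_submit lambda, and once the instance count exceeds parallel_prepare_threshold the calls are fanned out to the thread pool. Each worker fills its own slot of prepare_status, increments prepare_done under the mutex, and the last one signals the condition variable. Below is a self-contained sketch of the same fan-out/join pattern, with std::thread standing in for the Doris ThreadPool and plain strings standing in for Status; unlike the hunk above, the sketch waits on a predicate (robust to spurious wakeups) and checks the status slots unconditionally:

    #include <condition_variable>
    #include <cstdio>
    #include <mutex>
    #include <string>
    #include <thread>
    #include <vector>

    int main() {
        const int target_size = 8;
        std::vector<std::string> prepare_status(target_size, "OK"); // one slot per instance, no lock needed
        std::mutex m;
        std::condition_variable cv;
        int prepare_done = 0;

        std::vector<std::thread> pool; // stand-in for ThreadPool::submit_func
        for (int i = 0; i < target_size; i++) {
            pool.emplace_back([&, i]() {
                // ... per-instance prepare work would run here, writing prepare_status[i] ...
                std::unique_lock<std::mutex> lock(m);
                if (++prepare_done == target_size) {
                    cv.notify_one(); // only the last finisher wakes the caller
                }
            });
        }

        {
            std::unique_lock<std::mutex> lock(m);
            cv.wait(lock, [&] { return prepare_done == target_size; });
        }
        for (auto& t : pool) {
            t.join();
        }
        for (int i = 0; i < target_size; i++) {
            if (prepare_status[i] != "OK") {
                std::printf("instance %d failed: %s\n", i, prepare_status[i].c_str());
                return 1;
            }
        }
        std::printf("all %d instances prepared\n", target_size);
        return 0;
    }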
@@ -1683,8 +1727,12 @@ Status PipelineFragmentContext::send_report(bool done) {

std::vector<RuntimeState*> runtime_states;

for (auto& task_state : _task_runtime_states) {
runtime_states.push_back(task_state.get());
for (auto& task_states : _task_runtime_states) {
for (auto& task_state : task_states) {
if (task_state) {
runtime_states.push_back(task_state.get());
}
}
}

ReportStatusRequest req {exec_status,
@@ -1755,15 +1803,17 @@
return nullptr;
}

for (auto& runtime_state : _task_runtime_states) {
if (runtime_state->runtime_profile() == nullptr) {
continue;
}
for (auto& runtime_states : _task_runtime_states) {
for (auto& runtime_state : runtime_states) {
if (runtime_state->runtime_profile() == nullptr) {
continue;
}

auto tmp_load_channel_profile = std::make_shared<TRuntimeProfileTree>();
auto tmp_load_channel_profile = std::make_shared<TRuntimeProfileTree>();

runtime_state->runtime_profile()->to_thrift(tmp_load_channel_profile.get());
this->_runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
runtime_state->runtime_profile()->to_thrift(tmp_load_channel_profile.get());
this->_runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
}
}

auto load_channel_profile = std::make_shared<TRuntimeProfileTree>();
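A detail that recurs through this file: push_back calls were replaced by containers sized up front plus indexed writes (_fragment_instance_ids[i] = ..., _runtime_filter_states[i] = ..., _task_runtime_states[pip_idx][i] = ...). Concurrent push_back on a shared vector is a race; concurrent writes to disjoint, pre-allocated slots are not. A sketch of that layout, with a placeholder RuntimeState type standing in for the real one:

    #include <memory>
    #include <thread>
    #include <vector>

    struct RuntimeState {};

    int main() {
        const size_t num_pipelines = 3, num_instances = 4;
        // The 2-D structure mirrors _task_runtime_states: [pipeline][instance].
        std::vector<std::vector<std::unique_ptr<RuntimeState>>> task_runtime_states(num_pipelines);
        for (auto& per_pipeline : task_runtime_states) {
            per_pipeline.resize(num_instances); // pre-size before any worker starts
        }

        std::vector<std::thread> workers;
        for (size_t i = 0; i < num_instances; i++) {
            workers.emplace_back([&, i]() {
                for (size_t pip = 0; pip < num_pipelines; pip++) {
                    task_runtime_states[pip][i] = std::make_unique<RuntimeState>(); // disjoint slot per worker
                }
            });
        }
        for (auto& w : workers) {
            w.join();
        }
        return 0;
    }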
9 changes: 5 additions & 4 deletions be/src/pipeline/pipeline_fragment_context.h
@@ -88,7 +88,7 @@ class PipelineFragmentContext : public TaskExecutionContext {
// should be protected by lock?
[[nodiscard]] bool is_canceled() const { return _runtime_state->is_cancelled(); }

Status prepare(const doris::TPipelineFragmentParams& request);
Status prepare(const doris::TPipelineFragmentParams& request, ThreadPool* thread_pool);

Status submit();

@@ -187,7 +187,8 @@ class PipelineFragmentContext : public TaskExecutionContext {

bool _enable_local_shuffle() const { return _runtime_state->enable_local_shuffle(); }

Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request);
Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request,
ThreadPool* thread_pool);
void _close_fragment_instance();
void _init_next_report_time();

@@ -206,7 +207,7 @@ class PipelineFragmentContext : public TaskExecutionContext {
int _closed_tasks = 0;
// After prepared, `_total_tasks` is equal to the size of `_tasks`.
// When submit fail, `_total_tasks` is equal to the number of tasks submitted.
-int _total_tasks = 0;
+std::atomic<int> _total_tasks = 0;

std::unique_ptr<RuntimeProfile> _runtime_profile;
bool _is_report_success = false;
@@ -303,7 +304,7 @@ class PipelineFragmentContext : public TaskExecutionContext {

std::vector<TUniqueId> _fragment_instance_ids;
// Local runtime states for each task
std::vector<std::unique_ptr<RuntimeState>> _task_runtime_states;
std::vector<std::vector<std::unique_ptr<RuntimeState>>> _task_runtime_states;

std::vector<std::unique_ptr<RuntimeFilterParamsContext>> _runtime_filter_states;

3 changes: 2 additions & 1 deletion be/src/runtime/fragment_mgr.cpp
@@ -820,7 +820,8 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
{
SCOPED_RAW_TIMER(&duration_ns);
Status prepare_st = Status::OK();
ASSIGN_STATUS_IF_CATCH_EXCEPTION(prepare_st = context->prepare(params), prepare_st);
ASSIGN_STATUS_IF_CATCH_EXCEPTION(prepare_st = context->prepare(params, _thread_pool.get()),
prepare_st);
if (!prepare_st.ok()) {
query_ctx->cancel(prepare_st, params.fragment_id);
query_ctx->set_execution_dependency_ready();
8 changes: 8 additions & 0 deletions be/src/vec/sink/vdata_stream_sender.cpp
@@ -92,6 +92,14 @@ Status Channel<Parent>::init_stub(RuntimeState* state) {

template <typename Parent>
Status Channel<Parent>::open(RuntimeState* state) {
+if (_is_local) {
+auto st = _parent->state()->exec_env()->vstream_mgr()->find_recvr(
+_fragment_instance_id, _dest_node_id, &_local_recvr);
+if (!st.ok()) {
+// Recvr not found. Maybe downstream task is finished already.
+LOG(INFO) << "Recvr is not found : " << st.to_string();
+}
+}
_be_number = state->be_number();
_brpc_request = std::make_shared<PTransmitDataParams>();
// initialize brpc request
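The eight added lines make Channel::open tolerate a missing local receiver: with instances prepared and torn down in parallel, the downstream task may already be finished, so the failed lookup is logged and open() carries on. A tiny sketch of that "benign miss" pattern, where find_receiver is a hypothetical stand-in for VDataStreamMgr::find_recvr:

    #include <iostream>
    #include <memory>

    struct Receiver {};

    std::shared_ptr<Receiver> find_receiver(int fragment_instance_id, int dest_node_id) {
        return nullptr; // pretend the downstream task already finished
    }

    int main() {
        auto recvr = find_receiver(1, 2);
        if (recvr == nullptr) {
            // Recvr not found. Maybe the downstream task finished already; not an error.
            std::cout << "Recvr is not found, continue without local pass-through\n";
        }
        return 0;
    }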
9 changes: 8 additions & 1 deletion fe/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -290,6 +290,8 @@ public class SessionVariable implements Serializable, Writable {

public static final String AUTO_BROADCAST_JOIN_THRESHOLD = "auto_broadcast_join_threshold";

+public static final String PARALLEL_PREPARE_THRESHOLD = "parallel_prepare_threshold";

public static final String ENABLE_PROJECTION = "enable_projection";

public static final String ENABLE_SHORT_CIRCUIT_QUERY = "enable_short_circuit_query";
@@ -1010,7 +1012,7 @@ public class SessionVariable implements Serializable, Writable {

@VariableMgr.VarAttr(name = PARALLEL_SCAN_MIN_ROWS_PER_SCANNER, fuzzy = true,
varType = VariableAnnotation.EXPERIMENTAL, needForward = true)
-private long parallelScanMinRowsPerScanner = 16384; // 16K
+private long parallelScanMinRowsPerScanner = 2097152; // 2M

@VariableMgr.VarAttr(name = IGNORE_STORAGE_DATA_DISTRIBUTION, fuzzy = false,
varType = VariableAnnotation.EXPERIMENTAL, needForward = true)
@@ -1053,6 +1055,9 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = AUTO_BROADCAST_JOIN_THRESHOLD)
public double autoBroadcastJoinThreshold = 0.8;

+@VariableMgr.VarAttr(name = PARALLEL_PREPARE_THRESHOLD, fuzzy = true)
+public int parallelPrepareThreshold = 32;

@VariableMgr.VarAttr(name = ENABLE_COST_BASED_JOIN_REORDER)
private boolean enableJoinReorderBasedCost = false;

@@ -2109,6 +2114,7 @@ public void initFuzzyModeVariables() {
Random random = new SecureRandom();
this.parallelExecInstanceNum = random.nextInt(8) + 1;
this.parallelPipelineTaskNum = random.nextInt(8);
+this.parallelPrepareThreshold = random.nextInt(32) + 1;
this.enableCommonExprPushdown = random.nextBoolean();
this.enableLocalExchange = random.nextBoolean();
// This will cause be dead loop, disable it first
@@ -3529,6 +3535,7 @@ public TQueryOptions toThrift() {
tResult.setNumScannerThreads(numScannerThreads);
tResult.setScannerScaleUpRatio(scannerScaleUpRatio);
tResult.setMaxColumnReaderNum(maxColumnReaderNum);
+tResult.setParallelPrepareThreshold(parallelPrepareThreshold);

// TODO chenhao, reservation will be calculated by cost
tResult.setMinReservation(0);
1 change: 1 addition & 0 deletions gensrc/thrift/PaloInternalService.thrift
@@ -332,6 +332,7 @@ struct TQueryOptions {

125: optional bool enable_segment_cache = true;

+132: optional i32 parallel_prepare_threshold = 0;
// For cloud, to control if the content would be written into file cache
// In write path, to control if the content would be written into file cache.
// In read path, read from file cache or remote storage when execute query.
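Because the new field is optional (default 0), the backend must consult the generated __isset flag before trusting the value, which is exactly what the gate in _build_pipeline_tasks does above. A sketch of that guard, with TQueryOptionsLike as a hypothetical stand-in for the Thrift-generated TQueryOptions:

    #include <cstdint>
    #include <iostream>

    // Thrift-generated structs expose a similar __isset bitfield for optionals.
    struct TQueryOptionsLike {
        int32_t parallel_prepare_threshold = 0;
        struct Isset {
            bool parallel_prepare_threshold = false;
        } __isset;
    };

    // Mirrors the gate in _build_pipeline_tasks: parallelize only when the field
    // was actually sent and the instance count exceeds the threshold.
    bool should_prepare_in_parallel(const TQueryOptionsLike& opts, int target_size) {
        return target_size > 1 && opts.__isset.parallel_prepare_threshold &&
               target_size > opts.parallel_prepare_threshold;
    }

    int main() {
        TQueryOptionsLike opts;
        opts.parallel_prepare_threshold = 32;
        opts.__isset.parallel_prepare_threshold = true;
        std::cout << should_prepare_in_parallel(opts, 64) << "\n"; // 1
        std::cout << should_prepare_in_parallel(opts, 8) << "\n";  // 0
        return 0;
    }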
