[pipelineX](fix) Fix correctness problem due to local hash shuffle (#…
Gabriel39 authored Jan 11, 2024
1 parent 0b606a4 commit 4d6d938
Showing 6 changed files with 56 additions and 21 deletions.
@@ -86,11 +86,13 @@ class LocalExchangeSinkOperatorX final : public DataSinkOperatorX<LocalExchangeS
using Base = DataSinkOperatorX<LocalExchangeSinkLocalState>;
LocalExchangeSinkOperatorX(int sink_id, int dest_id, int num_partitions,
const std::vector<TExpr>& texprs,
const std::map<int, int>& bucket_seq_to_instance_idx)
const std::map<int, int>& bucket_seq_to_instance_idx,
const std::map<int, int>& shuffle_idx_to_instance_idx)
: Base(sink_id, dest_id, dest_id),
_num_partitions(num_partitions),
_texprs(texprs),
_bucket_seq_to_instance_idx(bucket_seq_to_instance_idx) {}
_bucket_seq_to_instance_idx(bucket_seq_to_instance_idx),
_shuffle_idx_to_instance_idx(shuffle_idx_to_instance_idx) {}

Status init(const TPlanNode& tnode, RuntimeState* state) override {
return Status::InternalError("{} should not init with TPlanNode", Base::_name);
@@ -143,6 +145,7 @@ class LocalExchangeSinkOperatorX final : public DataSinkOperatorX<LocalExchangeS
const std::vector<TExpr>& _texprs;
std::unique_ptr<vectorized::PartitionerBase> _partitioner;
const std::map<int, int> _bucket_seq_to_instance_idx;
const std::map<int, int> _shuffle_idx_to_instance_idx;
};

} // namespace doris::pipeline
8 changes: 5 additions & 3 deletions be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
@@ -118,14 +118,16 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
local_state._shared_state->add_total_mem_usage(new_block_wrapper->data_block.allocated_bytes());
new_block_wrapper->ref(_num_partitions);
if (get_type() == ExchangeType::HASH_SHUFFLE) {
auto map = local_state._parent->cast<LocalExchangeSinkOperatorX>()
._shuffle_idx_to_instance_idx;
for (size_t i = 0; i < _num_partitions; i++) {
size_t start = local_state._partition_rows_histogram[i];
size_t size = local_state._partition_rows_histogram[i + 1] - start;
if (size > 0) {
local_state._shared_state->add_mem_usage(
i, new_block_wrapper->data_block.allocated_bytes(), false);
data_queue[i].enqueue({new_block_wrapper, {row_idx, start, size}});
local_state._shared_state->set_ready_to_read(i);
map[i], new_block_wrapper->data_block.allocated_bytes(), false);
data_queue[map[i]].enqueue({new_block_wrapper, {row_idx, start, size}});
local_state._shared_state->set_ready_to_read(map[i]);
} else {
new_block_wrapper->unref(local_state._shared_state);
}
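
The hunk above is the core of the correctness fix: with a HASH_SHUFFLE local exchange, the partition index produced by the partitioner is a fragment-wide shuffle index rather than a queue index local to this backend, so it is translated through _shuffle_idx_to_instance_idx before a block is enqueued. A minimal, self-contained sketch of that routing step (plain std::map/std::vector stand-ins rather than the real Doris structures; the two-instance layout and the identity "hash" are assumptions for illustration only):

    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <vector>

    // Toy stand-in for the HASH_SHUFFLE branch of ShuffleExchanger::_split_rows:
    // rows are hashed into `total_instances` fragment-wide partitions, and each
    // non-empty partition is routed to the local queue picked by the map, instead
    // of using the partition index itself as the queue index.
    int main() {
        const int total_instances = 4;  // instances of this fragment across all backends
        // Fragment-wide shuffle index -> local instance index on this backend
        // (layout assumed for illustration).
        const std::map<int, int> shuffle_idx_to_instance_idx = {{2, 0}, {3, 1}};

        std::vector<std::vector<uint64_t>> data_queue(2);  // one queue per local instance

        for (uint64_t row = 0; row < 16; ++row) {
            const int partition = static_cast<int>(row % total_instances);  // pretend hash
            auto it = shuffle_idx_to_instance_idx.find(partition);
            if (it == shuffle_idx_to_instance_idx.end()) {
                continue;  // partition owned by another backend in this toy setup
            }
            data_queue[it->second].push_back(row);  // enqueue to map[i], not i
        }

        for (size_t i = 0; i < data_queue.size(); ++i) {
            std::cout << "local instance " << i << " received " << data_queue[i].size()
                      << " rows\n";
        }
        return 0;
    }
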
38 changes: 24 additions & 14 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -163,6 +163,7 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
return Status::InternalError("Already prepared");
}
_num_instances = request.local_params.size();
_total_instances = request.__isset.total_instances ? request.total_instances : _num_instances;
_runtime_profile.reset(new RuntimeProfile("PipelineContext"));
_prepare_timer = ADD_TIMER(_runtime_profile, "PrepareTime");
SCOPED_TIMER(_prepare_timer);
@@ -235,8 +236,9 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
static_cast<void>(pipeline->sink_x()->set_child(pipeline->operator_xs().back()));
}
if (_enable_local_shuffle()) {
RETURN_IF_ERROR(
_plan_local_exchange(request.num_buckets, request.bucket_seq_to_instance_idx));
RETURN_IF_ERROR(_plan_local_exchange(request.num_buckets,
request.bucket_seq_to_instance_idx,
request.shuffle_idx_to_instance_idx));
}
// 4. Initialize global states in pipelines.
for (PipelinePtr& pipeline : _pipelines) {
@@ -254,7 +256,8 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
}

Status PipelineXFragmentContext::_plan_local_exchange(
int num_buckets, const std::map<int, int>& bucket_seq_to_instance_idx) {
int num_buckets, const std::map<int, int>& bucket_seq_to_instance_idx,
const std::map<int, int>& shuffle_idx_to_instance_idx) {
for (int pip_idx = _pipelines.size() - 1; pip_idx >= 0; pip_idx--) {
_pipelines[pip_idx]->init_data_distribution();
// Set property if child pipeline is not join operator's child.
@@ -274,6 +277,7 @@ Status PipelineXFragmentContext::_plan_local_exchange(
? _num_instances
: num_buckets,
pip_idx, _pipelines[pip_idx], bucket_seq_to_instance_idx,
shuffle_idx_to_instance_idx,
_pipelines[pip_idx]->operator_xs().front()->ignore_data_hash_distribution()));
}
return Status::OK();
@@ -282,6 +286,7 @@ Status PipelineXFragmentContext::_plan_local_exchange(
Status PipelineXFragmentContext::_plan_local_exchange(
int num_buckets, int pip_idx, PipelinePtr pip,
const std::map<int, int>& bucket_seq_to_instance_idx,
const std::map<int, int>& shuffle_idx_to_instance_idx,
const bool ignore_data_hash_distribution) {
int idx = 1;
bool do_local_exchange = false;
@@ -294,7 +299,8 @@ Status PipelineXFragmentContext::_plan_local_exchange(
RETURN_IF_ERROR(_add_local_exchange(
pip_idx, idx, ops[idx]->node_id(), _runtime_state->obj_pool(), pip,
ops[idx]->required_data_distribution(), &do_local_exchange, num_buckets,
bucket_seq_to_instance_idx, ignore_data_hash_distribution));
bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx,
ignore_data_hash_distribution));
}
if (do_local_exchange) {
// If local exchange is needed for current operator, we will split this pipeline to
@@ -311,7 +317,8 @@ Status PipelineXFragmentContext::_plan_local_exchange(
RETURN_IF_ERROR(_add_local_exchange(
pip_idx, idx, pip->sink_x()->node_id(), _runtime_state->obj_pool(), pip,
pip->sink_x()->required_data_distribution(), &do_local_exchange, num_buckets,
bucket_seq_to_instance_idx, ignore_data_hash_distribution));
bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx,
ignore_data_hash_distribution));
}
return Status::OK();
}
@@ -713,16 +720,17 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
int idx, ObjectPool* pool, PipelinePtr cur_pipe, PipelinePtr new_pip,
DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
const std::map<int, int>& bucket_seq_to_instance_idx,
const std::map<int, int>& shuffle_idx_to_instance_idx,
const bool ignore_data_hash_distribution) {
auto& operator_xs = cur_pipe->operator_xs();
const auto downstream_pipeline_id = cur_pipe->id();
auto local_exchange_id = next_operator_id();
// 1. Create a new pipeline with local exchange sink.
DataSinkOperatorXPtr sink;
auto sink_id = next_sink_operator_id();
sink.reset(new LocalExchangeSinkOperatorX(sink_id, local_exchange_id, _num_instances,
data_distribution.partition_exprs,
bucket_seq_to_instance_idx));
sink.reset(new LocalExchangeSinkOperatorX(
sink_id, local_exchange_id, _total_instances, data_distribution.partition_exprs,
bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
RETURN_IF_ERROR(new_pip->set_sink(sink));
RETURN_IF_ERROR(new_pip->sink_x()->init(data_distribution.distribution_type, num_buckets));

@@ -731,7 +739,7 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
switch (data_distribution.distribution_type) {
case ExchangeType::HASH_SHUFFLE:
shared_state->exchanger = ShuffleExchanger::create_unique(
std::max(cur_pipe->num_tasks(), _num_instances), _num_instances);
std::max(cur_pipe->num_tasks(), _num_instances), _total_instances);
break;
case ExchangeType::BUCKET_HASH_SHUFFLE:
shared_state->exchanger = BucketShuffleExchanger::create_unique(
@@ -826,7 +834,9 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
Status PipelineXFragmentContext::_add_local_exchange(
int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr cur_pipe,
DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
const std::map<int, int>& bucket_seq_to_instance_idx, const bool ignore_data_distribution) {
const std::map<int, int>& bucket_seq_to_instance_idx,
const std::map<int, int>& shuffle_idx_to_instance_idx,
const bool ignore_data_distribution) {
DCHECK(_enable_local_shuffle());
if (_num_instances <= 1) {
return Status::OK();
@@ -840,9 +850,9 @@ Status PipelineXFragmentContext::_add_local_exchange(
auto& operator_xs = cur_pipe->operator_xs();
auto total_op_num = operator_xs.size();
auto new_pip = add_pipeline(cur_pipe, pip_idx + 1);
RETURN_IF_ERROR(_add_local_exchange_impl(idx, pool, cur_pipe, new_pip, data_distribution,
do_local_exchange, num_buckets,
bucket_seq_to_instance_idx, ignore_data_distribution));
RETURN_IF_ERROR(_add_local_exchange_impl(
idx, pool, cur_pipe, new_pip, data_distribution, do_local_exchange, num_buckets,
bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx, ignore_data_distribution));

CHECK(total_op_num + 1 == cur_pipe->operator_xs().size() + new_pip->operator_xs().size())
<< "total_op_num: " << total_op_num
@@ -855,7 +865,7 @@ Status PipelineXFragmentContext::_add_local_exchange(
RETURN_IF_ERROR(_add_local_exchange_impl(
new_pip->operator_xs().size(), pool, new_pip, add_pipeline(new_pip, pip_idx + 2),
DataDistribution(ExchangeType::PASSTHROUGH), do_local_exchange, num_buckets,
bucket_seq_to_instance_idx, ignore_data_distribution));
bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx, ignore_data_distribution));
}
return Status::OK();
}
8 changes: 7 additions & 1 deletion be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -126,13 +126,15 @@ class PipelineXFragmentContext : public PipelineFragmentContext {
PipelinePtr cur_pipe, DataDistribution data_distribution,
bool* do_local_exchange, int num_buckets,
const std::map<int, int>& bucket_seq_to_instance_idx,
const std::map<int, int>& shuffle_idx_to_instance_idx,
const bool ignore_data_distribution);
void _inherit_pipeline_properties(const DataDistribution& data_distribution,
PipelinePtr pipe_with_source, PipelinePtr pipe_with_sink);
Status _add_local_exchange_impl(int idx, ObjectPool* pool, PipelinePtr cur_pipe,
PipelinePtr new_pipe, DataDistribution data_distribution,
bool* do_local_exchange, int num_buckets,
const std::map<int, int>& bucket_seq_to_instance_idx,
const std::map<int, int>& shuffle_idx_to_instance_idx,
const bool ignore_data_distribution);

[[nodiscard]] Status _build_pipelines(ObjectPool* pool,
@@ -160,9 +162,11 @@ class PipelineXFragmentContext : public PipelineFragmentContext {
RuntimeState* state, DescriptorTbl& desc_tbl,
PipelineId cur_pipeline_id);
Status _plan_local_exchange(int num_buckets,
const std::map<int, int>& bucket_seq_to_instance_idx);
const std::map<int, int>& bucket_seq_to_instance_idx,
const std::map<int, int>& shuffle_idx_to_instance_idx);
Status _plan_local_exchange(int num_buckets, int pip_idx, PipelinePtr pip,
const std::map<int, int>& bucket_seq_to_instance_idx,
const std::map<int, int>& shuffle_idx_to_instance_idx,
const bool ignore_data_distribution);

bool _has_inverted_index_or_partial_update(TOlapTableSink sink);
@@ -239,6 +243,8 @@ class PipelineXFragmentContext : public PipelineFragmentContext {
std::vector<std::unique_ptr<RuntimeState>> _task_runtime_states;

std::vector<std::unique_ptr<RuntimeFilterParamsContext>> _runtime_filter_states;

int _total_instances = -1;
};

} // namespace pipeline
14 changes: 13 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1611,6 +1611,7 @@ private void computeFragmentExecParams() throws Exception {
dest.fragment_instance_id = instanceExecParams.instanceId;
dest.server = toRpcHost(instanceExecParams.host);
dest.setBrpcServer(toBrpcHost(instanceExecParams.host));
instanceExecParams.recvrId = params.destinations.size();
break;
}
}
@@ -1630,6 +1631,7 @@ private void computeFragmentExecParams() throws Exception {
destHosts.put(param.host, param);
param.buildHashTableForBroadcastJoin = true;
TPlanFragmentDestination dest = new TPlanFragmentDestination();
param.recvrId = params.destinations.size();
dest.fragment_instance_id = param.instanceId;
try {
dest.server = toRpcHost(param.host);
@@ -1653,6 +1655,7 @@ private void computeFragmentExecParams() throws Exception {
dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId;
dest.server = toRpcHost(destParams.instanceExecParams.get(j).host);
dest.setBrpcServer(toBrpcHost(destParams.instanceExecParams.get(j).host));
destParams.instanceExecParams.get(j).recvrId = params.destinations.size();
params.destinations.add(dest);
}
}
@@ -1732,6 +1735,7 @@ private void computeMultiCastFragmentParams() throws Exception {
dest.fragment_instance_id = instanceExecParams.instanceId;
dest.server = toRpcHost(instanceExecParams.host);
dest.setBrpcServer(toBrpcHost(instanceExecParams.host));
instanceExecParams.recvrId = params.destinations.size();
break;
}
}
@@ -1752,6 +1756,7 @@ private void computeMultiCastFragmentParams() throws Exception {
param.buildHashTableForBroadcastJoin = true;
TPlanFragmentDestination dest = new TPlanFragmentDestination();
dest.fragment_instance_id = param.instanceId;
param.recvrId = params.destinations.size();
try {
dest.server = toRpcHost(param.host);
dest.setBrpcServer(toBrpcHost(param.host));
@@ -1773,6 +1778,7 @@ private void computeMultiCastFragmentParams() throws Exception {
dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId;
dest.server = toRpcHost(destParams.instanceExecParams.get(j).host);
dest.brpc_server = toBrpcHost(destParams.instanceExecParams.get(j).host);
destParams.instanceExecParams.get(j).recvrId = params.destinations.size();
destinations.add(dest);
}
}
@@ -3755,22 +3761,26 @@ Map<TNetworkAddress, TPipelineFragmentParams> toTPipelineParams(int backendNum)
params.setFileScanParams(fileScanRangeParamsMap);
params.setNumBuckets(fragment.getBucketNum());
params.setPerNodeSharedScans(perNodeSharedScans);
params.setTotalInstances(instanceExecParams.size());
if (ignoreDataDistribution) {
params.setParallelInstances(parallelTasksNum);
}
res.put(instanceExecParam.host, params);
res.get(instanceExecParam.host).setBucketSeqToInstanceIdx(new HashMap<Integer, Integer>());
res.get(instanceExecParam.host).setShuffleIdxToInstanceIdx(new HashMap<Integer, Integer>());
instanceIdx.put(instanceExecParam.host, 0);
}
// Set each bucket belongs to which instance on this BE.
// This is used for LocalExchange(BUCKET_HASH_SHUFFLE).
int instanceId = instanceIdx.get(instanceExecParam.host);

for (int bucket : instanceExecParam.bucketSeqSet) {
res.get(instanceExecParam.host).getBucketSeqToInstanceIdx().put(bucket, instanceId);

}
instanceIdx.replace(instanceExecParam.host, ++instanceId);
TPipelineFragmentParams params = res.get(instanceExecParam.host);
res.get(instanceExecParam.host).getShuffleIdxToInstanceIdx().put(instanceExecParam.recvrId,
params.getLocalParams().size());
TPipelineInstanceParams localParams = new TPipelineInstanceParams();

localParams.setBuildHashTableForBroadcastJoin(instanceExecParam.buildHashTableForBroadcastJoin);
@@ -3919,6 +3929,8 @@ static class FInstanceExecParam {

boolean buildHashTableForBroadcastJoin = false;

int recvrId = -1;

List<TUniqueId> instancesSharingHashTable = Lists.newArrayList();

public void addBucketSeq(int bucketSeq) {
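
On the FE side, the pattern repeated in the hunks above is that each instance records a recvrId equal to its position in the fragment's destination list, and toTPipelineParams then fills shuffleIdxToInstanceIdx so a backend can translate that fragment-wide index into the instance's position among its own localParams. A small standalone sketch of that bookkeeping (kept in C++ for consistency with the backend sketch earlier; the hosts and instance order are made up for illustration):

    #include <iostream>
    #include <map>
    #include <string>
    #include <vector>

    // Simplified view of the coordinator-side bookkeeping: every instance gets a
    // fragment-wide receiver id (its index in the destination list), and each
    // backend gets a map from that id to the instance's local index on the backend.
    int main() {
        struct Instance { std::string host; int recvr_id; };
        const std::vector<Instance> instances = {
            {"be-1", 0}, {"be-2", 1}, {"be-1", 2}, {"be-2", 3},
        };

        std::map<std::string, std::map<int, int>> shuffle_idx_to_instance_idx;  // per host
        std::map<std::string, int> local_count;  // instances already assigned per host

        for (const auto& inst : instances) {
            const int local_idx = local_count[inst.host]++;  // position in localParams
            shuffle_idx_to_instance_idx[inst.host][inst.recvr_id] = local_idx;
        }

        for (const auto& [host, mapping] : shuffle_idx_to_instance_idx) {
            std::cout << host << ":";
            for (const auto& [shuffle_idx, instance_idx] : mapping) {
                std::cout << " " << shuffle_idx << "->" << instance_idx;
            }
            std::cout << "\n";  // expected: be-1: 0->0 2->1 and be-2: 1->0 3->1
        }
        return 0;
    }
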
2 changes: 2 additions & 0 deletions gensrc/thrift/PaloInternalService.thrift
@@ -715,6 +715,8 @@ struct TPipelineFragmentParams {
35: optional map<i32, i32> bucket_seq_to_instance_idx
36: optional map<Types.TPlanNodeId, bool> per_node_shared_scans
37: optional i32 parallel_instances
38: optional i32 total_instances
39: optional map<i32, i32> shuffle_idx_to_instance_idx

// For cloud
1000: optional bool is_mow_table;
