Skip to content

Commit

Permalink
[bug](pipelineX) Fix pipelineX bug on multiple BE
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 committed Dec 20, 2023
1 parent 280a01b commit 0c73ccf
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 10 deletions.
36 changes: 27 additions & 9 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -1578,17 +1578,21 @@ private void computeFragmentExecParams() throws Exception {
}
// process bucket shuffle join on fragment without scan node
TNetworkAddress dummyServer = new TNetworkAddress("0.0.0.0", 0);
Set<TNetworkAddress> hostSet = new HashSet<>();
while (bucketSeq < bucketNum) {
TPlanFragmentDestination dest = new TPlanFragmentDestination();

dest.fragment_instance_id = new TUniqueId(-1, -1);
dest.server = dummyServer;
dest.setBrpcServer(dummyServer);

int parallelTasksNum = destParams.ignoreDataDistribution
? destParams.parallelTasksNum : destParams.instanceExecParams.size();
for (int insIdx = 0; insIdx < parallelTasksNum; insIdx++) {
for (int insIdx = 0; insIdx < destParams.instanceExecParams.size(); insIdx++) {
FInstanceExecParam instanceExecParams = destParams.instanceExecParams.get(insIdx);
if (destParams.ignoreDataDistribution
&& hostSet.contains(instanceExecParams.host)) {
continue;
}
hostSet.add(instanceExecParams.host);
if (instanceExecParams.bucketSeqSet.contains(bucketSeq)) {
dest.fragment_instance_id = instanceExecParams.instanceId;
dest.server = toRpcHost(instanceExecParams.host);
Expand Down Expand Up @@ -1623,10 +1627,14 @@ private void computeFragmentExecParams() throws Exception {
}
});
} else {
Set<TNetworkAddress> hostSet = new HashSet<>();
// add destination host to this fragment's destination
int parallelTasksNum = destParams.ignoreDataDistribution
? destParams.parallelTasksNum : destParams.instanceExecParams.size();
for (int j = 0; j < parallelTasksNum; ++j) {
for (int j = 0; j < destParams.instanceExecParams.size(); ++j) {
if (destParams.ignoreDataDistribution
&& hostSet.contains(destParams.instanceExecParams.get(j).host)) {
continue;
}
hostSet.add(destParams.instanceExecParams.get(j).host);
TPlanFragmentDestination dest = new TPlanFragmentDestination();
dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId;
dest.server = toRpcHost(destParams.instanceExecParams.get(j).host);
Expand Down Expand Up @@ -1691,17 +1699,21 @@ private void computeMultiCastFragmentParams() throws Exception {
}
// process bucket shuffle join on fragment without scan node
TNetworkAddress dummyServer = new TNetworkAddress("0.0.0.0", 0);
Set<TNetworkAddress> hostSet = new HashSet<>();
while (bucketSeq < bucketNum) {
TPlanFragmentDestination dest = new TPlanFragmentDestination();

dest.fragment_instance_id = new TUniqueId(-1, -1);
dest.server = dummyServer;
dest.setBrpcServer(dummyServer);

int parallelTasksNum = destParams.ignoreDataDistribution
? destParams.parallelTasksNum : destParams.instanceExecParams.size();
for (int insIdx = 0; insIdx < parallelTasksNum; insIdx++) {
for (int insIdx = 0; insIdx < destParams.instanceExecParams.size(); insIdx++) {
FInstanceExecParam instanceExecParams = destParams.instanceExecParams.get(insIdx);
if (destParams.ignoreDataDistribution
&& hostSet.contains(instanceExecParams.host)) {
continue;
}
hostSet.add(instanceExecParams.host);
if (instanceExecParams.bucketSeqSet.contains(bucketSeq)) {
dest.fragment_instance_id = instanceExecParams.instanceId;
dest.server = toRpcHost(instanceExecParams.host);
Expand Down Expand Up @@ -1736,7 +1748,13 @@ private void computeMultiCastFragmentParams() throws Exception {
}
});
} else {
Set<TNetworkAddress> hostSet = new HashSet<>();
for (int j = 0; j < destParams.instanceExecParams.size(); ++j) {
if (destParams.ignoreDataDistribution
&& hostSet.contains(destParams.instanceExecParams.get(j).host)) {
continue;
}
hostSet.add(destParams.instanceExecParams.get(j).host);
TPlanFragmentDestination dest = new TPlanFragmentDestination();
dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId;
dest.server = toRpcHost(destParams.instanceExecParams.get(j).host);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -788,7 +788,7 @@ public class SessionVariable implements Serializable, Writable {

@VariableMgr.VarAttr(name = IGNORE_STORAGE_DATA_DISTRIBUTION, fuzzy = false,
varType = VariableAnnotation.EXPERIMENTAL, needForward = true)
private boolean ignoreStorageDataDistribution = false;
private boolean ignoreStorageDataDistribution = true;

@VariableMgr.VarAttr(
name = ENABLE_LOCAL_SHUFFLE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL,
Expand Down

0 comments on commit 0c73ccf

Please sign in to comment.