diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 0a89d6f5afb0dfb..7a0218c7aa8d729 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1578,6 +1578,7 @@ private void computeFragmentExecParams() throws Exception { } // process bucket shuffle join on fragment without scan node TNetworkAddress dummyServer = new TNetworkAddress("0.0.0.0", 0); + Set hostSet = new HashSet<>(); while (bucketSeq < bucketNum) { TPlanFragmentDestination dest = new TPlanFragmentDestination(); @@ -1585,10 +1586,13 @@ private void computeFragmentExecParams() throws Exception { dest.server = dummyServer; dest.setBrpcServer(dummyServer); - int parallelTasksNum = destParams.ignoreDataDistribution - ? destParams.parallelTasksNum : destParams.instanceExecParams.size(); - for (int insIdx = 0; insIdx < parallelTasksNum; insIdx++) { + for (int insIdx = 0; insIdx < destParams.instanceExecParams.size(); insIdx++) { FInstanceExecParam instanceExecParams = destParams.instanceExecParams.get(insIdx); + if (destParams.ignoreDataDistribution + && hostSet.contains(instanceExecParams.host)) { + continue; + } + hostSet.add(instanceExecParams.host); if (instanceExecParams.bucketSeqSet.contains(bucketSeq)) { dest.fragment_instance_id = instanceExecParams.instanceId; dest.server = toRpcHost(instanceExecParams.host); @@ -1623,10 +1627,14 @@ private void computeFragmentExecParams() throws Exception { } }); } else { + Set hostSet = new HashSet<>(); // add destination host to this fragment's destination - int parallelTasksNum = destParams.ignoreDataDistribution - ? destParams.parallelTasksNum : destParams.instanceExecParams.size(); - for (int j = 0; j < parallelTasksNum; ++j) { + for (int j = 0; j < destParams.instanceExecParams.size(); ++j) { + if (destParams.ignoreDataDistribution + && hostSet.contains(destParams.instanceExecParams.get(j).host)) { + continue; + } + hostSet.add(destParams.instanceExecParams.get(j).host); TPlanFragmentDestination dest = new TPlanFragmentDestination(); dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId; dest.server = toRpcHost(destParams.instanceExecParams.get(j).host); @@ -1691,6 +1699,7 @@ private void computeMultiCastFragmentParams() throws Exception { } // process bucket shuffle join on fragment without scan node TNetworkAddress dummyServer = new TNetworkAddress("0.0.0.0", 0); + Set hostSet = new HashSet<>(); while (bucketSeq < bucketNum) { TPlanFragmentDestination dest = new TPlanFragmentDestination(); @@ -1698,10 +1707,13 @@ private void computeMultiCastFragmentParams() throws Exception { dest.server = dummyServer; dest.setBrpcServer(dummyServer); - int parallelTasksNum = destParams.ignoreDataDistribution - ? destParams.parallelTasksNum : destParams.instanceExecParams.size(); - for (int insIdx = 0; insIdx < parallelTasksNum; insIdx++) { + for (int insIdx = 0; insIdx < destParams.instanceExecParams.size(); insIdx++) { FInstanceExecParam instanceExecParams = destParams.instanceExecParams.get(insIdx); + if (destParams.ignoreDataDistribution + && hostSet.contains(instanceExecParams.host)) { + continue; + } + hostSet.add(instanceExecParams.host); if (instanceExecParams.bucketSeqSet.contains(bucketSeq)) { dest.fragment_instance_id = instanceExecParams.instanceId; dest.server = toRpcHost(instanceExecParams.host); @@ -1736,7 +1748,13 @@ private void computeMultiCastFragmentParams() throws Exception { } }); } else { + Set hostSet = new HashSet<>(); for (int j = 0; j < destParams.instanceExecParams.size(); ++j) { + if (destParams.ignoreDataDistribution + && hostSet.contains(destParams.instanceExecParams.get(j).host)) { + continue; + } + hostSet.add(destParams.instanceExecParams.get(j).host); TPlanFragmentDestination dest = new TPlanFragmentDestination(); dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId; dest.server = toRpcHost(destParams.instanceExecParams.get(j).host); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 182cfc4424aff3d..dac5004250e3bcc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -788,7 +788,7 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = IGNORE_STORAGE_DATA_DISTRIBUTION, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL, needForward = true) - private boolean ignoreStorageDataDistribution = false; + private boolean ignoreStorageDataDistribution = true; @VariableMgr.VarAttr( name = ENABLE_LOCAL_SHUFFLE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL,