Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 committed Dec 20, 2023
1 parent 5819485 commit 50c325d
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 20 deletions.
3 changes: 1 addition & 2 deletions be/src/runtime/runtime_filter_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,7 @@ Status RuntimeFilterMgr::get_consume_filters(const int filter_id,
std::lock_guard<std::mutex> l(_lock);
auto iter = _consumer_map.find(key);
if (iter == _consumer_map.end()) {
return Status::InvalidArgument("unknown filter: {}, role: CONSUMER. stack trace: {}", key,
get_stack_trace());
return Status::InvalidArgument("unknown filter: {}, role: CONSUMER.", key);
}
for (auto& holder : iter->second) {
consumer_filters.emplace_back(holder.filter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,11 @@ public PhysicalPlan visitPhysicalNestedLoopJoin(PhysicalNestedLoopJoin<? extends
return join;
}
RuntimeFilterContext ctx = context.getRuntimeFilterContext();
if (ctx.getSessionVariable().isIgnoreStorageDataDistribution()) {
// BITMAP filter is not supported to merge. So we disable this kind of runtime filter
// if IgnoreStorageDataDistribution is enabled.
return join;
}

if ((ctx.getSessionVariable().getRuntimeFilterType() & TRuntimeFilterType.BITMAP.getValue()) != 0) {
generateBitMapRuntimeFilterForNLJ(join, ctx);
Expand Down Expand Up @@ -363,7 +368,8 @@ private void pushDownRuntimeFilterCommon(PhysicalHashJoin<? extends Plan, ? exte
List<TRuntimeFilterType> legalTypes = Arrays.stream(TRuntimeFilterType.values())
.filter(type -> (type.getValue() & ctx.getSessionVariable().getRuntimeFilterType()) > 0)
.collect(Collectors.toList());
if (ctx.getSessionVariable().isIgnoreScanDistribution()) {
if (ctx.getSessionVariable().isIgnoreStorageDataDistribution()) {
// If storage data distribution is ignored, we use BLOOM filter.
legalTypes.clear();
legalTypes.add(TRuntimeFilterType.BLOOM);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1297,7 +1297,7 @@ public int getNumInstances() {
return ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
}
if (ConnectContext.get().getSessionVariable().getEnablePipelineXEngine()
&& ConnectContext.get().getSessionVariable().isIgnoreScanDistribution()) {
&& ConnectContext.get().getSessionVariable().isIgnoreStorageDataDistribution()) {
return ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
}
return scanRangeLocations.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -720,9 +720,9 @@ public boolean shouldDisableSharedScan(ConnectContext context) {
|| getShouldColoScan();
}

public boolean ignoreScanDistribution(ConnectContext context) {
public boolean ignoreStorageDataDistribution(ConnectContext context) {
return !isKeySearch() && context != null
&& context.getSessionVariable().isIgnoreScanDistribution()
&& context.getSessionVariable().isIgnoreStorageDataDistribution()
&& context.getSessionVariable().getEnablePipelineXEngine()
&& !fragment.isHasColocateFinalizeAggNode()
&& !fragment.isHasNullAwareLeftAntiJoin();
Expand Down
12 changes: 6 additions & 6 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -2010,7 +2010,7 @@ private void computeFragmentHosts() throws Exception {
// 4. Disable shared scan optimization by session variable
boolean sharedScan = true;
if (node.isPresent() && (!node.get().shouldDisableSharedScan(context)
|| (node.get().ignoreScanDistribution(context) && useNereids))) {
|| (node.get().ignoreStorageDataDistribution(context) && useNereids))) {
int expectedInstanceNum = Math.min(parallelExecInstanceNum,
leftMostNode.getNumInstances());
expectedInstanceNum = Math.max(expectedInstanceNum, 1);
Expand Down Expand Up @@ -2837,9 +2837,9 @@ private void assignScanRanges(PlanFragmentId fragmentId, int parallelExecInstanc
BucketSeqToScanRange bucketSeqToScanRange = fragmentIdBucketSeqToScanRangeMap.get(fragmentId);
Set<Integer> scanNodeIds = fragmentIdToScanNodeIds.get(fragmentId);

boolean ignoreScanDistribution = scanNodes.stream().filter(scanNode -> {
boolean ignoreStorageDataDistribution = scanNodes.stream().filter(scanNode -> {
return scanNodeIds.contains(scanNode.getId().asInt());
}).allMatch(node -> node.ignoreScanDistribution(context)) && useNereids;
}).allMatch(node -> node.ignoreStorageDataDistribution(context)) && useNereids;

// 1. count each node in one fragment should scan how many tablet, gather them in one list
Map<TNetworkAddress, List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> addressToScanRanges
Expand Down Expand Up @@ -2870,7 +2870,7 @@ private void assignScanRanges(PlanFragmentId fragmentId, int parallelExecInstanc
Map<Integer, List<TScanRangeParams>> range
= findOrInsert(assignment, addressScanRange.getKey(), new HashMap<>());

if (ignoreScanDistribution) {
if (ignoreStorageDataDistribution) {
FInstanceExecParam instanceParam = new FInstanceExecParam(
null, addressScanRange.getKey(), 0, params);

Expand Down Expand Up @@ -2926,8 +2926,8 @@ private void assignScanRanges(PlanFragmentId fragmentId, int parallelExecInstanc
}
}
}
params.parallelTasksNum = ignoreScanDistribution ? 1 : params.instanceExecParams.size();
params.ignoreDataDistribution = ignoreScanDistribution;
params.parallelTasksNum = ignoreStorageDataDistribution ? 1 : params.instanceExecParams.size();
params.ignoreDataDistribution = ignoreStorageDataDistribution;
}

private final Map<PlanFragmentId, BucketSeqToScanRange> fragmentIdTobucketSeqToScanRangeMap = Maps.newHashMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ public class SessionVariable implements Serializable, Writable {
public static final String ENABLE_PIPELINE_X_ENGINE = "enable_pipeline_x_engine";

public static final String ENABLE_SHARED_SCAN = "enable_shared_scan";
public static final String IGNORE_SCAN_DISTRIBUTION = "ignore_scan_distribution";

public static final String IGNORE_STORAGE_DATA_DISTRIBUTION = "ignore_storage_data_distribution";

public static final String ENABLE_LOCAL_SHUFFLE = "enable_local_shuffle";

Expand Down Expand Up @@ -785,9 +786,9 @@ public class SessionVariable implements Serializable, Writable {
needForward = true)
private boolean enableSharedScan = false;

@VariableMgr.VarAttr(name = IGNORE_SCAN_DISTRIBUTION, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL,
needForward = true)
private boolean ignoreScanDistribution = true;
@VariableMgr.VarAttr(name = IGNORE_STORAGE_DATA_DISTRIBUTION, fuzzy = false,
varType = VariableAnnotation.EXPERIMENTAL, needForward = true)
private boolean ignoreStorageDataDistribution = false;

@VariableMgr.VarAttr(
name = ENABLE_LOCAL_SHUFFLE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL,
Expand Down Expand Up @@ -3173,11 +3174,11 @@ public boolean isMaterializedViewRewriteEnableContainForeignTable() {
return materializedViewRewriteEnableContainForeignTable;
}

public boolean isIgnoreScanDistribution() {
return ignoreScanDistribution && getEnablePipelineXEngine() && enableLocalShuffle;
public boolean isIgnoreStorageDataDistribution() {
return ignoreStorageDataDistribution && getEnablePipelineXEngine() && enableLocalShuffle;
}

public void setIgnoreScanDistribution(boolean ignoreScanDistribution) {
this.ignoreScanDistribution = ignoreScanDistribution;
public void setIgnoreStorageDataDistribution(boolean ignoreStorageDataDistribution) {
this.ignoreStorageDataDistribution = ignoreStorageDataDistribution;
}
}

0 comments on commit 50c325d

Please sign in to comment.