Skip to content

Commit

Permalink
enablePassToOneExchange
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryange committed Sep 23, 2024
1 parent 3458574 commit 8cbcfa3
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 2 deletions.
5 changes: 3 additions & 2 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,8 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
_tablet_sink_txn_id(sink.tablet_sink_txn_id),
_t_tablet_sink_exprs(&sink.tablet_sink_exprs),
_enable_local_merge_sort(state->enable_local_merge_sort()),
_is_multi_cast(is_multi_cast) {
_is_multi_cast(is_multi_cast),
_enable_pass_to_one_exchange(state->enable_pass_to_one_exchange()) {
DCHECK_GT(destinations.size(), 0);
DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED ||
sink.output_partition.type == TPartitionType::HASH_PARTITIONED ||
Expand Down Expand Up @@ -683,7 +684,7 @@ DataDistribution ExchangeSinkOperatorX::required_data_distribution() const {
sort_source && sort_source->use_local_merge()) {
// Sort the data local
return ExchangeType::LOCAL_MERGE_SORT;
} else if (!_is_multi_cast) {
} else if (!_is_multi_cast && _enable_pass_to_one_exchange) {
return ExchangeType::PASS_TO_ONE_EXCHANGE;
}
}
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalSt
int _writer_count = 1;
const bool _enable_local_merge_sort;
const bool _is_multi_cast;
const bool _enable_pass_to_one_exchange;

int _close_sender_number = 1;
};
Expand Down
5 changes: 5 additions & 0 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,11 @@ class RuntimeState {
_query_options.enable_local_merge_sort;
}

bool enable_pass_to_one_exchange() const {
return _query_options.__isset.enable_pass_to_one_exchange &&
_query_options.enable_pass_to_one_exchange;
}

int64_t min_revocable_mem() const {
if (_query_options.__isset.min_revocable_mem) {
return std::max(_query_options.min_revocable_mem, (int64_t)1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,8 @@ public class SessionVariable implements Serializable, Writable {

public static final String ENABLE_LOCAL_MERGE_SORT = "enable_local_merge_sort";

public static final String ENABLE_PASS_TO_ONE_EXCHANGE = "enable_pass_to_one_exchange";

public static final String ENABLE_AGG_STATE = "enable_agg_state";

public static final String ENABLE_RPC_OPT_FOR_PIPELINE = "enable_rpc_opt_for_pipeline";
Expand Down Expand Up @@ -1068,6 +1070,9 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = ENABLE_LOCAL_MERGE_SORT)
private boolean enableLocalMergeSort = true;

@VariableMgr.VarAttr(name = ENABLE_PASS_TO_ONE_EXCHANGE)
private boolean enablePassToOneExchange = false;

@VariableMgr.VarAttr(name = ENABLE_AGG_STATE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL,
needForward = true)
public boolean enableAggState = false;
Expand Down Expand Up @@ -2215,6 +2220,7 @@ public void initFuzzyModeVariables() {
this.parallelPrepareThreshold = random.nextInt(32) + 1;
this.enableCommonExprPushdown = random.nextBoolean();
this.enableLocalExchange = random.nextBoolean();
this.enablePassToOneExchange = random.nextBoolean();
// This will cause be dead loop, disable it first
// this.disableJoinReorder = random.nextBoolean();
this.enableCommonExpPushDownForInvertedIndex = random.nextBoolean();
Expand Down Expand Up @@ -3774,6 +3780,7 @@ public TQueryOptions toThrift() {
tResult.setDataQueueMaxBlocks(dataQueueMaxBlocks);

tResult.setEnableLocalMergeSort(enableLocalMergeSort);
tResult.setEnablePassToOneExchange(enablePassToOneExchange);
tResult.setEnableParallelResultSink(enableParallelResultSink);
tResult.setEnableShortCircuitQueryAccessColumnStore(enableShortCircuitQueryAcessColumnStore);
tResult.setReadCsvEmptyLineAsNull(readCsvEmptyLineAsNull);
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/PaloInternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ struct TQueryOptions {
// For cloud, to control if the content would be written into file cache
// In write path, to control if the content would be written into file cache.
// In read path, read from file cache or remote storage when execute query.
135: optional bool enable_pass_to_one_exchange = true;
1000: optional bool disable_file_cache = false
}

Expand Down

0 comments on commit 8cbcfa3

Please sign in to comment.