diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index b0b0dd7aafc6c2..58679fbe9b4245 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1115,8 +1115,6 @@ DEFINE_mInt32(publish_version_gap_logging_threshold, "200"); // The secure path with user files, used in the `local` table function. DEFINE_mString(user_files_secure_path, "${DORIS_HOME}"); -DEFINE_Int32(partition_topn_partition_threshold, "1024"); - DEFINE_Int32(fe_expire_duration_seconds, "60"); DEFINE_Int32(grace_shutdown_wait_seconds, "120"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 4dabccc7db3651..56a9357e72e798 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1185,10 +1185,6 @@ DECLARE_mInt32(publish_version_gap_logging_threshold); // The secure path with user files, used in the `local` table function. DECLARE_mString(user_files_secure_path); -// This threshold determines how many partitions will be allocated for window function get topn. -// and if this threshold is exceeded, the remaining data will be pass through to other node directly. -DECLARE_Int32(partition_topn_partition_threshold); - // If fe's frontend info has not been updated for more than fe_expire_duration_seconds, it will be regarded // as an abnormal fe, this will cause be to cancel this fe's related query. DECLARE_Int32(fe_expire_duration_seconds); diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp b/be/src/pipeline/exec/partition_sort_sink_operator.cpp index 94c51e160da2a2..3a850a40b13c66 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp @@ -17,6 +17,8 @@ #include "partition_sort_sink_operator.h" +#include + #include "common/status.h" #include "partition_sort_source_operator.h" #include "vec/common/hash_table/hash.h" @@ -107,8 +109,13 @@ Status PartitionSortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo for (size_t i = 0; i < p._partition_expr_ctxs.size(); i++) { RETURN_IF_ERROR(p._partition_expr_ctxs[i]->clone(state, _partition_expr_ctxs[i])); } + _topn_phase = p._topn_phase; _partition_exprs_num = p._partition_exprs_num; _hash_table_size_counter = ADD_COUNTER(_profile, "HashTableSize", TUnit::UNIT); + _serialize_key_arena_memory_usage = + _profile->AddHighWaterMarkCounter("SerializeKeyArena", TUnit::BYTES, "MemoryUsage", 1); + _hash_table_memory_usage = + ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "HashTable", TUnit::BYTES, "MemoryUsage", 1); _build_timer = ADD_TIMER(_profile, "HashTableBuildTime"); _selector_block_timer = ADD_TIMER(_profile, "SelectorBlockTime"); _emplace_key_timer = ADD_TIMER(_profile, "EmplaceKeyTime"); @@ -119,6 +126,8 @@ Status PartitionSortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo &_vsort_exec_exprs, p._limit, 0, p._pool, p._is_asc_order, p._nulls_first, p._child->row_desc(), state, _profile, p._has_global_limit, p._partition_inner_limit, p._top_n_algorithm, p._topn_phase); + _profile->add_info_string("PartitionTopNPhase", to_string(p._topn_phase)); + _profile->add_info_string("PartitionTopNLimit", std::to_string(p._partition_inner_limit)); RETURN_IF_ERROR(_init_hash_method()); return Status::OK(); } @@ -177,11 +186,7 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block* } local_state._value_places[0]->append_whole_block(input_block, _child->row_desc()); } else { - //just simply use partition num to check - //if is TWO_PHASE_GLOBAL, must be sort all data thought partition num threshold have been exceeded. - if (_topn_phase != TPartTopNPhase::TWO_PHASE_GLOBAL && - local_state._num_partition > config::partition_topn_partition_threshold && - local_state._sorted_partition_input_rows < 10000 * local_state._num_partition) { + if (local_state._is_need_passthrough) { { COUNTER_UPDATE(local_state._passthrough_rows_counter, (int64_t)current_rows); std::lock_guard lock(local_state._shared_state->buffer_mutex); @@ -193,8 +198,6 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block* RETURN_IF_ERROR(_split_block_by_partition(input_block, local_state, eos)); RETURN_IF_CANCELLED(state); input_block->clear_column_data(); - local_state._sorted_partition_input_rows = - local_state._sorted_partition_input_rows + current_rows; } } } @@ -225,6 +228,8 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block* local_state._shared_state->sink_eos = true; local_state._dependency->set_ready_to_read(); } + local_state._profile->add_info_string("HasPassThrough", + local_state._is_need_passthrough ? "Yes" : "No"); } return Status::OK(); @@ -245,7 +250,7 @@ Status PartitionSortSinkOperatorX::_split_block_by_partition( } Status PartitionSortSinkOperatorX::_emplace_into_hash_table( - const vectorized::ColumnRawPtrs& key_columns, const vectorized::Block* input_block, + const vectorized::ColumnRawPtrs& key_columns, vectorized::Block* input_block, PartitionSortSinkLocalState& local_state, bool eos) { return std::visit( vectorized::Overload { @@ -280,15 +285,37 @@ Status PartitionSortSinkOperatorX::_emplace_into_hash_table( }; SCOPED_TIMER(local_state._emplace_key_timer); - for (size_t row = 0; row < num_rows; ++row) { + int row = num_rows; + for (row = row - 1; row >= 0 && !local_state._is_need_passthrough; --row) { auto& mapped = agg_method.lazy_emplace(state, row, creator, creator_for_null_key); mapped->add_row_idx(row); + local_state._sorted_partition_input_rows++; + local_state._is_need_passthrough = + local_state.check_whether_need_passthrough(); } for (auto* place : local_state._value_places) { SCOPED_TIMER(local_state._selector_block_timer); RETURN_IF_ERROR(place->append_block_by_selector(input_block, eos)); } + if (local_state._is_need_passthrough) { + { + COUNTER_UPDATE(local_state._passthrough_rows_counter, + (int64_t)(num_rows - row)); + std::lock_guard lock( + local_state._shared_state->buffer_mutex); + // have emplace (num_rows - row) to hashtable, and now have row remaining needed in block; + input_block->set_num_rows(row); + local_state._shared_state->blocks_buffer.push( + std::move(*input_block)); + // buffer have data, source could read this. + local_state._dependency->set_ready_to_read(); + } + } + local_state._serialize_key_arena_memory_usage->set( + (int64_t)local_state._agg_arena_pool->size()); + COUNTER_SET(local_state._hash_table_memory_usage, + (int64_t)agg_method.hash_table->get_buffer_size_in_bytes()); return Status::OK(); }}, local_state._partitioned_data->method_variant); @@ -303,4 +330,20 @@ Status PartitionSortSinkLocalState::_init_hash_method() { return Status::OK(); } +// NOLINTBEGIN(readability-simplify-boolean-expr) +// just simply use partition num to check +// but if is TWO_PHASE_GLOBAL, must be sort all data thought partition num threshold have been exceeded. +// partition_topn_max_partitions default is : 1024 +// partition_topn_per_partition_rows default is : 1000 +bool PartitionSortSinkLocalState::check_whether_need_passthrough() { + if (_topn_phase != TPartTopNPhase::TWO_PHASE_GLOBAL && + _num_partition > _state->partition_topn_max_partitions() && + _sorted_partition_input_rows < + _state->partition_topn_per_partition_rows() * _num_partition) { + return true; + } + return false; +} +// NOLINTEND(readability-simplify-boolean-expr) + } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h b/be/src/pipeline/exec/partition_sort_sink_operator.h index e58ac5fea9eaf1..f16df509dca4a0 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.h +++ b/be/src/pipeline/exec/partition_sort_sink_operator.h @@ -239,6 +239,8 @@ class PartitionSortSinkLocalState : public PipelineXSinkLocalState _agg_arena_pool; int _partition_exprs_num = 0; std::shared_ptr _partition_sort_info = nullptr; + TPartTopNPhase::type _topn_phase; + bool _is_need_passthrough = false; RuntimeProfile::Counter* _build_timer = nullptr; RuntimeProfile::Counter* _emplace_key_timer = nullptr; @@ -246,7 +248,10 @@ class PartitionSortSinkLocalState : public PipelineXSinkLocalState { @@ -289,7 +294,7 @@ class PartitionSortSinkOperatorX final : public DataSinkOperatorXpartition_sorts.size()) { RETURN_IF_ERROR(local_state._shared_state->partition_sorts[local_state._sort_idx]->get_next( state, output_block, ¤t_eos)); + COUNTER_UPDATE(local_state._sorted_partition_output_rows_counter, output_block->rows()); } if (current_eos) { - //current sort have eos, so get next idx - auto rows = local_state._shared_state->partition_sorts[local_state._sort_idx] - ->get_output_rows(); - COUNTER_UPDATE(local_state._sorted_partition_output_rows_counter, rows); + // current sort have eos, so get next idx local_state._shared_state->partition_sorts[local_state._sort_idx].reset(nullptr); local_state._sort_idx++; } diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index f43d0a163dfdaa..90cf1bc34bdeca 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -490,6 +490,18 @@ class RuntimeState { : 0; } + int partition_topn_max_partitions() const { + return _query_options.__isset.partition_topn_max_partitions + ? _query_options.partition_topn_max_partitions + : 1024; + } + + int partition_topn_per_partition_rows() const { + return _query_options.__isset.partition_topn_pre_partition_rows + ? _query_options.partition_topn_pre_partition_rows + : 1000; + } + int64_t parallel_scan_min_rows_per_scanner() const { return _query_options.__isset.parallel_scan_min_rows_per_scanner ? _query_options.parallel_scan_min_rows_per_scanner diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 94cf0cb3469d65..e4fb6153e3359c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -240,6 +240,8 @@ public class SessionVariable implements Serializable, Writable { public static final String MAX_JOIN_NUMBER_BUSHY_TREE = "max_join_number_bushy_tree"; public static final String ENABLE_PARTITION_TOPN = "enable_partition_topn"; + public static final String PARTITION_TOPN_MAX_PARTITIONS = "partition_topn_max_partitions"; + public static final String PARTITION_TOPN_PER_PARTITION_ROWS = "partition_topn_pre_partition_rows"; public static final String GLOBAL_PARTITION_TOPN_THRESHOLD = "global_partition_topn_threshold"; @@ -1240,6 +1242,22 @@ public void setMaxJoinNumberOfReorder(int maxJoinNumberOfReorder) { @VariableMgr.VarAttr(name = ENABLE_PARTITION_TOPN) private boolean enablePartitionTopN = true; + @VariableMgr.VarAttr(name = PARTITION_TOPN_MAX_PARTITIONS, needForward = true, description = { + "这个阈值决定了partition_topn计算时的最大分区数量,超过这个阈值后且输入总行数少于预估总量,剩余的数据将直接透传给下一个算子", + "This threshold determines how many partitions will be allocated for window function get topn." + + " if this threshold is exceeded and input rows less than the estimated total rows, the remaining" + + " data will be pass through to other node directly." + }) + private int partitionTopNMaxPartitions = 1024; + + @VariableMgr.VarAttr(name = PARTITION_TOPN_PER_PARTITION_ROWS, needForward = true, description = { + "这个数值用于partition_topn预估每个分区的行数,用来计算所有分区的预估数据总量,决定是否能透传下一个算子", + "This value is used for partition_topn to estimate the number of rows in each partition, to calculate " + + " the estimated total amount of data for all partitions, and to determine whether the next operator " + + " can be passed transparently." + }) + private int partitionTopNPerPartitionRows = 1000; + @VariableMgr.VarAttr(name = GLOBAL_PARTITION_TOPN_THRESHOLD) private double globalPartitionTopNThreshold = 100; @@ -3661,6 +3679,8 @@ public TQueryOptions toThrift() { tResult.setBatchSize(batchSize); tResult.setDisableStreamPreaggregations(disableStreamPreaggregations); tResult.setEnableDistinctStreamingAggregation(enableDistinctStreamingAggregation); + tResult.setPartitionTopnMaxPartitions(partitionTopNMaxPartitions); + tResult.setPartitionTopnPrePartitionRows(partitionTopNPerPartitionRows); if (maxScanKeyNum > 0) { tResult.setMaxScanKeyNum(maxScanKeyNum); diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 871101c5c35d6b..48f41e8e0ab9f9 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -343,6 +343,8 @@ struct TQueryOptions { 131: optional i32 adaptive_pipeline_task_serial_read_on_limit = 10000; 132: optional i32 parallel_prepare_threshold = 0; + 133: optional i32 partition_topn_max_partitions = 1024; + 134: optional i32 partition_topn_pre_partition_rows = 1000; // For cloud, to control if the content would be written into file cache // In write path, to control if the content would be written into file cache. // In read path, read from file cache or remote storage when execute query. diff --git a/regression-test/data/external_table_p0/jdbc/test_jdbc_query_pg.out b/regression-test/data/external_table_p0/jdbc/test_jdbc_query_pg.out index ec2058ca46e0bd..acfe2adad2ec46 100644 --- a/regression-test/data/external_table_p0/jdbc/test_jdbc_query_pg.out +++ b/regression-test/data/external_table_p0/jdbc/test_jdbc_query_pg.out @@ -1358,9 +1358,9 @@ true abc def 2022-10-11 1.234 1 2 3 2022-10-22T10:59:59 34.123 true abc def 2022 6 -- !sql87 -- -1 3 -2 0 -3 1 +1 0 +2 1 +3 2 -- !sql88 -- 1 diff --git a/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_pg.groovy b/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_pg.groovy index 492fdeb349be02..623bd6e8932105 100644 --- a/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_pg.groovy +++ b/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_pg.groovy @@ -575,7 +575,7 @@ suite("test_jdbc_query_pg", "p0,external,pg,external_docker,external_docker_pg") order_qt_sql84 """ SELECT NULL, NULL INTERSECT SELECT NULL, NULL FROM $jdbcPg14Table1 """ order_qt_sql85 """ SELECT COUNT(*) FROM $jdbcPg14Table1 INTERSECT SELECT COUNT(k8) FROM $jdbcPg14Table1 HAVING SUM(k7) IS NOT NULL """ order_qt_sql86 """ SELECT k8 FROM $jdbcPg14Table1 WHERE k8 < 7 EXCEPT SELECT k8 FROM $jdbcPg14Table1 WHERE k8 > 21 """ - order_qt_sql87 """ SELECT row_number() OVER (PARTITION BY k7) rn, k8 FROM $jdbcPg14Table1 LIMIT 3 """ + order_qt_sql87 """ SELECT row_number() OVER (PARTITION BY k7 order by k8) rn, k8 FROM $jdbcPg14Table1 LIMIT 3 """ order_qt_sql88 """ SELECT row_number() OVER (PARTITION BY k7 ORDER BY k8) rn FROM $jdbcPg14Table1 LIMIT 3 """ order_qt_sql89 """ SELECT row_number() OVER (ORDER BY k8) rn FROM $jdbcPg14Table1 LIMIT 3 """ order_qt_sql90 """ SELECT row_number() OVER () FROM $jdbcPg14Table1 as a JOIN ${dorisExTable1} as b ON a.k8 = b.id WHERE a.k8 > 111 LIMIT 2 """