From 51ba957fd6b274886d89ad28b6c8c4899bd5bdba Mon Sep 17 00:00:00 2001 From: zhangstar333 <87313068+zhangstar333@users.noreply.github.com> Date: Fri, 20 Sep 2024 15:06:21 +0800 Subject: [PATCH] [improve](partition_topn) Add partition threshold check in hash table to control partition nums (#39057) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Proposed changes 1. Add a session variable to control the partition_topn partition threshold. 2. Move the partition threshold check into the step that emplaces data into the hash table to control the number of partitions, so the check runs for every row. This improves some bad cases by 50%+ in performance; some cases that already performed well before may see a performance degradation of almost 10% after moving the check into the hash table, which I think is an acceptable result. --- be/src/common/config.cpp | 2 - be/src/common/config.h | 4 -- .../exec/partition_sort_sink_operator.cpp | 61 ++++++++++++++++--- .../exec/partition_sort_sink_operator.h | 7 ++- .../exec/partition_sort_source_operator.cpp | 6 +- be/src/runtime/runtime_state.h | 12 ++++ .../org/apache/doris/qe/SessionVariable.java | 20 ++++++ gensrc/thrift/PaloInternalService.thrift | 2 + .../jdbc/test_jdbc_query_pg.out | 6 +- .../jdbc/test_jdbc_query_pg.groovy | 2 +- 10 files changed, 98 insertions(+), 24 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index b0b0dd7aafc6c2..58679fbe9b4245 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1115,8 +1115,6 @@ DEFINE_mInt32(publish_version_gap_logging_threshold, "200"); // The secure path with user files, used in the `local` table function. 
DEFINE_mString(user_files_secure_path, "${DORIS_HOME}"); -DEFINE_Int32(partition_topn_partition_threshold, "1024"); - DEFINE_Int32(fe_expire_duration_seconds, "60"); DEFINE_Int32(grace_shutdown_wait_seconds, "120"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 4dabccc7db3651..56a9357e72e798 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1185,10 +1185,6 @@ DECLARE_mInt32(publish_version_gap_logging_threshold); // The secure path with user files, used in the `local` table function. DECLARE_mString(user_files_secure_path); -// This threshold determines how many partitions will be allocated for window function get topn. -// and if this threshold is exceeded, the remaining data will be pass through to other node directly. -DECLARE_Int32(partition_topn_partition_threshold); - // If fe's frontend info has not been updated for more than fe_expire_duration_seconds, it will be regarded // as an abnormal fe, this will cause be to cancel this fe's related query. 
DECLARE_Int32(fe_expire_duration_seconds); diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp b/be/src/pipeline/exec/partition_sort_sink_operator.cpp index 94c51e160da2a2..3a850a40b13c66 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp @@ -17,6 +17,8 @@ #include "partition_sort_sink_operator.h" +#include + #include "common/status.h" #include "partition_sort_source_operator.h" #include "vec/common/hash_table/hash.h" @@ -107,8 +109,13 @@ Status PartitionSortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo for (size_t i = 0; i < p._partition_expr_ctxs.size(); i++) { RETURN_IF_ERROR(p._partition_expr_ctxs[i]->clone(state, _partition_expr_ctxs[i])); } + _topn_phase = p._topn_phase; _partition_exprs_num = p._partition_exprs_num; _hash_table_size_counter = ADD_COUNTER(_profile, "HashTableSize", TUnit::UNIT); + _serialize_key_arena_memory_usage = + _profile->AddHighWaterMarkCounter("SerializeKeyArena", TUnit::BYTES, "MemoryUsage", 1); + _hash_table_memory_usage = + ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "HashTable", TUnit::BYTES, "MemoryUsage", 1); _build_timer = ADD_TIMER(_profile, "HashTableBuildTime"); _selector_block_timer = ADD_TIMER(_profile, "SelectorBlockTime"); _emplace_key_timer = ADD_TIMER(_profile, "EmplaceKeyTime"); @@ -119,6 +126,8 @@ Status PartitionSortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo &_vsort_exec_exprs, p._limit, 0, p._pool, p._is_asc_order, p._nulls_first, p._child->row_desc(), state, _profile, p._has_global_limit, p._partition_inner_limit, p._top_n_algorithm, p._topn_phase); + _profile->add_info_string("PartitionTopNPhase", to_string(p._topn_phase)); + _profile->add_info_string("PartitionTopNLimit", std::to_string(p._partition_inner_limit)); RETURN_IF_ERROR(_init_hash_method()); return Status::OK(); } @@ -177,11 +186,7 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block* } 
local_state._value_places[0]->append_whole_block(input_block, _child->row_desc()); } else { - //just simply use partition num to check - //if is TWO_PHASE_GLOBAL, must be sort all data thought partition num threshold have been exceeded. - if (_topn_phase != TPartTopNPhase::TWO_PHASE_GLOBAL && - local_state._num_partition > config::partition_topn_partition_threshold && - local_state._sorted_partition_input_rows < 10000 * local_state._num_partition) { + if (local_state._is_need_passthrough) { { COUNTER_UPDATE(local_state._passthrough_rows_counter, (int64_t)current_rows); std::lock_guard lock(local_state._shared_state->buffer_mutex); @@ -193,8 +198,6 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block* RETURN_IF_ERROR(_split_block_by_partition(input_block, local_state, eos)); RETURN_IF_CANCELLED(state); input_block->clear_column_data(); - local_state._sorted_partition_input_rows = - local_state._sorted_partition_input_rows + current_rows; } } } @@ -225,6 +228,8 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block* local_state._shared_state->sink_eos = true; local_state._dependency->set_ready_to_read(); } + local_state._profile->add_info_string("HasPassThrough", + local_state._is_need_passthrough ? 
"Yes" : "No"); } return Status::OK(); @@ -245,7 +250,7 @@ Status PartitionSortSinkOperatorX::_split_block_by_partition( } Status PartitionSortSinkOperatorX::_emplace_into_hash_table( - const vectorized::ColumnRawPtrs& key_columns, const vectorized::Block* input_block, + const vectorized::ColumnRawPtrs& key_columns, vectorized::Block* input_block, PartitionSortSinkLocalState& local_state, bool eos) { return std::visit( vectorized::Overload { @@ -280,15 +285,37 @@ Status PartitionSortSinkOperatorX::_emplace_into_hash_table( }; SCOPED_TIMER(local_state._emplace_key_timer); - for (size_t row = 0; row < num_rows; ++row) { + int row = num_rows; + for (row = row - 1; row >= 0 && !local_state._is_need_passthrough; --row) { auto& mapped = agg_method.lazy_emplace(state, row, creator, creator_for_null_key); mapped->add_row_idx(row); + local_state._sorted_partition_input_rows++; + local_state._is_need_passthrough = + local_state.check_whether_need_passthrough(); } for (auto* place : local_state._value_places) { SCOPED_TIMER(local_state._selector_block_timer); RETURN_IF_ERROR(place->append_block_by_selector(input_block, eos)); } + if (local_state._is_need_passthrough) { + { + COUNTER_UPDATE(local_state._passthrough_rows_counter, + (int64_t)(num_rows - row)); + std::lock_guard lock( + local_state._shared_state->buffer_mutex); + // have emplace (num_rows - row) to hashtable, and now have row remaining needed in block; + input_block->set_num_rows(row); + local_state._shared_state->blocks_buffer.push( + std::move(*input_block)); + // buffer have data, source could read this. 
+ local_state._dependency->set_ready_to_read(); + } + } + local_state._serialize_key_arena_memory_usage->set( + (int64_t)local_state._agg_arena_pool->size()); + COUNTER_SET(local_state._hash_table_memory_usage, + (int64_t)agg_method.hash_table->get_buffer_size_in_bytes()); return Status::OK(); }}, local_state._partitioned_data->method_variant); @@ -303,4 +330,20 @@ Status PartitionSortSinkLocalState::_init_hash_method() { return Status::OK(); } +// NOLINTBEGIN(readability-simplify-boolean-expr) +// just simply use partition num to check +// but if is TWO_PHASE_GLOBAL, must be sort all data thought partition num threshold have been exceeded. +// partition_topn_max_partitions default is : 1024 +// partition_topn_per_partition_rows default is : 1000 +bool PartitionSortSinkLocalState::check_whether_need_passthrough() { + if (_topn_phase != TPartTopNPhase::TWO_PHASE_GLOBAL && + _num_partition > _state->partition_topn_max_partitions() && + _sorted_partition_input_rows < + _state->partition_topn_per_partition_rows() * _num_partition) { + return true; + } + return false; +} +// NOLINTEND(readability-simplify-boolean-expr) + } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h b/be/src/pipeline/exec/partition_sort_sink_operator.h index e58ac5fea9eaf1..f16df509dca4a0 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.h +++ b/be/src/pipeline/exec/partition_sort_sink_operator.h @@ -239,6 +239,8 @@ class PartitionSortSinkLocalState : public PipelineXSinkLocalState _agg_arena_pool; int _partition_exprs_num = 0; std::shared_ptr _partition_sort_info = nullptr; + TPartTopNPhase::type _topn_phase; + bool _is_need_passthrough = false; RuntimeProfile::Counter* _build_timer = nullptr; RuntimeProfile::Counter* _emplace_key_timer = nullptr; @@ -246,7 +248,10 @@ class PartitionSortSinkLocalState : public PipelineXSinkLocalState { @@ -289,7 +294,7 @@ class PartitionSortSinkOperatorX final : public 
DataSinkOperatorXpartition_sorts.size()) { RETURN_IF_ERROR(local_state._shared_state->partition_sorts[local_state._sort_idx]->get_next( state, output_block, ¤t_eos)); + COUNTER_UPDATE(local_state._sorted_partition_output_rows_counter, output_block->rows()); } if (current_eos) { - //current sort have eos, so get next idx - auto rows = local_state._shared_state->partition_sorts[local_state._sort_idx] - ->get_output_rows(); - COUNTER_UPDATE(local_state._sorted_partition_output_rows_counter, rows); + // current sort have eos, so get next idx local_state._shared_state->partition_sorts[local_state._sort_idx].reset(nullptr); local_state._sort_idx++; } diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index f43d0a163dfdaa..90cf1bc34bdeca 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -490,6 +490,18 @@ class RuntimeState { : 0; } + int partition_topn_max_partitions() const { + return _query_options.__isset.partition_topn_max_partitions + ? _query_options.partition_topn_max_partitions + : 1024; + } + + int partition_topn_per_partition_rows() const { + return _query_options.__isset.partition_topn_pre_partition_rows + ? _query_options.partition_topn_pre_partition_rows + : 1000; + } + int64_t parallel_scan_min_rows_per_scanner() const { return _query_options.__isset.parallel_scan_min_rows_per_scanner ? 
_query_options.parallel_scan_min_rows_per_scanner diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 94cf0cb3469d65..e4fb6153e3359c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -240,6 +240,8 @@ public class SessionVariable implements Serializable, Writable { public static final String MAX_JOIN_NUMBER_BUSHY_TREE = "max_join_number_bushy_tree"; public static final String ENABLE_PARTITION_TOPN = "enable_partition_topn"; + public static final String PARTITION_TOPN_MAX_PARTITIONS = "partition_topn_max_partitions"; + public static final String PARTITION_TOPN_PER_PARTITION_ROWS = "partition_topn_pre_partition_rows"; public static final String GLOBAL_PARTITION_TOPN_THRESHOLD = "global_partition_topn_threshold"; @@ -1240,6 +1242,22 @@ public void setMaxJoinNumberOfReorder(int maxJoinNumberOfReorder) { @VariableMgr.VarAttr(name = ENABLE_PARTITION_TOPN) private boolean enablePartitionTopN = true; + @VariableMgr.VarAttr(name = PARTITION_TOPN_MAX_PARTITIONS, needForward = true, description = { + "这个阈值决定了partition_topn计算时的最大分区数量,超过这个阈值后且输入总行数少于预估总量,剩余的数据将直接透传给下一个算子", + "This threshold determines how many partitions will be allocated for window function get topn." + + " if this threshold is exceeded and input rows less than the estimated total rows, the remaining" + + " data will be pass through to other node directly." 
+ }) + private int partitionTopNMaxPartitions = 1024; + + @VariableMgr.VarAttr(name = PARTITION_TOPN_PER_PARTITION_ROWS, needForward = true, description = { + "这个数值用于partition_topn预估每个分区的行数,用来计算所有分区的预估数据总量,决定是否能透传下一个算子", + "This value is used for partition_topn to estimate the number of rows in each partition, to calculate " + + " the estimated total amount of data for all partitions, and to determine whether the next operator " + + " can be passed transparently." + }) + private int partitionTopNPerPartitionRows = 1000; + @VariableMgr.VarAttr(name = GLOBAL_PARTITION_TOPN_THRESHOLD) private double globalPartitionTopNThreshold = 100; @@ -3661,6 +3679,8 @@ public TQueryOptions toThrift() { tResult.setBatchSize(batchSize); tResult.setDisableStreamPreaggregations(disableStreamPreaggregations); tResult.setEnableDistinctStreamingAggregation(enableDistinctStreamingAggregation); + tResult.setPartitionTopnMaxPartitions(partitionTopNMaxPartitions); + tResult.setPartitionTopnPrePartitionRows(partitionTopNPerPartitionRows); if (maxScanKeyNum > 0) { tResult.setMaxScanKeyNum(maxScanKeyNum); diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 871101c5c35d6b..48f41e8e0ab9f9 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -343,6 +343,8 @@ struct TQueryOptions { 131: optional i32 adaptive_pipeline_task_serial_read_on_limit = 10000; 132: optional i32 parallel_prepare_threshold = 0; + 133: optional i32 partition_topn_max_partitions = 1024; + 134: optional i32 partition_topn_pre_partition_rows = 1000; // For cloud, to control if the content would be written into file cache // In write path, to control if the content would be written into file cache. // In read path, read from file cache or remote storage when execute query. 
diff --git a/regression-test/data/external_table_p0/jdbc/test_jdbc_query_pg.out b/regression-test/data/external_table_p0/jdbc/test_jdbc_query_pg.out index ec2058ca46e0bd..acfe2adad2ec46 100644 --- a/regression-test/data/external_table_p0/jdbc/test_jdbc_query_pg.out +++ b/regression-test/data/external_table_p0/jdbc/test_jdbc_query_pg.out @@ -1358,9 +1358,9 @@ true abc def 2022-10-11 1.234 1 2 3 2022-10-22T10:59:59 34.123 true abc def 2022 6 -- !sql87 -- -1 3 -2 0 -3 1 +1 0 +2 1 +3 2 -- !sql88 -- 1 diff --git a/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_pg.groovy b/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_pg.groovy index 492fdeb349be02..623bd6e8932105 100644 --- a/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_pg.groovy +++ b/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_pg.groovy @@ -575,7 +575,7 @@ suite("test_jdbc_query_pg", "p0,external,pg,external_docker,external_docker_pg") order_qt_sql84 """ SELECT NULL, NULL INTERSECT SELECT NULL, NULL FROM $jdbcPg14Table1 """ order_qt_sql85 """ SELECT COUNT(*) FROM $jdbcPg14Table1 INTERSECT SELECT COUNT(k8) FROM $jdbcPg14Table1 HAVING SUM(k7) IS NOT NULL """ order_qt_sql86 """ SELECT k8 FROM $jdbcPg14Table1 WHERE k8 < 7 EXCEPT SELECT k8 FROM $jdbcPg14Table1 WHERE k8 > 21 """ - order_qt_sql87 """ SELECT row_number() OVER (PARTITION BY k7) rn, k8 FROM $jdbcPg14Table1 LIMIT 3 """ + order_qt_sql87 """ SELECT row_number() OVER (PARTITION BY k7 order by k8) rn, k8 FROM $jdbcPg14Table1 LIMIT 3 """ order_qt_sql88 """ SELECT row_number() OVER (PARTITION BY k7 ORDER BY k8) rn FROM $jdbcPg14Table1 LIMIT 3 """ order_qt_sql89 """ SELECT row_number() OVER (ORDER BY k8) rn FROM $jdbcPg14Table1 LIMIT 3 """ order_qt_sql90 """ SELECT row_number() OVER () FROM $jdbcPg14Table1 as a JOIN ${dorisExTable1} as b ON a.k8 = b.id WHERE a.k8 > 111 LIMIT 2 """