Skip to content

Commit

Permalink
[improve](partition_topn) Add partition threshold check in hash table…
Browse files Browse the repository at this point in the history
… to control partition nums (#39057)

## Proposed changes
1.  Add a session variable to control partition_topn partition threshold
2. move the partition threshold check at emplace data to hash table to
control partition nums, so get check every rows.
this could improve some bad case about 50%+ performance improvement, and
some better case before, after move the check in hash table, maybe have
performance degradation almost 10%, I think this is within the
acceptable result。

<!--Describe your changes.-->
  • Loading branch information
zhangstar333 authored Sep 20, 2024
1 parent 066de31 commit 51ba957
Show file tree
Hide file tree
Showing 10 changed files with 98 additions and 24 deletions.
2 changes: 0 additions & 2 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1115,8 +1115,6 @@ DEFINE_mInt32(publish_version_gap_logging_threshold, "200");
// The secure path with user files, used in the `local` table function.
DEFINE_mString(user_files_secure_path, "${DORIS_HOME}");

DEFINE_Int32(partition_topn_partition_threshold, "1024");

DEFINE_Int32(fe_expire_duration_seconds, "60");

DEFINE_Int32(grace_shutdown_wait_seconds, "120");
Expand Down
4 changes: 0 additions & 4 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1185,10 +1185,6 @@ DECLARE_mInt32(publish_version_gap_logging_threshold);
// The secure path with user files, used in the `local` table function.
DECLARE_mString(user_files_secure_path);

// This threshold determines how many partitions will be allocated for window function get topn.
// and if this threshold is exceeded, the remaining data will be pass through to other node directly.
DECLARE_Int32(partition_topn_partition_threshold);

// If fe's frontend info has not been updated for more than fe_expire_duration_seconds, it will be regarded
// as an abnormal fe, this will cause be to cancel this fe's related query.
DECLARE_Int32(fe_expire_duration_seconds);
Expand Down
61 changes: 52 additions & 9 deletions be/src/pipeline/exec/partition_sort_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "partition_sort_sink_operator.h"

#include <cstdint>

#include "common/status.h"
#include "partition_sort_source_operator.h"
#include "vec/common/hash_table/hash.h"
Expand Down Expand Up @@ -107,8 +109,13 @@ Status PartitionSortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
for (size_t i = 0; i < p._partition_expr_ctxs.size(); i++) {
RETURN_IF_ERROR(p._partition_expr_ctxs[i]->clone(state, _partition_expr_ctxs[i]));
}
_topn_phase = p._topn_phase;
_partition_exprs_num = p._partition_exprs_num;
_hash_table_size_counter = ADD_COUNTER(_profile, "HashTableSize", TUnit::UNIT);
_serialize_key_arena_memory_usage =
_profile->AddHighWaterMarkCounter("SerializeKeyArena", TUnit::BYTES, "MemoryUsage", 1);
_hash_table_memory_usage =
ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "HashTable", TUnit::BYTES, "MemoryUsage", 1);
_build_timer = ADD_TIMER(_profile, "HashTableBuildTime");
_selector_block_timer = ADD_TIMER(_profile, "SelectorBlockTime");
_emplace_key_timer = ADD_TIMER(_profile, "EmplaceKeyTime");
Expand All @@ -119,6 +126,8 @@ Status PartitionSortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
&_vsort_exec_exprs, p._limit, 0, p._pool, p._is_asc_order, p._nulls_first,
p._child->row_desc(), state, _profile, p._has_global_limit, p._partition_inner_limit,
p._top_n_algorithm, p._topn_phase);
_profile->add_info_string("PartitionTopNPhase", to_string(p._topn_phase));
_profile->add_info_string("PartitionTopNLimit", std::to_string(p._partition_inner_limit));
RETURN_IF_ERROR(_init_hash_method());
return Status::OK();
}
Expand Down Expand Up @@ -177,11 +186,7 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
}
local_state._value_places[0]->append_whole_block(input_block, _child->row_desc());
} else {
//just simply use partition num to check
//if is TWO_PHASE_GLOBAL, must be sort all data thought partition num threshold have been exceeded.
if (_topn_phase != TPartTopNPhase::TWO_PHASE_GLOBAL &&
local_state._num_partition > config::partition_topn_partition_threshold &&
local_state._sorted_partition_input_rows < 10000 * local_state._num_partition) {
if (local_state._is_need_passthrough) {
{
COUNTER_UPDATE(local_state._passthrough_rows_counter, (int64_t)current_rows);
std::lock_guard<std::mutex> lock(local_state._shared_state->buffer_mutex);
Expand All @@ -193,8 +198,6 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
RETURN_IF_ERROR(_split_block_by_partition(input_block, local_state, eos));
RETURN_IF_CANCELLED(state);
input_block->clear_column_data();
local_state._sorted_partition_input_rows =
local_state._sorted_partition_input_rows + current_rows;
}
}
}
Expand Down Expand Up @@ -225,6 +228,8 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
local_state._shared_state->sink_eos = true;
local_state._dependency->set_ready_to_read();
}
local_state._profile->add_info_string("HasPassThrough",
local_state._is_need_passthrough ? "Yes" : "No");
}

return Status::OK();
Expand All @@ -245,7 +250,7 @@ Status PartitionSortSinkOperatorX::_split_block_by_partition(
}

Status PartitionSortSinkOperatorX::_emplace_into_hash_table(
const vectorized::ColumnRawPtrs& key_columns, const vectorized::Block* input_block,
const vectorized::ColumnRawPtrs& key_columns, vectorized::Block* input_block,
PartitionSortSinkLocalState& local_state, bool eos) {
return std::visit(
vectorized::Overload {
Expand Down Expand Up @@ -280,15 +285,37 @@ Status PartitionSortSinkOperatorX::_emplace_into_hash_table(
};

SCOPED_TIMER(local_state._emplace_key_timer);
for (size_t row = 0; row < num_rows; ++row) {
int row = num_rows;
for (row = row - 1; row >= 0 && !local_state._is_need_passthrough; --row) {
auto& mapped = agg_method.lazy_emplace(state, row, creator,
creator_for_null_key);
mapped->add_row_idx(row);
local_state._sorted_partition_input_rows++;
local_state._is_need_passthrough =
local_state.check_whether_need_passthrough();
}
for (auto* place : local_state._value_places) {
SCOPED_TIMER(local_state._selector_block_timer);
RETURN_IF_ERROR(place->append_block_by_selector(input_block, eos));
}
if (local_state._is_need_passthrough) {
{
COUNTER_UPDATE(local_state._passthrough_rows_counter,
(int64_t)(num_rows - row));
std::lock_guard<std::mutex> lock(
local_state._shared_state->buffer_mutex);
// have emplace (num_rows - row) to hashtable, and now have row remaining needed in block;
input_block->set_num_rows(row);
local_state._shared_state->blocks_buffer.push(
std::move(*input_block));
// buffer have data, source could read this.
local_state._dependency->set_ready_to_read();
}
}
local_state._serialize_key_arena_memory_usage->set(
(int64_t)local_state._agg_arena_pool->size());
COUNTER_SET(local_state._hash_table_memory_usage,
(int64_t)agg_method.hash_table->get_buffer_size_in_bytes());
return Status::OK();
}},
local_state._partitioned_data->method_variant);
Expand All @@ -303,4 +330,20 @@ Status PartitionSortSinkLocalState::_init_hash_method() {
return Status::OK();
}

// NOLINTBEGIN(readability-simplify-boolean-expr)
// just simply use partition num to check
// but if is TWO_PHASE_GLOBAL, must be sort all data thought partition num threshold have been exceeded.
// partition_topn_max_partitions default is : 1024
// partition_topn_per_partition_rows default is : 1000
bool PartitionSortSinkLocalState::check_whether_need_passthrough() {
if (_topn_phase != TPartTopNPhase::TWO_PHASE_GLOBAL &&
_num_partition > _state->partition_topn_max_partitions() &&
_sorted_partition_input_rows <
_state->partition_topn_per_partition_rows() * _num_partition) {
return true;
}
return false;
}
// NOLINTEND(readability-simplify-boolean-expr)

} // namespace doris::pipeline
7 changes: 6 additions & 1 deletion be/src/pipeline/exec/partition_sort_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,14 +239,19 @@ class PartitionSortSinkLocalState : public PipelineXSinkLocalState<PartitionSort
std::unique_ptr<vectorized::Arena> _agg_arena_pool;
int _partition_exprs_num = 0;
std::shared_ptr<PartitionSortInfo> _partition_sort_info = nullptr;
TPartTopNPhase::type _topn_phase;
bool _is_need_passthrough = false;

RuntimeProfile::Counter* _build_timer = nullptr;
RuntimeProfile::Counter* _emplace_key_timer = nullptr;
RuntimeProfile::Counter* _selector_block_timer = nullptr;
RuntimeProfile::Counter* _hash_table_size_counter = nullptr;
RuntimeProfile::Counter* _passthrough_rows_counter = nullptr;
RuntimeProfile::Counter* _sorted_partition_input_rows_counter = nullptr;
RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
RuntimeProfile::HighWaterMarkCounter* _serialize_key_arena_memory_usage = nullptr;
Status _init_hash_method();
bool check_whether_need_passthrough();
};

class PartitionSortSinkOperatorX final : public DataSinkOperatorX<PartitionSortSinkLocalState> {
Expand Down Expand Up @@ -289,7 +294,7 @@ class PartitionSortSinkOperatorX final : public DataSinkOperatorX<PartitionSortS
Status _split_block_by_partition(vectorized::Block* input_block,
PartitionSortSinkLocalState& local_state, bool eos);
Status _emplace_into_hash_table(const vectorized::ColumnRawPtrs& key_columns,
const vectorized::Block* input_block,
vectorized::Block* input_block,
PartitionSortSinkLocalState& local_state, bool eos);
};

Expand Down
6 changes: 2 additions & 4 deletions be/src/pipeline/exec/partition_sort_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,10 @@ Status PartitionSortSourceOperatorX::get_sorted_block(RuntimeState* state,
if (local_state._sort_idx < local_state._shared_state->partition_sorts.size()) {
RETURN_IF_ERROR(local_state._shared_state->partition_sorts[local_state._sort_idx]->get_next(
state, output_block, &current_eos));
COUNTER_UPDATE(local_state._sorted_partition_output_rows_counter, output_block->rows());
}
if (current_eos) {
//current sort have eos, so get next idx
auto rows = local_state._shared_state->partition_sorts[local_state._sort_idx]
->get_output_rows();
COUNTER_UPDATE(local_state._sorted_partition_output_rows_counter, rows);
// current sort have eos, so get next idx
local_state._shared_state->partition_sorts[local_state._sort_idx].reset(nullptr);
local_state._sort_idx++;
}
Expand Down
12 changes: 12 additions & 0 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,18 @@ class RuntimeState {
: 0;
}

int partition_topn_max_partitions() const {
return _query_options.__isset.partition_topn_max_partitions
? _query_options.partition_topn_max_partitions
: 1024;
}

int partition_topn_per_partition_rows() const {
return _query_options.__isset.partition_topn_pre_partition_rows
? _query_options.partition_topn_pre_partition_rows
: 1000;
}

int64_t parallel_scan_min_rows_per_scanner() const {
return _query_options.__isset.parallel_scan_min_rows_per_scanner
? _query_options.parallel_scan_min_rows_per_scanner
Expand Down
20 changes: 20 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ public class SessionVariable implements Serializable, Writable {

public static final String MAX_JOIN_NUMBER_BUSHY_TREE = "max_join_number_bushy_tree";
public static final String ENABLE_PARTITION_TOPN = "enable_partition_topn";
public static final String PARTITION_TOPN_MAX_PARTITIONS = "partition_topn_max_partitions";
public static final String PARTITION_TOPN_PER_PARTITION_ROWS = "partition_topn_pre_partition_rows";

public static final String GLOBAL_PARTITION_TOPN_THRESHOLD = "global_partition_topn_threshold";

Expand Down Expand Up @@ -1240,6 +1242,22 @@ public void setMaxJoinNumberOfReorder(int maxJoinNumberOfReorder) {
@VariableMgr.VarAttr(name = ENABLE_PARTITION_TOPN)
private boolean enablePartitionTopN = true;

@VariableMgr.VarAttr(name = PARTITION_TOPN_MAX_PARTITIONS, needForward = true, description = {
"这个阈值决定了partition_topn计算时的最大分区数量,超过这个阈值后且输入总行数少于预估总量,剩余的数据将直接透传给下一个算子",
"This threshold determines how many partitions will be allocated for window function get topn."
+ " if this threshold is exceeded and input rows less than the estimated total rows, the remaining"
+ " data will be pass through to other node directly."
})
private int partitionTopNMaxPartitions = 1024;

@VariableMgr.VarAttr(name = PARTITION_TOPN_PER_PARTITION_ROWS, needForward = true, description = {
"这个数值用于partition_topn预估每个分区的行数,用来计算所有分区的预估数据总量,决定是否能透传下一个算子",
"This value is used for partition_topn to estimate the number of rows in each partition, to calculate "
+ " the estimated total amount of data for all partitions, and to determine whether the next operator "
+ " can be passed transparently."
})
private int partitionTopNPerPartitionRows = 1000;

@VariableMgr.VarAttr(name = GLOBAL_PARTITION_TOPN_THRESHOLD)
private double globalPartitionTopNThreshold = 100;

Expand Down Expand Up @@ -3661,6 +3679,8 @@ public TQueryOptions toThrift() {
tResult.setBatchSize(batchSize);
tResult.setDisableStreamPreaggregations(disableStreamPreaggregations);
tResult.setEnableDistinctStreamingAggregation(enableDistinctStreamingAggregation);
tResult.setPartitionTopnMaxPartitions(partitionTopNMaxPartitions);
tResult.setPartitionTopnPrePartitionRows(partitionTopNPerPartitionRows);

if (maxScanKeyNum > 0) {
tResult.setMaxScanKeyNum(maxScanKeyNum);
Expand Down
2 changes: 2 additions & 0 deletions gensrc/thrift/PaloInternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,8 @@ struct TQueryOptions {
131: optional i32 adaptive_pipeline_task_serial_read_on_limit = 10000;

132: optional i32 parallel_prepare_threshold = 0;
133: optional i32 partition_topn_max_partitions = 1024;
134: optional i32 partition_topn_pre_partition_rows = 1000;
// For cloud, to control if the content would be written into file cache
// In write path, to control if the content would be written into file cache.
// In read path, read from file cache or remote storage when execute query.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1358,9 +1358,9 @@ true abc def 2022-10-11 1.234 1 2 3 2022-10-22T10:59:59 34.123 true abc def 2022
6

-- !sql87 --
1 3
2 0
3 1
1 0
2 1
3 2

-- !sql88 --
1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ suite("test_jdbc_query_pg", "p0,external,pg,external_docker,external_docker_pg")
order_qt_sql84 """ SELECT NULL, NULL INTERSECT SELECT NULL, NULL FROM $jdbcPg14Table1 """
order_qt_sql85 """ SELECT COUNT(*) FROM $jdbcPg14Table1 INTERSECT SELECT COUNT(k8) FROM $jdbcPg14Table1 HAVING SUM(k7) IS NOT NULL """
order_qt_sql86 """ SELECT k8 FROM $jdbcPg14Table1 WHERE k8 < 7 EXCEPT SELECT k8 FROM $jdbcPg14Table1 WHERE k8 > 21 """
order_qt_sql87 """ SELECT row_number() OVER (PARTITION BY k7) rn, k8 FROM $jdbcPg14Table1 LIMIT 3 """
order_qt_sql87 """ SELECT row_number() OVER (PARTITION BY k7 order by k8) rn, k8 FROM $jdbcPg14Table1 LIMIT 3 """
order_qt_sql88 """ SELECT row_number() OVER (PARTITION BY k7 ORDER BY k8) rn FROM $jdbcPg14Table1 LIMIT 3 """
order_qt_sql89 """ SELECT row_number() OVER (ORDER BY k8) rn FROM $jdbcPg14Table1 LIMIT 3 """
order_qt_sql90 """ SELECT row_number() OVER () FROM $jdbcPg14Table1 as a JOIN ${dorisExTable1} as b ON a.k8 = b.id WHERE a.k8 > 111 LIMIT 2 """
Expand Down

0 comments on commit 51ba957

Please sign in to comment.