Skip to content

Commit

Permalink
[Bug](partition-topn) fix partition-topn calculate partition input ro…
Browse files Browse the repository at this point in the history
…ws have error (#39100)

1. fix the _sorted_partition_input_rows calculate have error, it's
should only update the rows which have been emplace into hash table, not
include the rows which is pass through.

2. add some counter in profile could get some info of about input/output
rows have been do partition-topn.
  • Loading branch information
zhangstar333 authored Aug 10, 2024
1 parent 947397e commit 0e9951f
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 11 deletions.
12 changes: 8 additions & 4 deletions be/src/pipeline/exec/partition_sort_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ Status PartitionSortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
_selector_block_timer = ADD_TIMER(_profile, "SelectorBlockTime");
_emplace_key_timer = ADD_TIMER(_profile, "EmplaceKeyTime");
_passthrough_rows_counter = ADD_COUNTER(_profile, "PassThroughRowsCounter", TUnit::UNIT);
_sorted_partition_input_rows_counter =
ADD_COUNTER(_profile, "SortedPartitionInputRows", TUnit::UNIT);
_partition_sort_info = std::make_shared<PartitionSortInfo>(
&_vsort_exec_exprs, p._limit, 0, p._pool, p._is_asc_order, p._nulls_first,
p._child_x->row_desc(), state, _profile, p._has_global_limit, p._partition_inner_limit,
Expand Down Expand Up @@ -173,7 +175,6 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
SCOPED_TIMER(local_state.exec_time_counter());
if (current_rows > 0) {
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)input_block->rows());
local_state.child_input_rows = local_state.child_input_rows + current_rows;
if (UNLIKELY(_partition_exprs_num == 0)) {
if (UNLIKELY(local_state._value_places.empty())) {
local_state._value_places.push_back(_pool->add(new PartitionBlocks(
Expand All @@ -185,10 +186,9 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
//if is TWO_PHASE_GLOBAL, must be sort all data thought partition num threshold have been exceeded.
if (_topn_phase != TPartTopNPhase::TWO_PHASE_GLOBAL &&
local_state._num_partition > config::partition_topn_partition_threshold &&
local_state.child_input_rows < 10000 * local_state._num_partition) {
local_state._sorted_partition_input_rows < 10000 * local_state._num_partition) {
{
COUNTER_UPDATE(local_state._passthrough_rows_counter,
(int64_t)input_block->rows());
COUNTER_UPDATE(local_state._passthrough_rows_counter, (int64_t)current_rows);
std::lock_guard<std::mutex> lock(local_state._shared_state->buffer_mutex);
local_state._shared_state->blocks_buffer.push(std::move(*input_block));
// buffer have data, source could read this.
Expand All @@ -198,6 +198,8 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
RETURN_IF_ERROR(_split_block_by_partition(input_block, local_state, eos));
RETURN_IF_CANCELLED(state);
input_block->clear_column_data();
local_state._sorted_partition_input_rows =
local_state._sorted_partition_input_rows + current_rows;
}
}
}
Expand All @@ -220,6 +222,8 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
}

COUNTER_SET(local_state._hash_table_size_counter, int64_t(local_state._num_partition));
COUNTER_SET(local_state._sorted_partition_input_rows_counter,
local_state._sorted_partition_input_rows);
//so all data from child have sink completed
{
std::unique_lock<std::mutex> lc(local_state._shared_state->sink_eos_lock);
Expand Down
3 changes: 2 additions & 1 deletion be/src/pipeline/exec/partition_sort_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ class PartitionSortSinkLocalState : public PipelineXSinkLocalState<PartitionSort
// Expressions and parameters used for build _sort_description
vectorized::VSortExecExprs _vsort_exec_exprs;
vectorized::VExprContextSPtrs _partition_expr_ctxs;
int64_t child_input_rows = 0;
int64_t _sorted_partition_input_rows = 0;
std::vector<PartitionDataPtr> _value_places;
int _num_partition = 0;
std::vector<const vectorized::IColumn*> _partition_columns;
Expand All @@ -238,6 +238,7 @@ class PartitionSortSinkLocalState : public PipelineXSinkLocalState<PartitionSort
RuntimeProfile::Counter* _selector_block_timer = nullptr;
RuntimeProfile::Counter* _hash_table_size_counter = nullptr;
RuntimeProfile::Counter* _passthrough_rows_counter = nullptr;
RuntimeProfile::Counter* _sorted_partition_input_rows_counter = nullptr;
Status _init_hash_method();
};

Expand Down
8 changes: 5 additions & 3 deletions be/src/pipeline/exec/partition_sort_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ Status PartitionSortSourceLocalState::init(RuntimeState* state, LocalStateInfo&
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
_get_sorted_timer = ADD_TIMER(profile(), "GetSortedTime");
_sorted_partition_output_rows_counter =
ADD_COUNTER(profile(), "SortedPartitionOutputRows", TUnit::UNIT);
return Status::OK();
}

Expand Down Expand Up @@ -57,7 +59,7 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized::
}
if (!output_block->empty()) {
COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
COUNTER_UPDATE(local_state.rows_returned_counter(), output_block->rows());
local_state._num_rows_returned += output_block->rows();
}
return Status::OK();
}
Expand All @@ -79,7 +81,7 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized::
}
if (!output_block->empty()) {
COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
COUNTER_UPDATE(local_state.rows_returned_counter(), output_block->rows());
local_state._num_rows_returned += output_block->rows();
}
return Status::OK();
}
Expand All @@ -98,7 +100,7 @@ Status PartitionSortSourceOperatorX::get_sorted_block(RuntimeState* state,
//current sort have eos, so get next idx
auto rows = local_state._shared_state->partition_sorts[local_state._sort_idx]
->get_output_rows();
local_state._num_rows_returned += rows;
COUNTER_UPDATE(local_state._sorted_partition_output_rows_counter, rows);
local_state._shared_state->partition_sorts[local_state._sort_idx].reset(nullptr);
local_state._sort_idx++;
}
Expand Down
6 changes: 3 additions & 3 deletions be/src/pipeline/exec/partition_sort_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ class PartitionSortSourceLocalState final
ENABLE_FACTORY_CREATOR(PartitionSortSourceLocalState);
using Base = PipelineXLocalState<PartitionSortNodeSharedState>;
PartitionSortSourceLocalState(RuntimeState* state, OperatorXBase* parent)
: PipelineXLocalState<PartitionSortNodeSharedState>(state, parent),
_get_sorted_timer(nullptr) {}
: PipelineXLocalState<PartitionSortNodeSharedState>(state, parent) {}

Status init(RuntimeState* state, LocalStateInfo& info) override;

private:
friend class PartitionSortSourceOperatorX;
RuntimeProfile::Counter* _get_sorted_timer;
RuntimeProfile::Counter* _get_sorted_timer = nullptr;
RuntimeProfile::Counter* _sorted_partition_output_rows_counter = nullptr;
std::atomic<int> _sort_idx = 0;
};

Expand Down

0 comments on commit 0e9951f

Please sign in to comment.