diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp b/be/src/pipeline/exec/partition_sort_sink_operator.cpp index 62dafd54849205..404d9095f96a9f 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp @@ -115,6 +115,8 @@ Status PartitionSortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo _selector_block_timer = ADD_TIMER(_profile, "SelectorBlockTime"); _emplace_key_timer = ADD_TIMER(_profile, "EmplaceKeyTime"); _passthrough_rows_counter = ADD_COUNTER(_profile, "PassThroughRowsCounter", TUnit::UNIT); + _sorted_partition_input_rows_counter = + ADD_COUNTER(_profile, "SortedPartitionInputRows", TUnit::UNIT); _partition_sort_info = std::make_shared( &_vsort_exec_exprs, p._limit, 0, p._pool, p._is_asc_order, p._nulls_first, p._child_x->row_desc(), state, _profile, p._has_global_limit, p._partition_inner_limit, @@ -173,7 +175,6 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block* SCOPED_TIMER(local_state.exec_time_counter()); if (current_rows > 0) { COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)input_block->rows()); - local_state.child_input_rows = local_state.child_input_rows + current_rows; if (UNLIKELY(_partition_exprs_num == 0)) { if (UNLIKELY(local_state._value_places.empty())) { local_state._value_places.push_back(_pool->add(new PartitionBlocks( @@ -185,10 +186,9 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block* //if is TWO_PHASE_GLOBAL, must be sort all data thought partition num threshold have been exceeded. if (_topn_phase != TPartTopNPhase::TWO_PHASE_GLOBAL && local_state._num_partition > config::partition_topn_partition_threshold && - local_state.child_input_rows < 10000 * local_state._num_partition) { + local_state._sorted_partition_input_rows < 10000 * local_state._num_partition) { { - COUNTER_UPDATE(local_state._passthrough_rows_counter, - (int64_t)input_block->rows()); + COUNTER_UPDATE(local_state._passthrough_rows_counter, (int64_t)current_rows); std::lock_guard lock(local_state._shared_state->buffer_mutex); local_state._shared_state->blocks_buffer.push(std::move(*input_block)); // buffer have data, source could read this. @@ -198,6 +198,8 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block* RETURN_IF_ERROR(_split_block_by_partition(input_block, local_state, eos)); RETURN_IF_CANCELLED(state); input_block->clear_column_data(); + local_state._sorted_partition_input_rows = + local_state._sorted_partition_input_rows + current_rows; } } } @@ -220,6 +222,8 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block* } COUNTER_SET(local_state._hash_table_size_counter, int64_t(local_state._num_partition)); + COUNTER_SET(local_state._sorted_partition_input_rows_counter, + local_state._sorted_partition_input_rows); //so all data from child have sink completed { std::unique_lock lc(local_state._shared_state->sink_eos_lock); diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h b/be/src/pipeline/exec/partition_sort_sink_operator.h index b7e83763f1dd94..25ad0309bdeac8 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.h +++ b/be/src/pipeline/exec/partition_sort_sink_operator.h @@ -224,7 +224,7 @@ class PartitionSortSinkLocalState : public PipelineXSinkLocalState _value_places; int _num_partition = 0; std::vector _partition_columns; @@ -238,6 +238,7 @@ class PartitionSortSinkLocalState : public PipelineXSinkLocalStateempty()) { COUNTER_UPDATE(local_state.blocks_returned_counter(), 1); - COUNTER_UPDATE(local_state.rows_returned_counter(), output_block->rows()); + local_state._num_rows_returned += output_block->rows(); } return Status::OK(); } @@ -79,7 +81,7 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized:: } if (!output_block->empty()) { COUNTER_UPDATE(local_state.blocks_returned_counter(), 1); - COUNTER_UPDATE(local_state.rows_returned_counter(), output_block->rows()); + local_state._num_rows_returned += output_block->rows(); } return Status::OK(); } @@ -98,7 +100,7 @@ Status PartitionSortSourceOperatorX::get_sorted_block(RuntimeState* state, //current sort have eos, so get next idx auto rows = local_state._shared_state->partition_sorts[local_state._sort_idx] ->get_output_rows(); - local_state._num_rows_returned += rows; + COUNTER_UPDATE(local_state._sorted_partition_output_rows_counter, rows); local_state._shared_state->partition_sorts[local_state._sort_idx].reset(nullptr); local_state._sort_idx++; } diff --git a/be/src/pipeline/exec/partition_sort_source_operator.h b/be/src/pipeline/exec/partition_sort_source_operator.h index 4b5589c0e8f0cd..1f75e1f49d4cf7 100644 --- a/be/src/pipeline/exec/partition_sort_source_operator.h +++ b/be/src/pipeline/exec/partition_sort_source_operator.h @@ -34,14 +34,14 @@ class PartitionSortSourceLocalState final ENABLE_FACTORY_CREATOR(PartitionSortSourceLocalState); using Base = PipelineXLocalState; PartitionSortSourceLocalState(RuntimeState* state, OperatorXBase* parent) - : PipelineXLocalState(state, parent), - _get_sorted_timer(nullptr) {} + : PipelineXLocalState(state, parent) {} Status init(RuntimeState* state, LocalStateInfo& info) override; private: friend class PartitionSortSourceOperatorX; - RuntimeProfile::Counter* _get_sorted_timer; + RuntimeProfile::Counter* _get_sorted_timer = nullptr; + RuntimeProfile::Counter* _sorted_partition_output_rows_counter = nullptr; std::atomic _sort_idx = 0; };