diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp index 48695ed56f0029..81126b84ffff33 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp @@ -55,7 +55,8 @@ Status DistinctStreamingAggSinkOperator::sink(RuntimeState* state, vectorized::B // get enough data or reached limit rows, need push block to queue if (_node->limit() != -1 && (_output_block->rows() + _output_distinct_rows) >= _node->limit()) { - auto limit_rows = _node->limit() - _output_block->rows(); + auto need_cut_rows = (_output_block->rows() + _output_distinct_rows) - _node->limit(); + auto limit_rows = _output_block->rows() - need_cut_rows; _output_block->set_num_rows(limit_rows); _output_distinct_rows += limit_rows; _data_queue->push_block(std::move(_output_block)); diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h index ae7106178e65e3..46b1dda008d0e3 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h @@ -63,7 +63,7 @@ class DistinctStreamingAggSinkOperator final Status close(RuntimeState* state) override; bool reached_limited_rows() { - return _node->limit() != -1 && _output_distinct_rows > _node->limit(); + return _node->limit() != -1 && _output_distinct_rows >= _node->limit(); } private: