support distinct agg
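
This commit adds a distinct streaming pre-aggregation path to the pipelineX engine: the sink deduplicates incoming rows by their grouping keys and forwards only first-seen rows to the source operator through a shared data queue. A minimal standalone sketch of the row-selection idea, using std::unordered_set in place of Doris's templated aggregation hash tables (names and types below are illustrative only, not the actual API):

#include <cstddef>
#include <string>
#include <unordered_set>
#include <vector>

// Given one serialized grouping key per input row, return the positions of rows
// whose key is seen for the first time; only those rows are forwarded downstream.
std::vector<size_t> select_distinct_rows(const std::vector<std::string>& serialized_keys,
                                         std::unordered_set<std::string>& seen_keys) {
    std::vector<size_t> distinct_rows;
    for (size_t i = 0; i < serialized_keys.size(); ++i) {
        if (seen_keys.insert(serialized_keys[i]).second) { // .second == newly inserted
            distinct_rows.push_back(i);
        }
    }
    return distinct_rows;
}

In the operator itself, _emplace_into_hash_table_to_distinct plays this role and records the selected positions in an IColumn::Selector, which _distinct_pre_agg_with_serialized_key then uses to copy only the distinct rows into the output block.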
zhangstar333 committed Sep 19, 2023
1 parent f5d6799 commit b0b4f19
Showing 17 changed files with 500 additions and 54 deletions.
5 changes: 4 additions & 1 deletion be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -19,6 +19,7 @@

#include <string>

#include "pipeline/exec/distinct_streaming_aggregation_sink_operator.h"
#include "pipeline/exec/operator.h"
#include "pipeline/exec/streaming_aggregation_sink_operator.h"
#include "runtime/primitive_type.h"
@@ -946,10 +947,12 @@ Status AggSinkLocalState<DependencyType, Derived>::close(RuntimeState* state) {
}

class StreamingAggSinkLocalState;
class DistinctStreamingAggSinkLocalState;

template class AggSinkOperatorX<BlockingAggSinkLocalState>;
template class AggSinkOperatorX<StreamingAggSinkLocalState>;
template class AggSinkOperatorX<DistinctStreamingAggSinkLocalState>;
template class AggSinkLocalState<AggDependency, BlockingAggSinkLocalState>;
template class AggSinkLocalState<AggDependency, StreamingAggSinkLocalState>;

template class AggSinkLocalState<AggDependency, DistinctStreamingAggSinkLocalState>;
} // namespace doris::pipeline
1 change: 1 addition & 0 deletions be/src/pipeline/exec/aggregation_sink_operator.h
@@ -354,6 +354,7 @@ class AggSinkOperatorX : public DataSinkOperatorX<LocalStateType> {
template <typename DependencyType, typename Derived>
friend class AggSinkLocalState;
friend class StreamingAggSinkLocalState;
friend class DistinctStreamingAggSinkLocalState;
std::vector<vectorized::AggFnEvaluator*> _aggregate_evaluators;
bool _can_short_circuit = false;

1 change: 1 addition & 0 deletions be/src/pipeline/exec/aggregation_source_operator.h
@@ -64,6 +64,7 @@ class AggLocalState : public PipelineXLocalState<AggDependency> {
protected:
friend class AggSourceOperatorX;
friend class StreamingAggSourceOperatorX;
friend class DistinctStreamingAggSourceOperatorX;

void _close_without_key();
void _close_with_serialized_key();
179 changes: 179 additions & 0 deletions be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp
@@ -19,6 +19,7 @@

#include <gen_cpp/Metrics_types.h>

#include <memory>
#include <utility>

#include "common/compiler_util.h" // IWYU pragma: keep
@@ -94,4 +95,182 @@ OperatorPtr DistinctStreamingAggSinkOperatorBuilder::build_operator() {
return std::make_shared<DistinctStreamingAggSinkOperator>(this, _node, _data_queue);
}

DistinctStreamingAggSinkLocalState::DistinctStreamingAggSinkLocalState(
DataSinkOperatorXBase* parent, RuntimeState* state)
: AggSinkLocalState<AggDependency, DistinctStreamingAggSinkLocalState>(parent, state) {
dummy_mapped_data = std::make_shared<char>('A');
}

Status DistinctStreamingAggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
return Status::OK();
}

Status DistinctStreamingAggSinkLocalState::_distinct_pre_agg_with_serialized_key(
doris::vectorized::Block* in_block, doris::vectorized::Block* out_block) {
SCOPED_TIMER(_build_timer);
DCHECK(!_shared_state->probe_expr_ctxs.empty());

size_t key_size = _shared_state->probe_expr_ctxs.size();
vectorized::ColumnRawPtrs key_columns(key_size);
{
SCOPED_TIMER(_expr_timer);
for (size_t i = 0; i < key_size; ++i) {
int result_column_id = -1;
RETURN_IF_ERROR(
_shared_state->probe_expr_ctxs[i]->execute(in_block, &result_column_id));
in_block->get_by_position(result_column_id).column =
in_block->get_by_position(result_column_id)
.column->convert_to_full_column_if_const();
key_columns[i] = in_block->get_by_position(result_column_id).column.get();
}
}

int rows = in_block->rows();
_distinct_row.clear();
_distinct_row.reserve(rows);

RETURN_IF_CATCH_EXCEPTION(
_emplace_into_hash_table_to_distinct(_distinct_row, key_columns, rows));

bool mem_reuse = _dependency->make_nullable_keys().empty() && out_block->mem_reuse();
if (mem_reuse) {
for (int i = 0; i < key_size; ++i) {
auto dst = out_block->get_by_position(i).column->assume_mutable();
key_columns[i]->append_data_by_selector(dst, _distinct_row);
}
} else {
vectorized::ColumnsWithTypeAndName columns_with_schema;
for (int i = 0; i < key_size; ++i) {
auto distinct_column = key_columns[i]->clone_empty();
key_columns[i]->append_data_by_selector(distinct_column, _distinct_row);
columns_with_schema.emplace_back(
std::move(distinct_column),
_shared_state->probe_expr_ctxs[i]->root()->data_type(),
_shared_state->probe_expr_ctxs[i]->root()->expr_name());
}
out_block->swap(vectorized::Block(columns_with_schema));
}
return Status::OK();
}

void DistinctStreamingAggSinkLocalState::_emplace_into_hash_table_to_distinct(
vectorized::IColumn::Selector& distinct_row, vectorized::ColumnRawPtrs& key_columns,
const size_t num_rows) {
std::visit(
[&](auto&& agg_method) -> void {
SCOPED_TIMER(_hash_table_compute_timer);
using HashMethodType = std::decay_t<decltype(agg_method)>;
using HashTableType = std::decay_t<decltype(agg_method.data)>;
using AggState = typename HashMethodType::State;
AggState state(key_columns, _shared_state->probe_key_sz, nullptr);
_pre_serialize_key_if_need(state, agg_method, key_columns, num_rows);

if constexpr (HashTableTraits<HashTableType>::is_phmap) {
auto keys = state.get_keys(num_rows);
if (_hash_values.size() < num_rows) {
_hash_values.resize(num_rows);
}

for (size_t i = 0; i < num_rows; ++i) {
_hash_values[i] = agg_method.data.hash(keys[i]);
}
SCOPED_TIMER(_hash_table_emplace_timer);
for (size_t i = 0; i < num_rows; ++i) {
if (LIKELY(i + HASH_MAP_PREFETCH_DIST < num_rows)) {
agg_method.data.prefetch_by_hash(
_hash_values[i + HASH_MAP_PREFETCH_DIST]);
}
auto result = state.emplace_with_key(
agg_method.data, state.pack_key_holder(keys[i], *_agg_arena_pool),
_hash_values[i], i);
if (result.is_inserted()) {
distinct_row.push_back(i);
}
}
} else {
SCOPED_TIMER(_hash_table_emplace_timer);
for (size_t i = 0; i < num_rows; ++i) {
auto result = state.emplace_key(agg_method.data, i, *_agg_arena_pool);
if (result.is_inserted()) {
result.set_mapped(dummy_mapped_data.get());
distinct_row.push_back(i);
}
}
}
COUNTER_UPDATE(_hash_table_input_counter, num_rows);
},
_agg_data->method_variant);
}

DistinctStreamingAggSinkOperatorX::DistinctStreamingAggSinkOperatorX(ObjectPool* pool,
const TPlanNode& tnode,
const DescriptorTbl& descs)
: AggSinkOperatorX<DistinctStreamingAggSinkLocalState>(pool, tnode, descs) {}

bool DistinctStreamingAggSinkOperatorX::can_write(RuntimeState* state) {
// sink and source run in different threads
return state->get_sink_local_state(id())
->cast<DistinctStreamingAggSinkLocalState>()
._shared_state->data_queue->has_enough_space_to_push();
}

Status DistinctStreamingAggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(AggSinkOperatorX<DistinctStreamingAggSinkLocalState>::init(tnode, state));
_name = "DISTINCT_STREAMING_AGGREGATION_SINK_OPERATOR";
return Status::OK();
}

Status DistinctStreamingAggSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) {
auto& local_state =
state->get_sink_local_state(id())->cast<DistinctStreamingAggSinkLocalState>();
SCOPED_TIMER(local_state.profile()->total_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
local_state._shared_state->input_num_rows += in_block->rows();
Status ret = Status::OK();
if (in_block && in_block->rows() > 0) {
if (local_state._output_block == nullptr) {
local_state._output_block = local_state._shared_state->data_queue->get_free_block();
}
RETURN_IF_ERROR(local_state._distinct_pre_agg_with_serialized_key(
in_block, local_state._output_block.get()));

// enough data accumulated or the row limit reached: push the block to the queue
if (_limit != -1 &&
(local_state._output_block->rows() + local_state._output_distinct_rows) >= _limit) {
auto limit_rows = _limit - local_state._output_distinct_rows;
local_state._output_block->set_num_rows(limit_rows);
local_state._output_distinct_rows += limit_rows;
local_state._shared_state->data_queue->push_block(std::move(local_state._output_block));
} else if (local_state._output_block->rows() >= state->batch_size()) {
local_state._output_distinct_rows += local_state._output_block->rows();
local_state._shared_state->data_queue->push_block(std::move(local_state._output_block));
}
}

// limit reached or source finished
if ((UNLIKELY(source_state == SourceState::FINISHED)) ||
(_limit != -1 && local_state._output_distinct_rows >= _limit)) {
if (local_state._output_block != nullptr) { // may be the last block, sent with eos
local_state._output_distinct_rows += local_state._output_block->rows();
local_state._shared_state->data_queue->push_block(std::move(local_state._output_block));
}
local_state._shared_state->data_queue->set_finish();
return Status::Error<ErrorCode::END_OF_FILE>(""); // need to give the finish signal
}
return Status::OK();
}

Status DistinctStreamingAggSinkLocalState::close(RuntimeState* state) {
if (_closed) {
return Status::OK();
}
if (_shared_state->data_queue && !_shared_state->data_queue->is_finish()) {
// finish should already be set; if it is not set here, something went wrong.
_shared_state->data_queue->set_canceled();
}
return Base::close(state);
}

} // namespace doris::pipeline
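
The limit handling in sink() above truncates the last output block so that the total number of distinct rows pushed never exceeds _limit. A hedged sketch of that arithmetic (helper name and signature are invented for illustration):

#include <algorithm>
#include <cstdint>

// How many rows of the current output block may still be emitted under the limit.
int64_t rows_to_keep(int64_t limit, int64_t already_output, int64_t block_rows) {
    if (limit == -1) return block_rows; // no limit configured
    return std::min(block_rows, std::max<int64_t>(0, limit - already_output));
}
// Example: limit = 100, already_output = 90, block_rows = 25  ->  keep 10 rows,
// after which the sink marks the data queue finished and returns END_OF_FILE.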
be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h
@@ -22,8 +22,10 @@
#include <cstdint>
#include <memory>

#include "aggregation_sink_operator.h"
#include "common/status.h"
#include "operator.h"
#include "pipeline/exec/aggregation_sink_operator.h"
#include "util/runtime_profile.h"
#include "vec/core/block.h"
#include "vec/exec/distinct_vaggregation_node.h"
@@ -72,5 +74,44 @@ class DistinctStreamingAggSinkOperator final
std::unique_ptr<vectorized::Block> _output_block = vectorized::Block::create_unique();
};

class DistinctStreamingAggSinkOperatorX;

class DistinctStreamingAggSinkLocalState final
: public AggSinkLocalState<AggDependency, DistinctStreamingAggSinkLocalState> {
public:
using Parent = DistinctStreamingAggSinkOperatorX;
using Base = AggSinkLocalState<AggDependency, DistinctStreamingAggSinkLocalState>;
ENABLE_FACTORY_CREATOR(DistinctStreamingAggSinkLocalState);
DistinctStreamingAggSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state);

Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status close(RuntimeState* state) override;
Status _distinct_pre_agg_with_serialized_key(vectorized::Block* in_block,
vectorized::Block* out_block);

private:
friend class DistinctStreamingAggSinkOperatorX;
void _emplace_into_hash_table_to_distinct(vectorized::IColumn::Selector& distinct_row,
vectorized::ColumnRawPtrs& key_columns,
const size_t num_rows);

std::unique_ptr<vectorized::Block> _output_block = vectorized::Block::create_unique();
std::shared_ptr<char> dummy_mapped_data = nullptr;
vectorized::IColumn::Selector _distinct_row;
int64_t _output_distinct_rows = 0;
};

class DistinctStreamingAggSinkOperatorX final
: public AggSinkOperatorX<DistinctStreamingAggSinkLocalState> {
public:
DistinctStreamingAggSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs);
Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;

bool can_write(RuntimeState* state) override;
};

} // namespace pipeline
} // namespace doris
be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp
@@ -88,5 +88,53 @@ OperatorPtr DistinctStreamingAggSourceOperatorBuilder::build_operator() {
return std::make_shared<DistinctStreamingAggSourceOperator>(this, _node, _data_queue);
}

DistinctStreamingAggSourceOperatorX::DistinctStreamingAggSourceOperatorX(ObjectPool* pool,
const TPlanNode& tnode,
const DescriptorTbl& descs)
: Base(pool, tnode, descs) {
if (tnode.agg_node.__isset.use_streaming_preaggregation) {
_is_streaming_preagg = tnode.agg_node.use_streaming_preaggregation;
if (_is_streaming_preagg) {
DCHECK(!tnode.agg_node.grouping_exprs.empty()) << "Streaming preaggs do grouping";
DCHECK(_limit == -1) << "Preaggs have no limits";
}
} else {
_is_streaming_preagg = false;
}
}

Status DistinctStreamingAggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) {
auto& local_state = state->get_local_state(id())->cast<AggLocalState>();
SCOPED_TIMER(local_state.profile()->total_time_counter());
std::unique_ptr<vectorized::Block> agg_block;
RETURN_IF_ERROR(local_state._shared_state->data_queue->get_block_from_queue(&agg_block));
if (agg_block != nullptr) {
block->swap(*agg_block);
agg_block->clear_column_data(block->columns());
local_state._shared_state->data_queue->push_free_block(std::move(agg_block));
}

local_state._dependency->_make_nullable_output_key(block);
if (_is_streaming_preagg == false) {
// handle the having clause; it should not be executed in the pre-streaming agg
RETURN_IF_ERROR(
vectorized::VExprContext::filter_block(_conjuncts, block, block->columns()));
}

if (UNLIKELY(local_state._shared_state->data_queue->data_exhausted())) {
source_state = SourceState::FINISHED;
} else {
source_state = SourceState::DEPEND_ON_SOURCE;
}
return Status::OK();
}

Status DistinctStreamingAggSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(Base::init(tnode, state));
_op_name = "DISTINCT_STREAMING_AGGREGATION_OPERATOR";
return Status::OK();
}

} // namespace pipeline
} // namespace doris
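
The source above consumes blocks that the sink produced through the shared data queue: can_write() on the sink side maps to "the queue still has room", get_block_from_queue() pops on the source side, and set_finish()/data_exhausted() end the stream. A self-contained sketch of that bounded-queue handoff pattern, mirroring the method names used in the diff but with invented internals (this is not the actual Doris DataQueue; std::string stands in for a block):

#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <string>

// Sink thread pushes finished blocks; source thread pops them until the sink
// marks the queue finished and it drains empty.
class BoundedBlockQueue {
public:
    explicit BoundedBlockQueue(size_t capacity) : _capacity(capacity) {}

    bool has_enough_space_to_push() { // what can_write() checks on the sink side
        std::lock_guard<std::mutex> lock(_mutex);
        return _queue.size() < _capacity;
    }

    void push_block(std::string block) { // sink side
        std::lock_guard<std::mutex> lock(_mutex);
        _queue.push_back(std::move(block));
        _cv.notify_one();
    }

    void set_finish() { // sink side, on EOS or limit reached
        std::lock_guard<std::mutex> lock(_mutex);
        _finished = true;
        _cv.notify_all();
    }

    // Source side: an empty optional means the stream is exhausted.
    std::optional<std::string> get_block_from_queue() {
        std::unique_lock<std::mutex> lock(_mutex);
        _cv.wait(lock, [&] { return !_queue.empty() || _finished; });
        if (_queue.empty()) return std::nullopt;
        std::string block = std::move(_queue.front());
        _queue.pop_front();
        return block;
    }

private:
    std::mutex _mutex;
    std::condition_variable _cv;
    std::deque<std::string> _queue;
    size_t _capacity;
    bool _finished = false;
};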
be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.h
@@ -23,6 +23,7 @@

#include "common/status.h"
#include "operator.h"
#include "pipeline/exec/aggregation_source_operator.h"
#include "vec/exec/distinct_vaggregation_node.h"
#include "vec/exec/vaggregation_node.h"

@@ -63,5 +64,19 @@ class DistinctStreamingAggSourceOperator final
std::shared_ptr<DataQueue> _data_queue;
};

class DistinctStreamingAggSourceOperatorX final : public AggSourceOperatorX {
public:
using Base = AggSourceOperatorX;
DistinctStreamingAggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs);
~DistinctStreamingAggSourceOperatorX() = default;

Status init(const TPlanNode& tnode, RuntimeState* state) override;

Status get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) override;
bool _is_streaming_preagg = false;
};

} // namespace pipeline
} // namespace doris
8 changes: 4 additions & 4 deletions be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -108,8 +108,8 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
if (local_state._num_partition > config::partition_topn_partition_threshold &&
local_state.child_input_rows < 10000 * local_state._num_partition) {
{
- std::lock_guard<std::mutex> lock(local_state._shared_state->_buffer_mutex);
- local_state._shared_state->_blocks_buffer.push(std::move(*input_block));
+ std::lock_guard<std::mutex> lock(local_state._shared_state->buffer_mutex);
+ local_state._shared_state->blocks_buffer.push(std::move(*input_block));
// buffer has data, so the source can read it.
local_state._dependency->set_ready_for_read();
}
@@ -133,7 +133,7 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
_vsort_exec_exprs, _limit, 0, _pool, _is_asc_order, _nulls_first,
_child_x->row_desc(), state, i == 0 ? local_state._profile : nullptr,
_has_global_limit, _partition_inner_limit, _top_n_algorithm,
- local_state._shared_state->_previous_row.get());
+ local_state._shared_state->previous_row.get());

DCHECK(_child_x->row_desc().num_materialized_slots() ==
local_state._value_places[i]->blocks.back()->columns());
@@ -143,7 +143,7 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
}
sorter->init_profile(local_state._profile);
RETURN_IF_ERROR(sorter->prepare_for_read());
- local_state._shared_state->_partition_sorts.push_back(std::move(sorter));
+ local_state._shared_state->partition_sorts.push_back(std::move(sorter));
}

COUNTER_SET(local_state._hash_table_size_counter, int64_t(local_state._num_partition));