Skip to content

Commit

Permalink
[pipelinX](node) support partition sort node
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangstar333 committed Sep 18, 2023
1 parent 08dd53d commit 7212924
Show file tree
Hide file tree
Showing 11 changed files with 818 additions and 24 deletions.
367 changes: 367 additions & 0 deletions be/src/pipeline/exec/partition_sort_sink_operator.cpp

Large diffs are not rendered by default.

93 changes: 90 additions & 3 deletions be/src/pipeline/exec/partition_sort_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@

#include <stdint.h>

#include <cstdint>

#include "operator.h"
#include "pipeline/pipeline_x/operator.h"
#include "vec/common/sort/partition_sorter.h"
#include "vec/exec/vpartition_sort_node.h"

namespace doris {
Expand All @@ -46,9 +50,92 @@ class PartitionSortSinkOperator final : public StreamingOperator<PartitionSortSi
bool can_write() override { return true; }
};

OperatorPtr PartitionSortSinkOperatorBuilder::build_operator() {
return std::make_shared<PartitionSortSinkOperator>(this, _node);
}
class PartitionSortSinkOperatorX;
class PartitionSortSinkLocalState : public PipelineXSinkLocalState<PartitionSortDependency> {
ENABLE_FACTORY_CREATOR(PartitionSortSinkLocalState);

public:
PartitionSortSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
: PipelineXSinkLocalState<PartitionSortDependency>(parent, state) {}

Status init(RuntimeState* state, LocalSinkStateInfo& info) override;

private:
friend class PartitionSortSinkOperatorX;

// Expressions and parameters used for build _sort_description
vectorized::VSortExecExprs _vsort_exec_exprs;
vectorized::VExprContextSPtrs _partition_expr_ctxs;
int64_t child_input_rows = 0;
std::vector<vectorized::PartitionDataPtr> _value_places;
int _num_partition = 0;
std::vector<const vectorized::IColumn*> _partition_columns;
std::vector<size_t> _hash_values;
std::unique_ptr<vectorized::PartitionedHashMapVariants> _partitioned_data;
std::unique_ptr<vectorized::Arena> _agg_arena_pool;
std::vector<size_t> _partition_key_sz;
int _partition_exprs_num = 0;

RuntimeProfile::Counter* _build_timer;
RuntimeProfile::Counter* _emplace_key_timer;
RuntimeProfile::Counter* _partition_sort_timer;
RuntimeProfile::Counter* _get_sorted_timer;
RuntimeProfile::Counter* _selector_block_timer;

RuntimeProfile::Counter* _hash_table_size_counter;
void _init_hash_method();
};

class PartitionSortSinkOperatorX final : public DataSinkOperatorX<PartitionSortSinkLocalState> {
public:
PartitionSortSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs);
Status init(const TDataSink& tsink) override {
return Status::InternalError("{} should not init with TPlanNode",
DataSinkOperatorX<PartitionSortSinkLocalState>::_name);
}

Status init(const TPlanNode& tnode, RuntimeState* state) override;

Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;

bool can_write(RuntimeState* state) override { return true; }

private:
friend class PartitionSortSinkLocalState;
ObjectPool* _pool;
const RowDescriptor _row_descriptor;
int64_t _limit = -1;
int _partition_exprs_num = 0;
vectorized::VExprContextSPtrs _partition_expr_ctxs;

// Expressions and parameters used for build _sort_description
vectorized::VSortExecExprs _vsort_exec_exprs;
std::vector<bool> _is_asc_order;
std::vector<bool> _nulls_first;
TopNAlgorithm::type _top_n_algorithm = TopNAlgorithm::ROW_NUMBER;
bool _has_global_limit = false;
int64_t _partition_inner_limit = 0;

Status _split_block_by_partition(vectorized::Block* input_block, int batch_size,
PartitionSortSinkLocalState& local_state);
void _emplace_into_hash_table(const vectorized::ColumnRawPtrs& key_columns,
const vectorized::Block* input_block, int batch_size,
PartitionSortSinkLocalState& local_state);
template <typename AggState, typename AggMethod>
void _pre_serialize_key_if_need(AggState& state, AggMethod& agg_method,
const vectorized::ColumnRawPtrs& key_columns,
const size_t num_rows) {
if constexpr (vectorized::ColumnsHashing::IsPreSerializedKeysHashMethodTraits<
AggState>::value) {
(agg_method.serialize_keys(key_columns, num_rows));
state.set_serialized_keys(agg_method.keys.data());
}
}
};

} // namespace pipeline
} // namespace doris
107 changes: 107 additions & 0 deletions be/src/pipeline/exec/partition_sort_source_operator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "partition_sort_source_operator.h"

#include "pipeline/exec/operator.h"

namespace doris {
class ExecNode;
class RuntimeState;

namespace pipeline {

OperatorPtr PartitionSortSourceOperatorBuilder::build_operator() {
return std::make_shared<PartitionSortSourceOperator>(this, _node);
}

Status PartitionSortSourceLocalState::close(RuntimeState* state) {
if (_closed) {
return Status::OK();
}
_shared_state->_previous_row = nullptr;
_shared_state->_partition_sorts.clear();
return PipelineXLocalState<PartitionSortDependency>::close(state);
}

Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* output_block,
SourceState& source_state) {
RETURN_IF_CANCELLED(state);
auto& local_state = state->get_local_state(id())->cast<PartitionSortSourceLocalState>();
output_block->clear_column_data();
{
std::lock_guard<std::mutex> lock(local_state._shared_state->_buffer_mutex);
if (local_state._shared_state->_blocks_buffer.empty() == false) {
local_state._shared_state->_blocks_buffer.front().swap(*output_block);
local_state._shared_state->_blocks_buffer.pop();
//if buffer have no data, block reading and wait for signal again
if (local_state._shared_state->_blocks_buffer.empty()) {
local_state._dependency->block_reading();
}
return Status::OK();
}
}

// this is set by sink node using: local_state._dependency->set_ready_for_read()
if (local_state._dependency->is_ready_for_read()) {
bool current_eos = false;
RETURN_IF_ERROR(get_sorted_block(state, output_block, &current_eos, local_state));
}
{
std::lock_guard<std::mutex> lock(local_state._shared_state->_buffer_mutex);
if (local_state._shared_state->_blocks_buffer.empty() &&
local_state._shared_state->_sort_idx >=
local_state._shared_state->_partition_sorts.size()) {
source_state = SourceState::FINISHED;
}
}
return Status::OK();
}

Dependency* PartitionSortSourceOperatorX::wait_for_dependency(RuntimeState* state) {
auto& local_state = state->get_local_state(id())->cast<PartitionSortSourceLocalState>();
return local_state._dependency->read_blocked_by();
}

Status PartitionSortSourceOperatorX::get_sorted_block(RuntimeState* state,
vectorized::Block* output_block,
bool* current_eos,
PartitionSortSourceLocalState& local_state) {
SCOPED_TIMER(local_state._get_sorted_timer);
//sorter output data one by one
if (local_state._shared_state->_sort_idx < local_state._shared_state->_partition_sorts.size()) {
RETURN_IF_ERROR(
local_state._shared_state->_partition_sorts[local_state._shared_state->_sort_idx]
->get_next(state, output_block, current_eos));
}
if (*current_eos) {
//current sort have eos, so get next idx
local_state._shared_state->_previous_row->reset();
auto rows =
local_state._shared_state->_partition_sorts[local_state._shared_state->_sort_idx]
->get_output_rows();
local_state._num_rows_returned += rows;
local_state._shared_state->_partition_sorts[local_state._shared_state->_sort_idx].reset(
nullptr);
local_state._shared_state->_sort_idx++;
}

return Status::OK();
}

} // namespace pipeline
} // namespace doris
51 changes: 48 additions & 3 deletions be/src/pipeline/exec/partition_sort_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include "common/status.h"
#include "operator.h"
#include "pipeline/pipeline_x/operator.h"
#include "vec/exec/vpartition_sort_node.h"

namespace doris {
Expand Down Expand Up @@ -48,9 +49,53 @@ class PartitionSortSourceOperator final
Status open(RuntimeState*) override { return Status::OK(); }
};

OperatorPtr PartitionSortSourceOperatorBuilder::build_operator() {
return std::make_shared<PartitionSortSourceOperator>(this, _node);
}
class PartitionSortSourceOperatorX;
class PartitionSortSourceLocalState final : public PipelineXLocalState<PartitionSortDependency> {
ENABLE_FACTORY_CREATOR(PartitionSortSourceLocalState);

public:
using Base = PipelineXLocalState<PartitionSortDependency>;
PartitionSortSourceLocalState(RuntimeState* state, OperatorXBase* parent)
: PipelineXLocalState<PartitionSortDependency>(state, parent),
_get_next_timer(nullptr) {}

Status init(RuntimeState* state, LocalStateInfo& info) override {
RETURN_IF_ERROR(PipelineXLocalState<PartitionSortDependency>::init(state, info));
_get_next_timer = ADD_TIMER(profile(), "GetResultTime");
_get_sorted_timer = ADD_TIMER(profile(), "GetSortedTime");
_shared_state->_previous_row = std::make_unique<vectorized::SortCursorCmp>();
return Status::OK();
}

Status close(RuntimeState* state) override;

int64_t _num_rows_returned = 0;

private:
friend class PartitionSortSourceOperatorX;
RuntimeProfile::Counter* _get_sorted_timer;
RuntimeProfile::Counter* _get_next_timer = nullptr;
};

class PartitionSortSourceOperatorX final : public OperatorX<PartitionSortSourceLocalState> {
public:
using Base = OperatorX<PartitionSortSourceLocalState>;
PartitionSortSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs)
: OperatorX<PartitionSortSourceLocalState>(pool, tnode, descs) {}

Status get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) override;

Dependency* wait_for_dependency(RuntimeState* state) override;

bool is_source() const override { return true; }

private:
friend class PartitionSortSourceLocalState;
Status get_sorted_block(RuntimeState* state, vectorized::Block* output_block, bool* eos,
PartitionSortSourceLocalState& local_state);
};

} // namespace pipeline
} // namespace doris
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/sort_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ Status SortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* bl
auto& local_state = state->get_local_state(id())->cast<SortLocalState>();
SCOPED_TIMER(local_state.profile()->total_time_counter());
SCOPED_TIMER(local_state._get_next_timer);
bool eos;
bool eos = false;
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
local_state._shared_state->sorter->get_next(state, block, &eos));
if (eos) {
Expand Down
26 changes: 26 additions & 0 deletions be/src/pipeline/pipeline_x/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@

#pragma once

#include <mutex>

#include "pipeline/exec/data_queue.h"
#include "vec/common/sort/partition_sorter.h"
#include "vec/common/sort/sorter.h"
#include "vec/exec/join/process_hash_table_probe.h"
#include "vec/exec/join/vhash_join_node.h"
#include "vec/exec/vaggregation_node.h"
#include "vec/exec/vanalytic_eval_node.h"
#include "vec/exec/vpartition_sort_node.h"

namespace doris {
namespace pipeline {
Expand Down Expand Up @@ -64,6 +68,8 @@ class Dependency : public std::enable_shared_from_this<Dependency> {
_ready_for_read = true;
}

bool is_ready_for_read() { return _ready_for_read; }

// Notify downstream pipeline tasks this dependency is blocked.
virtual void block_reading() { _ready_for_read = false; }

Expand Down Expand Up @@ -476,5 +482,25 @@ class NestedLoopJoinDependency final : public Dependency {
NestedLoopJoinSharedState _join_state;
};

struct PartitionSortNodeSharedState {
public:
std::queue<vectorized::Block> _blocks_buffer;
std::mutex _buffer_mutex;
std::vector<std::unique_ptr<vectorized::PartitionSorter>> _partition_sorts;
std::unique_ptr<vectorized::SortCursorCmp> _previous_row = nullptr;
int _sort_idx = 0;
};

class PartitionSortDependency final : public Dependency {
public:
using SharedState = PartitionSortNodeSharedState;
PartitionSortDependency(int id) : Dependency(id, "PartitionSortDependency") {}
~PartitionSortDependency() override = default;
void* shared_state() override { return (void*)&_partition_sort_state; };

private:
PartitionSortNodeSharedState _partition_sort_state;
};

} // namespace pipeline
} // namespace doris
6 changes: 6 additions & 0 deletions be/src/pipeline/pipeline_x/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
#include "pipeline/exec/nested_loop_join_build_operator.h"
#include "pipeline/exec/nested_loop_join_probe_operator.h"
#include "pipeline/exec/olap_scan_operator.h"
#include "pipeline/exec/partition_sort_sink_operator.h"
#include "pipeline/exec/partition_sort_source_operator.h"
#include "pipeline/exec/repeat_operator.h"
#include "pipeline/exec/result_sink_operator.h"
#include "pipeline/exec/select_operator.h"
Expand Down Expand Up @@ -317,6 +319,7 @@ DECLARE_OPERATOR_X(StreamingAggSinkLocalState)
DECLARE_OPERATOR_X(ExchangeSinkLocalState)
DECLARE_OPERATOR_X(NestedLoopJoinBuildSinkLocalState)
DECLARE_OPERATOR_X(UnionSinkLocalState)
DECLARE_OPERATOR_X(PartitionSortSinkLocalState)

#undef DECLARE_OPERATOR_X

Expand All @@ -332,6 +335,7 @@ DECLARE_OPERATOR_X(NestedLoopJoinProbeLocalState)
DECLARE_OPERATOR_X(AssertNumRowsLocalState)
DECLARE_OPERATOR_X(EmptySetLocalState)
DECLARE_OPERATOR_X(UnionSourceLocalState)
DECLARE_OPERATOR_X(PartitionSortSourceLocalState)

#undef DECLARE_OPERATOR_X

Expand All @@ -349,6 +353,7 @@ template class PipelineXSinkLocalState<AnalyticDependency>;
template class PipelineXSinkLocalState<AggDependency>;
template class PipelineXSinkLocalState<FakeDependency>;
template class PipelineXSinkLocalState<UnionDependency>;
template class PipelineXSinkLocalState<PartitionSortDependency>;

template class PipelineXLocalState<HashJoinDependency>;
template class PipelineXLocalState<SortDependency>;
Expand All @@ -357,5 +362,6 @@ template class PipelineXLocalState<AnalyticDependency>;
template class PipelineXLocalState<AggDependency>;
template class PipelineXLocalState<FakeDependency>;
template class PipelineXLocalState<UnionDependency>;
template class PipelineXLocalState<PartitionSortDependency>;

} // namespace doris::pipeline
Loading

0 comments on commit 7212924

Please sign in to comment.