Skip to content

Commit

Permalink
[refactor](partitioner) refine get channel id logic (apache#33765)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 authored Apr 18, 2024
1 parent d7c472f commit 048bc99
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 12 deletions.
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -493,11 +493,11 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
if (_part_type == TPartitionType::HASH_PARTITIONED) {
RETURN_IF_ERROR(channel_add_rows(
state, local_state.channels, local_state._partition_count,
(uint32_t*)local_state._partitioner->get_channel_ids(), rows, block, eos));
local_state._partitioner->get_channel_ids().get<uint32_t>(), rows, block, eos));
} else {
RETURN_IF_ERROR(channel_add_rows(
state, local_state.channel_shared_ptrs, local_state._partition_count,
(uint32_t*)local_state._partitioner->get_channel_ids(), rows, block, eos));
local_state._partitioner->get_channel_ids().get<uint32_t>(), rows, block, eos));
}
} else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
// check out of limit
Expand Down
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
: _partitioner(partitioner) {}

int get_partition(vectorized::Block* block, int position) {
uint32_t* partition_ids = (uint32_t*)_partitioner->get_channel_ids();
return partition_ids[position];
return _partitioner->get_channel_ids().get<uint32_t>()[position];
}

private:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ Status PartitionedHashJoinProbeOperatorX::push(RuntimeState* state, vectorized::
}

std::vector<uint32_t> partition_indexes[_partition_count];
auto* channel_ids = reinterpret_cast<uint32_t*>(local_state._partitioner->get_channel_ids());
auto* channel_ids = local_state._partitioner->get_channel_ids().get<uint32_t>();
for (uint32_t i = 0; i != rows; ++i) {
partition_indexes[channel_ids[i]].emplace_back(i);
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
}
auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
SCOPED_TIMER(_partition_shuffle_timer);
auto* channel_ids = reinterpret_cast<uint32_t*>(_partitioner->get_channel_ids());
auto* channel_ids = _partitioner->get_channel_ids().get<uint32_t>();

auto& partitioned_blocks = _shared_state->partitioned_build_blocks;
std::vector<uint32_t> partition_indices;
Expand Down Expand Up @@ -293,7 +293,7 @@ Status PartitionedHashJoinSinkLocalState::_partition_block(RuntimeState* state,

auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
SCOPED_TIMER(_partition_shuffle_timer);
auto* channel_ids = reinterpret_cast<uint32_t*>(_partitioner->get_channel_ids());
auto* channel_ids = _partitioner->get_channel_ids().get<uint32_t>();
std::vector<uint32_t> partition_indexes[p._partition_count];
DCHECK_LT(begin, end);
for (size_t i = begin; i != end; ++i) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ Status ShuffleExchanger::sink(RuntimeState* state, vectorized::Block* in_block,
{
SCOPED_TIMER(local_state._distribute_timer);
RETURN_IF_ERROR(_split_rows(state,
(const uint32_t*)local_state._partitioner->get_channel_ids(),
local_state._partitioner->get_channel_ids().get<uint32_t>(),
in_block, eos, local_state));
}

Expand Down
17 changes: 15 additions & 2 deletions be/src/vec/runtime/partitioner.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,17 @@ class MemTracker;

namespace vectorized {

struct ChannelField {
    // Non-owning pointer to the partitioner's channel-id array. The
    // Partitioner that produced this object owns the storage and must
    // outlive any use of the returned pointer.
    const void* channel_id;
    // Width in bytes of one element of `channel_id` (e.g. 4 for uint32_t,
    // 8 for uint64_t), recorded from sizeof(HashValueType) at the source.
    const uint32_t len;

    /// Reinterpret the channel-id array as `const T*`.
    ///
    /// Aborts via CHECK failure when sizeof(T) != len, turning the
    /// silent truncation/garbage reads of the old raw
    /// `(uint32_t*)get_channel_ids()` casts into a loud, immediate error.
    /// The result is the whole point of calling this, hence [[nodiscard]].
    template <typename T>
    [[nodiscard]] const T* get() const {
        CHECK_EQ(sizeof(T), len) << " sizeof(T): " << sizeof(T) << " len: " << len;
        return reinterpret_cast<const T*>(channel_id);
    }
};

class PartitionerBase {
public:
PartitionerBase(size_t partition_count) : _partition_count(partition_count) {}
Expand All @@ -40,7 +51,7 @@ class PartitionerBase {
virtual Status do_partitioning(RuntimeState* state, Block* block,
MemTracker* mem_tracker) const = 0;

virtual void* get_channel_ids() const = 0;
virtual ChannelField get_channel_ids() const = 0;

virtual Status clone(RuntimeState* state, std::unique_ptr<PartitionerBase>& partitioner) = 0;

Expand All @@ -67,7 +78,9 @@ class Partitioner : public PartitionerBase {
Status do_partitioning(RuntimeState* state, Block* block,
MemTracker* mem_tracker) const override;

void* get_channel_ids() const override { return _hash_vals.data(); }
ChannelField get_channel_ids() const override {
return {_hash_vals.data(), sizeof(HashValueType)};
}

protected:
Status _get_partition_column_result(Block* block, std::vector<int>& result) const {
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/sink/vdata_stream_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -739,11 +739,11 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
}
if (_part_type == TPartitionType::HASH_PARTITIONED) {
RETURN_IF_ERROR(channel_add_rows(state, _channels, _partition_count,
(uint64_t*)_partitioner->get_channel_ids(), rows,
_partitioner->get_channel_ids().get<uint64_t>(), rows,
block, _enable_pipeline_exec ? eos : false));
} else {
RETURN_IF_ERROR(channel_add_rows(state, _channel_shared_ptrs, _partition_count,
(uint32_t*)_partitioner->get_channel_ids(), rows,
_partitioner->get_channel_ids().get<uint32_t>(), rows,
block, _enable_pipeline_exec ? eos : false));
}
} else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
Expand Down

0 comments on commit 048bc99

Please sign in to comment.