Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[only test 2] #41097

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions be/src/pipeline/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,8 @@ enum class ExchangeType : uint8_t {
PASS_TO_ONE = 6,
// merge all data to one channel.
LOCAL_MERGE_SORT = 7,
// Send all data to the first channel.
PASS_TO_ONE_EXCHANGE = 8,
};

inline std::string get_exchange_type_name(ExchangeType idx) {
Expand All @@ -805,6 +807,8 @@ inline std::string get_exchange_type_name(ExchangeType idx) {
return "PASS_TO_ONE";
case ExchangeType::LOCAL_MERGE_SORT:
return "LOCAL_MERGE_SORT";
case ExchangeType::PASS_TO_ONE_EXCHANGE:
return "PASS_TO_ONE_EXCHANGE";
}
LOG(FATAL) << "__builtin_unreachable";
__builtin_unreachable();
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/exec/exchange_sink_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ void ExchangeSinkBuffer::_construct_request(InstanceLoId id, PUniqueId finst_id)
_instance_to_request[id]->set_node_id(_dest_node_id);
_instance_to_request[id]->set_sender_id(_sender_id);
_instance_to_request[id]->set_be_number(_be_number);
_instance_to_request[id]->set_close_sender_number(_parent->close_sender_number());
}

void ExchangeSinkBuffer::_ended(InstanceLoId id) {
Expand Down
10 changes: 8 additions & 2 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
fmt::format("WaitForLocalExchangeBuffer{}", i), TUnit ::TIME_NS, timer_name, 1));
}
_wait_broadcast_buffer_timer = ADD_CHILD_TIMER(_profile, "WaitForBroadcastBuffer", timer_name);
_close_sender_number = p._close_sender_number;
return Status::OK();
}

Expand Down Expand Up @@ -289,7 +290,8 @@ segment_v2::CompressionTypePB ExchangeSinkLocalState::compression_type() const {

ExchangeSinkOperatorX::ExchangeSinkOperatorX(
RuntimeState* state, const RowDescriptor& row_desc, int operator_id,
const TDataStreamSink& sink, const std::vector<TPlanFragmentDestination>& destinations)
const TDataStreamSink& sink, const std::vector<TPlanFragmentDestination>& destinations,
bool is_multi_cast)
: DataSinkOperatorX(operator_id, sink.dest_node_id),
_texprs(sink.output_partition.partition_exprs),
_row_desc(row_desc),
Expand All @@ -303,7 +305,9 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
_tablet_sink_tuple_id(sink.tablet_sink_tuple_id),
_tablet_sink_txn_id(sink.tablet_sink_txn_id),
_t_tablet_sink_exprs(&sink.tablet_sink_exprs),
_enable_local_merge_sort(state->enable_local_merge_sort()) {
_enable_local_merge_sort(state->enable_local_merge_sort()),
_is_multi_cast(is_multi_cast),
_enable_pass_to_one_exchange(state->enable_pass_to_one_exchange()) {
DCHECK_GT(destinations.size(), 0);
DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED ||
sink.output_partition.type == TPartitionType::HASH_PARTITIONED ||
Expand Down Expand Up @@ -680,6 +684,8 @@ DataDistribution ExchangeSinkOperatorX::required_data_distribution() const {
sort_source && sort_source->use_local_merge()) {
// Sort the data local
return ExchangeType::LOCAL_MERGE_SORT;
} else if (!_is_multi_cast && _enable_pass_to_one_exchange) {
return ExchangeType::PASS_TO_ONE_EXCHANGE;
}
}
return DataSinkOperatorX<ExchangeSinkLocalState>::required_data_distribution();
Expand Down
14 changes: 13 additions & 1 deletion be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
int current_channel_idx; // index of current channel to send to if _random == true
bool only_local_exchange;

// Returns this sink's close-sender count, i.e. the value handed to the
// receiver's remove_sender() when a channel reports EOS.
int close_sender_number() const {
    return _close_sender_number;
}

// for external table sink hash partition
std::unique_ptr<vectorized::ScaleWriterPartitioningExchanger<HashPartitionFunction>>
scale_writer_partitioning_exchanger;
Expand Down Expand Up @@ -200,13 +202,15 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
// for external table sink hash partition
std::unique_ptr<HashPartitionFunction> _partition_function = nullptr;
std::atomic<bool> _reach_limit = false;
int _close_sender_number = 1;
};

class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalState> {
public:
ExchangeSinkOperatorX(RuntimeState* state, const RowDescriptor& row_desc, int operator_id,
const TDataStreamSink& sink,
const std::vector<TPlanFragmentDestination>& destinations);
const std::vector<TPlanFragmentDestination>& destinations,
bool is_multi_cast = false);
Status init(const TDataSink& tsink) override;

RuntimeState* state() { return _state; }
Expand All @@ -219,6 +223,10 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalSt
int num_receivers = 1);
DataDistribution required_data_distribution() const override;

// Always true: this operator is an exchange sink.
bool is_exchange_sink() const override { return true; }

// Records how many senders a single EOS from this sink should close on the
// receiving side.
void set_close_sender_number(int number) {
    _close_sender_number = number;
}

private:
friend class ExchangeSinkLocalState;

Expand Down Expand Up @@ -274,6 +282,10 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalSt
size_t _data_processed = 0;
int _writer_count = 1;
const bool _enable_local_merge_sort;
const bool _is_multi_cast;
const bool _enable_pass_to_one_exchange;

int _close_sender_number = 1;
};

} // namespace pipeline
Expand Down
2 changes: 2 additions & 0 deletions be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ class OperatorBase {

// Default answer for operators: not a source unless a subclass overrides this.
virtual bool is_source() const {
    return false;
}

// Default answer for operators: not an exchange sink unless a subclass
// overrides this.
virtual bool is_exchange_sink() const {
    return false;
}

[[nodiscard]] virtual const RowDescriptor& row_desc() const;

[[nodiscard]] virtual Status init(const TDataSink& tsink) { return Status::OK(); }
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/exec/result_file_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class ResultFileSinkLocalState final
// Accessor for the profile counter stored in
// _split_block_distribute_by_channel_timer (non-owning pointer).
RuntimeProfile::Counter* split_block_distribute_by_channel_timer() {
    return _split_block_distribute_by_channel_timer;
}
// A result-file sink always reports exactly one closing sender.
// NOTE(review): presumably present so Channel<Parent> can call
// _parent->close_sender_number() for this parent type too - confirm.
int close_sender_number() const { return 1; }

private:
friend class ResultFileSinkOperatorX;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ std::string LocalExchangeSourceLocalState::debug_string(int indentation_level) c
size_t i = 0;
fmt::format_to(debug_string_buffer, ", MemTrackers: ");
for (auto* mem_tracker : _shared_state->mem_trackers) {
fmt::format_to(debug_string_buffer, "{}: {}, ", i, mem_tracker->consumption());
if (mem_tracker) {
fmt::format_to(debug_string_buffer, "{}: {}, ", i, mem_tracker->consumption());
}
i++;
}
return fmt::to_string(debug_string_buffer);
Expand Down
24 changes: 20 additions & 4 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,14 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
: 0);
}
break;
case ExchangeType::PASS_TO_ONE_EXCHANGE: {
shared_state->exchanger = PassToOneExchanger::create_unique(
cur_pipe->num_tasks(), _num_instances,
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
? _runtime_state->query_options().local_exchange_free_blocks_limit
: 0);
break;
}
case ExchangeType::LOCAL_MERGE_SORT: {
auto child_op = cur_pipe->sink()->child();
auto sort_source = std::dynamic_pointer_cast<SortSourceOperatorX>(child_op);
Expand Down Expand Up @@ -923,6 +931,14 @@ Status PipelineFragmentContext::_add_local_exchange_impl(

// 7. Inherit properties from current pipeline.
_inherit_pipeline_properties(data_distribution, cur_pipe, new_pip);

if (data_distribution.distribution_type == ExchangeType::PASS_TO_ONE_EXCHANGE) {
if (auto* ex_sink = typeid_cast<ExchangeSinkOperatorX*>(cur_pipe->sink())) {
ex_sink->set_close_sender_number(cur_pipe->num_tasks());
cur_pipe->set_num_tasks(1);
}
}

return Status::OK();
}

Expand Down Expand Up @@ -1176,10 +1192,10 @@ Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS
// 2. create and set sink operator of data stream sender for new pipeline

DataSinkOperatorPtr sink_op;
sink_op.reset(
new ExchangeSinkOperatorX(state, *_row_desc, next_sink_operator_id(),
thrift_sink.multi_cast_stream_sink.sinks[i],
thrift_sink.multi_cast_stream_sink.destinations[i]));
sink_op.reset(new ExchangeSinkOperatorX(
state, *_row_desc, next_sink_operator_id(),
thrift_sink.multi_cast_stream_sink.sinks[i],
thrift_sink.multi_cast_stream_sink.destinations[i], true));

RETURN_IF_ERROR(new_pipeline->set_sink(sink_op));
{
Expand Down
5 changes: 5 additions & 0 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,11 @@ class RuntimeState {
_query_options.enable_local_merge_sort;
}

// True only when the query option `enable_pass_to_one_exchange` was explicitly
// set by the sender of the query options AND its value is true; an unset field
// is treated as disabled.
bool enable_pass_to_one_exchange() const {
    const auto& options = _query_options;
    if (!options.__isset.enable_pass_to_one_exchange) {
        return false;
    }
    return options.enable_pass_to_one_exchange;
}

int64_t min_revocable_mem() const {
if (_query_options.__isset.min_revocable_mem) {
return std::max(_query_options.min_revocable_mem, (int64_t)1);
Expand Down
5 changes: 4 additions & 1 deletion be/src/vec/runtime/vdata_stream_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,10 @@ Status VDataStreamMgr::transmit_block(const PTransmitDataParams* request,
if (eos) {
Status exec_status =
request->has_exec_status() ? Status::create(request->exec_status()) : Status::OK();
recvr->remove_sender(request->sender_id(), request->be_number(), exec_status);
int close_sender_number =
request->has_close_sender_number() ? request->close_sender_number() : 1;
recvr->remove_sender(request->sender_id(), request->be_number(), close_sender_number,
exec_status);
}
return Status::OK();
}
Expand Down
12 changes: 7 additions & 5 deletions be/src/vec/runtime/vdata_stream_recvr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,14 +249,15 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
}
}

void VDataStreamRecvr::SenderQueue::decrement_senders(int be_number) {
void VDataStreamRecvr::SenderQueue::decrement_senders(int be_number, int close_sender_number) {
std::lock_guard<std::mutex> l(_lock);
if (_sender_eos_set.end() != _sender_eos_set.find(be_number)) {
return;
}
_sender_eos_set.insert(be_number);
DCHECK_GT(_num_remaining_senders, 0);
_num_remaining_senders--;
DCHECK_GE(_num_remaining_senders, close_sender_number);
_num_remaining_senders -= close_sender_number;

_record_debug_info();
VLOG_FILE << "decremented senders: fragment_instance_id="
<< print_id(_recvr->fragment_instance_id()) << " node_id=" << _recvr->dest_node_id()
Expand Down Expand Up @@ -426,13 +427,14 @@ Status VDataStreamRecvr::get_next(Block* block, bool* eos) {
}
}

void VDataStreamRecvr::remove_sender(int sender_id, int be_number, Status exec_status) {
void VDataStreamRecvr::remove_sender(int sender_id, int be_number, int close_sender_number,
Status exec_status) {
if (!exec_status.ok()) {
cancel_stream(exec_status);
return;
}
int use_sender_id = _is_merging ? sender_id : 0;
_sender_queues[use_sender_id]->decrement_senders(be_number);
_sender_queues[use_sender_id]->decrement_senders(be_number, close_sender_number);
}

void VDataStreamRecvr::cancel_stream(Status exec_status) {
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/runtime/vdata_stream_recvr.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class VDataStreamRecvr : public HasTaskExecutionCtx {

// Indicate that a particular sender is done. Delegated to the appropriate
// sender queue. Called from DataStreamMgr.
void remove_sender(int sender_id, int be_number, Status exec_status);
void remove_sender(int sender_id, int be_number, int close_sender_number, Status exec_status);

void cancel_stream(Status exec_status);

Expand Down Expand Up @@ -185,7 +185,7 @@ class VDataStreamRecvr::SenderQueue {

void add_block(Block* block, bool use_move);

void decrement_senders(int sender_id);
void decrement_senders(int be_number, int close_sender_number);

void cancel(Status cancel_status);

Expand Down
6 changes: 4 additions & 2 deletions be/src/vec/sink/vdata_stream_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,8 @@ Status Channel<Parent>::send_local_block(Status exec_status, bool eos) {

_local_recvr->add_block(&block, _parent->sender_id(), true);
if (eos) {
_local_recvr->remove_sender(_parent->sender_id(), _be_number, exec_status);
_local_recvr->remove_sender(_parent->sender_id(), _be_number,
_parent->close_sender_number(), exec_status);
}
return Status::OK();
} else {
Expand Down Expand Up @@ -326,7 +327,8 @@ Status Channel<Parent>::close_internal(Status exec_status) {
SCOPED_CONSUME_MEM_TRACKER(_parent->mem_tracker());
if (is_local()) {
if (_recvr_is_valid()) {
_local_recvr->remove_sender(_parent->sender_id(), _be_number, exec_status);
_local_recvr->remove_sender(_parent->sender_id(), _be_number,
_parent->close_sender_number(), exec_status);
}
} else {
// Non pipeline engine will send an empty eos block
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,8 @@ public class SessionVariable implements Serializable, Writable {

public static final String ENABLE_LOCAL_MERGE_SORT = "enable_local_merge_sort";

public static final String ENABLE_PASS_TO_ONE_EXCHANGE = "enable_pass_to_one_exchange";

public static final String ENABLE_AGG_STATE = "enable_agg_state";

public static final String ENABLE_RPC_OPT_FOR_PIPELINE = "enable_rpc_opt_for_pipeline";
Expand Down Expand Up @@ -1068,6 +1070,9 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = ENABLE_LOCAL_MERGE_SORT)
private boolean enableLocalMergeSort = true;

@VariableMgr.VarAttr(name = ENABLE_PASS_TO_ONE_EXCHANGE)
private boolean enablePassToOneExchange = false;

@VariableMgr.VarAttr(name = ENABLE_AGG_STATE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL,
needForward = true)
public boolean enableAggState = false;
Expand Down Expand Up @@ -2215,6 +2220,7 @@ public void initFuzzyModeVariables() {
this.parallelPrepareThreshold = random.nextInt(32) + 1;
this.enableCommonExprPushdown = random.nextBoolean();
this.enableLocalExchange = random.nextBoolean();
this.enablePassToOneExchange = random.nextBoolean();
// This can make the BE loop forever, so keep it disabled for now
// this.disableJoinReorder = random.nextBoolean();
this.enableCommonExpPushDownForInvertedIndex = random.nextBoolean();
Expand Down Expand Up @@ -3774,6 +3780,7 @@ public TQueryOptions toThrift() {
tResult.setDataQueueMaxBlocks(dataQueueMaxBlocks);

tResult.setEnableLocalMergeSort(enableLocalMergeSort);
tResult.setEnablePassToOneExchange(enablePassToOneExchange);
tResult.setEnableParallelResultSink(enableParallelResultSink);
tResult.setEnableShortCircuitQueryAccessColumnStore(enableShortCircuitQueryAcessColumnStore);
tResult.setReadCsvEmptyLineAsNull(readCsvEmptyLineAsNull);
Expand Down
1 change: 1 addition & 0 deletions gensrc/proto/internal_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ message PTransmitDataParams {
optional bool transfer_by_attachment = 10 [default = false];
optional PUniqueId query_id = 11;
optional PStatus exec_status = 12;
optional int32 close_sender_number = 13;
};

message PTransmitDataResult {
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/PaloInternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ struct TQueryOptions {
// Lets a non-multicast exchange sink request the PASS_TO_ONE_EXCHANGE local
// exchange type.
// NOTE(review): the default here is true, while the FE session variable
// defaults to false and BE only honors the field when it is explicitly set -
// confirm the intended default.
135: optional bool enable_pass_to_one_exchange = true;
// For cloud, to control if the content would be written into file cache
// In write path, to control if the content would be written into file cache.
// In read path, read from file cache or remote storage when execute query.
1000: optional bool disable_file_cache = false
}

Expand Down
Loading