From b8915017ee70a3211d04a1ffcf476061d768df67 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Sat, 11 May 2024 11:29:36 +0800 Subject: [PATCH] [pipeline](fix) Avoid to use a freed dependency when cancelled (#34584) --- be/src/pipeline/dependency.cpp | 29 ++++++--------- be/src/pipeline/dependency.h | 37 ++++++++----------- .../pipeline/exec/exchange_sink_operator.cpp | 10 ++--- be/src/pipeline/exec/exchange_sink_operator.h | 6 +-- .../exec/exchange_source_operator.cpp | 2 +- be/src/pipeline/exec/hashjoin_build_sink.cpp | 3 +- be/src/pipeline/exec/operator.cpp | 10 ++--- be/src/pipeline/exec/operator.h | 6 +-- .../partitioned_aggregation_sink_operator.cpp | 6 +-- be/src/pipeline/exec/scan_operator.cpp | 5 +-- .../exec/spill_sort_sink_operator.cpp | 6 +-- .../pipeline/exec/union_source_operator.cpp | 3 +- be/src/pipeline/pipeline_fragment_context.cpp | 6 +-- be/src/pipeline/pipeline_task.h | 19 ++++++++-- be/src/runtime/query_context.cpp | 3 +- be/src/vec/exec/runtime_filter_consumer.cpp | 2 +- be/src/vec/runtime/vdata_stream_recvr.cpp | 3 +- 17 files changed, 73 insertions(+), 83 deletions(-) diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp index 2508040ea3ffe5..7b25abc5c61be3 100644 --- a/be/src/pipeline/dependency.cpp +++ b/be/src/pipeline/dependency.cpp @@ -31,17 +31,14 @@ namespace doris::pipeline { Dependency* BasicSharedState::create_source_dependency(int operator_id, int node_id, - std::string name, QueryContext* ctx) { - source_deps.push_back( - std::make_shared(operator_id, node_id, name + "_DEPENDENCY", ctx)); + std::string name) { + source_deps.push_back(std::make_shared(operator_id, node_id, name + "_DEPENDENCY")); source_deps.back()->set_shared_state(this); return source_deps.back().get(); } -Dependency* BasicSharedState::create_sink_dependency(int dest_id, int node_id, std::string name, - QueryContext* ctx) { - sink_deps.push_back( - std::make_shared(dest_id, node_id, name + "_DEPENDENCY", true, ctx)); +Dependency* BasicSharedState::create_sink_dependency(int dest_id, int node_id, std::string name) { + sink_deps.push_back(std::make_shared(dest_id, node_id, name + "_DEPENDENCY", true)); sink_deps.back()->set_shared_state(this); return sink_deps.back().get(); } @@ -73,7 +70,7 @@ void Dependency::set_ready() { Dependency* Dependency::is_blocked_by(PipelineTask* task) { std::unique_lock lc(_task_lock); - auto ready = _ready.load() || _is_cancelled(); + auto ready = _ready.load(); if (!ready && task) { _add_block_task(task); } @@ -82,20 +79,18 @@ Dependency* Dependency::is_blocked_by(PipelineTask* task) { std::string Dependency::debug_string(int indentation_level) { fmt::memory_buffer debug_string_buffer; - fmt::format_to(debug_string_buffer, - "{}{}: id={}, block task = {}, ready={}, _always_ready={}, is cancelled={}", + fmt::format_to(debug_string_buffer, "{}{}: id={}, block task = {}, ready={}, _always_ready={}", std::string(indentation_level * 2, ' '), _name, _node_id, _blocked_task.size(), - _ready, _always_ready, _is_cancelled()); + _ready, _always_ready); return fmt::to_string(debug_string_buffer); } std::string CountedFinishDependency::debug_string(int indentation_level) { fmt::memory_buffer debug_string_buffer; - fmt::format_to( - debug_string_buffer, - "{}{}: id={}, block task = {}, ready={}, _always_ready={}, is cancelled={}, count={}", - std::string(indentation_level * 2, ' '), _name, _node_id, _blocked_task.size(), _ready, - _always_ready, _is_cancelled(), _counter); + fmt::format_to(debug_string_buffer, + "{}{}: id={}, block task = {}, ready={}, _always_ready={}, count={}", + std::string(indentation_level * 2, ' '), _name, _node_id, _blocked_task.size(), + _ready, _always_ready, _counter); return fmt::to_string(debug_string_buffer); } @@ -108,7 +103,7 @@ std::string RuntimeFilterDependency::debug_string(int indentation_level) { Dependency* RuntimeFilterDependency::is_blocked_by(PipelineTask* task) { std::unique_lock lc(_task_lock); - auto ready = _ready.load() || _is_cancelled(); + auto ready = _ready.load(); if (!ready && task) { _add_block_task(task); task->_blocked_dep = this; diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index 76c66a087252fd..1fb361441dc6cd 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -79,30 +79,26 @@ struct BasicSharedState { virtual ~BasicSharedState() = default; - Dependency* create_source_dependency(int operator_id, int node_id, std::string name, - QueryContext* ctx); + Dependency* create_source_dependency(int operator_id, int node_id, std::string name); - Dependency* create_sink_dependency(int dest_id, int node_id, std::string name, - QueryContext* ctx); + Dependency* create_sink_dependency(int dest_id, int node_id, std::string name); }; class Dependency : public std::enable_shared_from_this { public: ENABLE_FACTORY_CREATOR(Dependency); - Dependency(int id, int node_id, std::string name, QueryContext* query_ctx) + Dependency(int id, int node_id, std::string name) : _id(id), _node_id(node_id), _name(std::move(name)), _is_write_dependency(false), - _ready(false), - _query_ctx(query_ctx) {} - Dependency(int id, int node_id, std::string name, bool ready, QueryContext* query_ctx) + _ready(false) {} + Dependency(int id, int node_id, std::string name, bool ready) : _id(id), _node_id(node_id), _name(std::move(name)), _is_write_dependency(true), - _ready(ready), - _query_ctx(query_ctx) {} + _ready(ready) {} virtual ~Dependency() = default; bool is_write_dependency() const { return _is_write_dependency; } @@ -165,14 +161,12 @@ class Dependency : public std::enable_shared_from_this { protected: void _add_block_task(PipelineTask* task); - bool _is_cancelled() const { return _query_ctx->is_cancelled(); } const int _id; const int _node_id; const std::string _name; const bool _is_write_dependency; std::atomic _ready; - const QueryContext* _query_ctx = nullptr; BasicSharedState* _shared_state = nullptr; MonotonicStopWatch _watcher; @@ -192,8 +186,8 @@ struct FakeSharedState final : public BasicSharedState { struct CountedFinishDependency final : public Dependency { public: using SharedState = FakeSharedState; - CountedFinishDependency(int id, int node_id, std::string name, QueryContext* query_ctx) - : Dependency(id, node_id, name, true, query_ctx) {} + CountedFinishDependency(int id, int node_id, std::string name) + : Dependency(id, node_id, name, true) {} void add() { std::unique_lock l(_mtx); @@ -283,9 +277,8 @@ struct RuntimeFilterTimerQueue { class RuntimeFilterDependency final : public Dependency { public: - RuntimeFilterDependency(int id, int node_id, std::string name, QueryContext* query_ctx, - IRuntimeFilter* runtime_filter) - : Dependency(id, node_id, name, query_ctx), _runtime_filter(runtime_filter) {} + RuntimeFilterDependency(int id, int node_id, std::string name, IRuntimeFilter* runtime_filter) + : Dependency(id, node_id, name), _runtime_filter(runtime_filter) {} std::string debug_string(int indentation_level = 0) override; Dependency* is_blocked_by(PipelineTask* task) override; @@ -585,8 +578,8 @@ class AsyncWriterDependency final : public Dependency { public: using SharedState = BasicSharedState; ENABLE_FACTORY_CREATOR(AsyncWriterDependency); - AsyncWriterDependency(int id, int node_id, QueryContext* query_ctx) - : Dependency(id, node_id, "AsyncWriterDependency", true, query_ctx) {} + AsyncWriterDependency(int id, int node_id) + : Dependency(id, node_id, "AsyncWriterDependency", true) {} ~AsyncWriterDependency() override = default; }; @@ -739,10 +732,10 @@ struct LocalExchangeSharedState : public BasicSharedState { std::vector mem_trackers; std::atomic mem_usage = 0; std::mutex le_lock; - void create_source_dependencies(int operator_id, int node_id, QueryContext* ctx) { + void create_source_dependencies(int operator_id, int node_id) { for (size_t i = 0; i < source_deps.size(); i++) { - source_deps[i] = std::make_shared( - operator_id, node_id, "LOCAL_EXCHANGE_OPERATOR_DEPENDENCY", ctx); + source_deps[i] = std::make_shared(operator_id, node_id, + "LOCAL_EXCHANGE_OPERATOR_DEPENDENCY"); source_deps[i]->set_shared_state(this); } }; diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 5b584c3685ab6c..69cd139714fcf0 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -135,15 +135,13 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) { _state->be_number(), state); register_channels(_sink_buffer.get()); - _queue_dependency = - Dependency::create_shared(_parent->operator_id(), _parent->node_id(), - "ExchangeSinkQueueDependency", true, state->get_query_ctx()); + _queue_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), + "ExchangeSinkQueueDependency", true); _sink_buffer->set_dependency(_queue_dependency, _finish_dependency); if ((_part_type == TPartitionType::UNPARTITIONED || channels.size() == 1) && !only_local_exchange) { - _broadcast_dependency = - Dependency::create_shared(_parent->operator_id(), _parent->node_id(), - "BroadcastDependency", true, state->get_query_ctx()); + _broadcast_dependency = Dependency::create_shared( + _parent->operator_id(), _parent->node_id(), "BroadcastDependency", true); _sink_buffer->set_broadcast_dependency(_broadcast_dependency); _broadcast_pb_blocks = vectorized::BroadcastPBlockHolderQueue::create_shared(_broadcast_dependency); diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index df68c8de74b80f..72fdbc3354d0fc 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -58,9 +58,9 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> { current_channel_idx(0), only_local_exchange(false), _serializer(this) { - _finish_dependency = std::make_shared(parent->operator_id(), parent->node_id(), - parent->get_name() + "_FINISH_DEPENDENCY", - true, state->get_query_ctx()); + _finish_dependency = + std::make_shared(parent->operator_id(), parent->node_id(), + parent->get_name() + "_FINISH_DEPENDENCY", true); } std::vector dependencies() const override { diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp index 116014da6f2ad2..e37233f641b803 100644 --- a/be/src/pipeline/exec/exchange_source_operator.cpp +++ b/be/src/pipeline/exec/exchange_source_operator.cpp @@ -72,7 +72,7 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) { _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, timer_name, 1); for (size_t i = 0; i < queues.size(); i++) { deps[i] = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), - "SHUFFLE_DATA_DEPENDENCY", state->get_query_ctx()); + "SHUFFLE_DATA_DEPENDENCY"); queues[i]->set_dependency(deps[i]); metrics[i] = _runtime_profile->add_nonzero_counter(fmt::format("WaitForData{}", i), TUnit ::TIME_NS, timer_name, 1); diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 9cfe060736dd04..90aeb7070b6d70 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -32,8 +32,7 @@ HashJoinBuildSinkLocalState::HashJoinBuildSinkLocalState(DataSinkOperatorXBase* RuntimeState* state) : JoinBuildSinkLocalState(parent, state) { _finish_dependency = std::make_shared( - parent->operator_id(), parent->node_id(), parent->get_name() + "_FINISH_DEPENDENCY", - state->get_query_ctx()); + parent->operator_id(), parent->node_id(), parent->get_name() + "_FINISH_DEPENDENCY"); } Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp index 45bad17ed01e01..04d2541277e13f 100644 --- a/be/src/pipeline/exec/operator.cpp +++ b/be/src/pipeline/exec/operator.cpp @@ -429,8 +429,7 @@ Status PipelineXLocalState::init(RuntimeState* state, LocalState _shared_state = info.shared_state->template cast(); _dependency = _shared_state->create_source_dependency( - _parent->operator_id(), _parent->node_id(), _parent->get_name(), - state->get_query_ctx()); + _parent->operator_id(), _parent->node_id(), _parent->get_name()); _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL( _runtime_profile, "WaitForDependency[" + _dependency->name() + "]Time", 1); } @@ -507,8 +506,7 @@ Status PipelineXSinkLocalState::init(RuntimeState* state, LocalSink } else { _shared_state = info.shared_state->template cast(); _dependency = _shared_state->create_sink_dependency( - _parent->dests_id().front(), _parent->node_id(), _parent->get_name(), - state->get_query_ctx()); + _parent->dests_id().front(), _parent->node_id(), _parent->get_name()); } _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL( _profile, "WaitForDependency[" + _dependency->name() + "]Time", 1); @@ -588,8 +586,8 @@ template Status AsyncWriterSink::init(RuntimeState* state, LocalSinkStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); _writer.reset(new Writer(info.tsink, _output_vexpr_ctxs)); - _async_writer_dependency = AsyncWriterDependency::create_shared( - _parent->operator_id(), _parent->node_id(), state->get_query_ctx()); + _async_writer_dependency = + AsyncWriterDependency::create_shared(_parent->operator_id(), _parent->node_id()); _writer->set_dependency(_async_writer_dependency.get(), _finish_dependency.get()); _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL( diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 1b99b849d2630f..ee1b05a603f76e 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -877,9 +877,9 @@ class AsyncWriterSink : public PipelineXSinkLocalState { using Base = PipelineXSinkLocalState; AsyncWriterSink(DataSinkOperatorXBase* parent, RuntimeState* state) : Base(parent, state), _async_writer_dependency(nullptr) { - _finish_dependency = std::make_shared(parent->operator_id(), parent->node_id(), - parent->get_name() + "_FINISH_DEPENDENCY", - true, state->get_query_ctx()); + _finish_dependency = + std::make_shared(parent->operator_id(), parent->node_id(), + parent->get_name() + "_FINISH_DEPENDENCY", true); } Status init(RuntimeState* state, LocalSinkStateInfo& info) override; diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index 4d007ae9dc4e31..89d8408bbd1cc8 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -28,9 +28,9 @@ namespace doris::pipeline { PartitionedAggSinkLocalState::PartitionedAggSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) : Base(parent, state) { - _finish_dependency = std::make_shared(parent->operator_id(), parent->node_id(), - parent->get_name() + "_SPILL_DEPENDENCY", - true, state->get_query_ctx()); + _finish_dependency = + std::make_shared(parent->operator_id(), parent->node_id(), + parent->get_name() + "_SPILL_DEPENDENCY", true); } Status PartitionedAggSinkLocalState::init(doris::RuntimeState* state, doris::pipeline::LocalSinkStateInfo& info) { diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index e5ba6dbf06d427..ddb379e0977f3f 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -68,9 +68,8 @@ bool ScanLocalState::should_run_serial() const { template Status ScanLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info)); - _scan_dependency = - Dependency::create_shared(_parent->operator_id(), _parent->node_id(), - _parent->get_name() + "_DEPENDENCY", state->get_query_ctx()); + _scan_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), + _parent->get_name() + "_DEPENDENCY"); _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL( _runtime_profile, "WaitForDependency[" + _scan_dependency->name() + "]Time", 1); SCOPED_TIMER(exec_time_counter()); diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index c6a943c59b53ca..e20ad943742d97 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -23,9 +23,9 @@ namespace doris::pipeline { SpillSortSinkLocalState::SpillSortSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) : Base(parent, state) { - _finish_dependency = std::make_shared(parent->operator_id(), parent->node_id(), - parent->get_name() + "_SPILL_DEPENDENCY", - true, state->get_query_ctx()); + _finish_dependency = + std::make_shared(parent->operator_id(), parent->node_id(), + parent->get_name() + "_SPILL_DEPENDENCY", true); } Status SpillSortSinkLocalState::init(doris::RuntimeState* state, diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp index c94fda822175a4..ecaaf22922b657 100644 --- a/be/src/pipeline/exec/union_source_operator.cpp +++ b/be/src/pipeline/exec/union_source_operator.cpp @@ -44,8 +44,7 @@ Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { ->data_queue.set_source_dependency(_shared_state->source_deps.front()); } else { _only_const_dependency = Dependency::create_shared( - _parent->operator_id(), _parent->node_id(), _parent->get_name() + "_DEPENDENCY", - state->get_query_ctx()); + _parent->operator_id(), _parent->node_id(), _parent->get_name() + "_DEPENDENCY"); _dependency = _only_const_dependency.get(); _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL( _runtime_profile, "WaitForDependency[" + _dependency->name() + "]Time", 1); diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 9b5b3addfa2a83..860dd87c8c1ab5 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -756,8 +756,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl( std::to_string((int)data_distribution.distribution_type)); } auto sink_dep = std::make_shared(sink_id, local_exchange_id, - "LOCAL_EXCHANGE_SINK_DEPENDENCY", true, - _runtime_state->get_query_ctx()); + "LOCAL_EXCHANGE_SINK_DEPENDENCY", true); sink_dep->set_shared_state(shared_state.get()); shared_state->sink_deps.push_back(sink_dep); _op_id_to_le_state.insert({local_exchange_id, {shared_state, sink_dep}}); @@ -782,8 +781,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl( } operator_xs.insert(operator_xs.begin(), source_op); - shared_state->create_source_dependencies(source_op->operator_id(), source_op->node_id(), - _query_ctx.get()); + shared_state->create_source_dependencies(source_op->operator_id(), source_op->node_id()); // 5. Set children for two pipelines separately. std::vector> new_children; diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 4ca3abbc4c5441..8eeb8e7a3edbbe 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -141,9 +141,22 @@ class PipelineTask { int task_id() const { return _index; }; void clear_blocking_state() { - if (!_finished && _blocked_dep) { - _blocked_dep->set_ready(); - _blocked_dep = nullptr; + // We use a lock to assure all dependencies are not deconstructed here. + std::unique_lock lc(_release_lock); + if (!_finished) { + _execution_dep->set_always_ready(); + for (auto* dep : _filter_dependencies) { + dep->set_always_ready(); + } + for (auto* dep : _read_dependencies) { + dep->set_always_ready(); + } + for (auto* dep : _write_dependencies) { + dep->set_always_ready(); + } + for (auto* dep : _finish_dependencies) { + dep->set_always_ready(); + } } } diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index b1d90ce29a9f82..3ebb329b5f2af9 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -71,8 +71,7 @@ QueryContext::QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv* _query_watcher.start(); _shared_hash_table_controller.reset(new vectorized::SharedHashTableController()); _shared_scanner_controller.reset(new vectorized::SharedScannerController()); - _execution_dependency = - pipeline::Dependency::create_unique(-1, -1, "ExecutionDependency", this); + _execution_dependency = pipeline::Dependency::create_unique(-1, -1, "ExecutionDependency"); _runtime_filter_mgr = std::make_unique( TUniqueId(), RuntimeFilterParamsContext::create(this), query_mem_tracker); diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp b/be/src/vec/exec/runtime_filter_consumer.cpp index 4e2177e3f1be38..e4250f45dccb88 100644 --- a/be/src/vec/exec/runtime_filter_consumer.cpp +++ b/be/src/vec/exec/runtime_filter_consumer.cpp @@ -84,7 +84,7 @@ void RuntimeFilterConsumer::init_runtime_filter_dependency( for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter; runtime_filter_dependencies[i] = std::make_shared( - id, node_id, name, _state->get_query_ctx(), runtime_filter); + id, node_id, name, runtime_filter); _runtime_filter_ctxs[i].runtime_filter_dependency = runtime_filter_dependencies[i].get(); auto filter_timer = std::make_shared( runtime_filter->registration_time(), runtime_filter->wait_time_ms(), diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 483a21e2545751..c2535da2cf2122 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -361,8 +361,7 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeState* sta _sender_to_local_channel_dependency.resize(num_queues); for (size_t i = 0; i < num_queues; i++) { _sender_to_local_channel_dependency[i] = pipeline::Dependency::create_shared( - _dest_node_id, _dest_node_id, "LocalExchangeChannelDependency", true, - state->get_query_ctx()); + _dest_node_id, _dest_node_id, "LocalExchangeChannelDependency", true); } } _sender_queues.reserve(num_queues);