Skip to content

Commit

Permalink
[pipeline](fix) Avoid to use a freed dependency when cancelled (#34584)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 authored May 11, 2024
1 parent 25e0e29 commit b891501
Show file tree
Hide file tree
Showing 17 changed files with 73 additions and 83 deletions.
29 changes: 12 additions & 17 deletions be/src/pipeline/dependency.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,14 @@
namespace doris::pipeline {

Dependency* BasicSharedState::create_source_dependency(int operator_id, int node_id,
std::string name, QueryContext* ctx) {
source_deps.push_back(
std::make_shared<Dependency>(operator_id, node_id, name + "_DEPENDENCY", ctx));
std::string name) {
source_deps.push_back(std::make_shared<Dependency>(operator_id, node_id, name + "_DEPENDENCY"));
source_deps.back()->set_shared_state(this);
return source_deps.back().get();
}

Dependency* BasicSharedState::create_sink_dependency(int dest_id, int node_id, std::string name,
QueryContext* ctx) {
sink_deps.push_back(
std::make_shared<Dependency>(dest_id, node_id, name + "_DEPENDENCY", true, ctx));
Dependency* BasicSharedState::create_sink_dependency(int dest_id, int node_id, std::string name) {
sink_deps.push_back(std::make_shared<Dependency>(dest_id, node_id, name + "_DEPENDENCY", true));
sink_deps.back()->set_shared_state(this);
return sink_deps.back().get();
}
Expand Down Expand Up @@ -73,7 +70,7 @@ void Dependency::set_ready() {

Dependency* Dependency::is_blocked_by(PipelineTask* task) {
std::unique_lock<std::mutex> lc(_task_lock);
auto ready = _ready.load() || _is_cancelled();
auto ready = _ready.load();
if (!ready && task) {
_add_block_task(task);
}
Expand All @@ -82,20 +79,18 @@ Dependency* Dependency::is_blocked_by(PipelineTask* task) {

std::string Dependency::debug_string(int indentation_level) {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer,
"{}{}: id={}, block task = {}, ready={}, _always_ready={}, is cancelled={}",
fmt::format_to(debug_string_buffer, "{}{}: id={}, block task = {}, ready={}, _always_ready={}",
std::string(indentation_level * 2, ' '), _name, _node_id, _blocked_task.size(),
_ready, _always_ready, _is_cancelled());
_ready, _always_ready);
return fmt::to_string(debug_string_buffer);
}

std::string CountedFinishDependency::debug_string(int indentation_level) {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(
debug_string_buffer,
"{}{}: id={}, block task = {}, ready={}, _always_ready={}, is cancelled={}, count={}",
std::string(indentation_level * 2, ' '), _name, _node_id, _blocked_task.size(), _ready,
_always_ready, _is_cancelled(), _counter);
fmt::format_to(debug_string_buffer,
"{}{}: id={}, block task = {}, ready={}, _always_ready={}, count={}",
std::string(indentation_level * 2, ' '), _name, _node_id, _blocked_task.size(),
_ready, _always_ready, _counter);
return fmt::to_string(debug_string_buffer);
}

Expand All @@ -108,7 +103,7 @@ std::string RuntimeFilterDependency::debug_string(int indentation_level) {

Dependency* RuntimeFilterDependency::is_blocked_by(PipelineTask* task) {
std::unique_lock<std::mutex> lc(_task_lock);
auto ready = _ready.load() || _is_cancelled();
auto ready = _ready.load();
if (!ready && task) {
_add_block_task(task);
task->_blocked_dep = this;
Expand Down
37 changes: 15 additions & 22 deletions be/src/pipeline/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,30 +79,26 @@ struct BasicSharedState {

virtual ~BasicSharedState() = default;

Dependency* create_source_dependency(int operator_id, int node_id, std::string name,
QueryContext* ctx);
Dependency* create_source_dependency(int operator_id, int node_id, std::string name);

Dependency* create_sink_dependency(int dest_id, int node_id, std::string name,
QueryContext* ctx);
Dependency* create_sink_dependency(int dest_id, int node_id, std::string name);
};

class Dependency : public std::enable_shared_from_this<Dependency> {
public:
ENABLE_FACTORY_CREATOR(Dependency);
Dependency(int id, int node_id, std::string name, QueryContext* query_ctx)
Dependency(int id, int node_id, std::string name)
: _id(id),
_node_id(node_id),
_name(std::move(name)),
_is_write_dependency(false),
_ready(false),
_query_ctx(query_ctx) {}
Dependency(int id, int node_id, std::string name, bool ready, QueryContext* query_ctx)
_ready(false) {}
Dependency(int id, int node_id, std::string name, bool ready)
: _id(id),
_node_id(node_id),
_name(std::move(name)),
_is_write_dependency(true),
_ready(ready),
_query_ctx(query_ctx) {}
_ready(ready) {}
virtual ~Dependency() = default;

bool is_write_dependency() const { return _is_write_dependency; }
Expand Down Expand Up @@ -165,14 +161,12 @@ class Dependency : public std::enable_shared_from_this<Dependency> {

protected:
void _add_block_task(PipelineTask* task);
bool _is_cancelled() const { return _query_ctx->is_cancelled(); }

const int _id;
const int _node_id;
const std::string _name;
const bool _is_write_dependency;
std::atomic<bool> _ready;
const QueryContext* _query_ctx = nullptr;

BasicSharedState* _shared_state = nullptr;
MonotonicStopWatch _watcher;
Expand All @@ -192,8 +186,8 @@ struct FakeSharedState final : public BasicSharedState {
struct CountedFinishDependency final : public Dependency {
public:
using SharedState = FakeSharedState;
CountedFinishDependency(int id, int node_id, std::string name, QueryContext* query_ctx)
: Dependency(id, node_id, name, true, query_ctx) {}
CountedFinishDependency(int id, int node_id, std::string name)
: Dependency(id, node_id, name, true) {}

void add() {
std::unique_lock<std::mutex> l(_mtx);
Expand Down Expand Up @@ -283,9 +277,8 @@ struct RuntimeFilterTimerQueue {

class RuntimeFilterDependency final : public Dependency {
public:
RuntimeFilterDependency(int id, int node_id, std::string name, QueryContext* query_ctx,
IRuntimeFilter* runtime_filter)
: Dependency(id, node_id, name, query_ctx), _runtime_filter(runtime_filter) {}
RuntimeFilterDependency(int id, int node_id, std::string name, IRuntimeFilter* runtime_filter)
: Dependency(id, node_id, name), _runtime_filter(runtime_filter) {}
std::string debug_string(int indentation_level = 0) override;

Dependency* is_blocked_by(PipelineTask* task) override;
Expand Down Expand Up @@ -585,8 +578,8 @@ class AsyncWriterDependency final : public Dependency {
public:
using SharedState = BasicSharedState;
ENABLE_FACTORY_CREATOR(AsyncWriterDependency);
AsyncWriterDependency(int id, int node_id, QueryContext* query_ctx)
: Dependency(id, node_id, "AsyncWriterDependency", true, query_ctx) {}
AsyncWriterDependency(int id, int node_id)
: Dependency(id, node_id, "AsyncWriterDependency", true) {}
~AsyncWriterDependency() override = default;
};

Expand Down Expand Up @@ -739,10 +732,10 @@ struct LocalExchangeSharedState : public BasicSharedState {
std::vector<MemTracker*> mem_trackers;
std::atomic<size_t> mem_usage = 0;
std::mutex le_lock;
void create_source_dependencies(int operator_id, int node_id, QueryContext* ctx) {
void create_source_dependencies(int operator_id, int node_id) {
for (size_t i = 0; i < source_deps.size(); i++) {
source_deps[i] = std::make_shared<Dependency>(
operator_id, node_id, "LOCAL_EXCHANGE_OPERATOR_DEPENDENCY", ctx);
source_deps[i] = std::make_shared<Dependency>(operator_id, node_id,
"LOCAL_EXCHANGE_OPERATOR_DEPENDENCY");
source_deps[i]->set_shared_state(this);
}
};
Expand Down
10 changes: 4 additions & 6 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,15 +135,13 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
_state->be_number(), state);

register_channels(_sink_buffer.get());
_queue_dependency =
Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
"ExchangeSinkQueueDependency", true, state->get_query_ctx());
_queue_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
"ExchangeSinkQueueDependency", true);
_sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
if ((_part_type == TPartitionType::UNPARTITIONED || channels.size() == 1) &&
!only_local_exchange) {
_broadcast_dependency =
Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
"BroadcastDependency", true, state->get_query_ctx());
_broadcast_dependency = Dependency::create_shared(
_parent->operator_id(), _parent->node_id(), "BroadcastDependency", true);
_sink_buffer->set_broadcast_dependency(_broadcast_dependency);
_broadcast_pb_blocks =
vectorized::BroadcastPBlockHolderQueue::create_shared(_broadcast_dependency);
Expand Down
6 changes: 3 additions & 3 deletions be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
current_channel_idx(0),
only_local_exchange(false),
_serializer(this) {
_finish_dependency = std::make_shared<Dependency>(parent->operator_id(), parent->node_id(),
parent->get_name() + "_FINISH_DEPENDENCY",
true, state->get_query_ctx());
_finish_dependency =
std::make_shared<Dependency>(parent->operator_id(), parent->node_id(),
parent->get_name() + "_FINISH_DEPENDENCY", true);
}

std::vector<Dependency*> dependencies() const override {
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/exchange_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, timer_name, 1);
for (size_t i = 0; i < queues.size(); i++) {
deps[i] = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
"SHUFFLE_DATA_DEPENDENCY", state->get_query_ctx());
"SHUFFLE_DATA_DEPENDENCY");
queues[i]->set_dependency(deps[i]);
metrics[i] = _runtime_profile->add_nonzero_counter(fmt::format("WaitForData{}", i),
TUnit ::TIME_NS, timer_name, 1);
Expand Down
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ HashJoinBuildSinkLocalState::HashJoinBuildSinkLocalState(DataSinkOperatorXBase*
RuntimeState* state)
: JoinBuildSinkLocalState(parent, state) {
_finish_dependency = std::make_shared<CountedFinishDependency>(
parent->operator_id(), parent->node_id(), parent->get_name() + "_FINISH_DEPENDENCY",
state->get_query_ctx());
parent->operator_id(), parent->node_id(), parent->get_name() + "_FINISH_DEPENDENCY");
}

Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
Expand Down
10 changes: 4 additions & 6 deletions be/src/pipeline/exec/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -429,8 +429,7 @@ Status PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState
_shared_state = info.shared_state->template cast<SharedStateArg>();

_dependency = _shared_state->create_source_dependency(
_parent->operator_id(), _parent->node_id(), _parent->get_name(),
state->get_query_ctx());
_parent->operator_id(), _parent->node_id(), _parent->get_name());
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
_runtime_profile, "WaitForDependency[" + _dependency->name() + "]Time", 1);
}
Expand Down Expand Up @@ -507,8 +506,7 @@ Status PipelineXSinkLocalState<SharedState>::init(RuntimeState* state, LocalSink
} else {
_shared_state = info.shared_state->template cast<SharedState>();
_dependency = _shared_state->create_sink_dependency(
_parent->dests_id().front(), _parent->node_id(), _parent->get_name(),
state->get_query_ctx());
_parent->dests_id().front(), _parent->node_id(), _parent->get_name());
}
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
_profile, "WaitForDependency[" + _dependency->name() + "]Time", 1);
Expand Down Expand Up @@ -588,8 +586,8 @@ template <typename Writer, typename Parent>
Status AsyncWriterSink<Writer, Parent>::init(RuntimeState* state, LocalSinkStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
_writer.reset(new Writer(info.tsink, _output_vexpr_ctxs));
_async_writer_dependency = AsyncWriterDependency::create_shared(
_parent->operator_id(), _parent->node_id(), state->get_query_ctx());
_async_writer_dependency =
AsyncWriterDependency::create_shared(_parent->operator_id(), _parent->node_id());
_writer->set_dependency(_async_writer_dependency.get(), _finish_dependency.get());

_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
Expand Down
6 changes: 3 additions & 3 deletions be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -877,9 +877,9 @@ class AsyncWriterSink : public PipelineXSinkLocalState<FakeSharedState> {
using Base = PipelineXSinkLocalState<FakeSharedState>;
AsyncWriterSink(DataSinkOperatorXBase* parent, RuntimeState* state)
: Base(parent, state), _async_writer_dependency(nullptr) {
_finish_dependency = std::make_shared<Dependency>(parent->operator_id(), parent->node_id(),
parent->get_name() + "_FINISH_DEPENDENCY",
true, state->get_query_ctx());
_finish_dependency =
std::make_shared<Dependency>(parent->operator_id(), parent->node_id(),
parent->get_name() + "_FINISH_DEPENDENCY", true);
}

Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ namespace doris::pipeline {
PartitionedAggSinkLocalState::PartitionedAggSinkLocalState(DataSinkOperatorXBase* parent,
RuntimeState* state)
: Base(parent, state) {
_finish_dependency = std::make_shared<Dependency>(parent->operator_id(), parent->node_id(),
parent->get_name() + "_SPILL_DEPENDENCY",
true, state->get_query_ctx());
_finish_dependency =
std::make_shared<Dependency>(parent->operator_id(), parent->node_id(),
parent->get_name() + "_SPILL_DEPENDENCY", true);
}
Status PartitionedAggSinkLocalState::init(doris::RuntimeState* state,
doris::pipeline::LocalSinkStateInfo& info) {
Expand Down
5 changes: 2 additions & 3 deletions be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,8 @@ bool ScanLocalState<Derived>::should_run_serial() const {
template <typename Derived>
Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
_scan_dependency =
Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
_parent->get_name() + "_DEPENDENCY", state->get_query_ctx());
_scan_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
_parent->get_name() + "_DEPENDENCY");
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
_runtime_profile, "WaitForDependency[" + _scan_dependency->name() + "]Time", 1);
SCOPED_TIMER(exec_time_counter());
Expand Down
6 changes: 3 additions & 3 deletions be/src/pipeline/exec/spill_sort_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
namespace doris::pipeline {
SpillSortSinkLocalState::SpillSortSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
: Base(parent, state) {
_finish_dependency = std::make_shared<Dependency>(parent->operator_id(), parent->node_id(),
parent->get_name() + "_SPILL_DEPENDENCY",
true, state->get_query_ctx());
_finish_dependency =
std::make_shared<Dependency>(parent->operator_id(), parent->node_id(),
parent->get_name() + "_SPILL_DEPENDENCY", true);
}

Status SpillSortSinkLocalState::init(doris::RuntimeState* state,
Expand Down
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/union_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
->data_queue.set_source_dependency(_shared_state->source_deps.front());
} else {
_only_const_dependency = Dependency::create_shared(
_parent->operator_id(), _parent->node_id(), _parent->get_name() + "_DEPENDENCY",
state->get_query_ctx());
_parent->operator_id(), _parent->node_id(), _parent->get_name() + "_DEPENDENCY");
_dependency = _only_const_dependency.get();
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
_runtime_profile, "WaitForDependency[" + _dependency->name() + "]Time", 1);
Expand Down
6 changes: 2 additions & 4 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -756,8 +756,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
std::to_string((int)data_distribution.distribution_type));
}
auto sink_dep = std::make_shared<Dependency>(sink_id, local_exchange_id,
"LOCAL_EXCHANGE_SINK_DEPENDENCY", true,
_runtime_state->get_query_ctx());
"LOCAL_EXCHANGE_SINK_DEPENDENCY", true);
sink_dep->set_shared_state(shared_state.get());
shared_state->sink_deps.push_back(sink_dep);
_op_id_to_le_state.insert({local_exchange_id, {shared_state, sink_dep}});
Expand All @@ -782,8 +781,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
}
operator_xs.insert(operator_xs.begin(), source_op);

shared_state->create_source_dependencies(source_op->operator_id(), source_op->node_id(),
_query_ctx.get());
shared_state->create_source_dependencies(source_op->operator_id(), source_op->node_id());

// 5. Set children for two pipelines separately.
std::vector<std::shared_ptr<Pipeline>> new_children;
Expand Down
19 changes: 16 additions & 3 deletions be/src/pipeline/pipeline_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,22 @@ class PipelineTask {
int task_id() const { return _index; };

void clear_blocking_state() {
if (!_finished && _blocked_dep) {
_blocked_dep->set_ready();
_blocked_dep = nullptr;
// We use a lock to assure all dependencies are not deconstructed here.
std::unique_lock<std::mutex> lc(_release_lock);
if (!_finished) {
_execution_dep->set_always_ready();
for (auto* dep : _filter_dependencies) {
dep->set_always_ready();
}
for (auto* dep : _read_dependencies) {
dep->set_always_ready();
}
for (auto* dep : _write_dependencies) {
dep->set_always_ready();
}
for (auto* dep : _finish_dependencies) {
dep->set_always_ready();
}
}
}

Expand Down
3 changes: 1 addition & 2 deletions be/src/runtime/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ QueryContext::QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv*
_query_watcher.start();
_shared_hash_table_controller.reset(new vectorized::SharedHashTableController());
_shared_scanner_controller.reset(new vectorized::SharedScannerController());
_execution_dependency =
pipeline::Dependency::create_unique(-1, -1, "ExecutionDependency", this);
_execution_dependency = pipeline::Dependency::create_unique(-1, -1, "ExecutionDependency");
_runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(
TUniqueId(), RuntimeFilterParamsContext::create(this), query_mem_tracker);

Expand Down
Loading

0 comments on commit b891501

Please sign in to comment.