[refactor](query) Make query finish with context released safely (#35248)

We relied on a reference count to decide when a query context could be released, but that count is not always reliable. If a query fails during the preparation phase, the fragment number computed by the FE may not match the number of fragments actually initialized on the BE, so the countdown may never reach zero and the context is never released. This change instead stores the query context as a weak pointer in the fragment manager: the context is released automatically once all fragments holding it finish, with no reference count involved.
Gabriel39 authored and dataroaring committed May 26, 2024
1 parent b36f52e commit b581649
Showing 6 changed files with 122 additions and 125 deletions.
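The core of the change is an ownership flip: `_query_ctx_map` now stores std::weak_ptr<QueryContext> while the running fragments hold the owning shared_ptrs, so the context is destroyed exactly when the last fragment drops its reference. Below is a minimal sketch of that model; QueryCtx and Mgr are hypothetical names for illustration, not the actual Doris classes:

    #include <map>
    #include <memory>
    #include <mutex>

    struct QueryCtx {
        // query-wide state: mem tracker, descriptor table, ...
    };

    class Mgr {
    public:
        // Registers a new context; the map keeps only a non-owning reference.
        std::shared_ptr<QueryCtx> create(int query_id) {
            auto ctx = std::make_shared<QueryCtx>();
            std::lock_guard<std::mutex> guard(_lock);
            _ctx_map[query_id] = ctx; // weak_ptr: does not extend lifetime
            return ctx;               // callers (fragments) share ownership
        }

        // Returns the live context, or erases the stale entry and returns null.
        std::shared_ptr<QueryCtx> get_or_erase(int query_id) {
            std::lock_guard<std::mutex> guard(_lock);
            auto it = _ctx_map.find(query_id);
            if (it == _ctx_map.end()) return nullptr;
            if (auto ctx = it->second.lock()) return ctx; // still alive
            _ctx_map.erase(it); // all fragments finished; clean up lazily
            return nullptr;
        }

    private:
        std::mutex _lock;
        std::map<int, std::weak_ptr<QueryCtx>> _ctx_map;
    };

    int main() {
        Mgr mgr;
        auto fragment_ref = mgr.create(1); // a fragment holds the only shared_ptr
        fragment_ref.reset();              // the last fragment finishes
        // The entry is now expired; the next lookup erases it and returns null.
        return mgr.get_or_erase(1) == nullptr ? 0 : 1;
    }

Because release now happens implicitly in fragment destruction, a query that fails before all of its planned fragments are created can no longer strand the context: whatever fragments do exist drain their references as usual.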
206 changes: 107 additions & 99 deletions be/src/runtime/fragment_mgr.cpp
@@ -480,11 +480,6 @@ void FragmentMgr::_exec_actual(std::shared_ptr<PlanFragmentExecutor> fragment_ex
}

std::shared_ptr<QueryContext> query_ctx = fragment_executor->get_query_ctx();
bool all_done = false;
if (query_ctx != nullptr) {
// decrease the number of unfinished fragments
all_done = query_ctx->countdown(1);
}

// remove exec state after this fragment finished
{
@@ -499,11 +494,6 @@ void FragmentMgr::_exec_actual(std::shared_ptr<PlanFragmentExecutor> fragment_ex
g_fragment_last_active_time.set_value(now);

LOG_INFO("Instance {} finished", print_id(fragment_executor->fragment_instance_id()));

if (all_done && query_ctx) {
_query_ctx_map.erase(query_ctx->query_id());
LOG_INFO("Query {} finished", print_id(query_ctx->query_id()));
}
}

// Callback after remove from this id
@@ -590,42 +580,50 @@ Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* r
TUniqueId query_id;
query_id.__set_hi(request->query_id().hi());
query_id.__set_lo(request->query_id().lo());
auto search = _query_ctx_map.find(query_id);
if (search == _query_ctx_map.end()) {
if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
q_ctx->set_ready_to_execute(Status::OK());
} else {
return Status::InternalError(
"Failed to get query fragments context. Query may be "
"timeout or be cancelled. host: {}",
BackendOptions::get_localhost());
}
search->second->set_ready_to_execute(Status::OK());
return Status::OK();
}

void FragmentMgr::remove_pipeline_context(
std::shared_ptr<pipeline::PipelineFragmentContext> f_context) {
auto* q_context = f_context->get_query_ctx();
{
std::lock_guard<std::mutex> lock(_lock);
auto query_id = f_context->get_query_id();
std::vector<TUniqueId> ins_ids;
f_context->instance_ids(ins_ids);
bool all_done = q_context->countdown(ins_ids.size());
int64 now = duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
g_fragment_executing_count << -1;
g_fragment_last_active_time.set_value(now);
for (const auto& ins_id : ins_ids) {
LOG_INFO("Removing query {} instance {}, all done? {}", print_id(query_id),
print_id(ins_id), all_done);
LOG_INFO("Removing query {} instance {}", print_id(query_id), print_id(ins_id));
_pipeline_map.erase(ins_id);
g_pipeline_fragment_instances_count << -1;
}
if (all_done) {
LOG_INFO("Query {} finished", print_id(query_id));
_query_ctx_map.erase(query_id);
}
}

std::shared_ptr<QueryContext> FragmentMgr::_get_or_erase_query_ctx(TUniqueId query_id) {
auto search = _query_ctx_map.find(query_id);
if (search != _query_ctx_map.end()) {
if (auto q_ctx = search->second.lock()) {
return q_ctx;
} else {
LOG(WARNING) << "Query context (query id = " << print_id(query_id)
<< ") has been released.";
_query_ctx_map.erase(search);
return nullptr;
}
}
return nullptr;
}

template <typename Params>
@@ -634,21 +632,20 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
if (params.is_simplified_param) {
// Get common components from _query_ctx_map
std::lock_guard<std::mutex> lock(_lock);
auto search = _query_ctx_map.find(query_id);
if (search == _query_ctx_map.end()) {
if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
query_ctx = q_ctx;
} else {
return Status::InternalError(
"Failed to get query fragments context. Query may be "
"timeout or be cancelled. host: {}",
BackendOptions::get_localhost());
}
query_ctx = search->second;
} else {
// Find _query_ctx_map, in case some other request has already
// create the query fragments context.
std::lock_guard<std::mutex> lock(_lock);
auto search = _query_ctx_map.find(query_id);
if (search != _query_ctx_map.end()) {
query_ctx = search->second;
if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
query_ctx = q_ctx;
return Status::OK();
}

@@ -659,9 +656,8 @@

// This may be a first fragment request of the query.
// Create the query fragments context.
query_ctx = QueryContext::create_shared(query_id, params.fragment_num_on_host, _exec_env,
params.query_options, params.coord, pipeline,
params.is_nereids);
query_ctx = QueryContext::create_shared(query_id, _exec_env, params.query_options,
params.coord, pipeline, params.is_nereids);
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker);
RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), params.desc_tbl,
&(query_ctx->desc_tbl)));
@@ -924,32 +920,32 @@ void FragmentMgr::_set_scan_concurrency(const Param& params, QueryContext* query
#endif
}

std::shared_ptr<QueryContext> FragmentMgr::get_query_context(const TUniqueId& query_id) {
Status FragmentMgr::get_query_context(const TUniqueId& query_id,
std::shared_ptr<QueryContext>* query_ctx) {
std::lock_guard<std::mutex> state_lock(_lock);
auto ctx = _query_ctx_map.find(query_id);
if (ctx != _query_ctx_map.end()) {
return ctx->second;
if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
*query_ctx = q_ctx;
} else {
return nullptr;
return Status::InternalError("Query context not found for query {}", print_id(query_id));
}
return Status::OK();
}

void FragmentMgr::cancel_query(const TUniqueId query_id, const Status reason) {
std::shared_ptr<QueryContext> query_ctx;
std::shared_ptr<QueryContext> query_ctx = nullptr;
std::vector<TUniqueId> all_instance_ids;
{
std::lock_guard<std::mutex> state_lock(_lock);
auto ctx_iter = _query_ctx_map.find(query_id);

if (ctx_iter == _query_ctx_map.end()) {
if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
query_ctx = q_ctx;
// Copy instanceids to avoid concurrent modification.
// And to reduce the scope of lock.
all_instance_ids = query_ctx->fragment_instance_ids;
} else {
LOG(WARNING) << "Query " << print_id(query_id)
<< " does not exists, failed to cancel it";
return;
}
query_ctx = ctx_iter->second;
// Copy instanceids to avoid concurrent modification.
// And to reduce the scope of lock.
all_instance_ids = query_ctx->fragment_instance_ids;
}
if (query_ctx->enable_pipeline_x_exec()) {
query_ctx->cancel_all_pipeline_context(reason);
@@ -1006,10 +1002,7 @@ void FragmentMgr::cancel_instance(const TUniqueId instance_id, const Status reas
void FragmentMgr::cancel_fragment(const TUniqueId query_id, int32_t fragment_id,
const Status reason) {
std::unique_lock<std::mutex> lock(_lock);
auto q_ctx_iter = _query_ctx_map.find(query_id);
if (q_ctx_iter != _query_ctx_map.end()) {
// Has to use value to keep the shared ptr not deconstructed.
std::shared_ptr<QueryContext> q_ctx = q_ctx_iter->second;
if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
// the lock should only be used to protect the map, not scope query ctx
lock.unlock();
WARN_IF_ERROR(q_ctx->cancel_pipeline_context(fragment_id, reason),
@@ -1045,11 +1038,18 @@ void FragmentMgr::cancel_worker() {
}
}
for (auto it = _query_ctx_map.begin(); it != _query_ctx_map.end();) {
if (it->second->is_timeout(now)) {
LOG_WARNING("Query {} is timeout", print_id(it->first));
it = _query_ctx_map.erase(it);
if (auto q_ctx = it->second.lock()) {
if (q_ctx->is_timeout(now)) {
LOG_WARNING("Query {} is timeout", print_id(it->first));
it = _query_ctx_map.erase(it);
} else {
++it;
}
} else {
++it;
LOG_WARNING(
"Query context for {} is released, just erase it from _query_ctx_map",
print_id(it->first));
it = _query_ctx_map.erase(it);
}
}

@@ -1065,32 +1065,34 @@ void FragmentMgr::cancel_worker() {
<< "Could not find any running frontends, maybe we are upgrading? "
<< "We will not cancel any running queries in this situation.";
} else {
for (const auto& q : _query_ctx_map) {
if (q.second->get_fe_process_uuid() == 0) {
// zero means this query is from a older version fe or
// this fe is starting
continue;
}

auto itr = running_fes.find(q.second->coord_addr);
if (itr != running_fes.end()) {
if (q.second->get_fe_process_uuid() == itr->second.info.process_uuid ||
itr->second.info.process_uuid == 0) {
for (const auto& it : _query_ctx_map) {
if (auto q_ctx = it.second.lock()) {
if (q_ctx->get_fe_process_uuid() == 0) {
// zero means this query is from a older version fe or
// this fe is starting
continue;
}

auto itr = running_fes.find(q_ctx->coord_addr);
if (itr != running_fes.end()) {
if (q_ctx->get_fe_process_uuid() == itr->second.info.process_uuid ||
itr->second.info.process_uuid == 0) {
continue;
} else {
LOG_WARNING(
"Coordinator of query {} restarted, going to cancel it.",
print_id(q_ctx->query_id()));
}
} else {
LOG_WARNING("Coordinator of query {} restarted, going to cancel it.",
print_id(q.second->query_id()));
LOG_WARNING(
"Could not find target coordinator {}:{} of query {}, going to "
"cancel it.",
q_ctx->coord_addr.hostname, q_ctx->coord_addr.port,
print_id(q_ctx->query_id()));
}
} else {
LOG_WARNING(
"Could not find target coordinator {}:{} of query {}, going to "
"cancel it.",
q.second->coord_addr.hostname, q.second->coord_addr.port,
print_id(q.second->query_id()));
}

// Coorninator of this query has already dead.
queries_to_cancel.push_back(q.first);
// Coordinator of this query has already dead or query context has been released.
queries_to_cancel.push_back(it.first);
}
}
}
@@ -1377,12 +1379,12 @@ Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) {
query_id.__set_hi(queryid.hi);
query_id.__set_lo(queryid.lo);
std::lock_guard<std::mutex> lock(_lock);
auto iter = _query_ctx_map.find(query_id);
if (iter == _query_ctx_map.end()) {
return Status::InvalidArgument("query-id: {}", queryid.to_string());
if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
query_ctx = q_ctx;
} else {
return Status::InvalidArgument("Query context (query-id: {}) not found",
queryid.to_string());
}

query_ctx = iter->second;
}
auto merge_status = filter_controller->send_filter_size(request);
return merge_status;
@@ -1396,12 +1398,12 @@ Status FragmentMgr::sync_filter_size(const PSyncFilterSizeRequest* request) {
query_id.__set_hi(queryid.hi);
query_id.__set_lo(queryid.lo);
std::lock_guard<std::mutex> lock(_lock);
auto iter = _query_ctx_map.find(query_id);
if (iter == _query_ctx_map.end()) {
return Status::InvalidArgument("query-id: {}", queryid.to_string());
if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
query_ctx = q_ctx;
} else {
return Status::InvalidArgument("Query context (query-id: {}) not found",
queryid.to_string());
}

query_ctx = iter->second;
}
return query_ctx->runtime_filter_mgr()->sync_filter_size(request);
}
@@ -1418,14 +1420,12 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
query_id.__set_hi(queryid.hi);
query_id.__set_lo(queryid.lo);
std::lock_guard<std::mutex> lock(_lock);
auto iter = _query_ctx_map.find(query_id);
if (iter == _query_ctx_map.end()) {
return Status::InvalidArgument("query-id: {}", queryid.to_string());
if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
query_ctx = q_ctx;
} else {
return Status::InvalidArgument("Query context (query-id: {}) not found",
queryid.to_string());
}

// hold reference to pip_context, or else runtime_state can be destroyed
// when filter_controller->merge is still in progress
query_ctx = iter->second;
}
SCOPED_ATTACH_TASK_WITH_ID(query_ctx->query_mem_tracker, query_ctx->query_id());
auto merge_status = filter_controller->merge(request, attach_data);
@@ -1513,13 +1513,20 @@ void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TPipelineFrag
void FragmentMgr::get_runtime_query_info(std::vector<WorkloadQueryInfo>* query_info_list) {
{
std::lock_guard<std::mutex> lock(_lock);
for (const auto& q : _query_ctx_map) {
WorkloadQueryInfo workload_query_info;
workload_query_info.query_id = print_id(q.first);
workload_query_info.tquery_id = q.first;
workload_query_info.wg_id =
q.second->workload_group() == nullptr ? -1 : q.second->workload_group()->id();
query_info_list->push_back(workload_query_info);
for (auto iter = _query_ctx_map.begin(); iter != _query_ctx_map.end();) {
if (auto q_ctx = iter->second.lock()) {
WorkloadQueryInfo workload_query_info;
workload_query_info.query_id = print_id(iter->first);
workload_query_info.tquery_id = iter->first;
workload_query_info.wg_id =
q_ctx->workload_group() == nullptr ? -1 : q_ctx->workload_group()->id();
query_info_list->push_back(workload_query_info);
iter++;
} else {
LOG_WARNING("Query context for {} is released, just erase it from _query_ctx_map",
print_id(iter->first));
iter = _query_ctx_map.erase(iter);
}
}
}
}
@@ -1534,9 +1541,10 @@ Status FragmentMgr::get_realtime_exec_status(const TUniqueId& query_id,

{
std::lock_guard<std::mutex> lock(_lock);
auto iter = _query_ctx_map.find(query_id);
if (iter != _query_ctx_map.end()) {
query_context = iter->second;
if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
query_context = q_ctx;
} else {
return Status::NotFound("Query {} has been released", print_id(query_id));
}
}

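One pattern in the cancellation paths above is worth spelling out. The in-code comment "the lock should only be used to protect the map, not scope query ctx" describes a deliberate discipline: the weak_ptr is promoted to a shared_ptr while the map mutex is held, and the mutex is dropped before any real work runs against the context. A hedged sketch of that pattern, again with hypothetical names:

    #include <map>
    #include <memory>
    #include <mutex>

    struct Ctx {
        void cancel() { /* tear down pipelines, wake blocked operators, ... */ }
    };

    class CancelSketch {
    public:
        void cancel_query(int query_id) {
            std::shared_ptr<Ctx> ctx;
            {
                // Hold the mutex only long enough to resolve the weak_ptr.
                std::lock_guard<std::mutex> guard(_lock);
                auto it = _map.find(query_id);
                if (it == _map.end()) return;
                ctx = it->second.lock(); // promote; keeps the context alive
                if (!ctx) {
                    _map.erase(it); // already released; drop the stale entry
                    return;
                }
            }
            // The mutex is released here. The local shared_ptr alone keeps
            // the context alive, so slow teardown never blocks threads that
            // only need the map (e.g. other queries registering fragments).
            ctx->cancel();
        }

    private:
        std::mutex _lock;
        std::map<int, std::weak_ptr<Ctx>> _map;
    };

Promoting before unlocking is what makes this safe with a weak_ptr map: the context cannot be destroyed mid-cancellation, yet the map itself is never held across the expensive call.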
5 changes: 3 additions & 2 deletions be/src/runtime/fragment_mgr.h
@@ -138,7 +138,7 @@ class FragmentMgr : public RestMonitorIface {

ThreadPool* get_thread_pool() { return _thread_pool.get(); }

std::shared_ptr<QueryContext> get_query_context(const TUniqueId& query_id);
Status get_query_context(const TUniqueId& query_id, std::shared_ptr<QueryContext>* query_ctx);

int32_t running_query_num() {
std::unique_lock<std::mutex> ctx_lock(_lock);
@@ -175,6 +175,7 @@ class FragmentMgr : public RestMonitorIface {
template <typename Params>
Status _get_query_ctx(const Params& params, TUniqueId query_id, bool pipeline,
std::shared_ptr<QueryContext>& query_ctx);
std::shared_ptr<QueryContext> _get_or_erase_query_ctx(TUniqueId query_id);

// This is input params
ExecEnv* _exec_env = nullptr;
@@ -192,7 +193,7 @@
std::unordered_map<TUniqueId, std::shared_ptr<pipeline::PipelineFragmentContext>> _pipeline_map;

// query id -> QueryContext
std::unordered_map<TUniqueId, std::shared_ptr<QueryContext>> _query_ctx_map;
std::unordered_map<TUniqueId, std::weak_ptr<QueryContext>> _query_ctx_map;
std::unordered_map<TUniqueId, std::unordered_map<int, int64_t>> _bf_size_map;

CountDownLatch _stop_background_threads_latch;
6 changes: 4 additions & 2 deletions be/src/runtime/load_channel.cpp
@@ -42,8 +42,10 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_hig
_sender_ip(sender_ip),
_backend_id(backend_id),
_enable_profile(enable_profile) {
std::shared_ptr<QueryContext> query_context =
ExecEnv::GetInstance()->fragment_mgr()->get_query_context(_load_id.to_thrift());
std::shared_ptr<QueryContext> query_context = nullptr;
WARN_IF_ERROR(ExecEnv::GetInstance()->fragment_mgr()->get_query_context(_load_id.to_thrift(),
&query_context),
"");
if (query_context != nullptr) {
_query_thread_context = {_load_id.to_thrift(), query_context->query_mem_tracker};
} else {
[Diff truncated: remaining lines of this hunk and the other changed files are not shown.]
