Skip to content

Commit

Permalink
[fix](profile) Should set is done when reporting profile (#33810)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhiqiang-hhhh authored May 1, 2024
1 parent 555f1bd commit c7bbc69
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 25 deletions.
2 changes: 1 addition & 1 deletion be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1556,7 +1556,7 @@ Status FragmentMgr::get_realtime_exec_status(const TUniqueId& query_id,
}

*exec_status = RuntimeQueryStatiticsMgr::create_report_exec_status_params_non_pipeline(
query_id, instance_profiles, load_channel_profiles);
query_id, instance_profiles, load_channel_profiles, /*is_done=*/false);
}

return Status::OK();
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ TReportExecStatusParams QueryContext::get_realtime_exec_status_x() const {
}

exec_status = RuntimeQueryStatiticsMgr::create_report_exec_status_params_x(
this->_query_id, realtime_query_profile, load_channel_profiles);
this->_query_id, realtime_query_profile, load_channel_profiles, /*is_done=*/false);
} else {
auto msg = fmt::format("Query {} is not pipelineX query", print_id(_query_id));
LOG_ERROR(msg);
Expand Down
12 changes: 8 additions & 4 deletions be/src/runtime/runtime_query_statistics_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ TReportExecStatusParams RuntimeQueryStatiticsMgr::create_report_exec_status_para
const TUniqueId& query_id,
const std::unordered_map<int32, std::vector<std::shared_ptr<TRuntimeProfileTree>>>&
fragment_id_to_profile,
const std::vector<std::shared_ptr<TRuntimeProfileTree>>& load_channel_profiles) {
const std::vector<std::shared_ptr<TRuntimeProfileTree>>& load_channel_profiles,
bool is_done) {
TQueryProfile profile;
profile.__set_query_id(query_id);

Expand Down Expand Up @@ -162,6 +163,7 @@ TReportExecStatusParams RuntimeQueryStatiticsMgr::create_report_exec_status_para
req.__set_backend_id(ExecEnv::GetInstance()->master_info()->backend_id);
// Set an invalid query id to avoid API compatibility issues during upgrade
req.__set_query_id(TUniqueId());
req.__set_done(is_done);

return req;
}
Expand All @@ -170,7 +172,8 @@ TReportExecStatusParams RuntimeQueryStatiticsMgr::create_report_exec_status_para
const TUniqueId& query_id,
const std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>>&
instance_id_to_profile,
const std::vector<std::shared_ptr<TRuntimeProfileTree>>& load_channel_profile) {
const std::vector<std::shared_ptr<TRuntimeProfileTree>>& load_channel_profile,
bool is_done) {
TQueryProfile profile;
std::vector<TUniqueId> fragment_instance_ids;
std::vector<TRuntimeProfileTree> instance_profiles;
Expand Down Expand Up @@ -202,6 +205,7 @@ TReportExecStatusParams RuntimeQueryStatiticsMgr::create_report_exec_status_para
// Set an invalid query id to avoid API compatibility issues during upgrade
res.__set_query_id(TUniqueId());
res.__set_backend_id(ExecEnv::GetInstance()->master_info()->backend_id);
res.__set_done(is_done);
return res;
}

Expand Down Expand Up @@ -378,7 +382,7 @@ void RuntimeQueryStatiticsMgr::_report_query_profiles_non_pipeline() {
}

TReportExecStatusParams req = create_report_exec_status_params_non_pipeline(
query_id, instance_id_to_profile, load_channel_profiles);
query_id, instance_id_to_profile, load_channel_profiles, /*is_done=*/true);
TReportExecStatusResult res;
auto rpc_status = _do_report_exec_stats_rpc(coor_addr, req, res);

Expand Down Expand Up @@ -431,7 +435,7 @@ void RuntimeQueryStatiticsMgr::_report_query_profiles_x() {
}

TReportExecStatusParams req = create_report_exec_status_params_x(
query_id, fragment_profile_map, load_channel_profiles);
query_id, fragment_profile_map, load_channel_profiles, /*is_done=*/true);
TReportExecStatusResult res;

auto rpc_status = _do_report_exec_stats_rpc(coor_addr, req, res);
Expand Down
6 changes: 4 additions & 2 deletions be/src/runtime/runtime_query_statistics_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,15 @@ class RuntimeQueryStatiticsMgr {
const TUniqueId& q_id,
const std::unordered_map<int32, std::vector<std::shared_ptr<TRuntimeProfileTree>>>&
fragment_id_to_profile,
const std::vector<std::shared_ptr<TRuntimeProfileTree>>& load_channel_profile);
const std::vector<std::shared_ptr<TRuntimeProfileTree>>& load_channel_profile,
bool is_done);

static TReportExecStatusParams create_report_exec_status_params_non_pipeline(
const TUniqueId& q_id,
const std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>>&
instance_id_to_profile,
const std::vector<std::shared_ptr<TRuntimeProfileTree>>& load_channel_profile);
const std::vector<std::shared_ptr<TRuntimeProfileTree>>& load_channel_profile,
bool is_done);

void register_query_statistics(std::string query_id, std::shared_ptr<QueryStatistics> qs_ptr,
TNetworkAddress fe_addr);
Expand Down
1 change: 1 addition & 0 deletions be/src/service/backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1182,6 +1182,7 @@ void BaseBackendService::get_realtime_exec_status(TGetRealtimeExecStatusResponse
}

report_exec_status_params->__set_query_id(TUniqueId());
report_exec_status_params->__set_done(false);

response.__set_status(Status::OK().to_thrift());
response.__set_report_exec_status_params(*report_exec_status_params);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ public void update(long startTime, boolean isFinished) {
}
}

public Status updateProfile(TQueryProfile profile, TNetworkAddress backendHBAddress) {
public Status updateProfile(TQueryProfile profile, TNetworkAddress backendHBAddress, boolean isDone) {
if (isPipelineXProfile) {
if (!profile.isSetFragmentIdToProfile()) {
return new Status(TStatusCode.INVALID_ARGUMENT, "FragmentIdToProfile is not set");
Expand All @@ -270,6 +270,7 @@ public Status updateProfile(TQueryProfile profile, TNetworkAddress backendHBAddr
}

profileNode.update(pipelineProfile.profile);
profileNode.setIsDone(isDone);
pipelineIdx++;
fragmentProfiles.get(fragmentId).addChild(profileNode);
}
Expand Down Expand Up @@ -310,7 +311,7 @@ public Status updateProfile(TQueryProfile profile, TNetworkAddress backendHBAddr
LOG.warn("Could not find related profile {}", DebugUtil.printId(instanceId));
return new Status(TStatusCode.INVALID_ARGUMENT, "Could not find related instance");
}

curInstanceProfile.setIsDone(isDone);
curInstanceProfile.update(instanceProfile);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,13 @@ public void addChild(RuntimeProfile child) {
childLock.writeLock().lock();
try {
if (childMap.containsKey(child.name)) {
childList.removeIf(e -> e.first.name.equals(child.name));
// Pipeline/Instance has already finished.
// This could happen because the report profile rpc is async.
if (childMap.get(child.name).getIsDone() || childMap.get(child.name).getIsCancel()) {
return;
} else {
childList.removeIf(e -> e.first.name.equals(child.name));
}
}
this.childMap.put(child.name, child);
Pair<RuntimeProfile, Boolean> pair = Pair.of(child, true);
Expand Down
35 changes: 21 additions & 14 deletions fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TQueryProfile;
import org.apache.doris.thrift.TQueryType;
import org.apache.doris.thrift.TReportExecStatusParams;
import org.apache.doris.thrift.TReportExecStatusResult;
import org.apache.doris.thrift.TStatus;
Expand Down Expand Up @@ -74,7 +73,7 @@ private QeProcessorImpl() {
"profile-write-pool", true);
}

private Status processQueryProfile(TQueryProfile profile, TNetworkAddress address) {
private Status processQueryProfile(TQueryProfile profile, TNetworkAddress address, boolean isDone) {
LOG.info("New profile processing API, query {}", DebugUtil.printId(profile.query_id));

ExecutionProfile executionProfile = ProfileManager.getInstance().getExecutionProfile(profile.query_id);
Expand All @@ -88,7 +87,7 @@ private Status processQueryProfile(TQueryProfile profile, TNetworkAddress addres
writeProfileExecutor.submit(new Runnable() {
@Override
public void run() {
executionProfile.updateProfile(profile, address);
executionProfile.updateProfile(profile, address, isDone);
}
});

Expand Down Expand Up @@ -231,11 +230,24 @@ public Map<String, QueryStatisticsItem> getQueryStatistics() {
@Override
public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params, TNetworkAddress beAddr) {
if (params.isSetQueryProfile()) {
if (params.isSetBackendId()) {
// Why not return an error response when processing the new profile fails?
// First of all, we will refactor report exec status in the future.
// In that refactor, we will combine the exec status report and the query profile into a single rpc.
// If we returned an error response in this PR, we would have problems during a cluster upgrade.
// For example, FE would return directly once it receives a profile, but BE actually reports exec status
// together with the profile in a single rpc; this would make FE ignore the exec status and may lead to
// bugs in queries like insert into select.
if (params.isSetBackendId() && params.isSetDone()) {
Backend backend = Env.getCurrentSystemInfo().getBackend(params.getBackendId());
boolean isDone = params.isDone();
if (backend != null) {
processQueryProfile(params.getQueryProfile(), backend.getHeartbeatAddress());
// The processing status is ignored by design:
// BE does not care about the result of profile processing on FE.
processQueryProfile(params.getQueryProfile(), backend.getHeartbeatAddress(), isDone);
}
} else {
LOG.warn("Invalid report profile req, this is a logical error, BE must set backendId and isDone"
+ " at same time, query id: {}" + DebugUtil.printId(params.query_id));
}
}

Expand Down Expand Up @@ -270,16 +282,11 @@ public void run() {
}

final QueryInfo info = coordinatorMap.get(params.query_id);

result.setStatus(new TStatus(TStatusCode.OK));
if (info == null) {
// There is no QueryInfo for StreamLoad, so we return OK
if (params.query_type == TQueryType.LOAD) {
result.setStatus(new TStatus(TStatusCode.OK));
} else {
result.setStatus(new TStatus(TStatusCode.RUNTIME_ERROR));
}
LOG.warn("ReportExecStatus() runtime error, query {} with type {} does not exist",
DebugUtil.printId(params.query_id), params.query_type);
// Currently, the execution of a query is split from the exec status processing.
// So it is very likely that when the exec status arrives on FE asynchronously, the coordinator
// has already been removed from coordinatorMap.
return result;
}
try {
Expand Down

0 comments on commit c7bbc69

Please sign in to comment.