Skip to content

Commit

Permalink
[fix](pipelinex) fix fragment instance progress reports (part 2)
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijchen committed Sep 11, 2024
1 parent 73bf91a commit c134446
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 4 deletions.
18 changes: 17 additions & 1 deletion be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,18 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
// this is a load plan, and load is not finished, just make a brief report
params.__set_loaded_rows(req.runtime_state->num_rows_load_total());
params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
for (auto* rs : req.runtime_states) {
if (rs->num_rows_load_total() > 0 || rs->num_rows_load_filtered() > 0 ||
rs->num_finished_range() > 0) {
params.__isset.fragment_instance_reports = true;
TFragmentInstanceReport t;
t.__set_fragment_instance_id(rs->fragment_instance_id());
t.__set_num_finished_range(rs->num_finished_range());
t.__set_loaded_rows(rs->num_rows_load_total());
t.__set_loaded_bytes(rs->num_bytes_load_total());
params.fragment_instance_reports.push_back(t);
}
}
} else {
if (req.runtime_state->query_type() == TQueryType::LOAD) {
params.__set_loaded_rows(req.runtime_state->num_rows_load_total());
Expand Down Expand Up @@ -405,11 +417,13 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
TFragmentInstanceReport t;
t.__set_fragment_instance_id(req.runtime_state->fragment_instance_id());
t.__set_num_finished_range(req.runtime_state->num_finished_range());
t.__set_loaded_rows(req.runtime_state->num_rows_load_total());
t.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
params.fragment_instance_reports.push_back(t);
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
if (rs->num_rows_load_total() > 0 || rs->num_rows_load_filtered() > 0 ||
req.runtime_state->num_finished_range() > 0) {
rs->num_finished_range() > 0) {
params.__isset.load_counters = true;
num_rows_load_success += rs->num_rows_load_success();
num_rows_load_filtered += rs->num_rows_load_filtered();
Expand All @@ -418,6 +432,8 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
TFragmentInstanceReport t;
t.__set_fragment_instance_id(rs->fragment_instance_id());
t.__set_num_finished_range(rs->num_finished_range());
t.__set_loaded_rows(rs->num_rows_load_total());
t.__set_loaded_bytes(rs->num_bytes_load_total());
params.fragment_instance_reports.push_back(t);
}
}
Expand Down
6 changes: 3 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -2464,7 +2464,7 @@ public void updateFragmentExecStatus(TReportExecStatusParams params) {
for (TFragmentInstanceReport report : params.getFragmentInstanceReports()) {
Env.getCurrentEnv().getLoadManager().updateJobProgress(
jobId, params.getBackendId(), params.getQueryId(), report.getFragmentInstanceId(),
params.getLoadedRows(), params.getLoadedBytes(), params.isDone());
report.getLoadedRows(), report.getLoadedBytes(), params.isDone());
Env.getCurrentEnv().getProgressManager().updateProgress(String.valueOf(jobId),
params.getQueryId(), report.getFragmentInstanceId(), report.getNumFinishedRange());
}
Expand Down Expand Up @@ -2888,9 +2888,9 @@ public void unsetFields() {
// Has to use synchronized to ensure there are not concurrent update threads. Or the done
// state maybe update wrong and will lose data. see https://github.com/apache/doris/pull/29802/files.
public synchronized boolean updatePipelineStatus(TReportExecStatusParams params) {
// The fragment or instance is not finished, not need update
// The fragment or instance is not finished, still need update progress
if (!params.done) {
return false;
return true;
}
if (this.done) {
// duplicate packet
Expand Down
2 changes: 2 additions & 0 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,8 @@ struct TQueryProfile {
struct TFragmentInstanceReport {
1: optional Types.TUniqueId fragment_instance_id;
2: optional i32 num_finished_range;
3: optional i64 loaded_rows
4: optional i64 loaded_bytes
}


Expand Down

0 comments on commit c134446

Please sign in to comment.