Skip to content

Commit

Permalink
[fix](pipelinex) fix fragment instance progress reports (part 2) (apa…
Browse files Browse the repository at this point in the history
…che#40694)

This PR is a followup of apache#40325. Because PipelineX has deprecated the
old report.
This PR fixed the `ScannedRows` and `LoadBytes` in the progress of the
`SHOW LOAD` result.

Currently the progress will only be updated when a fragment instance is
done.
Timely progress updates will be supported in a later PR.
  • Loading branch information
kaijchen committed Oct 10, 2024
1 parent aa541fd commit 95e9531
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 1 deletion.
4 changes: 4 additions & 0 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,8 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
TFragmentInstanceReport t;
t.__set_fragment_instance_id(req.runtime_state->fragment_instance_id());
t.__set_num_finished_range(req.runtime_state->num_finished_range());
t.__set_loaded_rows(req.runtime_state->num_rows_load_total());
t.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
params.fragment_instance_reports.push_back(t);
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
Expand All @@ -427,6 +429,8 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
TFragmentInstanceReport t;
t.__set_fragment_instance_id(rs->fragment_instance_id());
t.__set_num_finished_range(rs->num_finished_range());
t.__set_loaded_rows(rs->num_rows_load_total());
t.__set_loaded_bytes(rs->num_bytes_load_total());
params.fragment_instance_reports.push_back(t);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2818,7 +2818,7 @@ public void updateFragmentExecStatus(TReportExecStatusParams params) {
for (TFragmentInstanceReport report : params.getFragmentInstanceReports()) {
Env.getCurrentEnv().getLoadManager().updateJobProgress(
jobId, params.getBackendId(), params.getQueryId(), report.getFragmentInstanceId(),
params.getLoadedRows(), params.getLoadedBytes(), params.isDone());
report.getLoadedRows(), report.getLoadedBytes(), params.isDone());
Env.getCurrentEnv().getProgressManager().updateProgress(String.valueOf(jobId),
params.getQueryId(), report.getFragmentInstanceId(), report.getNumFinishedRange());
}
Expand Down
2 changes: 2 additions & 0 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,8 @@ struct TReportWorkloadRuntimeStatusParams {
struct TFragmentInstanceReport {
1: optional Types.TUniqueId fragment_instance_id;
2: optional i32 num_finished_range;
3: optional i64 loaded_rows
4: optional i64 loaded_bytes
}

// The results of an INSERT query, sent to the coordinator as part of
Expand Down

0 comments on commit 95e9531

Please sign in to comment.