Skip to content

Commit

Permalink
[feature](profile)Enable merging of incomplete profiles. (apache#39560)
Browse files Browse the repository at this point in the history
## Proposed changes

```
java.lang.IndexOutOfBoundsException: Index 0 out of bounds for length 0
	at jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64) ~[?:?]
	at jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70) ~[?:?]
	at jdk.internal.util.Preconditions.checkIndex(Preconditions.java:266) ~[?:?]
	at java.util.Objects.checkIndex(Objects.java:359) ~[?:?]
	at java.util.ArrayList.get(ArrayList.java:427) ~[?:?]
	at org.apache.doris.common.profile.ExecutionProfile.getPipelineAggregatedProfile(ExecutionProfile.java:142) ~[doris-fe.jar:1.2-SNAPSHOT]
```

In the past, we needed to ensure that profiles were complete before
merging. Now, this allows incomplete profiles to be merged, with missing
profiles being marked in the merged profile.
```
                                -  ProjectionTime:  avg  0ns,  max  0ns,  min  0ns
                                -  RowsProduced:  sum  0,  avg  0,  max  0,  min  0
                                -  WaitForDependency[SORT_OPERATOR_DEPENDENCY]Time:  avg  15min2sec,  max  15min2sec,  min  15min2sec
                  Pipeline  :  3(miss  profile):
                  Pipeline  :  4(instance_num=48):
                      LOCAL_EXCHANGE_SINK_OPERATOR  (PASSTHROUGH)  (id=-14):
                            -  CloseTime:  avg  0ns,  max  0ns,  min  0ns
                            -  ExecTime:  avg  29.410us,  max  43.336us,  min  
```


<!--Describe your changes.-->
  • Loading branch information
Mryange authored Aug 30, 2024
1 parent f852385 commit ce25a19
Showing 1 changed file with 74 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* root is used to collect profile of a complete query plan(including query or load).
Expand Down Expand Up @@ -74,6 +74,7 @@ public class ExecutionProfile {

// use to merge profile from multi be
private Map<Integer, Map<TNetworkAddress, List<RuntimeProfile>>> multiBeProfile = null;
private ReentrantReadWriteLock multiBeProfileLock = new ReentrantReadWriteLock();

// Not serialize this property, it is only used to get profile id.
private SummaryProfile summaryProfile;
Expand All @@ -97,8 +98,7 @@ public ExecutionProfile(TUniqueId queryId, List<Integer> fragmentIds) {
RuntimeProfile runtimeProfile = new RuntimeProfile("Fragment " + i);
fragmentProfiles.put(fragmentId, runtimeProfile);
fragmentsProfile.addChild(runtimeProfile);
multiBeProfile.put(fragmentId,
new ConcurrentHashMap<TNetworkAddress, List<RuntimeProfile>>());
multiBeProfile.put(fragmentId, Maps.newHashMap());
fragmentIdBeNum.put(fragmentId, 0);
seqNoToFragmentId.put(i, fragmentId);
++i;
Expand All @@ -108,24 +108,54 @@ public ExecutionProfile(TUniqueId queryId, List<Integer> fragmentIds) {
}

private List<List<RuntimeProfile>> getMultiBeProfile(int fragmentId) {
Map<TNetworkAddress, List<RuntimeProfile>> multiPipeline = multiBeProfile.get(fragmentId);
List<List<RuntimeProfile>> allPipelines = Lists.newArrayList();
int pipelineSize = 0;
for (List<RuntimeProfile> profiles : multiPipeline.values()) {
pipelineSize = profiles.size();
break;
}
for (int pipelineIdx = 0; pipelineIdx < pipelineSize; pipelineIdx++) {
List<RuntimeProfile> allPipelineTask = new ArrayList<RuntimeProfile>();
for (List<RuntimeProfile> pipelines : multiPipeline.values()) {
RuntimeProfile pipeline = pipelines.get(pipelineIdx);
for (Pair<RuntimeProfile, Boolean> runtimeProfile : pipeline.getChildList()) {
allPipelineTask.add(runtimeProfile.first);
multiBeProfileLock.readLock().lock();
try {
// A fragment in the BE contains multiple pipelines, and each pipeline contains
// multiple pipeline tasks.
Map<TNetworkAddress, List<RuntimeProfile>> multiPipeline = multiBeProfile.get(fragmentId);
List<List<RuntimeProfile>> allPipelines = Lists.newArrayList();
int pipelineSize = -1;
for (TNetworkAddress beAddress : multiPipeline.keySet()) {
List<RuntimeProfile> profileSingleBE = multiPipeline.get(beAddress);
// Check that within the same fragment across all BEs, there should be the same
// number of pipelines.
if (pipelineSize == -1) {
pipelineSize = profileSingleBE.size();
} else {
if (pipelineSize != profileSingleBE.size()) {
LOG.warn("The profile sizes of the two BE are different, {} vs {}", pipelineSize,
profileSingleBE.size());
pipelineSize = Math.max(pipelineSize, profileSingleBE.size());
}
}
}
for (int pipelineIdx = 0; pipelineIdx < pipelineSize; pipelineIdx++) {
List<RuntimeProfile> allPipelineTask = new ArrayList<RuntimeProfile>();
for (List<RuntimeProfile> profileSingleBE : multiPipeline.values()) {
RuntimeProfile pipeline = profileSingleBE.get(pipelineIdx);
for (Pair<RuntimeProfile, Boolean> pipelineTaskProfile : pipeline.getChildList()) {
allPipelineTask.add(pipelineTaskProfile.first);
}
}
if (allPipelineTask.isEmpty()) {
LOG.warn("None of the BEs have pipeline task profiles in fragmentId:{} , pipelineIdx:{}",
fragmentId, pipelineIdx);
}
allPipelines.add(allPipelineTask);
}
allPipelines.add(allPipelineTask);
return allPipelines;
} finally {
multiBeProfileLock.readLock().unlock();
}
}

void setMultiBeProfile(int fragmentId, TNetworkAddress backendHBAddress, List<RuntimeProfile> taskProfile) {
multiBeProfileLock.writeLock().lock();
try {
multiBeProfile.get(fragmentId).put(backendHBAddress, taskProfile);
} finally {
multiBeProfileLock.writeLock().unlock();
}
return allPipelines;
}

private RuntimeProfile getPipelineAggregatedProfile(Map<Integer, String> planNodeMap) {
Expand All @@ -136,11 +166,20 @@ private RuntimeProfile getPipelineAggregatedProfile(Map<Integer, String> planNod
List<List<RuntimeProfile>> allPipelines = getMultiBeProfile(seqNoToFragmentId.get(i));
int pipelineIdx = 0;
for (List<RuntimeProfile> allPipelineTask : allPipelines) {
RuntimeProfile mergedpipelineProfile = new RuntimeProfile(
"Pipeline : " + pipelineIdx + "(instance_num="
+ allPipelineTask.size() + ")",
allPipelineTask.get(0).nodeId());
RuntimeProfile.mergeProfiles(allPipelineTask, mergedpipelineProfile, planNodeMap);
RuntimeProfile mergedpipelineProfile = null;
if (allPipelineTask.isEmpty()) {
// It is possible that the profile collection may be incomplete, so only part of
// the profile will be merged here.
mergedpipelineProfile = new RuntimeProfile(
"Pipeline : " + pipelineIdx + "(miss profile)",
-pipelineIdx);
} else {
mergedpipelineProfile = new RuntimeProfile(
"Pipeline : " + pipelineIdx + "(instance_num="
+ allPipelineTask.size() + ")",
allPipelineTask.get(0).nodeId());
RuntimeProfile.mergeProfiles(allPipelineTask, mergedpipelineProfile, planNodeMap);
}
newFragmentProfile.addChild(mergedpipelineProfile);
pipelineIdx++;
}
Expand Down Expand Up @@ -207,7 +246,11 @@ public Status updateProfile(TQueryProfile profile, TNetworkAddress backendHBAddr
pipelineIdx++;
}
RuntimeProfile profileNode = new RuntimeProfile(name);
taskProfile.add(profileNode);
// The taskprofile is used to save the profile of the pipeline, without
// considering the FragmentLevel.
if (!(pipelineProfile.isSetIsFragmentLevel() && pipelineProfile.is_fragment_level)) {
taskProfile.add(profileNode);
}
if (!pipelineProfile.isSetProfile()) {
LOG.warn("Profile is not set, {}", DebugUtil.printId(profile.getQueryId()));
return new Status(TStatusCode.INVALID_ARGUMENT, "Profile is not set");
Expand All @@ -217,7 +260,7 @@ public Status updateProfile(TQueryProfile profile, TNetworkAddress backendHBAddr
profileNode.setIsDone(isDone);
fragmentProfiles.get(fragmentId).addChild(profileNode);
}
multiBeProfile.get(fragmentId).put(backendHBAddress, taskProfile);
setMultiBeProfile(fragmentId, backendHBAddress, taskProfile);
}

if (profile.isSetLoadChannelProfiles()) {
Expand Down Expand Up @@ -255,7 +298,11 @@ public void updateProfile(TReportExecStatusParams params) {
pipelineIdx++;
}
RuntimeProfile profile = new RuntimeProfile(name);
taskProfile.add(profile);
// The taskprofile is used to save the profile of the pipeline, without
// considering the FragmentLevel.
if (!(param.isSetIsFragmentLevel() && param.is_fragment_level)) {
taskProfile.add(profile);
}
if (param.isSetProfile()) {
profile.update(param.profile);
}
Expand All @@ -270,8 +317,7 @@ public void updateProfile(TReportExecStatusParams params) {
if (params.isSetLoadChannelProfile()) {
loadChannelProfile.update(params.loadChannelProfile);
}

multiBeProfile.get(params.fragment_id).put(backend.getHeartbeatAddress(), taskProfile);
setMultiBeProfile(params.fragment_id, backend.getHeartbeatAddress(), taskProfile);
}

public synchronized void addFragmentBackend(PlanFragmentId fragmentId, Long backendId) {
Expand Down

0 comments on commit ce25a19

Please sign in to comment.