Skip to content

Commit

Permalink
[improvement](balance) fix multiple problems for balance on large clu…
Browse files Browse the repository at this point in the history
…ster (#35057)
  • Loading branch information
yujun777 authored May 27, 2024
1 parent 3aa1a87 commit 2f1bbed
Show file tree
Hide file tree
Showing 11 changed files with 546 additions and 134 deletions.
29 changes: 29 additions & 0 deletions fe/fe-common/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -999,6 +999,30 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static double balance_load_score_threshold = 0.1; // 10%

// if disk usage > balance_load_score_threshold + urgent_balance_disk_usage_extra_threshold,
// then this disk needs to be scheduled quickly.
// this value may be less than 0.
@ConfField(mutable = true, masterOnly = true)
public static double urgent_balance_disk_usage_extra_threshold = 0.05;

// when running urgent disk balance, shuffle the top large tablets
// range: [ 0 ~ 100 ]
@ConfField(mutable = true, masterOnly = true)
public static int urgent_balance_shuffle_large_tablet_percentage = 1;

@ConfField(mutable = true, masterOnly = true)
public static double urgent_balance_pick_large_tablet_num_threshold = 1000;

// range: 0 ~ 100
@ConfField(mutable = true, masterOnly = true)
public static int urgent_balance_pick_large_disk_usage_percentage = 80;

// there's a case where all backends have high disk usage; normally urgent disk balance will not run then.
// if this value is set to true, urgent disk balance will always run,
// and the backends will exchange tablets among themselves.
@ConfField(mutable = true, masterOnly = true)
public static boolean enable_urgent_balance_no_low_backend = true;

/**
* if set to true, TabletScheduler will not do balance.
*/
Expand All @@ -1017,6 +1041,11 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static boolean disable_disk_balance = false;

// balance order
// ATTN: this is a temporary config and may be deleted later.
@ConfField(mutable = true, masterOnly = true)
public static boolean balance_be_then_disk = true;

/**
* if set to false, TabletScheduler will not do disk balance for replica num = 1.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public String toString() {
return first.toString() + ":" + second.toString();
}

public static class PairComparator<T extends Pair<?, Comparable>> implements Comparator<T> {
public static class PairComparator<T extends Pair<?, ? extends Comparable>> implements Comparator<T> {
@Override
public int compare(T o1, T o2) {
return o1.second.compareTo(o2.second);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -626,19 +626,28 @@ public List<Long> getTabletIdsByBackendId(long backendId) {
return tabletIds;
}

public List<Long> getTabletIdsByBackendIdAndStorageMedium(long backendId, TStorageMedium storageMedium) {
List<Long> tabletIds = Lists.newArrayList();
public List<Pair<Long, Long>> getTabletSizeByBackendIdAndStorageMedium(long backendId,
TStorageMedium storageMedium) {
List<Pair<Long, Long>> tabletIdSizes = Lists.newArrayList();
long stamp = readLock();
try {
Map<Long, Replica> replicaMetaWithBackend = backingReplicaMetaTable.row(backendId);
if (replicaMetaWithBackend != null) {
tabletIds = replicaMetaWithBackend.keySet().stream().filter(
id -> tabletMetaMap.get(id).getStorageMedium() == storageMedium).collect(Collectors.toList());
tabletIdSizes = replicaMetaWithBackend.entrySet().stream()
.filter(entry -> tabletMetaMap.get(entry.getKey()).getStorageMedium() == storageMedium)
.map(entry -> Pair.of(entry.getKey(), entry.getValue().getDataSize()))
.collect(Collectors.toList());
}
} finally {
readUnlock(stamp);
}
return tabletIds;
return tabletIdSizes;
}

/**
 * Returns only the tablet ids of the (tablet id, size) pairs produced by
 * {@link #getTabletSizeByBackendIdAndStorageMedium(long, TStorageMedium)}.
 *
 * @param backendId id of the backend to look up
 * @param storageMedium storage medium passed through to the size lookup
 * @return the first element of every pair, in the order the size lookup returned them
 */
public List<Long> getTabletIdsByBackendIdAndStorageMedium(long backendId,
        TStorageMedium storageMedium) {
    List<Pair<Long, Long>> tabletIdSizes = getTabletSizeByBackendIdAndStorageMedium(backendId, storageMedium);
    return tabletIdSizes.stream()
            .map(idAndSize -> idAndSize.key())
            .collect(Collectors.toList());
}

public int getTabletNumByBackendId(long backendId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class BackendLoadStatistic {
private static final Logger LOG = LogManager.getLogger(BackendLoadStatistic.class);
Expand Down Expand Up @@ -166,6 +168,7 @@ public double getReplicaNumCoefficient() {
private Map<TStorageMedium, Long> totalReplicaNumMap = Maps.newHashMap();
private Map<TStorageMedium, LoadScore> loadScoreMap = Maps.newHashMap();
private Map<TStorageMedium, Classification> clazzMap = Maps.newHashMap();
private Map<TStorageMedium, Classification> maxDiskClazzMap = Maps.newHashMap();
private List<RootPathLoadStatistic> pathStatistics = Lists.newArrayList();

public BackendLoadStatistic(long beId, Tag tag, SystemInfoService infoService,
Expand Down Expand Up @@ -227,6 +230,14 @@ public Classification getClazz(TStorageMedium medium) {
return clazzMap.getOrDefault(medium, Classification.INIT);
}

/**
 * Stores {@code clazz} as the max-disk classification of {@code medium},
 * replacing any value stored earlier for that medium.
 *
 * @param medium storage medium used as the key
 * @param clazz classification to remember for that medium
 */
public void setMaxDiskClazz(TStorageMedium medium, Classification clazz) {
    maxDiskClazzMap.put(medium, clazz);
}

/**
 * Looks up the max-disk classification stored for {@code medium}.
 *
 * @param medium storage medium to look up
 * @return the stored classification, or {@link Classification#INIT} when nothing
 *         has been stored for this medium
 */
public Classification getMaxDiskClazz(TStorageMedium medium) {
    if (!maxDiskClazzMap.containsKey(medium)) {
        return Classification.INIT;
    }
    return maxDiskClazzMap.get(medium);
}

public void init() throws LoadBalanceException {
Backend be = infoService.getBackend(beId);
if (be == null) {
Expand All @@ -246,9 +257,17 @@ public void init() throws LoadBalanceException {
+ (diskInfo.getTotalCapacityB() - diskInfo.getAvailableCapacityB()));
}

// Doris-compose puts all test backends' disks on the same physical disk,
// so in fuzzy test mode add a small offset derived from the path hash to the used capacity.
long usedCapacityB = diskInfo.getDiskUsedCapacityB();
if (Config.be_rebalancer_fuzzy_test) {
usedCapacityB = Math.min(diskInfo.getTotalCapacityB(),
usedCapacityB + Math.abs(diskInfo.getPathHash()) % 10000);
}

RootPathLoadStatistic pathStatistic = new RootPathLoadStatistic(beId, diskInfo.getRootPath(),
diskInfo.getPathHash(), diskInfo.getStorageMedium(),
diskInfo.getTotalCapacityB(), diskInfo.getDiskUsedCapacityB(), diskInfo.getState());
diskInfo.getTotalCapacityB(), usedCapacityB, diskInfo.getState());
pathStatistics.add(pathStatistic);
}

Expand Down Expand Up @@ -295,14 +314,14 @@ private void classifyPathByLoad(TStorageMedium medium) {
if (Math.abs(pathStat.getUsedPercent() - avgUsedPercent)
> Math.max(avgUsedPercent * Config.balance_load_score_threshold, 0.025)) {
if (pathStat.getUsedPercent() > avgUsedPercent) {
pathStat.setClazz(Classification.HIGH);
pathStat.setLocalClazz(Classification.HIGH);
highCounter++;
} else if (pathStat.getUsedPercent() < avgUsedPercent) {
pathStat.setClazz(Classification.LOW);
pathStat.setLocalClazz(Classification.LOW);
lowCounter++;
}
} else {
pathStat.setClazz(Classification.MID);
pathStat.setLocalClazz(Classification.MID);
midCounter++;
}
}
Expand Down Expand Up @@ -422,14 +441,19 @@ public BalanceStatus isFit(long tabletSize, TStorageMedium medium,

BalanceStatus bStatus = pathStatistic.isFit(tabletSize, isSupplement);
if (!bStatus.ok()) {
status.addErrMsgs(bStatus.getErrMsgs());
if (status != BalanceStatus.OK) {
status.addErrMsgs(bStatus.getErrMsgs());
}
continue;
}

result.add(pathStatistic);
if (result != null) {
result.add(pathStatistic);
}
status = BalanceStatus.OK;
}

return result.isEmpty() ? status : BalanceStatus.OK;
return status;
}

/**
Expand Down Expand Up @@ -508,9 +532,9 @@ public void getPathStatisticByClass(
continue;
}

if (pathStat.getClazz() == Classification.LOW) {
if (pathStat.getLocalClazz() == Classification.LOW) {
low.add(pathStat.getPathHash());
} else if (pathStat.getClazz() == Classification.HIGH) {
} else if (pathStat.getLocalClazz() == Classification.HIGH) {
high.add(pathStat.getPathHash());
} else {
mid.add(pathStat.getPathHash());
Expand All @@ -529,9 +553,9 @@ public void getPathStatisticByClass(List<RootPathLoadStatistic> low,
continue;
}

if (pathStat.getClazz() == Classification.LOW) {
if (pathStat.getLocalClazz() == Classification.LOW) {
low.add(pathStat);
} else if (pathStat.getClazz() == Classification.HIGH) {
} else if (pathStat.getLocalClazz() == Classification.HIGH) {
high.add(pathStat);
} else {
mid.add(pathStat);
Expand Down Expand Up @@ -569,9 +593,22 @@ public List<RootPathLoadStatistic> getPathStatistics() {
return pathStatistics;
}

public long getAvailPathNum(TStorageMedium medium) {
return pathStatistics.stream().filter(
p -> p.getDiskState() == DiskState.ONLINE && p.getStorageMedium() == medium).count();
/**
 * Finds the statistic of the path with the given hash.
 *
 * @param pathHash path hash to search for
 * @return the first entry of {@code pathStatistics} whose path hash equals
 *         {@code pathHash}, or {@code null} when there is no such entry
 */
RootPathLoadStatistic getPathStatisticByPathHash(long pathHash) {
    for (RootPathLoadStatistic candidate : pathStatistics) {
        if (candidate.getPathHash() == pathHash) {
            return candidate;
        }
    }
    return null;
}

/**
 * Collects the path statistics selected by {@code getAvailPathStream(medium)} into a list.
 *
 * @param medium storage medium the paths must have
 * @return a new list holding the matching path statistics
 */
public List<RootPathLoadStatistic> getAvailPaths(TStorageMedium medium) {
    Stream<RootPathLoadStatistic> availPaths = getAvailPathStream(medium);
    return availPaths.collect(Collectors.toList());
}

/**
 * Tells whether at least one path selected by {@code getAvailPathStream(medium)}
 * has the given global classification.
 *
 * @param medium storage medium the paths must have
 * @param globalClazz global classification to look for
 * @return {@code true} if some selected path reports {@code globalClazz} as its global classification
 */
public boolean hasAvailPathWithGlobalClazz(TStorageMedium medium, Classification globalClazz) {
    return getAvailPathStream(medium)
            .map(RootPathLoadStatistic::getGlobalClazz)
            .anyMatch(clazz -> clazz == globalClazz);
}

/**
 * Streams the entries of {@code pathStatistics} whose disk state is
 * {@code DiskState.ONLINE} and whose storage medium equals {@code medium}.
 *
 * @param medium storage medium the paths must have
 * @return a lazy stream over the matching path statistics
 */
private Stream<RootPathLoadStatistic> getAvailPathStream(TStorageMedium medium) {
    // Disk state is checked first, then the medium, as two separate filter steps.
    return pathStatistics.stream()
            .filter(pathStat -> pathStat.getDiskState() == DiskState.ONLINE)
            .filter(pathStat -> pathStat.getStorageMedium() == medium);
}

public boolean hasMedium(TStorageMedium medium) {
Expand Down Expand Up @@ -603,14 +640,15 @@ public List<String> getInfo(TStorageMedium medium) {
long total = totalCapacityMap.getOrDefault(medium, 0L);
info.add(String.valueOf(used));
info.add(String.valueOf(total));
info.add(getMaxDiskClazz(medium).name());
info.add(String.valueOf(DebugUtil.DECIMAL_FORMAT_SCALE_3.format(used * 100
/ (double) total)));
info.add(String.valueOf(totalReplicaNumMap.getOrDefault(medium, 0L)));
LoadScore loadScore = loadScoreMap.getOrDefault(medium, new LoadScore());
info.add(String.valueOf(loadScore.capacityCoefficient));
info.add(String.valueOf(loadScore.getReplicaNumCoefficient()));
info.add(String.valueOf(loadScore.score));
info.add(clazzMap.getOrDefault(medium, Classification.INIT).name());
info.add(getClazz(medium).name());
return info;
}

Expand Down
Loading

0 comments on commit 2f1bbed

Please sign in to comment.