From 648212a0260e7ba82269aa44dd7faddd5ea6349d Mon Sep 17 00:00:00 2001 From: yujun Date: Wed, 7 Feb 2024 07:02:41 +0800 Subject: [PATCH] [improvement](balance) fix multiple problems for balance on large cluster (#30713) --- .../java/org/apache/doris/common/Config.java | 31 ++- .../java/org/apache/doris/common/Pair.java | 2 +- .../doris/catalog/TabletInvertedIndex.java | 19 +- .../doris/clone/BackendLoadStatistic.java | 68 +++++-- .../apache/doris/clone/BeLoadRebalancer.java | 190 +++++++++++++----- .../doris/clone/LoadStatisticForTag.java | 166 ++++++++++++++- .../doris/clone/RootPathLoadStatistic.java | 30 ++- .../apache/doris/clone/TabletScheduler.java | 96 +++++++-- .../proc/BackendLoadStatisticProcNode.java | 2 +- .../proc/ClusterLoadStatisticProcDir.java | 2 +- .../clone/ClusterLoadStatisticsTest.java | 72 ++++--- 11 files changed, 544 insertions(+), 134 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index a7f642944ed52e..5a239a8c40714f 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1014,6 +1014,30 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static double balance_load_score_threshold = 0.1; // 10% + // if disk usage > balance_load_score_threshold + urgent_disk_usage_extra_threshold + // then this disk need schedule quickly + // this value could less than 0. + @ConfField(mutable = true, masterOnly = true) + public static double urgent_balance_disk_usage_extra_threshold = 0.05; + + // when run urgent disk balance, shuffle the top large tablets + // range: [ 0 ~ 100 ] + @ConfField(mutable = true, masterOnly = true) + public static int urgent_balance_shuffle_large_tablet_percentage = 1; + + @ConfField(mutable = true, masterOnly = true) + public static double urgent_balance_pick_large_tablet_num_threshold = 1000; + + // range: 0 ~ 100 + @ConfField(mutable = true, masterOnly = true) + public static int urgent_balance_pick_large_disk_usage_percentage = 80; + + // there's a case, all backend has a high disk, by default, it will not run urgent disk balance. + // if set this value to true, urgent disk balance will always run, + // the backends will exchange tablets among themselves. + @ConfField(mutable = true, masterOnly = true) + public static boolean enable_urgent_balance_no_low_backend = true; + /** * if set to true, TabletScheduler will not do balance. */ @@ -1024,7 +1048,7 @@ public class Config extends ConfigBase { * when be rebalancer idle, then disk balance will occurs. */ @ConfField(mutable = true, masterOnly = true) - public static int be_rebalancer_idle_seconds = 60; + public static int be_rebalancer_idle_seconds = 0; /** * if set to true, TabletScheduler will not do disk balance. @@ -1032,6 +1056,11 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static boolean disable_disk_balance = false; + // balance order + // ATTN: a temporary config, may delete later. + @ConfField(mutable = true, masterOnly = true) + public static boolean balance_be_then_disk = true; + /** * if set to false, TabletScheduler will not do disk balance for replica num = 1. */ diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Pair.java b/fe/fe-common/src/main/java/org/apache/doris/common/Pair.java index c3ae810582c6db..7d1736a0754b04 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Pair.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Pair.java @@ -78,7 +78,7 @@ public String toString() { return first.toString() + ":" + second.toString(); } - public static class PairComparator> implements Comparator { + public static class PairComparator> implements Comparator { @Override public int compare(T o1, T o2) { return o1.second.compareTo(o2.second); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java index e01438e90c40f2..84be2175bcf81b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java @@ -644,19 +644,28 @@ public List getTabletIdsByBackendId(long backendId) { return tabletIds; } - public List getTabletIdsByBackendIdAndStorageMedium(long backendId, TStorageMedium storageMedium) { - List tabletIds = Lists.newArrayList(); + public List> getTabletSizeByBackendIdAndStorageMedium(long backendId, + TStorageMedium storageMedium) { + List> tabletIdSizes = Lists.newArrayList(); long stamp = readLock(); try { Map replicaMetaWithBackend = backingReplicaMetaTable.row(backendId); if (replicaMetaWithBackend != null) { - tabletIds = replicaMetaWithBackend.keySet().stream().filter( - id -> tabletMetaMap.get(id).getStorageMedium() == storageMedium).collect(Collectors.toList()); + tabletIdSizes = replicaMetaWithBackend.entrySet().stream() + .filter(entry -> tabletMetaMap.get(entry.getKey()).getStorageMedium() == storageMedium) + .map(entry -> Pair.of(entry.getKey(), entry.getValue().getDataSize())) + .collect(Collectors.toList()); } } finally { readUnlock(stamp); } - return tabletIds; + return tabletIdSizes; + } + + public List getTabletIdsByBackendIdAndStorageMedium(long backendId, + TStorageMedium storageMedium) { + return getTabletSizeByBackendIdAndStorageMedium(backendId, storageMedium).stream() + .map(Pair::key).collect(Collectors.toList()); } public int getTabletNumByBackendId(long backendId) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java index 9ac6fce7f2b9e5..f1f632c997a99d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java @@ -39,6 +39,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; public class BackendLoadStatistic { private static final Logger LOG = LogManager.getLogger(BackendLoadStatistic.class); @@ -166,6 +168,7 @@ public double getReplicaNumCoefficient() { private Map totalReplicaNumMap = Maps.newHashMap(); private Map loadScoreMap = Maps.newHashMap(); private Map clazzMap = Maps.newHashMap(); + private Map maxDiskClazzMap = Maps.newHashMap(); private List pathStatistics = Lists.newArrayList(); public BackendLoadStatistic(long beId, Tag tag, SystemInfoService infoService, @@ -227,6 +230,14 @@ public Classification getClazz(TStorageMedium medium) { return clazzMap.getOrDefault(medium, Classification.INIT); } + public void setMaxDiskClazz(TStorageMedium medium, Classification clazz) { + this.maxDiskClazzMap.put(medium, clazz); + } + + public Classification getMaxDiskClazz(TStorageMedium medium) { + return maxDiskClazzMap.getOrDefault(medium, Classification.INIT); + } + public void init() throws LoadBalanceException { Backend be = infoService.getBackend(beId); if (be == null) { @@ -246,9 +257,17 @@ public void init() throws LoadBalanceException { + (diskInfo.getTotalCapacityB() - diskInfo.getAvailableCapacityB())); } + // Doris-compose put test all backends' disks on the same physical disk. + // Make a little change here. + long usedCapacityB = diskInfo.getDiskUsedCapacityB(); + if (Config.be_rebalancer_fuzzy_test) { + usedCapacityB = Math.min(diskInfo.getTotalCapacityB(), + usedCapacityB + Math.abs(diskInfo.getPathHash()) % 10000); + } + RootPathLoadStatistic pathStatistic = new RootPathLoadStatistic(beId, diskInfo.getRootPath(), diskInfo.getPathHash(), diskInfo.getStorageMedium(), - diskInfo.getTotalCapacityB(), diskInfo.getDiskUsedCapacityB(), diskInfo.getState()); + diskInfo.getTotalCapacityB(), usedCapacityB, diskInfo.getState()); pathStatistics.add(pathStatistic); } @@ -295,14 +314,14 @@ private void classifyPathByLoad(TStorageMedium medium) { if (Math.abs(pathStat.getUsedPercent() - avgUsedPercent) > Math.max(avgUsedPercent * Config.balance_load_score_threshold, 0.025)) { if (pathStat.getUsedPercent() > avgUsedPercent) { - pathStat.setClazz(Classification.HIGH); + pathStat.setLocalClazz(Classification.HIGH); highCounter++; } else if (pathStat.getUsedPercent() < avgUsedPercent) { - pathStat.setClazz(Classification.LOW); + pathStat.setLocalClazz(Classification.LOW); lowCounter++; } } else { - pathStat.setClazz(Classification.MID); + pathStat.setLocalClazz(Classification.MID); midCounter++; } } @@ -422,14 +441,19 @@ public BalanceStatus isFit(long tabletSize, TStorageMedium medium, BalanceStatus bStatus = pathStatistic.isFit(tabletSize, isSupplement); if (!bStatus.ok()) { - status.addErrMsgs(bStatus.getErrMsgs()); + if (status != BalanceStatus.OK) { + status.addErrMsgs(bStatus.getErrMsgs()); + } continue; } - result.add(pathStatistic); + if (result != null) { + result.add(pathStatistic); + } + status = BalanceStatus.OK; } - return result.isEmpty() ? status : BalanceStatus.OK; + return status; } /** @@ -508,9 +532,9 @@ public void getPathStatisticByClass( continue; } - if (pathStat.getClazz() == Classification.LOW) { + if (pathStat.getLocalClazz() == Classification.LOW) { low.add(pathStat.getPathHash()); - } else if (pathStat.getClazz() == Classification.HIGH) { + } else if (pathStat.getLocalClazz() == Classification.HIGH) { high.add(pathStat.getPathHash()); } else { mid.add(pathStat.getPathHash()); @@ -529,9 +553,9 @@ public void getPathStatisticByClass(List low, continue; } - if (pathStat.getClazz() == Classification.LOW) { + if (pathStat.getLocalClazz() == Classification.LOW) { low.add(pathStat); - } else if (pathStat.getClazz() == Classification.HIGH) { + } else if (pathStat.getLocalClazz() == Classification.HIGH) { high.add(pathStat); } else { mid.add(pathStat); @@ -569,9 +593,22 @@ public List getPathStatistics() { return pathStatistics; } - public long getAvailPathNum(TStorageMedium medium) { - return pathStatistics.stream().filter( - p -> p.getDiskState() == DiskState.ONLINE && p.getStorageMedium() == medium).count(); + RootPathLoadStatistic getPathStatisticByPathHash(long pathHash) { + return pathStatistics.stream().filter(pathStat -> pathStat.getPathHash() == pathHash) + .findFirst().orElse(null); + } + + public List getAvailPaths(TStorageMedium medium) { + return getAvailPathStream(medium).collect(Collectors.toList()); + } + + public boolean hasAvailPathWithGlobalClazz(TStorageMedium medium, Classification globalClazz) { + return getAvailPathStream(medium).anyMatch(pathStat -> pathStat.getGlobalClazz() == globalClazz); + } + + private Stream getAvailPathStream(TStorageMedium medium) { + return pathStatistics.stream() + .filter(p -> p.getDiskState() == DiskState.ONLINE && p.getStorageMedium() == medium); } public boolean hasMedium(TStorageMedium medium) { @@ -603,6 +640,7 @@ public List getInfo(TStorageMedium medium) { long total = totalCapacityMap.getOrDefault(medium, 0L); info.add(String.valueOf(used)); info.add(String.valueOf(total)); + info.add(getMaxDiskClazz(medium).name()); info.add(String.valueOf(DebugUtil.DECIMAL_FORMAT_SCALE_3.format(used * 100 / (double) total))); info.add(String.valueOf(totalReplicaNumMap.getOrDefault(medium, 0L))); @@ -610,7 +648,7 @@ public List getInfo(TStorageMedium medium) { info.add(String.valueOf(loadScore.capacityCoefficient)); info.add(String.valueOf(loadScore.getReplicaNumCoefficient())); info.add(String.valueOf(loadScore.score)); - info.add(clazzMap.getOrDefault(medium, Classification.INIT).name()); + info.add(getClazz(medium).name()); return info; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java index e7b6211bd79a54..b40d7f7a512356 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java @@ -25,12 +25,14 @@ import org.apache.doris.catalog.TabletMeta; import org.apache.doris.clone.BackendLoadStatistic.BePathLoadStatPair; import org.apache.doris.clone.BackendLoadStatistic.BePathLoadStatPairComparator; +import org.apache.doris.clone.BackendLoadStatistic.Classification; import org.apache.doris.clone.SchedException.Status; import org.apache.doris.clone.SchedException.SubCode; import org.apache.doris.clone.TabletSchedCtx.Priority; import org.apache.doris.clone.TabletScheduler.PathSlot; import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.Pair; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TStorageMedium; @@ -45,6 +47,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; /* * BeLoadRebalancer strategy: @@ -79,12 +82,9 @@ public BeLoadRebalancer(SystemInfoService infoService, TabletInvertedIndex inver protected List selectAlternativeTabletsForCluster( LoadStatisticForTag clusterStat, TStorageMedium medium) { List alternativeTablets = Lists.newArrayList(); - - // get classification of backends List lowBEs = Lists.newArrayList(); - List midBEs = Lists.newArrayList(); List highBEs = Lists.newArrayList(); - clusterStat.getBackendStatisticByClass(lowBEs, midBEs, highBEs, medium); + boolean isUrgent = clusterStat.getLowHighBEsWithIsUrgent(lowBEs, highBEs, medium); if (lowBEs.isEmpty() && highBEs.isEmpty()) { LOG.debug("cluster is balance with medium: {}. skip", medium); @@ -117,6 +117,8 @@ protected List selectAlternativeTabletsForCluster( } LOG.info("get number of low load paths: {}, with medium: {}", numOfLowPaths, medium); + List alternativeTabletInfos = Lists.newArrayList(); + // Clone ut mocked env, but CatalogRecycleBin is not mockable (it extends from Thread) // so in clone ut recycleBin need to set to null. CatalogRecycleBin recycleBin = null; @@ -125,6 +127,10 @@ protected List selectAlternativeTabletsForCluster( } int clusterAvailableBEnum = infoService.getAllBackendIds(true).size(); ColocateTableIndex colocateTableIndex = Env.getCurrentColocateIndex(); + List> lowBETablets = lowBEs.stream() + .map(beStat -> Sets.newHashSet(invertedIndex.getTabletIdsByBackendId(beStat.getBeId()))) + .collect(Collectors.toList()); + // choose tablets from high load backends. // BackendLoadStatistic is sorted by load score in ascend order, // so we need to traverse it from last to first @@ -136,37 +142,73 @@ protected List selectAlternativeTabletsForCluster( continue; } - // classify the paths. - Set pathLow = Sets.newHashSet(); - Set pathMid = Sets.newHashSet(); - Set pathHigh = Sets.newHashSet(); - beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, medium); - // we only select tablets from available mid and high load path - pathHigh.addAll(pathMid); - - // get all tablets on this backend, and shuffle them for random selection - List tabletIds = invertedIndex.getTabletIdsByBackendIdAndStorageMedium(beStat.getBeId(), medium); - Collections.shuffle(tabletIds); + boolean choseHighDisk = isUrgent && beStat.getMaxDiskClazz(medium) == Classification.HIGH; // for each path, we try to select at most BALANCE_SLOT_NUM_FOR_PATH tablets Map remainingPaths = Maps.newHashMap(); + Set pathHigh = null; + if (choseHighDisk) { + pathHigh = beStat.getAvailPaths(medium).stream().filter(RootPathLoadStatistic::isGlobalHighUsage) + .map(RootPathLoadStatistic::getPathHash).collect(Collectors.toSet()); + } else { + // classify the paths. + pathHigh = Sets.newHashSet(); + Set pathLow = Sets.newHashSet(); + Set pathMid = Sets.newHashSet(); + beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, medium); + // we only select tablets from available mid and high load path + pathHigh.addAll(pathMid); + } + + double highDiskMaxUsage = 0; for (Long pathHash : pathHigh) { int availBalanceNum = pathSlot.getAvailableBalanceNum(pathHash); if (availBalanceNum > 0) { remainingPaths.put(pathHash, availBalanceNum); } + + RootPathLoadStatistic pathStat = beStat.getPathStatisticByPathHash(pathHash); + if (pathStat != null) { + highDiskMaxUsage = Math.max(highDiskMaxUsage, pathStat.getUsedPercent()); + } } + LOG.debug("high be {}, medium: {}, path high: {}, remainingPaths: {}, chose high disk: {}", + beStat.getBeId(), medium, pathHigh, remainingPaths, choseHighDisk); + if (remainingPaths.isEmpty()) { continue; } + // get all tablets on this backend, and shuffle them for random selection + List> tabletIdSizes = invertedIndex.getTabletSizeByBackendIdAndStorageMedium( + beStat.getBeId(), medium); + if (!isUrgent + || tabletIdSizes.size() < Config.urgent_balance_pick_large_tablet_num_threshold + || highDiskMaxUsage < (double) Config.urgent_balance_pick_large_disk_usage_percentage / 100.0 + || Config.urgent_balance_shuffle_large_tablet_percentage >= 100 + || Config.urgent_balance_shuffle_large_tablet_percentage < 0) { + Collections.shuffle(tabletIdSizes); + } else { + Collections.sort(tabletIdSizes, new Pair.PairComparator>()); + if (Config.urgent_balance_shuffle_large_tablet_percentage > 0) { + int startIndex = (int) (tabletIdSizes.size() + * (1 - (double) Config.urgent_balance_shuffle_large_tablet_percentage / 100.0)); + Collections.shuffle(tabletIdSizes.subList(startIndex, tabletIdSizes.size())); + } + } + // select tablet from shuffled tablets - for (Long tabletId : tabletIds) { + for (int j = tabletIdSizes.size() - 1; j >= 0; j--) { + long tabletId = tabletIdSizes.get(j).key(); if (clusterAvailableBEnum <= invertedIndex.getReplicasByTabletId(tabletId).size()) { continue; } + if (alternativeTablets.stream().anyMatch(tabletCtx -> tabletId == tabletCtx.getTabletId())) { + continue; + } + Replica replica = invertedIndex.getReplica(tabletId, beStat.getBeId()); if (replica == null) { continue; @@ -186,20 +228,40 @@ protected List selectAlternativeTabletsForCluster( continue; } + // for urgent disk, pick tablets order by size, + // then it may always pick tablets that was on the low backends. + if (!lowBETablets.isEmpty() + && lowBETablets.stream().allMatch(tablets -> tablets.contains(tabletId))) { + continue; + } + if (recycleBin != null && recycleBin.isRecyclePartition(tabletMeta.getDbId(), tabletMeta.getTableId(), tabletMeta.getPartitionId())) { continue; } + boolean isFit = lowBEs.stream().anyMatch(be -> be.isFit(replica.getDataSize(), + medium, null, false) == BalanceStatus.OK); + if (!isFit) { + if (LOG.isDebugEnabled()) { + LOG.debug("tablet {} with size {} medium {} not fit in low backends", + tabletId, replica.getDataSize(), medium); + } + continue; + } + TabletSchedCtx tabletCtx = new TabletSchedCtx(TabletSchedCtx.Type.BALANCE, tabletMeta.getDbId(), tabletMeta.getTableId(), tabletMeta.getPartitionId(), tabletMeta.getIndexId(), tabletId, null /* replica alloc is not used for balance*/, System.currentTimeMillis()); tabletCtx.setTag(clusterStat.getTag()); // balance task's priority is always LOW - tabletCtx.setPriority(Priority.LOW); + tabletCtx.setPriority(isUrgent ? Priority.NORMAL : Priority.LOW); alternativeTablets.add(tabletCtx); + alternativeTabletInfos.add("{ tabletId=" + tabletId + ", beId=" + beStat.getBeId() + + ", pathHash=" + replica.getPathHash() + + ", replicaLocalSize=" + replica.getDataSize() + " }"); if (--numOfLowPaths <= 0) { // enough break OUTER; @@ -217,13 +279,13 @@ protected List selectAlternativeTabletsForCluster( } // end for high backends if (!alternativeTablets.isEmpty()) { - LOG.info("select alternative tablets, medium: {}, num: {}, detail: {}", - medium, alternativeTablets.size(), - alternativeTablets.stream().mapToLong(TabletSchedCtx::getTabletId).toArray()); + LOG.info("select alternative tablets, medium: {}, is urgent: {}, num: {}, detail: {}", + medium, isUrgent, alternativeTablets.size(), alternativeTabletInfos); } return alternativeTablets; } + /* * Create a clone task of this selected tablet for balance. * 1. Check if this tablet has replica on high load backend. If not, the balance will be cancelled. @@ -239,17 +301,17 @@ public void completeSchedCtx(TabletSchedCtx tabletCtx) throws SchedException { } // get classification of backends - List lowBe = Lists.newArrayList(); - List midBe = Lists.newArrayList(); - List highBe = Lists.newArrayList(); - clusterStat.getBackendStatisticByClass(lowBe, midBe, highBe, tabletCtx.getStorageMedium()); + List lowBEs = Lists.newArrayList(); + List highBEs = Lists.newArrayList(); + boolean isUrgent = clusterStat.getLowHighBEsWithIsUrgent(lowBEs, highBEs, tabletCtx.getStorageMedium()); + String isUrgentInfo = isUrgent ? " for urgent" : " for non-urgent"; - if (lowBe.isEmpty() && highBe.isEmpty()) { + if (lowBEs.isEmpty() && highBEs.isEmpty()) { throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE, "cluster is balance"); } // if all low backends is not available, return - if (lowBe.stream().noneMatch(BackendLoadStatistic::isAvailable)) { + if (lowBEs.stream().noneMatch(BackendLoadStatistic::isAvailable)) { throw new SchedException(Status.UNRECOVERABLE, "all low load backends is unavailable"); } @@ -258,21 +320,21 @@ public void completeSchedCtx(TabletSchedCtx tabletCtx) throws SchedException { // Check if this tablet has replica on high load backend. // Also create a set to save hosts of this tablet. Set hosts = Sets.newHashSet(); - boolean hasHighReplica = false; - for (Replica replica : replicas) { - if (highBe.stream().anyMatch(b -> b.getBeId() == replica.getBackendId())) { - hasHighReplica = true; + List replicaHighBEs = Lists.newArrayList(); + for (BackendLoadStatistic beStat : highBEs) { + if (replicas.stream().anyMatch(replica -> beStat.getBeId() == replica.getBackendId())) { + replicaHighBEs.add(beStat); } - Backend be = infoService.getBackend(replica.getBackendId()); + Backend be = infoService.getBackend(beStat.getBeId()); if (be == null) { throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE, - "backend is dropped: " + replica.getBackendId()); + "backend is dropped: " + beStat.getBeId()); } hosts.add(be.getHost()); } - if (!hasHighReplica) { + if (replicaHighBEs.isEmpty()) { throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE, - "no replica on high load backend"); + "no replica on high load backend" + isUrgentInfo); } // select a replica as source @@ -290,12 +352,12 @@ public void completeSchedCtx(TabletSchedCtx tabletCtx) throws SchedException { } } if (!setSource) { - throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE, "unable to take src backend slot"); + throw new SchedException(Status.UNRECOVERABLE, "unable to take src slot" + isUrgentInfo); } // Select a low load backend as destination. List candidates = Lists.newArrayList(); - for (BackendLoadStatistic beStat : lowBe) { + for (BackendLoadStatistic beStat : lowBEs) { if (beStat.isAvailable() && replicas.stream().noneMatch(r -> r.getBackendId() == beStat.getBeId())) { // check if on same host. Backend lowBackend = infoService.getBackend(beStat.getBeId()); @@ -308,18 +370,22 @@ public void completeSchedCtx(TabletSchedCtx tabletCtx) throws SchedException { // no replica on this low load backend // 1. check if this clone task can make the cluster more balance. - List availPaths = Lists.newArrayList(); - BalanceStatus bs; - if ((bs = beStat.isFit(tabletCtx.getTabletSize(), tabletCtx.getStorageMedium(), availPaths, - false /* not supplement */)) != BalanceStatus.OK) { - LOG.debug("tablet not fit in BE {}, reason: {}", beStat.getBeId(), bs.getErrMsgs()); + BalanceStatus bs = beStat.isFit(tabletCtx.getTabletSize(), tabletCtx.getStorageMedium(), null, + false /* not supplement */); + if (bs != BalanceStatus.OK) { + LOG.debug("tablet not fit in BE {}, reason: {}, {}", + beStat.getBeId(), bs.getErrMsgs(), isUrgentInfo); continue; } - if (!Config.be_rebalancer_fuzzy_test && !clusterStat.isMoreBalanced( - tabletCtx.getSrcBackendId(), beStat.getBeId(), tabletCtx.getTabletId(), - tabletCtx.getTabletSize(), tabletCtx.getStorageMedium())) { - continue; + if (!Config.be_rebalancer_fuzzy_test && !isUrgent) { + boolean moreBalanced = replicaHighBEs.stream().anyMatch(highBeStat -> + clusterStat.isMoreBalanced(highBeStat.getBeId(), beStat.getBeId(), + tabletCtx.getTabletId(), tabletCtx.getTabletSize(), + tabletCtx.getStorageMedium())); + if (!moreBalanced) { + continue; + } } PathSlot slot = backendsWorkingSlots.get(beStat.getBeId()); @@ -333,7 +399,8 @@ public void completeSchedCtx(TabletSchedCtx tabletCtx) throws SchedException { } if (candidates.isEmpty()) { - throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE, "unable to find low dest backend"); + throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE, + "unable to find low dest backend" + isUrgentInfo); } List candFitPaths = Lists.newArrayList(); @@ -343,15 +410,27 @@ public void completeSchedCtx(TabletSchedCtx tabletCtx) throws SchedException { continue; } - // classify the paths. - // And we only select path from 'low' and 'mid' paths - List pathLow = Lists.newArrayList(); - List pathMid = Lists.newArrayList(); - List pathHigh = Lists.newArrayList(); - beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, tabletCtx.getStorageMedium()); + List pathLow = null; + if (isUrgent) { + pathLow = beStat.getAvailPaths(tabletCtx.getStorageMedium()).stream() + .filter(RootPathLoadStatistic::isGlobalLowUsage) + .collect(Collectors.toList()); + } else { + // classify the paths. + // And we only select path from 'low' and 'mid' paths + pathLow = Lists.newArrayList(); + List pathMid = Lists.newArrayList(); + List pathHigh = Lists.newArrayList(); + beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, tabletCtx.getStorageMedium()); + + pathLow.addAll(pathMid); + } + pathLow.forEach(path -> candFitPaths.add(new BePathLoadStatPair(beStat, path))); + } - pathLow.addAll(pathMid); - pathLow.stream().forEach(path -> candFitPaths.add(new BePathLoadStatPair(beStat, path))); + if (candFitPaths.isEmpty()) { + throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE, + "unable to find low dest backend to fit in paths" + isUrgentInfo); } BePathLoadStatPairComparator comparator = new BePathLoadStatPairComparator(candFitPaths); @@ -359,6 +438,7 @@ public void completeSchedCtx(TabletSchedCtx tabletCtx) throws SchedException { for (BePathLoadStatPair bePathLoadStat : candFitPaths) { BackendLoadStatistic beStat = bePathLoadStat.getBackendLoadStatistic(); RootPathLoadStatistic pathStat = bePathLoadStat.getPathLoadStatistic(); + PathSlot slot = backendsWorkingSlots.get(beStat.getBeId()); if (slot == null) { continue; @@ -370,7 +450,7 @@ public void completeSchedCtx(TabletSchedCtx tabletCtx) throws SchedException { } throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT, - "beload waiting for dest backend slot"); + "unable to take dest slot" + isUrgentInfo); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/LoadStatisticForTag.java b/fe/fe-core/src/main/java/org/apache/doris/clone/LoadStatisticForTag.java index 158f2cde4af30a..faf9704a902b78 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/LoadStatisticForTag.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/LoadStatisticForTag.java @@ -21,6 +21,7 @@ import org.apache.doris.clone.BackendLoadStatistic.Classification; import org.apache.doris.clone.BackendLoadStatistic.LoadScore; import org.apache.doris.common.Config; +import org.apache.doris.common.Pair; import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.resource.Tag; @@ -38,6 +39,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.function.BiConsumer; import java.util.stream.Collectors; /* @@ -144,6 +146,7 @@ public void init() { // classify all backends for (TStorageMedium medium : TStorageMedium.values()) { classifyBackendByLoad(medium); + classifyBackendByMaxDiskUsage(medium); } // sort be stats by mix load score @@ -245,6 +248,84 @@ private void classifyBackendByLoad(TStorageMedium medium) { medium, avgLoadScore, lowCounter, midCounter, highCounter); } + private void classifyBackendByMaxDiskUsage(TStorageMedium medium) { + calcDiskGlobalUsages(medium); + Classification[] clazzs = { Classification.HIGH, Classification.LOW, Classification.MID }; + for (BackendLoadStatistic beStat : beLoadStatistics) { + if (!beStat.hasMedium(medium)) { + continue; + } + for (Classification clazz : clazzs) { + if (beStat.hasAvailPathWithGlobalClazz(medium, clazz)) { + beStat.setMaxDiskClazz(medium, clazz); + break; + } + } + } + } + + private void calcDiskGlobalUsages(TStorageMedium medium) { + double urgentDiffUsageThreshold; + if (Config.be_rebalancer_fuzzy_test) { + urgentDiffUsageThreshold = 0; + } else { + urgentDiffUsageThreshold = Config.balance_load_score_threshold + + Config.urgent_balance_disk_usage_extra_threshold; + if (urgentDiffUsageThreshold <= 0) { + return; + } + } + + double totalDiskUsages = 0; + int totalDiskNum = 0; + for (BackendLoadStatistic beStat : getBackendLoadStatistics()) { + if (!beStat.isAvailable()) { + continue; + } + for (RootPathLoadStatistic pathStat : beStat.getAvailPaths(medium)) { + if (pathStat.getCapacityB() > 1L) { + totalDiskUsages += pathStat.getUsedPercent(); + totalDiskNum++; + } + } + } + + if (totalDiskNum == 0) { + return; + } + + double avgDiskUsage = totalDiskUsages / totalDiskNum; + double urgentDiskUsage = avgDiskUsage + urgentDiffUsageThreshold; + + boolean hasHighDisk = false; + for (BackendLoadStatistic beStat : getBackendLoadStatistics()) { + if (!beStat.isAvailable()) { + continue; + } + for (RootPathLoadStatistic pathStat : beStat.getAvailPaths(medium)) { + if (pathStat.getCapacityB() > 1L) { + double usage = pathStat.getUsedPercent(); + if (usage > urgentDiskUsage) { + pathStat.setGlobalClazz(Classification.HIGH); + hasHighDisk = true; + } else if (usage > avgDiskUsage) { + pathStat.setGlobalClazz(Classification.MID); + } else { + pathStat.setGlobalClazz(Classification.LOW); + } + } + } + } + + if (!hasHighDisk) { + for (BackendLoadStatistic beStat : getBackendLoadStatistics()) { + for (RootPathLoadStatistic pathStat : beStat.getAvailPaths(medium)) { + pathStat.setGlobalClazz(Classification.MID); + } + } + } + } + private static void sortBeStats(List beStats, TStorageMedium medium) { if (medium == null) { Collections.sort(beStats, BackendLoadStatistic.MIX_COMPARATOR); @@ -353,7 +434,8 @@ public List> getBackendStatistic(long beId) { pathStat.add(String.valueOf(pathStatistic.getCapacityB())); pathStat.add(String.valueOf(DebugUtil.DECIMAL_FORMAT_SCALE_3.format( pathStatistic.getUsedCapacityB() * 100 / (double) pathStatistic.getCapacityB()))); - pathStat.add(pathStatistic.getClazz().name()); + pathStat.add(pathStatistic.getLocalClazz().name()); + pathStat.add(pathStatistic.getGlobalClazz().name()); pathStat.add(pathStatistic.getDiskState().name()); statistics.add(pathStat); } @@ -375,6 +457,88 @@ public List getBackendLoadStatistics() { return beLoadStatistics; } + public boolean getLowHighBEsWithIsUrgent(List lowBEs, List highBEs, + TStorageMedium medium) { + if (getUrgentLowHighBEs(lowBEs, highBEs, medium)) { + return true; + } else { + lowBEs.clear(); + highBEs.clear(); + List midBEs = Lists.newArrayList(); + getBackendStatisticByClass(lowBEs, midBEs, highBEs, medium); + return false; + } + } + + private boolean getUrgentLowHighBEs(List lowBEs, List highBEs, + TStorageMedium medium) { + List midBEs = Lists.newArrayList(); + for (BackendLoadStatistic beStat : getBackendLoadStatistics()) { + if (!beStat.isAvailable()) { + continue; + } + switch (beStat.getMaxDiskClazz(medium)) { + case LOW: + lowBEs.add(beStat); + break; + case MID: + midBEs.add(beStat); + break; + case HIGH: + highBEs.add(beStat); + break; + default: + break; + } + } + + if (lowBEs.isEmpty()) { + lowBEs.addAll(midBEs); + } + + if (lowBEs.isEmpty() && highBEs.size() > 1 && Config.enable_urgent_balance_no_low_backend) { + // all backend will exchange tablets among themselves. + lowBEs.addAll(highBEs); + } + + if (lowBEs.isEmpty() || highBEs.isEmpty()) { + lowBEs.clear(); + highBEs.clear(); + return false; + } + + BiConsumer, Boolean> resortBeStats = (beStats, choseMinPathElseMaxPath) -> { + List> bePairs = Lists.newArrayList(); + for (BackendLoadStatistic beStat : beStats) { + double score = -1.0; + for (RootPathLoadStatistic pathStat : beStat.getAvailPaths(medium)) { + if (pathStat.getCapacityB() > 1) { + double usage = pathStat.getUsedPercent(); + if (score < 0 || (choseMinPathElseMaxPath && usage < score) + || (!choseMinPathElseMaxPath && usage > score)) { + score = usage; + } + } + } + bePairs.add(Pair.of(beStat, score)); + } + Collections.sort(bePairs, new Pair.PairComparator>()); + + beStats.clear(); + bePairs.forEach(pair -> beStats.add(pair.key())); + }; + + resortBeStats.accept(lowBEs, true); + resortBeStats.accept(highBEs, false); + + LOG.debug("urgent backends' classification lowBe {}, highBe {}, medium: {}", + lowBEs.stream().map(BackendLoadStatistic::getBeId).collect(Collectors.toList()), + highBEs.stream().map(BackendLoadStatistic::getBeId).collect(Collectors.toList()), + medium); + + return true; + } + /* * If cluster is balance, all Backends will be in 'mid', and 'high' and 'low' is empty * If both 'high' and 'low' has Backends, just return diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java index d2f1983a831748..3e94bebcfefa72 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java @@ -34,7 +34,11 @@ public class RootPathLoadStatistic implements Comparable private long copingSizeB; private DiskState diskState; - private Classification clazz = Classification.INIT; + // localClazz is compare with other disks on the same backend + private Classification localClazz = Classification.INIT; + + // globalClazz is compare with other disks on all backends + private Classification globalClazz = Classification.INIT; public RootPathLoadStatistic(long beId, String path, Long pathHash, TStorageMedium storageMedium, long capacityB, long usedCapacityB, DiskState diskState) { @@ -80,12 +84,28 @@ public void incrCopingSizeB(long size) { copingSizeB += size; } - public void setClazz(Classification clazz) { - this.clazz = clazz; + public void setLocalClazz(Classification clazz) { + this.localClazz = clazz; + } + + public Classification getLocalClazz() { + return localClazz; + } + + public void setGlobalClazz(Classification clazz) { + this.globalClazz = clazz; + } + + public Classification getGlobalClazz() { + return globalClazz; + } + + public boolean isGlobalHighUsage() { + return globalClazz == Classification.HIGH; } - public Classification getClazz() { - return clazz; + public boolean isGlobalLowUsage() { + return globalClazz == Classification.LOW; } public DiskState getDiskState() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index e561b920943f18..4a4853169f7145 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -842,6 +842,7 @@ private void handleRedundantReplica(TabletSchedCtx tabletCtx, boolean force) thr || deleteReplicaOnSameHost(tabletCtx, force) || deleteReplicaNotInValidTag(tabletCtx, force) || deleteReplicaChosenByRebalancer(tabletCtx, force) + || deleteReplicaOnUrgentHighDisk(tabletCtx, force) || deleteReplicaOnHighLoadBackend(tabletCtx, force)) { // if we delete at least one redundant replica, we still throw a SchedException with status FINISHED // to remove this tablet from the pendingTablets(consider it as finished) @@ -990,6 +991,34 @@ private boolean deleteReplicaChosenByRebalancer(TabletSchedCtx tabletCtx, boolea return true; } + private boolean deleteReplicaOnUrgentHighDisk(TabletSchedCtx tabletCtx, boolean force) throws SchedException { + Tag tag = chooseProperTag(tabletCtx, false); + LoadStatisticForTag statistic = statisticMap.get(tag); + if (statistic == null) { + return false; + } + + Replica chosenReplica = null; + double maxUsages = -1; + for (Replica replica : tabletCtx.getReplicas()) { + BackendLoadStatistic beStatistic = statistic.getBackendLoadStatistic(replica.getBackendId()); + if (beStatistic == null) { + continue; + } + RootPathLoadStatistic path = beStatistic.getPathStatisticByPathHash(replica.getPathHash()); + if (path != null && path.isGlobalHighUsage() && path.getUsedPercent() > maxUsages) { + maxUsages = path.getUsedPercent(); + chosenReplica = replica; + } + } + + if (chosenReplica != null) { + deleteReplicaInternal(tabletCtx, chosenReplica, "high usage disk", force); + return true; + } + return false; + } + private boolean deleteReplicaOnHighLoadBackend(TabletSchedCtx tabletCtx, boolean force) throws SchedException { Tag tag = chooseProperTag(tabletCtx, false); LoadStatisticForTag statistic = statisticMap.get(tag); @@ -1037,7 +1066,7 @@ private boolean deleteFromHighLoadBackend(TabletSchedCtx tabletCtx, List alternativeTablets = rebalancer.selectAlternativeTablets(); Collections.shuffle(alternativeTablets); for (TabletSchedCtx tabletCtx : alternativeTablets) { - if (needAddBalanceNum > 0 && addTablet(tabletCtx, false) == AddResult.ADDED) { - needAddBalanceNum--; + if (addNum >= limit) { + break; + } + if (addTablet(tabletCtx, false) == AddResult.ADDED) { + addNum++; } else { rebalancer.onTabletFailed(tabletCtx); } } - if (needAddBalanceNum <= 0) { - return; - } + return addNum > 0; + } + + private void selectTabletsForDiskBalance(boolean hasBeBalance) { if (Config.disable_disk_balance) { LOG.info("disk balance is disabled. skip selecting tablets for disk balance"); return; } - List diskBalanceTablets = Lists.newArrayList(); - // if default rebalancer can not get new task or user given prio BEs, then use disk rebalancer to get task - if (diskRebalancer.hasPrioBackends() || alternativeTablets.isEmpty()) { - diskBalanceTablets = diskRebalancer.selectAlternativeTablets(); + + int limit = getBalanceSchedQuotoLeft(); + if (limit <= 0) { + return; } - for (TabletSchedCtx tabletCtx : diskBalanceTablets) { + + int addNum = 0; + for (TabletSchedCtx tabletCtx : diskRebalancer.selectAlternativeTablets()) { + if (addNum >= limit) { + break; + } // add if task from prio backend or cluster is balanced - if (alternativeTablets.isEmpty() || tabletCtx.getPriority() == TabletSchedCtx.Priority.NORMAL) { + if (!hasBeBalance || Config.be_rebalancer_idle_seconds <= 0 + || tabletCtx.getPriority() == TabletSchedCtx.Priority.NORMAL) { if (addTablet(tabletCtx, false) == AddResult.ADDED) { - needAddBalanceNum--; - if (needAddBalanceNum <= 0) { - break; - } + addNum++; } } } } + private int getBalanceSchedQuotoLeft() { + // No need to prefetch too many balance task to pending queue. + // Because for every sched, it will re select the balance task. + return Math.min(Config.schedule_batch_size - getPendingNum(), + Config.max_balancing_tablets - getBalanceTabletsNumber()); + } + /** * Try to create a balance task for a tablet. */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendLoadStatisticProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendLoadStatisticProcNode.java index 12bf2aba169ea7..1bec8a5d7a0155 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendLoadStatisticProcNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendLoadStatisticProcNode.java @@ -27,7 +27,7 @@ public class BackendLoadStatisticProcNode implements ProcNodeInterface { public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() .add("RootPath").add("PathHash").add("StorageMedium") .add("DataUsedCapacity").add("TotalCapacity").add("TotalUsedPct") - .add("Class").add("State") + .add("ClassInOneBE").add("ClassInAllBE").add("State") .build(); private final LoadStatisticForTag statistic; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ClusterLoadStatisticProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ClusterLoadStatisticProcDir.java index 29eafabf158cd4..1c623effc101e3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ClusterLoadStatisticProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ClusterLoadStatisticProcDir.java @@ -29,7 +29,7 @@ // show proc "/cluster_balance/cluster_load_stat/location_default/HDD"; public class ClusterLoadStatisticProcDir implements ProcDirInterface { public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() - .add("BeId").add("Available").add("UsedCapacity").add("Capacity") + .add("BeId").add("Available").add("UsedCapacity").add("Capacity").add("MaxDisk") .add("UsedPercent").add("ReplicaNum").add("CapCoeff").add("ReplCoeff").add("Score") .add("Class") .build(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java index 81fdaafc969e7b..05abfacdce0c2d 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java @@ -23,6 +23,7 @@ import org.apache.doris.catalog.Replica.ReplicaState; import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.catalog.TabletMeta; +import org.apache.doris.clone.BackendLoadStatistic.Classification; import org.apache.doris.resource.Tag; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; @@ -51,24 +52,28 @@ public class ClusterLoadStatisticsTest { @Before public void setUp() { // be1 + // 50%, 95%, 2% be1 = new Backend(10001, "192.168.0.1", 9051); Map disks = Maps.newHashMap(); DiskInfo diskInfo1 = new DiskInfo("/path1"); - diskInfo1.setTotalCapacityB(1000000); - diskInfo1.setAvailableCapacityB(500000); - diskInfo1.setDataUsedCapacityB(480000); + diskInfo1.setTotalCapacityB(1_000_000); + diskInfo1.setAvailableCapacityB(500_000); + diskInfo1.setDataUsedCapacityB(480_000); + diskInfo1.setPathHash(1001); disks.put(diskInfo1.getRootPath(), diskInfo1); DiskInfo diskInfo2 = new DiskInfo("/path2"); - diskInfo2.setTotalCapacityB(2000000); - diskInfo2.setAvailableCapacityB(100000); - diskInfo2.setDataUsedCapacityB(80000); + diskInfo2.setTotalCapacityB(2_000_000); + diskInfo2.setAvailableCapacityB(100_000); + diskInfo2.setDataUsedCapacityB(80_000); + diskInfo2.setPathHash(1002); disks.put(diskInfo2.getRootPath(), diskInfo2); DiskInfo diskInfo3 = new DiskInfo("/path3"); - diskInfo3.setTotalCapacityB(500000); - diskInfo3.setAvailableCapacityB(490000); - diskInfo3.setDataUsedCapacityB(10000); + diskInfo3.setTotalCapacityB(500_000); + diskInfo3.setAvailableCapacityB(490_000); + diskInfo3.setDataUsedCapacityB(10_000); + diskInfo3.setPathHash(1003); disks.put(diskInfo3.getRootPath(), diskInfo3); be1.setDisks(ImmutableMap.copyOf(disks)); @@ -78,15 +83,17 @@ public void setUp() { be2 = new Backend(10002, "192.168.0.2", 9052); disks = Maps.newHashMap(); diskInfo1 = new DiskInfo("/path1"); - diskInfo1.setTotalCapacityB(2000000); - diskInfo1.setAvailableCapacityB(1900000); - diskInfo1.setDataUsedCapacityB(480000); + diskInfo1.setTotalCapacityB(2_000_000); + diskInfo1.setAvailableCapacityB(1_900_000); + diskInfo1.setDataUsedCapacityB(480_000); + diskInfo1.setPathHash(2001); disks.put(diskInfo1.getRootPath(), diskInfo1); diskInfo2 = new DiskInfo("/path2"); - diskInfo2.setTotalCapacityB(20000000); - diskInfo2.setAvailableCapacityB(1000000); - diskInfo2.setDataUsedCapacityB(80000); + diskInfo2.setTotalCapacityB(20_000_000); + diskInfo2.setAvailableCapacityB(1_000_000); + diskInfo2.setDataUsedCapacityB(80_000); + diskInfo2.setPathHash(2002); disks.put(diskInfo2.getRootPath(), diskInfo2); be2.setDisks(ImmutableMap.copyOf(disks)); @@ -96,21 +103,24 @@ public void setUp() { be3 = new Backend(10003, "192.168.0.3", 9053); disks = Maps.newHashMap(); diskInfo1 = new DiskInfo("/path1"); - diskInfo1.setTotalCapacityB(4000000); - diskInfo1.setAvailableCapacityB(100000); - diskInfo1.setDataUsedCapacityB(80000); + diskInfo1.setTotalCapacityB(4_000_000); + diskInfo1.setAvailableCapacityB(100_000); + diskInfo1.setDataUsedCapacityB(80_000); + diskInfo1.setPathHash(3001); disks.put(diskInfo1.getRootPath(), diskInfo1); diskInfo2 = new DiskInfo("/path2"); - diskInfo2.setTotalCapacityB(2000000); - diskInfo2.setAvailableCapacityB(100000); - diskInfo2.setDataUsedCapacityB(80000); + diskInfo2.setTotalCapacityB(2_000_000); + diskInfo2.setAvailableCapacityB(100_000); + diskInfo2.setDataUsedCapacityB(80_000); + diskInfo2.setPathHash(3002); disks.put(diskInfo2.getRootPath(), diskInfo2); diskInfo3 = new DiskInfo("/path3"); - diskInfo3.setTotalCapacityB(500000); - diskInfo3.setAvailableCapacityB(490000); - diskInfo3.setDataUsedCapacityB(10000); + diskInfo3.setTotalCapacityB(500_000); + diskInfo3.setAvailableCapacityB(490_000); + diskInfo3.setDataUsedCapacityB(10_000); + diskInfo3.setPathHash(3003); disks.put(diskInfo3.getRootPath(), diskInfo3); be3.setDisks(ImmutableMap.copyOf(disks)); @@ -120,9 +130,9 @@ public void setUp() { be4 = new Backend(10004, "192.168.0.4", 9053); disks = Maps.newHashMap(); diskInfo1 = new DiskInfo("/path1"); - diskInfo1.setTotalCapacityB(4000000); - diskInfo1.setAvailableCapacityB(100000); - diskInfo1.setDataUsedCapacityB(80000); + diskInfo1.setTotalCapacityB(4_000_000); + diskInfo1.setAvailableCapacityB(100_000); + diskInfo1.setDataUsedCapacityB(80_000); disks.put(diskInfo1.getRootPath(), diskInfo1); be4.setDisks(ImmutableMap.copyOf(disks)); @@ -161,6 +171,14 @@ public void test() { loadStatistic.init(); List> infos = loadStatistic.getStatistic(TStorageMedium.HDD); Assert.assertEquals(3, infos.size()); + BackendLoadStatistic beStat1 = loadStatistic.getBackendLoadStatistic(be1.getId()); + Assert.assertNotNull(beStat1); + RootPathLoadStatistic path2 = beStat1.getPathStatisticByPathHash(1002); + RootPathLoadStatistic path3 = beStat1.getPathStatisticByPathHash(1003); + Assert.assertEquals(Classification.HIGH, path2.getLocalClazz()); + Assert.assertEquals(Classification.HIGH, path2.getGlobalClazz()); + Assert.assertEquals(Classification.LOW, path3.getLocalClazz()); + Assert.assertEquals(Classification.LOW, path3.getGlobalClazz()); } }