Skip to content

Commit

Permalink
[improvement](decommission be) decommission check replica num (#32748)
Browse files Browse the repository at this point in the history
  • Loading branch information
yujun777 authored Mar 28, 2024
1 parent 742a3f6 commit 496befd
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 11 deletions.
84 changes: 82 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,24 +33,34 @@
import org.apache.doris.analysis.ModifyFrontendHostNameClause;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MysqlCompatibleDatabase;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.system.SystemInfoService.HostInfo;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/*
* SystemHandler is for
Expand Down Expand Up @@ -235,7 +245,8 @@ public static List<Backend> checkDecommission(List<HostInfo> hostInfos)
decommissionBackends.add(backend);
}

// TODO(cmy): check if replication num can be met
checkDecommissionWithReplicaAllocation(decommissionBackends);

// TODO(cmy): check remaining space

return decommissionBackends;
Expand All @@ -258,12 +269,81 @@ public static List<Backend> checkDecommissionByIds(List<String> ids)
decommissionBackends.add(backend);
}

// TODO(cmy): check if replication num can be met
checkDecommissionWithReplicaAllocation(decommissionBackends);

// TODO(cmy): check remaining space

return decommissionBackends;
}

private static void checkDecommissionWithReplicaAllocation(List<Backend> decommissionBackends)
throws DdlException {
if (Config.isCloudMode() || decommissionBackends.isEmpty()
|| DebugPointUtil.isEnable("SystemHandler.decommission_no_check_replica_num")) {
return;
}

Set<Tag> decommissionTags = decommissionBackends.stream().map(be -> be.getLocationTag())
.collect(Collectors.toSet());
Map<Tag, Integer> tagAvailBackendNums = Maps.newHashMap();
for (Backend backend : Env.getCurrentSystemInfo().getAllBackends()) {
long beId = backend.getId();
if (!backend.isScheduleAvailable()
|| decommissionBackends.stream().anyMatch(be -> be.getId() == beId)) {
continue;
}

Tag tag = backend.getLocationTag();
if (tag != null) {
tagAvailBackendNums.put(tag, tagAvailBackendNums.getOrDefault(tag, 0) + 1);
}
}

Env env = Env.getCurrentEnv();
List<Long> dbIds = env.getInternalCatalog().getDbIds();
for (Long dbId : dbIds) {
Database db = env.getInternalCatalog().getDbNullable(dbId);
if (db == null) {
continue;
}

if (db instanceof MysqlCompatibleDatabase) {
continue;
}

for (Table table : db.getTables()) {
table.readLock();
try {
if (!table.needSchedule()) {
continue;
}

OlapTable tbl = (OlapTable) table;
for (Partition partition : tbl.getAllPartitions()) {
ReplicaAllocation replicaAlloc = tbl.getPartitionInfo().getReplicaAllocation(partition.getId());
for (Map.Entry<Tag, Short> entry : replicaAlloc.getAllocMap().entrySet()) {
Tag tag = entry.getKey();
if (!decommissionTags.contains(tag)) {
continue;
}
int replicaNum = (int) entry.getValue();
int backendNum = tagAvailBackendNums.getOrDefault(tag, 0);
if (replicaNum > backendNum) {
throw new DdlException("After decommission, partition " + partition.getName()
+ " of table " + db.getName() + "." + tbl.getName()
+ " 's replication allocation { " + replicaAlloc
+ " } > available backend num " + backendNum + " on tag " + tag
+ ", otherwise need to decrease the partition's replication num.");
}
}
}
} finally {
table.readUnlock();
}
}
}
}

@Override
public synchronized void cancel(CancelStmt stmt) throws DdlException {
CancelAlterSystemStmt cancelAlterSystemStmt = (CancelAlterSystemStmt) stmt;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite('test_decommission_with_replica_num_fail') {
if (isCloudMode()) {
return
}

def tbl = 'test_decommission_with_replica_num_fail'
def backends = sql_return_maparray('show backends')
def replicaNum = 0
def targetBackend = null
for (def be : backends) {
def alive = be.Alive.toBoolean()
def decommissioned = be.SystemDecommissioned.toBoolean()
if (alive && !decommissioned) {
replicaNum++
targetBackend = be
}
}
assertTrue(replicaNum > 0)

sql "DROP TABLE IF EXISTS ${tbl} FORCE"
sql """
CREATE TABLE ${tbl}
(
k1 int,
k2 int
)
DISTRIBUTED BY HASH(k1) BUCKETS 6
PROPERTIES
(
"replication_num" = "${replicaNum}"
);
"""
try {
test {
sql "ALTER SYSTEM DECOMMISSION BACKEND '${targetBackend.Host}:${targetBackend.HeartbeatPort}'"
exception "otherwise need to decrease the partition's replication num"
}
} finally {
sql "CANCEL DECOMMISSION BACKEND '${targetBackend.Host}:${targetBackend.HeartbeatPort}'"
}
sql "DROP TABLE IF EXISTS ${tbl} FORCE"
}
26 changes: 17 additions & 9 deletions regression-test/suites/node_p0/test_backend.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ suite("test_backend", "nonConcurrent") {
}

if (context.config.jdbcUser.equals("root")) {
def beId1 = null
try {
GetDebugPoint().enableDebugPointForAllFEs("SystemHandler.decommission_no_check_replica_num");
try_sql """admin set frontend config("drop_backend_after_decommission" = "false")"""
def result = sql_return_maparray """SHOW BACKENDS;"""
logger.info("show backends result:${result}")
def beId1 = null
for (def res : result) {
beId1 = res.BackendId
break
Expand All @@ -58,16 +59,23 @@ suite("test_backend", "nonConcurrent") {
assertTrue(res.SystemDecommissioned.toBoolean())
}
}
result = sql """CANCEL DECOMMISSION BACKEND "${beId1}" """
logger.info("CANCEL DECOMMISSION BACKEND ${result}")
result = sql_return_maparray """SHOW BACKENDS;"""
for (def res : result) {
if (res.BackendId == "${beId1}") {
assertFalse(res.SystemDecommissioned.toBoolean())
} finally {
try {
if (beId1 != null) {
def result = sql """CANCEL DECOMMISSION BACKEND "${beId1}" """
logger.info("CANCEL DECOMMISSION BACKEND ${result}")

result = sql_return_maparray """SHOW BACKENDS;"""
for (def res : result) {
if (res.BackendId == "${beId1}") {
assertFalse(res.SystemDecommissioned.toBoolean())
}
}
}
} finally {
GetDebugPoint().disableDebugPointForAllFEs('SystemHandler.decommission_no_check_replica_num');
try_sql """admin set frontend config("drop_backend_after_decommission" = "true")"""
}
} finally {
try_sql """admin set frontend config("drop_backend_after_decommission" = "true")"""
}
}
}

0 comments on commit 496befd

Please sign in to comment.