Skip to content

Commit

Permalink
[fix](cloud) Fix cloud auto start and add a regression case (#40027)
Browse files Browse the repository at this point in the history
1. Fix the cluster being suspended and select not waking up the cluster.
The reason is that all be nodes in the cluster are inactive, the cluster
is skipped, and the cluster that needs to be woken up cannot be found,
and the wake-up logic will not be reached. And delete the redundant
function getAuthorizedCloudCluster
2. Add check after resume cluster, there must be at least one alive be
in the cluster.
3. add auto start regression case
  • Loading branch information
deardeng authored Sep 10, 2024
1 parent fccecea commit eb4673f
Show file tree
Hide file tree
Showing 4 changed files with 193 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,13 @@ public long getBackendId() {
}

private long getBackendIdImpl(String cluster) {
// if cluster is SUSPENDED, wait
try {
cluster = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).waitForAutoStart(cluster);
} catch (DdlException e) {
// this function cant throw exception. so just log it
LOG.warn("cant resume cluster {}, exception", cluster, e);
}
// check default cluster valid.
if (Strings.isNullOrEmpty(cluster)) {
LOG.warn("failed to get available be, clusterName: {}", cluster);
Expand All @@ -163,13 +170,6 @@ private long getBackendIdImpl(String cluster) {
return -1;
}

// if cluster is SUSPENDED, wait
try {
((CloudSystemInfoService) Env.getCurrentSystemInfo()).waitForAutoStart(cluster);
} catch (DdlException e) {
// this function cant throw exception. so just log it
LOG.warn("cant resume cluster {}, exception", cluster, e);
}
String clusterId = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudClusterIdByName(cluster);

if (isColocated()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -737,20 +737,20 @@ public String getClusterNameAutoStart(final String clusterName) {
return cloudClusterTypeAndName.clusterName;
}

public void waitForAutoStart(String clusterName) throws DdlException {
public String waitForAutoStart(String clusterName) throws DdlException {
if (Config.isNotCloudMode()) {
return;
return null;
}
clusterName = getClusterNameAutoStart(clusterName);
if (Strings.isNullOrEmpty(clusterName)) {
LOG.warn("auto start in cloud mode, but clusterName empty {}", clusterName);
return;
return null;
}
String clusterStatus = getCloudStatusByName(clusterName);
if (Strings.isNullOrEmpty(clusterStatus)) {
// for cluster rename or cluster dropped
LOG.warn("cant find clusterStatus in fe, clusterName {}", clusterName);
return;
return null;
}

if (Cloud.ClusterStatus.valueOf(clusterStatus) == Cloud.ClusterStatus.MANUAL_SHUTDOWN) {
Expand All @@ -765,7 +765,7 @@ public void waitForAutoStart(String clusterName) throws DdlException {
// root ? see StatisticsUtil.buildConnectContext
if (ConnectContext.get() != null && ConnectContext.get().getUserIdentity().isRootUser()) {
LOG.warn("auto start daemon thread run in root, not resume cluster {}-{}", clusterName, clusterStatus);
return;
return null;
}
Cloud.AlterClusterRequest.Builder builder = Cloud.AlterClusterRequest.newBuilder();
builder.setCloudUniqueId(Config.cloud_unique_id);
Expand Down Expand Up @@ -794,7 +794,8 @@ public void waitForAutoStart(String clusterName) throws DdlException {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
boolean hasAutoStart = false;
while (!String.valueOf(Cloud.ClusterStatus.NORMAL).equals(clusterStatus)
boolean existAliveBe = true;
while ((!String.valueOf(Cloud.ClusterStatus.NORMAL).equals(clusterStatus) || !existAliveBe)
&& retryTime < retryTimes) {
hasAutoStart = true;
++retryTime;
Expand All @@ -812,6 +813,8 @@ public void waitForAutoStart(String clusterName) throws DdlException {
LOG.info("change cluster sleep wait InterruptedException: ", e);
}
clusterStatus = getCloudStatusByName(clusterName);
// Check that the bes node in the cluster have at least one alive
existAliveBe = getBackendsByClusterName(clusterName).stream().anyMatch(Backend::isAlive);
}
if (retryTime >= retryTimes) {
// auto start timeout
Expand All @@ -824,5 +827,6 @@ public void waitForAutoStart(String clusterName) throws DdlException {
if (hasAutoStart) {
LOG.info("auto start cluster {}, start cost {} ms", clusterName, stopWatch.getTime());
}
return clusterName;
}
}
37 changes: 4 additions & 33 deletions fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -1249,7 +1249,7 @@ public String getCloudCluster(boolean updateErr) {
String choseWay = null;
if (!Strings.isNullOrEmpty(this.cloudCluster)) {
cluster = this.cloudCluster;
choseWay = "use @cluster";
choseWay = "use context cluster";
LOG.debug("finally set context cluster name {} for user {} with chose way '{}'",
cloudCluster, getCurrentUserIdentity(), choseWay);
return cluster;
Expand All @@ -1260,9 +1260,9 @@ public String getCloudCluster(boolean updateErr) {
cluster = defaultCluster;
choseWay = "default cluster";
} else {
String authorizedCluster = getAuthorizedCloudCluster();
if (!Strings.isNullOrEmpty(authorizedCluster)) {
cluster = authorizedCluster;
CloudClusterResult cloudClusterTypeAndName = getCloudClusterByPolicy();
if (cloudClusterTypeAndName != null && !Strings.isNullOrEmpty(cloudClusterTypeAndName.clusterName)) {
cluster = cloudClusterTypeAndName.clusterName;
choseWay = "authorized cluster";
}
}
Expand Down Expand Up @@ -1293,35 +1293,6 @@ public String getDefaultCloudCluster() {
return null;
}

public String getAuthorizedCloudCluster() {
List<String> cloudClusterNames = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudClusterNames();
// get all available cluster of the user
for (String cloudClusterName : cloudClusterNames) {
if (!Env.getCurrentEnv().getAuth().checkCloudPriv(getCurrentUserIdentity(),
cloudClusterName, PrivPredicate.USAGE, ResourceTypeEnum.CLUSTER)) {
continue;
}
// find a cluster has more than one alive be
List<Backend> bes = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
.getBackendsByClusterName(cloudClusterName);
AtomicBoolean hasAliveBe = new AtomicBoolean(false);
bes.stream().filter(Backend::isAlive).findAny().ifPresent(backend -> {
if (LOG.isDebugEnabled()) {
LOG.debug("get a clusterName {}, it's has more than one alive be {}", cloudClusterName, backend);
}
hasAliveBe.set(true);
});
if (hasAliveBe.get()) {
if (LOG.isDebugEnabled()) {
LOG.debug("set context cluster name {}", cloudClusterName);
}
return cloudClusterName;
}
}

return null;
}

public StatsErrorEstimator getStatsErrorEstimator() {
return statsErrorEstimator;
}
Expand Down
172 changes: 172 additions & 0 deletions regression-test/suites/cloud_p0/multi_cluster/test_auto_start.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.apache.doris.regression.suite.ClusterOptions
import groovy.json.JsonSlurper
import groovy.json.JsonOutput
import org.awaitility.Awaitility;
import org.apache.doris.regression.util.Http
import static java.util.concurrent.TimeUnit.SECONDS;

suite('test_auto_start_in_cloud', 'multi_cluster') {
if (!isCloudMode()) {
return;
}
def options = new ClusterOptions()
options.feConfigs += [
'cloud_cluster_check_interval_second=1',
'cloud_pre_heating_time_limit_sec=1',
'sys_log_verbose_modules=org',
'heartbeat_interval_second=1'
]
options.setFeNum(3)
options.setBeNum(3)
options.cloudMode = true
options.connectToFollower = true

def getClusterFragementStatus = { def fe ->
def (feHost, feHttpPort) = fe.getHttpAddress()
// curl -X GET -u root: '128.1.1.1:8030/rest/v2/manager/cluster/cluster_info/cloud_cluster_status'
def url = 'http://' + feHost + ':' + feHttpPort + '/rest/v2/manager/cluster/cluster_info/cloud_cluster_status'
def result = Http.GET(url, true)
result
}


def set_cluster_status = { String unique_id , String cluster_id, String status, def ms ->
def jsonOutput = new JsonOutput()
def reqBody = [
cloud_unique_id: unique_id,
cluster : [
cluster_id : cluster_id,
cluster_status : status
]
]
def js = jsonOutput.toJson(reqBody)
log.info("drop cluster req: ${js} ".toString())

def set_cluster_status_api = { request_body, check_func ->
httpTest {
endpoint ms.host+':'+ms.httpPort
uri "/MetaService/http/set_cluster_status?token=greedisgood9999"
body request_body
check check_func
}
}

set_cluster_status_api.call(js) {
respCode, body ->
log.info("set cluster status resp: ${body} ${respCode}".toString())
def json = parseJson(body)
assertTrue(json.code.equalsIgnoreCase("OK"))
}
}

docker(options) {
sql """
CREATE TABLE table1 (
class INT,
id INT,
score INT SUM
)
AGGREGATE KEY(class, id)
DISTRIBUTED BY HASH(class) BUCKETS 48
"""

sql """INSERT INTO table1 VALUES (1, 1, 100)"""
// master
def fe1 = cluster.getFeByIndex(1)
// ms
def ms = cluster.getAllMetaservices().get(0)

def result = sql_return_maparray """SHOW CLUSTERS"""
String clusterName = result[0].cluster
def tag = getCloudBeTagByName(clusterName)
logger.info("tag = {}", tag)

def jsonSlurper = new JsonSlurper()
def jsonObject = jsonSlurper.parseText(tag)
String cloudClusterId = jsonObject.cloud_cluster_id
String uniqueId = jsonObject.cloud_unique_id

sleep(5 * 1000)

Map<String, Long> fragmentUpdateTimeMap = [:]

// no read,write,sc, 20s suspend cluster
boolean clusterCanSuspend = true
for (int i = 0; i < 20; i++) {
result = getClusterFragementStatus(fe1)
result.data.compute_cluster_id.each {
if (fragmentUpdateTimeMap[it.host] == null) {
fragmentUpdateTimeMap[it.host] = it.lastFragmentUpdateTime
} else if (fragmentUpdateTimeMap[it.host] != it.lastFragmentUpdateTime) {
log.info("fragment update time changed be: {} old time: {} new time: {}", it.host, fragmentUpdateTimeMap[it.host], it.lastFragmentUpdateTime)
clusterCanSuspend = false
}
}
sleep(1 * 1000)
}
assertTrue(clusterCanSuspend)

// cloud control set cluster status SUSPENDED
set_cluster_status(uniqueId, cloudClusterId, "SUSPENDED", ms)

dockerAwaitUntil(5) {
tag = getCloudBeTagByName(clusterName)
logger.info("tag = {}", tag)
jsonObject = jsonSlurper.parseText(tag)
String cluster_status = jsonObject.cloud_cluster_status
cluster_status == "SUSPENDED"
}

cluster.stopBackends(1,2,3)

// select
future1 = thread {
def begin = System.currentTimeMillis();
// root cant resume, due to deamon thread use root
def connInfo = context.threadLocalConn.get()
result = connect(user = 'admin', password = '', url = connInfo.conn.getMetaData().getURL()) {
sql 'SELECT * FROM table1'
}
def cost = System.currentTimeMillis() - begin;
log.info("result {} time cost: {}", result, cost)
assertTrue(cost > 5000)
assertEquals(1, result.size())
}
// insert

// cloud control
future2 = thread {
// check cluster "TO_RESUME"
dockerAwaitUntil(5) {
tag = getCloudBeTagByName(clusterName)
logger.info("tag = {}", tag)
jsonObject = jsonSlurper.parseText(tag)
String cluster_status = jsonObject.cloud_cluster_status
cluster_status == "TO_RESUME"
}
sleep(5 * 1000)
cluster.startBackends(1,2,3)
set_cluster_status(uniqueId, cloudClusterId, "NORMAL", ms)
}

future1.get()
future2.get()
}
}

0 comments on commit eb4673f

Please sign in to comment.