Skip to content

Commit

Permalink
[improvement](cloud) manage node via sql like non cloud mode
Browse files Browse the repository at this point in the history
  • Loading branch information
dataroaring committed Sep 2, 2024
1 parent c85b2a6 commit f43e7e7
Show file tree
Hide file tree
Showing 25 changed files with 353 additions and 61 deletions.
13 changes: 13 additions & 0 deletions be/src/agent/heartbeat_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <ostream>
#include <string>

#include "cloud/config.h"
#include "common/config.h"
#include "common/status.h"
#include "olap/storage_engine.h"
Expand Down Expand Up @@ -244,6 +245,18 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
_engine.notify_listeners();
}

if (master_info.__isset.meta_service_endpoint && config::meta_service_endpoint.empty()) {
auto st = config::set_config("meta_service_endpoint", master_info.meta_service_endpoint,
true);
LOG(INFO) << "set config meta_service_endpoing " << st;
}

if (master_info.__isset.cloud_instance_id && config::cloud_instance_id.empty()) {
auto st = config::set_config("cloud_instance_id", master_info.cloud_instance_id, true);
LOG(INFO) << "set config cloud_instance_id " << st;
config::set_cloud_unique_id();
}

return Status::OK();
}

Expand Down
5 changes: 3 additions & 2 deletions be/src/cloud/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@

namespace doris::config {

DEFINE_String(cloud_unique_id, "");
DEFINE_String(meta_service_endpoint, "");
DEFINE_mString(cloud_instance_id, "");
DEFINE_mString(cloud_unique_id, "");
DEFINE_mString(meta_service_endpoint, "");
DEFINE_Bool(meta_service_use_load_balancer, "false");
DEFINE_mInt32(meta_service_rpc_timeout_ms, "10000");
DEFINE_Bool(meta_service_connection_pooled, "true");
Expand Down
16 changes: 13 additions & 3 deletions be/src/cloud/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,20 @@

namespace doris::config {

DECLARE_String(cloud_unique_id);
DECLARE_mString(cloud_instance_id);

DECLARE_mString(cloud_unique_id);

static inline bool is_cloud_mode() {
return !cloud_unique_id.empty();
return !cloud_unique_id.empty() || !cloud_instance_id.empty();
}

static inline void set_cloud_unique_id() {
if (cloud_unique_id.empty()) {
if (!cloud_instance_id.empty()) {
cloud_unique_id = "1:" + cloud_instance_id + ":compute";
}
}
}

// Set the endpoint of meta service.
Expand All @@ -40,7 +50,7 @@ static inline bool is_cloud_mode() {
// If you want to access a group of meta services directly, set the addresses of meta services to this config,
// separated by a comma, like "host:port,host:port,host:port", then BE will choose a server to connect in randomly.
// In this mode, The config meta_service_connection_pooled is still useful, but the other two configs will be ignored.
DECLARE_String(meta_service_endpoint);
DECLARE_mString(meta_service_endpoint);
// Set the underlying connection type to pooled.
DECLARE_Bool(meta_service_connection_pooled);
DECLARE_mInt64(meta_service_connection_pool_size);
Expand Down
4 changes: 3 additions & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@
#include <utility>
#include <vector>

#include "cloud/config.h"
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "config.h"
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
#include "util/cpu_info.h"
Expand Down Expand Up @@ -1658,6 +1658,8 @@ bool init(const char* conf_file, bool fill_conf_map, bool must_exist, bool set_t
SET_FIELD(it.second, std::vector<std::string>, fill_conf_map, set_to_default);
}

set_cloud_unique_id();

return true;
}

Expand Down
5 changes: 2 additions & 3 deletions cloud/src/meta-service/meta_service_resource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2336,9 +2336,8 @@ void MetaServiceImpl::get_cluster(google::protobuf::RpcController* controller,
std::find_if(instance.storage_vault_names().begin(), instance.storage_vault_names().end(),
[](const std::string& name) { return name == BUILT_IN_STORAGE_VAULT_NAME; }) ==
instance.storage_vault_names().end()) {
code = MetaServiceCode::STORAGE_VAULT_NOT_FOUND;
msg = "instance has no built in storage vault";
return;
LOG_EVERY_N(INFO, 100) << "There is no builtin vault in instance "
<< instance.instance_id();
}

auto get_cluster_mysql_user = [](const ClusterPB& c, std::set<std::string>* mysql_users) {
Expand Down
4 changes: 3 additions & 1 deletion cloud/src/resource-manager/resource_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,9 @@ bool ResourceManager::check_cluster_params_valid(const ClusterPB& cluster, std::
if (ClusterPB::COMPUTE == cluster.type() && n.has_heartbeat_port() && n.heartbeat_port()) {
continue;
}
ss << "check cluster params failed, node : " << proto_to_json(n);
ss << "check cluster params failed, edit_log_port is required for frontends while "
"heatbeat_port is required for banckens, node : "
<< proto_to_json(n);
*err = ss.str();
no_err = false;
break;
Expand Down
12 changes: 9 additions & 3 deletions fe/fe-common/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -1831,7 +1831,7 @@ public class Config extends ConfigBase {
public static boolean enable_date_conversion = true;

@ConfField(mutable = false, masterOnly = true)
public static boolean enable_multi_tags = false;
public static boolean enable_multi_tags = true;

/**
* If set to TRUE, FE will convert DecimalV2 to DecimalV3 automatically.
Expand Down Expand Up @@ -2799,15 +2799,21 @@ public class Config extends ConfigBase {
@ConfField public static int warn_sys_accumulated_file_size = 2;
@ConfField public static int audit_sys_accumulated_file_size = 4;

// compatibily with elder version.
// cloud_unique_id is introduced before cloud_instance_id, so it has higher priority.
@ConfField
public static String cloud_unique_id = "";

// If cloud_unique_id is empty, cloud_instance_id works, otherwise cloud_unique_id works.
@ConfField
public static String cloud_instance_id = "";

public static boolean isCloudMode() {
return !cloud_unique_id.isEmpty();
return !cloud_unique_id.isEmpty() || !cloud_instance_id.isEmpty();
}

public static boolean isNotCloudMode() {
return cloud_unique_id.isEmpty();
return !isCloudMode();
}

/**
Expand Down
14 changes: 7 additions & 7 deletions fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,18 +99,18 @@ public class Alter {

private AlterHandler schemaChangeHandler;
private AlterHandler materializedViewHandler;
private SystemHandler clusterHandler;
private SystemHandler systemHandler;

public Alter() {
schemaChangeHandler = Config.isCloudMode() ? new CloudSchemaChangeHandler() : new SchemaChangeHandler();
materializedViewHandler = new MaterializedViewHandler();
clusterHandler = new SystemHandler();
systemHandler = new SystemHandler();
}

public void start() {
schemaChangeHandler.start();
materializedViewHandler.start();
clusterHandler.start();
systemHandler.start();
}

public void processCreateMaterializedView(CreateMaterializedViewStmt stmt)
Expand Down Expand Up @@ -710,8 +710,8 @@ public void replayModifyViewDef(AlterViewInfo alterViewInfo) throws MetaNotFound
}
}

public void processAlterCluster(AlterSystemStmt stmt) throws UserException {
clusterHandler.process(Collections.singletonList(stmt.getAlterClause()), null, null);
public void processAlterSystem(AlterSystemStmt stmt) throws UserException {
systemHandler.process(Collections.singletonList(stmt.getAlterClause()), null, null);
}

private void processRename(Database db, OlapTable table, List<AlterClause> alterClauses) throws DdlException {
Expand Down Expand Up @@ -958,8 +958,8 @@ public AlterHandler getMaterializedViewHandler() {
return materializedViewHandler;
}

public AlterHandler getClusterHandler() {
return clusterHandler;
public AlterHandler getSystemHandler() {
return systemHandler;
}

public void processAlterMTMV(AlterMTMV alterMTMV, boolean isReplay) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public class SystemHandler extends AlterHandler {
private static final Logger LOG = LogManager.getLogger(SystemHandler.class);

public SystemHandler() {
super("cluster");
super("system");
}

@Override
Expand Down Expand Up @@ -164,15 +164,15 @@ public synchronized void process(String rawSql, List<AlterClause> alterClauses,
} else if (alterClause instanceof AddObserverClause) {
AddObserverClause clause = (AddObserverClause) alterClause;
Env.getCurrentEnv().addFrontend(FrontendNodeType.OBSERVER, clause.getHost(),
clause.getPort(), "");
clause.getPort());
} else if (alterClause instanceof DropObserverClause) {
DropObserverClause clause = (DropObserverClause) alterClause;
Env.getCurrentEnv().dropFrontend(FrontendNodeType.OBSERVER, clause.getHost(),
clause.getPort());
} else if (alterClause instanceof AddFollowerClause) {
AddFollowerClause clause = (AddFollowerClause) alterClause;
Env.getCurrentEnv().addFrontend(FrontendNodeType.FOLLOWER, clause.getHost(),
clause.getPort(), "");
clause.getPort());
} else if (alterClause instanceof DropFollowerClause) {
DropFollowerClause clause = (DropFollowerClause) alterClause;
Env.getCurrentEnv().dropFrontend(FrontendNodeType.FOLLOWER, clause.getHost(),
Expand Down
32 changes: 18 additions & 14 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ public class Env {
// node name is used for bdbje NodeName.
protected String nodeName;
protected FrontendNodeType role;
private FrontendNodeType feType;
protected FrontendNodeType feType;
// replica and observer use this value to decide provide read service or not
private long synchronizedTimeMs;
private MasterInfo masterInfo;
Expand Down Expand Up @@ -2977,11 +2977,19 @@ protected void runAfterCatalogReady() {
};
}

public void addFrontend(FrontendNodeType role, String host, int editLogPort) throws DdlException {
addFrontend(role, host, editLogPort, "");
}

public void addFrontend(FrontendNodeType role, String host, int editLogPort, String nodeName) throws DdlException {
if (!tryLock(false)) {
throw new DdlException("Failed to acquire env lock. Try again");
}
try {
if (Strings.isNullOrEmpty(nodeName)) {
nodeName = genFeNodeName(host, editLogPort, true /* new name style */);
}

Frontend fe = checkFeExist(host, editLogPort);
if (fe != null) {
throw new DdlException("frontend already exists " + fe);
Expand All @@ -2990,10 +2998,6 @@ public void addFrontend(FrontendNodeType role, String host, int editLogPort, Str
throw new DdlException("frontend's hostName should not be empty while enable_fqdn_mode is true");
}

if (Strings.isNullOrEmpty(nodeName)) {
nodeName = genFeNodeName(host, editLogPort, false /* new name style */);
}

if (removedFrontends.contains(nodeName)) {
throw new DdlException("frontend name already exists " + nodeName + ". Try again");
}
Expand Down Expand Up @@ -3026,7 +3030,7 @@ public void modifyFrontendHostName(String srcHost, int srcPort, String destHost)
modifyFrontendHost(fe.getNodeName(), destHost);
}

public void modifyFrontendHost(String nodeName, String destHost) throws DdlException {
private void modifyFrontendHost(String nodeName, String destHost) throws DdlException {
if (!tryLock(false)) {
throw new DdlException("Failed to acquire env lock. Try again");
}
Expand Down Expand Up @@ -4303,8 +4307,8 @@ public CooldownConfHandler getCooldownConfHandler() {
return cooldownConfHandler;
}

public SystemHandler getClusterHandler() {
return (SystemHandler) this.alter.getClusterHandler();
public SystemHandler getSystemHandler() {
return (SystemHandler) this.alter.getSystemHandler();
}

public BackupHandler getBackupHandler() {
Expand Down Expand Up @@ -5425,15 +5429,15 @@ public void replayModifyTableDefaultDistributionBucketNum(ModifyTableDefaultDist
}

/*
* used for handling AlterClusterStmt
* (for client is the ALTER CLUSTER command).
* used for handling AlterSystemStmt
* (for client is the ALTER SYSTEM command).
*/
public void alterCluster(AlterSystemStmt stmt) throws DdlException, UserException {
this.alter.processAlterCluster(stmt);
public void alterSystem(AlterSystemStmt stmt) throws DdlException, UserException {
this.alter.processAlterSystem(stmt);
}

public void cancelAlterCluster(CancelAlterSystemStmt stmt) throws DdlException {
this.alter.getClusterHandler().cancel(stmt);
public void cancelAlterSystem(CancelAlterSystemStmt stmt) throws DdlException {
this.alter.getSystemHandler().cancel(stmt);
}

// Switch catalog of this session
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.datasource.InternalCatalog;
Expand Down Expand Up @@ -186,6 +187,22 @@ public static void createDb() {
}
}

private static void trySetStorageVault(Map<String, String> properties) throws UserException {
CloudEnv cloudEnv = (CloudEnv) Env.getCurrentEnv();
if (Config.isCloudMode() && cloudEnv.getEnableStorageVault()) {
String storageVaultName;
Pair<String, String> info = cloudEnv.getStorageVaultMgr().getDefaultStorageVaultInfo();
if (info != null) {
storageVaultName = info.first;
} else {
throw new UserException("No default storage vault."
+ " You can use `SHOW STORAGE VAULT` to get all available vaults,"
+ " and pick one set default vault with `SET <vault_name> AS DEFAULT STORAGE VAULT`");
}
properties.put(PropertyAnalyzer.PROPERTIES_STORAGE_VAULT_NAME, storageVaultName);
}
}

private static CreateTableStmt buildStatisticsTblStmt(String statsTableName, List<String> uniqueKeys)
throws UserException {
TableName tableName = new TableName("", FeConstants.INTERNAL_DB_NAME, statsTableName);
Expand All @@ -200,9 +217,7 @@ private static CreateTableStmt buildStatisticsTblStmt(String statsTableName, Lis
}
};

if (Config.isCloudMode() && ((CloudEnv) Env.getCurrentEnv()).getEnableStorageVault()) {
properties.put(PropertyAnalyzer.PROPERTIES_STORAGE_VAULT_NAME, FeConstants.BUILT_IN_STORAGE_VAULT_NAME);
}
trySetStorageVault(properties);

PropertyAnalyzer.getInstance().rewriteForceProperties(properties);
CreateTableStmt createTableStmt = new CreateTableStmt(true, false,
Expand Down Expand Up @@ -238,9 +253,7 @@ private static CreateTableStmt buildAuditTblStmt() throws UserException {
}
};

if (Config.isCloudMode() && ((CloudEnv) Env.getCurrentEnv()).getEnableStorageVault()) {
properties.put(PropertyAnalyzer.PROPERTIES_STORAGE_VAULT_NAME, FeConstants.BUILT_IN_STORAGE_VAULT_NAME);
}
trySetStorageVault(properties);

PropertyAnalyzer.getInstance().rewriteForceProperties(properties);
CreateTableStmt createTableStmt = new CreateTableStmt(true, false,
Expand Down
Loading

0 comments on commit f43e7e7

Please sign in to comment.