From f43e7e79ed0e42fac56a3385f307d66b8e982f50 Mon Sep 17 00:00:00 2001 From: Yongqiang YANG Date: Fri, 30 Aug 2024 12:28:41 +0800 Subject: [PATCH] [improvement](cloud) manage node via sql like non cloud mode --- be/src/agent/heartbeat_server.cpp | 13 ++ be/src/cloud/config.cpp | 5 +- be/src/cloud/config.h | 16 +- be/src/common/config.cpp | 4 +- .../meta-service/meta_service_resource.cpp | 5 +- .../src/resource-manager/resource_manager.cpp | 4 +- .../java/org/apache/doris/common/Config.java | 12 +- .../java/org/apache/doris/alter/Alter.java | 14 +- .../org/apache/doris/alter/SystemHandler.java | 6 +- .../java/org/apache/doris/catalog/Env.java | 32 ++-- .../catalog/InternalSchemaInitializer.java | 25 ++- .../apache/doris/cloud/catalog/CloudEnv.java | 56 +++++- .../doris/cloud/rpc/MetaServiceClient.java | 5 + .../doris/cloud/rpc/MetaServiceProxy.java | 10 + .../cloud/system/CloudSystemInfoService.java | 172 ++++++++++++++++++ .../doris/common/util/PropertyAnalyzer.java | 2 +- .../apache/doris/deploy/DeployManager.java | 4 +- .../doris/httpv2/rest/manager/NodeAction.java | 2 +- .../java/org/apache/doris/qe/DdlExecutor.java | 7 +- .../java/org/apache/doris/resource/Tag.java | 3 + .../doris/statistics/util/StatisticsUtil.java | 3 + .../org/apache/doris/system/HeartbeatMgr.java | 2 + .../apache/doris/clone/DecommissionTest.java | 2 +- .../cluster/DecommissionBackendTest.java | 8 +- gensrc/thrift/HeartbeatService.thrift | 2 + 25 files changed, 353 insertions(+), 61 deletions(-) diff --git a/be/src/agent/heartbeat_server.cpp b/be/src/agent/heartbeat_server.cpp index 02c9bda41b66696..1203127941394bf 100644 --- a/be/src/agent/heartbeat_server.cpp +++ b/be/src/agent/heartbeat_server.cpp @@ -26,6 +26,7 @@ #include #include +#include "cloud/config.h" #include "common/config.h" #include "common/status.h" #include "olap/storage_engine.h" @@ -244,6 +245,18 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) { _engine.notify_listeners(); } + if 
(master_info.__isset.meta_service_endpoint && config::meta_service_endpoint.empty()) { + auto st = config::set_config("meta_service_endpoint", master_info.meta_service_endpoint, + true); + LOG(INFO) << "set config meta_service_endpoing " << st; + } + + if (master_info.__isset.cloud_instance_id && config::cloud_instance_id.empty()) { + auto st = config::set_config("cloud_instance_id", master_info.cloud_instance_id, true); + LOG(INFO) << "set config cloud_instance_id " << st; + config::set_cloud_unique_id(); + } + return Status::OK(); } diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp index 1b8256bf932135c..bd74d3a4f3d6032 100644 --- a/be/src/cloud/config.cpp +++ b/be/src/cloud/config.cpp @@ -19,8 +19,9 @@ namespace doris::config { -DEFINE_String(cloud_unique_id, ""); -DEFINE_String(meta_service_endpoint, ""); +DEFINE_mString(cloud_instance_id, ""); +DEFINE_mString(cloud_unique_id, ""); +DEFINE_mString(meta_service_endpoint, ""); DEFINE_Bool(meta_service_use_load_balancer, "false"); DEFINE_mInt32(meta_service_rpc_timeout_ms, "10000"); DEFINE_Bool(meta_service_connection_pooled, "true"); diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h index 104ead04996dd28..1855fc1bffab273 100644 --- a/be/src/cloud/config.h +++ b/be/src/cloud/config.h @@ -21,10 +21,20 @@ namespace doris::config { -DECLARE_String(cloud_unique_id); +DECLARE_mString(cloud_instance_id); + +DECLARE_mString(cloud_unique_id); static inline bool is_cloud_mode() { - return !cloud_unique_id.empty(); + return !cloud_unique_id.empty() || !cloud_instance_id.empty(); +} + +static inline void set_cloud_unique_id() { + if (cloud_unique_id.empty()) { + if (!cloud_instance_id.empty()) { + cloud_unique_id = "1:" + cloud_instance_id + ":compute"; + } + } } // Set the endpoint of meta service. 
@@ -40,7 +50,7 @@ static inline bool is_cloud_mode() { // If you want to access a group of meta services directly, set the addresses of meta services to this config, // separated by a comma, like "host:port,host:port,host:port", then BE will choose a server to connect in randomly. // In this mode, The config meta_service_connection_pooled is still useful, but the other two configs will be ignored. -DECLARE_String(meta_service_endpoint); +DECLARE_mString(meta_service_endpoint); // Set the underlying connection type to pooled. DECLARE_Bool(meta_service_connection_pooled); DECLARE_mInt64(meta_service_connection_pool_size); diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 6c38a95bdc91c6b..d39b583686abe3d 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -37,10 +37,10 @@ #include #include +#include "cloud/config.h" #include "common/config.h" #include "common/logging.h" #include "common/status.h" -#include "config.h" #include "io/fs/file_writer.h" #include "io/fs/local_file_system.h" #include "util/cpu_info.h" @@ -1658,6 +1658,8 @@ bool init(const char* conf_file, bool fill_conf_map, bool must_exist, bool set_t SET_FIELD(it.second, std::vector, fill_conf_map, set_to_default); } + set_cloud_unique_id(); + return true; } diff --git a/cloud/src/meta-service/meta_service_resource.cpp b/cloud/src/meta-service/meta_service_resource.cpp index 90a88f86006643f..283bbe7392cd48e 100644 --- a/cloud/src/meta-service/meta_service_resource.cpp +++ b/cloud/src/meta-service/meta_service_resource.cpp @@ -2336,9 +2336,8 @@ void MetaServiceImpl::get_cluster(google::protobuf::RpcController* controller, std::find_if(instance.storage_vault_names().begin(), instance.storage_vault_names().end(), [](const std::string& name) { return name == BUILT_IN_STORAGE_VAULT_NAME; }) == instance.storage_vault_names().end()) { - code = MetaServiceCode::STORAGE_VAULT_NOT_FOUND; - msg = "instance has no built in storage vault"; - return; + LOG_EVERY_N(INFO, 100) << 
"There is no builtin vault in instance " + << instance.instance_id(); } auto get_cluster_mysql_user = [](const ClusterPB& c, std::set* mysql_users) { diff --git a/cloud/src/resource-manager/resource_manager.cpp b/cloud/src/resource-manager/resource_manager.cpp index 25848294aa78733..282be20baab3959 100644 --- a/cloud/src/resource-manager/resource_manager.cpp +++ b/cloud/src/resource-manager/resource_manager.cpp @@ -168,7 +168,9 @@ bool ResourceManager::check_cluster_params_valid(const ClusterPB& cluster, std:: if (ClusterPB::COMPUTE == cluster.type() && n.has_heartbeat_port() && n.heartbeat_port()) { continue; } - ss << "check cluster params failed, node : " << proto_to_json(n); + ss << "check cluster params failed, edit_log_port is required for frontends while " + "heatbeat_port is required for banckens, node : " + << proto_to_json(n); *err = ss.str(); no_err = false; break; diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 81e2c602c6e1726..c1152c81aa7859f 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1831,7 +1831,7 @@ public class Config extends ConfigBase { public static boolean enable_date_conversion = true; @ConfField(mutable = false, masterOnly = true) - public static boolean enable_multi_tags = false; + public static boolean enable_multi_tags = true; /** * If set to TRUE, FE will convert DecimalV2 to DecimalV3 automatically. @@ -2799,15 +2799,21 @@ public class Config extends ConfigBase { @ConfField public static int warn_sys_accumulated_file_size = 2; @ConfField public static int audit_sys_accumulated_file_size = 4; + // Compatibility with older versions. + // cloud_unique_id is introduced before cloud_instance_id, so it has higher priority.
@ConfField public static String cloud_unique_id = ""; + // If cloud_unique_id is empty, cloud_instance_id works, otherwise cloud_unique_id works. + @ConfField + public static String cloud_instance_id = ""; + public static boolean isCloudMode() { - return !cloud_unique_id.isEmpty(); + return !cloud_unique_id.isEmpty() || !cloud_instance_id.isEmpty(); } public static boolean isNotCloudMode() { - return cloud_unique_id.isEmpty(); + return !isCloudMode(); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java index 95ad5ae824bebcc..51ed811cc063368 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java @@ -99,18 +99,18 @@ public class Alter { private AlterHandler schemaChangeHandler; private AlterHandler materializedViewHandler; - private SystemHandler clusterHandler; + private SystemHandler systemHandler; public Alter() { schemaChangeHandler = Config.isCloudMode() ? 
new CloudSchemaChangeHandler() : new SchemaChangeHandler(); materializedViewHandler = new MaterializedViewHandler(); - clusterHandler = new SystemHandler(); + systemHandler = new SystemHandler(); } public void start() { schemaChangeHandler.start(); materializedViewHandler.start(); - clusterHandler.start(); + systemHandler.start(); } public void processCreateMaterializedView(CreateMaterializedViewStmt stmt) @@ -710,8 +710,8 @@ public void replayModifyViewDef(AlterViewInfo alterViewInfo) throws MetaNotFound } } - public void processAlterCluster(AlterSystemStmt stmt) throws UserException { - clusterHandler.process(Collections.singletonList(stmt.getAlterClause()), null, null); + public void processAlterSystem(AlterSystemStmt stmt) throws UserException { + systemHandler.process(Collections.singletonList(stmt.getAlterClause()), null, null); } private void processRename(Database db, OlapTable table, List alterClauses) throws DdlException { @@ -958,8 +958,8 @@ public AlterHandler getMaterializedViewHandler() { return materializedViewHandler; } - public AlterHandler getClusterHandler() { - return clusterHandler; + public AlterHandler getSystemHandler() { + return systemHandler; } public void processAlterMTMV(AlterMTMV alterMTMV, boolean isReplay) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java index 28b3684ed04602b..6e221bf5ce7cb06 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java @@ -72,7 +72,7 @@ public class SystemHandler extends AlterHandler { private static final Logger LOG = LogManager.getLogger(SystemHandler.class); public SystemHandler() { - super("cluster"); + super("system"); } @Override @@ -164,7 +164,7 @@ public synchronized void process(String rawSql, List alterClauses, } else if (alterClause instanceof AddObserverClause) { AddObserverClause clause = 
(AddObserverClause) alterClause; Env.getCurrentEnv().addFrontend(FrontendNodeType.OBSERVER, clause.getHost(), - clause.getPort(), ""); + clause.getPort()); } else if (alterClause instanceof DropObserverClause) { DropObserverClause clause = (DropObserverClause) alterClause; Env.getCurrentEnv().dropFrontend(FrontendNodeType.OBSERVER, clause.getHost(), @@ -172,7 +172,7 @@ public synchronized void process(String rawSql, List alterClauses, } else if (alterClause instanceof AddFollowerClause) { AddFollowerClause clause = (AddFollowerClause) alterClause; Env.getCurrentEnv().addFrontend(FrontendNodeType.FOLLOWER, clause.getHost(), - clause.getPort(), ""); + clause.getPort()); } else if (alterClause instanceof DropFollowerClause) { DropFollowerClause clause = (DropFollowerClause) alterClause; Env.getCurrentEnv().dropFrontend(FrontendNodeType.FOLLOWER, clause.getHost(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 839072556413849..b3a565c50951c0b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -423,7 +423,7 @@ public class Env { // node name is used for bdbje NodeName. protected String nodeName; protected FrontendNodeType role; - private FrontendNodeType feType; + protected FrontendNodeType feType; // replica and observer use this value to decide provide read service or not private long synchronizedTimeMs; private MasterInfo masterInfo; @@ -2977,11 +2977,19 @@ protected void runAfterCatalogReady() { }; } + public void addFrontend(FrontendNodeType role, String host, int editLogPort) throws DdlException { + addFrontend(role, host, editLogPort, ""); + } + public void addFrontend(FrontendNodeType role, String host, int editLogPort, String nodeName) throws DdlException { if (!tryLock(false)) { throw new DdlException("Failed to acquire env lock. 
Try again"); } try { + if (Strings.isNullOrEmpty(nodeName)) { + nodeName = genFeNodeName(host, editLogPort, true /* new name style */); + } + Frontend fe = checkFeExist(host, editLogPort); if (fe != null) { throw new DdlException("frontend already exists " + fe); @@ -2990,10 +2998,6 @@ public void addFrontend(FrontendNodeType role, String host, int editLogPort, Str throw new DdlException("frontend's hostName should not be empty while enable_fqdn_mode is true"); } - if (Strings.isNullOrEmpty(nodeName)) { - nodeName = genFeNodeName(host, editLogPort, false /* new name style */); - } - if (removedFrontends.contains(nodeName)) { throw new DdlException("frontend name already exists " + nodeName + ". Try again"); } @@ -3026,7 +3030,7 @@ public void modifyFrontendHostName(String srcHost, int srcPort, String destHost) modifyFrontendHost(fe.getNodeName(), destHost); } - public void modifyFrontendHost(String nodeName, String destHost) throws DdlException { + private void modifyFrontendHost(String nodeName, String destHost) throws DdlException { if (!tryLock(false)) { throw new DdlException("Failed to acquire env lock. Try again"); } @@ -4303,8 +4307,8 @@ public CooldownConfHandler getCooldownConfHandler() { return cooldownConfHandler; } - public SystemHandler getClusterHandler() { - return (SystemHandler) this.alter.getClusterHandler(); + public SystemHandler getSystemHandler() { + return (SystemHandler) this.alter.getSystemHandler(); } public BackupHandler getBackupHandler() { @@ -5425,15 +5429,15 @@ public void replayModifyTableDefaultDistributionBucketNum(ModifyTableDefaultDist } /* - * used for handling AlterClusterStmt - * (for client is the ALTER CLUSTER command). + * used for handling AlterSystemStmt + * (for client is the ALTER SYSTEM command). 
*/ - public void alterCluster(AlterSystemStmt stmt) throws DdlException, UserException { - this.alter.processAlterCluster(stmt); + public void alterSystem(AlterSystemStmt stmt) throws DdlException, UserException { + this.alter.processAlterSystem(stmt); } - public void cancelAlterCluster(CancelAlterSystemStmt stmt) throws DdlException { - this.alter.getClusterHandler().cancel(stmt); + public void cancelAlterSystem(CancelAlterSystemStmt stmt) throws DdlException { + this.alter.getSystemHandler().cancel(stmt); } // Switch catalog of this session diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java index 87e8a0fc3b0ce85..f07ea1bf5a12ea6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java @@ -34,6 +34,7 @@ import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.datasource.InternalCatalog; @@ -186,6 +187,22 @@ public static void createDb() { } } + private static void trySetStorageVault(Map properties) throws UserException { + CloudEnv cloudEnv = (CloudEnv) Env.getCurrentEnv(); + if (Config.isCloudMode() && cloudEnv.getEnableStorageVault()) { + String storageVaultName; + Pair info = cloudEnv.getStorageVaultMgr().getDefaultStorageVaultInfo(); + if (info != null) { + storageVaultName = info.first; + } else { + throw new UserException("No default storage vault." 
+ + " You can use `SHOW STORAGE VAULT` to get all available vaults," + + " and pick one set default vault with `SET AS DEFAULT STORAGE VAULT`"); + } + properties.put(PropertyAnalyzer.PROPERTIES_STORAGE_VAULT_NAME, storageVaultName); + } + } + private static CreateTableStmt buildStatisticsTblStmt(String statsTableName, List uniqueKeys) throws UserException { TableName tableName = new TableName("", FeConstants.INTERNAL_DB_NAME, statsTableName); @@ -200,9 +217,7 @@ private static CreateTableStmt buildStatisticsTblStmt(String statsTableName, Lis } }; - if (Config.isCloudMode() && ((CloudEnv) Env.getCurrentEnv()).getEnableStorageVault()) { - properties.put(PropertyAnalyzer.PROPERTIES_STORAGE_VAULT_NAME, FeConstants.BUILT_IN_STORAGE_VAULT_NAME); - } + trySetStorageVault(properties); PropertyAnalyzer.getInstance().rewriteForceProperties(properties); CreateTableStmt createTableStmt = new CreateTableStmt(true, false, @@ -238,9 +253,7 @@ private static CreateTableStmt buildAuditTblStmt() throws UserException { } }; - if (Config.isCloudMode() && ((CloudEnv) Env.getCurrentEnv()).getEnableStorageVault()) { - properties.put(PropertyAnalyzer.PROPERTIES_STORAGE_VAULT_NAME, FeConstants.BUILT_IN_STORAGE_VAULT_NAME); - } + trySetStorageVault(properties); PropertyAnalyzer.getInstance().rewriteForceProperties(properties); CreateTableStmt createTableStmt = new CreateTableStmt(true, false, diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java index 535b8ea582f8452..565bf0b928a3952 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java @@ -38,6 +38,7 @@ import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; import org.apache.doris.common.io.CountingDataOutputStream; +import org.apache.doris.ha.FrontendNodeType; import 
org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; import org.apache.doris.system.SystemInfoService.HostInfo; @@ -88,6 +89,18 @@ public CloudUpgradeMgr getCloudUpgradeMgr() { return this.upgradeMgr; } + @Override + public void initialize(String[] args) throws Exception { + if (Config.cloud_unique_id == null || Config.cloud_unique_id.isEmpty()) { + LOG.info("cloud_unique_id is not set, setting it using instance_id"); + Config.cloud_unique_id = "1:" + Config.cloud_instance_id + ":sql_server00"; + } + + LOG.info("Initializing CloudEnv with cloud_unique_id: {}", Config.cloud_unique_id); + + super.initialize(args); + } + @Override protected void startMasterOnlyDaemonThreads() { LOG.info("start cloud Master only daemon threads"); @@ -116,9 +129,13 @@ public CacheHotspotManager getCacheHotspotMgr() { return cacheHotspotMgr; } + private CloudSystemInfoService getCloudSystemInfoService() { + return (CloudSystemInfoService) systemInfo; + } + private Cloud.NodeInfoPB getLocalTypeFromMetaService() { // get helperNodes from ms - Cloud.GetClusterResponse response = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) + Cloud.GetClusterResponse response = getCloudSystemInfoService() .getCloudCluster(Config.cloud_sql_server_cluster_name, Config.cloud_sql_server_cluster_id, ""); if (!response.hasStatus() || !response.getStatus().hasCode() || response.getStatus().getCode() != Cloud.MetaServiceCode.OK) { @@ -167,9 +184,18 @@ protected void getClusterIdAndRole() throws IOException { if (nodeInfoPB == null) { LOG.warn("failed to get local fe's type, sleep 5 s, try again."); try { - Thread.sleep(5000); + try { + getCloudSystemInfoService().tryCreateInstance(Config.cloud_instance_id, + Config.cloud_instance_id, false); + } catch (Exception e) { + Thread.sleep(5000); + throw e; + } + addFrontend(FrontendNodeType.MASTER, selfNode.getHost(), selfNode.getPort()); } catch (InterruptedException e) { LOG.warn("thread sleep Exception", e); + } catch 
(DdlException e) { + LOG.warn("get ddl exception ", e); } continue; } @@ -207,7 +233,7 @@ public void checkCloudClusterPriv(String clusterName) throws DdlException { + "' for cloud cluster '" + clusterName + "'", ErrorCode.ERR_CLUSTER_NO_PERMISSIONS); } - if (!((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudClusterNames().contains(clusterName)) { + if (!getCloudSystemInfoService().getCloudClusterNames().contains(clusterName)) { if (LOG.isDebugEnabled()) { LOG.debug("current instance does not have a cluster name :{}", clusterName); } @@ -218,9 +244,9 @@ public void checkCloudClusterPriv(String clusterName) throws DdlException { public void changeCloudCluster(String clusterName, ConnectContext ctx) throws DdlException { checkCloudClusterPriv(clusterName); - ((CloudSystemInfoService) Env.getCurrentSystemInfo()).waitForAutoStart(clusterName); + getCloudSystemInfoService().waitForAutoStart(clusterName); try { - ((CloudSystemInfoService) Env.getCurrentSystemInfo()).addCloudCluster(clusterName, ""); + getCloudSystemInfoService().addCloudCluster(clusterName, ""); } catch (UserException e) { throw new DdlException(e.getMessage(), e.getMysqlErrorCode()); } @@ -330,4 +356,24 @@ public long saveCloudWarmUpJob(CountingDataOutputStream dos, long checksum) thro public void cancelCloudWarmUp(CancelCloudWarmUpStmt stmt) throws DdlException { getCacheHotspotMgr().cancel(stmt); } + + @Override + public void addFrontend(FrontendNodeType role, String host, int editLogPort) throws DdlException { + getCloudSystemInfoService().addFrontend(role, host, editLogPort); + } + + @Override + public void dropFrontend(FrontendNodeType role, String host, int port) throws DdlException { + if (port == selfNode.getPort() && feType == FrontendNodeType.MASTER + && selfNode.getHost().equals(host)) { + throw new DdlException("can not drop current master node."); + } + + getCloudSystemInfoService().dropFrontend(role, host, port); + } + + @Override + public void modifyFrontendHostName(String 
srcHost, int srcPort, String destHost) throws DdlException { + throw new DdlException("modify frontend host name is not supported in cloud mode"); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java index c9c799687abb612..47108b147f5bc5a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java @@ -403,4 +403,9 @@ public Cloud. ResetRLProgressResponse resetRLProgress(Cloud. ResetRLProgressRequ } return blockingStub.finishTabletJob(request); } + + public Cloud.CreateInstanceResponse + createInstance(Cloud.CreateInstanceRequest request) { + return blockingStub.createInstance(request); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java index 7d47ec70c1badaa..ec295902f88e05f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java @@ -106,6 +106,7 @@ private MetaServiceClient getProxy() { } String address = Config.meta_service_endpoint; + address = address.replaceAll("^[\"']|[\"']$", ""); MetaServiceClient service = serviceMap.get(address); if (service != null && service.isNormalState()) { return service; @@ -547,4 +548,13 @@ public Cloud.ResetRLProgressResponse resetRLProgress(Cloud.ResetRLProgressReques throw new RpcException("", e.getMessage(), e); } } + + public Cloud.CreateInstanceResponse createInstance(Cloud.CreateInstanceRequest request) throws RpcException { + try { + final MetaServiceClient client = getProxy(); + return client.createInstance(request); + } catch (Exception e) { + throw new RpcException("", e.getMessage(), e); + } + } } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java index 48728efb003aba6..3254fdead693ff9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java @@ -29,6 +29,7 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; +import org.apache.doris.common.RandomIdentifierGenerator; import org.apache.doris.common.UserException; import org.apache.doris.ha.FrontendNodeType; import org.apache.doris.metric.MetricRepo; @@ -299,6 +300,75 @@ public synchronized void updateFrontends(List toAdd, List to } } + private void alterBackendCluster(List hostInfos, String clusterName, + Cloud.AlterClusterRequest.Operation operation) throws DdlException { + // Issue rpc to meta to alter node, then fe master would apply this change to its backends + Cloud.ClusterPB clusterPB = Cloud.ClusterPB.newBuilder() + .setClusterName(clusterName) + .setType(Cloud.ClusterPB.Type.COMPUTE) + .build(); + + for (HostInfo hostInfo : hostInfos) { + Cloud.NodeInfoPB nodeInfoPB = Cloud.NodeInfoPB.newBuilder() + .setIp(hostInfo.getHost()) + .setHost(hostInfo.getHost()) + .setHeartbeatPort(hostInfo.getPort()) + .setCtime(System.currentTimeMillis() / 1000) + .build(); + clusterPB = clusterPB.toBuilder().addNodes(nodeInfoPB).build(); + } + + Cloud.AlterClusterRequest request = Cloud.AlterClusterRequest.newBuilder() + .setInstanceId(Config.cloud_instance_id) + .setOp(operation) + .setCluster(clusterPB) + .build(); + + Cloud.AlterClusterResponse response; + try { + response = MetaServiceProxy.getInstance().alterCluster(request); + if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) { + LOG.warn("alter backends not ok, response: {}", response); + throw new DdlException("failed to alter backends 
errorCode: " + response.getStatus().getCode() + + " msg: " + response.getStatus().getMsg()); + } + } catch (RpcException e) { + throw new DdlException("failed to alter backends", e); + } + } + + /** + * @param hostInfos : backend's ip, hostName and port + * @throws DdlException + */ + @Override + public void addBackends(List hostInfos, Map tagMap) throws UserException { + // issue rpc to meta to add this node, then fe master would add this node to its backends + + String clusterName = tagMap.getOrDefault(Tag.CLOUD_CLUSTER_NAME, Tag.VALUE_DEFAULT_CLOUD_CLUSTER_NAME); + if (clusterName.isEmpty()) { + throw new UserException("clusterName empty"); + } + + tryCreateCluster(clusterName, RandomIdentifierGenerator.generateRandomIdentifier(8)); + alterBackendCluster(hostInfos, clusterName, Cloud.AlterClusterRequest.Operation.ADD_NODE); + } + + @Override + public void dropBackends(List hostInfos) throws DdlException { + alterBackendCluster(hostInfos, "", Cloud.AlterClusterRequest.Operation.DROP_NODE); + } + + @Override + public void dropBackendsByIds(List ids) throws DdlException { + for (String id : ids) { + if (getBackend(Long.parseLong(id)) == null) { + throw new DdlException("backend does not exists[" + id + "]"); + } + dropBackend(Long.parseLong(id)); + } + } + @Override public void replayAddBackend(Backend newBackend) { super.replayAddBackend(newBackend); @@ -654,6 +724,87 @@ public Map getCloudClusterNameToId() { } } + // FrontendCluster = SqlServerCluster + private void alterFrontendCluster(FrontendNodeType role, String host, int editLogPort, + Cloud.AlterClusterRequest.Operation op) throws DdlException { + // Issue rpc to meta to add this node, then fe master would add this node to its frontends + Cloud.NodeInfoPB nodeInfoPB = Cloud.NodeInfoPB.newBuilder() + .setIp(host) + .setHost(host) + .setEditLogPort(editLogPort) + .setNodeType(role == FrontendNodeType.MASTER ? 
Cloud.NodeInfoPB.NodeType.FE_MASTER + : Cloud.NodeInfoPB.NodeType.FE_OBSERVER) + .setCtime(System.currentTimeMillis() / 1000) + .build(); + + Cloud.ClusterPB clusterPB = Cloud.ClusterPB.newBuilder() + .setClusterId(Config.cloud_sql_server_cluster_id) + .setClusterName(Config.cloud_sql_server_cluster_name) + .setType(Cloud.ClusterPB.Type.SQL) + .addNodes(nodeInfoPB) + .build(); + + Cloud.AlterClusterRequest request = Cloud.AlterClusterRequest.newBuilder() + .setInstanceId(Config.cloud_instance_id) + .setOp(op) + .setCluster(clusterPB) + .build(); + + Cloud.AlterClusterResponse response; + try { + response = MetaServiceProxy.getInstance().alterCluster(request); + if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) { + LOG.warn("alter frontend not ok, response: {}", response); + throw new DdlException("failed to alter frontend errorCode: " + response.getStatus().getCode() + + " msg: " + response.getStatus().getMsg()); + } + } catch (RpcException e) { + throw new DdlException("failed to alter frontend", e); + } + } + + public void addFrontend(FrontendNodeType role, String host, int editLogPort) throws DdlException { + if (role != FrontendNodeType.MASTER && role != FrontendNodeType.OBSERVER) { + throw new DdlException("unsupported frontend role: " + role); + } + + Cloud.AlterClusterRequest.Operation op; + op = role == FrontendNodeType.MASTER ? 
Cloud.AlterClusterRequest.Operation.ADD_CLUSTER + : Cloud.AlterClusterRequest.Operation.ADD_NODE; + alterFrontendCluster(role, host, editLogPort, op); + } + + public void dropFrontend(FrontendNodeType role, String host, int editLogPort) throws DdlException { + alterFrontendCluster(role, host, editLogPort, Cloud.AlterClusterRequest.Operation.DROP_NODE); + } + + private void tryCreateCluster(String clusterName, String clusterId) throws UserException { + Cloud.ClusterPB clusterPB = Cloud.ClusterPB.newBuilder() + .setClusterId(clusterId) + .setClusterName(clusterName) + .setType(Cloud.ClusterPB.Type.COMPUTE) + .build(); + + Cloud.AlterClusterRequest request = Cloud.AlterClusterRequest.newBuilder() + .setCloudUniqueId(Config.cloud_unique_id) + .setOp(Cloud.AlterClusterRequest.Operation.ADD_CLUSTER) + .setCluster(clusterPB) + .build(); + + Cloud.AlterClusterResponse response; + try { + response = MetaServiceProxy.getInstance().alterCluster(request); + if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK + && response.getStatus().getCode() != Cloud.MetaServiceCode.ALREADY_EXISTED) { + LOG.warn("create cluster not ok, response: {}", response); + throw new UserException("failed to create cluster errorCode: " + response.getStatus().getCode() + + " msg: " + response.getStatus().getMsg()); + } + } catch (RpcException e) { + throw new UserException("failed to create cluster", e); + } + } + public List> getCurrentObFrontends() { List frontends = Env.getCurrentEnv().getFrontends(FrontendNodeType.OBSERVER); List> frontendsPair = new ArrayList<>(); @@ -816,4 +967,25 @@ public void waitForAutoStart(String clusterName) throws DdlException { LOG.info("auto start cluster {}, start cost {} ms", clusterName, stopWatch.getTime()); } } + + public void tryCreateInstance(String instanceId, String name, boolean sseEnabled) throws DdlException { + Cloud.CreateInstanceRequest.Builder builder = Cloud.CreateInstanceRequest.newBuilder(); + builder.setInstanceId(instanceId); + 
builder.setName(name); + builder.setSseEnabled(sseEnabled); + + Cloud.CreateInstanceResponse response; + try { + response = MetaServiceProxy.getInstance().createInstance(builder.build()); + if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK + && response.getStatus().getCode() != Cloud.MetaServiceCode.ALREADY_EXISTED) { + LOG.warn("Failed to create instance {}, response: {}", instanceId, response); + throw new DdlException("Failed to create instance"); + } + LOG.info("Successfully created instance {}, response: {}", instanceId, response); + } catch (RpcException e) { + LOG.warn("Failed to create instance {}", instanceId, e); + throw new DdlException("Failed to create instance"); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java index 6000831172e421d..128814711a00e2a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java @@ -1235,7 +1235,7 @@ public static Map analyzeBackendTagsProperties(Map RESERVED_TAG_TYPE = ImmutableSet.of( diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java index 0f5c81b1cf06929..f01c1a13d1bfbf0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java @@ -480,10 +480,13 @@ public static boolean statsTblAvailable() { // dbName, // StatisticConstants.HISTOGRAM_TBL_NAME)); } catch (Throwable t) { + LOG.info("stat table does not exist, db_name: {}, table_name: {}", dbName, + StatisticConstants.TABLE_STATISTIC_TBL_NAME); return false; } if (Config.isCloudMode()) { if (!((CloudSystemInfoService) Env.getCurrentSystemInfo()).availableBackendsExists()) { + 
LOG.info("there are no available backends"); return false; } try (AutoCloseConnectContext r = buildConnectContext()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java index 515db76b096d089..d2d05e97995a580 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -95,6 +95,8 @@ public void setMaster(int clusterId, String token, long epoch) { tMasterInfo.setHttpPort(Config.http_port); long flags = heartbeatFlags.getHeartbeatFlags(); tMasterInfo.setHeartbeatFlags(flags); + tMasterInfo.setCloudInstanceId(Config.cloud_instance_id); + tMasterInfo.setMetaServiceEndpoint(Config.meta_service_endpoint); masterInfo.set(tMasterInfo); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java index 81e29d3abf385dd..6901d3b604c9b56 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java @@ -144,7 +144,7 @@ public void testDecommissionBackend() throws Exception { + ":" + backend.getHeartbeatPort() + "\""; AlterSystemStmt decommissionStmt = (AlterSystemStmt) UtFrameUtils.parseAndAnalyzeStmt(decommissionStmtStr, connectContext); - Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt); + Env.getCurrentEnv().getAlterInstance().processAlterSystem(decommissionStmt); Assert.assertEquals(true, backend.isDecommissioned()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java index fc80ecbd97dd9d9..53355f6faa4929a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java +++ 
b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java @@ -100,7 +100,7 @@ public void testDecommissionBackend() throws Exception { Assertions.assertNotNull(srcBackend); String decommissionStmtStr = "alter system decommission backend \"" + srcBackend.getAddress() + "\""; AlterSystemStmt decommissionStmt = (AlterSystemStmt) parseAndAnalyzeStmt(decommissionStmtStr); - Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt); + Env.getCurrentEnv().getAlterInstance().processAlterSystem(decommissionStmt); Assertions.assertTrue(srcBackend.isDecommissioned()); long startTimestamp = System.currentTimeMillis(); @@ -153,7 +153,7 @@ public void testDecommissionBackendById() throws Exception { // decommission backend by id String decommissionByIdStmtStr = "alter system decommission backend \"" + srcBackend.getId() + "\""; AlterSystemStmt decommissionByIdStmt = (AlterSystemStmt) parseAndAnalyzeStmt(decommissionByIdStmtStr); - Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionByIdStmt); + Env.getCurrentEnv().getAlterInstance().processAlterSystem(decommissionByIdStmt); Assertions.assertTrue(srcBackend.isDecommissioned()); long startTimestamp = System.currentTimeMillis(); @@ -215,7 +215,7 @@ public void testDecommissionBackendWithDropTable() throws Exception { // 6. 
execute decommission String decommissionStmtStr = "alter system decommission backend \"" + srcBackend.getAddress() + "\""; AlterSystemStmt decommissionStmt = (AlterSystemStmt) parseAndAnalyzeStmt(decommissionStmtStr); - Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt); + Env.getCurrentEnv().getAlterInstance().processAlterSystem(decommissionStmt); Assertions.assertTrue(srcBackend.isDecommissioned()); long startTimestamp = System.currentTimeMillis(); @@ -315,7 +315,7 @@ public void testDecommissionBackendWithMTMV() throws Exception { String decommissionStmtStr = "alter system decommission backend \"" + srcBackend.getAddress() + "\""; AlterSystemStmt decommissionStmt = (AlterSystemStmt) parseAndAnalyzeStmt(decommissionStmtStr); - Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt); + Env.getCurrentEnv().getAlterInstance().processAlterSystem(decommissionStmt); Assertions.assertTrue(srcBackend.isDecommissioned()); long startTimestamp = System.currentTimeMillis(); diff --git a/gensrc/thrift/HeartbeatService.thrift b/gensrc/thrift/HeartbeatService.thrift index 4daea779735f58a..7c94bc9ce0c8717 100644 --- a/gensrc/thrift/HeartbeatService.thrift +++ b/gensrc/thrift/HeartbeatService.thrift @@ -39,6 +39,8 @@ struct TMasterInfo { 7: optional i64 heartbeat_flags 8: optional i64 backend_id 9: optional list frontend_infos + 10: optional string meta_service_endpoint; + 11: optional string cloud_instance_id; } struct TBackendInfo {