diff --git a/be/src/agent/heartbeat_server.cpp b/be/src/agent/heartbeat_server.cpp index 02c9bda41b6669..e981e9165b5e1d 100644 --- a/be/src/agent/heartbeat_server.cpp +++ b/be/src/agent/heartbeat_server.cpp @@ -26,6 +26,7 @@ #include #include +#include "cloud/config.h" #include "common/config.h" #include "common/status.h" #include "olap/storage_engine.h" @@ -244,6 +245,37 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) { _engine.notify_listeners(); } + if (master_info.__isset.meta_service_endpoint != config::is_cloud_mode()) { + return Status::InvalidArgument( + "fe and be do not work in the same mode, fe cloud mode: {}," + " be cloud mode: {}", + master_info.__isset.meta_service_endpoint, config::is_cloud_mode()); + } + + if (master_info.__isset.meta_service_endpoint && config::meta_service_endpoint.empty() && + !master_info.meta_service_endpoint.empty()) { + auto st = config::set_config("meta_service_endpoint", master_info.meta_service_endpoint, + true); + LOG(INFO) << "set config meta_service_endpoint " << master_info.meta_service_endpoint << " " + << st; + } + + if (master_info.__isset.cloud_instance_id) { + if (!config::cloud_instance_id.empty() && + config::cloud_instance_id != master_info.cloud_instance_id) { + return Status::InvalidArgument( + "cloud_instance_id in fe.conf and be.conf are not the same, fe: {}, be: {}", + master_info.cloud_instance_id, config::cloud_instance_id); + } + + if (config::cloud_instance_id.empty() && !master_info.cloud_instance_id.empty()) { + auto st = config::set_config("cloud_instance_id", master_info.cloud_instance_id, true); + config::set_cloud_unique_id(master_info.cloud_instance_id); + LOG(INFO) << "set config cloud_instance_id " << master_info.cloud_instance_id << " " + << st; + } + } + return Status::OK(); } diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp index 816f1108299cb8..36217ddb61de7d 100644 --- a/be/src/cloud/cloud_meta_mgr.cpp +++ b/be/src/cloud/cloud_meta_mgr.cpp @@ -943,7 +943,7 @@ Status CloudMetaMgr::get_storage_vault_info(StorageVaultInfos* vault_infos, bool j->mutable_obj_info()->set_sk(j->obj_info().sk().substr(0, 2) + "xxx"); } - LOG(INFO) << "get storage vault, enable_storage_vault=" << is_vault_mode + LOG(INFO) << "get storage vault, enable_storage_vault=" << *is_vault_mode << " response=" << resp.ShortDebugString(); return Status::OK(); } diff --git a/be/src/cloud/cloud_storage_engine.cpp b/be/src/cloud/cloud_storage_engine.cpp index b98b2e3d0efc45..1cf6e2ac039828 100644 --- a/be/src/cloud/cloud_storage_engine.cpp +++ b/be/src/cloud/cloud_storage_engine.cpp @@ -160,29 +160,7 @@ struct RefreshFSVaultVisitor { }; Status CloudStorageEngine::open() { - cloud::StorageVaultInfos vault_infos; - bool enable_storage_vault = false; - do { - auto st = _meta_mgr->get_storage_vault_info(&vault_infos, &enable_storage_vault); - if (st.ok()) { - break; - } - - LOG(WARNING) << "failed to get vault info, retry after 5s, err=" << st; - std::this_thread::sleep_for(5s); - } while (vault_infos.empty()); - - for (auto& [id, vault_info, path_format] : vault_infos) { - if (auto st = std::visit(VaultCreateFSVisitor {id, path_format}, vault_info); !st.ok()) - [[unlikely]] { - return vault_process_error(id, vault_info, std::move(st)); - } - } - - // vault mode should not support latest_fs to get rid of unexpected storage backends choosen - if (!enable_storage_vault) { - set_latest_fs(get_filesystem(std::get<0>(vault_infos.back()))); - } + sync_storage_vault(); // TODO(plat1ko): DeleteBitmapTxnManager diff 
--git a/be/src/cloud/cloud_storage_engine.h b/be/src/cloud/cloud_storage_engine.h index 337b3194fd22b0..d3a55c3c377276 100644 --- a/be/src/cloud/cloud_storage_engine.h +++ b/be/src/cloud/cloud_storage_engine.h @@ -74,14 +74,27 @@ class CloudStorageEngine final : public BaseStorageEngine { } void _check_file_cache_ttl_block_valid(); - std::optional get_storage_resource(const std::string& vault_id) const { + std::optional get_storage_resource(const std::string& vault_id) { + LOG(INFO) << "Getting storage resource for vault_id: " << vault_id; if (vault_id.empty()) { + if (latest_fs() == nullptr) { + LOG(INFO) << "there is not latest fs"; + return std::nullopt; + } return StorageResource {latest_fs()}; } - if (auto storage_resource = doris::get_storage_resource(vault_id); storage_resource) { - return storage_resource->first; - } + bool synced = false; + do { + if (auto storage_resource = doris::get_storage_resource(vault_id); storage_resource) { + return storage_resource->first; + } + if (synced) { + break; + } + sync_storage_vault(); + synced = true; + } while (true); return std::nullopt; } diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp index 1b8256bf932135..0f59b51059b69e 100644 --- a/be/src/cloud/config.cpp +++ b/be/src/cloud/config.cpp @@ -17,10 +17,14 @@ #include "cloud/config.h" +#include "common/status.h" + namespace doris::config { -DEFINE_String(cloud_unique_id, ""); -DEFINE_String(meta_service_endpoint, ""); +DEFINE_String(deploy_mode, ""); +DEFINE_mString(cloud_instance_id, ""); +DEFINE_mString(cloud_unique_id, ""); +DEFINE_mString(meta_service_endpoint, ""); DEFINE_Bool(meta_service_use_load_balancer, "false"); DEFINE_mInt32(meta_service_rpc_timeout_ms, "10000"); DEFINE_Bool(meta_service_connection_pooled, "true"); @@ -64,4 +68,10 @@ DEFINE_mBool(enable_new_tablet_do_compaction, "false"); DEFINE_Bool(enable_cloud_txn_lazy_commit, "false"); +void set_cloud_unique_id(std::string instance_id) { + if (cloud_unique_id.empty() && !instance_id.empty()) { + static_cast(set_config("cloud_unique_id", "1:" + instance_id + ":compute", true)); + } +} + } // namespace doris::config diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h index 104ead04996dd2..57f6348df7067b 100644 --- a/be/src/cloud/config.h +++ b/be/src/cloud/config.h @@ -21,12 +21,17 @@ namespace doris::config { -DECLARE_String(cloud_unique_id); +DECLARE_String(deploy_mode); +// deprecated do not configure directly +DECLARE_mString(cloud_instance_id); +DECLARE_mString(cloud_unique_id); static inline bool is_cloud_mode() { - return !cloud_unique_id.empty(); + return deploy_mode == "cloud" || !cloud_unique_id.empty(); } +void set_cloud_unique_id(std::string instance_id); + // Set the endpoint of meta service. // // If meta services are deployed behind a load balancer, set this config to "host:port" of the load balancer. @@ -40,7 +45,7 @@ static inline bool is_cloud_mode() { // If you want to access a group of meta services directly, set the addresses of meta services to this config, // separated by a comma, like "host:port,host:port,host:port", then BE will choose a server to connect in randomly. // In this mode, The config meta_service_connection_pooled is still useful, but the other two configs will be ignored. -DECLARE_String(meta_service_endpoint); +DECLARE_mString(meta_service_endpoint); // Set the underlying connection type to pooled. 
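The cloud config changes above make cloud mode an explicit deployment property: is_cloud_mode() now keys off deploy_mode = "cloud" (a non-empty cloud_unique_id is still honored for compatibility), and set_cloud_unique_id() fills in a default unique id derived from the instance id when none is configured. A minimal Python sketch of that defaulting logic, for illustration only:

    # Mirrors the intent of the BE-side logic added in be/src/cloud/config.cpp above.
    def is_cloud_mode(deploy_mode: str, cloud_unique_id: str) -> bool:
        # deploy_mode == "cloud" is the new switch; a non-empty cloud_unique_id
        # keeps older configurations working.
        return deploy_mode == "cloud" or bool(cloud_unique_id)

    def default_cloud_unique_id(cloud_unique_id: str, instance_id: str) -> str:
        # set_cloud_unique_id(): derive a value only when cloud_unique_id is not
        # configured explicitly and an instance id is known.
        if not cloud_unique_id and instance_id:
            return "1:" + instance_id + ":compute"
        return cloud_unique_id

    assert is_cloud_mode("cloud", "")
    assert default_cloud_unique_id("", "reg_cloud_instance") == "1:reg_cloud_instance:compute"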
DECLARE_Bool(meta_service_connection_pooled); DECLARE_mInt64(meta_service_connection_pool_size); diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index c310f6b3e158fa..75f4bb7c4eaada 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -37,6 +37,7 @@ #include #include +#include "cloud/config.h" #include "common/config.h" #include "common/logging.h" #include "common/status.h" @@ -1673,6 +1674,8 @@ bool init(const char* conf_file, bool fill_conf_map, bool must_exist, bool set_t SET_FIELD(it.second, std::vector, fill_conf_map, set_to_default); } + set_cloud_unique_id(cloud_instance_id); + return true; } diff --git a/cloud/src/meta-service/meta_service_resource.cpp b/cloud/src/meta-service/meta_service_resource.cpp index 8a25a73771ccbd..b8bef65c91b264 100644 --- a/cloud/src/meta-service/meta_service_resource.cpp +++ b/cloud/src/meta-service/meta_service_resource.cpp @@ -2338,9 +2338,8 @@ void MetaServiceImpl::get_cluster(google::protobuf::RpcController* controller, std::find_if(instance.storage_vault_names().begin(), instance.storage_vault_names().end(), [](const std::string& name) { return name == BUILT_IN_STORAGE_VAULT_NAME; }) == instance.storage_vault_names().end()) { - code = MetaServiceCode::STORAGE_VAULT_NOT_FOUND; - msg = "instance has no built in storage vault"; - return; + LOG_EVERY_N(INFO, 100) << "There is no builtin vault in instance " + << instance.instance_id(); } auto get_cluster_mysql_user = [](const ClusterPB& c, std::set* mysql_users) { diff --git a/cloud/src/resource-manager/resource_manager.cpp b/cloud/src/resource-manager/resource_manager.cpp index 58122ac3f7cb3c..43f0a7368d812b 100644 --- a/cloud/src/resource-manager/resource_manager.cpp +++ b/cloud/src/resource-manager/resource_manager.cpp @@ -171,7 +171,9 @@ bool ResourceManager::check_cluster_params_valid(const ClusterPB& cluster, std:: n.heartbeat_port()) { continue; } - ss << "check cluster params failed, node : " << proto_to_json(n); + ss << "check cluster params failed, edit_log_port is required for frontends while " "heartbeat_port is required for backends, node : " + << proto_to_json(n); *err = ss.str(); no_err = false; break; diff --git a/docker/runtime/doris-compose/cluster.py b/docker/runtime/doris-compose/cluster.py index fcddd7f4c938b8..0ce12f3c7d4d19 100644 --- a/docker/runtime/doris-compose/cluster.py +++ b/docker/runtime/doris-compose/cluster.py @@ -291,15 +291,27 @@ def docker_env(self): enable_coverage = self.cluster.coverage_dir envs = { - "MY_IP": self.get_ip(), - "MY_ID": self.id, - "MY_TYPE": self.node_type(), - "FE_QUERY_PORT": FE_QUERY_PORT, - "FE_EDITLOG_PORT": FE_EDITLOG_PORT, - "BE_HEARTBEAT_PORT": BE_HEARTBEAT_PORT, - "DORIS_HOME": os.path.join(self.docker_home_dir()), - "STOP_GRACE": 1 if enable_coverage else 0, - "IS_CLOUD": 1 if self.cluster.is_cloud else 0, + "MY_IP": + self.get_ip(), + "MY_ID": + self.id, + "MY_TYPE": + self.node_type(), + "FE_QUERY_PORT": + FE_QUERY_PORT, + "FE_EDITLOG_PORT": + FE_EDITLOG_PORT, + "BE_HEARTBEAT_PORT": + BE_HEARTBEAT_PORT, + "DORIS_HOME": + os.path.join(self.docker_home_dir()), + "STOP_GRACE": + 1 if enable_coverage else 0, + "IS_CLOUD": + 1 if self.cluster.is_cloud else 0, + "SQL_MODE_NODE_MGR": + 1 if hasattr(self.cluster, 'sql_mode_node_mgr') and self.cluster.sql_mode_node_mgr else 0 } if self.cluster.is_cloud: @@ -390,15 +402,22 @@ def get_add_init_config(self): cfg += self.cluster.fe_config if self.cluster.is_cloud: cfg += [ - "cloud_unique_id = " + self.cloud_unique_id(), "meta_service_endpoint = {}".format( -
self.cluster.get_meta_server_addr()), - "", + self.cluster.get_meta_server_addr()), "", "# For regression-test", "ignore_unsupported_properties_in_cloud_mode = true", "merge_on_write_forced_to_false = true", + "deploy_mode = cloud" ] + if self.cluster.sql_mode_node_mgr: + cfg += [ + "cloud_instance_id = " + self.cloud_instance_id(), + ] + else: + cfg += [ + "cloud_unique_id = " + self.cloud_unique_id(), + ] return cfg def init_is_follower(self): @@ -420,6 +439,9 @@ def docker_env(self): def cloud_unique_id(self): return "sql_server_{}".format(self.id) + def cloud_instance_id(self): + return "reg_cloud_instance" + def entrypoint(self): return ["bash", os.path.join(DOCKER_RESOURCE_PATH, "init_fe.sh")] @@ -450,14 +472,26 @@ def get_add_init_config(self): cfg += self.cluster.be_config if self.cluster.is_cloud: cfg += [ - "cloud_unique_id = " + self.cloud_unique_id(), - "meta_service_endpoint = {}".format( - self.cluster.get_meta_server_addr()), 'tmp_file_dirs = [ {"path":"./storage/tmp","max_cache_bytes":10240000, "max_upload_bytes":10240000}]', 'enable_file_cache = true', 'file_cache_path = [ {{"path": "{}/storage/file_cache", "total_size":53687091200, "query_limit": 10737418240}}]' .format(self.docker_home_dir()), + "deploy_mode = cloud", ] + + if self.cluster.be_metaservice_endpoint: + cfg += [ + "meta_service_endpoint = {}".format( + self.cluster.get_meta_server_addr()), + ] + if self.cluster.be_cloud_instanceid: + cfg += [ + "cloud_instance_id = " + self.cloud_instance_id(), + ] + if not self.cluster.sql_mode_node_mgr: + cfg += [ + "cloud_unique_id = " + self.cloud_unique_id(), + ] return cfg def init_cluster_name(self): @@ -519,6 +553,9 @@ def docker_env(self): def cloud_unique_id(self): return "compute_node_{}".format(self.id) + def cloud_instance_id(self): + return "reg_cloud_instance" + def docker_home_dir(self): return os.path.join(DOCKER_DORIS_PATH, "be") @@ -628,7 +665,8 @@ class Cluster(object): def __init__(self, name, subnet, image, is_cloud, fe_config, be_config, ms_config, recycle_config, fe_follower, be_disks, be_cluster, - reg_be, coverage_dir, cloud_store_config): + reg_be, coverage_dir, cloud_store_config, sql_mode_node_mgr, + be_metaservice_endpoint, be_cloud_instanceid): self.name = name self.subnet = subnet self.image = image @@ -647,11 +685,15 @@ def __init__(self, name, subnet, image, is_cloud, fe_config, be_config, node_type: Group(node_type) for node_type in Node.TYPE_ALL } + self.sql_mode_node_mgr = sql_mode_node_mgr + self.be_metaservice_endpoint = be_metaservice_endpoint + self.be_cloud_instanceid = be_cloud_instanceid @staticmethod def new(name, image, is_cloud, fe_config, be_config, ms_config, recycle_config, fe_follower, be_disks, be_cluster, reg_be, - coverage_dir, cloud_store_config): + coverage_dir, cloud_store_config, sql_mode_node_mgr, + be_metaservice_endpoint, be_cloud_instanceid): if not os.path.exists(LOCAL_DORIS_PATH): os.makedirs(LOCAL_DORIS_PATH, exist_ok=True) os.chmod(LOCAL_DORIS_PATH, 0o777) @@ -663,7 +705,9 @@ def new(name, image, is_cloud, fe_config, be_config, ms_config, cluster = Cluster(name, subnet, image, is_cloud, fe_config, be_config, ms_config, recycle_config, fe_follower, be_disks, be_cluster, reg_be, - coverage_dir, cloud_store_config) + coverage_dir, cloud_store_config, + sql_mode_node_mgr, be_metaservice_endpoint, + be_cloud_instanceid) os.makedirs(cluster.get_path(), exist_ok=True) os.makedirs(get_status_path(name), exist_ok=True) cluster._save_meta() diff --git a/docker/runtime/doris-compose/command.py 
b/docker/runtime/doris-compose/command.py index ed88dd03f4daf8..de818c4f6c4f1c 100644 --- a/docker/runtime/doris-compose/command.py +++ b/docker/runtime/doris-compose/command.py @@ -297,6 +297,45 @@ def add_parser(self, args_parsers): default="", help="Set code coverage output directory") + parser.add_argument("--sql-mode-node-mgr", + default=False, + action=self._get_parser_bool_action(True), + help="Manager fe be via sql instead of http") + + if self._support_boolean_action(): + parser.add_argument( + "--be-metaservice-endpoint", + default=True, + action=self._get_parser_bool_action(False), + help= + "Do not set BE meta service endpoint in conf. Default is False." + ) + else: + parser.add_argument( + "--no-be-metaservice-endpoint", + dest='be_metaservice_endpoint', + default=True, + action=self._get_parser_bool_action(False), + help= + "Do not set BE meta service endpoint in conf. Default is False." + ) + + if self._support_boolean_action(): + parser.add_argument( + "--be-cloud-instanceid", + default=True, + action=self._get_parser_bool_action(False), + help= + "Do not set BE cloud instance ID in conf. Default is False.") + else: + parser.add_argument( + "--no-be-cloud-instanceid", + dest='be_cloud_instanceid', + default=True, + action=self._get_parser_bool_action(False), + help= + "Do not set BE cloud instance ID in conf. Default is False.") + parser.add_argument( "--fdb-version", type=str, @@ -394,7 +433,8 @@ def run(self, args): args.NAME, args.IMAGE, args.cloud, args.fe_config, args.be_config, args.ms_config, args.recycle_config, args.fe_follower, args.be_disks, args.be_cluster, args.reg_be, - args.coverage_dir, cloud_store_config) + args.coverage_dir, cloud_store_config, args.sql_mode_node_mgr, + args.be_metaservice_endpoint, args.be_cloud_instanceid) LOG.info("Create new cluster {} succ, cluster path is {}".format( args.NAME, cluster.get_path())) @@ -483,6 +523,49 @@ def do_add_node(node_type, add_num, add_ids): "Not up cluster cause specific --no-start, related node num {}" .format(related_node_num))) else: + LOG.info("Using SQL mode for node management ? 
{}".format( + args.sql_mode_node_mgr)) + + # Wait for FE master to be elected + LOG.info("Waiting for FE master to be elected...") + expire_ts = time.time() + 30 + while expire_ts > time.time(): + db_mgr = database.get_db_mgr(args.NAME, False) + for id in add_fe_ids: + fe_state = db_mgr.get_fe(id) + if fe_state is not None and fe_state.alive: + break + LOG.info("there is no fe ready") + time.sleep(5) + + if cluster.is_cloud and args.sql_mode_node_mgr: + db_mgr = database.get_db_mgr(args.NAME, False) + master_fe_endpoint = CLUSTER.get_master_fe_endpoint( + cluster.name) + # Add FEs except master_fe + for fe in cluster.get_all_nodes(CLUSTER.Node.TYPE_FE): + fe_endpoint = f"{fe.get_ip()}:{CLUSTER.FE_EDITLOG_PORT}" + if fe_endpoint != master_fe_endpoint: + try: + db_mgr.add_fe(fe_endpoint) + LOG.info(f"Added FE {fe_endpoint} successfully.") + except Exception as e: + LOG.error( + f"Failed to add FE {fe_endpoint}: {str(e)}") + + # Add BEs + for be in cluster.get_all_nodes(CLUSTER.Node.TYPE_BE): + be_endpoint = f"{be.get_ip()}:{CLUSTER.BE_HEARTBEAT_PORT}" + try: + db_mgr.add_be(be_endpoint) + LOG.info(f"Added BE {be_endpoint} successfully.") + except Exception as e: + LOG.error(f"Failed to add BE {be_endpoint}: {str(e)}") + + cloud_store_config = self._get_cloud_store_config() + + db_mgr.create_default_storage_vault(cloud_store_config) + if args.wait_timeout != 0: if args.wait_timeout == -1: args.wait_timeout = 1000000000 @@ -509,6 +592,7 @@ def do_add_node(node_type, add_num, add_ids): err += "dead be: " + str(dead_backends) + ". " raise Exception(err) time.sleep(1) + LOG.info( utils.render_green( "Up cluster {} succ, related node num {}".format( @@ -615,6 +699,8 @@ def run(self, args): args.fdb_id, ignore_not_exists=True) + LOG.info("down cluster " + args.NAME + " for all " + str(for_all)) + if for_all: if os.path.exists(cluster.get_compose_file()): try: @@ -700,6 +786,8 @@ def __init__(self): self.tablet_num = "" self.last_heartbeat = "" self.err_msg = "" + self.edit_log_port = 0 + self.heartbeat_port = 0 def info(self, detail): result = [ @@ -711,21 +799,23 @@ def info(self, detail): if detail: query_port = "" http_port = "" + heartbeat_port = "" + edit_log_port = "" node_path = CLUSTER.get_node_path(self.cluster_name, self.node_type, self.id) if self.node_type == CLUSTER.Node.TYPE_FE: query_port = CLUSTER.FE_QUERY_PORT http_port = CLUSTER.FE_HTTP_PORT + edit_log_port = CLUSTER.FE_EDITLOG_PORT elif self.node_type == CLUSTER.Node.TYPE_BE: http_port = CLUSTER.BE_WEBSVR_PORT + heartbeat_port = CLUSTER.BE_HEARTBEAT_PORT elif self.node_type == CLUSTER.Node.TYPE_MS or self.node_type == CLUSTER.Node.TYPE_RECYCLE: http_port = CLUSTER.MS_PORT else: pass result += [ - query_port, - http_port, - node_path, + query_port, http_port, node_path, edit_log_port, heartbeat_port ] return result @@ -738,6 +828,7 @@ def update_db_info(self, db_mgr): self.query_port = fe.query_port self.last_heartbeat = fe.last_heartbeat self.err_msg = fe.err_msg + self.edit_log_port = fe.edit_log_port elif self.node_type == CLUSTER.Node.TYPE_BE: self.backend_id = -1 be = db_mgr.get_be(self.id) @@ -747,6 +838,7 @@ def update_db_info(self, db_mgr): self.tablet_num = be.tablet_num self.last_heartbeat = be.last_heartbeat self.err_msg = be.err_msg + self.heartbeat_port = be.heartbeat_port class GenConfCommand(Command): @@ -903,8 +995,7 @@ def parse_cluster_compose_file(cluster_name): if services is None: return COMPOSE_BAD, {} return COMPOSE_GOOD, { - service: - ComposeService( + service: ComposeService( service, 
list(service_conf["networks"].values())[0] ["ipv4_address"], service_conf["image"]) @@ -980,6 +1071,8 @@ def parse_cluster_compose_file(cluster_name): "query_port", "http_port", "path", + "edit_log_port", + "heartbeat_port", ] rows = [] @@ -1056,6 +1149,47 @@ def get_node_seq(node): return self._handle_data(header, rows) +class GetCloudIniCommand(Command): + + def add_parser(self, args_parsers): + parser = args_parsers.add_parser("get-cloud-ini", + help="Get cloud.init") + parser.add_argument( + "NAME", + nargs="*", + help= + "Specify multiple clusters, if specific, show all their containers." + ) + self._add_parser_output_json(parser) + + def _handle_data(self, header, datas): + if utils.is_enable_log(): + table = prettytable.PrettyTable( + [utils.render_green(field) for field in header]) + for row in datas: + table.add_row(row) + print(table) + return "" + else: + datas.insert(0, header) + return datas + + def run(self, args): + + header = ["key", "value"] + + rows = [] + + with open(CLUSTER.CLOUD_CFG_FILE, "r") as f: + for line in f: + line = line.strip() + if line and not line.startswith('#'): + key, value = line.split('=', 1) + rows.append([key.strip(), value.strip()]) + + return self._handle_data(header, rows) + + ALL_COMMANDS = [ UpCommand("up"), DownCommand("down"), @@ -1064,6 +1198,7 @@ def get_node_seq(node): SimpleCommand("restart", "Restart the doris containers. "), SimpleCommand("pause", "Pause the doris containers. "), SimpleCommand("unpause", "Unpause the doris containers. "), + GetCloudIniCommand("get-cloud-ini"), GenConfCommand("config"), ListCommand("ls"), ] diff --git a/docker/runtime/doris-compose/database.py b/docker/runtime/doris-compose/database.py index d29905e94a9766..bbf6fb4fbeb29c 100644 --- a/docker/runtime/doris-compose/database.py +++ b/docker/runtime/doris-compose/database.py @@ -20,6 +20,7 @@ import pymysql import time import utils +import uuid LOG = utils.get_logger() @@ -27,19 +28,20 @@ class FEState(object): def __init__(self, id, query_port, is_master, alive, last_heartbeat, - err_msg): + err_msg, edit_log_port): self.id = id self.query_port = query_port self.is_master = is_master self.alive = alive self.last_heartbeat = last_heartbeat self.err_msg = err_msg + self.edit_log_port = edit_log_port class BEState(object): def __init__(self, id, backend_id, decommissioned, alive, tablet_num, - last_heartbeat, err_msg): + last_heartbeat, err_msg, heartbeat_port): self.id = id self.backend_id = backend_id self.decommissioned = decommissioned @@ -47,6 +49,7 @@ def __init__(self, id, backend_id, decommissioned, alive, tablet_num, self.tablet_num = tablet_num self.last_heartbeat = last_heartbeat self.err_msg = err_msg + self.heartbeat_port = heartbeat_port class DBManager(object): @@ -70,6 +73,15 @@ def load_states(self, query_ports): self._load_fe_states(query_ports) self._load_be_states() + def add_fe(self, fe_endpoint): + try: + sql = f"ALTER SYSTEM ADD FOLLOWER '{fe_endpoint}'" + self._exec_query(sql) + LOG.info(f"Added FE {fe_endpoint} via SQL successfully.") + except Exception as e: + LOG.error(f"Failed to add FE {fe_endpoint} via SQL: {str(e)}") + raise + def drop_fe(self, fe_endpoint): id = CLUSTER.Node.get_id_from_ip(fe_endpoint[:fe_endpoint.find(":")]) try: @@ -85,6 +97,15 @@ def drop_fe(self, fe_endpoint): return raise e + def add_be(self, be_endpoint): + try: + sql = f"ALTER SYSTEM ADD BACKEND '{be_endpoint}'" + self._exec_query(sql) + LOG.info(f"Added BE {be_endpoint} via SQL successfully.") + except Exception as e: + LOG.error(f"Failed to add BE 
{be_endpoint} via SQL: {str(e)}") + raise + def drop_be(self, be_endpoint): id = CLUSTER.Node.get_id_from_ip(be_endpoint[:be_endpoint.find(":")]) try: @@ -140,23 +161,55 @@ def decommission_be(self, be_endpoint): time.sleep(5) + def create_default_storage_vault(self, cloud_store_config): + try: + # Create storage vault + create_vault_sql = f""" + CREATE STORAGE VAULT IF NOT EXISTS default_vault + PROPERTIES ( + "type" = "S3", + "s3.access_key" = "{cloud_store_config['DORIS_CLOUD_AK']}", + "s3.secret_key" = "{cloud_store_config['DORIS_CLOUD_SK']}", + "s3.endpoint" = "{cloud_store_config['DORIS_CLOUD_ENDPOINT']}", + "s3.bucket" = "{cloud_store_config['DORIS_CLOUD_BUCKET']}", + "s3.region" = "{cloud_store_config['DORIS_CLOUD_REGION']}", + "s3.root.path" = "{str(uuid.uuid4())}", + "provider" = "{cloud_store_config['DORIS_CLOUD_PROVIDER']}" + ); + """ + self._exec_query(create_vault_sql) + LOG.info("Created storage vault 'default_vault'") + + # Set as default storage vault + set_default_vault_sql = "SET default_vault as DEFAULT STORAGE VAULT;" + self._exec_query(set_default_vault_sql) + LOG.info("Set 'default_vault' as the default storage vault") + + except Exception as e: + LOG.error(f"Failed to create default storage vault: {str(e)}") + raise + def _load_fe_states(self, query_ports): fe_states = {} alive_master_fe_port = None for record in self._exec_query(''' - select Host, IsMaster, Alive, LastHeartbeat, ErrMsg - from frontends()'''): - ip, is_master, alive, last_heartbeat, err_msg = record + show frontends '''): + # Unpack the record into individual columns + name, ip, edit_log_port, _, query_port, _, _, role, is_master, cluster_id, _, alive, _, _, last_heartbeat, _, err_msg, _, _ = record is_master = utils.is_true(is_master) alive = utils.is_true(alive) id = CLUSTER.Node.get_id_from_ip(ip) query_port = query_ports.get(id, "") last_heartbeat = utils.escape_null(last_heartbeat) fe = FEState(id, query_port, is_master, alive, last_heartbeat, - err_msg) + err_msg, edit_log_port) fe_states[id] = fe if is_master and alive and query_port: alive_master_fe_port = query_port + LOG.info( + "record of show frontends, name {}, ip {}, alive {}, is_master {}, role {}" + .format(name, ip, alive, is_master, role)) + self.fe_states = fe_states if alive_master_fe_port and alive_master_fe_port != self.query_port: self.query_port = alive_master_fe_port @@ -165,17 +218,18 @@ def _load_fe_states(self, query_ports): def _load_be_states(self): be_states = {} for record in self._exec_query(''' - select BackendId, Host, LastHeartbeat, Alive, SystemDecommissioned, TabletNum, ErrMsg + select BackendId, Host, LastHeartbeat, Alive, SystemDecommissioned, TabletNum, ErrMsg, HeartbeatPort from backends()'''): - backend_id, ip, last_heartbeat, alive, decommissioned, tablet_num, err_msg = record + backend_id, ip, last_heartbeat, alive, decommissioned, tablet_num, err_msg, heartbeat_port = record backend_id = int(backend_id) alive = utils.is_true(alive) decommissioned = utils.is_true(decommissioned) tablet_num = int(tablet_num) id = CLUSTER.Node.get_id_from_ip(ip) last_heartbeat = utils.escape_null(last_heartbeat) + heartbeat_port = utils.escape_null(heartbeat_port) be = BEState(id, backend_id, decommissioned, alive, tablet_num, - last_heartbeat, err_msg) + last_heartbeat, err_msg, heartbeat_port) be_states[id] = be self.be_states = be_states diff --git a/docker/runtime/doris-compose/resource/init_be.sh b/docker/runtime/doris-compose/resource/init_be.sh index d9b7953b53430d..e0ed5b92cb8da2 100755 --- 
a/docker/runtime/doris-compose/resource/init_be.sh +++ b/docker/runtime/doris-compose/resource/init_be.sh @@ -48,6 +48,12 @@ add_cloud_be() { return fi + # Check if SQL_MODE_NODE_MGR is set to 1 + if [ "$SQL_MODE_NODE_MGR" -eq 1 ]; then + health_log "SQL_MODE_NODE_MGR is set to 1, skipping cluster creation" + return + fi + cluster_file_name="${DORIS_HOME}/conf/CLUSTER_NAME" cluster_name=$(cat $cluster_file_name) if [ -z $cluster_name ]; then diff --git a/docker/runtime/doris-compose/resource/init_cloud.sh b/docker/runtime/doris-compose/resource/init_cloud.sh index 78152b5330bd98..5740335ace3f32 100644 --- a/docker/runtime/doris-compose/resource/init_cloud.sh +++ b/docker/runtime/doris-compose/resource/init_cloud.sh @@ -50,6 +50,13 @@ check_init_cloud() { lock_cluster + # Check if SQL_MODE_NODE_MGR is set + if [[ "$SQL_MODE_NODE_MGR" -eq 1 ]]; then + health_log "SQL_MODE_NODE_MGR is set, skipping create_instance" + touch $HAS_CREATE_INSTANCE_FILE + return + fi + output=$(curl -s "${META_SERVICE_ENDPOINT}/MetaService/http/create_instance?token=greedisgood9999" \ -d '{"instance_id":"default_instance_id", "name": "default_instance", diff --git a/docker/runtime/doris-compose/resource/init_fe.sh b/docker/runtime/doris-compose/resource/init_fe.sh index e46a6cac9b2e89..d4aad29b0e5cf8 100755 --- a/docker/runtime/doris-compose/resource/init_fe.sh +++ b/docker/runtime/doris-compose/resource/init_fe.sh @@ -88,6 +88,21 @@ start_cloud_fe() { return fi + # Check if SQL_MODE_NODE_MGR is set to 1 + if [ "${SQL_MODE_NODE_MGR}" = "1" ]; then + health_log "SQL_MODE_NODE_MGR is set to 1. Skipping add FE." + + touch $REGISTER_FILE + + fe_daemon & + bash $DORIS_HOME/bin/start_fe.sh --daemon + + if [ "$MY_ID" == "1" ]; then + echo $MY_IP >$MASTER_FE_IP_FILE + fi + return + fi + wait_create_instance action=add_cluster diff --git a/docker/runtime/fe/resource/fe_disaggregated_entrypoint.sh b/docker/runtime/fe/resource/fe_disaggregated_entrypoint.sh index d9af8ab612ad31..876a3f9ace3009 100755 --- a/docker/runtime/fe/resource/fe_disaggregated_entrypoint.sh +++ b/docker/runtime/fe/resource/fe_disaggregated_entrypoint.sh @@ -202,6 +202,8 @@ add_cluster_info_to_conf check_and_modify_fqdn_config link_config_files variables_inital +check_or_register_in_ms + check_or_register_in_ms /opt/apache-doris/fe/bin/start_fe.sh --console diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 42affa36daf601..968a307c842bba 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1823,7 +1823,7 @@ public class Config extends ConfigBase { public static boolean enable_date_conversion = true; @ConfField(mutable = false, masterOnly = true) - public static boolean enable_multi_tags = false; + public static boolean enable_multi_tags = true; /** * If set to TRUE, FE will convert DecimalV2 to DecimalV3 automatically. @@ -2783,15 +2783,24 @@ public class Config extends ConfigBase { @ConfField public static int warn_sys_accumulated_file_size = 2; @ConfField public static int audit_sys_accumulated_file_size = 4; + @ConfField + public static String deploy_mode = ""; + + // compatibily with elder version. + // cloud_unique_id is introduced before cloud_instance_id, so it has higher priority. @ConfField public static String cloud_unique_id = ""; + // If cloud_unique_id is empty, cloud_instance_id works, otherwise cloud_unique_id works. 
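The FE side mirrors this: Config.isCloudMode() below treats deploy_mode = "cloud", a non-empty cloud_unique_id, or a non-empty cloud_instance_id as cloud mode, and CloudEnv.initialize() (later in this patch) derives cloud_unique_id from cloud_instance_id when only the latter is set. A rough Python restatement of that precedence, assuming the "1:<instance_id>:sql_server00" format used there:

    # Illustration only; mirrors Config.isCloudMode() and the fallback in CloudEnv.initialize().
    def fe_is_cloud_mode(deploy_mode: str, cloud_unique_id: str, cloud_instance_id: str) -> bool:
        return deploy_mode == "cloud" or bool(cloud_unique_id) or bool(cloud_instance_id)

    def fe_cloud_unique_id(cloud_unique_id: str, cloud_instance_id: str) -> str:
        # The older cloud_unique_id wins when both are configured; otherwise it is
        # derived from cloud_instance_id, which must then be present.
        if cloud_unique_id:
            return cloud_unique_id
        if not cloud_instance_id:
            raise ValueError("cloud_instance_id must be specified in disaggregated mode")
        return "1:" + cloud_instance_id + ":sql_server00"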
+ @ConfField + public static String cloud_instance_id = ""; + public static boolean isCloudMode() { - return !cloud_unique_id.isEmpty(); + return deploy_mode.equals("cloud") || !cloud_unique_id.isEmpty() || !cloud_instance_id.isEmpty(); } public static boolean isNotCloudMode() { - return cloud_unique_id.isEmpty(); + return !isCloudMode(); } /** @@ -3023,4 +3032,8 @@ public static int metaServiceRpcRetryTimes() { //========================================================================== // end of cloud config //========================================================================== + + @ConfField(description = {"检查资源就绪的周期,单位秒", + "Interval checking if resource is ready"}) + public static long resource_not_ready_sleep_seconds = 5; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java index 95ad5ae824bebc..51ed811cc06336 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java @@ -99,18 +99,18 @@ public class Alter { private AlterHandler schemaChangeHandler; private AlterHandler materializedViewHandler; - private SystemHandler clusterHandler; + private SystemHandler systemHandler; public Alter() { schemaChangeHandler = Config.isCloudMode() ? new CloudSchemaChangeHandler() : new SchemaChangeHandler(); materializedViewHandler = new MaterializedViewHandler(); - clusterHandler = new SystemHandler(); + systemHandler = new SystemHandler(); } public void start() { schemaChangeHandler.start(); materializedViewHandler.start(); - clusterHandler.start(); + systemHandler.start(); } public void processCreateMaterializedView(CreateMaterializedViewStmt stmt) @@ -710,8 +710,8 @@ public void replayModifyViewDef(AlterViewInfo alterViewInfo) throws MetaNotFound } } - public void processAlterCluster(AlterSystemStmt stmt) throws UserException { - clusterHandler.process(Collections.singletonList(stmt.getAlterClause()), null, null); + public void processAlterSystem(AlterSystemStmt stmt) throws UserException { + systemHandler.process(Collections.singletonList(stmt.getAlterClause()), null, null); } private void processRename(Database db, OlapTable table, List alterClauses) throws DdlException { @@ -958,8 +958,8 @@ public AlterHandler getMaterializedViewHandler() { return materializedViewHandler; } - public AlterHandler getClusterHandler() { - return clusterHandler; + public AlterHandler getSystemHandler() { + return systemHandler; } public void processAlterMTMV(AlterMTMV alterMTMV, boolean isReplay) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java index 28b3684ed04602..78b3b76ea78d46 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java @@ -72,7 +72,7 @@ public class SystemHandler extends AlterHandler { private static final Logger LOG = LogManager.getLogger(SystemHandler.class); public SystemHandler() { - super("cluster"); + super("system"); } @Override @@ -156,15 +156,13 @@ public synchronized void process(String rawSql, List alterClauses, // for decommission operation, here is no decommission job. 
the system handler will check // all backend in decommission state for (Backend backend : decommissionBackends) { - backend.setDecommissioned(true); - Env.getCurrentEnv().getEditLog().logBackendStateChange(backend); - LOG.info("set backend {} to decommission", backend.getId()); + Env.getCurrentSystemInfo().decommissionBackend(backend); } } else if (alterClause instanceof AddObserverClause) { AddObserverClause clause = (AddObserverClause) alterClause; Env.getCurrentEnv().addFrontend(FrontendNodeType.OBSERVER, clause.getHost(), - clause.getPort(), ""); + clause.getPort()); } else if (alterClause instanceof DropObserverClause) { DropObserverClause clause = (DropObserverClause) alterClause; Env.getCurrentEnv().dropFrontend(FrontendNodeType.OBSERVER, clause.getHost(), @@ -172,7 +170,7 @@ public synchronized void process(String rawSql, List alterClauses, } else if (alterClause instanceof AddFollowerClause) { AddFollowerClause clause = (AddFollowerClause) alterClause; Env.getCurrentEnv().addFrontend(FrontendNodeType.FOLLOWER, clause.getHost(), - clause.getPort(), ""); + clause.getPort()); } else if (alterClause instanceof DropFollowerClause) { DropFollowerClause clause = (DropFollowerClause) alterClause; Env.getCurrentEnv().dropFrontend(FrontendNodeType.FOLLOWER, clause.getHost(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index eb8b10bbfe4d8b..a4bc55eeeec5ac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -423,7 +423,7 @@ public class Env { // node name is used for bdbje NodeName. protected String nodeName; protected FrontendNodeType role; - private FrontendNodeType feType; + protected FrontendNodeType feType; // replica and observer use this value to decide provide read service or not private long synchronizedTimeMs; private MasterInfo masterInfo; @@ -1142,6 +1142,13 @@ public void setHttpReady(boolean httpReady) { this.httpReady.set(httpReady); } + protected boolean isStartFromEmpty() { + File roleFile = new File(this.imageDir, Storage.ROLE_FILE); + File versionFile = new File(this.imageDir, Storage.VERSION_FILE); + + return !roleFile.exists() && !versionFile.exists(); + } + protected void getClusterIdAndRole() throws IOException { File roleFile = new File(this.imageDir, Storage.ROLE_FILE); File versionFile = new File(this.imageDir, Storage.VERSION_FILE); @@ -2982,11 +2989,19 @@ protected void runAfterCatalogReady() { }; } + public void addFrontend(FrontendNodeType role, String host, int editLogPort) throws DdlException { + addFrontend(role, host, editLogPort, ""); + } + public void addFrontend(FrontendNodeType role, String host, int editLogPort, String nodeName) throws DdlException { if (!tryLock(false)) { throw new DdlException("Failed to acquire env lock. 
Try again"); } try { + if (Strings.isNullOrEmpty(nodeName)) { + nodeName = genFeNodeName(host, editLogPort, false /* new name style */); + } + Frontend fe = checkFeExist(host, editLogPort); if (fe != null) { throw new DdlException("frontend already exists " + fe); @@ -2995,10 +3010,6 @@ public void addFrontend(FrontendNodeType role, String host, int editLogPort, Str throw new DdlException("frontend's hostName should not be empty while enable_fqdn_mode is true"); } - if (Strings.isNullOrEmpty(nodeName)) { - nodeName = genFeNodeName(host, editLogPort, false /* new name style */); - } - if (removedFrontends.contains(nodeName)) { throw new DdlException("frontend name already exists " + nodeName + ". Try again"); } @@ -3031,7 +3042,7 @@ public void modifyFrontendHostName(String srcHost, int srcPort, String destHost) modifyFrontendHost(fe.getNodeName(), destHost); } - public void modifyFrontendHost(String nodeName, String destHost) throws DdlException { + private void modifyFrontendHost(String nodeName, String destHost) throws DdlException { if (!tryLock(false)) { throw new DdlException("Failed to acquire env lock. Try again"); } @@ -3059,6 +3070,11 @@ public void modifyFrontendHost(String nodeName, String destHost) throws DdlExcep } public void dropFrontend(FrontendNodeType role, String host, int port) throws DdlException { + dropFrontendFromBDBJE(role, host, port); + } + + public void dropFrontendFromBDBJE(FrontendNodeType role, String host, int port) throws DdlException { + if (port == selfNode.getPort() && feType == FrontendNodeType.MASTER && selfNode.getHost().equals(host)) { throw new DdlException("can not drop current master node."); @@ -4308,8 +4324,8 @@ public CooldownConfHandler getCooldownConfHandler() { return cooldownConfHandler; } - public SystemHandler getClusterHandler() { - return (SystemHandler) this.alter.getClusterHandler(); + public SystemHandler getSystemHandler() { + return (SystemHandler) this.alter.getSystemHandler(); } public BackupHandler getBackupHandler() { @@ -5430,15 +5446,15 @@ public void replayModifyTableDefaultDistributionBucketNum(ModifyTableDefaultDist } /* - * used for handling AlterClusterStmt - * (for client is the ALTER CLUSTER command). + * used for handling AlterSystemStmt + * (for client is the ALTER SYSTEM command). 
*/ - public void alterCluster(AlterSystemStmt stmt) throws DdlException, UserException { - this.alter.processAlterCluster(stmt); + public void alterSystem(AlterSystemStmt stmt) throws DdlException, UserException { + this.alter.processAlterSystem(stmt); } - public void cancelAlterCluster(CancelAlterSystemStmt stmt) throws DdlException { - this.alter.getClusterHandler().cancel(stmt); + public void cancelAlterSystem(CancelAlterSystemStmt stmt) throws DdlException { + this.alter.getSystemHandler().cancel(stmt); } // Switch catalog of this session diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java index 87e8a0fc3b0ce8..cd33ee9980cc68 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java @@ -30,7 +30,6 @@ import org.apache.doris.analysis.PartitionDesc; import org.apache.doris.analysis.RangePartitionDesc; import org.apache.doris.analysis.TableName; -import org.apache.doris.cloud.catalog.CloudEnv; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; @@ -55,8 +54,6 @@ public class InternalSchemaInitializer extends Thread { - public static final int TABLE_CREATION_RETRY_INTERVAL_IN_SECONDS = 5; - private static final Logger LOG = LogManager.getLogger(InternalSchemaInitializer.class); public InternalSchemaInitializer() { @@ -72,17 +69,17 @@ public void run() { FrontendNodeType feType = Env.getCurrentEnv().getFeType(); if (feType.equals(FrontendNodeType.INIT) || feType.equals(FrontendNodeType.UNKNOWN)) { LOG.warn("FE is not ready"); - Thread.sleep(5000); + Thread.sleep(Config.resource_not_ready_sleep_seconds * 1000); continue; } Thread.currentThread() - .join(TABLE_CREATION_RETRY_INTERVAL_IN_SECONDS * 1000L); + .join(Config.resource_not_ready_sleep_seconds * 1000L); createDb(); createTbl(); } catch (Throwable e) { LOG.warn("Statistics storage initiated failed, will try again later", e); try { - Thread.sleep(5000); + Thread.sleep(Config.resource_not_ready_sleep_seconds * 1000); } catch (InterruptedException ex) { LOG.info("Sleep interrupted. 
{}", ex.getMessage()); } @@ -154,7 +151,7 @@ public static void modifyTblReplicaCount(Database database, String tblName) { } } try { - Thread.sleep(5000); + Thread.sleep(Config.resource_not_ready_sleep_seconds * 1000); } catch (InterruptedException t) { // IGNORE } @@ -200,10 +197,6 @@ private static CreateTableStmt buildStatisticsTblStmt(String statsTableName, Lis } }; - if (Config.isCloudMode() && ((CloudEnv) Env.getCurrentEnv()).getEnableStorageVault()) { - properties.put(PropertyAnalyzer.PROPERTIES_STORAGE_VAULT_NAME, FeConstants.BUILT_IN_STORAGE_VAULT_NAME); - } - PropertyAnalyzer.getInstance().rewriteForceProperties(properties); CreateTableStmt createTableStmt = new CreateTableStmt(true, false, tableName, InternalSchema.getCopiedSchema(statsTableName), @@ -238,10 +231,6 @@ private static CreateTableStmt buildAuditTblStmt() throws UserException { } }; - if (Config.isCloudMode() && ((CloudEnv) Env.getCurrentEnv()).getEnableStorageVault()) { - properties.put(PropertyAnalyzer.PROPERTIES_STORAGE_VAULT_NAME, FeConstants.BUILT_IN_STORAGE_VAULT_NAME); - } - PropertyAnalyzer.getInstance().rewriteForceProperties(properties); CreateTableStmt createTableStmt = new CreateTableStmt(true, false, tableName, InternalSchema.getCopiedSchema(AuditLoaderPlugin.AUDIT_LOG_TABLE), diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java index 1ebda4dc8aa037..e212a7f948ecba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java @@ -38,11 +38,13 @@ import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; import org.apache.doris.common.io.CountingDataOutputStream; +import org.apache.doris.ha.FrontendNodeType; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; import org.apache.doris.system.SystemInfoService.HostInfo; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -89,6 +91,21 @@ public CloudUpgradeMgr getCloudUpgradeMgr() { return this.upgradeMgr; } + @Override + public void initialize(String[] args) throws Exception { + if (Strings.isNullOrEmpty(Config.cloud_unique_id)) { + if (Strings.isNullOrEmpty(Config.cloud_instance_id)) { + throw new UserException("cloud_instance_id must be specified if deployed in dissaggregated"); + } + LOG.info("cloud_unique_id is not set, setting it using instance_id"); + Config.cloud_unique_id = "1:" + Config.cloud_instance_id + ":sql_server00"; + } + + LOG.info("Initializing CloudEnv with cloud_unique_id: {}", Config.cloud_unique_id); + + super.initialize(args); + } + @Override protected void startMasterOnlyDaemonThreads() { LOG.info("start cloud Master only daemon threads"); @@ -117,9 +134,13 @@ public CacheHotspotManager getCacheHotspotMgr() { return cacheHotspotMgr; } + private CloudSystemInfoService getCloudSystemInfoService() { + return (CloudSystemInfoService) systemInfo; + } + private Cloud.NodeInfoPB getLocalTypeFromMetaService() { // get helperNodes from ms - Cloud.GetClusterResponse response = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) + Cloud.GetClusterResponse response = getCloudSystemInfoService() .getCloudCluster(Config.cloud_sql_server_cluster_name, Config.cloud_sql_server_cluster_id, ""); if (!response.hasStatus() || 
!response.getStatus().hasCode() || response.getStatus().getCode() != Cloud.MetaServiceCode.OK) { @@ -166,6 +187,22 @@ private Cloud.NodeInfoPB getLocalTypeFromMetaService() { return local.orElse(null); } + private void tryAddMyselToMS() { + try { + try { + if (Strings.isNullOrEmpty(Config.cloud_instance_id)) { + throw new DdlException("unable to create instance due to empty cloud_instance_id"); + } + getCloudSystemInfoService().tryCreateInstance(Config.cloud_instance_id, + Config.cloud_instance_id, false); + } catch (Exception e) { + return; + } + addFrontend(FrontendNodeType.MASTER, selfNode.getHost(), selfNode.getPort()); + } catch (DdlException e) { + LOG.warn("get ddl exception ", e); + } + } protected void getClusterIdAndRole() throws IOException { NodeInfoPB.NodeType type = NodeInfoPB.NodeType.UNKNOWN; @@ -175,14 +212,19 @@ protected void getClusterIdAndRole() throws IOException { try { nodeInfoPB = getLocalTypeFromMetaService(); } catch (Exception e) { - LOG.warn("failed to get local fe's type, sleep 5 s, try again. exception: {}", e.getMessage()); + LOG.warn("failed to get local fe's type, sleep {} s, try again. exception: {}", + Config.resource_not_ready_sleep_seconds, e.getMessage()); } if (nodeInfoPB == null) { - LOG.warn("failed to get local fe's type, sleep 5 s, try again."); + LOG.warn("failed to get local fe's type, sleep {} s, try again.", + Config.resource_not_ready_sleep_seconds); + if (isStartFromEmpty()) { + tryAddMyselToMS(); + } try { - Thread.sleep(5000); + Thread.sleep(Config.resource_not_ready_sleep_seconds * 1000); } catch (InterruptedException e) { - LOG.warn("thread sleep Exception", e); + LOG.info("interrupted by {}", e); } continue; } @@ -221,7 +263,7 @@ public void checkCloudClusterPriv(String clusterName) throws DdlException { + "' for cloud cluster '" + clusterName + "'", ErrorCode.ERR_CLUSTER_NO_PERMISSIONS); } - if (!((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudClusterNames().contains(clusterName)) { + if (!getCloudSystemInfoService().getCloudClusterNames().contains(clusterName)) { if (LOG.isDebugEnabled()) { LOG.debug("current instance does not have a cluster name :{}", clusterName); } @@ -232,9 +274,9 @@ public void checkCloudClusterPriv(String clusterName) throws DdlException { public void changeCloudCluster(String clusterName, ConnectContext ctx) throws DdlException { checkCloudClusterPriv(clusterName); - ((CloudSystemInfoService) Env.getCurrentSystemInfo()).waitForAutoStart(clusterName); + getCloudSystemInfoService().waitForAutoStart(clusterName); try { - ((CloudSystemInfoService) Env.getCurrentSystemInfo()).addCloudCluster(clusterName, ""); + getCloudSystemInfoService().addCloudCluster(clusterName, ""); } catch (UserException e) { throw new DdlException(e.getMessage(), e.getMysqlErrorCode()); } @@ -344,4 +386,24 @@ public long saveCloudWarmUpJob(CountingDataOutputStream dos, long checksum) thro public void cancelCloudWarmUp(CancelCloudWarmUpStmt stmt) throws DdlException { getCacheHotspotMgr().cancel(stmt); } + + @Override + public void addFrontend(FrontendNodeType role, String host, int editLogPort) throws DdlException { + getCloudSystemInfoService().addFrontend(role, host, editLogPort); + } + + @Override + public void dropFrontend(FrontendNodeType role, String host, int port) throws DdlException { + if (port == selfNode.getPort() && feType == FrontendNodeType.MASTER + && selfNode.getHost().equals(host)) { + throw new DdlException("can not drop current master node."); + } + + getCloudSystemInfoService().dropFrontend(role, host, 
port); + } + + @Override + public void modifyFrontendHostName(String srcHost, int srcPort, String destHost) throws DdlException { + throw new DdlException("modify frontend host name is not supported in cloud mode"); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java index c9c799687abb61..47108b147f5bc5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java @@ -403,4 +403,9 @@ public Cloud. ResetRLProgressResponse resetRLProgress(Cloud. ResetRLProgressRequ } return blockingStub.finishTabletJob(request); } + + public Cloud.CreateInstanceResponse + createInstance(Cloud.CreateInstanceRequest request) { + return blockingStub.createInstance(request); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java index 33a4d81feaaa54..918f0c34189d94 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java @@ -106,6 +106,7 @@ private MetaServiceClient getProxy() { } String address = Config.meta_service_endpoint; + address = address.replaceAll("^[\"']|[\"']$", ""); MetaServiceClient service = serviceMap.get(address); if (service != null && service.isNormalState() && !service.isConnectionAgeExpired()) { return service; @@ -547,4 +548,13 @@ public Cloud.ResetRLProgressResponse resetRLProgress(Cloud.ResetRLProgressReques throw new RpcException("", e.getMessage(), e); } } + + public Cloud.CreateInstanceResponse createInstance(Cloud.CreateInstanceRequest request) throws RpcException { + try { + final MetaServiceClient client = getProxy(); + return client.createInstance(request); + } catch (Exception e) { + throw new RpcException("", e.getMessage(), e); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java index 03cbbfe814a8b6..606f52369e5f7c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java @@ -29,7 +29,9 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; +import org.apache.doris.common.RandomIdentifierGenerator; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.NetUtils; import org.apache.doris.ha.FrontendNodeType; import org.apache.doris.metric.MetricRepo; import org.apache.doris.qe.ConnectContext; @@ -287,7 +289,7 @@ public synchronized void updateFrontends(List toAdd, List to continue; } try { - Env.getCurrentEnv().dropFrontend(fe.getRole(), fe.getHost(), fe.getEditLogPort()); + Env.getCurrentEnv().dropFrontendFromBDBJE(fe.getRole(), fe.getHost(), fe.getEditLogPort()); LOG.info("dropped cloud frontend={} ", fe); } catch (DdlException e) { LOG.warn("failed to drop cloud frontend={} ", fe); @@ -308,6 +310,103 @@ public synchronized void updateFrontends(List toAdd, List to } } + private void alterBackendCluster(List hostInfos, String clusterId, + Cloud.AlterClusterRequest.Operation operation) throws DdlException { + if (Strings.isNullOrEmpty(Config.cloud_instance_id)) { + throw new 
DdlException("unable to alter backends due to empty cloud_instance_id"); + } + // Issue rpc to meta to alter node, then fe master would add this node to its frontends + Cloud.ClusterPB clusterPB = Cloud.ClusterPB.newBuilder() + .setClusterId(clusterId) + .setType(Cloud.ClusterPB.Type.COMPUTE) + .build(); + + for (HostInfo hostInfo : hostInfos) { + Cloud.NodeInfoPB nodeInfoPB = Cloud.NodeInfoPB.newBuilder() + .setIp(hostInfo.getHost()) + .setHost(hostInfo.getHost()) + .setHeartbeatPort(hostInfo.getPort()) + .setCtime(System.currentTimeMillis() / 1000) + .build(); + clusterPB = clusterPB.toBuilder().addNodes(nodeInfoPB).build(); + } + + Cloud.AlterClusterRequest request = Cloud.AlterClusterRequest.newBuilder() + .setInstanceId(Config.cloud_instance_id) + .setOp(operation) + .setCluster(clusterPB) + .build(); + + Cloud.AlterClusterResponse response; + try { + response = MetaServiceProxy.getInstance().alterCluster(request); + LOG.info("alter cluster, request: {}, response: {}", request, response); + if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) { + LOG.warn("alter backends not ok, response: {}", response); + throw new DdlException("failed to alter backends errorCode: " + response.getStatus().getCode() + + " msg: " + response.getStatus().getMsg()); + } + } catch (RpcException e) { + throw new DdlException("failed to alter backends", e); + } + } + + /** + * @param hostInfos : backend's ip, hostName and port + * @throws DdlException + */ + @Override + public void addBackends(List hostInfos, Map tagMap) throws UserException { + // issue rpc to meta to add this node, then fe master would add this node to its backends + + String clusterName = tagMap.getOrDefault(Tag.CLOUD_CLUSTER_NAME, Tag.VALUE_DEFAULT_CLOUD_CLUSTER_NAME); + if (clusterName.isEmpty()) { + throw new UserException("clusterName empty"); + } + + String clusterId = tryCreateCluster(clusterName, RandomIdentifierGenerator.generateRandomIdentifier(8)); + alterBackendCluster(hostInfos, clusterId, Cloud.AlterClusterRequest.Operation.ADD_NODE); + } + + // final entry of dropping backend + // We can not do it in batch, because be may belong to different clusters. + // Maybe we can opt it in the future. 
+ @Override + public void dropBackend(String host, int heartbeatPort) throws DdlException { + Backend droppedBackend = getBackendWithHeartbeatPort(host, heartbeatPort); + if (droppedBackend == null) { + throw new DdlException("backend does not exists[" + NetUtils + .getHostPortInAccessibleFormat(host, heartbeatPort) + "]"); + } + String clusterId = droppedBackend.getTagMap().get(Tag.CLOUD_CLUSTER_ID); + if (clusterId == null || clusterId.isEmpty()) { + throw new DdlException("Failed to get cluster ID for backend: " + droppedBackend.getId()); + } + + List hostInfos = new ArrayList<>(); + hostInfos.add(new HostInfo(host, heartbeatPort)); + + alterBackendCluster(hostInfos, clusterId, Cloud.AlterClusterRequest.Operation.DROP_NODE); + } + + @Override + public void decommissionBackend(Backend backend) throws UserException { + String clusterId = backend.getTagMap().get(Tag.CLOUD_CLUSTER_ID); + if (clusterId == null || clusterId.isEmpty()) { + throw new UserException("Failed to get cluster ID for backend: " + backend.getId()); + } + + List hostInfos = new ArrayList<>(); + hostInfos.add(new HostInfo(backend.getHost(), backend.getHeartbeatPort())); + try { + alterBackendCluster(hostInfos, clusterId, Cloud.AlterClusterRequest.Operation.DECOMMISSION_NODE); + } catch (DdlException e) { + String errorMessage = e.getMessage(); + LOG.warn("Failed to decommission backend: {}", errorMessage); + throw new UserException(errorMessage); + } + } + @Override public void replayAddBackend(Backend newBackend) { super.replayAddBackend(newBackend); @@ -663,6 +762,115 @@ public Map getCloudClusterNameToId() { } } + // FrontendCluster = SqlServerCluster + private void alterFrontendCluster(FrontendNodeType role, String host, int editLogPort, + Cloud.AlterClusterRequest.Operation op) throws DdlException { + if (Strings.isNullOrEmpty(Config.cloud_instance_id)) { + throw new DdlException("unable to alter frontend due to empty cloud_instance_id"); + } + + // Issue rpc to meta to add this node, then fe master would add this node to its frontends + Cloud.NodeInfoPB nodeInfoPB = Cloud.NodeInfoPB.newBuilder() + .setIp(host) + .setHost(host) + .setEditLogPort(editLogPort) + .setNodeType(role == FrontendNodeType.MASTER ? Cloud.NodeInfoPB.NodeType.FE_MASTER + : Cloud.NodeInfoPB.NodeType.FE_OBSERVER) + .setCtime(System.currentTimeMillis() / 1000) + .build(); + + Cloud.ClusterPB clusterPB = Cloud.ClusterPB.newBuilder() + .setClusterId(Config.cloud_sql_server_cluster_id) + .setClusterName(Config.cloud_sql_server_cluster_name) + .setType(Cloud.ClusterPB.Type.SQL) + .addNodes(nodeInfoPB) + .build(); + + Cloud.AlterClusterRequest request = Cloud.AlterClusterRequest.newBuilder() + .setInstanceId(Config.cloud_instance_id) + .setOp(op) + .setCluster(clusterPB) + .build(); + + Cloud.AlterClusterResponse response; + try { + response = MetaServiceProxy.getInstance().alterCluster(request); + LOG.info("alter cluster, request: {}, response: {}", request, response); + if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) { + LOG.warn("alter frontend not ok, response: {}", response); + throw new DdlException("failed to alter frontend errorCode: " + response.getStatus().getCode() + + " msg: " + response.getStatus().getMsg()); + } + } catch (RpcException e) { + throw new DdlException("failed to alter frontend", e); + } + } + + public void addFrontend(FrontendNodeType role, String host, int editLogPort) throws DdlException { + Cloud.AlterClusterRequest.Operation op; + op = role == FrontendNodeType.MASTER ? 
Cloud.AlterClusterRequest.Operation.ADD_CLUSTER + : Cloud.AlterClusterRequest.Operation.ADD_NODE; + alterFrontendCluster(role, host, editLogPort, op); + } + + public void dropFrontend(FrontendNodeType role, String host, int editLogPort) throws DdlException { + alterFrontendCluster(role, host, editLogPort, Cloud.AlterClusterRequest.Operation.DROP_NODE); + } + + private String tryCreateCluster(String clusterName, String clusterId) throws UserException { + if (Strings.isNullOrEmpty(Config.cloud_instance_id)) { + throw new DdlException("unable to create cluster due to empty cloud_instance_id"); + } + + Cloud.ClusterPB clusterPB = Cloud.ClusterPB.newBuilder() + .setClusterId(clusterId) + .setClusterName(clusterName) + .setType(Cloud.ClusterPB.Type.COMPUTE) + .build(); + + Cloud.AlterClusterRequest request = Cloud.AlterClusterRequest.newBuilder() + .setCloudUniqueId(Config.cloud_unique_id) + .setOp(Cloud.AlterClusterRequest.Operation.ADD_CLUSTER) + .setCluster(clusterPB) + .build(); + + Cloud.AlterClusterResponse response; + try { + response = MetaServiceProxy.getInstance().alterCluster(request); + LOG.info("alter cluster, request: {}, response: {}", request, response); + if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK + && response.getStatus().getCode() != Cloud.MetaServiceCode.ALREADY_EXISTED) { + LOG.warn("create cluster not ok, response: {}", response); + throw new UserException("failed to create cluster errorCode: " + response.getStatus().getCode() + + " msg: " + response.getStatus().getMsg()); + } + + if (response.getStatus().getCode() == Cloud.MetaServiceCode.OK) { + return clusterId; + } else if (response.getStatus().getCode() == Cloud.MetaServiceCode.ALREADY_EXISTED) { + Cloud.GetClusterResponse clusterResponse = getCloudCluster(clusterName, "", ""); + if (clusterResponse.getStatus().getCode() == Cloud.MetaServiceCode.OK) { + if (clusterResponse.getClusterCount() > 0) { + Cloud.ClusterPB cluster = clusterResponse.getCluster(0); + return cluster.getClusterId(); + } else { + throw new UserException("Cluster information not found in the response"); + } + } else { + throw new UserException("Failed to get cluster. Error code: " + + clusterResponse.getStatus().getCode() + ", message: " + + clusterResponse.getStatus().getMsg()); + } + } else { + throw new UserException("Failed to create or get cluster. 
Error code: " + + response.getStatus().getCode() + ", message: " + response.getStatus().getMsg()); + } + } catch (RpcException e) { + throw new UserException("failed to create cluster", e); + } + + } + public List> getCurrentObFrontends() { List frontends = Env.getCurrentEnv().getFrontends(FrontendNodeType.OBSERVER); List> frontendsPair = new ArrayList<>(); @@ -778,7 +986,9 @@ public String waitForAutoStart(String clusterName) throws DdlException { Cloud.AlterClusterResponse response; try { - response = MetaServiceProxy.getInstance().alterCluster(builder.build()); + Cloud.AlterClusterRequest request = builder.build(); + response = MetaServiceProxy.getInstance().alterCluster(request); + LOG.info("alter cluster, request: {}, response: {}", request, response); if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) { LOG.warn("notify to resume cluster not ok, cluster {}, response: {}", clusterName, response); } @@ -829,4 +1039,28 @@ public String waitForAutoStart(String clusterName) throws DdlException { } return clusterName; } + + public void tryCreateInstance(String instanceId, String name, boolean sseEnabled) throws DdlException { + Cloud.CreateInstanceRequest.Builder builder = Cloud.CreateInstanceRequest.newBuilder(); + builder.setInstanceId(instanceId); + builder.setName(name); + builder.setSseEnabled(sseEnabled); + + Cloud.CreateInstanceResponse response; + try { + + Cloud.CreateInstanceRequest request = builder.build(); + response = MetaServiceProxy.getInstance().createInstance(request); + LOG.info("create instance, request: {}, response: {}", request, response); + if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK + && response.getStatus().getCode() != Cloud.MetaServiceCode.ALREADY_EXISTED) { + LOG.warn("Failed to create instance {}, response: {}", instanceId, response); + throw new DdlException("Failed to create instance"); + } + LOG.info("Successfully created instance {}, response: {}", instanceId, response); + } catch (RpcException e) { + LOG.warn("Failed to create instance {}", instanceId, e); + throw new DdlException("Failed to create instance"); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/RandomIdentifierGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/common/RandomIdentifierGenerator.java new file mode 100644 index 00000000000000..38fee894eb3d99 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/RandomIdentifierGenerator.java @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.common; + +import java.security.SecureRandom; + +public class RandomIdentifierGenerator { + + private static final String ALPHABET = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"; + private static final String ALPHANUMERIC = ALPHABET + "0123456789_"; + private static final SecureRandom RANDOM = new SecureRandom(); + + /** + * Generates a random identifier matching the pattern "^[a-zA-Z][a-zA-Z0-9_]*$". + * + * @param length The desired length of the identifier (must be at least 1) + * @return A randomly generated identifier + */ + public static String generateRandomIdentifier(int length) { + if (length < 1) { + throw new IllegalArgumentException("Length must be at least 1"); + } + + StringBuilder sb = new StringBuilder(length); + + // First character must be a letter + sb.append(ALPHABET.charAt(RANDOM.nextInt(ALPHABET.length()))); + + // Remaining characters can be alphanumeric or underscore + for (int i = 1; i < length; i++) { + sb.append(ALPHANUMERIC.charAt(RANDOM.nextInt(ALPHANUMERIC.length()))); + } + + return sb.toString(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java index 760ee10542a646..27fd06cd6cfd2c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java @@ -1235,7 +1235,7 @@ public static Map analyzeBackendTagsProperties(Map RESERVED_TAG_TYPE = ImmutableSet.of( diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java index 0f5c81b1cf0692..f01c1a13d1bfbf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java @@ -480,10 +480,13 @@ public static boolean statsTblAvailable() { // dbName, // StatisticConstants.HISTOGRAM_TBL_NAME)); } catch (Throwable t) { + LOG.info("stat table does not exist, db_name: {}, table_name: {}", dbName, + StatisticConstants.TABLE_STATISTIC_TBL_NAME); return false; } if (Config.isCloudMode()) { if (!((CloudSystemInfoService) Env.getCurrentSystemInfo()).availableBackendsExists()) { + LOG.info("there are no available backends"); return false; } try (AutoCloseConnectContext r = buildConnectContext()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java index 515db76b096d08..5dd8dd9fca1ca0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -95,6 +95,14 @@ public void setMaster(int clusterId, String token, long epoch) { tMasterInfo.setHttpPort(Config.http_port); long flags = heartbeatFlags.getHeartbeatFlags(); tMasterInfo.setHeartbeatFlags(flags); + if (Config.isCloudMode()) { + // Set cloud_instance_id and meta_service_endpoint even if they are empty, + // so that the BE knows the FE is working in cloud mode.
+ // Set the cloud instance ID for cloud deployment identification + tMasterInfo.setCloudInstanceId(Config.cloud_instance_id); + // Set the endpoint for the metadata service in cloud mode + tMasterInfo.setMetaServiceEndpoint(Config.meta_service_endpoint); + } masterInfo.set(tMasterInfo); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java index f81d8b4d7b02b6..6126cc7a7021b7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java @@ -290,6 +290,15 @@ public void dropBackend(String host, int heartbeatPort) throws DdlException { MetricRepo.generateBackendsTabletMetrics(); } + public void decommissionBackend(Backend backend) throws UserException { + // set backend's state as 'decommissioned' + // for the decommission operation, there is no decommission job; the system handler will check + // all backends in the decommission state + backend.setDecommissioned(true); + Env.getCurrentEnv().getEditLog().logBackendStateChange(backend); + LOG.info("set backend {} to decommission", backend.getId()); + } + // only for test public void dropAllBackend() { // update idToBackend diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java index 81e29d3abf385d..6901d3b604c9b5 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java @@ -144,7 +144,7 @@ public void testDecommissionBackend() throws Exception { + ":" + backend.getHeartbeatPort() + "\""; AlterSystemStmt decommissionStmt = (AlterSystemStmt) UtFrameUtils.parseAndAnalyzeStmt(decommissionStmtStr, connectContext); - Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt); + Env.getCurrentEnv().getAlterInstance().processAlterSystem(decommissionStmt); Assert.assertEquals(true, backend.isDecommissioned()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java index fc80ecbd97dd9d..53355f6faa4929 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java @@ -100,7 +100,7 @@ public void testDecommissionBackend() throws Exception { Assertions.assertNotNull(srcBackend); String decommissionStmtStr = "alter system decommission backend \"" + srcBackend.getAddress() + "\""; AlterSystemStmt decommissionStmt = (AlterSystemStmt) parseAndAnalyzeStmt(decommissionStmtStr); - Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt); + Env.getCurrentEnv().getAlterInstance().processAlterSystem(decommissionStmt); Assertions.assertTrue(srcBackend.isDecommissioned()); long startTimestamp = System.currentTimeMillis(); @@ -153,7 +153,7 @@ public void testDecommissionBackendById() throws Exception { // decommission backend by id String decommissionByIdStmtStr = "alter system decommission backend \"" + srcBackend.getId() + "\""; AlterSystemStmt decommissionByIdStmt = (AlterSystemStmt) parseAndAnalyzeStmt(decommissionByIdStmtStr); - Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionByIdStmt); + Env.getCurrentEnv().getAlterInstance().processAlterSystem(decommissionByIdStmt);
Assertions.assertTrue(srcBackend.isDecommissioned()); long startTimestamp = System.currentTimeMillis(); @@ -215,7 +215,7 @@ public void testDecommissionBackendWithDropTable() throws Exception { // 6. execute decommission String decommissionStmtStr = "alter system decommission backend \"" + srcBackend.getAddress() + "\""; AlterSystemStmt decommissionStmt = (AlterSystemStmt) parseAndAnalyzeStmt(decommissionStmtStr); - Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt); + Env.getCurrentEnv().getAlterInstance().processAlterSystem(decommissionStmt); Assertions.assertTrue(srcBackend.isDecommissioned()); long startTimestamp = System.currentTimeMillis(); @@ -315,7 +315,7 @@ public void testDecommissionBackendWithMTMV() throws Exception { String decommissionStmtStr = "alter system decommission backend \"" + srcBackend.getAddress() + "\""; AlterSystemStmt decommissionStmt = (AlterSystemStmt) parseAndAnalyzeStmt(decommissionStmtStr); - Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt); + Env.getCurrentEnv().getAlterInstance().processAlterSystem(decommissionStmt); Assertions.assertTrue(srcBackend.isDecommissioned()); long startTimestamp = System.currentTimeMillis(); diff --git a/gensrc/thrift/HeartbeatService.thrift b/gensrc/thrift/HeartbeatService.thrift index 4daea779735f58..7c94bc9ce0c871 100644 --- a/gensrc/thrift/HeartbeatService.thrift +++ b/gensrc/thrift/HeartbeatService.thrift @@ -39,6 +39,8 @@ struct TMasterInfo { 7: optional i64 heartbeat_flags 8: optional i64 backend_id 9: optional list frontend_infos + 10: optional string meta_service_endpoint; + 11: optional string cloud_instance_id; } struct TBackendInfo { diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy index 9272c818199021..96020f1a35bb09 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy @@ -303,6 +303,7 @@ class Suite implements GroovyInterceptable { Thread.sleep(1000) } + logger.info("get fe {}", fe) assertNotNull(fe) if (!dockerIsCloud) { for (def be : cluster.getAllBackends()) { @@ -319,8 +320,8 @@ class Suite implements GroovyInterceptable { def sql = "CREATE DATABASE IF NOT EXISTS " + context.dbName logger.info("try create database if not exists {}", context.dbName) JdbcUtils.executeToList(conn, sql) - url = Config.buildUrlWithDb(url, context.dbName) + url = Config.buildUrlWithDb(url, context.dbName) logger.info("connect to docker cluster: suite={}, url={}", name, url) connect(user, password, url, actionSupplier) } finally { diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy index 44220500d1b5bd..fad99702068a12 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy @@ -17,8 +17,9 @@ package org.apache.doris.regression.suite import org.apache.doris.regression.Config -import org.apache.doris.regression.util.Http import org.apache.doris.regression.util.DebugPoint +import org.apache.doris.regression.util.Http +import org.apache.doris.regression.util.JdbcUtils import 
org.apache.doris.regression.util.NodeType import com.google.common.collect.Maps @@ -29,12 +30,19 @@ import groovy.json.JsonSlurper import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import java.util.stream.Collectors +import java.sql.Connection class ClusterOptions { int feNum = 1 int beNum = 3 + Boolean sqlModeNodeMgr = false + Boolean beMetaServiceEndpoint = true + Boolean beCloudInstanceId = false + + int waitTimeout = 180 + List feConfigs = [ 'heartbeat_interval_second=5', ] @@ -150,6 +158,7 @@ class ServerNode { class Frontend extends ServerNode { + int editLogPort int queryPort boolean isMaster @@ -157,6 +166,7 @@ class Frontend extends ServerNode { Frontend fe = new Frontend() ServerNode.fromCompose(fe, header, index, fields) fe.queryPort = (Integer) fields.get(header.indexOf('query_port')) + fe.editLogPort = (Integer) fields.get(header.indexOf('edit_log_port')) fe.isMaster = fields.get(header.indexOf('is_master')) == 'true' return fe } @@ -177,14 +187,17 @@ class Frontend extends ServerNode { class Backend extends ServerNode { + int heartbeatPort long backendId int tabletNum static Backend fromCompose(ListHeader header, int index, List fields) { Backend be = new Backend() ServerNode.fromCompose(be, header, index, fields) + be.heartbeatPort = (Integer) fields.get(header.indexOf('heartbeat_port')) be.backendId = toLongOrDefault(fields.get(header.indexOf('backend_id')), -1L) be.tabletNum = (int) toLongOrDefault(fields.get(header.indexOf('tablet_num')), 0L) + return be } @@ -255,6 +268,7 @@ class SuiteCluster { final String name final Config config private boolean running + private boolean sqlModeNodeMgr = false; SuiteCluster(String name, Config config) { this.name = name @@ -301,7 +315,19 @@ class SuiteCluster { cmd += ['--fe-follower'] } - cmd += ['--wait-timeout', String.valueOf(180)] + if (options.sqlModeNodeMgr) { + cmd += ['--sql-mode-node-mgr'] + } + if (!options.beMetaServiceEndpoint) { + cmd += ['--no-be-metaservice-endpoint'] + } + if (!options.beCloudInstanceId) { + cmd += ['--no-be-cloud-instanceid'] + } + + cmd += ['--wait-timeout', String.valueOf(options.waitTimeout)] + + sqlModeNodeMgr = options.sqlModeNodeMgr; runCmd(cmd.join(' '), -1) @@ -405,6 +431,7 @@ class SuiteCluster { def data = runCmd(cmd) assert data instanceof List def rows = (List>) data + logger.info("get all nodes {}", rows); def header = new ListHeader(rows.get(0)) for (int i = 1; i < rows.size(); i++) { def row = (List) rows.get(i) diff --git a/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy b/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy new file mode 100644 index 00000000000000..c79219aeac2bb7 --- /dev/null +++ b/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy @@ -0,0 +1,388 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions +import groovy.json.JsonSlurper + +suite('test_sql_mode_node_mgr', 'docker,p1') { + if (!isCloudMode()) { + return; + } + + def clusterOptions = [ + new ClusterOptions(), + new ClusterOptions(), + ] + + for (options in clusterOptions) { + options.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + ] + options.cloudMode = true + options.sqlModeNodeMgr = true + options.waitTimeout = 0 + options.feNum = 3 + options.feConfigs += ["resource_not_ready_sleep_seconds=1", + "heartbeat_interval_second=1",] + } + + clusterOptions[0].beCloudInstanceId = true; + clusterOptions[0].beMetaServiceEndpoint = true; + + clusterOptions[1].beCloudInstanceId = false; + clusterOptions[1].beMetaServiceEndpoint = false; + + for (options in clusterOptions) { + docker(options) { + logger.info("docker started"); + + def checkFrontendsAndBackends = { + // Check frontends + def frontendResult = sql_return_maparray """show frontends;""" + logger.info("show frontends result {}", frontendResult) + // Check that we have the expected number of frontends + assert frontendResult.size() == 3, "Expected 3 frontends, but got ${frontendResult.size()}" + + // Check that all required columns are present + def requiredColumns = ['Name', 'Host', 'EditLogPort', 'HttpPort', 'QueryPort', 'RpcPort', 'Role', 'IsMaster', 'ClusterId', 'Join', 'Alive', 'ReplayedJournalId', 'LastHeartbeat', 'IsHelper', 'ErrMsg'] + def actualColumns = frontendResult[0].keySet() + assert actualColumns.containsAll(requiredColumns), "Missing required columns. Expected: ${requiredColumns}, Actual: ${actualColumns}" + + // Check that we have one master and two followers + def masterCount = frontendResult.count { it['IsMaster'] == 'true' } + assert masterCount == 1, "Expected 1 master, but got ${masterCount}" + + def followerCount = frontendResult.count { it['IsMaster'] == 'false' } + assert followerCount == 2, "Expected 2 followers, but got ${followerCount}" + + // Check that all frontends are alive + def aliveCount = frontendResult.count { it['Alive'] == 'true' } + assert aliveCount == 3, "Expected all 3 frontends to be alive, but only ${aliveCount} are alive" + + // Check backends + def backendResult = sql_return_maparray """show backends;""" + logger.info("show backends result {}", backendResult) + // Check that we have the expected number of backends + assert backendResult.size() == 3, "Expected 3 backends, but got ${backendResult.size()}" + + // Check that all required columns are present + def requiredBackendColumns = ['Host', 'HeartbeatPort', 'BePort', 'HttpPort', 'BrpcPort', 'LastStartTime', 'LastHeartbeat', 'Alive', 'SystemDecommissioned', 'TabletNum', 'DataUsedCapacity', 'AvailCapacity', 'TotalCapacity', 'UsedPct', 'MaxDiskUsedPct', 'Tag', 'ErrMsg', 'Version', 'Status'] + def actualBackendColumns = backendResult[0].keySet() + assert actualBackendColumns.containsAll(requiredBackendColumns), "Missing required backend columns. 
Expected: ${requiredBackendColumns}, Actual: ${actualBackendColumns}" + + // Check that all backends are alive + def aliveBackendCount = backendResult.count { it['Alive'] == 'true' } + assert aliveBackendCount == 3, "Expected all 3 backends to be alive, but only ${aliveBackendCount} are alive" + + // Check that no backends are decommissioned + def decommissionedCount = backendResult.count { it['SystemDecommissioned'] == 'true' || it['ClusterDecommissioned'] == 'true' } + assert decommissionedCount == 0, "Expected no decommissioned backends, but found ${decommissionedCount}" + + // Check that all backends have valid capacities + backendResult.each { backend -> + assert backend['DataUsedCapacity'] != null && backend['AvailCapacity'] != null && backend['TotalCapacity'] != null, "Backend ${backend['BackendId']} has invalid capacity values" + assert backend['UsedPct'] != null && backend['MaxDiskUsedPct'] != null, "Backend ${backend['BackendId']} has invalid disk usage percentages" + } + + logger.info("All backend checks passed successfully") + } + + // Call the function to check frontends and backends + checkFrontendsAndBackends() + + def checkClusterStatus = { int expectedFeNum, int expectedBeNum, int existsRows -> + logger.info("Checking cluster status...") + + // Check FE number + def frontendResult = sql_return_maparray """SHOW FRONTENDS;""" + assert frontendResult.size() == expectedFeNum, "Expected ${expectedFeNum} frontends, but got ${frontendResult.size()}" + logger.info("FE number check passed: ${frontendResult.size()} FEs found") + + // Check BE number + def backendResult = sql_return_maparray """SHOW BACKENDS;""" + assert backendResult.size() == expectedBeNum, "Expected ${expectedBeNum} backends, but got ${backendResult.size()}" + logger.info("BE number check passed: ${backendResult.size()} BEs found") + + // Create table if not exists + sql """ + CREATE TABLE IF NOT EXISTS test_table ( + id INT, + name VARCHAR(50) + ) + DISTRIBUTED BY HASH(id) BUCKETS 3 + PROPERTIES("replication_num" = "3"); + """ + + logger.info("Table 'test_table' created or already exists") + sql """ INSERT INTO test_table VALUES (1, 'test1') """ + + def result = sql """ SELECT * FROM test_table ORDER BY id """ + assert result.size() == existsRows + 1, "Expected ${existsRows + 1} rows, but got ${result.size()}" + logger.info("Read/write check passed: ${result.size()} rows found") + + logger.info("All cluster status checks passed successfully") + } + + // Call the function to check cluster status + checkClusterStatus(3, 3, 0) + + // CASE 1 . check restarting fe and be work. + logger.info("Restarting frontends and backends...") + cluster.restartFrontends(); + cluster.restartBackends(); + + sleep(30000) + context.reconnectFe() + + checkClusterStatus(3, 3, 1) + + // CASE 2. If a be is dropped, query and writing also work. 
+ // Get the list of backends + def backends = sql_return_maparray("SHOW BACKENDS") + logger.info("Current backends: {}", backends) + + // Find a backend to drop + def backendToDrop = backends[0] + def backendHost = backendToDrop['Host'] + def backendHeartbeatPort = backendToDrop['HeartbeatPort'] + + logger.info("Dropping backend: {}:{}", backendHost, backendHeartbeatPort) + + // Drop the selected backend + sql """ ALTER SYSTEM DROPP BACKEND "${backendHost}:${backendHeartbeatPort}"; """ + + // Wait for the backend to be fully dropped + def maxWaitSeconds = 300 + def waited = 0 + while (waited < maxWaitSeconds) { + def currentBackends = sql_return_maparray("SHOW BACKENDS") + if (currentBackends.size() == 2) { + logger.info("Backend successfully dropped") + break + } + sleep(10000) + waited += 10 + } + + if (waited >= maxWaitSeconds) { + throw new Exception("Timeout waiting for backend to be dropped") + } + + checkClusterStatus(3, 2, 2) + + // CASE 3. Add the dropped backend back + logger.info("Adding back the dropped backend: {}:{}", backendHost, backendHeartbeatPort) + sql """ ALTER SYSTEM ADD BACKEND "${backendHost}:${backendHeartbeatPort}"; """ + + // Wait for the backend to be fully added back + maxWaitSeconds = 300 + waited = 0 + while (waited < maxWaitSeconds) { + def currentBackends = sql_return_maparray("SHOW BACKENDS") + if (currentBackends.size() == 3) { + logger.info("Backend successfully added back") + break + } + sleep(10000) + waited += 10 + } + + if (waited >= maxWaitSeconds) { + throw new Exception("Timeout waiting for backend to be added back") + } + + checkClusterStatus(3, 3, 3) + + // CASE 4. If a fe is dropped, query and writing also work. + // Get the list of frontends + def frontends = sql_return_maparray("SHOW FRONTENDS") + logger.info("Current frontends: {}", frontends) + + // Find a non-master frontend to drop + def feToDropMap = frontends.find { it['IsMaster'] == "false" } + assert feToDropMap != null, "No non-master frontend found to drop" + + def feHost = feToDropMap['Host'] + def feEditLogPort = feToDropMap['EditLogPort'] + + logger.info("Dropping non-master frontend: {}:{}", feHost, feEditLogPort) + + // Drop the selected non-master frontend + sql """ ALTER SYSTEM DROP FOLLOWER "${feHost}:${feEditLogPort}"; """ + + // Wait for the frontend to be fully dropped + maxWaitSeconds = 300 + waited = 0 + while (waited < maxWaitSeconds) { + def currentFrontends = sql_return_maparray("SHOW FRONTENDS") + if (currentFrontends.size() == frontends.size() - 1) { + logger.info("Non-master frontend successfully dropped") + break + } + sleep(10000) + waited += 10 + } + + if (waited >= maxWaitSeconds) { + throw new Exception("Timeout waiting for non-master frontend to be dropped") + } + + checkClusterStatus(2, 3, 4) + + // CASE 5. 
Add dropped frontend back + logger.info("Adding dropped frontend back") + + // Get the current list of frontends + def currentFrontends = sql_return_maparray("SHOW FRONTENDS") + + // Find the dropped frontend by comparing with the original list + def droppedFE = frontends.find { fe -> + !currentFrontends.any { it['Host'] == fe['Host'] && it['EditLogPort'] == fe['EditLogPort'] } + } + + assert droppedFE != null, "Could not find the dropped frontend" + + feHost = droppedFE['Host'] + feEditLogPort = droppedFE['EditLogPort'] + + logger.info("Adding back frontend: {}:{}", feHost, feEditLogPort) + + // Add the frontend back + sql """ ALTER SYSTEM ADD FOLLOWER "${feHost}:${feEditLogPort}"; """ + + // Wait for the frontend to be fully added back + maxWaitSeconds = 300 + waited = 0 + while (waited < maxWaitSeconds) { + def updatedFrontends = sql_return_maparray("SHOW FRONTENDS") + if (updatedFrontends.size() == frontends.size()) { + logger.info("Frontend successfully added back") + break + } + sleep(10000) + waited += 10 + } + + if (waited >= maxWaitSeconds) { + throw new Exception("Timeout waiting for frontend to be added back") + } + + // Verify cluster status after adding the frontend back + checkClusterStatus(3, 3, 5) + + logger.info("Frontend successfully added back and cluster status verified") + + // CASE 6. If fe can not drop itself. + // 6. Attempt to drop the master FE and expect an exception + logger.info("Attempting to drop the master frontend") + + // Get the master frontend information + def masterFE = frontends.find { it['IsMaster'] == "true" } + assert masterFE != null, "No master frontend found" + + def masterHost = masterFE['Host'] + def masterEditLogPort = masterFE['EditLogPort'] + + logger.info("Attempting to drop master frontend: {}:{}", masterHost, masterEditLogPort) + + try { + sql """ ALTER SYSTEM DROP FOLLOWER "${masterHost}:${masterEditLogPort}"; """ + throw new Exception("Expected an exception when trying to drop master frontend, but no exception was thrown") + } catch (Exception e) { + logger.info("Received expected exception when trying to drop master frontend: {}", e.getMessage()) + assert e.getMessage().contains("can not drop current master node."), "Unexpected exception message when trying to drop master frontend" + } + + // Verify that the master frontend is still present + currentFrontends = sql_return_maparray("SHOW FRONTENDS") + assert currentFrontends.find { it['IsMaster'] == "true" && it['Host'] == masterHost && it['EditLogPort'] == masterEditLogPort } != null, "Master frontend should still be present" + logger.info("Successfully verified that the master frontend cannot be dropped") + + + // CASE 7. 
Attempt to drop a non-existent backend + logger.info("Attempting to drop a non-existent backend") + + // Generate a non-existent host and port + def nonExistentHost = "non.existent.host" + def nonExistentPort = 12345 + + try { + sql """ ALTER SYSTEM DROPP BACKEND "${nonExistentHost}:${nonExistentPort}"; """ + throw new Exception("Expected an exception when trying to drop non-existent backend, but no exception was thrown") + } catch (Exception e) { + logger.info("Received expected exception when trying to drop non-existent backend: {}", e.getMessage()) + assert e.getMessage().contains("backend does not exists"), "Unexpected exception message when trying to drop non-existent backend" + } + + // Verify that the number of backends remains unchanged + def currentBackends = sql_return_maparray("SHOW BACKENDS") + def originalBackendCount = 3 // As per the initial setup in this test + assert currentBackends.size() == originalBackendCount, "Number of backends should remain unchanged after attempting to drop a non-existent backend" + + checkClusterStatus(3, 3, 6) + + // CASE 8. Decommission a backend and verify the process + logger.info("Attempting to decommission a backend") + + // Get the list of current backends + backends = sql_return_maparray("SHOW BACKENDS") + assert backends.size() >= 2, "Not enough backends to perform decommission test" + + // Select a backend to decommission (skip the first one, which earlier cases already exercised) + def backendToDecommission = backends[1] + def decommissionHost = backendToDecommission['Host'] + def decommissionPort = backendToDecommission['HeartbeatPort'] + + logger.info("Decommissioning backend: {}:{}", decommissionHost, decommissionPort) + + // Decommission the selected backend + sql """ ALTER SYSTEM DECOMMISSION BACKEND "${decommissionHost}:${decommissionPort}"; """ + + // Wait for the decommission process to complete (this may take some time in a real environment) + int maxAttempts = 30 + int attempts = 0 + boolean decommissionComplete = false + + while (attempts < maxAttempts && !decommissionComplete) { + currentBackends = sql_return_maparray("SHOW BACKENDS") + def decommissionedBackend = currentBackends.find { it['Host'] == decommissionHost && it['HeartbeatPort'] == decommissionPort } + + logger.info("decommissionedBackend {}", decommissionedBackend) + // In this cloud setup the decommissioned BE is expected to stay alive and only be flagged as SystemDecommissioned + if (decommissionedBackend && decommissionedBackend['Alive'] == "true" && decommissionedBackend['SystemDecommissioned'] == "true") { + decommissionComplete = true + } else { + attempts++ + sleep(1000) // Wait for 1 second before checking again + } + } + + assert decommissionComplete, "Backend decommission did not complete within the expected time" + + // Verify that the decommissioned backend is still alive but flagged as decommissioned + def finalBackends = sql_return_maparray("SHOW BACKENDS") + def decommissionedBackend = finalBackends.find { it['Host'] == decommissionHost && it['HeartbeatPort'] == decommissionPort } + assert decommissionedBackend['Alive'] == "true", "Decommissioned backend should still be alive" + assert decommissionedBackend['SystemDecommissioned'] == "true", "Decommissioned backend should have SystemDecommissioned set to true" + + logger.info("Successfully decommissioned backend and verified its status") + + checkClusterStatus(3, 3, 7) + } + } + +} \ No newline at end of file
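Editor's note: SystemInfoService.decommissionBackend and the last test case together show that in this mode a decommissioned backend keeps heartbeating (Alive stays true) and is only marked SystemDecommissioned, so code that picks backends should filter on the decommission flag rather than on liveness. Below is a minimal, hypothetical Java sketch of that idea, not part of the patch; `isDecommissioned()` appears in the tests above, while `isAlive()` is assumed here for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch: choose only backends that are alive and not being drained.
// A cloud-decommissioned BE can still report alive, so the flag must be checked explicitly.
class UsableBackendFilter {
    static List<Backend> usableBackends(List<Backend> allBackends) {
        List<Backend> usable = new ArrayList<>();
        for (Backend be : allBackends) {
            if (be.isAlive() && !be.isDecommissioned()) {
                usable.add(be);
            }
        }
        return usable;
    }
}
```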