Skip to content

Commit

Permalink
[improve](cloud) use compute group instead of cluster and remove clou…
Browse files Browse the repository at this point in the history
…d_instance_id (#40767)

1. use compute group instead of cloud cluster.
2. use cluster instead of cloud_instance.
3. simplify BE config by passing cloud_unique_id from FE to BE; we add a config
item (enable_use_cloud_unique_id_from_fe) in BE in order to handle the case where
cloud_unique_id is different in MS, which should not normally happen.
5. optimize sync_vault_info by syncing from meta when the vault id is empty.
6. use show storage vaults instead of show storage vault.
7. only the first FE can be started from empty; otherwise multiple masters may
start.
8. cancel decommission is not supported in cloud mode.
  • Loading branch information
dataroaring authored Sep 19, 2024
1 parent 9986fa7 commit 9293045
Show file tree
Hide file tree
Showing 55 changed files with 1,314 additions and 217 deletions.
47 changes: 27 additions & 20 deletions be/src/agent/heartbeat_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,36 +246,43 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
}

if (master_info.__isset.meta_service_endpoint != config::is_cloud_mode()) {
return Status::InvalidArgument(
LOG(WARNING) << "Detected mismatch in cloud mode configuration between FE and BE. "
<< "FE cloud mode: "
<< (master_info.__isset.meta_service_endpoint ? "true" : "false")
<< ", BE cloud mode: " << (config::is_cloud_mode() ? "true" : "false");
return Status::InvalidArgument<false>(
"fe and be do not work in same mode, fe cloud mode: {},"
" be cloud mode: {}",
master_info.__isset.meta_service_endpoint, config::is_cloud_mode());
}

if (master_info.__isset.meta_service_endpoint && config::meta_service_endpoint.empty() &&
!master_info.meta_service_endpoint.empty()) {
auto st = config::set_config("meta_service_endpoint", master_info.meta_service_endpoint,
true);
LOG(INFO) << "set config meta_service_endpoing " << master_info.meta_service_endpoint << " "
<< st;
}

if (master_info.__isset.cloud_instance_id) {
if (!config::cloud_instance_id.empty() &&
config::cloud_instance_id != master_info.cloud_instance_id) {
return Status::InvalidArgument(
"cloud_instance_id in fe.conf and be.conf are not same, fe: {}, be: {}",
master_info.cloud_instance_id, config::cloud_instance_id);
if (master_info.__isset.meta_service_endpoint) {
if (config::meta_service_endpoint.empty() && !master_info.meta_service_endpoint.empty()) {
auto st = config::set_config("meta_service_endpoint", master_info.meta_service_endpoint,
true);
LOG(INFO) << "set config meta_service_endpoing " << master_info.meta_service_endpoint
<< " " << st;
}

if (config::cloud_instance_id.empty() && !master_info.cloud_instance_id.empty()) {
auto st = config::set_config("cloud_instance_id", master_info.cloud_instance_id, true);
config::set_cloud_unique_id(master_info.cloud_instance_id);
LOG(INFO) << "set config cloud_instance_id " << master_info.cloud_instance_id << " "
<< st;
if (master_info.meta_service_endpoint != config::meta_service_endpoint) {
LOG(WARNING) << "Detected mismatch in meta_service_endpoint configuration between FE "
"and BE. "
<< "FE meta_service_endpoint: " << master_info.meta_service_endpoint
<< ", BE meta_service_endpoint: " << config::meta_service_endpoint;
return Status::InvalidArgument<false>(
"fe and be do not work in same mode, fe meta_service_endpoint: {},"
" be meta_service_endpoint: {}",
master_info.meta_service_endpoint, config::meta_service_endpoint);
}
}

if (master_info.__isset.cloud_unique_id &&
config::cloud_unique_id != master_info.cloud_unique_id &&
config::enable_use_cloud_unique_id_from_fe) {
auto st = config::set_config("cloud_unique_id", master_info.cloud_unique_id, true);
LOG(INFO) << "set config cloud_unique_id " << master_info.cloud_unique_id << " " << st;
}

return Status::OK();
}

Expand Down
10 changes: 3 additions & 7 deletions be/src/cloud/cloud_storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,12 @@ class CloudStorageEngine final : public BaseStorageEngine {

std::optional<StorageResource> get_storage_resource(const std::string& vault_id) {
LOG(INFO) << "Getting storage resource for vault_id: " << vault_id;
if (vault_id.empty()) {
if (latest_fs() == nullptr) {
LOG(INFO) << "there is not latest fs";
return std::nullopt;
}
return StorageResource {latest_fs()};
}

bool synced = false;
do {
if (vault_id.empty() && latest_fs() != nullptr) {
return StorageResource {latest_fs()};
}
if (auto storage_resource = doris::get_storage_resource(vault_id); storage_resource) {
return storage_resource->first;
}
Expand Down
7 changes: 1 addition & 6 deletions be/src/cloud/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
namespace doris::config {

DEFINE_String(deploy_mode, "");
DEFINE_mString(cloud_instance_id, "");
DEFINE_mString(cloud_unique_id, "");
DEFINE_mString(meta_service_endpoint, "");
DEFINE_Bool(meta_service_use_load_balancer, "false");
Expand Down Expand Up @@ -72,10 +71,6 @@ DEFINE_mInt32(remove_expired_tablet_txn_info_interval_seconds, "300");

DEFINE_mInt32(tablet_txn_info_min_expired_seconds, "120");

void set_cloud_unique_id(std::string instance_id) {
if (cloud_unique_id.empty() && !instance_id.empty()) {
static_cast<void>(set_config("cloud_unique_id", "1:" + instance_id + ":compute", true));
}
}
DEFINE_mBool(enable_use_cloud_unique_id_from_fe, "true");

} // namespace doris::config
5 changes: 2 additions & 3 deletions be/src/cloud/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,12 @@ namespace doris::config {

DECLARE_String(deploy_mode);
// Deprecated: do not configure directly.
DECLARE_mString(cloud_instance_id);
DECLARE_mString(cloud_unique_id);

static inline bool is_cloud_mode() {
return deploy_mode == "cloud" || !cloud_unique_id.empty();
}

void set_cloud_unique_id(std::string instance_id);

// Set the endpoint of meta service.
//
// If meta services are deployed behind a load balancer, set this config to "host:port" of the load balancer.
Expand Down Expand Up @@ -107,4 +104,6 @@ DECLARE_mInt32(remove_expired_tablet_txn_info_interval_seconds);

DECLARE_mInt32(tablet_txn_info_min_expired_seconds);

DECLARE_mBool(enable_use_cloud_unique_id_from_fe);

} // namespace doris::config
2 changes: 0 additions & 2 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1676,8 +1676,6 @@ bool init(const char* conf_file, bool fill_conf_map, bool must_exist, bool set_t
SET_FIELD(it.second, std::vector<std::string>, fill_conf_map, set_to_default);
}

set_cloud_unique_id(cloud_instance_id);

return true;
}

Expand Down
22 changes: 9 additions & 13 deletions docker/runtime/doris-compose/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@

IP_PART4_SIZE = 200

CLUSTER_ID = "12345678"

LOG = utils.get_logger()


Expand Down Expand Up @@ -412,7 +414,7 @@ def get_add_init_config(self):

if self.cluster.sql_mode_node_mgr:
cfg += [
"cloud_instance_id = " + self.cloud_instance_id(),
"cluster_id = " + CLUSTER_ID,
]
else:
cfg += [
Expand All @@ -439,9 +441,6 @@ def docker_env(self):
def cloud_unique_id(self):
return "sql_server_{}".format(self.id)

def cloud_instance_id(self):
return "reg_cloud_instance"

def entrypoint(self):
return ["bash", os.path.join(DOCKER_RESOURCE_PATH, "init_fe.sh")]

Expand Down Expand Up @@ -484,9 +483,9 @@ def get_add_init_config(self):
"meta_service_endpoint = {}".format(
self.cluster.get_meta_server_addr()),
]
if self.cluster.be_cloud_instanceid:
if self.cluster.be_cluster_id:
cfg += [
"cloud_instance_id = " + self.cloud_instance_id(),
"cluster_id = " + CLUSTER_ID,
]
if not self.cluster.sql_mode_node_mgr:
cfg += [
Expand Down Expand Up @@ -553,9 +552,6 @@ def docker_env(self):
def cloud_unique_id(self):
return "compute_node_{}".format(self.id)

def cloud_instance_id(self):
return "reg_cloud_instance"

def docker_home_dir(self):
return os.path.join(DOCKER_DORIS_PATH, "be")

Expand Down Expand Up @@ -666,7 +662,7 @@ class Cluster(object):
def __init__(self, name, subnet, image, is_cloud, fe_config, be_config,
ms_config, recycle_config, fe_follower, be_disks, be_cluster,
reg_be, coverage_dir, cloud_store_config, sql_mode_node_mgr,
be_metaservice_endpoint, be_cloud_instanceid):
be_metaservice_endpoint, be_cluster_id):
self.name = name
self.subnet = subnet
self.image = image
Expand All @@ -687,13 +683,13 @@ def __init__(self, name, subnet, image, is_cloud, fe_config, be_config,
}
self.sql_mode_node_mgr = sql_mode_node_mgr
self.be_metaservice_endpoint = be_metaservice_endpoint
self.be_cloud_instanceid = be_cloud_instanceid
self.be_cluster_id = be_cluster_id

@staticmethod
def new(name, image, is_cloud, fe_config, be_config, ms_config,
recycle_config, fe_follower, be_disks, be_cluster, reg_be,
coverage_dir, cloud_store_config, sql_mode_node_mgr,
be_metaservice_endpoint, be_cloud_instanceid):
be_metaservice_endpoint, be_cluster_id):
if not os.path.exists(LOCAL_DORIS_PATH):
os.makedirs(LOCAL_DORIS_PATH, exist_ok=True)
os.chmod(LOCAL_DORIS_PATH, 0o777)
Expand All @@ -707,7 +703,7 @@ def new(name, image, is_cloud, fe_config, be_config, ms_config,
fe_follower, be_disks, be_cluster, reg_be,
coverage_dir, cloud_store_config,
sql_mode_node_mgr, be_metaservice_endpoint,
be_cloud_instanceid)
be_cluster_id)
os.makedirs(cluster.get_path(), exist_ok=True)
os.makedirs(get_status_path(name), exist_ok=True)
cluster._save_meta()
Expand Down
12 changes: 6 additions & 6 deletions docker/runtime/doris-compose/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,19 +322,19 @@ def add_parser(self, args_parsers):

if self._support_boolean_action():
parser.add_argument(
"--be-cloud-instanceid",
"--be-cluster-id",
default=True,
action=self._get_parser_bool_action(False),
help=
"Do not set BE cloud instance ID in conf. Default is False.")
"Do not set BE cluster ID in conf. Default is False.")
else:
parser.add_argument(
"--no-be-cloud-instanceid",
dest='be_cloud_instanceid',
"--no-be-cluster-id",
dest='be_cluster_id',
default=True,
action=self._get_parser_bool_action(False),
help=
"Do not set BE cloud instance ID in conf. Default is False.")
"Do not set BE cluser ID in conf. Default is False.")

parser.add_argument(
"--fdb-version",
Expand Down Expand Up @@ -434,7 +434,7 @@ def run(self, args):
args.be_config, args.ms_config, args.recycle_config,
args.fe_follower, args.be_disks, args.be_cluster, args.reg_be,
args.coverage_dir, cloud_store_config, args.sql_mode_node_mgr,
args.be_metaservice_endpoint, args.be_cloud_instanceid)
args.be_metaservice_endpoint, args.be_cluster_id)
LOG.info("Create new cluster {} succ, cluster path is {}".format(
args.NAME, cluster.get_path()))

Expand Down
10 changes: 4 additions & 6 deletions fe/fe-common/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -2819,16 +2819,12 @@ public class Config extends ConfigBase {
public static String deploy_mode = "";

// Compatibility with older versions.
// cloud_unique_id is introduced before cloud_instance_id, so it has higher priority.
// cloud_unique_id has higher priority than cluster_id.
@ConfField
public static String cloud_unique_id = "";

// If cloud_unique_id is empty, cloud_instance_id works, otherwise cloud_unique_id works.
@ConfField
public static String cloud_instance_id = "";

public static boolean isCloudMode() {
return deploy_mode.equals("cloud") || !cloud_unique_id.isEmpty() || !cloud_instance_id.isEmpty();
return deploy_mode.equals("cloud") || !cloud_unique_id.isEmpty();
}

public static boolean isNotCloudMode() {
Expand Down Expand Up @@ -2903,6 +2899,8 @@ public static int metaServiceRpcRetryTimes() {
@ConfField
public static boolean enable_cloud_snapshot_version = true;

// Interval in seconds for checking the status of compute groups (cloud clusters).
// Compute groups and cloud clusters refer to the same concept.
@ConfField
public static int cloud_cluster_check_interval_second = 10;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ COMMITTED: 'COMMITTED';
COMPACT: 'COMPACT';
COMPLETE: 'COMPLETE';
COMPRESS_TYPE: 'COMPRESS_TYPE';
COMPUTE: 'COMPUTE';
CONDITIONS: 'CONDITIONS';
CONFIG: 'CONFIG';
CONNECTION: 'CONNECTION';
Expand Down Expand Up @@ -554,6 +555,7 @@ VARIABLE: 'VARIABLE';
VARIABLES: 'VARIABLES';
VARIANT: 'VARIANT';
VAULT: 'VAULT';
VAULTS: 'VAULTS';
VERBOSE: 'VERBOSE';
VERSION: 'VERSION';
VIEW: 'VIEW';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,9 @@ unsupportedOtherStatement
| UNINSTALL PLUGIN name=identifierOrText #uninstallPlugin
| LOCK TABLES (lockTable (COMMA lockTable)*)? #lockTables
| UNLOCK TABLES #unlockTables
| WARM UP CLUSTER destination=identifier WITH
(CLUSTER source=identifier | (warmUpItem (COMMA warmUpItem)*)) FORCE? #warmUpCluster
| WARM UP (CLUSTER | COMPUTE GROUP) destination=identifier WITH
((CLUSTER | COMPUTE GROUP) source=identifier |
(warmUpItem (COMMA warmUpItem)*)) FORCE? #warmUpCluster
| BACKUP SNAPSHOT label=multipartIdentifier TO repo=identifier
((ON | EXCLUDE) LEFT_PAREN baseTableRef (COMMA baseTableRef)* RIGHT_PAREN)?
properties=propertyClause? #backup
Expand All @@ -208,7 +209,7 @@ unsupportedShowStatement
| SHOW ROW POLICY (FOR (userIdentify | (ROLE role=identifier)))? #showRowPolicy
| SHOW STORAGE POLICY (USING (FOR policy=identifierOrText)?)? #showStoragePolicy
| SHOW STAGES #showStages
| SHOW STORAGE VAULT #showStorageVault
| SHOW STORAGE (VAULT | VAULTS) #showStorageVault
| SHOW CREATE REPOSITORY FOR identifier #showCreateRepository
| SHOW WHITELIST #showWhitelist
| SHOW (GLOBAL | SESSION | LOCAL)? VARIABLES wildWhere? #showVariables
Expand Down Expand Up @@ -307,7 +308,7 @@ unsupportedShowStatement
| (FROM tableName=multipartIdentifier (ALL VERBOSE?)?))? #showQueryStats
| SHOW BUILD INDEX ((FROM | IN) database=multipartIdentifier)?
wildWhere? sortClause? limitClause? #showBuildIndex
| SHOW CLUSTERS #showClusters
| SHOW (CLUSTERS | (COMPUTE GROUPS)) #showClusters
| SHOW CONVERT_LSC ((FROM | IN) database=multipartIdentifier)? #showConvertLsc
| SHOW REPLICA STATUS FROM baseTableRef wildWhere? #showReplicaStatus
| SHOW REPLICA DISTRIBUTION FROM baseTableRef #showREplicaDistribution
Expand Down Expand Up @@ -495,13 +496,13 @@ unsupportedGrantRevokeStatement
: GRANT privilegeList ON multipartIdentifierOrAsterisk
TO (userIdentify | ROLE STRING_LITERAL) #grantTablePrivilege
| GRANT privilegeList ON
(RESOURCE | CLUSTER | STAGE | STORAGE VAULT | WORKLOAD GROUP)
(RESOURCE | CLUSTER | COMPUTE GROUP | STAGE | STORAGE VAULT | WORKLOAD GROUP)
identifierOrTextOrAsterisk TO (userIdentify | ROLE STRING_LITERAL) #grantResourcePrivilege
| GRANT roles+=STRING_LITERAL (COMMA roles+=STRING_LITERAL)* TO userIdentify #grantRole
| REVOKE privilegeList ON multipartIdentifierOrAsterisk
FROM (userIdentify | ROLE STRING_LITERAL) #grantTablePrivilege
| REVOKE privilegeList ON
(RESOURCE | CLUSTER | STAGE | STORAGE VAULT | WORKLOAD GROUP)
(RESOURCE | CLUSTER | COMPUTE GROUP | STAGE | STORAGE VAULT | WORKLOAD GROUP)
identifierOrTextOrAsterisk FROM (userIdentify | ROLE STRING_LITERAL) #grantResourcePrivilege
| REVOKE roles+=STRING_LITERAL (COMMA roles+=STRING_LITERAL)* FROM userIdentify #grantRole
;
Expand Down Expand Up @@ -1821,6 +1822,7 @@ nonReserved
| COMPACT
| COMPLETE
| COMPRESS_TYPE
| COMPUTE
| CONDITIONS
| CONFIG
| CONNECTION
Expand Down Expand Up @@ -2083,6 +2085,7 @@ nonReserved
| VARIABLES
| VARIANT
| VAULT
| VAULTS
| VERBOSE
| VERSION
| VIEW
Expand Down
Loading

0 comments on commit 9293045

Please sign in to comment.