Skip to content

Commit

Permalink
[improvement](cloud) manage node via sql like non cloud mode (#40264)
Browse files Browse the repository at this point in the history
1. Add a config deploy_mode to enable disaggregated mode. In this mode
users have to configure cloud_instance_id and meta_service_endpoint.

2. When a fe starts from empty, it would try to create instance.

3. If the instance does not exist, the fe creates it and starts in the master
role. If the instance exists, the fe starts with the role it gets from ms.

4. Frontends are added via sql alter system add frontend. Backends are
added via sql alter system add backend.

5. Users do not need to configure cloud_instance_id and meta_service_endpoint
in be.conf, because fe sends them to be via heartbeat.

6. Builtin vault is not needed any more, internal tables are stored in
default vault.

TODO:

1. decommission fe and be via sql
2. change cloud_instance_id to cluster id.

doc pr apache/doris-website#1072
  • Loading branch information
dataroaring committed Sep 13, 2024
1 parent aca268a commit a0588eb
Show file tree
Hide file tree
Showing 41 changed files with 1,269 additions and 142 deletions.
32 changes: 32 additions & 0 deletions be/src/agent/heartbeat_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <ostream>
#include <string>

#include "cloud/config.h"
#include "common/config.h"
#include "common/status.h"
#include "olap/storage_engine.h"
Expand Down Expand Up @@ -244,6 +245,37 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
_engine.notify_listeners();
}

if (master_info.__isset.meta_service_endpoint != config::is_cloud_mode()) {
return Status::InvalidArgument(
"fe and be do not work in same mode, fe cloud mode: {},"
" be cloud mode: {}",
master_info.__isset.meta_service_endpoint, config::is_cloud_mode());
}

if (master_info.__isset.meta_service_endpoint && config::meta_service_endpoint.empty() &&
!master_info.meta_service_endpoint.empty()) {
auto st = config::set_config("meta_service_endpoint", master_info.meta_service_endpoint,
true);
LOG(INFO) << "set config meta_service_endpoing " << master_info.meta_service_endpoint << " "
<< st;
}

if (master_info.__isset.cloud_instance_id) {
if (!config::cloud_instance_id.empty() &&
config::cloud_instance_id != master_info.cloud_instance_id) {
return Status::InvalidArgument(
"cloud_instance_id in fe.conf and be.conf are not same, fe: {}, be: {}",
master_info.cloud_instance_id, config::cloud_instance_id);
}

if (config::cloud_instance_id.empty() && !master_info.cloud_instance_id.empty()) {
auto st = config::set_config("cloud_instance_id", master_info.cloud_instance_id, true);
config::set_cloud_unique_id(master_info.cloud_instance_id);
LOG(INFO) << "set config cloud_instance_id " << master_info.cloud_instance_id << " "
<< st;
}
}

return Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -943,7 +943,7 @@ Status CloudMetaMgr::get_storage_vault_info(StorageVaultInfos* vault_infos, bool
j->mutable_obj_info()->set_sk(j->obj_info().sk().substr(0, 2) + "xxx");
}

LOG(INFO) << "get storage vault, enable_storage_vault=" << is_vault_mode
LOG(INFO) << "get storage vault, enable_storage_vault=" << *is_vault_mode
<< " response=" << resp.ShortDebugString();
return Status::OK();
}
Expand Down
24 changes: 1 addition & 23 deletions be/src/cloud/cloud_storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,29 +160,7 @@ struct RefreshFSVaultVisitor {
};

Status CloudStorageEngine::open() {
cloud::StorageVaultInfos vault_infos;
bool enable_storage_vault = false;
do {
auto st = _meta_mgr->get_storage_vault_info(&vault_infos, &enable_storage_vault);
if (st.ok()) {
break;
}

LOG(WARNING) << "failed to get vault info, retry after 5s, err=" << st;
std::this_thread::sleep_for(5s);
} while (vault_infos.empty());

for (auto& [id, vault_info, path_format] : vault_infos) {
if (auto st = std::visit(VaultCreateFSVisitor {id, path_format}, vault_info); !st.ok())
[[unlikely]] {
return vault_process_error(id, vault_info, std::move(st));
}
}

// vault mode should not support latest_fs to get rid of unexpected storage backends choosen
if (!enable_storage_vault) {
set_latest_fs(get_filesystem(std::get<0>(vault_infos.back())));
}
sync_storage_vault();

// TODO(plat1ko): DeleteBitmapTxnManager

Expand Down
21 changes: 17 additions & 4 deletions be/src/cloud/cloud_storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,27 @@ class CloudStorageEngine final : public BaseStorageEngine {
}
void _check_file_cache_ttl_block_valid();

std::optional<StorageResource> get_storage_resource(const std::string& vault_id) const {
std::optional<StorageResource> get_storage_resource(const std::string& vault_id) {
LOG(INFO) << "Getting storage resource for vault_id: " << vault_id;
if (vault_id.empty()) {
if (latest_fs() == nullptr) {
LOG(INFO) << "there is not latest fs";
return std::nullopt;
}
return StorageResource {latest_fs()};
}

if (auto storage_resource = doris::get_storage_resource(vault_id); storage_resource) {
return storage_resource->first;
}
bool synced = false;
do {
if (auto storage_resource = doris::get_storage_resource(vault_id); storage_resource) {
return storage_resource->first;
}
if (synced) {
break;
}
sync_storage_vault();
synced = true;
} while (true);

return std::nullopt;
}
Expand Down
14 changes: 12 additions & 2 deletions be/src/cloud/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@

#include "cloud/config.h"

#include "common/status.h"

namespace doris::config {

DEFINE_String(cloud_unique_id, "");
DEFINE_String(meta_service_endpoint, "");
DEFINE_String(deploy_mode, "");
DEFINE_mString(cloud_instance_id, "");
DEFINE_mString(cloud_unique_id, "");
DEFINE_mString(meta_service_endpoint, "");
DEFINE_Bool(meta_service_use_load_balancer, "false");
DEFINE_mInt32(meta_service_rpc_timeout_ms, "10000");
DEFINE_Bool(meta_service_connection_pooled, "true");
Expand Down Expand Up @@ -64,4 +68,10 @@ DEFINE_mBool(enable_new_tablet_do_compaction, "false");

DEFINE_Bool(enable_cloud_txn_lazy_commit, "false");

// Fills in cloud_unique_id from the given instance id.
// Does nothing when cloud_unique_id already has a value or when instance_id
// is empty; otherwise sets it to "1:<instance_id>:compute" via set_config.
// The status returned by set_config is intentionally discarded.
void set_cloud_unique_id(std::string instance_id) {
    if (!cloud_unique_id.empty() || instance_id.empty()) {
        return;
    }
    const std::string derived_unique_id = "1:" + instance_id + ":compute";
    static_cast<void>(set_config("cloud_unique_id", derived_unique_id, true));
}

} // namespace doris::config
11 changes: 8 additions & 3 deletions be/src/cloud/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,17 @@

namespace doris::config {

DECLARE_String(cloud_unique_id);
DECLARE_String(deploy_mode);
// deprecated do not configure directly
DECLARE_mString(cloud_instance_id);
DECLARE_mString(cloud_unique_id);

static inline bool is_cloud_mode() {
return !cloud_unique_id.empty();
return deploy_mode == "cloud" || !cloud_unique_id.empty();
}

void set_cloud_unique_id(std::string instance_id);

// Set the endpoint of meta service.
//
// If meta services are deployed behind a load balancer, set this config to "host:port" of the load balancer.
Expand All @@ -40,7 +45,7 @@ static inline bool is_cloud_mode() {
// If you want to access a group of meta services directly, set the addresses of meta services to this config,
// separated by a comma, like "host:port,host:port,host:port", then BE will choose a server to connect in randomly.
// In this mode, The config meta_service_connection_pooled is still useful, but the other two configs will be ignored.
DECLARE_String(meta_service_endpoint);
DECLARE_mString(meta_service_endpoint);
// Set the underlying connection type to pooled.
DECLARE_Bool(meta_service_connection_pooled);
DECLARE_mInt64(meta_service_connection_pool_size);
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <utility>
#include <vector>

#include "cloud/config.h"
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
Expand Down Expand Up @@ -1673,6 +1674,8 @@ bool init(const char* conf_file, bool fill_conf_map, bool must_exist, bool set_t
SET_FIELD(it.second, std::vector<std::string>, fill_conf_map, set_to_default);
}

set_cloud_unique_id(cloud_instance_id);

return true;
}

Expand Down
5 changes: 2 additions & 3 deletions cloud/src/meta-service/meta_service_resource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2338,9 +2338,8 @@ void MetaServiceImpl::get_cluster(google::protobuf::RpcController* controller,
std::find_if(instance.storage_vault_names().begin(), instance.storage_vault_names().end(),
[](const std::string& name) { return name == BUILT_IN_STORAGE_VAULT_NAME; }) ==
instance.storage_vault_names().end()) {
code = MetaServiceCode::STORAGE_VAULT_NOT_FOUND;
msg = "instance has no built in storage vault";
return;
LOG_EVERY_N(INFO, 100) << "There is no builtin vault in instance "
<< instance.instance_id();
}

auto get_cluster_mysql_user = [](const ClusterPB& c, std::set<std::string>* mysql_users) {
Expand Down
4 changes: 3 additions & 1 deletion cloud/src/resource-manager/resource_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,9 @@ bool ResourceManager::check_cluster_params_valid(const ClusterPB& cluster, std::
n.heartbeat_port()) {
continue;
}
ss << "check cluster params failed, node : " << proto_to_json(n);
ss << "check cluster params failed, edit_log_port is required for frontends while "
"heatbeat_port is required for banckens, node : "
<< proto_to_json(n);
*err = ss.str();
no_err = false;
break;
Expand Down
80 changes: 62 additions & 18 deletions docker/runtime/doris-compose/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,15 +291,27 @@ def docker_env(self):
enable_coverage = self.cluster.coverage_dir

envs = {
"MY_IP": self.get_ip(),
"MY_ID": self.id,
"MY_TYPE": self.node_type(),
"FE_QUERY_PORT": FE_QUERY_PORT,
"FE_EDITLOG_PORT": FE_EDITLOG_PORT,
"BE_HEARTBEAT_PORT": BE_HEARTBEAT_PORT,
"DORIS_HOME": os.path.join(self.docker_home_dir()),
"STOP_GRACE": 1 if enable_coverage else 0,
"IS_CLOUD": 1 if self.cluster.is_cloud else 0,
"MY_IP":
self.get_ip(),
"MY_ID":
self.id,
"MY_TYPE":
self.node_type(),
"FE_QUERY_PORT":
FE_QUERY_PORT,
"FE_EDITLOG_PORT":
FE_EDITLOG_PORT,
"BE_HEARTBEAT_PORT":
BE_HEARTBEAT_PORT,
"DORIS_HOME":
os.path.join(self.docker_home_dir()),
"STOP_GRACE":
1 if enable_coverage else 0,
"IS_CLOUD":
1 if self.cluster.is_cloud else 0,
"SQL_MODE_NODE_MGR":
1 if hasattr(self.cluster, 'sql_mode_node_mgr')
and self.cluster.sql_mode_node_mgr else 0
}

if self.cluster.is_cloud:
Expand Down Expand Up @@ -390,15 +402,22 @@ def get_add_init_config(self):
cfg += self.cluster.fe_config
if self.cluster.is_cloud:
cfg += [
"cloud_unique_id = " + self.cloud_unique_id(),
"meta_service_endpoint = {}".format(
self.cluster.get_meta_server_addr()),
"",
self.cluster.get_meta_server_addr()), "",
"# For regression-test",
"ignore_unsupported_properties_in_cloud_mode = true",
"merge_on_write_forced_to_false = true",
"deploy_mode = cloud"
]

if self.cluster.sql_mode_node_mgr:
cfg += [
"cloud_instance_id = " + self.cloud_instance_id(),
]
else:
cfg += [
"cloud_unique_id = " + self.cloud_unique_id(),
]
return cfg

def init_is_follower(self):
Expand All @@ -420,6 +439,9 @@ def docker_env(self):
def cloud_unique_id(self):
return "sql_server_{}".format(self.id)

def cloud_instance_id(self):
return "reg_cloud_instance"

def entrypoint(self):
return ["bash", os.path.join(DOCKER_RESOURCE_PATH, "init_fe.sh")]

Expand Down Expand Up @@ -450,14 +472,26 @@ def get_add_init_config(self):
cfg += self.cluster.be_config
if self.cluster.is_cloud:
cfg += [
"cloud_unique_id = " + self.cloud_unique_id(),
"meta_service_endpoint = {}".format(
self.cluster.get_meta_server_addr()),
'tmp_file_dirs = [ {"path":"./storage/tmp","max_cache_bytes":10240000, "max_upload_bytes":10240000}]',
'enable_file_cache = true',
'file_cache_path = [ {{"path": "{}/storage/file_cache", "total_size":53687091200, "query_limit": 10737418240}}]'
.format(self.docker_home_dir()),
"deploy_mode = cloud",
]

if self.cluster.be_metaservice_endpoint:
cfg += [
"meta_service_endpoint = {}".format(
self.cluster.get_meta_server_addr()),
]
if self.cluster.be_cloud_instanceid:
cfg += [
"cloud_instance_id = " + self.cloud_instance_id(),
]
if not self.cluster.sql_mode_node_mgr:
cfg += [
"cloud_unique_id = " + self.cloud_unique_id(),
]
return cfg

def init_cluster_name(self):
Expand Down Expand Up @@ -519,6 +553,9 @@ def docker_env(self):
def cloud_unique_id(self):
return "compute_node_{}".format(self.id)

def cloud_instance_id(self):
return "reg_cloud_instance"

def docker_home_dir(self):
return os.path.join(DOCKER_DORIS_PATH, "be")

Expand Down Expand Up @@ -628,7 +665,8 @@ class Cluster(object):

def __init__(self, name, subnet, image, is_cloud, fe_config, be_config,
ms_config, recycle_config, fe_follower, be_disks, be_cluster,
reg_be, coverage_dir, cloud_store_config):
reg_be, coverage_dir, cloud_store_config, sql_mode_node_mgr,
be_metaservice_endpoint, be_cloud_instanceid):
self.name = name
self.subnet = subnet
self.image = image
Expand All @@ -647,11 +685,15 @@ def __init__(self, name, subnet, image, is_cloud, fe_config, be_config,
node_type: Group(node_type)
for node_type in Node.TYPE_ALL
}
self.sql_mode_node_mgr = sql_mode_node_mgr
self.be_metaservice_endpoint = be_metaservice_endpoint
self.be_cloud_instanceid = be_cloud_instanceid

@staticmethod
def new(name, image, is_cloud, fe_config, be_config, ms_config,
recycle_config, fe_follower, be_disks, be_cluster, reg_be,
coverage_dir, cloud_store_config):
coverage_dir, cloud_store_config, sql_mode_node_mgr,
be_metaservice_endpoint, be_cloud_instanceid):
if not os.path.exists(LOCAL_DORIS_PATH):
os.makedirs(LOCAL_DORIS_PATH, exist_ok=True)
os.chmod(LOCAL_DORIS_PATH, 0o777)
Expand All @@ -663,7 +705,9 @@ def new(name, image, is_cloud, fe_config, be_config, ms_config,
cluster = Cluster(name, subnet, image, is_cloud, fe_config,
be_config, ms_config, recycle_config,
fe_follower, be_disks, be_cluster, reg_be,
coverage_dir, cloud_store_config)
coverage_dir, cloud_store_config,
sql_mode_node_mgr, be_metaservice_endpoint,
be_cloud_instanceid)
os.makedirs(cluster.get_path(), exist_ok=True)
os.makedirs(get_status_path(name), exist_ok=True)
cluster._save_meta()
Expand Down
Loading

0 comments on commit a0588eb

Please sign in to comment.