Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore](cloud) Support starting both meta-service and recycler within single process #40223

Merged
merged 7 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 19 additions & 8 deletions cloud/script/start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,10 @@ fi

echo "LIBHDFS3_CONF=${LIBHDFS3_CONF}"

export JEMALLOC_CONF="percpu_arena:percpu,background_thread:true,metadata_thp:auto,muzzy_decay_ms:5000,dirty_decay_ms:5000,oversize_threshold:0,prof:false,lg_prof_interval:-1"
# to enable dump jeprof heap stats prodigally, change `prof:false` to `prof:true`
# to control the dump interval change `lg_prof_interval` to a specific value, it is pow/exponent of 2 in size of bytes, default 34 means 2 ** 34 = 16GB
# to control the dump path, change `prof_prefix` to a specific path, e.g. /doris_cloud/log/ms_, by default it dumps at the path where the start command called
export JEMALLOC_CONF="percpu_arena:percpu,background_thread:true,metadata_thp:auto,muzzy_decay_ms:5000,dirty_decay_ms:5000,oversize_threshold:0,prof_prefix:ms_,prof:false,lg_prof_interval:34"

if [[ "${RUN_VERSION}" -eq 1 ]]; then
"${bin}" --version
Expand All @@ -131,14 +134,22 @@ fi

mkdir -p "${DORIS_HOME}/log"
echo "starts ${process} with args: $*"
out_file=${DORIS_HOME}/log/${process}.out
if [[ "${RUN_DAEMON}" -eq 1 ]]; then
date >>"${DORIS_HOME}/log/${process}.out"
nohup "${bin}" "$@" >>"${DORIS_HOME}/log/${process}.out" 2>&1 &
# wait for log flush
sleep 1.5
tail -n10 "${DORIS_HOME}/log/${process}.out" | grep 'working directory' -B1 -A10
echo "please check process log for more details"
echo ""
# append 10 blank lines to ensure the following tail -n10 works correctly
printf "\n\n\n\n\n\n\n\n\n\n" >>"${out_file}"
echo "$(date +'%F %T') try to start ${process}" >>"${out_file}"
nohup "${bin}" "$@" >>"${out_file}" 2>&1 &
echo "wait and check ${process} start successfully"
sleep 3
tail -n10 "${out_file}" | grep 'successfully started brpc'
ret=$?
if [[ ${ret} -ne 0 ]]; then
echo "${process} may not start successfully please check process log for more details"
exit 1
fi
echo "${process} start successfully"
exit 0
elif [[ "${RUN_CONSOLE}" -eq 1 ]]; then
export DORIS_LOG_TO_STDERR=1
date
Expand Down
1 change: 1 addition & 0 deletions cloud/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ CONF_mInt32(scan_instances_interval_seconds, "60"); // 1min
CONF_mInt32(check_object_interval_seconds, "43200"); // 12hours

CONF_mInt64(check_recycle_task_interval_seconds, "600"); // 10min
CONF_mInt64(recycler_sleep_before_scheduling_seconds, "60");
// log a warning if a recycle task takes longer than this duration
CONF_mInt64(recycle_task_threshold_seconds, "10800"); // 3h

Expand Down
60 changes: 34 additions & 26 deletions cloud/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,13 @@ DECLARE_int64(socket_max_unwritten_bytes);
int main(int argc, char** argv) {
if (argc > 1) {
if (auto ret = args.parse(argc - 1, argv + 1); !ret.empty()) {
std::cerr << ret << std::endl;
std::cerr << "parse arguments error: " << ret << std::endl;
help();
return -1;
}
}

if (argc < 2 || args.get<bool>(ARG_HELP)) {
if (args.get<bool>(ARG_HELP)) {
help();
return 0;
}
Expand All @@ -177,21 +177,16 @@ int main(int argc, char** argv) {
return 0;
}

// FIXME(gavin): do we need to enable running both MS and recycler within
// single process
if (!(args.get<bool>(ARG_META_SERVICE) ^ args.get<bool>(ARG_RECYCLER))) {
std::cerr << "only one of --meta-service and --recycler must be specified" << std::endl;
return 1;
}

// There may be more roles to play
// There may be more roles to play in the future, if there are multi roles specified,
// use meta_service as the process name
std::string process_name = args.get<bool>(ARG_META_SERVICE) ? "meta_service"
: args.get<bool>(ARG_RECYCLER) ? "recycler"
: "";
if (process_name.empty()) {
std::cerr << "failed to determine prcess name with given args" << std::endl;
return 1;
}
: "meta_service";

using namespace std::chrono;

auto start = steady_clock::now();
auto end = start;

auto pid_file_fd_holder = gen_pidfile("doris_cloud");
if (pid_file_fd_holder == nullptr) {
Expand All @@ -215,11 +210,19 @@ int main(int argc, char** argv) {
}

// We can invoke glog from now on

std::string msg;
LOG(INFO) << "try to start doris_cloud";
LOG(INFO) << build_info();
std::cout << build_info() << std::endl;

if (!args.get<bool>(ARG_META_SERVICE) && !args.get<bool>(ARG_RECYCLER)) {
std::get<0>(args.args()[ARG_META_SERVICE]) = true;
std::get<0>(args.args()[ARG_RECYCLER]) = true;
LOG(INFO) << "meta_service and recycler are both not specified, "
"run doris_cloud as meta_service and recycler by default";
std::cout << "run doris_cloud as meta_service and recycler by default" << std::endl;
}

brpc::Server server;
brpc::FLAGS_max_body_size = config::brpc_max_body_size;
brpc::FLAGS_socket_max_unwritten_bytes = config::brpc_socket_max_unwritten_bytes;
Expand All @@ -238,19 +241,22 @@ int main(int argc, char** argv) {
return 1;
}
LOG(INFO) << "begin to init txn kv";
auto start_init_kv = steady_clock::now();
int ret = txn_kv->init();
if (ret != 0) {
LOG(WARNING) << "failed to init txnkv, ret=" << ret;
return 1;
}
LOG(INFO) << "successfully init txn kv";
end = steady_clock::now();
LOG(INFO) << "successfully init txn kv, elapsed milliseconds: "
<< duration_cast<milliseconds>(end - start_init_kv).count();

if (init_global_encryption_key_info_map(txn_kv.get()) != 0) {
LOG(WARNING) << "failed to init global encryption key map";
return -1;
}

std::unique_ptr<MetaServer> meta_server;
std::unique_ptr<MetaServer> meta_server; // meta-service
std::unique_ptr<Recycler> recycler;
std::thread periodiccally_log_thread;
std::mutex periodiccally_log_thread_lock;
Expand All @@ -269,7 +275,8 @@ int main(int argc, char** argv) {
msg = "meta-service started";
LOG(INFO) << msg;
std::cout << msg << std::endl;
} else if (args.get<bool>(ARG_RECYCLER)) {
}
if (args.get<bool>(ARG_RECYCLER)) {
recycler = std::make_unique<Recycler>(txn_kv);
int ret = recycler->start(&server);
if (ret != 0) {
Expand All @@ -284,15 +291,12 @@ int main(int argc, char** argv) {
auto periodiccally_log = [&]() {
while (periodiccally_log_thread_run) {
std::unique_lock<std::mutex> lck {periodiccally_log_thread_lock};
periodiccally_log_thread_cv.wait_for(
lck, std::chrono::milliseconds(config::periodically_log_ms));
periodiccally_log_thread_cv.wait_for(lck,
milliseconds(config::periodically_log_ms));
LOG(INFO) << "Periodically log for recycler";
}
};
periodiccally_log_thread = std::thread {periodiccally_log};
} else {
std::cerr << "cloud starts without doing anything and exits" << std::endl;
return -1;
}
// start service
brpc::ServerOptions options;
Expand All @@ -309,7 +313,11 @@ int main(int argc, char** argv) {
<< ", errmsg=" << strerror_r(errno, buf, 64) << ", port=" << port;
return -1;
}
LOG(INFO) << "successfully started brpc listening on port=" << port;
end = steady_clock::now();
msg = "successfully started brpc listening on port=" + std::to_string(port) +
" time_elapsed_ms=" + std::to_string(duration_cast<milliseconds>(end - start).count());
LOG(INFO) << msg;
std::cout << msg << std::endl;

server.RunUntilAskedToQuit(); // Wait for signals
server.ClearServices();
Expand All @@ -326,7 +334,7 @@ int main(int argc, char** argv) {
periodiccally_log_thread_run = false;
// immediately notify the log thread to quickly exit in case it block the
// whole procedure
periodiccally_log_thread_cv.notify_one();
periodiccally_log_thread_cv.notify_all();
}
periodiccally_log_thread.join();
}
Expand Down
2 changes: 2 additions & 0 deletions cloud/src/recycler/checker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ int Checker::start() {

// launch instance scanner
auto scanner_func = [this]() {
std::this_thread::sleep_for(
std::chrono::seconds(config::recycler_sleep_before_scheduling_seconds));
while (!stopped()) {
std::vector<InstanceInfoPB> instances;
get_all_instances(txn_kv_.get(), instances);
Expand Down
5 changes: 5 additions & 0 deletions cloud/src/recycler/recycler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,11 @@ Recycler::~Recycler() {
}

void Recycler::instance_scanner_callback() {
// sleep 60 seconds before scheduling for the launch procedure to complete:
// some bad hdfs connection may cause some log to stdout stderr
// which may pollute .out file and affect the script to check success
std::this_thread::sleep_for(
std::chrono::seconds(config::recycler_sleep_before_scheduling_seconds));
while (!stopped()) {
std::vector<InstanceInfoPB> instances;
get_all_instances(txn_kv_.get(), instances);
Expand Down
2 changes: 1 addition & 1 deletion cloud/src/recycler/recycler_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ void RecyclerServiceImpl::http(::google::protobuf::RpcController* controller,
}

status_code = 404;
msg = "not found";
msg = "http path " + uri.path() + " not found, it may be not implemented";
response_body = msg;
}

Expand Down
1 change: 1 addition & 0 deletions cloud/test/recycler_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ int main(int argc, char** argv) {

using namespace std::chrono;
current_time = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
config::recycler_sleep_before_scheduling_seconds = 0; // we dont have to wait in UT

::testing::InitGoogleTest(&argc, argv);
auto s3_producer_pool = std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
Expand Down
Loading