Skip to content

Commit

Permalink
[chore](cloud) Support starting both meta-service and recycler within…
Browse files Browse the repository at this point in the history
… single process (#40223)

For example, the following command starts both meta-service and recycler within a single
process:
```
./bin/start.sh --daemon
```
The log files will be named meta_service.INFO*.

This has the same effect as `./bin/start.sh --meta-service --recycler
--daemon`.

doc PR apache/doris-website#1073
  • Loading branch information
gavinchou committed Sep 5, 2024
1 parent c5dc79c commit eda09de
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 35 deletions.
27 changes: 19 additions & 8 deletions cloud/script/start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,10 @@ fi

echo "LIBHDFS3_CONF=${LIBHDFS3_CONF}"

export JEMALLOC_CONF="percpu_arena:percpu,background_thread:true,metadata_thp:auto,muzzy_decay_ms:15000,dirty_decay_ms:15000,oversize_threshold:0,prof:true,prof_prefix:jeprof.out"
# to enable dumping jeprof heap stats periodically, change `prof:false` to `prof:true`
# to control the dump interval, change `lg_prof_interval` to a specific value; it is the base-2 exponent of the interval in bytes, e.g. the default 34 means 2 ** 34 bytes = 16GB
# to control the dump path, change `prof_prefix` to a specific path, e.g. /doris_cloud/log/ms_; by default it dumps to the directory from which the start command was called
export JEMALLOC_CONF="percpu_arena:percpu,background_thread:true,metadata_thp:auto,muzzy_decay_ms:5000,dirty_decay_ms:5000,oversize_threshold:0,prof_prefix:ms_,prof:false,lg_prof_interval:34"

if [[ "${RUN_VERSION}" -eq 1 ]]; then
"${bin}" --version
Expand All @@ -131,14 +134,22 @@ fi

mkdir -p "${DORIS_HOME}/log"
echo "starts ${process} with args: $*"
out_file=${DORIS_HOME}/log/${process}.out
if [[ "${RUN_DAEMON}" -eq 1 ]]; then
date >>"${DORIS_HOME}/log/${process}.out"
nohup "${bin}" "$@" >>"${DORIS_HOME}/log/${process}.out" 2>&1 &
# wait for log flush
sleep 1.5
tail -n10 "${DORIS_HOME}/log/${process}.out" | grep 'working directory' -B1 -A10
echo "please check process log for more details"
echo ""
# append 10 blank lines to ensure the following tail -n10 works correctly
printf "\n\n\n\n\n\n\n\n\n\n" >>"${out_file}"
echo "$(date +'%F %T') try to start ${process}" >>"${out_file}"
nohup "${bin}" "$@" >>"${out_file}" 2>&1 &
echo "wait and check ${process} start successfully"
sleep 3
tail -n10 "${out_file}" | grep 'successfully started brpc'
ret=$?
if [[ ${ret} -ne 0 ]]; then
echo "${process} may not start successfully please check process log for more details"
exit 1
fi
echo "${process} start successfully"
exit 0
elif [[ "${RUN_CONSOLE}" -eq 1 ]]; then
export DORIS_LOG_TO_STDERR=1
date
Expand Down
1 change: 1 addition & 0 deletions cloud/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ CONF_mInt32(scan_instances_interval_seconds, "60"); // 1min
CONF_mInt32(check_object_interval_seconds, "43200"); // 12hours

CONF_mInt64(check_recycle_task_interval_seconds, "600"); // 10min
CONF_mInt64(recycler_sleep_before_scheduling_seconds, "60");
// log a warning if a recycle task takes longer than this duration
CONF_mInt64(recycle_task_threshold_seconds, "10800"); // 3h

Expand Down
60 changes: 34 additions & 26 deletions cloud/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,13 @@ DECLARE_int64(socket_max_unwritten_bytes);
int main(int argc, char** argv) {
if (argc > 1) {
if (auto ret = args.parse(argc - 1, argv + 1); !ret.empty()) {
std::cerr << ret << std::endl;
std::cerr << "parse arguments error: " << ret << std::endl;
help();
return -1;
}
}

if (argc < 2 || args.get<bool>(ARG_HELP)) {
if (args.get<bool>(ARG_HELP)) {
help();
return 0;
}
Expand All @@ -177,21 +177,16 @@ int main(int argc, char** argv) {
return 0;
}

// FIXME(gavin): do we need to enable running both MS and recycler within
// single process
if (!(args.get<bool>(ARG_META_SERVICE) ^ args.get<bool>(ARG_RECYCLER))) {
std::cerr << "only one of --meta-service and --recycler must be specified" << std::endl;
return 1;
}

// There may be more roles to play
// There may be more roles to play in the future, if there are multi roles specified,
// use meta_service as the process name
std::string process_name = args.get<bool>(ARG_META_SERVICE) ? "meta_service"
: args.get<bool>(ARG_RECYCLER) ? "recycler"
: "";
if (process_name.empty()) {
std::cerr << "failed to determine prcess name with given args" << std::endl;
return 1;
}
: "meta_service";

using namespace std::chrono;

auto start = steady_clock::now();
auto end = start;

auto pid_file_fd_holder = gen_pidfile("doris_cloud");
if (pid_file_fd_holder == nullptr) {
Expand All @@ -215,11 +210,19 @@ int main(int argc, char** argv) {
}

// We can invoke glog from now on

std::string msg;
LOG(INFO) << "try to start doris_cloud";
LOG(INFO) << build_info();
std::cout << build_info() << std::endl;

if (!args.get<bool>(ARG_META_SERVICE) && !args.get<bool>(ARG_RECYCLER)) {
std::get<0>(args.args()[ARG_META_SERVICE]) = true;
std::get<0>(args.args()[ARG_RECYCLER]) = true;
LOG(INFO) << "meta_service and recycler are both not specified, "
"run doris_cloud as meta_service and recycler by default";
std::cout << "run doris_cloud as meta_service and recycler by default" << std::endl;
}

brpc::Server server;
brpc::FLAGS_max_body_size = config::brpc_max_body_size;
brpc::FLAGS_socket_max_unwritten_bytes = config::brpc_socket_max_unwritten_bytes;
Expand All @@ -238,19 +241,22 @@ int main(int argc, char** argv) {
return 1;
}
LOG(INFO) << "begin to init txn kv";
auto start_init_kv = steady_clock::now();
int ret = txn_kv->init();
if (ret != 0) {
LOG(WARNING) << "failed to init txnkv, ret=" << ret;
return 1;
}
LOG(INFO) << "successfully init txn kv";
end = steady_clock::now();
LOG(INFO) << "successfully init txn kv, elapsed milliseconds: "
<< duration_cast<milliseconds>(end - start_init_kv).count();

if (init_global_encryption_key_info_map(txn_kv.get()) != 0) {
LOG(WARNING) << "failed to init global encryption key map";
return -1;
}

std::unique_ptr<MetaServer> meta_server;
std::unique_ptr<MetaServer> meta_server; // meta-service
std::unique_ptr<Recycler> recycler;
std::thread periodiccally_log_thread;
std::mutex periodiccally_log_thread_lock;
Expand All @@ -269,7 +275,8 @@ int main(int argc, char** argv) {
msg = "meta-service started";
LOG(INFO) << msg;
std::cout << msg << std::endl;
} else if (args.get<bool>(ARG_RECYCLER)) {
}
if (args.get<bool>(ARG_RECYCLER)) {
recycler = std::make_unique<Recycler>(txn_kv);
int ret = recycler->start(&server);
if (ret != 0) {
Expand All @@ -284,15 +291,12 @@ int main(int argc, char** argv) {
auto periodiccally_log = [&]() {
while (periodiccally_log_thread_run) {
std::unique_lock<std::mutex> lck {periodiccally_log_thread_lock};
periodiccally_log_thread_cv.wait_for(
lck, std::chrono::milliseconds(config::periodically_log_ms));
periodiccally_log_thread_cv.wait_for(lck,
milliseconds(config::periodically_log_ms));
LOG(INFO) << "Periodically log for recycler";
}
};
periodiccally_log_thread = std::thread {periodiccally_log};
} else {
std::cerr << "cloud starts without doing anything and exits" << std::endl;
return -1;
}
// start service
brpc::ServerOptions options;
Expand All @@ -309,7 +313,11 @@ int main(int argc, char** argv) {
<< ", errmsg=" << strerror_r(errno, buf, 64) << ", port=" << port;
return -1;
}
LOG(INFO) << "successfully started brpc listening on port=" << port;
end = steady_clock::now();
msg = "successfully started brpc listening on port=" + std::to_string(port) +
" time_elapsed_ms=" + std::to_string(duration_cast<milliseconds>(end - start).count());
LOG(INFO) << msg;
std::cout << msg << std::endl;

server.RunUntilAskedToQuit(); // Wait for signals
server.ClearServices();
Expand All @@ -326,7 +334,7 @@ int main(int argc, char** argv) {
periodiccally_log_thread_run = false;
// immediately notify the log thread to quickly exit in case it block the
// whole procedure
periodiccally_log_thread_cv.notify_one();
periodiccally_log_thread_cv.notify_all();
}
periodiccally_log_thread.join();
}
Expand Down
2 changes: 2 additions & 0 deletions cloud/src/recycler/checker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ int Checker::start() {

// launch instance scanner
auto scanner_func = [this]() {
std::this_thread::sleep_for(
std::chrono::seconds(config::recycler_sleep_before_scheduling_seconds));
while (!stopped()) {
std::vector<InstanceInfoPB> instances;
get_all_instances(txn_kv_.get(), instances);
Expand Down
5 changes: 5 additions & 0 deletions cloud/src/recycler/recycler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,11 @@ Recycler::~Recycler() {
}

void Recycler::instance_scanner_callback() {
// sleep 60 seconds before scheduling for the launch procedure to complete:
// some bad hdfs connection may cause some log to stdout stderr
// which may pollute .out file and affect the script to check success
std::this_thread::sleep_for(
std::chrono::seconds(config::recycler_sleep_before_scheduling_seconds));
while (!stopped()) {
std::vector<InstanceInfoPB> instances;
get_all_instances(txn_kv_.get(), instances);
Expand Down
2 changes: 1 addition & 1 deletion cloud/src/recycler/recycler_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ void RecyclerServiceImpl::http(::google::protobuf::RpcController* controller,
}

status_code = 404;
msg = "not found";
msg = "http path " + uri.path() + " not found, it may be not implemented";
response_body = msg;
}

Expand Down
1 change: 1 addition & 0 deletions cloud/test/recycler_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ int main(int argc, char** argv) {

using namespace std::chrono;
current_time = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
config::recycler_sleep_before_scheduling_seconds = 0; // we dont have to wait in UT

::testing::InitGoogleTest(&argc, argv);
auto s3_producer_pool = std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
Expand Down

0 comments on commit eda09de

Please sign in to comment.