From e4edc61156f192dc535fd4c8669560a352498d3c Mon Sep 17 00:00:00 2001 From: Qi Chen Date: Tue, 8 Oct 2024 11:40:40 +0800 Subject: [PATCH] [Enhancement](multi-catalog) Set hdfs native client logger to glog and redirect jvm stdout/stderr logger to jni.log. (#39540) ## Proposed changes [Enhancement] (multi-catalog) Set hdfs native client logger to glog and redirect jvm stdout/stderr logger to jni.log. - Set hdfs native client logger to glog with https://github.com/apache/doris-thirdparty/pull/236. - Redirect jvm stdout/stderr logger to `jni.log`. Co-authored-by: Mingyu Chen --- be/src/io/hdfs_builder.cpp | 61 +++++++++++++++++++ bin/start_be.sh | 4 +- bin/start_fe.sh | 4 +- .../common/classloader/ScannerLoader.java | 14 +++++ .../common/jni/utils/Log4jOutputStream.java | 44 +++++++++++++ 5 files changed, 123 insertions(+), 4 deletions(-) create mode 100644 fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/Log4jOutputStream.java diff --git a/be/src/io/hdfs_builder.cpp b/be/src/io/hdfs_builder.cpp index 945ef3ab02bd13..99ee89596ed9ac 100644 --- a/be/src/io/hdfs_builder.cpp +++ b/be/src/io/hdfs_builder.cpp @@ -34,7 +34,68 @@ namespace doris { +#ifdef USE_HADOOP_HDFS +void err_log_message(const char* fmt, ...) { + va_list args; + va_start(args, fmt); + + // First, call vsnprintf to get the required buffer size + int size = vsnprintf(nullptr, 0, fmt, args) + 1; // +1 for '\0' + if (size <= 0) { + LOG(ERROR) << "Error formatting log message, invalid size"; + va_end(args); + return; + } + + va_end(args); + va_start(args, fmt); // Reinitialize va_list + + // Allocate a buffer and format the string into it + std::vector buffer(size); + vsnprintf(buffer.data(), size, fmt, args); + + va_end(args); + + // Use glog to log the message + LOG(ERROR) << buffer.data(); +} + +void va_err_log_message(const char* fmt, va_list ap) { + va_list args_copy; + va_copy(args_copy, ap); + + // Call vsnprintf to get the required buffer size + int size = vsnprintf(nullptr, 0, fmt, args_copy) + 1; // +1 for '\0' + va_end(args_copy); // Release the copied va_list + + if (size <= 0) { + LOG(ERROR) << "Error formatting log message, invalid size"; + return; + } + + // Reinitialize va_list for the second vsnprintf call + va_copy(args_copy, ap); + + // Allocate a buffer and format the string into it + std::vector buffer(size); + vsnprintf(buffer.data(), size, fmt, args_copy); + + va_end(args_copy); + + // Use glog to log the message + LOG(ERROR) << buffer.data(); +} + +struct hdfsLogger logger = {.errLogMessage = err_log_message, + .vaErrLogMessage = va_err_log_message}; +#endif // #ifdef USE_HADOOP_HDFS + Status HDFSCommonBuilder::init_hdfs_builder() { +#ifdef USE_HADOOP_HDFS + static std::once_flag flag; + std::call_once(flag, []() { hdfsSetLogger(&logger); }); +#endif // #ifdef USE_HADOOP_HDFS + hdfs_builder = hdfsNewBuilder(); if (hdfs_builder == nullptr) { LOG(INFO) << "failed to init HDFSCommonBuilder, please check check be/conf/hdfs-site.xml"; diff --git a/bin/start_be.sh b/bin/start_be.sh index 5029fc98bf5664..a410912ea06c1d 100755 --- a/bin/start_be.sh +++ b/bin/start_be.sh @@ -282,7 +282,7 @@ fi for var in http_proxy HTTP_PROXY https_proxy HTTPS_PROXY; do if [[ -n ${!var} ]]; then - log "env '${var}' = '${!var}', need unset it using 'unset ${var}'" + echo "env '${var}' = '${!var}', need unset it using 'unset ${var}'" exit 1 fi done @@ -354,7 +354,7 @@ set_tcmalloc_heap_limit() { fi if [[ "${mem_limit_mb}" -gt "${total_mem_mb}" ]]; then - log "mem_limit is larger than the total memory of the server. ${mem_limit_mb} > ${total_mem_mb}" + echo "mem_limit is larger than the total memory of the server. ${mem_limit_mb} > ${total_mem_mb}" return 1 fi export TCMALLOC_HEAP_LIMIT_MB=${mem_limit_mb} diff --git a/bin/start_fe.sh b/bin/start_fe.sh index 67a6925fea1901..ac5971072c306c 100755 --- a/bin/start_fe.sh +++ b/bin/start_fe.sh @@ -194,12 +194,12 @@ java_version="$( )" if [[ "${java_version}" -eq 17 ]]; then if [[ -z "${JAVA_OPTS_FOR_JDK_17}" ]]; then - log "JAVA_OPTS_FOR_JDK_17 is not set in fe.conf" + echo "JAVA_OPTS_FOR_JDK_17 is not set in fe.conf" exit 1 fi final_java_opt="${JAVA_OPTS_FOR_JDK_17}" else - log "ERROR: The jdk_version is ${java_version}, must be 17." + echo "ERROR: The jdk_version is ${java_version}, must be 17." exit 1 fi log "Using Java version ${java_version}" diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java index 0fb9cfd6d126d1..bcfa0d17985980 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java @@ -18,13 +18,16 @@ package org.apache.doris.common.classloader; import org.apache.doris.common.jni.utils.ExpiringMap; +import org.apache.doris.common.jni.utils.Log4jOutputStream; import org.apache.doris.common.jni.utils.UdfClassCache; import com.google.common.collect.Streams; +import org.apache.log4j.Level; import org.apache.log4j.Logger; import java.io.File; import java.io.IOException; +import java.io.PrintStream; import java.io.UncheckedIOException; import java.net.MalformedURLException; import java.net.URL; @@ -54,6 +57,7 @@ public class ScannerLoader { * Load all classes from $DORIS_HOME/lib/java_extensions/* */ public void loadAllScannerJars() { + redirectStdStreamsToLog4j(); String basePath = System.getenv("DORIS_HOME"); File library = new File(basePath, "/lib/java_extensions/"); // TODO: add thread pool to load each scanner @@ -66,6 +70,16 @@ public void loadAllScannerJars() { }); } + private void redirectStdStreamsToLog4j() { + Logger outLogger = Logger.getLogger("stdout"); + PrintStream logPrintStream = new PrintStream(new Log4jOutputStream(outLogger, Level.INFO)); + System.setOut(logPrintStream); + + Logger errLogger = Logger.getLogger("stderr"); + PrintStream errorPrintStream = new PrintStream(new Log4jOutputStream(errLogger, Level.ERROR)); + System.setErr(errorPrintStream); + } + public static UdfClassCache getUdfClassLoader(String functionSignature) { return udfLoadedClasses.get(functionSignature); } diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/Log4jOutputStream.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/Log4jOutputStream.java new file mode 100644 index 00000000000000..bb4e4281ee1da2 --- /dev/null +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/Log4jOutputStream.java @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.jni.utils; + +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +import java.io.OutputStream; + +public class Log4jOutputStream extends OutputStream { + private final Logger logger; + private final StringBuilder buffer = new StringBuilder(); + private final Level level; + + public Log4jOutputStream(Logger logger, Level level) { + this.logger = logger; + this.level = level; + } + + @Override + public void write(int b) { + if (b == '\n') { + logger.log(level, buffer.toString()); + buffer.setLength(0); + } else { + buffer.append((char) b); + } + } +}