From 078cf7a0b5389a4284d5898092d18bb87a7f1d33 Mon Sep 17 00:00:00 2001 From: kaka11chen Date: Mon, 12 Aug 2024 15:09:19 +0800 Subject: [PATCH] [Enhancement](multi-catalog) Set hdfs native client logger to glog and redirect jvm stdout/stderr logger to jni.log. --- be/src/io/hdfs_builder.cpp | 61 +++++++++++++++++++ bin/start_be.sh | 4 +- bin/start_fe.sh | 4 +- .../common/classloader/ScannerLoader.java | 14 +++++ .../common/jni/utils/Log4jOutputStream.java | 44 +++++++++++++ 5 files changed, 123 insertions(+), 4 deletions(-) create mode 100644 fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/Log4jOutputStream.java diff --git a/be/src/io/hdfs_builder.cpp b/be/src/io/hdfs_builder.cpp index 945ef3ab02bd13..99ee89596ed9ac 100644 --- a/be/src/io/hdfs_builder.cpp +++ b/be/src/io/hdfs_builder.cpp @@ -34,7 +34,68 @@ namespace doris { +#ifdef USE_HADOOP_HDFS +void err_log_message(const char* fmt, ...) { + va_list args; + va_start(args, fmt); + + // First, call vsnprintf to get the required buffer size + int size = vsnprintf(nullptr, 0, fmt, args) + 1; // +1 for '\0' + if (size <= 0) { + LOG(ERROR) << "Error formatting log message, invalid size"; + va_end(args); + return; + } + + va_end(args); + va_start(args, fmt); // Reinitialize va_list + + // Allocate a buffer and format the string into it + std::vector buffer(size); + vsnprintf(buffer.data(), size, fmt, args); + + va_end(args); + + // Use glog to log the message + LOG(ERROR) << buffer.data(); +} + +void va_err_log_message(const char* fmt, va_list ap) { + va_list args_copy; + va_copy(args_copy, ap); + + // Call vsnprintf to get the required buffer size + int size = vsnprintf(nullptr, 0, fmt, args_copy) + 1; // +1 for '\0' + va_end(args_copy); // Release the copied va_list + + if (size <= 0) { + LOG(ERROR) << "Error formatting log message, invalid size"; + return; + } + + // Reinitialize va_list for the second vsnprintf call + va_copy(args_copy, ap); + + // Allocate a buffer and format the string into it + std::vector buffer(size); + vsnprintf(buffer.data(), size, fmt, args_copy); + + va_end(args_copy); + + // Use glog to log the message + LOG(ERROR) << buffer.data(); +} + +struct hdfsLogger logger = {.errLogMessage = err_log_message, + .vaErrLogMessage = va_err_log_message}; +#endif // #ifdef USE_HADOOP_HDFS + Status HDFSCommonBuilder::init_hdfs_builder() { +#ifdef USE_HADOOP_HDFS + static std::once_flag flag; + std::call_once(flag, []() { hdfsSetLogger(&logger); }); +#endif // #ifdef USE_HADOOP_HDFS + hdfs_builder = hdfsNewBuilder(); if (hdfs_builder == nullptr) { LOG(INFO) << "failed to init HDFSCommonBuilder, please check check be/conf/hdfs-site.xml"; diff --git a/bin/start_be.sh b/bin/start_be.sh index 01af80de0cbb18..8e4b8fa84e2e21 100755 --- a/bin/start_be.sh +++ b/bin/start_be.sh @@ -278,7 +278,7 @@ fi for var in http_proxy HTTP_PROXY https_proxy HTTPS_PROXY; do if [[ -n ${!var} ]]; then - log "env '${var}' = '${!var}', need unset it using 'unset ${var}'" + echo "env '${var}' = '${!var}', need unset it using 'unset ${var}'" exit 1 fi done @@ -350,7 +350,7 @@ set_tcmalloc_heap_limit() { fi if [[ "${mem_limit_mb}" -gt "${total_mem_mb}" ]]; then - log "mem_limit is larger than the total memory of the server. ${mem_limit_mb} > ${total_mem_mb}" + echo "mem_limit is larger than the total memory of the server. ${mem_limit_mb} > ${total_mem_mb}" return 1 fi export TCMALLOC_HEAP_LIMIT_MB=${mem_limit_mb} diff --git a/bin/start_fe.sh b/bin/start_fe.sh index 67a6925fea1901..ac5971072c306c 100755 --- a/bin/start_fe.sh +++ b/bin/start_fe.sh @@ -194,12 +194,12 @@ java_version="$( )" if [[ "${java_version}" -eq 17 ]]; then if [[ -z "${JAVA_OPTS_FOR_JDK_17}" ]]; then - log "JAVA_OPTS_FOR_JDK_17 is not set in fe.conf" + echo "JAVA_OPTS_FOR_JDK_17 is not set in fe.conf" exit 1 fi final_java_opt="${JAVA_OPTS_FOR_JDK_17}" else - log "ERROR: The jdk_version is ${java_version}, must be 17." + echo "ERROR: The jdk_version is ${java_version}, must be 17." exit 1 fi log "Using Java version ${java_version}" diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java index 2bbbd21838c9f2..59018da3fb425f 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java @@ -18,12 +18,15 @@ package org.apache.doris.common.classloader; import org.apache.doris.common.jni.utils.ExpiringMap; +import org.apache.doris.common.jni.utils.Log4jOutputStream; import com.google.common.collect.Streams; +import org.apache.log4j.Level; import org.apache.log4j.Logger; import java.io.File; import java.io.IOException; +import java.io.PrintStream; import java.io.UncheckedIOException; import java.net.MalformedURLException; import java.net.URL; @@ -53,6 +56,7 @@ public class ScannerLoader { * Load all classes from $DORIS_HOME/lib/java_extensions/* */ public void loadAllScannerJars() { + redirectStdStreamsToLog4j(); String basePath = System.getenv("DORIS_HOME"); File library = new File(basePath, "/lib/java_extensions/"); // TODO: add thread pool to load each scanner @@ -65,6 +69,16 @@ public void loadAllScannerJars() { }); } + private void redirectStdStreamsToLog4j() { + Logger outLogger = Logger.getLogger("stdout"); + PrintStream logPrintStream = new PrintStream(new Log4jOutputStream(outLogger, Level.INFO)); + System.setOut(logPrintStream); + + Logger errLogger = Logger.getLogger("stderr"); + PrintStream errorPrintStream = new PrintStream(new Log4jOutputStream(errLogger, Level.ERROR)); + System.setErr(errorPrintStream); + } + public static ClassLoader getUdfClassLoader(String functionSignature) { return udfLoadedClasses.get(functionSignature); } diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/Log4jOutputStream.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/Log4jOutputStream.java new file mode 100644 index 00000000000000..bb4e4281ee1da2 --- /dev/null +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/Log4jOutputStream.java @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.jni.utils; + +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +import java.io.OutputStream; + +public class Log4jOutputStream extends OutputStream { + private final Logger logger; + private final StringBuilder buffer = new StringBuilder(); + private final Level level; + + public Log4jOutputStream(Logger logger, Level level) { + this.logger = logger; + this.level = level; + } + + @Override + public void write(int b) { + if (b == '\n') { + logger.log(level, buffer.toString()); + buffer.setLength(0); + } else { + buffer.append((char) b); + } + } +}