From a3d4c7f6bff4d5d53a42a6d994767d6f1a24c8c6 Mon Sep 17 00:00:00 2001 From: benjobs Date: Fri, 1 Sep 2023 09:31:14 -0500 Subject: [PATCH] [Improve] FileUtils minor improvement (#2981) * [Improve] FileUtils minor improvement * minor improvement * minor improvement * Implicits improvement * check style improvement * minor improvement * minor bug fixed. --- .../streampark/common/conf/ConfigOption.scala | 2 +- .../common/conf/InternalConfigHolder.scala | 2 +- .../streampark/common/conf/Workspace.scala | 2 +- .../streampark/common/util/FileUtils.scala | 147 +++++++++++++++--- .../streampark/common/util/HadoopUtils.scala | 5 +- .../common/util/ImplicitsUtils.scala | 62 ++++++++ .../streampark/common/util/Logger.scala | 2 +- .../apache/streampark/common/util/Utils.scala | 79 +++------- .../console/base/util/FileUtils.java | 107 ------------- .../service/impl/AppBuildPipeServiceImpl.java | 2 +- .../service/impl/ApplicationServiceImpl.java | 14 +- .../core/service/impl/ProjectServiceImpl.java | 2 +- .../console/base/util/FileUtilsTest.java | 2 + .../flink/client/trait/YarnClientTrait.scala | 37 ++--- .../kubernetes/KubernetesRetriever.scala | 51 +++--- .../helper/KubernetesDeploymentHelper.scala | 136 ++++++++-------- .../ingress/IngressController.scala | 8 +- .../ingress/IngressStrategyV1.scala | 15 +- .../ingress/IngressStrategyV1beta1.scala | 14 +- .../FlinkYarnApplicationBuildPipeline.scala | 4 +- .../flink/proxy/FlinkShimsProxy.scala | 16 +- .../core/FlinkStreamingInitializer.scala | 2 +- 22 files changed, 372 insertions(+), 339 deletions(-) create mode 100644 streampark-common/src/main/scala/org/apache/streampark/common/util/ImplicitsUtils.scala delete mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/FileUtils.java diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigOption.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigOption.scala index 500fcf3f6e..22ac7de647 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigOption.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigOption.scala @@ -16,7 +16,7 @@ */ package org.apache.streampark.common.conf -import org.apache.streampark.common.util.Utils.StringCasts +import org.apache.streampark.common.util.ImplicitsUtils._ import java.util.Properties diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala index 73c23a4db3..5f00c20880 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala @@ -18,7 +18,7 @@ package org.apache.streampark.common.conf import org.apache.streampark.common.util.{Logger, SystemPropertyUtils} -import org.apache.streampark.common.util.Utils.StringCasts +import org.apache.streampark.common.util.ImplicitsUtils._ import javax.annotation.{Nonnull, Nullable} diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala index d914780a1a..8f3e7b10c0 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala @@ -18,7 +18,7 @@ package org.apache.streampark.common.conf import org.apache.streampark.common.enums.StorageType import org.apache.streampark.common.util.{HdfsUtils, SystemPropertyUtils} -import org.apache.streampark.common.util.Utils.StringCasts +import org.apache.streampark.common.util.ImplicitsUtils._ import java.net.URI diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala index 719715e22d..1522a4e8a6 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala @@ -16,10 +16,16 @@ */ package org.apache.streampark.common.util +import org.apache.streampark.common.util.ImplicitsUtils._ + import java.io._ import java.net.URL +import java.nio.ByteBuffer +import java.nio.channels.Channels +import java.nio.charset.StandardCharsets +import java.nio.file.{Files, Paths} import java.util -import java.util.Scanner +import java.util.stream.Collectors import scala.collection.convert.ImplicitConversions._ import scala.collection.mutable @@ -44,12 +50,12 @@ object FileUtils { if (input == null) { throw new RuntimeException("The inputStream can not be null") } - Utils.using(input) { - in => + input.autoClose( + in => { val b = new Array[Byte](4) in.read(b, 0, b.length) bytesToHexString(b) - } == "504B0304" + }) == "504B0304" } def isJarFileType(file: File): Boolean = { @@ -73,12 +79,6 @@ object FileUtils { s"[StreamPark] Failed to create directory within $TEMP_DIR_ATTEMPTS attempts (tried $baseName 0 to $baseName ${TEMP_DIR_ATTEMPTS - 1})") } - def exists(path: String): Unit = { - require( - path != null && path.nonEmpty && new File(path).exists(), - s"[StreamPark] FileUtils.exists: file $path is not exist!") - } - def getPathFromEnv(env: String): String = { val path = Option(System.getenv(env)).getOrElse(System.getProperty(env)) require( @@ -115,6 +115,14 @@ object FileUtils { } } + def exists(file: Serializable): Boolean = { + file match { + case null => false + case f: File => f.exists() + case p => new File(p.toString).exists() + } + } + def equals(file1: File, file2: File): Boolean = { (file1, file2) match { case (a, b) if a == null || b == null => false @@ -143,20 +151,115 @@ object FileUtils { } @throws[IOException] - def readString(file: File): String = { - require(file != null && file.isFile) - val reader = new FileReader(file) - val scanner = new Scanner(reader) - val buffer = new mutable.StringBuilder() - if (scanner.hasNextLine) { - buffer.append(scanner.nextLine()) + def readInputStream(in: InputStream, array: Array[Byte]): Unit = { + in.autoClose( + is => { + var toRead = array.length + var ret = 0 + var off = 0 + while (toRead > 0) { + ret = is.read(array, off, toRead) + if (ret < 0) throw new IOException("Bad inputStream, premature EOF") + toRead -= ret + off += ret + } + }) + } + + @throws[IOException] + def readFile(file: File): String = { + if (file.length >= Int.MaxValue) { + throw new IOException("Too large file, unexpected!") + } else { + val len = file.length + val array = new Array[Byte](len.toInt) + Files + .newInputStream(file.toPath) + .autoClose( + is => { + readInputStream(is, array) + new String(array, StandardCharsets.UTF_8) + }) } - while (scanner.hasNextLine) { - buffer.append("\r\n") - buffer.append(scanner.nextLine()) + } + + @throws[IOException] + def writeFile(content: String, file: File): Unit = { + val outputStream = Files.newOutputStream(file.toPath) + val channel = Channels.newChannel(outputStream) + val buffer = ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8)) + channel.write(buffer) + Utils.close(channel, outputStream) + } + + @throws[IOException] + def readEndOfFile(file: File, maxSize: Long): Array[Byte] = { + var readSize = maxSize + new RandomAccessFile(file, "r").autoClose( + raFile => { + if (raFile.length > maxSize) { + raFile.seek(raFile.length - maxSize) + } else if (raFile.length < maxSize) { + readSize = raFile.length.toInt + } + val fileContent = new Array[Byte](readSize.toInt) + raFile.read(fileContent) + fileContent + }) + } + + /** + * Read the content of a file from a specified offset. + * + * @param file + * The file to read from + * @param startOffset + * The offset from where to start reading the file + * @param maxSize + * The maximum size of the file to read + * @return + * The content of the file as a byte array + * @throws IOException + * if an I/O error occurs while reading the file + * @throws IllegalArgumentException + * if the startOffset is greater than the file length + */ + @throws[IOException] + def readFileFromOffset(file: File, startOffset: Long, maxSize: Long): Array[Byte] = { + if (file.length < startOffset) { + throw new IllegalArgumentException( + s"The startOffset $startOffset is great than the file length ${file.length}") + } + new RandomAccessFile(file, "r").autoClose( + raFile => { + val readSize = Math.min(maxSize, file.length - startOffset) + raFile.seek(startOffset) + val fileContent = new Array[Byte](readSize.toInt) + raFile.read(fileContent) + fileContent + }) + } + + /** + * Roll View Log. + * + * @param path + * The file path. + * @param offset + * The offset. + * @param limit + * The limit. + * @return + * The content of the file. + */ + def tailOf(path: String, offset: Int, limit: Int): String = try { + val file = new File(path) + if (file.exists && file.isFile) { + Files + .lines(Paths.get(path)) + .autoClose(stream => stream.skip(offset).limit(limit).collect(Collectors.joining("\r\n"))) } - Utils.close(scanner, reader) - buffer.toString() + null } } diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala index b9823d8f07..715afa00da 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala @@ -137,7 +137,10 @@ object HadoopUtils extends Logger { def getConfigurationFromHadoopConfDir(confDir: String = hadoopConfDir): Configuration = { if (!configurationCache.containsKey(confDir)) { - FileUtils.exists(confDir) + if (!FileUtils.exists(confDir)) { + throw new ExceptionInInitializerError( + s"[StreamPark] hadoop conf file " + confDir + " is not exist!") + } val hadoopConfDir = new File(confDir) val confName = List("hdfs-default.xml", "core-site.xml", "hdfs-site.xml", "yarn-site.xml") val files = diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/ImplicitsUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/ImplicitsUtils.scala new file mode 100644 index 0000000000..cfcb951079 --- /dev/null +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/ImplicitsUtils.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.streampark.common.util + +import org.apache.streampark.common.util.Utils.close + +import java.lang.{Boolean => JavaBool, Byte => JavaByte, Double => JavaDouble, Float => JavaFloat, Integer => JavaInt, Long => JavaLong, Short => JavaShort} + +object ImplicitsUtils { + + implicit class AutoCloseImplicits[T <: AutoCloseable](autoCloseable: T) { + + implicit def autoClose[R](func: T => R)(implicit excFunc: Throwable => R = null): R = { + try { + func(autoCloseable) + } catch { + case e: Throwable if excFunc != null => excFunc(e) + } finally { + close(autoCloseable) + } + } + + } + + implicit class StringImplicits(v: String) { + def cast[T](classType: Class[_]): T = { + classType match { + case c if c == classOf[String] => v.asInstanceOf[T] + case c if c == classOf[Byte] => v.toByte.asInstanceOf[T] + case c if c == classOf[Int] => v.toInt.asInstanceOf[T] + case c if c == classOf[Long] => v.toLong.asInstanceOf[T] + case c if c == classOf[Float] => v.toFloat.asInstanceOf[T] + case c if c == classOf[Double] => v.toDouble.asInstanceOf[T] + case c if c == classOf[Short] => v.toShort.asInstanceOf[T] + case c if c == classOf[Boolean] => v.toBoolean.asInstanceOf[T] + case c if c == classOf[JavaByte] => v.toByte.asInstanceOf[T] + case c if c == classOf[JavaInt] => JavaInt.valueOf(v).asInstanceOf[T] + case c if c == classOf[JavaLong] => JavaLong.valueOf(v).asInstanceOf[T] + case c if c == classOf[JavaFloat] => JavaFloat.valueOf(v).asInstanceOf[T] + case c if c == classOf[JavaDouble] => JavaDouble.valueOf(v).asInstanceOf[T] + case c if c == classOf[JavaShort] => JavaShort.valueOf(v).asInstanceOf[T] + case c if c == classOf[JavaBool] => JavaBool.valueOf(v).asInstanceOf[T] + case _ => + throw new IllegalArgumentException(s"Unsupported type: $classType") + } + } + } +} diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala index 583fc91f22..0b97498b17 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala @@ -140,7 +140,7 @@ private[this] object LoggerFactory extends LoggerFactoryBinder { val configurator = new JoranConfigurator() configurator.setContext(loggerContext) val text = FileUtils - .readString(new File(path)) + .readFile(new File(path)) .replaceAll("org.slf4j", s"$shadedPackage.org.slf4j") .replaceAll("ch.qos.logback", s"$shadedPackage.ch.qos.logback") .replaceAll("org.apache.log4j", s"$shadedPackage.org.apache.log4j") diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala index 98ec6ce6c6..30534cf5da 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala @@ -16,10 +16,11 @@ */ package org.apache.streampark.common.util +import org.apache.streampark.common.util.ImplicitsUtils._ + import org.apache.commons.lang3.StringUtils -import java.io.{BufferedInputStream, File, FileInputStream, IOException, PrintWriter, StringWriter} -import java.lang.{Boolean => JavaBool, Byte => JavaByte, Double => JavaDouble, Float => JavaFloat, Integer => JavaInt, Long => JavaLong, Short => JavaShort} +import java.io._ import java.net.URL import java.util.{jar, Collection => JavaCollection, Map => JavaMap, Properties, UUID} import java.util.jar.{JarFile, JarInputStream} @@ -107,35 +108,6 @@ object Utils { /** if any blank strings exist */ def isAnyBank(items: String*): Boolean = items == null || items.exists(StringUtils.isBlank) - /* - * Mimicking the try-with-resource syntax of Java-8+ - */ - def using[R, T <: AutoCloseable](handle: T)(func: T => R)(implicit - excFunc: Throwable => R = null): R = { - try { - func(handle) - } catch { - case e: Throwable if excFunc != null => excFunc(e) - } finally { - if (handle != null) { - handle.close() - } - } - } - - def close(closeable: AutoCloseable*)(implicit func: Throwable => Unit = null): Unit = { - closeable.foreach( - c => { - try { - if (c != null) { - c.close() - } - } catch { - case e: Throwable if func != null => func(e) - } - }) - } - /** * calculate the percentage of num1 / num2, the result range from 0 to 100, with one small digit * reserve. @@ -159,38 +131,31 @@ object Utils { else { try { val stm = new StringWriter - val wrt = new PrintWriter(stm) - e.printStackTrace(wrt) - wrt.close() - stm.toString + new PrintWriter(stm).autoClose { + writer => + e.printStackTrace(writer) + stm.toString + } } catch { case _: Throwable => e.getClass.getName + " (error while printing stack trace)" } } } - implicit class StringCasts(v: String) { - def cast[T](classType: Class[_]): T = { - classType match { - case c if c == classOf[String] => v.asInstanceOf[T] - case c if c == classOf[Byte] => v.toByte.asInstanceOf[T] - case c if c == classOf[Int] => v.toInt.asInstanceOf[T] - case c if c == classOf[Long] => v.toLong.asInstanceOf[T] - case c if c == classOf[Float] => v.toFloat.asInstanceOf[T] - case c if c == classOf[Double] => v.toDouble.asInstanceOf[T] - case c if c == classOf[Short] => v.toShort.asInstanceOf[T] - case c if c == classOf[Boolean] => v.toBoolean.asInstanceOf[T] - case c if c == classOf[JavaByte] => v.toByte.asInstanceOf[T] - case c if c == classOf[JavaInt] => JavaInt.valueOf(v).asInstanceOf[T] - case c if c == classOf[JavaLong] => JavaLong.valueOf(v).asInstanceOf[T] - case c if c == classOf[JavaFloat] => JavaFloat.valueOf(v).asInstanceOf[T] - case c if c == classOf[JavaDouble] => JavaDouble.valueOf(v).asInstanceOf[T] - case c if c == classOf[JavaShort] => JavaShort.valueOf(v).asInstanceOf[T] - case c if c == classOf[JavaBool] => JavaBool.valueOf(v).asInstanceOf[T] - case _ => - throw new IllegalArgumentException(s"Unsupported type: $classType") - } - } + def close(closeable: AutoCloseable*)(implicit func: Throwable => Unit = null): Unit = { + closeable.foreach( + c => { + try { + if (c != null) { + if (c.isInstanceOf[Flushable]) { + c.asInstanceOf[Flushable].flush() + } + c.close() + } + } catch { + case e: Throwable if func != null => func(e) + } + }) } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/FileUtils.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/FileUtils.java deleted file mode 100644 index 4aaa624f21..0000000000 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/FileUtils.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.streampark.console.base.util; - -import org.apache.streampark.console.base.exception.ApiDetailException; - -import java.io.File; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -/** The file utils. */ -public class FileUtils { - - private FileUtils() {} - - /** - * Reads the last portion of a file as a byte array. - * - * @param file the file to read - * @param maxSize the maximum number of bytes to read from the end of the file - * @return the byte array containing the content read from the file - * @throws IOException if an I/O error occurs - */ - public static byte[] readEndOfFile(File file, long maxSize) throws IOException { - long readSize = maxSize; - byte[] fileContent; - try (RandomAccessFile raFile = new RandomAccessFile(file, "r")) { - if (raFile.length() > maxSize) { - raFile.seek(raFile.length() - maxSize); - } else if (raFile.length() < maxSize) { - readSize = (int) raFile.length(); - } - fileContent = new byte[(int) readSize]; - raFile.read(fileContent); - } - return fileContent; - } - - /** - * Read the content of a file from a specified offset. - * - * @param file The file to read from - * @param startOffset The offset from where to start reading the file - * @param maxSize The maximum size of the file to read - * @return The content of the file as a byte array - * @throws IOException if an I/O error occurs while reading the file - * @throws IllegalArgumentException if the startOffset is greater than the file length - */ - public static byte[] readFileFromOffset(File file, long startOffset, long maxSize) - throws IOException { - if (file.length() < startOffset) { - throw new IllegalArgumentException( - String.format( - "The startOffset %s is great than the file length %s", startOffset, file.length())); - } - byte[] fileContent; - try (RandomAccessFile raFile = new RandomAccessFile(file, "r")) { - long readSize = Math.min(maxSize, file.length() - startOffset); - raFile.seek(startOffset); - fileContent = new byte[(int) readSize]; - raFile.read(fileContent); - } - return fileContent; - } - - /** - * Roll View Log. - * - * @param path The file path. - * @param offset The offset. - * @param limit The limit. - * @return The content of the file. - * @throws ApiDetailException if there's an error rolling the view log. - */ - public static String rollViewLog(String path, int offset, int limit) { - try { - File file = new File(path); - if (file.exists() && file.isFile()) { - try (Stream stream = Files.lines(Paths.get(path))) { - return stream.skip(offset).limit(limit).collect(Collectors.joining("\r\n")); - } - } - return null; - } catch (Exception e) { - throw new ApiDetailException("roll view log error: " + e); - } - } -} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java index f9017a7249..a20f5a08c4 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java @@ -424,7 +424,7 @@ private BuildPipeline createPipelineInstance(@Nonnull Application app) { FlinkEnv flinkEnv = flinkEnvService.getByIdOrDefault(app.getVersionId()); String flinkUserJar = retrieveFlinkUserJar(flinkEnv, app); - if (!new File(flinkUserJar).exists()) { + if (!FileUtils.exists(flinkUserJar)) { Resource resource = resourceService.findByResourceName(app.getTeamId(), app.getJar()); if (resource != null && StringUtils.isNotBlank(resource.getFilePath())) { flinkUserJar = resource.getFilePath(); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java index 9e970408ff..c76540eb4b 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java @@ -29,6 +29,7 @@ import org.apache.streampark.common.fs.LfsOperator; import org.apache.streampark.common.util.CompletableFutureUtils; import org.apache.streampark.common.util.DeflaterUtils; +import org.apache.streampark.common.util.FileUtils; import org.apache.streampark.common.util.HadoopUtils; import org.apache.streampark.common.util.PropertiesUtils; import org.apache.streampark.common.util.ThreadUtils; @@ -97,7 +98,6 @@ import org.apache.streampark.flink.packer.pipeline.BuildResult; import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse; -import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.CoreOptions; @@ -129,7 +129,6 @@ import java.io.IOException; import java.io.Serializable; import java.net.URI; -import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Base64; import java.util.Collection; @@ -619,9 +618,8 @@ public String k8sStartLog(Long id, Integer offset, Integer limit) throws Excepti if (!future.isDone()) { future.cancel(true); } - if (path != null) { - return org.apache.streampark.console.base.util.FileUtils.rollViewLog( - path, offset, limit); + if (FileUtils.exists(path)) { + return FileUtils.tailOf(path, offset, limit); } return null; }) @@ -738,7 +736,7 @@ public boolean create(Application appParam) { jarPath = resource.getFilePath(); } } - appParam.setJarCheckSum(FileUtils.checksumCRC32(new File(jarPath))); + appParam.setJarCheckSum(org.apache.commons.io.FileUtils.checksumCRC32(new File(jarPath))); } if (save(appParam)) { @@ -866,7 +864,7 @@ public boolean update(Application appParam) { File jarFile = new File(WebUtils.getAppTempDir(), appParam.getJar()); if (jarFile.exists()) { try { - long checkSum = FileUtils.checksumCRC32(jarFile); + long checkSum = org.apache.commons.io.FileUtils.checksumCRC32(jarFile); if (!Objects.equals(checkSum, application.getJarCheckSum())) { application.setBuild(true); } @@ -1144,7 +1142,7 @@ public void clean(Application appParam) { @Override public String readConf(Application appParam) throws IOException { File file = new File(appParam.getConfig()); - String conf = FileUtils.readFileToString(file, StandardCharsets.UTF_8); + String conf = FileUtils.readFile(file); return Base64.getEncoder().encodeToString(conf.getBytes()); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java index e568d976b3..d48228706e 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java @@ -21,6 +21,7 @@ import org.apache.streampark.common.conf.InternalConfigHolder; import org.apache.streampark.common.conf.Workspace; import org.apache.streampark.common.util.CompletableFutureUtils; +import org.apache.streampark.common.util.FileUtils; import org.apache.streampark.common.util.ThreadUtils; import org.apache.streampark.common.util.Utils; import org.apache.streampark.console.base.domain.ResponseCode; @@ -28,7 +29,6 @@ import org.apache.streampark.console.base.domain.RestResponse; import org.apache.streampark.console.base.exception.ApiAlertException; import org.apache.streampark.console.base.mybatis.pager.MybatisPager; -import org.apache.streampark.console.base.util.FileUtils; import org.apache.streampark.console.base.util.GZipUtils; import org.apache.streampark.console.core.entity.Application; import org.apache.streampark.console.core.entity.Project; diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/FileUtilsTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/FileUtilsTest.java index 57db8581ce..d310833278 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/FileUtilsTest.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/FileUtilsTest.java @@ -17,6 +17,8 @@ package org.apache.streampark.console.base.util; +import org.apache.streampark.common.util.FileUtils; + import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala index 0ce7cb9c78..6cc667c6e5 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala @@ -17,6 +17,7 @@ package org.apache.streampark.flink.client.`trait` +import org.apache.streampark.common.util.ImplicitsUtils._ import org.apache.streampark.common.util.Utils import org.apache.streampark.flink.client.bean._ @@ -43,25 +44,25 @@ trait YarnClientTrait extends FlinkClientTrait { flinkConf: Configuration, actionFunc: (JobID, ClusterClient[_]) => O): O = { - Utils.using { - flinkConf.safeSet(YarnConfigOptions.APPLICATION_ID, request.clusterId) - val clusterClientFactory = new YarnClusterClientFactory - val applicationId = clusterClientFactory.getClusterId(flinkConf) - if (applicationId == null) { - throw new FlinkException( - "[StreamPark] getClusterClient error. No cluster id was specified. Please specify a cluster to which you would like to connect.") - } - val clusterDescriptor = clusterClientFactory.createClusterDescriptor(flinkConf) - clusterDescriptor.retrieve(applicationId).getClusterClient - } { - client => - Try(actionFunc(getJobID(request.jobId), client)).recover { - case e => - throw new FlinkException( - s"[StreamPark] Do ${request.getClass.getSimpleName} for the job ${request.jobId} failed. " + - s"detail: ${Utils.stringifyException(e)}"); - }.get + flinkConf.safeSet(YarnConfigOptions.APPLICATION_ID, request.clusterId) + val clusterClientFactory = new YarnClusterClientFactory + val applicationId = clusterClientFactory.getClusterId(flinkConf) + if (applicationId == null) { + throw new FlinkException( + "[StreamPark] getClusterClient error. No cluster id was specified. Please specify a cluster to which you would like to connect.") } + val clusterDescriptor = clusterClientFactory.createClusterDescriptor(flinkConf) + clusterDescriptor + .retrieve(applicationId) + .getClusterClient + .autoClose( + client => + Try(actionFunc(getJobID(request.jobId), client)).recover { + case e => + throw new FlinkException( + s"[StreamPark] Do ${request.getClass.getSimpleName} for the job ${request.jobId} failed. " + + s"detail: ${Utils.stringifyException(e)}"); + }.get) } override def doTriggerSavepoint( diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala index 01b86f1ca8..7418e8c50c 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala @@ -18,8 +18,8 @@ package org.apache.streampark.flink.kubernetes import org.apache.streampark.common.conf.ConfigConst -import org.apache.streampark.common.util.{Logger, Utils} -import org.apache.streampark.common.util.Utils.using +import org.apache.streampark.common.util.ImplicitsUtils._ +import org.apache.streampark.common.util.Logger import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode import org.apache.streampark.flink.kubernetes.ingress.IngressController import org.apache.streampark.flink.kubernetes.model.ClusterKey @@ -108,32 +108,33 @@ object KubernetesRetriever extends Logger { * deployment namespace */ def isDeploymentExists(name: String, namespace: String): Boolean = { - using(KubernetesRetriever.newK8sClient()) { - client => - client - .apps() - .deployments() - .inNamespace(namespace) - .withLabel("type", ConfigConst.FLINK_NATIVE_KUBERNETES_LABEL) - .list() - .getItems - .asScala - .exists(e => e.getMetadata.getName == name) - }(_ => false) + KubernetesRetriever + .newK8sClient() + .autoClose( + client => + client + .apps() + .deployments() + .inNamespace(namespace) + .withLabel("type", ConfigConst.FLINK_NATIVE_KUBERNETES_LABEL) + .list() + .getItems + .asScala + .exists(e => e.getMetadata.getName == name))(_ => false) } /** retrieve flink jobManager rest url */ def retrieveFlinkRestUrl(clusterKey: ClusterKey): Option[String] = { - Utils.using( - KubernetesRetriever - .newFinkClusterClient(clusterKey.clusterId, clusterKey.namespace, clusterKey.executeMode) - .getOrElse(return None)) { - client => - val url = - IngressController.ingressUrlAddress(clusterKey.namespace, clusterKey.clusterId, client) - logger.info(s"retrieve flink jobManager rest url: $url") - client.close() - Some(url) - } + KubernetesRetriever + .newFinkClusterClient(clusterKey.clusterId, clusterKey.namespace, clusterKey.executeMode) + .getOrElse(return None) + .autoClose( + client => { + val url = + IngressController.ingressUrlAddress(clusterKey.namespace, clusterKey.clusterId, client) + logger.info(s"retrieve flink jobManager rest url: $url") + Some(url) + }) } + } diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala index 8e622fe048..ff2dc7161e 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala @@ -18,7 +18,7 @@ package org.apache.streampark.flink.kubernetes.helper import org.apache.streampark.common.util.{Logger, SystemPropertyUtils} -import org.apache.streampark.common.util.Utils.using +import org.apache.streampark.common.util.ImplicitsUtils._ import org.apache.streampark.flink.kubernetes.KubernetesRetriever import com.google.common.base.Charsets @@ -34,25 +34,26 @@ import scala.util.{Success, Try} object KubernetesDeploymentHelper extends Logger { private[this] def getPods(nameSpace: String, deploymentName: String): List[Pod] = { - using(KubernetesRetriever.newK8sClient()) { - client => - Try { - client.pods - .inNamespace(nameSpace) - .withLabels { - client.apps.deployments - .inNamespace(nameSpace) - .withName(deploymentName) - .get - .getSpec - .getSelector - .getMatchLabels - } - .list - .getItems - .toList - }.getOrElse(List.empty[Pod]) - } + KubernetesRetriever + .newK8sClient() + .autoClose( + client => + Try { + client.pods + .inNamespace(nameSpace) + .withLabels { + client.apps.deployments + .inNamespace(nameSpace) + .withName(deploymentName) + .get + .getSpec + .getSelector + .getMatchLabels + } + .list + .getItems + .toList + }.getOrElse(List.empty[Pod])) } def getDeploymentStatusChanges(nameSpace: String, deploymentName: String): Boolean = { @@ -74,16 +75,17 @@ object KubernetesDeploymentHelper extends Logger { } def deleteTaskDeployment(nameSpace: String, deploymentName: String): Boolean = { - using(KubernetesRetriever.newK8sClient()) { - client => - Try { - val r = client.apps.deployments - .inNamespace(nameSpace) - .withName(deploymentName) - .delete - Boolean.unbox(r) - }.getOrElse(false) - } + KubernetesRetriever + .newK8sClient() + .autoClose( + client => + Try { + val r = client.apps.deployments + .inNamespace(nameSpace) + .withName(deploymentName) + .delete + Boolean.unbox(r) + }.getOrElse(false)) } def isTheK8sConnectionNormal(): Boolean = { @@ -96,47 +98,51 @@ object KubernetesDeploymentHelper extends Logger { } def watchDeploymentLog(nameSpace: String, jobName: String, jobId: String): String = { - using(KubernetesRetriever.newK8sClient()) { - client => - val path = KubernetesDeploymentHelper.getJobLog(jobId) - val file = new File(path) - val log = client.apps.deployments.inNamespace(nameSpace).withName(jobName).getLog - Files.asCharSink(file, Charsets.UTF_8).write(log) - path - } - } - - def watchPodTerminatedLog(nameSpace: String, jobName: String, jobId: String): String = { - using(KubernetesRetriever.newK8sClient()) { - client => - Try { - val podName = getPods(nameSpace, jobName).head.getMetadata.getName - val path = KubernetesDeploymentHelper.getJobErrorLog(jobId) + KubernetesRetriever + .newK8sClient() + .autoClose( + client => { + val path = KubernetesDeploymentHelper.getJobLog(jobId) val file = new File(path) - val log = client.pods - .inNamespace(nameSpace) - .withName(podName) - .terminated() - .withPrettyOutput - .getLog + val log = client.apps.deployments.inNamespace(nameSpace).withName(jobName).getLog Files.asCharSink(file, Charsets.UTF_8).write(log) path - }.getOrElse(null) - }(error => throw error) + }) + } + + def watchPodTerminatedLog(nameSpace: String, jobName: String, jobId: String): String = { + KubernetesRetriever + .newK8sClient() + .autoClose( + client => + Try { + val podName = getPods(nameSpace, jobName).head.getMetadata.getName + val path = KubernetesDeploymentHelper.getJobErrorLog(jobId) + val file = new File(path) + val log = client.pods + .inNamespace(nameSpace) + .withName(podName) + .terminated() + .withPrettyOutput + .getLog + Files.asCharSink(file, Charsets.UTF_8).write(log) + path + }.getOrElse(null))(error => throw error) } def deleteTaskConfigMap(nameSpace: String, deploymentName: String): Boolean = { - using(KubernetesRetriever.newK8sClient()) { - client => - Try { - val r = client - .configMaps() - .inNamespace(nameSpace) - .withLabel("app", deploymentName) - .delete - Boolean.unbox(r) - }.getOrElse(false) - } + KubernetesRetriever + .newK8sClient() + .autoClose( + client => + Try { + val r = client + .configMaps() + .inNamespace(nameSpace) + .withLabel("app", deploymentName) + .delete + Boolean.unbox(r) + }.getOrElse(false)) } private[kubernetes] def getJobLog(jobId: String): String = { diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala index d443a22bdd..b8fc0e817a 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala @@ -17,8 +17,8 @@ package org.apache.streampark.flink.kubernetes.ingress +import org.apache.streampark.common.util.ImplicitsUtils._ import org.apache.streampark.common.util.Logger -import org.apache.streampark.common.util.Utils.using import io.fabric8.kubernetes.client.DefaultKubernetesClient import org.apache.flink.client.program.ClusterClient @@ -30,15 +30,15 @@ object IngressController extends Logger { private[this] val VERSION_REGEXP = "(\\d+\\.\\d+)".r private lazy val ingressStrategy: IngressStrategy = { - using(new DefaultKubernetesClient()) { - client => + new DefaultKubernetesClient().autoClose( + client => { val version = VERSION_REGEXP.findFirstIn(client.getVersion.getGitVersion).get.toDouble if (version >= 1.19) { new IngressStrategyV1() } else { new IngressStrategyV1beta1() } - } + }) } def configureIngress(domainName: String, clusterId: String, nameSpace: String): Unit = { diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala index c5c2fdd0b1..b8346cc1dc 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala @@ -17,7 +17,7 @@ package org.apache.streampark.flink.kubernetes.ingress -import org.apache.streampark.common.util.Utils +import org.apache.streampark.common.util.ImplicitsUtils._ import io.fabric8.kubernetes.api.model.networking.v1.IngressBuilder import io.fabric8.kubernetes.client.DefaultKubernetesClient @@ -34,7 +34,7 @@ class IngressStrategyV1 extends IngressStrategy { clusterId: String, clusterClient: ClusterClient[_]): String = { - Utils.using(new DefaultKubernetesClient) { + new DefaultKubernetesClient().autoClose( client => Try { Option(client.network.v1.ingresses.inNamespace(nameSpace).withName(clusterId).get) @@ -45,14 +45,12 @@ class IngressStrategyV1 extends IngressStrategy { }.recover { case e => throw new RuntimeException(s"[StreamPark] get ingressUrlAddress error: $e") - }.get - } - + }.get) } override def configureIngress(domainName: String, clusterId: String, nameSpace: String): Unit = { - Utils.using(new DefaultKubernetesClient) { - client => + new DefaultKubernetesClient().autoClose( + client => { val ownerReference = getOwnerReference(nameSpace, clusterId, client) val ingress = new IngressBuilder() .withNewMetadata() @@ -94,6 +92,7 @@ class IngressStrategyV1 extends IngressStrategy { .endSpec() .build() client.network.v1.ingresses().inNamespace(nameSpace).create(ingress) - } + }) } + } diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala index 8cbc276e49..a5ca4258ec 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala @@ -17,7 +17,7 @@ package org.apache.streampark.flink.kubernetes.ingress -import org.apache.streampark.common.util.Utils +import org.apache.streampark.common.util.ImplicitsUtils._ import io.fabric8.kubernetes.api.model.IntOrString import io.fabric8.kubernetes.api.model.networking.v1beta1.IngressBuilder @@ -35,7 +35,7 @@ class IngressStrategyV1beta1 extends IngressStrategy { clusterId: String, clusterClient: ClusterClient[_]): String = { - Utils.using(new DefaultKubernetesClient) { + new DefaultKubernetesClient().autoClose( client => Try { Option(client.network.v1beta1.ingresses.inNamespace(nameSpace).withName(clusterId).get) @@ -46,13 +46,12 @@ class IngressStrategyV1beta1 extends IngressStrategy { }.recover { case e => throw new RuntimeException(s"[StreamPark] get ingressUrlAddress error: $e") - }.get - } + }.get) } override def configureIngress(domainName: String, clusterId: String, nameSpace: String): Unit = { - Utils.using(new DefaultKubernetesClient) { - client => + new DefaultKubernetesClient().autoClose( + client => { val ownerReference = getOwnerReference(nameSpace, clusterId, client) val ingress = new IngressBuilder() .withNewMetadata() @@ -83,8 +82,7 @@ class IngressStrategyV1beta1 extends IngressStrategy { .endRule() .endSpec() .build() - client.network.ingress.inNamespace(nameSpace).create(ingress) - } + }) } } diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala index b9167d437c..6fbb0de508 100644 --- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala +++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala @@ -20,7 +20,7 @@ package org.apache.streampark.flink.packer.pipeline.impl import org.apache.streampark.common.conf.Workspace import org.apache.streampark.common.enums.DevelopmentMode import org.apache.streampark.common.fs.{FsOperator, HdfsOperator, LfsOperator} -import org.apache.streampark.common.util.Utils +import org.apache.streampark.common.util.ImplicitsUtils._ import org.apache.streampark.flink.packer.maven.MavenTool import org.apache.streampark.flink.packer.pipeline._ @@ -90,7 +90,7 @@ class FlinkYarnApplicationBuildPipeline(request: FlinkYarnApplicationBuildReques case FsOperator.hdfs => val uploadFile = s"${Workspace.remote.APP_UPLOADS}/${originFile.getName}" if (fsOperator.exists(uploadFile)) { - Utils.using(new FileInputStream(originFile))( + new FileInputStream(originFile).autoClose( inputStream => { if (DigestUtils.md5Hex(inputStream) != fsOperator.fileMd5(uploadFile)) { fsOperator.upload(originFile.getAbsolutePath, uploadFile) diff --git a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala index 56dede3e63..44294d7566 100644 --- a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala +++ b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala @@ -19,6 +19,7 @@ package org.apache.streampark.flink.proxy import org.apache.streampark.common.conf.{ConfigConst, FlinkVersion} import org.apache.streampark.common.util.{ClassLoaderUtils, Logger, Utils} +import org.apache.streampark.common.util.ImplicitsUtils._ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, ObjectOutputStream} import java.net.URL @@ -196,13 +197,14 @@ object FlinkShimsProxy extends Logger { @throws[Exception] def getObject[T](loader: ClassLoader, obj: Object): T = { val arrayOutputStream = new ByteArrayOutputStream - val result = Utils.using(new ObjectOutputStream(arrayOutputStream))( - objectOutputStream => { - objectOutputStream.writeObject(obj) - val byteArrayInputStream = new ByteArrayInputStream(arrayOutputStream.toByteArray) - Utils.using(new ClassLoaderObjectInputStream(loader, byteArrayInputStream))(_.readObject) - }) - result.asInstanceOf[T] + new ObjectOutputStream(arrayOutputStream) + .autoClose( + objectOutputStream => { + objectOutputStream.writeObject(obj) + val byteArrayInputStream = new ByteArrayInputStream(arrayOutputStream.toByteArray) + new ClassLoaderObjectInputStream(loader, byteArrayInputStream).autoClose(_.readObject()) + }) + .asInstanceOf[T] } } diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala index b3ee24c33e..bf2e2c68b9 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala @@ -130,7 +130,7 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api require( configFile.exists(), s"[StreamPark] Usage: application config file: $configFile is not found!!!") - val text = FileUtils.readString(configFile) + val text = FileUtils.readFile(configFile) readConfig(text) } map.filter(_._2.nonEmpty)