diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala index 719715e22d..40dd1d120d 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala @@ -18,13 +18,17 @@ package org.apache.streampark.common.util import java.io._ import java.net.URL +import java.nio.ByteBuffer +import java.nio.channels.Channels +import java.nio.charset.StandardCharsets +import java.nio.file.{Files, Paths} import java.util -import java.util.Scanner +import java.util.stream.Collectors import scala.collection.convert.ImplicitConversions._ import scala.collection.mutable -object FileUtils { +object FileUtils extends org.apache.commons.io.FileUtils { private[this] def bytesToHexString(src: Array[Byte]): String = { val stringBuilder = new mutable.StringBuilder @@ -73,12 +77,6 @@ object FileUtils { s"[StreamPark] Failed to create directory within $TEMP_DIR_ATTEMPTS attempts (tried $baseName 0 to $baseName ${TEMP_DIR_ATTEMPTS - 1})") } - def exists(path: String): Unit = { - require( - path != null && path.nonEmpty && new File(path).exists(), - s"[StreamPark] FileUtils.exists: file $path is not exist!") - } - def getPathFromEnv(env: String): String = { val path = Option(System.getenv(env)).getOrElse(System.getProperty(env)) require( @@ -115,6 +113,14 @@ object FileUtils { } } + def exists(file: Serializable): Boolean = { + file match { + case null => false + case f: java.io.File => f.exists() + case p => new java.io.File(p.toString).exists() + } + } + def equals(file1: File, file2: File): Boolean = { (file1, file2) match { case (a, b) if a == null || b == null => false @@ -143,20 +149,112 @@ object FileUtils { } @throws[IOException] - def readString(file: File): String = { - require(file != null && file.isFile) - val reader = new FileReader(file) - val scanner = new Scanner(reader) - val buffer = new mutable.StringBuilder() - if (scanner.hasNextLine) { - buffer.append(scanner.nextLine()) + def readInputStream(in: InputStream, array: Array[Byte]): Unit = { + var toRead = array.length + var ret = 0 + var off = 0 + while (toRead > 0) { + ret = in.read(array, off, toRead) + if (ret < 0) throw new IOException("Bad inputStream, premature EOF") + toRead -= ret + off += ret + } + in.close() + } + + @throws[IOException] + def readFile(file: File): String = { + if (file.length >= Int.MaxValue) { + throw new IOException("Too large file, unexpected!") + } else { + val len = file.length + val array = new Array[Byte](len.toInt) + val is = Files.newInputStream(file.toPath) + readInputStream(is, array) + is.close() + new String(array, StandardCharsets.UTF_8) + } + } + + @throws[IOException] + def writeFile(content: String, file: File): Unit = { + val outputStream = Files.newOutputStream(file.toPath) + val channel = Channels.newChannel(outputStream) + val buffer = ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8)) + channel.write(buffer) + channel.close() + outputStream.flush() + outputStream.close() + } + + @throws[IOException] + def readEndOfFile(file: File, maxSize: Long): Array[Byte] = { + var readSize = maxSize + var fileContent: Array[Byte] = null + try { + val raFile = new RandomAccessFile(file, "r") + try { + if (raFile.length > maxSize) raFile.seek(raFile.length - maxSize) + else if (raFile.length < maxSize) readSize = raFile.length.toInt + fileContent = new Array[Byte](readSize.toInt) + raFile.read(fileContent) + } finally if (raFile != null) raFile.close() } - while (scanner.hasNextLine) { - buffer.append("\r\n") - buffer.append(scanner.nextLine()) + fileContent + } + + /** + * Read the content of a file from a specified offset. + * + * @param file + * The file to read from + * @param startOffset + * The offset from where to start reading the file + * @param maxSize + * The maximum size of the file to read + * @return + * The content of the file as a byte array + * @throws IOException + * if an I/O error occurs while reading the file + * @throws IllegalArgumentException + * if the startOffset is greater than the file length + */ + @throws[IOException] + def readFileFromOffset(file: File, startOffset: Long, maxSize: Long): Array[Byte] = { + if (file.length < startOffset) { + throw new IllegalArgumentException( + s"The startOffset $startOffset is great than the file length ${file.length}") + } + Utils.using(new RandomAccessFile(file, "r")) { + raFile => + val readSize = Math.min(maxSize, file.length - startOffset) + raFile.seek(startOffset) + val fileContent = new Array[Byte](readSize.toInt) + raFile.read(fileContent) + fileContent + } + } + + /** + * Roll View Log. + * + * @param path + * The file path. + * @param offset + * The offset. + * @param limit + * The limit. + * @return + * The content of the file. + */ + def tailOf(path: String, offset: Int, limit: Int): String = try { + val file = new File(path) + if (file.exists && file.isFile) { + Utils.using(Files.lines(Paths.get(path))) { + stream => stream.skip(offset).limit(limit).collect(Collectors.joining("\r\n")) + } } - Utils.close(scanner, reader) - buffer.toString() + null } } diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala index b9823d8f07..715afa00da 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala @@ -137,7 +137,10 @@ object HadoopUtils extends Logger { def getConfigurationFromHadoopConfDir(confDir: String = hadoopConfDir): Configuration = { if (!configurationCache.containsKey(confDir)) { - FileUtils.exists(confDir) + if (!FileUtils.exists(confDir)) { + throw new ExceptionInInitializerError( + s"[StreamPark] hadoop conf file " + confDir + " is not exist!") + } val hadoopConfDir = new File(confDir) val confName = List("hdfs-default.xml", "core-site.xml", "hdfs-site.xml", "yarn-site.xml") val files = diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala index 583fc91f22..0b97498b17 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala @@ -140,7 +140,7 @@ private[this] object LoggerFactory extends LoggerFactoryBinder { val configurator = new JoranConfigurator() configurator.setContext(loggerContext) val text = FileUtils - .readString(new File(path)) + .readFile(new File(path)) .replaceAll("org.slf4j", s"$shadedPackage.org.slf4j") .replaceAll("ch.qos.logback", s"$shadedPackage.ch.qos.logback") .replaceAll("org.apache.log4j", s"$shadedPackage.org.apache.log4j") diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/FileUtils.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/FileUtils.java deleted file mode 100644 index 4aaa624f21..0000000000 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/FileUtils.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.streampark.console.base.util; - -import org.apache.streampark.console.base.exception.ApiDetailException; - -import java.io.File; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -/** The file utils. */ -public class FileUtils { - - private FileUtils() {} - - /** - * Reads the last portion of a file as a byte array. - * - * @param file the file to read - * @param maxSize the maximum number of bytes to read from the end of the file - * @return the byte array containing the content read from the file - * @throws IOException if an I/O error occurs - */ - public static byte[] readEndOfFile(File file, long maxSize) throws IOException { - long readSize = maxSize; - byte[] fileContent; - try (RandomAccessFile raFile = new RandomAccessFile(file, "r")) { - if (raFile.length() > maxSize) { - raFile.seek(raFile.length() - maxSize); - } else if (raFile.length() < maxSize) { - readSize = (int) raFile.length(); - } - fileContent = new byte[(int) readSize]; - raFile.read(fileContent); - } - return fileContent; - } - - /** - * Read the content of a file from a specified offset. - * - * @param file The file to read from - * @param startOffset The offset from where to start reading the file - * @param maxSize The maximum size of the file to read - * @return The content of the file as a byte array - * @throws IOException if an I/O error occurs while reading the file - * @throws IllegalArgumentException if the startOffset is greater than the file length - */ - public static byte[] readFileFromOffset(File file, long startOffset, long maxSize) - throws IOException { - if (file.length() < startOffset) { - throw new IllegalArgumentException( - String.format( - "The startOffset %s is great than the file length %s", startOffset, file.length())); - } - byte[] fileContent; - try (RandomAccessFile raFile = new RandomAccessFile(file, "r")) { - long readSize = Math.min(maxSize, file.length() - startOffset); - raFile.seek(startOffset); - fileContent = new byte[(int) readSize]; - raFile.read(fileContent); - } - return fileContent; - } - - /** - * Roll View Log. - * - * @param path The file path. - * @param offset The offset. - * @param limit The limit. - * @return The content of the file. - * @throws ApiDetailException if there's an error rolling the view log. - */ - public static String rollViewLog(String path, int offset, int limit) { - try { - File file = new File(path); - if (file.exists() && file.isFile()) { - try (Stream stream = Files.lines(Paths.get(path))) { - return stream.skip(offset).limit(limit).collect(Collectors.joining("\r\n")); - } - } - return null; - } catch (Exception e) { - throw new ApiDetailException("roll view log error: " + e); - } - } -} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java index f9017a7249..a20f5a08c4 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java @@ -424,7 +424,7 @@ private BuildPipeline createPipelineInstance(@Nonnull Application app) { FlinkEnv flinkEnv = flinkEnvService.getByIdOrDefault(app.getVersionId()); String flinkUserJar = retrieveFlinkUserJar(flinkEnv, app); - if (!new File(flinkUserJar).exists()) { + if (!FileUtils.exists(flinkUserJar)) { Resource resource = resourceService.findByResourceName(app.getTeamId(), app.getJar()); if (resource != null && StringUtils.isNotBlank(resource.getFilePath())) { flinkUserJar = resource.getFilePath(); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java index a948796401..6806825b56 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java @@ -29,6 +29,7 @@ import org.apache.streampark.common.fs.LfsOperator; import org.apache.streampark.common.util.CompletableFutureUtils; import org.apache.streampark.common.util.DeflaterUtils; +import org.apache.streampark.common.util.FileUtils; import org.apache.streampark.common.util.HadoopUtils; import org.apache.streampark.common.util.PropertiesUtils; import org.apache.streampark.common.util.ThreadUtils; @@ -97,7 +98,6 @@ import org.apache.streampark.flink.packer.pipeline.BuildResult; import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse; -import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.CoreOptions; @@ -129,7 +129,6 @@ import java.io.IOException; import java.io.Serializable; import java.net.URI; -import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Base64; import java.util.Collection; @@ -619,9 +618,8 @@ public String k8sStartLog(Long id, Integer offset, Integer limit) throws Excepti if (!future.isDone()) { future.cancel(true); } - if (path != null) { - return org.apache.streampark.console.base.util.FileUtils.rollViewLog( - path, offset, limit); + if (FileUtils.exists(path)) { + return FileUtils.tailOf(path, offset, limit); } return null; }) @@ -738,7 +736,7 @@ public boolean create(Application appParam) { jarPath = resource.getFilePath(); } } - appParam.setJarCheckSum(FileUtils.checksumCRC32(new File(jarPath))); + appParam.setJarCheckSum(org.apache.commons.io.FileUtils.checksumCRC32(new File(jarPath))); } if (save(appParam)) { @@ -866,7 +864,7 @@ public boolean update(Application appParam) { File jarFile = new File(WebUtils.getAppTempDir(), appParam.getJar()); if (jarFile.exists()) { try { - long checkSum = FileUtils.checksumCRC32(jarFile); + long checkSum = org.apache.commons.io.FileUtils.checksumCRC32(jarFile); if (!Objects.equals(checkSum, application.getJarCheckSum())) { application.setBuild(true); } @@ -1144,7 +1142,7 @@ public void clean(Application appParam) { @Override public String readConf(Application appParam) throws IOException { File file = new File(appParam.getConfig()); - String conf = FileUtils.readFileToString(file, StandardCharsets.UTF_8); + String conf = FileUtils.readFile(file); return Base64.getEncoder().encodeToString(conf.getBytes()); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java index e568d976b3..d48228706e 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java @@ -21,6 +21,7 @@ import org.apache.streampark.common.conf.InternalConfigHolder; import org.apache.streampark.common.conf.Workspace; import org.apache.streampark.common.util.CompletableFutureUtils; +import org.apache.streampark.common.util.FileUtils; import org.apache.streampark.common.util.ThreadUtils; import org.apache.streampark.common.util.Utils; import org.apache.streampark.console.base.domain.ResponseCode; @@ -28,7 +29,6 @@ import org.apache.streampark.console.base.domain.RestResponse; import org.apache.streampark.console.base.exception.ApiAlertException; import org.apache.streampark.console.base.mybatis.pager.MybatisPager; -import org.apache.streampark.console.base.util.FileUtils; import org.apache.streampark.console.base.util.GZipUtils; import org.apache.streampark.console.core.entity.Application; import org.apache.streampark.console.core.entity.Project; diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/FileUtilsTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/FileUtilsTest.java index 57db8581ce..d310833278 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/FileUtilsTest.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/FileUtilsTest.java @@ -17,6 +17,8 @@ package org.apache.streampark.console.base.util; +import org.apache.streampark.common.util.FileUtils; + import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala index b3ee24c33e..bf2e2c68b9 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala @@ -130,7 +130,7 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api require( configFile.exists(), s"[StreamPark] Usage: application config file: $configFile is not found!!!") - val text = FileUtils.readString(configFile) + val text = FileUtils.readFile(configFile) readConfig(text) } map.filter(_._2.nonEmpty)