Skip to content

Commit

Permalink
[Improve] FileUtils minor improvement
Browse files Browse the repository at this point in the history
  • Loading branch information
wolfboys committed Aug 27, 2023
1 parent 5ee6739 commit ea968bb
Show file tree
Hide file tree
Showing 9 changed files with 134 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@ package org.apache.streampark.common.util

import java.io._
import java.net.URL
import java.nio.ByteBuffer
import java.nio.channels.Channels
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import java.util
import java.util.Scanner
import java.util.stream.Collectors

import scala.collection.convert.ImplicitConversions._
import scala.collection.mutable

object FileUtils {
object FileUtils extends org.apache.commons.io.FileUtils {

private[this] def bytesToHexString(src: Array[Byte]): String = {
val stringBuilder = new mutable.StringBuilder
Expand Down Expand Up @@ -73,12 +77,6 @@ object FileUtils {
s"[StreamPark] Failed to create directory within $TEMP_DIR_ATTEMPTS attempts (tried $baseName 0 to $baseName ${TEMP_DIR_ATTEMPTS - 1})")
}

def exists(path: String): Unit = {
require(
path != null && path.nonEmpty && new File(path).exists(),
s"[StreamPark] FileUtils.exists: file $path is not exist!")
}

def getPathFromEnv(env: String): String = {
val path = Option(System.getenv(env)).getOrElse(System.getProperty(env))
require(
Expand Down Expand Up @@ -115,6 +113,14 @@ object FileUtils {
}
}

def exists(file: Serializable): Boolean = {
file match {
case null => false
case f: java.io.File => f.exists()
case p => new java.io.File(p.toString).exists()
}
}

def equals(file1: File, file2: File): Boolean = {
(file1, file2) match {
case (a, b) if a == null || b == null => false
Expand Down Expand Up @@ -143,20 +149,112 @@ object FileUtils {
}

@throws[IOException]
def readString(file: File): String = {
require(file != null && file.isFile)
val reader = new FileReader(file)
val scanner = new Scanner(reader)
val buffer = new mutable.StringBuilder()
if (scanner.hasNextLine) {
buffer.append(scanner.nextLine())
def readInputStream(in: InputStream, array: Array[Byte]): Unit = {
var toRead = array.length
var ret = 0
var off = 0
while (toRead > 0) {
ret = in.read(array, off, toRead)
if (ret < 0) throw new IOException("Bad inputStream, premature EOF")
toRead -= ret
off += ret
}
in.close()
}

@throws[IOException]
def readFile(file: File): String = {
if (file.length >= Int.MaxValue) {
throw new IOException("Too large file, unexpected!")
} else {
val len = file.length
val array = new Array[Byte](len.toInt)
val is = Files.newInputStream(file.toPath)
readInputStream(is, array)
is.close()
new String(array, StandardCharsets.UTF_8)
}
}

@throws[IOException]
def writeFile(content: String, file: File): Unit = {
val outputStream = Files.newOutputStream(file.toPath)
val channel = Channels.newChannel(outputStream)
val buffer = ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8))
channel.write(buffer)
channel.close()
outputStream.flush()
outputStream.close()
}

@throws[IOException]
def readEndOfFile(file: File, maxSize: Long): Array[Byte] = {
var readSize = maxSize
var fileContent: Array[Byte] = null
try {
val raFile = new RandomAccessFile(file, "r")
try {
if (raFile.length > maxSize) raFile.seek(raFile.length - maxSize)
else if (raFile.length < maxSize) readSize = raFile.length.toInt
fileContent = new Array[Byte](readSize.toInt)
raFile.read(fileContent)
} finally if (raFile != null) raFile.close()
}
while (scanner.hasNextLine) {
buffer.append("\r\n")
buffer.append(scanner.nextLine())
fileContent
}

/**
* Read the content of a file from a specified offset.
*
* @param file
* The file to read from
* @param startOffset
* The offset from where to start reading the file
* @param maxSize
* The maximum size of the file to read
* @return
* The content of the file as a byte array
* @throws IOException
* if an I/O error occurs while reading the file
* @throws IllegalArgumentException
* if the startOffset is greater than the file length
*/
@throws[IOException]
def readFileFromOffset(file: File, startOffset: Long, maxSize: Long): Array[Byte] = {
if (file.length < startOffset) {
throw new IllegalArgumentException(
s"The startOffset $startOffset is great than the file length ${file.length}")
}
Utils.using(new RandomAccessFile(file, "r")) {
raFile =>
val readSize = Math.min(maxSize, file.length - startOffset)
raFile.seek(startOffset)
val fileContent = new Array[Byte](readSize.toInt)
raFile.read(fileContent)
fileContent
}
}

/**
* Roll View Log.
*
* @param path
* The file path.
* @param offset
* The offset.
* @param limit
* The limit.
* @return
* The content of the file.
*/
def tailOf(path: String, offset: Int, limit: Int): String = try {
val file = new File(path)
if (file.exists && file.isFile) {
Utils.using(Files.lines(Paths.get(path))) {
stream => stream.skip(offset).limit(limit).collect(Collectors.joining("\r\n"))
}
}
Utils.close(scanner, reader)
buffer.toString()
null
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,10 @@ object HadoopUtils extends Logger {

def getConfigurationFromHadoopConfDir(confDir: String = hadoopConfDir): Configuration = {
if (!configurationCache.containsKey(confDir)) {
FileUtils.exists(confDir)
if (!FileUtils.exists(confDir)) {
throw new ExceptionInInitializerError(
s"[StreamPark] hadoop conf file " + confDir + " is not exist!")
}
val hadoopConfDir = new File(confDir)
val confName = List("hdfs-default.xml", "core-site.xml", "hdfs-site.xml", "yarn-site.xml")
val files =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ private[this] object LoggerFactory extends LoggerFactoryBinder {
val configurator = new JoranConfigurator()
configurator.setContext(loggerContext)
val text = FileUtils
.readString(new File(path))
.readFile(new File(path))
.replaceAll("org.slf4j", s"$shadedPackage.org.slf4j")
.replaceAll("ch.qos.logback", s"$shadedPackage.ch.qos.logback")
.replaceAll("org.apache.log4j", s"$shadedPackage.org.apache.log4j")
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ private BuildPipeline createPipelineInstance(@Nonnull Application app) {
FlinkEnv flinkEnv = flinkEnvService.getByIdOrDefault(app.getVersionId());
String flinkUserJar = retrieveFlinkUserJar(flinkEnv, app);

if (!new File(flinkUserJar).exists()) {
if (!FileUtils.exists(flinkUserJar)) {
Resource resource = resourceService.findByResourceName(app.getTeamId(), app.getJar());
if (resource != null && StringUtils.isNotBlank(resource.getFilePath())) {
flinkUserJar = resource.getFilePath();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.streampark.common.fs.LfsOperator;
import org.apache.streampark.common.util.CompletableFutureUtils;
import org.apache.streampark.common.util.DeflaterUtils;
import org.apache.streampark.common.util.FileUtils;
import org.apache.streampark.common.util.HadoopUtils;
import org.apache.streampark.common.util.PropertiesUtils;
import org.apache.streampark.common.util.ThreadUtils;
Expand Down Expand Up @@ -97,7 +98,6 @@
import org.apache.streampark.flink.packer.pipeline.BuildResult;
import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.CoreOptions;
Expand Down Expand Up @@ -129,7 +129,6 @@
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
Expand Down Expand Up @@ -619,9 +618,8 @@ public String k8sStartLog(Long id, Integer offset, Integer limit) throws Excepti
if (!future.isDone()) {
future.cancel(true);
}
if (path != null) {
return org.apache.streampark.console.base.util.FileUtils.rollViewLog(
path, offset, limit);
if (FileUtils.exists(path)) {
return FileUtils.tailOf(path, offset, limit);
}
return null;
})
Expand Down Expand Up @@ -738,7 +736,7 @@ public boolean create(Application appParam) {
jarPath = resource.getFilePath();
}
}
appParam.setJarCheckSum(FileUtils.checksumCRC32(new File(jarPath)));
appParam.setJarCheckSum(org.apache.commons.io.FileUtils.checksumCRC32(new File(jarPath)));
}

if (save(appParam)) {
Expand Down Expand Up @@ -866,7 +864,7 @@ public boolean update(Application appParam) {
File jarFile = new File(WebUtils.getAppTempDir(), appParam.getJar());
if (jarFile.exists()) {
try {
long checkSum = FileUtils.checksumCRC32(jarFile);
long checkSum = org.apache.commons.io.FileUtils.checksumCRC32(jarFile);
if (!Objects.equals(checkSum, application.getJarCheckSum())) {
application.setBuild(true);
}
Expand Down Expand Up @@ -1144,7 +1142,7 @@ public void clean(Application appParam) {
@Override
public String readConf(Application appParam) throws IOException {
File file = new File(appParam.getConfig());
String conf = FileUtils.readFileToString(file, StandardCharsets.UTF_8);
String conf = FileUtils.readFile(file);
return Base64.getEncoder().encodeToString(conf.getBytes());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@
import org.apache.streampark.common.conf.InternalConfigHolder;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.util.CompletableFutureUtils;
import org.apache.streampark.common.util.FileUtils;
import org.apache.streampark.common.util.ThreadUtils;
import org.apache.streampark.common.util.Utils;
import org.apache.streampark.console.base.domain.ResponseCode;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
import org.apache.streampark.console.base.util.FileUtils;
import org.apache.streampark.console.base.util.GZipUtils;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.Project;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.streampark.console.base.util;

import org.apache.streampark.common.util.FileUtils;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
Expand Down
Loading

0 comments on commit ea968bb

Please sign in to comment.