Skip to content

Commit

Permalink
[Improve] FileUtils minor improvement (apache#2981)
Browse files Browse the repository at this point in the history
* [Improve] FileUtils minor improvement

* minor improvement

* minor improvement

* Implicits improvement

* check style improvement

* minor improvement

* minor bug fixed.
  • Loading branch information
wolfboys authored and saLeox committed Sep 4, 2023
1 parent 09ae6b9 commit a3d4c7f
Show file tree
Hide file tree
Showing 22 changed files with 372 additions and 339 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.streampark.common.conf

import org.apache.streampark.common.util.Utils.StringCasts
import org.apache.streampark.common.util.ImplicitsUtils._

import java.util.Properties

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.streampark.common.conf

import org.apache.streampark.common.util.{Logger, SystemPropertyUtils}
import org.apache.streampark.common.util.Utils.StringCasts
import org.apache.streampark.common.util.ImplicitsUtils._

import javax.annotation.{Nonnull, Nullable}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package org.apache.streampark.common.conf

import org.apache.streampark.common.enums.StorageType
import org.apache.streampark.common.util.{HdfsUtils, SystemPropertyUtils}
import org.apache.streampark.common.util.Utils.StringCasts
import org.apache.streampark.common.util.ImplicitsUtils._

import java.net.URI

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,16 @@
*/
package org.apache.streampark.common.util

import org.apache.streampark.common.util.ImplicitsUtils._

import java.io._
import java.net.URL
import java.nio.ByteBuffer
import java.nio.channels.Channels
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import java.util
import java.util.Scanner
import java.util.stream.Collectors

import scala.collection.convert.ImplicitConversions._
import scala.collection.mutable
Expand All @@ -44,12 +50,12 @@ object FileUtils {
if (input == null) {
throw new RuntimeException("The inputStream can not be null")
}
Utils.using(input) {
in =>
input.autoClose(
in => {
val b = new Array[Byte](4)
in.read(b, 0, b.length)
bytesToHexString(b)
} == "504B0304"
}) == "504B0304"
}

def isJarFileType(file: File): Boolean = {
Expand All @@ -73,12 +79,6 @@ object FileUtils {
s"[StreamPark] Failed to create directory within $TEMP_DIR_ATTEMPTS attempts (tried $baseName 0 to $baseName ${TEMP_DIR_ATTEMPTS - 1})")
}

def exists(path: String): Unit = {
require(
path != null && path.nonEmpty && new File(path).exists(),
s"[StreamPark] FileUtils.exists: file $path is not exist!")
}

def getPathFromEnv(env: String): String = {
val path = Option(System.getenv(env)).getOrElse(System.getProperty(env))
require(
Expand Down Expand Up @@ -115,6 +115,14 @@ object FileUtils {
}
}

def exists(file: Serializable): Boolean = {
file match {
case null => false
case f: File => f.exists()
case p => new File(p.toString).exists()
}
}

def equals(file1: File, file2: File): Boolean = {
(file1, file2) match {
case (a, b) if a == null || b == null => false
Expand Down Expand Up @@ -143,20 +151,115 @@ object FileUtils {
}

@throws[IOException]
def readString(file: File): String = {
require(file != null && file.isFile)
val reader = new FileReader(file)
val scanner = new Scanner(reader)
val buffer = new mutable.StringBuilder()
if (scanner.hasNextLine) {
buffer.append(scanner.nextLine())
def readInputStream(in: InputStream, array: Array[Byte]): Unit = {
in.autoClose(
is => {
var toRead = array.length
var ret = 0
var off = 0
while (toRead > 0) {
ret = is.read(array, off, toRead)
if (ret < 0) throw new IOException("Bad inputStream, premature EOF")
toRead -= ret
off += ret
}
})
}

@throws[IOException]
def readFile(file: File): String = {
if (file.length >= Int.MaxValue) {
throw new IOException("Too large file, unexpected!")
} else {
val len = file.length
val array = new Array[Byte](len.toInt)
Files
.newInputStream(file.toPath)
.autoClose(
is => {
readInputStream(is, array)
new String(array, StandardCharsets.UTF_8)
})
}
while (scanner.hasNextLine) {
buffer.append("\r\n")
buffer.append(scanner.nextLine())
}

@throws[IOException]
def writeFile(content: String, file: File): Unit = {
val outputStream = Files.newOutputStream(file.toPath)
val channel = Channels.newChannel(outputStream)
val buffer = ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8))
channel.write(buffer)
Utils.close(channel, outputStream)
}

@throws[IOException]
def readEndOfFile(file: File, maxSize: Long): Array[Byte] = {
var readSize = maxSize
new RandomAccessFile(file, "r").autoClose(
raFile => {
if (raFile.length > maxSize) {
raFile.seek(raFile.length - maxSize)
} else if (raFile.length < maxSize) {
readSize = raFile.length.toInt
}
val fileContent = new Array[Byte](readSize.toInt)
raFile.read(fileContent)
fileContent
})
}

/**
* Read the content of a file from a specified offset.
*
* @param file
* The file to read from
* @param startOffset
* The offset from where to start reading the file
* @param maxSize
* The maximum size of the file to read
* @return
* The content of the file as a byte array
* @throws IOException
* if an I/O error occurs while reading the file
* @throws IllegalArgumentException
* if the startOffset is greater than the file length
*/
@throws[IOException]
def readFileFromOffset(file: File, startOffset: Long, maxSize: Long): Array[Byte] = {
if (file.length < startOffset) {
throw new IllegalArgumentException(
s"The startOffset $startOffset is great than the file length ${file.length}")
}
new RandomAccessFile(file, "r").autoClose(
raFile => {
val readSize = Math.min(maxSize, file.length - startOffset)
raFile.seek(startOffset)
val fileContent = new Array[Byte](readSize.toInt)
raFile.read(fileContent)
fileContent
})
}

/**
* Roll View Log.
*
* @param path
* The file path.
* @param offset
* The offset.
* @param limit
* The limit.
* @return
* The content of the file.
*/
def tailOf(path: String, offset: Int, limit: Int): String = try {
val file = new File(path)
if (file.exists && file.isFile) {
Files
.lines(Paths.get(path))
.autoClose(stream => stream.skip(offset).limit(limit).collect(Collectors.joining("\r\n")))
}
Utils.close(scanner, reader)
buffer.toString()
null
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,10 @@ object HadoopUtils extends Logger {

def getConfigurationFromHadoopConfDir(confDir: String = hadoopConfDir): Configuration = {
if (!configurationCache.containsKey(confDir)) {
FileUtils.exists(confDir)
if (!FileUtils.exists(confDir)) {
throw new ExceptionInInitializerError(
s"[StreamPark] hadoop conf file " + confDir + " is not exist!")
}
val hadoopConfDir = new File(confDir)
val confName = List("hdfs-default.xml", "core-site.xml", "hdfs-site.xml", "yarn-site.xml")
val files =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.streampark.common.util

import org.apache.streampark.common.util.Utils.close

import java.lang.{Boolean => JavaBool, Byte => JavaByte, Double => JavaDouble, Float => JavaFloat, Integer => JavaInt, Long => JavaLong, Short => JavaShort}

object ImplicitsUtils {

implicit class AutoCloseImplicits[T <: AutoCloseable](autoCloseable: T) {

implicit def autoClose[R](func: T => R)(implicit excFunc: Throwable => R = null): R = {
try {
func(autoCloseable)
} catch {
case e: Throwable if excFunc != null => excFunc(e)
} finally {
close(autoCloseable)
}
}

}

implicit class StringImplicits(v: String) {
def cast[T](classType: Class[_]): T = {
classType match {
case c if c == classOf[String] => v.asInstanceOf[T]
case c if c == classOf[Byte] => v.toByte.asInstanceOf[T]
case c if c == classOf[Int] => v.toInt.asInstanceOf[T]
case c if c == classOf[Long] => v.toLong.asInstanceOf[T]
case c if c == classOf[Float] => v.toFloat.asInstanceOf[T]
case c if c == classOf[Double] => v.toDouble.asInstanceOf[T]
case c if c == classOf[Short] => v.toShort.asInstanceOf[T]
case c if c == classOf[Boolean] => v.toBoolean.asInstanceOf[T]
case c if c == classOf[JavaByte] => v.toByte.asInstanceOf[T]
case c if c == classOf[JavaInt] => JavaInt.valueOf(v).asInstanceOf[T]
case c if c == classOf[JavaLong] => JavaLong.valueOf(v).asInstanceOf[T]
case c if c == classOf[JavaFloat] => JavaFloat.valueOf(v).asInstanceOf[T]
case c if c == classOf[JavaDouble] => JavaDouble.valueOf(v).asInstanceOf[T]
case c if c == classOf[JavaShort] => JavaShort.valueOf(v).asInstanceOf[T]
case c if c == classOf[JavaBool] => JavaBool.valueOf(v).asInstanceOf[T]
case _ =>
throw new IllegalArgumentException(s"Unsupported type: $classType")
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ private[this] object LoggerFactory extends LoggerFactoryBinder {
val configurator = new JoranConfigurator()
configurator.setContext(loggerContext)
val text = FileUtils
.readString(new File(path))
.readFile(new File(path))
.replaceAll("org.slf4j", s"$shadedPackage.org.slf4j")
.replaceAll("ch.qos.logback", s"$shadedPackage.ch.qos.logback")
.replaceAll("org.apache.log4j", s"$shadedPackage.org.apache.log4j")
Expand Down
Loading

0 comments on commit a3d4c7f

Please sign in to comment.