Skip to content

Commit

Permalink
[Improve] Packer module code optimization (apache#3010)
Browse files Browse the repository at this point in the history
* DockerResolveProgress Code optimization

* uploadToHdfs rename

* Modified K8sPodTemplates

* Modified PodTemplateTool

* Modified FlinkK8sApplicationBuildPipeline
  • Loading branch information
ChengJie1053 authored and saLeox committed Sep 4, 2023
1 parent a3d4c7f commit 10a3bc3
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ object Utils {
*/
def calPercent(num1: Long, num2: Long): Double =
if (num1 == 0 || num2 == 0) 0.0
else (num1.toDouble / num2.toDouble * 100).formatted("%.1f").toDouble
else "%.1f".format(num1.toDouble / num2.toDouble * 100).toDouble

def hashCode(elements: Any*): Int = {
if (elements == null) return 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.streampark.flink.kubernetes
import org.apache.streampark.flink.kubernetes.model.K8sPodTemplates

import org.apache.commons.io.FileUtils
import org.apache.commons.lang3.StringUtils
import org.apache.flink.configuration.Configuration

import java.io.File
Expand Down Expand Up @@ -57,7 +58,7 @@ object PodTemplateTool {

val podTempleMap = mutable.Map[String, String]()
val outputTmplContent = (tmplContent: String, podTmpl: PodTemplateType) => {
if (tmplContent.nonEmpty) {
if (StringUtils.isNotBlank(tmplContent)) {
val outputPath = s"$buildWorkspace/${podTmpl.fileName}"
val outputFile = new File(outputPath)
FileUtils.write(outputFile, tmplContent, "UTF-8")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package org.apache.streampark.flink.kubernetes.model

import org.apache.streampark.common.util.Utils

import org.apache.commons.lang3.StringUtils

import scala.util.Try

/** Pod template for flink k8s cluster */
Expand All @@ -27,9 +29,9 @@ case class K8sPodTemplates(
jmPodTemplate: String = "",
tmPodTemplate: String = "") {

def nonEmpty: Boolean = Option(podTemplate).exists(_.trim.nonEmpty) ||
Option(jmPodTemplate).exists(_.trim.nonEmpty) ||
Option(tmPodTemplate).exists(_.trim.nonEmpty)
def nonEmpty: Boolean = StringUtils.isNotBlank(podTemplate) ||
StringUtils.isNotBlank(jmPodTemplate) ||
StringUtils.isNotBlank(tmPodTemplate)

def isEmpty: Boolean = !nonEmpty

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.streampark.flink.packer.pipeline
import org.apache.streampark.common.util.Utils

import com.github.dockerjava.api.model.{PullResponseItem, PushResponseItem}
import org.apache.commons.lang3.StringUtils

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
Expand All @@ -36,7 +37,10 @@ class DockerPullProgress(
var lastTime: Long) {
// noinspection DuplicatedCode
def update(pullRsp: PullResponseItem): Unit = {
if (pullRsp == null || pullRsp.getId == null || pullRsp.getStatus == null) {
if (
pullRsp == null || StringUtils.isBlank(pullRsp.getId) || StringUtils.isBlank(
pullRsp.getStatus)
) {
return
}
if (pullRsp.getStatus.contains("complete")) {
Expand All @@ -60,7 +64,7 @@ class DockerPullProgress(

class DockerBuildProgress(val steps: ArrayBuffer[String], var lastTime: Long) {
def update(buildStep: String): Unit = {
if (buildStep != null && buildStep.nonEmpty) {
if (StringUtils.isNotBlank(buildStep)) {
steps += buildStep
lastTime = System.currentTimeMillis
}
Expand All @@ -75,7 +79,10 @@ class DockerPushProgress(
var lastTime: Long) {
// noinspection DuplicatedCode
def update(pushRsp: PushResponseItem): Unit = {
if (pushRsp == null || pushRsp.getId == null || pushRsp.getStatus == null) {
if (
pushRsp == null || StringUtils.isBlank(pushRsp.getId) || StringUtils.isBlank(
pushRsp.getStatus)
) {
return
}
if (pushRsp.getStatus.contains("complete")) {
Expand Down Expand Up @@ -125,8 +132,8 @@ case class DockerLayerProgress(layerId: String, status: String, current: Long, t
def percent: Double = Utils.calPercent(current, total)

def currentMb: Double =
if (current == 0) 0 else (current.toDouble / (1024 * 1024)).formatted("%.2f").toDouble
if (current == 0) 0 else "%.2f".format(current.toDouble / (1024 * 1024)).toDouble

def totalMb: Double =
if (total == 0) 0 else (total.toDouble / (1024 * 1024)).formatted("%.2f").toDouble
if (total == 0) 0 else "%.2f".format(total.toDouble / (1024 * 1024)).toDouble
}
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,7 @@ class FlinkK8sApplicationBuildPipeline(request: FlinkK8sApplicationBuildRequest)
registerAddress: String,
imageNamespace: String): String = {
var tagName = if (tag.contains("/")) tag else s"$imageNamespace/$tag"
if (
registerAddress != null && registerAddress.nonEmpty && !tagName.startsWith(registerAddress)
) {
if (StringUtils.isNotBlank(registerAddress) && !tagName.startsWith(registerAddress)) {
tagName = s"$registerAddress/$tagName"
}
tagName.toLowerCase
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,19 @@ class FlinkYarnApplicationBuildPipeline(request: FlinkYarnApplicationBuildReques
execStep(3) {
mavenJars.foreach(
jar => {
uploadToHdfs(FsOperator.lfs, jar, request.localWorkspace)
uploadToHdfs(FsOperator.hdfs, jar, request.yarnProvidedPath)
uploadJarToHdfsOrLfs(FsOperator.lfs, jar, request.localWorkspace)
uploadJarToHdfsOrLfs(FsOperator.hdfs, jar, request.yarnProvidedPath)
})
}.getOrElse(throw getError.exception)

SimpleBuildResponse()
}

@throws[IOException]
private[this] def uploadToHdfs(fsOperator: FsOperator, origin: String, target: String): Unit = {
private[this] def uploadJarToHdfsOrLfs(
fsOperator: FsOperator,
origin: String,
target: String): Unit = {
val originFile = new File(origin)
if (!fsOperator.exists(target)) {
fsOperator.mkdirs(target)
Expand Down

0 comments on commit 10a3bc3

Please sign in to comment.