From 5e0ce99f623f2b7db527112174f8cc01c1eda16c Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Fri, 21 Jul 2023 23:22:00 +0800 Subject: [PATCH] Extract common const PARAM_PREFIX (#2874) * Extract common const PARAM_PREFIX --- .../apache/streampark/common/conf/ConfigConst.scala | 2 ++ .../flink/client/trait/FlinkClientTrait.scala | 10 +++++----- .../streampark/flink/core/SqlCommandParser.scala | 7 ++++--- .../flink/core/test/FlinkSqlExecuteFunSuite.scala | 4 ++-- .../scala/org/apache/streampark/spark/core/Spark.scala | 2 +- 5 files changed, 14 insertions(+), 11 deletions(-) diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala index 6be8accb26..9f39a914c9 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala @@ -24,6 +24,8 @@ object ConfigConst { val DEFAULT_DATAMASK_STRING = "********" + val PARAM_PREFIX = "--" + /** about parameter... */ val KEY_APP_HOME = "app.home" diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala index 670d323da1..bc10ff0729 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala @@ -52,11 +52,11 @@ import scala.util.{Failure, Success, Try} trait FlinkClientTrait extends Logger { - private[client] lazy val PARAM_KEY_FLINK_CONF = KEY_FLINK_CONF("--") - private[client] lazy val PARAM_KEY_FLINK_SQL = KEY_FLINK_SQL("--") - private[client] lazy val PARAM_KEY_APP_CONF = KEY_APP_CONF("--") - private[client] lazy val PARAM_KEY_APP_NAME = KEY_APP_NAME("--") - private[client] lazy val PARAM_KEY_FLINK_PARALLELISM = KEY_FLINK_PARALLELISM("--") + private[client] lazy val PARAM_KEY_FLINK_CONF = KEY_FLINK_CONF(PARAM_PREFIX) + private[client] lazy val PARAM_KEY_FLINK_SQL = KEY_FLINK_SQL(PARAM_PREFIX) + private[client] lazy val PARAM_KEY_APP_CONF = KEY_APP_CONF(PARAM_PREFIX) + private[client] lazy val PARAM_KEY_APP_NAME = KEY_APP_NAME(PARAM_PREFIX) + private[client] lazy val PARAM_KEY_FLINK_PARALLELISM = KEY_FLINK_PARALLELISM(PARAM_PREFIX) @throws[Exception] def submit(submitRequest: SubmitRequest): SubmitResponse = { diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala index e77bf5349d..492e6e9cf8 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala @@ -16,6 +16,7 @@ */ package org.apache.streampark.flink.core +import org.apache.streampark.common.conf.ConfigConst.PARAM_PREFIX import org.apache.streampark.common.enums.FlinkSqlValidationFailedType import org.apache.streampark.common.util.Logger @@ -417,7 +418,7 @@ case class SqlSegment(start: Int, end: Int, sql: String) object SqlSplitter { - private lazy val singleLineCommentPrefixList = Set[String]("--") + private lazy val singleLineCommentPrefixList = Set[String](PARAM_PREFIX) /** * Split whole text into multiple sql statements. Two Steps: Step 1, split the whole text into @@ -453,7 +454,7 @@ object SqlSplitter { while (scanner.hasNextLine) { lineNumber += 1 val line = scanner.nextLine().trim - val nonEmpty = line.nonEmpty && !line.startsWith("--") + val nonEmpty = line.nonEmpty && !line.startsWith(PARAM_PREFIX) if (line.startsWith("/*")) { startComment = true hasComment = true @@ -618,7 +619,7 @@ object SqlSplitter { builder.toString } - private[this] def isSingleLineComment(text: String) = text.trim.startsWith("--") + private[this] def isSingleLineComment(text: String) = text.trim.startsWith(PARAM_PREFIX) private[this] def isMultipleLineComment(text: String) = text.trim.startsWith("/*") && text.trim.endsWith("*/") diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/src/test/scala/org/apache/streampark/flink/core/test/FlinkSqlExecuteFunSuite.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/src/test/scala/org/apache/streampark/flink/core/test/FlinkSqlExecuteFunSuite.scala index e559c52c64..a681018a39 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/src/test/scala/org/apache/streampark/flink/core/test/FlinkSqlExecuteFunSuite.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/src/test/scala/org/apache/streampark/flink/core/test/FlinkSqlExecuteFunSuite.scala @@ -16,7 +16,7 @@ */ package org.apache.streampark.flink.core.test -import org.apache.streampark.common.conf.ConfigConst.KEY_FLINK_SQL +import org.apache.streampark.common.conf.ConfigConst.{KEY_FLINK_SQL, PARAM_PREFIX} import org.apache.streampark.common.util.DeflaterUtils import org.apache.streampark.flink.core.{FlinkSqlExecutor, FlinkTableInitializer, StreamTableContext} @@ -28,7 +28,7 @@ import scala.collection.mutable.ArrayBuffer class FlinkSqlExecuteFunSuite extends AnyFunSuite { def execute(sql: String)(implicit func: String => Unit): Unit = { - val args = ArrayBuffer(KEY_FLINK_SQL("--"), DeflaterUtils.zipString(sql.stripMargin)) + val args = ArrayBuffer(KEY_FLINK_SQL(PARAM_PREFIX), DeflaterUtils.zipString(sql.stripMargin)) val context = new StreamTableContext(FlinkTableInitializer.initialize(args.toArray, null, null)) FlinkSqlExecutor.executeSql(KEY_FLINK_SQL(), context.parameter, context) } diff --git a/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/Spark.scala b/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/Spark.scala index 186317fb8d..83aec63d06 100644 --- a/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/Spark.scala +++ b/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/Spark.scala @@ -101,7 +101,7 @@ trait Spark extends Logger { createOnError = value.toBoolean argv = tail case Nil => - case other :: value :: tail if other.startsWith("--") => + case other :: value :: tail if other.startsWith(PARAM_PREFIX) => userArgs += other.drop(2) -> value argv = tail case tail =>