[Flink-K8s-V2] Port the new module codes to Streampark-flink-kubernetes-v2 (#2916)

* Initialize flink kubernetes v2 modules. #2881

Supplement additional license statement. #2881

Initialize flink kubernetes v2 modules. #2881

* Delete configuration A to obtain a more compact lambda format.

* Migrate util codes #2881

* Migrate zio extension to streampark-common #2881

* Mark deprecated flink-k8s config #2881

* flink-k8s v2 configuration #2881

* Embedded http file server implementation. #2881

* update scalafmt

* remove util package. #2881

* Init flink-k8s-v2 model and rest request wrapper. #2881

* format code. #2881

* Conversion of Flink K8s JobSnapshot to FlinkAppState. #2881

* Replace deprecated annotation. #2881

* Implementation of flink k8s resources operators and observers. #2881

* Reimplement ZIO StreamPark logger backend. #2881

* Fix the incorrect mirror path. #2881

* Template store. #2881

* Refactor: Remove flink-kubernetes-api shaded module and generate code directly from CRD yaml file. #2881

* Replace sttp client with zio-http client due to Java 8 incompatibility #2881

* Move the evaluation process of the final status of flink k8s tasks to the flink-k8s-v2 module for better readability. #2881

* Format code

* Solve compilation issues. #2881
Al-assad authored Aug 4, 2023
1 parent a672ffe commit 5be85ca
Showing 64 changed files with 14,077 additions and 88 deletions.
88 changes: 88 additions & 0 deletions .scalafmt-fp.conf
@@ -0,0 +1,88 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

runner.dialect = scala212

# Version is required to make sure IntelliJ picks the right version
version = 3.4.3
preset = default

# Max column
maxColumn = 120

# This parameter simply says the .stripMargin method was not redefined by the user to assign
# special meaning to indentation preceding the | character. Hence, that indentation can be modified.
assumeStandardLibraryStripMargin = true
align.stripMargin = true

# Align settings
align.preset = most
align.closeParenSite = false
align.openParenCallSite = false
danglingParentheses.defnSite = false
danglingParentheses.callSite = false
danglingParentheses.ctrlSite = true
danglingParentheses.tupleSite = false
align.openParenCallSite = false
align.openParenDefnSite = false
align.openParenTupleSite = false

# Newlines
newlines.alwaysBeforeElseAfterCurlyIf = false
newlines.afterCurlyLambdaParams = squash # No newline after lambda params
newlines.inInterpolation = "avoid"
newlines.avoidInResultType = true
optIn.annotationNewlines = true

# Scaladoc
docstrings.style = Asterisk # Javadoc style
docstrings.removeEmpty = true
docstrings.oneline = fold
docstrings.forceBlankLineBefore = true
docstrings.wrap = no

# Indentation
indent.extendSite = 2 # This makes sure extend is not indented as the ctor parameters
indentOperator.preset = spray

# Rewrites
rewrite.rules = [AvoidInfix, Imports, RedundantBraces, SortModifiers]

# Imports
rewrite.imports.sort = scalastyle
rewrite.imports.groups = [
["org.apache.streampark\\..*"],
["org.apache.streampark.shaded\\..*"],
[".*"],
["javax\\..*"],
["java\\..*"],
["scala\\..*"]
]
rewrite.imports.contiguousGroups = no
importSelectors = singleline # Imports in a single line, like IntelliJ

# Remove redundant braces in string interpolation.
rewrite.redundantBraces.stringInterpolation = true
rewrite.redundantBraces.defnBodies = false
rewrite.redundantBraces.generalExpressions = false
rewrite.redundantBraces.ifElseExpressions = false
rewrite.redundantBraces.methodBodies = false
rewrite.redundantBraces.includeUnitMethods = false
rewrite.redundantBraces.maxBreaks = 1

# Remove trailing commas
rewrite.trailingCommas.style = "never"
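
For reference, a hypothetical Scala source formatted with the configuration above would have its imports grouped and sorted roughly as follows (the file contents below are illustrative only; the same layout can be seen in the new LoggerBackend.scala further down in this commit):

```scala
// Sketch of the import layout this config produces:
//  - groups ordered: org.apache.streampark.* -> other third-party -> java.* -> scala.*
//  - sorted within each group (rewrite.imports.sort = scalastyle)
//  - multiple selectors kept on one line (importSelectors = singleline)
import org.apache.streampark.common.util.Logger

import zio.{Cause, FiberId, LogLevel, ZLogger}
import zio.logging.LoggerNameExtractor

import java.util.concurrent.ConcurrentHashMap

import scala.collection.concurrent.TrieMap
```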
9 changes: 9 additions & 0 deletions dist-material/release-docs/LICENSE
@@ -569,6 +569,11 @@ The text of each license is the standard Apache 2.0 license. https://www.apache.
https://mvnrepository.com/artifact/org.pac4j/pac4j-springboot/4.5.7 Apache-2.0
https://mvnrepository.com/artifact/org.pac4j/pac4j-oauth/4.5.7 Apache-2.0
https://mvnrepository.com/artifact/org.pac4j/pac4j-oidc/4.5.7 Apache-2.0
https://mvnrepository.com/artifact/io.fabric8/kubernetes-client/6.8.0 Apache-2.0
https://mvnrepository.com/artifact/dev.zio/zio_2.12/2.0.15 Apache-2.0
https://mvnrepository.com/artifact/dev.zio/zio-streams_2.12/2.0.15 Apache-2.0
https://mvnrepository.com/artifact/dev.zio/zio-concurrent_2.12/2.0.15 Apache-2.0
https://mvnrepository.com/artifact/dev.zio/zio-http_2.12/3.0.0-RC2 Apache-2.0
https://maven.apache.org/wrapper Apache-2.0
mvnw files from https://github.com/apache/maven-wrapper Apache 2.0
streampark-console/streampark-console-service/src/main/assembly/bin/setclasspath.sh from https://github.com/apache/tomcat
@@ -716,6 +721,10 @@ The text of each license is also included in licenses/LICENSE-[project].txt.
https://mvnrepository.com/artifact/org.slf4j/slf4j-api/1.7.30 MIT
https://mvnrepository.com/artifact/org.projectlombok/lombok/1.18.24 MIT
https://mvnrepository.com/artifact/com.auth0/java-jwt/4.0.0 MIT
https://mvnrepository.com/artifact/com.lihaoyi/pprint_2.12/0.8.1 MIT
https://mvnrepository.com/artifact/com.lihaoyi/os-lib_2.12/0.8.1 MIT
https://mvnrepository.com/artifact/com.lihaoyi/upickle_2.12/0.8.1 MIT



========================================================================
7 changes: 6 additions & 1 deletion pom.xml
@@ -123,6 +123,9 @@
<commons-lang3.version>3.8.1</commons-lang3.version>
<enumeratum.version>1.6.1</enumeratum.version>
<assertj.version>3.23.1</assertj.version>
<zio.version>2.0.15</zio.version>
<zio-logging.version>2.1.13</zio-logging.version>
<pprint.version>0.8.1</pprint.version>

<maven-compiler-plugin.version>3.10.1</maven-compiler-plugin.version>
<maven-shade-plugin.version>3.2.4</maven-shade-plugin.version>
@@ -523,7 +526,9 @@
</resourceIncludes>
<resourceExcludes>
**/.asf.yaml,
**/.github/**
**/.github/**,
**/crd/**.yml,
**/crd/**.yaml
</resourceExcludes>
<excludes>
</excludes>
26 changes: 26 additions & 0 deletions streampark-common/pom.xml
@@ -128,6 +128,32 @@
<version>${streampark.shaded.version}</version>
</dependency>

<!-- ZIO -->
<dependency>
<groupId>dev.zio</groupId>
<artifactId>zio-logging_${scala.binary.version}</artifactId>
<version>${zio-logging.version}</version>
</dependency>

<dependency>
<groupId>dev.zio</groupId>
<artifactId>zio-streams_${scala.binary.version}</artifactId>
<version>${zio.version}</version>
</dependency>

<dependency>
<groupId>dev.zio</groupId>
<artifactId>zio-concurrent_${scala.binary.version}</artifactId>
<version>${zio.version}</version>
</dependency>

<!-- pprint -->
<dependency>
<groupId>com.lihaoyi</groupId>
<artifactId>pprint_${scala.binary.version}</artifactId>
<version>${pprint.version}</version>
</dependency>

</dependencies>

<build>
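
As a rough sketch of what these new dependencies enable in streampark-common (the object and values below are hypothetical, not part of this commit): zio-streams provides the effectful stream API, and pprint gives readable debug output for Scala data structures.

```scala
import zio._
import zio.stream.ZStream

// Hypothetical smoke test; assumes the ZIO and pprint artifacts declared above
// resolve against scala.binary.version 2.12.
object DepsSmokeTest extends ZIOAppDefault {
  def run =
    ZStream
      .fromIterable(1 to 3)
      .map(_ * 2)
      .runCollect
      // pprint.pprintln renders the collected Chunk in a readable, structured form.
      .tap(chunk => ZIO.attempt(pprint.pprintln(chunk)))
}
```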
@@ -17,7 +17,8 @@

package org.apache.streampark.common.conf

/** Flink kubernetes Configuration */
/** Flink kubernetes Configuration for v1 version */
@deprecated("see: org.apache.streampark.flink.kubernetes.v2.Config")
object K8sFlinkConfig {

val jobStatusTrackTaskTimeoutSec: InternalOption = InternalOption(
@@ -66,7 +66,7 @@ case class Workspace(storageType: StorageType) {
}
}

private[conf] lazy val WORKSPACE: String = {
lazy val WORKSPACE: String = {
storageType match {
case StorageType.LFS =>
val path: String = getConfigValue[String](CommonConfig.STREAMPARK_WORKSPACE_LOCAL)
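
With the `private[conf]` qualifier removed above, the resolved workspace root becomes readable outside the conf package. A minimal hedged sketch (the helper object below is hypothetical, and it assumes Workspace lives in org.apache.streampark.common.conf, as the private[conf] qualifier and the surrounding diff suggest):

```scala
import org.apache.streampark.common.conf.Workspace

// Hypothetical caller living outside the conf package.
object WorkspaceRootPrinter {
  def printRoot(workspace: Workspace): Unit =
    // WORKSPACE was previously private[conf]; after this change it is public.
    println(s"workspace root: ${workspace.WORKSPACE}")
}
```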
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.common.zio

import org.apache.streampark.common.util.Logger

import zio.{Cause, FiberId, FiberRefs, LogLevel, LogSpan, Runtime, Trace, ZLayer, ZLogger}
import zio.logging.LoggerNameExtractor

import scala.collection.concurrent.TrieMap

/** ZIO logging Backend that bridging to [[org.apache.streampark.common.util.Logger]] */
object LoggerBackend {

lazy val default: ZLayer[Any, Nothing, Unit] = Runtime.addLogger(provideLogger())

private val defaultLoggerName = getClass.getName
private val loggers = TrieMap[String, BridgeLogger]()

private def getLogger(loggerName: String): BridgeLogger = {
loggers.getOrElseUpdate(loggerName, BridgeLogger(loggerName))
}

private case class BridgeLogger(loggerName: String) extends Logger {
override protected def logName: String = loggerName

def trace(msg: String): Unit = super.logTrace(msg)
def info(msg: String): Unit = super.logInfo(msg)
def warn(msg: String): Unit = super.logWarn(msg)
def error(msg: String): Unit = super.logError(msg)
def debug(msg: String): Unit = super.logDebug(msg)
}

private def provideLogger(): ZLogger[String, Unit] = (
trace: Trace,
fiberId: FiberId,
logLevel: LogLevel,
message: () => String,
cause: Cause[Any],
context: FiberRefs,
spans: List[LogSpan],
annotations: Map[String, String]) => {

val loggerName =
LoggerNameExtractor.trace(trace, FiberRefs.empty, Map.empty).getOrElse(defaultLoggerName)
val logger = getLogger(loggerName)
val msg =
if (annotations.nonEmpty)
s"${annotations.map { case (k, v) => s"[$k=$v]" }.mkString(" ")} ${message()}"
else message()

logLevel match {
case LogLevel.None => logger.trace(msg)
case LogLevel.All => logger.trace(msg)
case LogLevel.Trace => logger.trace(msg)
case LogLevel.Debug => logger.debug(msg)
case LogLevel.Info => logger.info(msg)
case LogLevel.Warning => logger.warn(msg)
case LogLevel.Error => logger.error(msg)
case LogLevel.Fatal => logger.error(msg)
}
}

}
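
A minimal usage sketch for this backend (the application object and log messages are hypothetical; it assumes ZIO 2.x ZIOAppDefault semantics): install LoggerBackend.default in place of ZIO's default console logger so that all ZIO log output, including annotations, is routed through StreamPark's Logger.

```scala
import org.apache.streampark.common.zio.LoggerBackend

import zio._

// Hypothetical application showing how the bridge layer is installed.
object LoggerBackendExample extends ZIOAppDefault {

  // Drop ZIO's built-in console logger and add the StreamPark bridge instead.
  override val bootstrap: ZLayer[ZIOAppArgs, Any, Any] =
    Runtime.removeDefaultLoggers >>> LoggerBackend.default

  // Both messages end up in org.apache.streampark.common.util.Logger;
  // the annotation is rendered as a "[jobId=demo]" prefix by provideLogger above.
  def run: ZIO[Any, Nothing, Unit] =
    ZIO.logInfo("flink-k8s-v2 observer started") *>
      ZIO.logWarning("reconcile loop restarted") @@ ZIOAspect.annotated("jobId", "demo")
}
```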