Skip to content

Commit

Permalink
The YarnClientTrait add clusterClient close
Browse files Browse the repository at this point in the history
  • Loading branch information
ChengJie1053 committed Jul 28, 2023
1 parent af0df56 commit 8e31e06
Showing 1 changed file with 6 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId
import java.lang.{Boolean => JavaBool}
import java.lang.reflect.Method

import scala.util.Try

/** yarn application mode submit */
trait YarnClientTrait extends FlinkClientTrait {

Expand All @@ -54,14 +52,16 @@ trait YarnClientTrait extends FlinkClientTrait {
val clusterDescriptor = clusterClientFactory.createClusterDescriptor(flinkConf)
clusterDescriptor.retrieve(applicationId).getClusterClient
}
Try {
try {
actionFunc(jobID, clusterClient)
}.recover {
case e =>
} catch {
case e: Exception =>
throw new FlinkException(
s"[StreamPark] Do ${request.getClass.getSimpleName} for the job ${request.jobId} failed. " +
s"detail: ${Utils.stringifyException(e)}");
}.get
} finally {
if (clusterClient != null) clusterClient.close()
}
}

override def doTriggerSavepoint(
Expand Down

0 comments on commit 8e31e06

Please sign in to comment.