diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
index fa88df7dd9..183a5de3d0 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
@@ -33,8 +33,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationId
 
 import java.lang.{Boolean => JavaBool}
 import java.lang.reflect.Method
 
-import scala.util.Try
+import scala.util.control.NonFatal
 
 /** yarn application mode submit */
 trait YarnClientTrait extends FlinkClientTrait {
@@ -54,14 +54,19 @@ trait YarnClientTrait extends FlinkClientTrait {
       val clusterDescriptor = clusterClientFactory.createClusterDescriptor(flinkConf)
       clusterDescriptor.retrieve(applicationId).getClusterClient
     }
-    Try {
+    try {
       actionFunc(jobID, clusterClient)
-    }.recover {
-      case e =>
+    } catch {
+      // NonFatal matches the failure scope of the removed Try/recover and
+      // avoids trapping InterruptedException or fatal JVM errors.
+      case NonFatal(e) =>
         throw new FlinkException(
           s"[StreamPark] Do ${request.getClass.getSimpleName} for the job ${request.jobId} failed. " +
             s"detail: ${Utils.stringifyException(e)}");
-    }.get
+    } finally {
+      // Always release the retrieved ClusterClient so the connection is not leaked.
+      if (clusterClient != null) clusterClient.close()
+    }
   }
 
   override def doTriggerSavepoint(