Skip to content

Commit

Permalink
[improvement] improvement flink cluster verifyClusterConnection
Browse files Browse the repository at this point in the history
  • Loading branch information
xujiangfeng001 committed Jul 27, 2023
1 parent af0df56 commit 83021ef
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@
import org.apache.streampark.common.enums.ResolveOrder;
import org.apache.streampark.common.util.HttpClientUtils;
import org.apache.streampark.common.util.PropertiesUtils;
import org.apache.streampark.common.util.YarnUtils;
import org.apache.streampark.console.base.util.CommonUtils;
import org.apache.streampark.console.base.util.JacksonUtils;
import org.apache.streampark.console.core.metrics.flink.Overview;
import org.apache.streampark.console.core.utils.YarnQueueLabelExpression;

import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -156,51 +153,6 @@ public URI getRemoteURI() {
return null;
}

/**
* Verify the cluster connection whether is valid.
*
* @return <code>false</code> if the connection of the cluster is invalid, <code>true</code> else.
*/
public boolean verifyClusterConnection() {
if (ExecutionMode.REMOTE.equals(this.getExecutionModeEnum())) {
if (address == null) {
return false;
}
// 1) check url is Legal
if (!CommonUtils.isLegalUrl(address)) {
return false;
}
// 2) check connection
try {
String restUrl = address + "/overview";
String result =
HttpClientUtils.httpGetRequest(
restUrl,
RequestConfig.custom().setConnectTimeout(2000, TimeUnit.MILLISECONDS).build());
JacksonUtils.read(result, Overview.class);
return true;
} catch (Exception ignored) {
//
}
return false;
}
if (ExecutionMode.YARN_SESSION.equals(this.getExecutionModeEnum())) {
try {
String restUrl = YarnUtils.getRMWebAppURL(true) + "/proxy/" + this.clusterId + "/overview";
String result =
HttpClientUtils.httpGetRequest(
restUrl,
RequestConfig.custom().setConnectTimeout(2000, TimeUnit.MILLISECONDS).build());
JacksonUtils.read(result, Overview.class);
return true;
} catch (Exception ignored) {
//
}
return false;
}
return false;
}

@JsonIgnore
public Map<String, String> getFlinkConfig() throws JsonProcessingException {
String restUrl = this.address + "/jobmanager/config";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.apache.streampark.console.core.service.SettingService;
import org.apache.streampark.console.core.service.VariableService;
import org.apache.streampark.console.core.service.YarnQueueService;
import org.apache.streampark.console.core.task.FlinkClusterWatcher;
import org.apache.streampark.console.core.task.FlinkHttpWatcher;
import org.apache.streampark.flink.client.FlinkClient;
import org.apache.streampark.flink.client.bean.CancelRequest;
Expand Down Expand Up @@ -216,6 +217,8 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli

@Autowired private YarnQueueService yarnQueueService;

@Autowired private FlinkClusterWatcher flinkClusterWatcher;

@PostConstruct
public void resetOptionState() {
this.baseMapper.resetOptionState();
Expand Down Expand Up @@ -430,7 +433,7 @@ public boolean checkEnv(Application appParam) throws ApplicationException {
if (ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())
|| ExecutionMode.REMOTE.equals(application.getExecutionModeEnum())) {
FlinkCluster flinkCluster = flinkClusterService.getById(application.getFlinkClusterId());
boolean conned = flinkCluster.verifyClusterConnection();
boolean conned = flinkClusterWatcher.verifyClusterConnection(flinkCluster);
if (!conned) {
throw new ApiAlertException("the target cluster is unavailable, please check!");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli

@Autowired private YarnQueueService yarnQueueService;

@Autowired private FlinkClusterWatcher flinkClusterWatcher;

@Override
public ResponseResult check(FlinkCluster cluster) {
ResponseResult result = new ResponseResult();
Expand All @@ -118,14 +120,14 @@ public ResponseResult check(FlinkCluster cluster) {

// 3) Check connection
if (ExecutionMode.isRemoteMode(cluster.getExecutionModeEnum())
&& !cluster.verifyClusterConnection()) {
&& !flinkClusterWatcher.verifyClusterConnection(cluster)) {
result.setMsg("The remote cluster connection failed, please check!");
result.setStatus(3);
return result;
}
if (ExecutionMode.isYarnMode(cluster.getExecutionModeEnum())
&& cluster.getClusterId() != null
&& !cluster.verifyClusterConnection()) {
&& !flinkClusterWatcher.verifyClusterConnection(cluster)) {
result.setMsg("The flink cluster connection failed, please check!");
result.setStatus(4);
return result;
Expand Down Expand Up @@ -411,7 +413,7 @@ private void checkActiveIfNeeded(FlinkCluster flinkCluster) {
ApiAlertException.throwIfFalse(
ClusterState.isRunning(flinkCluster.getClusterStateEnum()),
"Current cluster is not active, please check!");
if (!flinkCluster.verifyClusterConnection()) {
if (!flinkClusterWatcher.verifyClusterConnection(flinkCluster)) {
flinkCluster.setClusterState(ClusterState.LOST.getValue());
updateById(flinkCluster);
throw new ApiAlertException("Current cluster is not active, please check!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,17 +148,7 @@ public ClusterState getClusterState(FlinkCluster flinkCluster) {
if (state != null) {
return state;
}
switch (flinkCluster.getExecutionModeEnum()) {
case REMOTE:
state = httpRemoteClusterState(flinkCluster);
break;
case YARN_SESSION:
state = httpYarnSessionClusterState(flinkCluster);
break;
default:
state = ClusterState.UNKNOWN;
break;
}
state = httpClusterState(flinkCluster);
if (ClusterState.isRunning(state)) {
FAILED_STATES.invalidate(flinkCluster.getId());
} else {
Expand Down Expand Up @@ -192,6 +182,23 @@ private ClusterState httpYarnSessionClusterState(FlinkCluster flinkCluster) {
return state;
}

/**
* get flink cluster state
*
* @param flinkCluster
* @return
*/
private ClusterState httpClusterState(FlinkCluster flinkCluster) {
switch (flinkCluster.getExecutionModeEnum()) {
case REMOTE:
return httpRemoteClusterState(flinkCluster);
case YARN_SESSION:
return httpYarnSessionClusterState(flinkCluster);
default:
return ClusterState.UNKNOWN;
}
}

/**
* cluster get state from flink rest api
*
Expand Down Expand Up @@ -272,9 +279,18 @@ public static void unWatching(FlinkCluster flinkCluster) {
* @return
*/
private ClusterState yarnStateConvertClusterState(YarnApplicationState state) {
if (state == YarnApplicationState.FINISHED) {
return ClusterState.CANCELED;
}
return ClusterState.of(state.toString());
return state == YarnApplicationState.FINISHED
? ClusterState.CANCELED
: ClusterState.of(state.toString());
}

/**
* Verify the cluster connection whether is valid.
*
* @return <code>false</code> if the connection of the cluster is invalid, <code>true</code> else.
*/
public Boolean verifyClusterConnection(FlinkCluster flinkCluster) {
ClusterState clusterState = httpClusterState(flinkCluster);
return ClusterState.isRunning(clusterState) ? true : false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.streampark.console.core.task;

import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.util.HttpClientUtils;
import org.apache.streampark.common.util.ThreadUtils;
Expand Down Expand Up @@ -784,8 +783,7 @@ private void doAlert(Application app, FlinkAppState appState) {
case YARN_SESSION:
case REMOTE:
FlinkCluster flinkCluster = flinkClusterService.getById(app.getFlinkClusterId());
ClusterState clusterState = flinkClusterWatcher.getClusterState(flinkCluster);
if (ClusterState.isRunning(clusterState)) {
if (flinkClusterWatcher.verifyClusterConnection(flinkCluster)) {
log.info(
"application with id {} is yarn session or remote and flink cluster with id {} is alive, application send alert",
app.getId(),
Expand Down

0 comments on commit 83021ef

Please sign in to comment.