Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Refactor] Remove LogClientService and minor code refactor #2979

Merged
merged 4 commits into from
Aug 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
* Apache Flink & Spark stream processing application development framework
* Out-of-the-box connectors
* Support multiple versions of Flink & Spark
* Scala 2.11 / 2.12 support
* Scala 2.12 support
* One-stop stream processing management platform
* Support catalog, OLAP, streaming-warehouse, etc.
* ...
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,28 @@

package org.apache.streampark.console.base.util;

import org.apache.streampark.console.base.exception.ApiDetailException;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** The file utils. */
public class FileUtils {

private FileUtils() {}

/**
* Read the end of the file.
* Reads the last portion of a file as a byte array.
*
* @param file The file
* @param maxSize Maximum size of read file
* @return The file content
* @throws IOException
* @param file the file to read
* @param maxSize the maximum number of bytes to read from the end of the file
* @return the byte array containing the content read from the file
* @throws IOException if an I/O error occurs
*/
public static byte[] readEndOfFile(File file, long maxSize) throws IOException {
long readSize = maxSize;
Expand All @@ -50,12 +56,14 @@ public static byte[] readEndOfFile(File file, long maxSize) throws IOException {
}

/**
* Read the end of the file.
* Read the content of a file from a specified offset.
*
* @param file The file
* @param maxSize Maximum size of read file
* @return The file content
* @throws IOException
* @param file The file to read from
* @param startOffset The offset from where to start reading the file
* @param maxSize The maximum size of the file to read
* @return The content of the file as a byte array
* @throws IOException if an I/O error occurs while reading the file
* @throws IllegalArgumentException if the startOffset is greater than the file length
*/
public static byte[] readFileFromOffset(File file, long startOffset, long maxSize)
throws IOException {
Expand All @@ -73,4 +81,27 @@ public static byte[] readFileFromOffset(File file, long startOffset, long maxSiz
}
return fileContent;
}

/**
 * Reads a window of lines from a log file for paginated ("rolling") display.
 *
 * @param path the path of the log file to read
 * @param offset the number of leading lines to skip before collecting
 * @param limit the maximum number of lines to return
 * @return the selected lines joined with CRLF, or {@code null} when the path does not
 *     denote an existing regular file
 * @throws ApiDetailException if reading the file fails for any reason
 */
public static String rollViewLog(String path, int offset, int limit) {
  try {
    File logFile = new File(path);
    // Only regular, existing files are readable; anything else yields null.
    if (!logFile.exists() || !logFile.isFile()) {
      return null;
    }
    // Files.lines returns a lazy stream backed by an open file handle,
    // so it must be closed via try-with-resources.
    try (Stream<String> lines = Files.lines(Paths.get(path))) {
      return lines.skip(offset).limit(limit).collect(Collectors.joining("\r\n"));
    }
  } catch (Exception e) {
    throw new ApiDetailException("roll view log error: " + e);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,20 +54,10 @@ List<String> getRecentK8sClusterId(

void resetOptionState();

Boolean existsByTeamId(@Param("teamId") Long teamId);

Boolean existsByJobName(@Param("jobName") String jobName);

Boolean existsByUserId(@Param("userId") Long userId);

List<Application> getByProjectId(@Param("projectId") Long id);

boolean existsRunningJobByClusterId(@Param("clusterId") Long clusterId);

boolean existsJobByClusterId(@Param("clusterId") Long clusterId);

Integer countJobsByClusterId(@Param("clusterId") Long clusterId);

Integer countAffectedJobsByClusterId(
Integer countAffectedByClusterId(
@Param("clusterId") Long clusterId, @Param("dbType") String dbType);
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,15 @@ List<Application> getByTeamIdAndExecutionModes(

void forcedStop(Application app);

boolean existsRunningJobByClusterId(Long clusterId);
boolean existsRunningByClusterId(Long clusterId);

boolean existsJobByClusterId(Long clusterId);
boolean existsByClusterId(Long clusterId);

Integer countJobsByClusterId(Long clusterId);
Integer countByClusterId(Long clusterId);

Integer countAffectedJobsByClusterId(Long clusterId, String dbType);
Integer countAffectedByClusterId(Long clusterId, String dbType);

boolean existsJobByFlinkEnvId(Long flinkEnvId);
boolean existsByFlinkEnvId(Long flinkEnvId);

List<String> getRecentK8sNamespace();

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@
import org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.FlinkEnvService;
import org.apache.streampark.console.core.service.FlinkSqlService;
import org.apache.streampark.console.core.service.LogClientService;
import org.apache.streampark.console.core.service.ProjectService;
import org.apache.streampark.console.core.service.ResourceService;
import org.apache.streampark.console.core.service.SavePointService;
Expand Down Expand Up @@ -214,8 +213,6 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli

@Autowired private VariableService variableService;

@Autowired private LogClientService logClient;

@Autowired private YarnQueueService yarnQueueService;

@Autowired private FlinkClusterWatcher flinkClusterWatcher;
Expand Down Expand Up @@ -504,49 +501,49 @@ record -> {

@Override
public boolean existsByTeamId(Long teamId) {
return baseMapper.existsByTeamId(teamId);
return baseMapper.exists(
new LambdaQueryWrapper<Application>().eq(Application::getTeamId, teamId));
}

@Override
public boolean existsByUserId(Long userId) {
return baseMapper.existsByUserId(userId);
return baseMapper.exists(
new LambdaQueryWrapper<Application>().eq(Application::getUserId, userId));
}

@Override
public boolean existsRunningJobByClusterId(Long clusterId) {
boolean exists = baseMapper.existsRunningJobByClusterId(clusterId);
if (exists) {
return true;
}
for (Application application : FlinkHttpWatcher.getWatchingApps()) {
if (clusterId.equals(application.getFlinkClusterId())
&& FlinkAppState.RUNNING.equals(application.getFlinkAppStateEnum())) {
return true;
}
}
return false;
public boolean existsRunningByClusterId(Long clusterId) {
return baseMapper.existsRunningJobByClusterId(clusterId)
|| FlinkHttpWatcher.getWatchingApps().stream()
.anyMatch(
application ->
clusterId.equals(application.getFlinkClusterId())
&& FlinkAppState.RUNNING.equals(application.getFlinkAppStateEnum()));
}

@Override
public boolean existsJobByClusterId(Long clusterId) {
return baseMapper.existsJobByClusterId(clusterId);
public boolean existsByClusterId(Long clusterId) {
return baseMapper.exists(
new LambdaQueryWrapper<Application>().eq(Application::getFlinkClusterId, clusterId));
}

@Override
public Integer countJobsByClusterId(Long clusterId) {
return baseMapper.countJobsByClusterId(clusterId);
public Integer countByClusterId(Long clusterId) {
return baseMapper
.selectCount(
new LambdaQueryWrapper<Application>().eq(Application::getFlinkClusterId, clusterId))
.intValue();
}

@Override
public Integer countAffectedJobsByClusterId(Long clusterId, String dbType) {
return baseMapper.countAffectedJobsByClusterId(clusterId, dbType);
public Integer countAffectedByClusterId(Long clusterId, String dbType) {
return baseMapper.countAffectedByClusterId(clusterId, dbType);
}

@Override
public boolean existsJobByFlinkEnvId(Long flinkEnvId) {
LambdaQueryWrapper<Application> lambdaQueryWrapper =
new LambdaQueryWrapper<Application>().eq(Application::getVersionId, flinkEnvId);
return getBaseMapper().exists(lambdaQueryWrapper);
public boolean existsByFlinkEnvId(Long flinkEnvId) {
return baseMapper.exists(
new LambdaQueryWrapper<Application>().eq(Application::getVersionId, flinkEnvId));
}

@Override
Expand Down Expand Up @@ -623,7 +620,8 @@ public String k8sStartLog(Long id, Integer offset, Integer limit) throws Excepti
future.cancel(true);
}
if (path != null) {
return logClient.rollViewLog(path, offset, limit);
return org.apache.streampark.console.base.util.FileUtils.rollViewLog(
path, offset, limit);
}
return null;
})
Expand Down Expand Up @@ -652,7 +650,13 @@ public String getYarnName(Application appParam) {
return ParameterCli.read(args);
}

/** Check if the current jobName and other key identifiers already exist in db and yarn/k8s */
/**
* Check if the current jobName and other key identifiers already exist in the database and
* yarn/k8s.
*
* @param appParam The application to check for existence.
* @return The state of the application's existence.
*/
@Override
public AppExistsState checkExists(Application appParam) {

Expand All @@ -672,14 +676,8 @@ public AppExistsState checkExists(Application appParam) {
return AppExistsState.IN_DB;
}

FlinkAppState state = FlinkAppState.of(app.getState());
// has stopped status
if (state.equals(FlinkAppState.ADDED)
|| state.equals(FlinkAppState.CREATED)
|| state.equals(FlinkAppState.FAILED)
|| state.equals(FlinkAppState.CANCELED)
|| state.equals(FlinkAppState.LOST)
|| state.equals(FlinkAppState.KILLED)) {
if (FlinkAppState.isEndState(app.getState())) {
// check whether jobName exists on yarn
if (ExecutionMode.isYarnMode(appParam.getExecutionMode())
&& YarnUtils.isContains(appParam.getJobName())) {
Expand Down Expand Up @@ -758,7 +756,8 @@ public boolean create(Application appParam) {
}

private boolean existsByJobName(String jobName) {
return this.baseMapper.existsByJobName(jobName);
return baseMapper.exists(
new LambdaQueryWrapper<Application>().eq(Application::getJobName, jobName));
}

@SuppressWarnings("checkstyle:WhitespaceAround")
Expand Down Expand Up @@ -879,7 +878,7 @@ public boolean update(Application appParam) {
}
}

// 2) k8s podTemplate changed..
// 2) k8s podTemplate changed.
if (application.getBuild() && ExecutionMode.isKubernetesMode(appParam.getExecutionMode())) {
if (ObjectUtils.trimNoEquals(
application.getK8sRestExposedType(), appParam.getK8sRestExposedType())
Expand Down Expand Up @@ -1386,7 +1385,7 @@ public String checkSavepointPath(Application appParam) throws Exception {
"This state.savepoints.dir value "
+ savepointPath
+ " path part to store the checkpoint data in is null. Please specify a directory path for the checkpoint data.";
} else if (pathPart.length() == 0 || "/".equals(pathPart)) {
} else if (pathPart.isEmpty() || "/".equals(pathPart)) {
error =
"This state.savepoints.dir value "
+ savepointPath
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ public Boolean allowShutdownCluster(FlinkCluster cluster) {
checkActiveIfNeeded(flinkCluster);

// 3) check job if running on cluster
boolean existsRunningJob = applicationService.existsRunningJobByClusterId(flinkCluster.getId());
boolean existsRunningJob = applicationService.existsRunningByClusterId(flinkCluster.getId());
ApiAlertException.throwIfTrue(
existsRunningJob, "Some app is running on this cluster, the cluster cannot be shutdown");

Expand Down Expand Up @@ -330,7 +330,7 @@ public void delete(FlinkCluster cluster) {
}

ApiAlertException.throwIfTrue(
applicationService.existsJobByClusterId(id),
applicationService.existsByClusterId(id),
"Some app on this cluster, the cluster cannot be delete, please check.");
removeById(id);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ private void checkOrElseAlert(FlinkEnv flinkEnv) {

// 3.check if it is being used by any application
ApiAlertException.throwIfTrue(
applicationService.existsJobByFlinkEnvId(flinkEnv.getId()),
applicationService.existsByFlinkEnvId(flinkEnv.getId()),
"The flink home is still in use by some application, please check.");
}
}
Loading
Loading