Skip to content

Commit

Permalink
Merge branch 'dev' into pyflink
Browse files Browse the repository at this point in the history
# Conflicts:
#	streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
  • Loading branch information
wolfboys committed Sep 3, 2023
2 parents aa0eff2 + 6a5e6e4 commit 9ca15a0
Show file tree
Hide file tree
Showing 36 changed files with 2,744 additions and 2,244 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.enums.PermissionType;
import org.apache.streampark.console.core.enums.UserType;
import org.apache.streampark.console.core.service.ApplicationService;
import org.apache.streampark.console.core.service.CommonService;
import org.apache.streampark.console.core.service.application.ApplicationManageService;
import org.apache.streampark.console.core.task.FlinkHttpWatcher;
import org.apache.streampark.console.system.entity.AccessToken;
import org.apache.streampark.console.system.entity.Member;
Expand Down Expand Up @@ -61,7 +61,7 @@ public class StreamParkAspect {
@Autowired private FlinkHttpWatcher flinkHttpWatcher;
@Autowired private CommonService commonService;
@Autowired private MemberService memberService;
@Autowired private ApplicationService applicationService;
@Autowired private ApplicationManageService applicationManageService;

@Pointcut(
"execution(public"
Expand Down Expand Up @@ -128,7 +128,7 @@ public RestResponse permissionAction(ProceedingJoinPoint joinPoint) throws Throw
"Permission denied, only user belongs to this team can access this permission");
break;
case APP:
Application app = applicationService.getById(paramId);
Application app = applicationManageService.getById(paramId);
ApiAlertException.throwIfTrue(app == null, "Invalid operation, application is null");
member = memberService.findByUserName(app.getTeamId(), currentUser.getUsername());
ApiAlertException.throwIfTrue(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,17 @@
import org.apache.streampark.console.core.annotation.ApiAccess;
import org.apache.streampark.console.core.annotation.AppUpdated;
import org.apache.streampark.console.core.annotation.PermissionAction;
import org.apache.streampark.console.core.bean.AppControl;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.ApplicationBackUp;
import org.apache.streampark.console.core.entity.ApplicationLog;
import org.apache.streampark.console.core.enums.AppExistsState;
import org.apache.streampark.console.core.enums.PermissionType;
import org.apache.streampark.console.core.service.AppBuildPipeService;
import org.apache.streampark.console.core.service.ApplicationBackUpService;
import org.apache.streampark.console.core.service.ApplicationLogService;
import org.apache.streampark.console.core.service.ApplicationService;
import org.apache.streampark.console.core.service.ResourceService;
import org.apache.streampark.flink.packer.pipeline.PipelineStatus;
import org.apache.streampark.console.core.service.application.ApplicationActionService;
import org.apache.streampark.console.core.service.application.ApplicationInfoService;
import org.apache.streampark.console.core.service.application.ApplicationManageService;

import org.apache.shiro.authz.annotation.RequiresPermissions;

Expand All @@ -61,10 +60,7 @@
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@Tag(name = "FLINK_APPLICATION_TAG")
@Slf4j
Expand All @@ -73,22 +69,24 @@
@RequestMapping("flink/app")
public class ApplicationController {

@Autowired private ApplicationService applicationService;
@Autowired private ApplicationManageService applicationManageService;

@Autowired private ApplicationActionService applicationActionService;

@Autowired private ApplicationInfoService applicationInfoService;

@Autowired private ApplicationBackUpService backUpService;

@Autowired private ApplicationLogService applicationLogService;

@Autowired private AppBuildPipeService appBuildPipeService;

@Autowired private ResourceService resourceService;

@Operation(summary = "Get application")
@ApiAccess
@PostMapping("get")
@RequiresPermissions("app:detail")
public RestResponse get(Application app) {
Application application = applicationService.getApp(app);
Application application = applicationManageService.getApp(app);
return RestResponse.success(application);
}

Expand All @@ -98,7 +96,7 @@ public RestResponse get(Application app) {
@PostMapping("create")
@RequiresPermissions("app:create")
public RestResponse create(Application app) throws IOException {
boolean saved = applicationService.create(app);
boolean saved = applicationManageService.create(app);
return RestResponse.success(saved);
}

Expand All @@ -124,12 +122,8 @@ public RestResponse create(Application app) throws IOException {
@PostMapping(value = "copy")
@RequiresPermissions("app:copy")
public RestResponse copy(@Parameter(hidden = true) Application app) throws IOException {
Long id = applicationService.copy(app);
Map<String, String> data = new HashMap<>();
data.put("id", Long.toString(id));
return id.equals(0L)
? RestResponse.success(false).data(data)
: RestResponse.success(true).data(data);
applicationManageService.copy(app);
return RestResponse.success();
}

@Operation(summary = "Update application")
Expand All @@ -138,14 +132,14 @@ public RestResponse copy(@Parameter(hidden = true) Application app) throws IOExc
@PostMapping("update")
@RequiresPermissions("app:update")
public RestResponse update(Application app) {
applicationService.update(app);
applicationManageService.update(app);
return RestResponse.success(true);
}

@Operation(summary = "Get applications dashboard data")
@PostMapping("dashboard")
public RestResponse dashboard(Long teamId) {
Map<String, Serializable> map = applicationService.dashboard(teamId);
Map<String, Serializable> map = applicationInfoService.dashboard(teamId);
return RestResponse.success(map);
}

Expand All @@ -154,35 +148,7 @@ public RestResponse dashboard(Long teamId) {
@PostMapping("list")
@RequiresPermissions("app:view")
public RestResponse list(Application app, RestRequest request) {
IPage<Application> applicationList = applicationService.page(app, request);
List<Application> appRecords = applicationList.getRecords();
List<Long> appIds = appRecords.stream().map(Application::getId).collect(Collectors.toList());
Map<Long, PipelineStatus> pipeStates = appBuildPipeService.listPipelineStatus(appIds);

// add building pipeline status info and app control info
appRecords =
appRecords.stream()
.peek(
e -> {
if (pipeStates.containsKey(e.getId())) {
e.setBuildStatus(pipeStates.get(e.getId()).getCode());
}
})
.peek(
e -> {
AppControl appControl =
new AppControl()
.setAllowBuild(
e.getBuildStatus() == null
|| !PipelineStatus.running.getCode().equals(e.getBuildStatus()))
.setAllowStart(
!e.shouldBeTrack()
&& PipelineStatus.success.getCode().equals(e.getBuildStatus()))
.setAllowStop(e.isRunning());
e.setAppControl(appControl);
})
.collect(Collectors.toList());
applicationList.setRecords(appRecords);
IPage<Application> applicationList = applicationManageService.page(app, request);
return RestResponse.success(applicationList);
}

Expand All @@ -191,7 +157,7 @@ public RestResponse list(Application app, RestRequest request) {
@PostMapping("mapping")
@RequiresPermissions("app:mapping")
public RestResponse mapping(Application app) {
boolean flag = applicationService.mapping(app);
boolean flag = applicationInfoService.mapping(app);
return RestResponse.success(flag);
}

Expand All @@ -201,7 +167,7 @@ public RestResponse mapping(Application app) {
@PostMapping("revoke")
@RequiresPermissions("app:release")
public RestResponse revoke(Application app) {
applicationService.revoke(app);
applicationActionService.revoke(app);
return RestResponse.success();
}

Expand Down Expand Up @@ -241,8 +207,7 @@ public RestResponse revoke(Application app) {
@RequiresPermissions("app:start")
public RestResponse start(@Parameter(hidden = true) Application app) {
try {
applicationService.checkEnv(app);
applicationService.start(app, false);
applicationActionService.start(app, false);
return RestResponse.success(true);
} catch (Exception e) {
return RestResponse.success(false).message(e.getMessage());
Expand Down Expand Up @@ -292,7 +257,7 @@ public RestResponse start(@Parameter(hidden = true) Application app) {
@PostMapping(value = "cancel")
@RequiresPermissions("app:cancel")
public RestResponse cancel(@Parameter(hidden = true) Application app) throws Exception {
applicationService.cancel(app);
applicationActionService.cancel(app);
return RestResponse.success();
}

Expand All @@ -303,7 +268,7 @@ public RestResponse cancel(@Parameter(hidden = true) Application app) throws Exc
@PostMapping("clean")
@RequiresPermissions("app:clean")
public RestResponse clean(Application app) {
applicationService.clean(app);
applicationManageService.clean(app);
return RestResponse.success(true);
}

Expand All @@ -313,7 +278,7 @@ public RestResponse clean(Application app) {
@PostMapping("forcedStop")
@RequiresPermissions("app:cancel")
public RestResponse forcedStop(Application app) {
applicationService.forcedStop(app);
applicationActionService.forcedStop(app);
return RestResponse.success();
}

Expand All @@ -326,28 +291,28 @@ public RestResponse yarn() {
@Operation(summary = "Get application on yarn name")
@PostMapping("name")
public RestResponse yarnName(Application app) {
String yarnName = applicationService.getYarnName(app);
String yarnName = applicationInfoService.getYarnName(app);
return RestResponse.success(yarnName);
}

@Operation(summary = "Check the application exist status")
@PostMapping("checkName")
public RestResponse checkName(Application app) {
AppExistsState exists = applicationService.checkExists(app);
AppExistsState exists = applicationInfoService.checkExists(app);
return RestResponse.success(exists.get());
}

@Operation(summary = "Get application conf")
@PostMapping("readConf")
public RestResponse readConf(Application app) throws IOException {
String config = applicationService.readConf(app);
String config = applicationInfoService.readConf(app);
return RestResponse.success(config);
}

@Operation(summary = "Get application main-class")
@PostMapping("main")
public RestResponse getMain(Application application) {
String mainClass = applicationService.getMain(application);
String mainClass = applicationInfoService.getMain(application);
return RestResponse.success(mainClass);
}

Expand Down Expand Up @@ -379,7 +344,7 @@ public RestResponse deleteOperationLog(ApplicationLog applicationLog) {
@PostMapping("delete")
@RequiresPermissions("app:delete")
public RestResponse delete(Application app) throws InternalException {
Boolean deleted = applicationService.delete(app);
Boolean deleted = applicationManageService.delete(app);
return RestResponse.success(deleted);
}

Expand Down Expand Up @@ -437,7 +402,7 @@ public RestResponse verifySchema(String path) {
@Operation(summary = "Check the application savepoint path")
@PostMapping("checkSavepointPath")
public RestResponse checkSavepointPath(Application app) throws Exception {
String error = applicationService.checkSavepointPath(app);
String error = applicationInfoService.checkSavepointPath(app);
if (error == null) {
return RestResponse.success(true);
} else {
Expand Down Expand Up @@ -468,7 +433,7 @@ public RestResponse checkSavepointPath(Application app) throws Exception {
})
@PostMapping(value = "k8sStartLog")
public RestResponse k8sStartLog(Long id, Integer offset, Integer limit) throws Exception {
String resp = applicationService.k8sStartLog(id, offset, limit);
String resp = applicationInfoService.k8sStartLog(id, offset, limit);
return RestResponse.success(resp);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.core.service.ApplicationService;
import org.apache.streampark.console.core.service.application.ApplicationInfoService;

import org.apache.shiro.authz.annotation.RequiresPermissions;

Expand All @@ -42,21 +42,21 @@
@RequestMapping("flink/history")
public class ApplicationHistoryController {

@Autowired private ApplicationService applicationService;
@Autowired private ApplicationInfoService applicationInfoService;

@Operation(summary = "List the upload jar history records")
@PostMapping("uploadJars")
@RequiresPermissions("app:create")
public RestResponse listUploadJars() {
List<String> jars = applicationService.historyUploadJars();
List<String> jars = applicationInfoService.historyUploadJars();
return RestResponse.success(jars);
}

@Operation(summary = "List the k8s namespace history records")
@PostMapping("k8sNamespaces")
@RequiresPermissions("app:create")
public RestResponse listK8sNamespace() {
List<String> namespaces = applicationService.getRecentK8sNamespace();
List<String> namespaces = applicationInfoService.getRecentK8sNamespace();
return RestResponse.success(namespaces);
}

Expand All @@ -69,7 +69,7 @@ public RestResponse listSessionClusterId(int executionMode) {
case KUBERNETES_NATIVE_SESSION:
case YARN_SESSION:
case REMOTE:
clusterIds = applicationService.getRecentK8sClusterId(executionMode);
clusterIds = applicationInfoService.getRecentK8sClusterId(executionMode);
break;
default:
clusterIds = new ArrayList<>(0);
Expand All @@ -82,31 +82,31 @@ public RestResponse listSessionClusterId(int executionMode) {
@PostMapping("flinkBaseImages")
@RequiresPermissions("app:create")
public RestResponse listFlinkBaseImage() {
List<String> images = applicationService.getRecentFlinkBaseImage();
List<String> images = applicationInfoService.getRecentFlinkBaseImage();
return RestResponse.success(images);
}

@Operation(summary = "List the flink pod template history records")
@PostMapping("flinkPodTemplates")
@RequiresPermissions("app:create")
public RestResponse listPodTemplate() {
List<String> templates = applicationService.getRecentK8sPodTemplate();
List<String> templates = applicationInfoService.getRecentK8sPodTemplate();
return RestResponse.success(templates);
}

@Operation(summary = "List the flink JM pod template history records")
@PostMapping("flinkJmPodTemplates")
@RequiresPermissions("app:create")
public RestResponse listJmPodTemplate() {
List<String> templates = applicationService.getRecentK8sJmPodTemplate();
List<String> templates = applicationInfoService.getRecentK8sJmPodTemplate();
return RestResponse.success(templates);
}

@Operation(summary = "List the flink TM pod template history records")
@PostMapping("flinkTmPodTemplates")
@RequiresPermissions("app:create")
public RestResponse listTmPodTemplate() {
List<String> templates = applicationService.getRecentK8sTmPodTemplate();
List<String> templates = applicationInfoService.getRecentK8sTmPodTemplate();
return RestResponse.success(templates);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.apache.streampark.console.core.annotation.ApiAccess;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.SavePoint;
import org.apache.streampark.console.core.service.ApplicationService;
import org.apache.streampark.console.core.service.SavePointService;
import org.apache.streampark.console.core.service.application.ApplicationManageService;

import org.apache.shiro.authz.annotation.RequiresPermissions;

Expand All @@ -50,7 +50,7 @@
@RequestMapping("flink/savepoint")
public class SavePointController {

@Autowired private ApplicationService applicationService;
@Autowired private ApplicationManageService applicationManageService;

@Autowired private SavePointService savePointService;

Expand All @@ -73,7 +73,7 @@ public RestResponse history(SavePoint savePoint, RestRequest request) {
@RequiresPermissions("savepoint:delete")
public RestResponse delete(Long id) throws InternalException {
SavePoint savePoint = savePointService.getById(id);
Application application = applicationService.getById(savePoint.getAppId());
Application application = applicationManageService.getById(savePoint.getAppId());
Boolean deleted = savePointService.delete(id, application);
return RestResponse.success(deleted);
}
Expand Down
Loading

0 comments on commit 9ca15a0

Please sign in to comment.