Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Support spark task distribution for streampark #4102

Merged
merged 1 commit into from
Sep 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.console.core.bean;

import lombok.Data;

import java.io.Serializable;

@Data
public class SparkTaskItem implements Serializable {

    // Explicit serialVersionUID: this bean is serialized as part of a distributed
    // task payload, so the UID must stay stable across refactors of the class.
    private static final long serialVersionUID = 1L;

    /** Id of the Spark application this distributed task refers to. */
    private Long appId;

    /**
     * Whether the application should be started automatically when the task is
     * executed (mirrors the {@code autoStart} flag of
     * {@code DistributedTaskService#saveDistributedTask}).
     */
    private Boolean autoStart;

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.streampark.common.enums.FlinkK8sRestExposedType;
import org.apache.streampark.common.enums.StorageType;
import org.apache.streampark.common.fs.FsOperator;
import org.apache.streampark.console.base.mybatis.entity.BaseEntity;
import org.apache.streampark.console.base.util.JacksonUtils;
import org.apache.streampark.console.core.bean.AppControl;
import org.apache.streampark.console.core.bean.Dependency;
Expand Down Expand Up @@ -53,7 +54,6 @@
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.io.Serializable;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
Expand All @@ -64,7 +64,7 @@
@Data
@TableName("t_flink_app")
@Slf4j
public class FlinkApplication implements Serializable {
public class FlinkApplication extends BaseEntity {

@TableId(type = IdType.INPUT)
private Long id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,17 @@ public enum DistributedTaskEnum {
/**
* Forces the given application to stop.
*/
ABORT(4);
ABORT(4),

/**
* Stop the given application.
*/
STOP(5),

/**
 * Forcibly stops the given application (hard stop, as opposed to the graceful {@link #STOP}).
 */
FORCED_STOP(6);

private final int value;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@

package org.apache.streampark.console.core.service;

import org.apache.streampark.console.base.mybatis.entity.BaseEntity;
import org.apache.streampark.console.core.entity.DistributedTask;
import org.apache.streampark.console.core.entity.FlinkApplication;
import org.apache.streampark.console.core.enums.DistributedTaskEnum;

import com.baomidou.mybatisplus.extension.service.IService;

import java.util.List;
import java.util.Set;

/**
Expand All @@ -40,16 +39,9 @@ public interface DistributedTaskService extends IService<DistributedTask> {

/**
* This interface is responsible for polling the database to retrieve task records and execute the corresponding operations.
* @param DistributedTask DistributedTask
* @param distributedTask distributedTask
*/
void executeDistributedTask(DistributedTask DistributedTask) throws Exception;

/**
* Through this interface, the watcher obtains the list of tasks that need to be monitored.
* @param applications List<Application>
* @return List<Application> List of tasks that need to be monitored
*/
List<FlinkApplication> getMonitoredTaskList(List<FlinkApplication> applications);
void executeDistributedTask(DistributedTask distributedTask) throws Exception;

/**
* This interface handles task redistribution when server nodes are added.
Expand All @@ -74,9 +66,9 @@ public interface DistributedTaskService extends IService<DistributedTask> {
/**
* Save Distributed Task.
*
* @param appParam Application
* @param appParam It may be one of the following values: FlinkApplication, SparkApplication
* @param autoStart boolean
* @param action It may be one of the following values: START, RESTART, REVOKE, CANCEL, ABORT, STOP, FORCED_STOP
*/
public void saveDistributedTask(FlinkApplication appParam, boolean autoStart, DistributedTaskEnum action);
public void saveDistributedTask(BaseEntity appParam, boolean autoStart, DistributedTaskEnum action);
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@
import org.apache.streampark.console.core.entity.SparkEnv;
import org.apache.streampark.console.core.entity.SparkSql;
import org.apache.streampark.console.core.enums.ConfigFileTypeEnum;
import org.apache.streampark.console.core.enums.DistributedTaskEnum;
import org.apache.streampark.console.core.enums.ReleaseStateEnum;
import org.apache.streampark.console.core.enums.SparkAppStateEnum;
import org.apache.streampark.console.core.enums.SparkOperationEnum;
import org.apache.streampark.console.core.enums.SparkOptionStateEnum;
import org.apache.streampark.console.core.mapper.SparkApplicationMapper;
import org.apache.streampark.console.core.service.DistributedTaskService;
import org.apache.streampark.console.core.service.ResourceService;
import org.apache.streampark.console.core.service.SparkEnvService;
import org.apache.streampark.console.core.service.SparkSqlService;
Expand Down Expand Up @@ -129,13 +131,21 @@ public class SparkApplicationActionServiceImpl
@Autowired
private ResourceService resourceService;

@Autowired
private DistributedTaskService distributedTaskService;

private final Map<Long, CompletableFuture<SubmitResponse>> startJobFutureMap = new ConcurrentHashMap<>();

private final Map<Long, CompletableFuture<CancelResponse>> cancelJobFutureMap = new ConcurrentHashMap<>();

@Override
public void revoke(Long appId) throws ApplicationException {
SparkApplication application = getById(appId);
// For HA purposes, if the task is not processed locally, save the Distribution task and return
if (!distributedTaskService.isLocalProcessing(appId)) {
distributedTaskService.saveDistributedTask(application, false, DistributedTaskEnum.REVOKE);
return;
}
ApiAlertException.throwIfNull(
application, String.format("The application id=%s not found, revoke failed.", appId));

Expand All @@ -161,15 +171,25 @@ public void revoke(Long appId) throws ApplicationException {

@Override
public void restart(SparkApplication appParam) throws Exception {
    if (distributedTaskService.isLocalProcessing(appParam.getId())) {
        // This node owns the application: restart is a stop followed by a
        // non-automatic start.
        this.stop(appParam);
        this.start(appParam, false);
    } else {
        // HA: the application is processed by another node, so persist a
        // RESTART task for the owning node to pick up instead of acting here.
        distributedTaskService.saveDistributedTask(appParam, false, DistributedTaskEnum.RESTART);
    }
}

@Override
public void forcedStop(Long id) {
SparkApplication application = this.baseMapper.selectApp(id);
// For HA purposes, if the task is not processed locally, save the Distribution task and return
if (!distributedTaskService.isLocalProcessing(id)) {
distributedTaskService.saveDistributedTask(application, false, DistributedTaskEnum.FORCED_STOP);
return;
}
CompletableFuture<SubmitResponse> startFuture = startJobFutureMap.remove(id);
CompletableFuture<CancelResponse> stopFuture = cancelJobFutureMap.remove(id);
SparkApplication application = this.baseMapper.selectApp(id);
if (startFuture != null) {
startFuture.cancel(true);
}
Expand All @@ -183,6 +203,11 @@ public void forcedStop(Long id) {

@Override
public void stop(SparkApplication appParam) throws Exception {
// For HA purposes, if the task is not processed locally, save the Distribution task and return
if (!distributedTaskService.isLocalProcessing(appParam.getId())) {
distributedTaskService.saveDistributedTask(appParam, false, DistributedTaskEnum.STOP);
return;
}
SparkAppHttpWatcher.setOptionState(appParam.getId(), SparkOptionStateEnum.STOPPING);
SparkApplication application = getById(appParam.getId());
application.setState(SparkAppStateEnum.STOPPING.getValue());
Expand Down Expand Up @@ -245,6 +270,11 @@ public void stop(SparkApplication appParam) throws Exception {

@Override
public void start(SparkApplication appParam, boolean auto) throws Exception {
// For HA purposes, if the task is not processed locally, save the Distribution task and return
if (!distributedTaskService.isLocalProcessing(appParam.getId())) {
distributedTaskService.saveDistributedTask(appParam, false, DistributedTaskEnum.START);
return;
}
// 1) check application
final SparkApplication application = getById(appParam.getId());
AssertUtils.notNull(application);
Expand Down
Loading
Loading