Skip to content

Commit

Permalink
[Feature-2989][dolphinscheduler] Dolphinscheduler supports common sql…
Browse files Browse the repository at this point in the history
… and variables
  • Loading branch information
aiwenmo committed Jan 12, 2024
1 parent 2b71dfe commit 7d5327f
Show file tree
Hide file tree
Showing 8 changed files with 37 additions and 22 deletions.
27 changes: 15 additions & 12 deletions dinky-admin/src/main/java/org/dinky/controller/APIController.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

package org.dinky.controller;

import org.dinky.DinkyVersion;
import org.dinky.data.annotations.Log;
import org.dinky.data.dto.APISavePointTaskDTO;
import org.dinky.data.dto.TaskDTO;
import org.dinky.data.dto.TaskSubmitDto;
import org.dinky.data.enums.BusinessType;
Expand Down Expand Up @@ -65,18 +67,10 @@ public class APIController {
private final TaskService taskService;
private final JobInstanceService jobInstanceService;

// Interface compatible with DolphinScheduler
@GetMapping("/submitTask")
@ApiOperation("Submit Task")
public Result<JobResult> submitTask(@RequestParam Integer id) throws Exception {
taskService.initTenantByTaskId(id);
JobResult jobResult =
taskService.submitTask(TaskSubmitDto.builder().id(id).build());
if (jobResult.isSuccess()) {
return Result.succeed(jobResult, Status.EXECUTE_SUCCESS);
} else {
return Result.failed(jobResult, jobResult.getError());
}
@GetMapping("/version")
@ApiOperation(value = "Query Service Version", notes = "Query Dinky Service Version Number")
public Result<String> getVersionInfo() {
return Result.succeed(DinkyVersion.getVersion(), "Get success");
}

@PostMapping("/submitTask")
Expand All @@ -92,6 +86,15 @@ public Result<JobResult> submitTask(@RequestBody TaskSubmitDto submitDto) throws
}
}

@PostMapping("/savepointTask")
public Result savepointTask(@RequestBody APISavePointTaskDTO apiSavePointTaskDTO) {
return Result.succeed(
taskService.savepointTaskJob(
taskService.getTaskInfoById(apiSavePointTaskDTO.getTaskId()),
SavePointType.get(apiSavePointTaskDTO.getType())),
Status.EXECUTE_SUCCESS);
}

@GetMapping("/cancel")
// @Log(title = "Cancel Flink Job", businessType = BusinessType.TRIGGER)
@ApiOperation("Cancel Flink Job")
Expand Down
15 changes: 15 additions & 0 deletions dinky-admin/src/main/java/org/dinky/data/dto/TaskSubmitDto.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,25 @@
@Data
@Builder
public class TaskSubmitDto {
public TaskSubmitDto() {}

public TaskSubmitDto(Integer id, Boolean isOnline, String savePointPath, Map<String, String> variables) {
this.id = id;
this.isOnline = isOnline;
this.savePointPath = savePointPath;
this.variables = variables;
}

@ApiModelProperty(value = "ID", dataType = "Integer", example = "6", notes = "The identifier of the execution")
private Integer id;

@ApiModelProperty(
value = "Is online",
dataType = "Boolean",
example = "true",
notes = "Online dinky task, and only one job is allowed to execute")
private Boolean isOnline;

@ApiModelProperty(
value = "Save Point Path",
dataType = "String",
Expand Down
4 changes: 1 addition & 3 deletions dinky-admin/src/main/java/org/dinky/init/SystemInit.java
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,7 @@ private void aboutDolphinSchedulerInitOperation(Object v) {
project = projectClient.createDinkyProject();
}
} catch (Exception e) {
log.error("Error in DolphinScheduler: ", e);
log.error(
"get or create DolphinScheduler project failed, please check the config of DolphinScheduler!");
log.warn("Get or create DolphinScheduler project failed, please check the config of DolphinScheduler!");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -600,8 +600,9 @@ public JdbcSelectResult query(String sql, Integer limit) {
} catch (Exception e) {
result.error(LogUtil.getError(e));
log.error("Query failed", e);
} finally {
close(preparedStatement, results);
}
close(preparedStatement, results);
result.setRowData(datas);
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.dinky.scheduler.model;

import java.util.ArrayList;
import java.util.List;

import io.swagger.annotations.ApiModelProperty;
Expand All @@ -28,7 +29,7 @@
public class DinkyTaskParams {

@ApiModelProperty(value = "自定义参数")
private List<Property> localParams;
private List<Property> localParams = new ArrayList<>();

@ApiModelProperty(value = "dinky地址")
private String address;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ export const isOnline = (data: TaskDataType | undefined) => {
export const isCanPushDolphin = (data: TaskDataType | undefined) => {
return data
? JOB_LIFE_CYCLE.PUBLISH === data.step &&
!isSql(data?.dialect) &&
data?.dialect?.toLowerCase() !== DIALECT.FLINKSQLENV &&
data?.dialect?.toLowerCase() !== DIALECT.SCALA &&
data?.dialect?.toLowerCase() !== DIALECT.JAVA &&
Expand Down
4 changes: 1 addition & 3 deletions dinky-web/src/pages/DataStudio/HeaderContainer/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -367,9 +367,7 @@ const HeaderContainer = (props: connect) => {
icon: isOnline(currentData) ? <MergeCellsOutlined /> : <FundOutlined />,
title: isOnline(currentData) ? l('button.offline') : l('button.publish'),
isShow:
(currentTab?.type == TabsPageType.project &&
currentTab?.subType?.toLowerCase() === DIALECT.FLINK_SQL) ||
currentTab?.subType?.toLowerCase() === DIALECT.FLINKJAR,
(currentTab?.type == TabsPageType.project),
click: () => handleChangeJobLife()
},
{
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@
</dependency>
<dependency>
<groupId>org.dinky</groupId>
<artifactId>dinky-cdc-doris</artifactId>
<artifactId>dinky-cdc-plus</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
Expand Down

0 comments on commit 7d5327f

Please sign in to comment.