Skip to content

Commit

Permalink
[DSIP-73] Add dolphinscheduler-task-executor module to unify the task…
Browse files Browse the repository at this point in the history
… execution logic
  • Loading branch information
ruanwenjun committed Nov 15, 2024
1 parent d07a2ed commit eddd4ee
Show file tree
Hide file tree
Showing 264 changed files with 5,482 additions and 7,266 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@
import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils;
import org.apache.dolphinscheduler.extract.base.client.Clients;
import org.apache.dolphinscheduler.extract.common.ILogService;
import org.apache.dolphinscheduler.extract.worker.IPhysicalTaskExecutorOperator;
import org.apache.dolphinscheduler.extract.worker.IStreamingTaskInstanceOperator;
import org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceKillRequest;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceKillResponse;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceTriggerSavepointRequest;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceTriggerSavepointResponse;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.task.executor.operations.TaskExecutorKillRequest;
import org.apache.dolphinscheduler.task.executor.operations.TaskExecutorKillResponse;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
Expand Down Expand Up @@ -306,11 +306,11 @@ public Result stopTask(User loginUser, long projectCode, Integer taskInstanceId)
}

// todo: we only support streaming task for now
final TaskInstanceKillResponse taskInstanceKillResponse = Clients
.withService(ITaskInstanceOperator.class)
final TaskExecutorKillResponse taskExecutorKillResponse = Clients
.withService(IPhysicalTaskExecutorOperator.class)
.withHost(taskInstance.getHost())
.killTask(new TaskInstanceKillRequest(taskInstanceId));
log.info("TaskInstance kill response: {}", taskInstanceKillResponse);
.killTask(TaskExecutorKillRequest.of(taskInstanceId));
log.info("TaskInstance kill response: {}", taskExecutorKillResponse);

putMsg(result, Status.SUCCESS);
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@
import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
import org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery;
import org.apache.dolphinscheduler.extract.common.ILogService;
import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdRequest;
import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdResponse;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadRequest;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadResponse;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryRequest;
Expand Down Expand Up @@ -131,11 +129,6 @@ public TaskInstanceLogPageQueryResponse pageQueryTaskInstanceLog(TaskInstanceLog
return new TaskInstanceLogPageQueryResponse();
}

@Override
public GetAppIdResponse getAppId(GetAppIdRequest getAppIdRequest) {
return new GetAppIdResponse();
}

@Override
public void removeTaskInstanceLog(String taskInstanceLogAbsolutePath) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@
@Slf4j
public class ThreadUtils {

/**
* Create a daemon fixed thread pool, the thread name will be formatted with the given name.
*
* @param threadNameFormat the thread name format, e.g. "DemonThread-%d"
* @param threadsNum the number of threads in the pool
*/
public static ThreadPoolExecutor newDaemonFixedThreadExecutor(String threadNameFormat, int threadsNum) {
return (ThreadPoolExecutor) Executors.newFixedThreadPool(threadsNum, newDaemonThreadFactory(threadNameFormat));
}
Expand All @@ -43,9 +49,10 @@ public static ScheduledExecutorService newSingleDaemonScheduledExecutorService(S
* Create a daemon scheduler thread pool, the thread name will be formatted with the given name.
*
* @param threadNameFormat the thread name format, e.g. "DemonThread-%d"
* @param threadsNum the number of threads in the pool
* @param threadsNum the number of threads in the pool
*/
public static ScheduledExecutorService newDaemonScheduledExecutorService(String threadNameFormat, int threadsNum) {
public static ScheduledExecutorService newDaemonScheduledExecutorService(final String threadNameFormat,
final int threadsNum) {
return Executors.newScheduledThreadPool(threadsNum, newDaemonThreadFactory(threadNameFormat));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,37 +20,45 @@
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

import lombok.Builder;
import lombok.ToString;
import lombok.experimental.SuperBuilder;

/**
* The abstract class of delay event, the event will be triggered after the delay time.
* <p> You can extend this class to implement your own delay event.
*/
@ToString
@SuperBuilder
public abstract class AbstractDelayEvent implements IEvent, Delayed {

protected long delayTime;

protected long triggerTimeInMillis;
@Builder.Default
protected long createTimeInNano = System.nanoTime();

public AbstractDelayEvent() {
this(0);
}

public AbstractDelayEvent(long delayTime) {
if (delayTime == 0) {
this.triggerTimeInMillis = System.currentTimeMillis();
} else {
this.triggerTimeInMillis = System.currentTimeMillis() + delayTime;
}
public AbstractDelayEvent(final long delayTime) {
this(delayTime, System.nanoTime());
}

public AbstractDelayEvent(final long delayTime, final long createTimeInNano) {
this.delayTime = delayTime;
this.createTimeInNano = createTimeInNano;
}

@Override
public long getDelay(TimeUnit unit) {
long delay = triggerTimeInMillis - System.currentTimeMillis();
return unit.convert(delay, TimeUnit.MILLISECONDS);
long delay = createTimeInNano + delayTime * 1_000_000 - System.nanoTime();
return unit.convert(delay, TimeUnit.NANOSECONDS);
}

@Override
public int compareTo(Delayed other) {
return Long.compare(this.triggerTimeInMillis, ((AbstractDelayEvent) other).triggerTimeInMillis);
return Long.compare(this.createTimeInNano, ((AbstractDelayEvent) other).createTimeInNano);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,16 @@ public Optional<T> poll() {
return Optional.ofNullable(delayEventQueue.poll());
}

@Override
public Optional<T> peek() {
return Optional.ofNullable(delayEventQueue.peek());
}

@Override
public Optional<T> remove() {
return Optional.ofNullable(delayEventQueue.remove());
}

@Override
public boolean isEmpty() {
return delayEventQueue.isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,16 @@ public interface IEventBus<T extends IEvent> {
*/
Optional<T> poll() throws InterruptedException;

/**
* peek the head event from the bus. This method will not block if the event bus is empty will return empty optional.
*/
Optional<T> peek();

/**
* Remove the head event from the bus. This method will not block if the event bus is empty will return empty optional.
*/
Optional<T> remove();

/**
* Whether the bus is empty.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,27 +45,28 @@
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-common</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-meter</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,19 @@

import org.apache.dolphinscheduler.common.utils.JSONUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.util.TimeZone;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;

/**
* json serialize or deserialize
*/
@Slf4j
public class JsonSerializer {

Expand All @@ -60,13 +57,6 @@ private JsonSerializer() {

}

/**
* serialize to byte
*
* @param obj object
* @param <T> object type
* @return byte array
*/
public static <T> byte[] serialize(T obj) {
if (obj == null) {
return null;
Expand All @@ -79,44 +69,14 @@ public static <T> byte[] serialize(T obj) {
}
}

/**
* serialize to string
*
* @param obj object
* @param <T> object type
* @return string
*/
public static <T> String serializeToString(T obj) {
String json = "";
try {
json = objectMapper.writeValueAsString(obj);
} catch (JsonProcessingException e) {
log.error("serializeToString exception!", e);
}

return json;
}

/**
* deserialize
*
* @param src byte array
* @param clazz class
* @param <T> deserialize type
* @return deserialize type
*/
@SneakyThrows
public static <T> T deserialize(byte[] src, Class<T> clazz) {
if (src == null) {
return null;
}

String json = new String(src, StandardCharsets.UTF_8);
try {
return objectMapper.readValue(json, clazz);
} catch (IOException e) {
log.error("deserialize exception!", e);
return null;
}
return objectMapper.readValue(json, clazz);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,12 @@ class NettyRemotingServer {
this.serverConfig = serverConfig;
this.serverName = serverConfig.getServerName();
this.methodInvokerExecutor = ThreadUtils.newDaemonFixedThreadExecutor(
serverName + "MethodInvoker-%d", Runtime.getRuntime().availableProcessors() * 2 + 1);
serverName + "-methodInvoker-%d", Runtime.getRuntime().availableProcessors() * 2 + 1);
this.channelHandler = new JdkDynamicServerHandler(methodInvokerExecutor);
ThreadFactory bossThreadFactory =
ThreadUtils.newDaemonThreadFactory(serverName + "BossThread-%d");
ThreadUtils.newDaemonThreadFactory(serverName + "-boss-%d");
ThreadFactory workerThreadFactory =
ThreadUtils.newDaemonThreadFactory(serverName + "WorkerThread-%d");
ThreadUtils.newDaemonThreadFactory(serverName + "-worker-%d");
if (Epoll.isAvailable()) {
this.bossGroup = new EpollEventLoopGroup(1, bossThreadFactory);
this.workGroup = new EpollEventLoopGroup(serverConfig.getWorkerThread(), workerThreadFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,12 @@
<version>dev-SNAPSHOT</version>
</parent>

<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-extract-common</artifactId>

<dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-extract-base</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

import org.apache.dolphinscheduler.extract.base.RpcMethod;
import org.apache.dolphinscheduler.extract.base.RpcService;
import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdRequest;
import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdResponse;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadRequest;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadResponse;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryRequest;
Expand All @@ -35,9 +33,6 @@ public interface ILogService {
@RpcMethod
TaskInstanceLogPageQueryResponse pageQueryTaskInstanceLog(TaskInstanceLogPageQueryRequest taskInstanceLogPageQueryRequest);

@RpcMethod
GetAppIdResponse getAppId(GetAppIdRequest getAppIdRequest);

@RpcMethod
void removeTaskInstanceLog(String taskInstanceLogAbsolutePath);

Expand Down
12 changes: 9 additions & 3 deletions dolphinscheduler-extract/dolphinscheduler-extract-master/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,27 @@
<artifactId>dolphinscheduler-extract-master</artifactId>

<dependencies>

<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-extract-common</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-extract-base</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-task-api</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-task-executor</artifactId>
</dependency>

</dependencies>

</project>
Loading

0 comments on commit eddd4ee

Please sign in to comment.