diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java b/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java index 45564e99b17829..6ca2924c593bc1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java @@ -24,6 +24,8 @@ import com.lmax.disruptor.WorkHandler; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.concurrent.ThreadFactory; @@ -33,6 +35,7 @@ * @param the type of the event handled by the Disruptor */ public class TaskDisruptor { + private static final Logger LOG = LogManager.getLogger(TaskDisruptor.class); private final Disruptor disruptor; private final EventTranslatorVararg eventTranslator; @@ -68,9 +71,15 @@ public void start() { * * @param args the arguments for the event */ - public void publishEvent(Object... args) { - RingBuffer ringBuffer = disruptor.getRingBuffer(); - ringBuffer.publishEvent(eventTranslator, args); + public boolean publishEvent(Object... args) { + try { + RingBuffer ringBuffer = disruptor.getRingBuffer(); + return ringBuffer.tryPublishEvent(eventTranslator, args); + } catch (Exception e) { + LOG.warn("Failed to publish event", e); + // Handle the exception, e.g., retry or alert + } + return false; } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java index e5933d133cbd61..d93393aa0ef89f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java @@ -66,7 +66,10 @@ public void onEvent(TimerJobEvent event) { } JobType jobType = event.getJob().getJobType(); for (AbstractTask task : tasks) { - disruptorMap.get(jobType).publishEvent(task, event.getJob().getJobConfig()); + if (!disruptorMap.get(jobType).publishEvent(task, event.getJob().getJobConfig())) { + task.cancel(); + continue; + } log.info("dispatch timer job success, job id is {}, task id is {}", event.getJob().getJobId(), task.getTaskId()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java index 74efe49beb11d0..25bbccf3fa2fa6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java @@ -44,7 +44,10 @@ public void run(Timeout timeout) { log.info("job status is not running, job id is {}, skip dispatch", this.job.getJobId()); return; } - dispatchDisruptor.publishEvent(this.job); + if (!dispatchDisruptor.publishEvent(this.job)) { + log.warn("dispatch timer job failed, job id is {}, job name is {}", + this.job.getJobId(), this.job.getJobName()); + } } catch (Exception e) { log.warn("dispatch timer job error, task id is {}", this.job.getJobId(), e); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java index 4e31e467013f1c..b1ccb9764438c8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java @@ -31,15 +31,16 @@ import org.apache.doris.job.extensions.mtmv.MTMVTask; import org.apache.doris.job.task.AbstractTask; -import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.EventTranslatorVararg; +import com.lmax.disruptor.LiteTimeoutBlockingWaitStrategy; import com.lmax.disruptor.WorkHandler; import lombok.Getter; import java.util.EnumMap; import java.util.Map; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; public class TaskDisruptorGroupManager { @@ -86,7 +87,8 @@ private void registerDispatchDisruptor() { (event, sequence, args) -> event.setJob((AbstractJob) args[0]); this.dispatchDisruptor = new TaskDisruptor<>(dispatchEventFactory, DISPATCH_TIMER_JOB_QUEUE_SIZE, dispatchThreadFactory, - new BlockingWaitStrategy(), dispatchTaskExecutorHandlers, eventTranslator); + new LiteTimeoutBlockingWaitStrategy(10, TimeUnit.MILLISECONDS), + dispatchTaskExecutorHandlers, eventTranslator); } private void registerInsertDisruptor() { @@ -102,7 +104,8 @@ private void registerInsertDisruptor() { event.setJobConfig((JobExecutionConfiguration) args[1]); }; TaskDisruptor insertDisruptor = new TaskDisruptor<>(insertEventFactory, DISPATCH_INSERT_TASK_QUEUE_SIZE, - insertTaskThreadFactory, new BlockingWaitStrategy(), insertTaskExecutorHandlers, eventTranslator); + insertTaskThreadFactory, new LiteTimeoutBlockingWaitStrategy(10, TimeUnit.MILLISECONDS), + insertTaskExecutorHandlers, eventTranslator); disruptorMap.put(JobType.INSERT, insertDisruptor); } @@ -119,17 +122,14 @@ private void registerMTMVDisruptor() { event.setJobConfig((JobExecutionConfiguration) args[1]); }; TaskDisruptor mtmvDisruptor = new TaskDisruptor<>(mtmvEventFactory, DISPATCH_MTMV_TASK_QUEUE_SIZE, - mtmvTaskThreadFactory, new BlockingWaitStrategy(), insertTaskExecutorHandlers, eventTranslator); + mtmvTaskThreadFactory, new LiteTimeoutBlockingWaitStrategy(10, TimeUnit.MILLISECONDS), + insertTaskExecutorHandlers, eventTranslator); disruptorMap.put(JobType.MV, mtmvDisruptor); } - public void dispatchTimerJob(AbstractJob job) { - dispatchDisruptor.publishEvent(job); - } - - public void dispatchInstantTask(AbstractTask task, JobType jobType, - JobExecutionConfiguration jobExecutionConfiguration) { - disruptorMap.get(jobType).publishEvent(task, jobExecutionConfiguration); + public boolean dispatchInstantTask(AbstractTask task, JobType jobType, + JobExecutionConfiguration jobExecutionConfiguration) { + return disruptorMap.get(jobType).publishEvent(task, jobExecutionConfiguration); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java index 33d12c30a4b76f..862b85597cdc3a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java @@ -155,7 +155,7 @@ private void cycleTimerJobScheduler(T job, long startTimeWindowMs) { } - public void schedulerInstantJob(T job, TaskType taskType, C context) { + public void schedulerInstantJob(T job, TaskType taskType, C context) throws JobException { List tasks = job.commonCreateTasks(taskType, context); if (CollectionUtils.isEmpty(tasks)) { log.info("job create task is empty, skip scheduler, job id is {}, job name is {}", job.getJobId(), @@ -165,12 +165,15 @@ public void schedulerInstantJob(T job, TaskType taskType, C context) { } return; } - tasks.forEach(task -> { - taskDisruptorGroupManager.dispatchInstantTask(task, job.getJobType(), - job.getJobConfig()); + for (AbstractTask task : tasks) { + if (!taskDisruptorGroupManager.dispatchInstantTask(task, job.getJobType(), + job.getJobConfig())) { + throw new JobException("dispatch instant task failed, job id is " + + job.getJobId() + ", task id is " + task.getTaskId()); + } log.info("dispatch instant job, job id is {}, job name is {}, task id is {}", job.getJobId(), job.getJobName(), task.getTaskId()); - }); + } } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java index 57df84a0e89146..345b31d6bc2537 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java @@ -21,8 +21,8 @@ import org.apache.doris.common.CustomThreadFactory; import org.apache.doris.scheduler.constants.TaskType; -import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.EventTranslatorThreeArg; +import com.lmax.disruptor.LiteTimeoutBlockingWaitStrategy; import com.lmax.disruptor.TimeoutException; import com.lmax.disruptor.WorkHandler; import com.lmax.disruptor.dsl.Disruptor; @@ -44,7 +44,7 @@ @Log4j2 public class TaskDisruptor implements Closeable { - private Disruptor disruptor; + private Disruptor disruptor; private static final int DEFAULT_RING_BUFFER_SIZE = Config.async_task_queen_size; private static final int consumerThreadCount = Config.async_task_consumer_thread_num; @@ -74,7 +74,7 @@ public class TaskDisruptor implements Closeable { public void start() { CustomThreadFactory exportTaskThreadFactory = new CustomThreadFactory("export-task-consumer"); disruptor = new Disruptor<>(TaskEvent.FACTORY, DEFAULT_RING_BUFFER_SIZE, exportTaskThreadFactory, - ProducerType.SINGLE, new BlockingWaitStrategy()); + ProducerType.SINGLE, new LiteTimeoutBlockingWaitStrategy(10, TimeUnit.MILLISECONDS)); WorkHandler[] workers = new TaskHandler[consumerThreadCount]; for (int i = 0; i < consumerThreadCount; i++) { workers[i] = new TaskHandler(); @@ -109,7 +109,7 @@ public void tryPublish(Long jobId, Long taskId, TaskType taskType) { try { disruptor.publishEvent(TRANSLATOR, jobId, taskId, taskType); } catch (Exception e) { - log.error("tryPublish failed, jobId: {}", jobId, e); + log.warn("tryPublish failed, jobId: {}", jobId, e); } } @@ -127,7 +127,7 @@ public void tryPublishTask(Long taskId) { try { disruptor.publishEvent(TRANSLATOR, taskId, 0L, TaskType.TRANSIENT_TASK); } catch (Exception e) { - log.error("tryPublish failed, taskId: {}", taskId, e); + log.warn("tryPublish failed, taskId: {}", taskId, e); } }