From 087048f2b2f06eae15e80e7ffa209610c9e3f173 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Thu, 12 Sep 2024 00:18:48 +0800 Subject: [PATCH] [Fix](Job)Replace BlockingWaitStrategy with LiteTimeoutBlockingWaitStrategy to avoid deadlock issues. (#40625) FYI https://issues.apache.org/jira/browse/LOG4J2-1221 - BlockingWaitStrategy is a wait strategy used in the Disruptor framework that blocks the thread when the ring buffer is full or not yet available for publishing. When threads are blocked, they are waiting for space in the ring buffer to become available, which can lead to potential deadlocks if not managed properly. Timeout Handling: - LiteTimeoutBlockingWaitStrategy provides a timeout for waiting threads. If the buffer is not ready within the timeout period, the thread is released, preventing it from being blocked indefinitely. Reduced Risk of Deadlocks: - By avoiding indefinite blocking, this strategy reduces the risk of deadlocks caused by threads waiting on each other. The timeout allows the system to handle scenarios where resources are temporarily unavailable. --- .../doris/job/disruptor/TaskDisruptor.java | 15 ++++++++++--- .../job/executor/DispatchTaskHandler.java | 5 ++++- .../job/executor/TimerJobSchedulerTask.java | 5 ++++- .../manager/TaskDisruptorGroupManager.java | 22 +++++++++---------- .../doris/job/scheduler/JobScheduler.java | 13 ++++++----- .../scheduler/disruptor/TaskDisruptor.java | 10 ++++----- 6 files changed, 44 insertions(+), 26 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java b/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java index 45564e99b17829..6ca2924c593bc1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java @@ -24,6 +24,8 @@ import com.lmax.disruptor.WorkHandler; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; 
+import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.concurrent.ThreadFactory; @@ -33,6 +35,7 @@ * @param the type of the event handled by the Disruptor */ public class TaskDisruptor { + private static final Logger LOG = LogManager.getLogger(TaskDisruptor.class); private final Disruptor disruptor; private final EventTranslatorVararg eventTranslator; @@ -68,9 +71,15 @@ public void start() { * * @param args the arguments for the event */ - public void publishEvent(Object... args) { - RingBuffer ringBuffer = disruptor.getRingBuffer(); - ringBuffer.publishEvent(eventTranslator, args); + public boolean publishEvent(Object... args) { + try { + RingBuffer ringBuffer = disruptor.getRingBuffer(); + return ringBuffer.tryPublishEvent(eventTranslator, args); + } catch (Exception e) { + LOG.warn("Failed to publish event", e); + // Handle the exception, e.g., retry or alert + } + return false; } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java index e5933d133cbd61..d93393aa0ef89f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java @@ -66,7 +66,10 @@ public void onEvent(TimerJobEvent event) { } JobType jobType = event.getJob().getJobType(); for (AbstractTask task : tasks) { - disruptorMap.get(jobType).publishEvent(task, event.getJob().getJobConfig()); + if (!disruptorMap.get(jobType).publishEvent(task, event.getJob().getJobConfig())) { + task.cancel(); + continue; + } log.info("dispatch timer job success, job id is {}, task id is {}", event.getJob().getJobId(), task.getTaskId()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java index 
74efe49beb11d0..25bbccf3fa2fa6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java @@ -44,7 +44,10 @@ public void run(Timeout timeout) { log.info("job status is not running, job id is {}, skip dispatch", this.job.getJobId()); return; } - dispatchDisruptor.publishEvent(this.job); + if (!dispatchDisruptor.publishEvent(this.job)) { + log.warn("dispatch timer job failed, job id is {}, job name is {}", + this.job.getJobId(), this.job.getJobName()); + } } catch (Exception e) { log.warn("dispatch timer job error, task id is {}", this.job.getJobId(), e); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java index 4e31e467013f1c..b1ccb9764438c8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java @@ -31,15 +31,16 @@ import org.apache.doris.job.extensions.mtmv.MTMVTask; import org.apache.doris.job.task.AbstractTask; -import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.EventTranslatorVararg; +import com.lmax.disruptor.LiteTimeoutBlockingWaitStrategy; import com.lmax.disruptor.WorkHandler; import lombok.Getter; import java.util.EnumMap; import java.util.Map; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; public class TaskDisruptorGroupManager { @@ -86,7 +87,8 @@ private void registerDispatchDisruptor() { (event, sequence, args) -> event.setJob((AbstractJob) args[0]); this.dispatchDisruptor = new TaskDisruptor<>(dispatchEventFactory, DISPATCH_TIMER_JOB_QUEUE_SIZE, dispatchThreadFactory, - new BlockingWaitStrategy(), dispatchTaskExecutorHandlers, eventTranslator); + new 
LiteTimeoutBlockingWaitStrategy(10, TimeUnit.MILLISECONDS), + dispatchTaskExecutorHandlers, eventTranslator); } private void registerInsertDisruptor() { @@ -102,7 +104,8 @@ private void registerInsertDisruptor() { event.setJobConfig((JobExecutionConfiguration) args[1]); }; TaskDisruptor insertDisruptor = new TaskDisruptor<>(insertEventFactory, DISPATCH_INSERT_TASK_QUEUE_SIZE, - insertTaskThreadFactory, new BlockingWaitStrategy(), insertTaskExecutorHandlers, eventTranslator); + insertTaskThreadFactory, new LiteTimeoutBlockingWaitStrategy(10, TimeUnit.MILLISECONDS), + insertTaskExecutorHandlers, eventTranslator); disruptorMap.put(JobType.INSERT, insertDisruptor); } @@ -119,17 +122,14 @@ private void registerMTMVDisruptor() { event.setJobConfig((JobExecutionConfiguration) args[1]); }; TaskDisruptor mtmvDisruptor = new TaskDisruptor<>(mtmvEventFactory, DISPATCH_MTMV_TASK_QUEUE_SIZE, - mtmvTaskThreadFactory, new BlockingWaitStrategy(), insertTaskExecutorHandlers, eventTranslator); + mtmvTaskThreadFactory, new LiteTimeoutBlockingWaitStrategy(10, TimeUnit.MILLISECONDS), + insertTaskExecutorHandlers, eventTranslator); disruptorMap.put(JobType.MV, mtmvDisruptor); } - public void dispatchTimerJob(AbstractJob job) { - dispatchDisruptor.publishEvent(job); - } - - public void dispatchInstantTask(AbstractTask task, JobType jobType, - JobExecutionConfiguration jobExecutionConfiguration) { - disruptorMap.get(jobType).publishEvent(task, jobExecutionConfiguration); + public boolean dispatchInstantTask(AbstractTask task, JobType jobType, + JobExecutionConfiguration jobExecutionConfiguration) { + return disruptorMap.get(jobType).publishEvent(task, jobExecutionConfiguration); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java index 33d12c30a4b76f..862b85597cdc3a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java @@ -155,7 +155,7 @@ private void cycleTimerJobScheduler(T job, long startTimeWindowMs) { } - public void schedulerInstantJob(T job, TaskType taskType, C context) { + public void schedulerInstantJob(T job, TaskType taskType, C context) throws JobException { List tasks = job.commonCreateTasks(taskType, context); if (CollectionUtils.isEmpty(tasks)) { log.info("job create task is empty, skip scheduler, job id is {}, job name is {}", job.getJobId(), @@ -165,12 +165,15 @@ public void schedulerInstantJob(T job, TaskType taskType, C context) { } return; } - tasks.forEach(task -> { - taskDisruptorGroupManager.dispatchInstantTask(task, job.getJobType(), - job.getJobConfig()); + for (AbstractTask task : tasks) { + if (!taskDisruptorGroupManager.dispatchInstantTask(task, job.getJobType(), + job.getJobConfig())) { + throw new JobException("dispatch instant task failed, job id is " + + job.getJobId() + ", task id is " + task.getTaskId()); + } log.info("dispatch instant job, job id is {}, job name is {}, task id is {}", job.getJobId(), job.getJobName(), task.getTaskId()); - }); + } } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java index 57df84a0e89146..345b31d6bc2537 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java @@ -21,8 +21,8 @@ import org.apache.doris.common.CustomThreadFactory; import org.apache.doris.scheduler.constants.TaskType; -import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.EventTranslatorThreeArg; +import com.lmax.disruptor.LiteTimeoutBlockingWaitStrategy; import com.lmax.disruptor.TimeoutException; import com.lmax.disruptor.WorkHandler; import com.lmax.disruptor.dsl.Disruptor; @@ -44,7 +44,7 @@ @Log4j2 
public class TaskDisruptor implements Closeable { - private Disruptor disruptor; + private Disruptor disruptor; private static final int DEFAULT_RING_BUFFER_SIZE = Config.async_task_queen_size; private static final int consumerThreadCount = Config.async_task_consumer_thread_num; @@ -74,7 +74,7 @@ public class TaskDisruptor implements Closeable { public void start() { CustomThreadFactory exportTaskThreadFactory = new CustomThreadFactory("export-task-consumer"); disruptor = new Disruptor<>(TaskEvent.FACTORY, DEFAULT_RING_BUFFER_SIZE, exportTaskThreadFactory, - ProducerType.SINGLE, new BlockingWaitStrategy()); + ProducerType.SINGLE, new LiteTimeoutBlockingWaitStrategy(10, TimeUnit.MILLISECONDS)); WorkHandler[] workers = new TaskHandler[consumerThreadCount]; for (int i = 0; i < consumerThreadCount; i++) { workers[i] = new TaskHandler(); @@ -109,7 +109,7 @@ public void tryPublish(Long jobId, Long taskId, TaskType taskType) { try { disruptor.publishEvent(TRANSLATOR, jobId, taskId, taskType); } catch (Exception e) { - log.error("tryPublish failed, jobId: {}", jobId, e); + log.warn("tryPublish failed, jobId: {}", jobId, e); } } @@ -127,7 +127,7 @@ public void tryPublishTask(Long taskId) { try { disruptor.publishEvent(TRANSLATOR, taskId, 0L, TaskType.TRANSIENT_TASK); } catch (Exception e) { - log.error("tryPublish failed, taskId: {}", taskId, e); + log.warn("tryPublish failed, taskId: {}", taskId, e); } }