Skip to content

Commit

Permalink
[Fix](Job)Replace BlockingWaitStrategy with LiteTimeoutBlockingWaitSt…
Browse files Browse the repository at this point in the history
…rategy to avoid deadlock issues. (#40625)

FYI https://issues.apache.org/jira/browse/LOG4J2-1221

- BlockingWaitStrategy is a wait strategy used in the Disruptor
framework that blocks the thread when the ring buffer is full or not yet
available for publishing.

When threads are blocked, they are waiting for space in the ring buffer
to become available, which can lead to potential deadlocks if not
managed properly.
Timeout Handling:

- LiteTimeoutBlockingWaitStrategy provides a timeout for waiting
threads. If the buffer is not ready within the timeout period, the
thread is released, preventing it from being blocked indefinitely.
Reduced Risk of Deadlocks:

- By avoiding indefinite blocking, this strategy reduces the risk of
deadlocks caused by threads waiting on each other. The timeout allows
the system to handle scenarios where resources are temporarily
  • Loading branch information
CalvinKirs authored Sep 11, 2024
1 parent 55bde8c commit 087048f
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.concurrent.ThreadFactory;

Expand All @@ -33,6 +35,7 @@
* @param <T> the type of the event handled by the Disruptor
*/
public class TaskDisruptor<T> {
private static final Logger LOG = LogManager.getLogger(TaskDisruptor.class);
private final Disruptor<T> disruptor;
private final EventTranslatorVararg<T> eventTranslator;

Expand Down Expand Up @@ -68,9 +71,15 @@ public void start() {
*
* @param args the arguments for the event
*/
public void publishEvent(Object... args) {
RingBuffer<T> ringBuffer = disruptor.getRingBuffer();
ringBuffer.publishEvent(eventTranslator, args);
public boolean publishEvent(Object... args) {
try {
RingBuffer<T> ringBuffer = disruptor.getRingBuffer();
return ringBuffer.tryPublishEvent(eventTranslator, args);
} catch (Exception e) {
LOG.warn("Failed to publish event", e);
// Handle the exception, e.g., retry or alert
}
return false;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ public void onEvent(TimerJobEvent<T> event) {
}
JobType jobType = event.getJob().getJobType();
for (AbstractTask task : tasks) {
disruptorMap.get(jobType).publishEvent(task, event.getJob().getJobConfig());
if (!disruptorMap.get(jobType).publishEvent(task, event.getJob().getJobConfig())) {
task.cancel();
continue;
}
log.info("dispatch timer job success, job id is {}, task id is {}",
event.getJob().getJobId(), task.getTaskId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ public void run(Timeout timeout) {
log.info("job status is not running, job id is {}, skip dispatch", this.job.getJobId());
return;
}
dispatchDisruptor.publishEvent(this.job);
if (!dispatchDisruptor.publishEvent(this.job)) {
log.warn("dispatch timer job failed, job id is {}, job name is {}",
this.job.getJobId(), this.job.getJobName());
}
} catch (Exception e) {
log.warn("dispatch timer job error, task id is {}", this.job.getJobId(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,16 @@
import org.apache.doris.job.extensions.mtmv.MTMVTask;
import org.apache.doris.job.task.AbstractTask;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventTranslatorVararg;
import com.lmax.disruptor.LiteTimeoutBlockingWaitStrategy;
import com.lmax.disruptor.WorkHandler;
import lombok.Getter;

import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

public class TaskDisruptorGroupManager<T extends AbstractTask> {

Expand Down Expand Up @@ -86,7 +87,8 @@ private void registerDispatchDisruptor() {
(event, sequence, args) -> event.setJob((AbstractJob) args[0]);
this.dispatchDisruptor = new TaskDisruptor<>(dispatchEventFactory, DISPATCH_TIMER_JOB_QUEUE_SIZE,
dispatchThreadFactory,
new BlockingWaitStrategy(), dispatchTaskExecutorHandlers, eventTranslator);
new LiteTimeoutBlockingWaitStrategy(10, TimeUnit.MILLISECONDS),
dispatchTaskExecutorHandlers, eventTranslator);
}

private void registerInsertDisruptor() {
Expand All @@ -102,7 +104,8 @@ private void registerInsertDisruptor() {
event.setJobConfig((JobExecutionConfiguration) args[1]);
};
TaskDisruptor insertDisruptor = new TaskDisruptor<>(insertEventFactory, DISPATCH_INSERT_TASK_QUEUE_SIZE,
insertTaskThreadFactory, new BlockingWaitStrategy(), insertTaskExecutorHandlers, eventTranslator);
insertTaskThreadFactory, new LiteTimeoutBlockingWaitStrategy(10, TimeUnit.MILLISECONDS),
insertTaskExecutorHandlers, eventTranslator);
disruptorMap.put(JobType.INSERT, insertDisruptor);
}

Expand All @@ -119,17 +122,14 @@ private void registerMTMVDisruptor() {
event.setJobConfig((JobExecutionConfiguration) args[1]);
};
TaskDisruptor mtmvDisruptor = new TaskDisruptor<>(mtmvEventFactory, DISPATCH_MTMV_TASK_QUEUE_SIZE,
mtmvTaskThreadFactory, new BlockingWaitStrategy(), insertTaskExecutorHandlers, eventTranslator);
mtmvTaskThreadFactory, new LiteTimeoutBlockingWaitStrategy(10, TimeUnit.MILLISECONDS),
insertTaskExecutorHandlers, eventTranslator);
disruptorMap.put(JobType.MV, mtmvDisruptor);
}

public void dispatchTimerJob(AbstractJob job) {
dispatchDisruptor.publishEvent(job);
}

public void dispatchInstantTask(AbstractTask task, JobType jobType,
JobExecutionConfiguration jobExecutionConfiguration) {
disruptorMap.get(jobType).publishEvent(task, jobExecutionConfiguration);
public boolean dispatchInstantTask(AbstractTask task, JobType jobType,
JobExecutionConfiguration jobExecutionConfiguration) {
return disruptorMap.get(jobType).publishEvent(task, jobExecutionConfiguration);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ private void cycleTimerJobScheduler(T job, long startTimeWindowMs) {
}


public void schedulerInstantJob(T job, TaskType taskType, C context) {
public void schedulerInstantJob(T job, TaskType taskType, C context) throws JobException {
List<? extends AbstractTask> tasks = job.commonCreateTasks(taskType, context);
if (CollectionUtils.isEmpty(tasks)) {
log.info("job create task is empty, skip scheduler, job id is {}, job name is {}", job.getJobId(),
Expand All @@ -165,12 +165,15 @@ public void schedulerInstantJob(T job, TaskType taskType, C context) {
}
return;
}
tasks.forEach(task -> {
taskDisruptorGroupManager.dispatchInstantTask(task, job.getJobType(),
job.getJobConfig());
for (AbstractTask task : tasks) {
if (!taskDisruptorGroupManager.dispatchInstantTask(task, job.getJobType(),
job.getJobConfig())) {
throw new JobException("dispatch instant task failed, job id is "
+ job.getJobId() + ", task id is " + task.getTaskId());
}
log.info("dispatch instant job, job id is {}, job name is {}, task id is {}", job.getJobId(),
job.getJobName(), task.getTaskId());
});
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import org.apache.doris.common.CustomThreadFactory;
import org.apache.doris.scheduler.constants.TaskType;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventTranslatorThreeArg;
import com.lmax.disruptor.LiteTimeoutBlockingWaitStrategy;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.dsl.Disruptor;
Expand All @@ -44,7 +44,7 @@
@Log4j2
public class TaskDisruptor implements Closeable {

private Disruptor<TaskEvent> disruptor;
private Disruptor<TaskEvent> disruptor;
private static final int DEFAULT_RING_BUFFER_SIZE = Config.async_task_queen_size;

private static final int consumerThreadCount = Config.async_task_consumer_thread_num;
Expand Down Expand Up @@ -74,7 +74,7 @@ public class TaskDisruptor implements Closeable {
public void start() {
CustomThreadFactory exportTaskThreadFactory = new CustomThreadFactory("export-task-consumer");
disruptor = new Disruptor<>(TaskEvent.FACTORY, DEFAULT_RING_BUFFER_SIZE, exportTaskThreadFactory,
ProducerType.SINGLE, new BlockingWaitStrategy());
ProducerType.SINGLE, new LiteTimeoutBlockingWaitStrategy(10, TimeUnit.MILLISECONDS));
WorkHandler<TaskEvent>[] workers = new TaskHandler[consumerThreadCount];
for (int i = 0; i < consumerThreadCount; i++) {
workers[i] = new TaskHandler();
Expand Down Expand Up @@ -109,7 +109,7 @@ public void tryPublish(Long jobId, Long taskId, TaskType taskType) {
try {
disruptor.publishEvent(TRANSLATOR, jobId, taskId, taskType);
} catch (Exception e) {
log.error("tryPublish failed, jobId: {}", jobId, e);
log.warn("tryPublish failed, jobId: {}", jobId, e);
}
}

Expand All @@ -127,7 +127,7 @@ public void tryPublishTask(Long taskId) {
try {
disruptor.publishEvent(TRANSLATOR, taskId, 0L, TaskType.TRANSIENT_TASK);
} catch (Exception e) {
log.error("tryPublish failed, taskId: {}", taskId, e);
log.warn("tryPublish failed, taskId: {}", taskId, e);
}
}

Expand Down

0 comments on commit 087048f

Please sign in to comment.