diff --git a/modules/flowable-engine/src/test/java/org/flowable/engine/test/bpmn/async/ParallelMultiInstanceAsyncNonExclusiveTest.java b/modules/flowable-engine/src/test/java/org/flowable/engine/test/bpmn/async/ParallelMultiInstanceAsyncNonExclusiveTest.java new file mode 100644 index 00000000000..e0ceec13e22 --- /dev/null +++ b/modules/flowable-engine/src/test/java/org/flowable/engine/test/bpmn/async/ParallelMultiInstanceAsyncNonExclusiveTest.java @@ -0,0 +1,191 @@ +/* Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.flowable.engine.test.bpmn.async; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.flowable.common.engine.api.delegate.event.FlowableEngineEventType; +import org.flowable.common.engine.api.delegate.event.FlowableEntityEvent; +import org.flowable.common.engine.api.delegate.event.FlowableEvent; +import org.flowable.common.engine.api.delegate.event.FlowableEventListener; +import org.flowable.common.engine.impl.interceptor.Command; +import org.flowable.common.engine.impl.interceptor.CommandConfig; +import org.flowable.common.engine.impl.interceptor.CommandExecutor; +import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl; +import org.flowable.engine.impl.interceptor.CommandInvoker; +import org.flowable.engine.impl.jobexecutor.AsyncContinuationJobHandler; +import org.flowable.engine.impl.jobexecutor.ParallelMultiInstanceActivityCompletionJobHandler; +import org.flowable.engine.runtime.ProcessInstance; +import org.flowable.engine.test.Deployment; +import org.flowable.engine.test.impl.CustomConfigurationFlowableTestCase; +import org.flowable.job.api.FlowableUnrecoverableJobException; +import org.flowable.job.api.Job; +import org.flowable.job.service.impl.cmd.LockExclusiveJobCmd; +import org.flowable.job.service.impl.persistence.entity.JobEntity; +import org.junit.jupiter.api.Test; + +/** + * @author Filip Hrisafov + */ +class ParallelMultiInstanceAsyncNonExclusiveTest extends CustomConfigurationFlowableTestCase { + + protected CustomCommandInvoker customCommandInvoker; + protected CustomEventListener customEventListener; + protected CollectingAsyncRunnableExecutionExceptionHandler executionExceptionHandler; + + public ParallelMultiInstanceAsyncNonExclusiveTest() { + super("parallelMultiInstanceAsyncNonExclusiveTest"); + } + + @Override + protected void configureConfiguration(ProcessEngineConfigurationImpl processEngineConfiguration) { + customCommandInvoker = new CustomCommandInvoker(); + processEngineConfiguration.setCommandInvoker(customCommandInvoker); + processEngineConfiguration.getAsyncExecutorConfiguration().setGlobalAcquireLockEnabled(true); + executionExceptionHandler = new CollectingAsyncRunnableExecutionExceptionHandler(); + processEngineConfiguration.setCustomAsyncRunnableExecutionExceptionHandlers(Collections.singletonList(executionExceptionHandler)); + customEventListener = new CustomEventListener(); + processEngineConfiguration.setEventListeners(Collections.singletonList(customEventListener)); + + } + + @Test + @Deployment + public void parallelMultiInstanceNonExclusiveJobs() { + // This test is trying to cause an optimistic locking exception when using non-exclusive parallel multi instance jobs. + // This is mimicking the following scenario: + // 4 async jobs complete in the same time, and thus they create 4 parallel-multi-instance-complete exclusive jobs + // 3 of those jobs will fail to get the exclusive lock and unacquire their jobs and 1 will get the lock + // the one that will get the lock will continue to the next step of the process and perform the multi instance cleanup + // the cleanup of the multi instance should not fail. + + ProcessInstance processInstance = runtimeService.createProcessInstanceBuilder() + .processDefinitionKey("parallelScriptTask") + .start(); + + List jobs = managementService.createJobQuery().list(); + assertThat(jobs).hasSize(4); + assertThat(jobs) + .extracting(Job::getJobHandlerType) + .containsOnly(AsyncContinuationJobHandler.TYPE); + customCommandInvoker.lockExclusiveCounter = new AtomicLong(0L); + + customCommandInvoker.executeLockReleaseLatch = new CountDownLatch(1); + customEventListener.parallelMultiInstanceCompleteLatch = customCommandInvoker.executeLockReleaseLatch; + + customCommandInvoker.executeAsyncRunnableLatch = new CountDownLatch(4); + customEventListener.asyncContinuationLatch = new CountDownLatch(4); + + customCommandInvoker.executeLockCountLatch = new CountDownLatch(3); + customEventListener.parallelMultiInstanceWaitCompleteLatch = customCommandInvoker.executeLockCountLatch; + + waitForJobExecutorToProcessAllJobs(15_000, 200); + + assertThat(executionExceptionHandler.getExceptions()).isEmpty(); + assertThat(managementService.createJobQuery().processInstanceId(processInstance.getId()).list()).isEmpty(); + assertThat(managementService.createDeadLetterJobQuery().processInstanceId(processInstance.getId()).list()).isEmpty(); + } + + protected static class CustomCommandInvoker extends CommandInvoker { + + protected AtomicLong lockExclusiveCounter = new AtomicLong(); + protected CountDownLatch executeLockCountLatch; + protected CountDownLatch executeLockReleaseLatch; + protected CountDownLatch executeAsyncRunnableLatch; + + protected CustomCommandInvoker() { + super(((commandContext, runnable) -> runnable.run()), null); + } + + @Override + public T execute(CommandConfig config, Command command, CommandExecutor commandExecutor) { + if (command instanceof LockExclusiveJobCmd) { + if (lockExclusiveCounter.incrementAndGet() > 1) { + // We let the first exclusive to run without waiting + // we then wait to complete this transaction until the execute lock exclusive is released + try { + executeLockCountLatch.countDown(); + executeLockReleaseLatch.await(4, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + } + return super.execute(config, command, commandExecutor); + } + } + + protected static class CustomEventListener implements FlowableEventListener { + + protected CountDownLatch asyncContinuationLatch; + protected CountDownLatch parallelMultiInstanceCompleteLatch; + protected CountDownLatch parallelMultiInstanceWaitCompleteLatch; + + @Override + public void onEvent(FlowableEvent event) { + if (FlowableEngineEventType.JOB_EXECUTION_SUCCESS.equals(event.getType()) && event instanceof FlowableEntityEvent) { + JobEntity entity = (JobEntity) ((FlowableEntityEvent) event).getEntity(); + String jobHandlerType = entity.getJobHandlerType(); + if (AsyncContinuationJobHandler.TYPE.equals(jobHandlerType)) { + // We are going to wait for all the async jobs to complete in the same time + asyncContinuationLatch.countDown(); + try { + if (!asyncContinuationLatch.await(4, TimeUnit.SECONDS)) { + throw new FlowableUnrecoverableJobException("asyncContinuationLatch did not reach 0"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } else if (ParallelMultiInstanceActivityCompletionJobHandler.TYPE.equals(jobHandlerType)) { + // There will be one multi instance complete job, so we count it down to release the rest of the lock exclusive commands + parallelMultiInstanceCompleteLatch.countDown(); + + try { + // Wait for the rest of the lock exclusive commands to complete before resuming this transaction + if (!parallelMultiInstanceWaitCompleteLatch.await(4, TimeUnit.SECONDS)) { + throw new FlowableUnrecoverableJobException("parallelMultiInstanceWaitLatch did not reach 0"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + + } + + } + + @Override + public boolean isFailOnException() { + return true; + } + + @Override + public boolean isFireOnTransactionLifecycleEvent() { + return false; + } + + @Override + public String getOnTransaction() { + return null; + } + } +} diff --git a/modules/flowable-engine/src/test/java/org/flowable/engine/test/jobexecutor/ResetExpiredJobsTest.java b/modules/flowable-engine/src/test/java/org/flowable/engine/test/jobexecutor/ResetExpiredJobsTest.java index 82213f805ed..5d88d1c3f7e 100644 --- a/modules/flowable-engine/src/test/java/org/flowable/engine/test/jobexecutor/ResetExpiredJobsTest.java +++ b/modules/flowable-engine/src/test/java/org/flowable/engine/test/jobexecutor/ResetExpiredJobsTest.java @@ -150,8 +150,11 @@ public void testResetExpiredJobTimeout() { managementService.executeCommand(new ResetExpiredJobsCmd(jobIds, jobServiceConfiguration.getJobEntityManager(), jobServiceConfiguration)); assertThat(managementService.executeCommand(new FindExpiredJobsCmd(expiredJobsPagesSize, jobServiceConfiguration.getJobEntityManager(), jobServiceConfiguration))).isEmpty(); - assertThat(managementService.createJobQuery().jobId(job.getId()).singleResult()).isNull(); - assertThat(managementService.createJobQuery().singleResult()).isNotNull(); + JobEntity jobAfterExpiry = (JobEntity) managementService.createJobQuery().singleResult(); + assertThat(jobAfterExpiry).isNotNull(); + assertThat(jobAfterExpiry.getId()).isEqualTo(job.getId()); + assertThat(jobAfterExpiry.getLockExpirationTime()).isNull(); + assertThat(jobAfterExpiry.getLockOwner()).isNull(); } @Test diff --git a/modules/flowable-engine/src/test/resources/org/flowable/engine/test/bpmn/async/ParallelMultiInstanceAsyncNonExclusiveTest.parallelMultiInstanceNonExclusiveJobs.bpmn20.xml b/modules/flowable-engine/src/test/resources/org/flowable/engine/test/bpmn/async/ParallelMultiInstanceAsyncNonExclusiveTest.parallelMultiInstanceNonExclusiveJobs.bpmn20.xml new file mode 100755 index 00000000000..2b29d7dd6c4 --- /dev/null +++ b/modules/flowable-engine/src/test/resources/org/flowable/engine/test/bpmn/async/ParallelMultiInstanceAsyncNonExclusiveTest.parallelMultiInstanceNonExclusiveJobs.bpmn20.xml @@ -0,0 +1,91 @@ + + + + + + + + + + + + + + + + 4 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/modules/flowable-job-service/src/main/java/org/flowable/job/service/impl/JobServiceImpl.java b/modules/flowable-job-service/src/main/java/org/flowable/job/service/impl/JobServiceImpl.java index 4bb2ebeb9f5..940a301fc99 100644 --- a/modules/flowable-job-service/src/main/java/org/flowable/job/service/impl/JobServiceImpl.java +++ b/modules/flowable-job-service/src/main/java/org/flowable/job/service/impl/JobServiceImpl.java @@ -17,11 +17,14 @@ import org.flowable.common.engine.api.FlowableIllegalArgumentException; import org.flowable.common.engine.api.delegate.event.FlowableEngineEventType; +import org.flowable.common.engine.api.delegate.event.FlowableEventDispatcher; +import org.flowable.common.engine.impl.persistence.entity.ByteArrayRef; import org.flowable.job.api.DeadLetterJobQuery; import org.flowable.job.api.HistoryJobQuery; import org.flowable.job.api.JobQuery; import org.flowable.job.api.SuspendedJobQuery; import org.flowable.job.api.TimerJobQuery; +import org.flowable.job.service.InternalJobManager; import org.flowable.job.service.JobService; import org.flowable.job.service.JobServiceConfiguration; import org.flowable.job.service.event.impl.FlowableJobEventBuilder; @@ -200,13 +203,30 @@ public void deleteJob(JobEntity job) { public void deleteJobsByExecutionId(String executionId) { JobEntityManager jobEntityManager = getJobEntityManager(); Collection jobsForExecution = jobEntityManager.findJobsByExecutionId(executionId); + if (jobsForExecution.isEmpty()) { + return; + } + + InternalJobManager internalJobManager = configuration.getInternalJobManager(); + FlowableEventDispatcher eventDispatcher = getEventDispatcher(); + boolean eventDispatcherEnabled = eventDispatcher != null && eventDispatcher.isEnabled(); for (JobEntity job : jobsForExecution) { - getJobEntityManager().delete(job); - if (getEventDispatcher() != null && getEventDispatcher().isEnabled()) { - getEventDispatcher().dispatchEvent(FlowableJobEventBuilder.createEntityEvent( + if (internalJobManager != null) { + internalJobManager.handleJobDelete(job); + } + + deleteByteArrayRef(job.getExceptionByteArrayRef()); + deleteByteArrayRef(job.getCustomValuesByteArrayRef()); + + if (eventDispatcherEnabled) { + eventDispatcher.dispatchEvent(FlowableJobEventBuilder.createEntityEvent( + FlowableEngineEventType.ENTITY_DELETED, job), configuration.getEngineName()); + eventDispatcher.dispatchEvent(FlowableJobEventBuilder.createEntityEvent( FlowableEngineEventType.JOB_CANCELED, job), configuration.getEngineName()); } } + + jobEntityManager.deleteJobsByExecutionId(executionId); } @Override @@ -235,4 +255,10 @@ public void deleteDeadLetterJobsByExecutionId(String executionId) { } } + protected void deleteByteArrayRef(ByteArrayRef jobByteArrayRef) { + if (jobByteArrayRef != null) { + jobByteArrayRef.delete(configuration.getEngineName()); + } + } + } diff --git a/modules/flowable-job-service/src/main/java/org/flowable/job/service/impl/asyncexecutor/DefaultJobManager.java b/modules/flowable-job-service/src/main/java/org/flowable/job/service/impl/asyncexecutor/DefaultJobManager.java index d28079e3921..03ec8db8ea7 100644 --- a/modules/flowable-job-service/src/main/java/org/flowable/job/service/impl/asyncexecutor/DefaultJobManager.java +++ b/modules/flowable-job-service/src/main/java/org/flowable/job/service/impl/asyncexecutor/DefaultJobManager.java @@ -350,20 +350,11 @@ public void unacquire(JobInfo job) { return; } - HistoryJobEntity newJobEntity = jobServiceConfiguration.getHistoryJobEntityManager().create(); - copyHistoryJobInfo(newJobEntity, jobEntity); - newJobEntity.setId(null); // We want a new id to be assigned to this job - newJobEntity.setLockExpirationTime(null); - newJobEntity.setLockOwner(null); - jobServiceConfiguration.getHistoryJobEntityManager().insert(newJobEntity); - jobServiceConfiguration.getHistoryJobEntityManager().deleteNoCascade(jobEntity); + jobEntity.setLockExpirationTime(null); + jobEntity.setLockOwner(null); } else if (job instanceof JobEntity) { - // Deleting the old job and inserting it again with another id, - // will avoid that the job is immediately is picked up again (for example - // when doing lots of exclusive jobs for the same process instance) - JobEntity jobEntity = jobServiceConfiguration.getJobEntityManager().findById(job.getId()); if (jobEntity == null) { @@ -372,13 +363,8 @@ public void unacquire(JobInfo job) { return; } - JobEntity newJobEntity = jobServiceConfiguration.getJobEntityManager().create(); - copyJobInfo(newJobEntity, jobEntity); - newJobEntity.setId(null); // We want a new id to be assigned to this job - newJobEntity.setLockExpirationTime(null); - newJobEntity.setLockOwner(null); - jobServiceConfiguration.getJobEntityManager().insert(newJobEntity); - jobServiceConfiguration.getJobEntityManager().delete(jobEntity.getId()); + jobEntity.setLockExpirationTime(null); + jobEntity.setLockOwner(null); // We're not calling triggerExecutorIfNeeded here after the insert. The unacquire happened // for a reason (eg queue full or exclusive lock failure). No need to try it immediately again, @@ -393,13 +379,8 @@ public void unacquire(JobInfo job) { return; } - ExternalWorkerJobEntity newJobEntity = jobServiceConfiguration.getExternalWorkerJobEntityManager().create(); - copyJobInfo(newJobEntity, jobEntity); - newJobEntity.setId(null); // We want a new id to be assigned to this job - newJobEntity.setLockExpirationTime(null); - newJobEntity.setLockOwner(null); - jobServiceConfiguration.getExternalWorkerJobEntityManager().insert(newJobEntity); - jobServiceConfiguration.getExternalWorkerJobEntityManager().delete(jobEntity.getId()); + jobEntity.setLockExpirationTime(null); + jobEntity.setLockOwner(null); } else if (job instanceof TimerJobEntity) { jobServiceConfiguration.getTimerJobEntityManager().resetExpiredJob(job.getId()); } else { diff --git a/modules/flowable-job-service/src/main/java/org/flowable/job/service/impl/persistence/entity/JobEntityManager.java b/modules/flowable-job-service/src/main/java/org/flowable/job/service/impl/persistence/entity/JobEntityManager.java index 6f41cbd3004..202159445d3 100644 --- a/modules/flowable-job-service/src/main/java/org/flowable/job/service/impl/persistence/entity/JobEntityManager.java +++ b/modules/flowable-job-service/src/main/java/org/flowable/job/service/impl/persistence/entity/JobEntityManager.java @@ -46,4 +46,6 @@ public interface JobEntityManager extends EntityManager, JobInfoEntit */ long findJobCountByQueryCriteria(JobQueryImpl jobQuery); + + void deleteJobsByExecutionId(String executionId); } diff --git a/modules/flowable-job-service/src/main/java/org/flowable/job/service/impl/persistence/entity/JobEntityManagerImpl.java b/modules/flowable-job-service/src/main/java/org/flowable/job/service/impl/persistence/entity/JobEntityManagerImpl.java index 70a53b46857..3ef3918b6ff 100644 --- a/modules/flowable-job-service/src/main/java/org/flowable/job/service/impl/persistence/entity/JobEntityManagerImpl.java +++ b/modules/flowable-job-service/src/main/java/org/flowable/job/service/impl/persistence/entity/JobEntityManagerImpl.java @@ -101,4 +101,8 @@ public void delete(JobEntity entity, boolean fireDeleteEvent) { super.delete(entity, fireDeleteEvent); } + @Override + public void deleteJobsByExecutionId(String executionId) { + dataManager.deleteJobsByExecutionId(executionId); + } } diff --git a/modules/flowable-job-service/src/main/resources/org/flowable/job/service/db/mapping/entity/Job.xml b/modules/flowable-job-service/src/main/resources/org/flowable/job/service/db/mapping/entity/Job.xml index ff9316f6ae9..3b10cc9a9ca 100755 --- a/modules/flowable-job-service/src/main/resources/org/flowable/job/service/db/mapping/entity/Job.xml +++ b/modules/flowable-job-service/src/main/resources/org/flowable/job/service/db/mapping/entity/Job.xml @@ -251,7 +251,7 @@ - delete from ${prefix}ACT_RU_JOB where PROCESS_INSTANCE_ID_ = #{id} + delete from ${prefix}ACT_RU_JOB where EXECUTION_ID_ = #{id}