Skip to content

Commit

Permalink
Fix issue with event subscription triggering a plan item instance in …
Browse files Browse the repository at this point in the history
…unavailable state
  • Loading branch information
tijsrademakers committed Aug 31, 2023
1 parent 7bd4ee8 commit e936552
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@
import org.flowable.cmmn.api.repository.CaseDefinition;
import org.flowable.cmmn.api.runtime.CaseInstanceBuilder;
import org.flowable.cmmn.api.runtime.CaseInstanceQuery;
import org.flowable.cmmn.api.runtime.PlanItemInstanceState;
import org.flowable.cmmn.converter.CmmnXmlConstants;
import org.flowable.cmmn.engine.CmmnEngineConfiguration;
import org.flowable.cmmn.engine.impl.persistence.entity.PlanItemInstanceEntity;
import org.flowable.cmmn.model.CmmnModel;
import org.flowable.cmmn.model.EventListener;
import org.flowable.cmmn.model.ExtensionElement;
import org.flowable.cmmn.model.PlanItem;
import org.flowable.common.engine.api.constant.ReferenceTypes;
import org.flowable.common.engine.api.scope.ScopeTypes;
import org.flowable.common.engine.impl.lock.LockManager;
Expand Down Expand Up @@ -76,23 +80,37 @@ protected EventRegistryProcessingInfo eventReceived(EventInstance eventInstance)
for (EventSubscription eventSubscription : eventSubscriptions) {
EventConsumerInfo eventConsumerInfo = new EventConsumerInfo(eventSubscription.getId(), eventSubscription.getSubScopeId(),
eventSubscription.getScopeDefinitionId(), ScopeTypes.CMMN);
handleEventSubscription(cmmnRuntimeService, eventSubscription, eventInstance, correlationKeys, eventConsumerInfo);
eventRegistryProcessingInfo.addEventConsumerInfo(eventConsumerInfo);
boolean eventSubscriptionHandled = handleEventSubscription(cmmnRuntimeService, eventSubscription, eventInstance, correlationKeys, eventConsumerInfo);

if (eventSubscriptionHandled) {
eventRegistryProcessingInfo.addEventConsumerInfo(eventConsumerInfo);
}
}

return eventRegistryProcessingInfo;
}

protected void handleEventSubscription(CmmnRuntimeService cmmnRuntimeService, EventSubscription eventSubscription,
protected boolean handleEventSubscription(CmmnRuntimeService cmmnRuntimeService, EventSubscription eventSubscription,
EventInstance eventInstance, Collection<CorrelationKey> correlationKeys, EventConsumerInfo eventConsumerInfo) {

if (eventSubscription.getSubScopeId() != null) {

// When a subscope id is set, this means that a plan item instance is waiting for the event

cmmnRuntimeService.createPlanItemInstanceTransitionBuilder(eventSubscription.getSubScopeId())
.transientVariable(EventConstants.EVENT_INSTANCE, eventInstance)
.trigger();
PlanItemInstanceEntity planItemInstanceEntity = (PlanItemInstanceEntity) cmmnRuntimeService.createPlanItemInstanceQuery().planItemInstanceId(eventSubscription.getSubScopeId()).singleResult();
CmmnModel cmmnModel = cmmnEngineConfiguration.getCmmnRepositoryService().getCmmnModel(planItemInstanceEntity.getCaseDefinitionId());
PlanItem planItem = cmmnModel.findPlanItemByPlanItemDefinitionId(planItemInstanceEntity.getPlanItemDefinitionId());
if (PlanItemInstanceState.ACTIVE.equals(planItemInstanceEntity.getState())
|| (planItem != null && planItem.getPlanItemDefinition() instanceof EventListener
&& PlanItemInstanceState.AVAILABLE.equals(planItemInstanceEntity.getState()))) {

cmmnRuntimeService.createPlanItemInstanceTransitionBuilder(eventSubscription.getSubScopeId())
.transientVariable(EventConstants.EVENT_INSTANCE, eventInstance)
.trigger();

} else {
return false;
}

} else if (eventSubscription.getScopeDefinitionId() != null
&& eventSubscription.getScopeId() == null
Expand Down Expand Up @@ -122,7 +140,7 @@ protected void handleEventSubscription(CmmnRuntimeService cmmnRuntimeService, Ev
// Returning, no new instance should be started
eventConsumerInfo.setHasExistingInstancesForUniqueCorrelation(true);
LOGGER.debug("Event received to start a new case instance, but a unique instance already exists.");
return;
return true;

} else if (cmmnEngineConfiguration.isEventRegistryUniqueCaseInstanceCheckWithLock()) {

Expand Down Expand Up @@ -161,36 +179,35 @@ protected void handleEventSubscription(CmmnRuntimeService cmmnRuntimeService, Ev
// Returning, no new instance should be started
eventConsumerInfo.setHasExistingInstancesForUniqueCorrelation(true);
LOGGER.debug("Event received to start a new case instance, but a unique instance already exists.");
return;
return true;
}

startCaseInstance(caseInstanceBuilder, correlationKeyWithAllParameters.getValue(), ReferenceTypes.EVENT_CASE);
return;
return true;

} finally {
lockManager.releaseAndDeleteLock();

}

} else {
LOGGER.info(
"Lock for {} was not acquired. This means that another event has already acquired that lock and will start a new case instance. Ignoring this one.",
countLockName);
return;
LOGGER.info("Lock for {} was not acquired. This means that another event has already acquired that lock and will start a new case instance. Ignoring this one.", countLockName);
return true;

}

} else {
startCaseInstance(caseInstanceBuilder, correlationKeyWithAllParameters.getValue(), ReferenceTypes.EVENT_CASE);
return;
return true;
}

}
}

startCaseInstance(caseInstanceBuilder, null, null);

}

return true;
}

protected long countCaseInstances(CmmnRuntimeService cmmnRuntimeService, EventInstance eventInstance,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@
import java.util.List;
import java.util.function.Supplier;

import jakarta.jms.Message;
import jakarta.jms.TextMessage;

import org.flowable.cmmn.api.CmmnRuntimeService;
import org.flowable.cmmn.api.CmmnTaskService;
import org.flowable.cmmn.api.runtime.CaseInstance;
Expand All @@ -31,7 +28,12 @@
import org.flowable.common.engine.impl.interceptor.EngineConfigurationConstants;
import org.flowable.eventregistry.api.EventDeployment;
import org.flowable.eventregistry.api.EventRegistry;
import org.flowable.eventregistry.api.EventRegistryEvent;
import org.flowable.eventregistry.api.EventRegistryNonMatchingEventConsumer;
import org.flowable.eventregistry.api.EventRegistryProcessingInfo;
import org.flowable.eventregistry.api.EventRepositoryService;
import org.flowable.eventregistry.api.runtime.EventInstance;
import org.flowable.eventregistry.api.runtime.EventPayloadInstance;
import org.flowable.eventregistry.impl.EventRegistryEngineConfiguration;
import org.flowable.eventregistry.model.InboundChannelModel;
import org.flowable.task.api.Task;
Expand All @@ -43,6 +45,9 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

import jakarta.jms.Message;
import jakarta.jms.TextMessage;

@CmmnJmsEventTest
@TestPropertySource(properties = {
"application.test.jms-queue=test-cmmn-queue"
Expand Down Expand Up @@ -151,6 +156,43 @@ public void testStartCaseWithEventDirectlyOnChannel() {
}
}

@Test
@CmmnDeployment(resources = { "org/flowable/eventregistry/integrationtest/caseWithEventListener.cmmn",
"org/flowable/eventregistry/integrationtest/one.event",
"org/flowable/eventregistry/integrationtest/one.channel"})
public void testStartCaseWithEventInUnavailableState() throws Exception {
try {
CaseInstance caseInstance = cmmnRuntimeService.createCaseInstanceBuilder().caseDefinitionKey("testEvent")
.start();

assertThat(cmmnTaskService.createTaskQuery().caseInstanceId(caseInstance.getId()).count()).isEqualTo(0);

TestNonMatchingEventConsumer nonMatchingEventConsumer = new TestNonMatchingEventConsumer();
getEventRegistryEngineConfiguration().setNonMatchingEventConsumer(nonMatchingEventConsumer);

InboundChannelModel channelModel = (InboundChannelModel) getEventRepositoryService().getChannelModelByKey("one");
ObjectNode eventNode = getEventRegistryEngineConfiguration().getObjectMapper().createObjectNode();
eventNode.put("payload1", "fozzie");
eventNode.put("payload2", 456);
getEventRegistry().eventReceived(channelModel, eventNode.toString());

assertThat(cmmnTaskService.createTaskQuery().caseInstanceId(caseInstance.getId()).count()).isEqualTo(0);

assertThat(nonMatchingEventConsumer.getNonMatchingEvent()).isNotNull();
EventInstance eventInstance = (EventInstance) nonMatchingEventConsumer.getNonMatchingEvent().getEventObject();
EventPayloadInstance payloadInstance = eventInstance.getPayloadInstances().iterator().next();
assertThat(payloadInstance.getDefinitionName()).isEqualTo("payload1");
assertThat(payloadInstance.getValue()).isEqualTo("fozzie");

} finally {
getEventRegistryEngineConfiguration().setNonMatchingEventConsumer(null);
List<EventDeployment> eventDeployments = getEventRepositoryService().createDeploymentQuery().list();
for (EventDeployment eventDeployment : eventDeployments) {
getEventRepositoryService().deleteDeployment(eventDeployment.getId());
}
}
}

@Test
@CmmnDeployment(resources = { "org/flowable/eventregistry/integrationtest/testSendEventTask.cmmn",
"org/flowable/eventregistry/integrationtest/one.event",
Expand Down Expand Up @@ -307,4 +349,22 @@ protected EventRegistry getEventRegistry() {
protected EventRegistryEngineConfiguration getEventRegistryEngineConfiguration() {
return (EventRegistryEngineConfiguration) cmmnEngine.getCmmnEngineConfiguration().getEngineConfigurations().get(EngineConfigurationConstants.KEY_EVENT_REGISTRY_CONFIG);
}

protected class TestNonMatchingEventConsumer implements EventRegistryNonMatchingEventConsumer {

protected EventRegistryEvent nonMatchingEvent;

@Override
public void handleNonMatchingEvent(EventRegistryEvent event, EventRegistryProcessingInfo eventRegistryProcessingInfo) {
nonMatchingEvent = event;
}

public EventRegistryEvent getNonMatchingEvent() {
return nonMatchingEvent;
}

public void setNonMatchingEvent(EventRegistryEvent nonMatchingEvent) {
this.nonMatchingEvent = nonMatchingEvent;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ public void testHeaderCorrelationEvent() {
assertThat(runtimeService.getVariable(anotherProcessInstance.getId(), "value2")).isEqualTo(456);

} finally {
getEventRegistryEngineConfiguration().setNonMatchingEventConsumer(null);
List<EventDeployment> eventDeployments = getEventRepositoryService().createDeploymentQuery().list();
for (EventDeployment eventDeployment : eventDeployments) {
getEventRepositoryService().deleteDeployment(eventDeployment.getId());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://www.omg.org/spec/CMMN/20151109/MODEL" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:flowable="http://flowable.org/cmmn" xmlns:cmmndi="http://www.omg.org/spec/CMMN/20151109/CMMNDI" xmlns:dc="http://www.omg.org/spec/CMMN/20151109/DC" xmlns:di="http://www.omg.org/spec/CMMN/20151109/DI" xmlns:design="http://flowable.org/design" targetNamespace="http://flowable.org/cmmn">
<case id="testEvent" name="testEvent">
<casePlanModel id="onecaseplanmodel1" name="Case plan model">
<planItem id="planItemhumanTask1" name="Human task" definitionRef="humanTask1">
<entryCriterion id="entryCriterion1" sentryRef="sentryentryCriterion1"></entryCriterion>
</planItem>
<planItem id="planItemeventListener1" definitionRef="eventListener1"></planItem>
<sentry id="sentryentryCriterion1">
<planItemOnPart id="sentryOnPartentryCriterion1" sourceRef="planItemeventListener1">
<standardEvent>occur</standardEvent>
</planItemOnPart>
</sentry>
<humanTask id="humanTask1" name="Human task" />
<eventListener id="eventListener1" flowable:availableCondition="${false}">
<extensionElements>
<flowable:eventType><![CDATA[one]]></flowable:eventType>
</extensionElements>
</eventListener>
</casePlanModel>
</case>
</definitions>

0 comments on commit e936552

Please sign in to comment.