Skip to content

Commit

Permalink
feat: add message start event support (#1173)
Browse files Browse the repository at this point in the history
  • Loading branch information
igpetrov authored Sep 27, 2023
1 parent ce63dc5 commit d997f41
Show file tree
Hide file tree
Showing 19 changed files with 2,461 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ public class Keywords {
*/
public static final String CORRELATION_KEY_EXPRESSION_KEYWORD = "correlationKeyExpression";

public static final String START_MESSAGE_EVENT_MESSAGE_ID_EXPRESSION = "messageIdExpression";

/**
* The keyword that identifies the source of `activation condition` property of a Connector.
* Activation condition is a boolean FEEL expression that determines whether the inbound Connector
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,22 @@
import io.camunda.connector.api.error.ConnectorInputException;
import io.camunda.connector.api.inbound.InboundConnectorResult;
import io.camunda.connector.api.inbound.correlation.MessageCorrelationPoint;
import io.camunda.connector.api.inbound.correlation.MessageStartEventCorrelationPoint;
import io.camunda.connector.api.inbound.correlation.StartEventCorrelationPoint;
import io.camunda.connector.api.inbound.result.CorrelatedMessage;
import io.camunda.connector.api.inbound.result.CorrelatedMessageStart;
import io.camunda.connector.api.inbound.result.CorrelationErrorData;
import io.camunda.connector.api.inbound.result.CorrelationErrorData.CorrelationErrorReason;
import io.camunda.connector.api.inbound.result.MessageCorrelationResult;
import io.camunda.connector.api.inbound.result.MessageStartCorrelationResult;
import io.camunda.connector.api.inbound.result.ProcessInstance;
import io.camunda.connector.api.inbound.result.StartEventCorrelationResult;
import io.camunda.connector.feel.FeelEngineWrapper;
import io.camunda.connector.feel.FeelEngineWrapperException;
import io.camunda.connector.runtime.core.ConnectorHelper;
import io.camunda.connector.runtime.core.inbound.InboundConnectorDefinitionImpl;
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.api.command.ClientStatusException;
import io.camunda.zeebe.client.api.response.ProcessInstanceEvent;
import io.camunda.zeebe.client.api.response.PublishMessageResponse;
import org.slf4j.Logger;
Expand Down Expand Up @@ -60,6 +64,9 @@ public InboundConnectorResult<?> correlate(
if (correlationPoint instanceof MessageCorrelationPoint msgCorPoint) {
return triggerMessage(definition, msgCorPoint, variables);
}
if (correlationPoint instanceof MessageStartEventCorrelationPoint msgStartCorPoint) {
return triggerMessageStartEvent(definition, msgStartCorPoint, variables);
}
throw new ConnectorException(
"Process correlation point "
+ correlationPoint.getClass()
Expand Down Expand Up @@ -103,6 +110,70 @@ protected InboundConnectorResult<ProcessInstance> triggerStartEvent(
}
}

protected InboundConnectorResult<CorrelatedMessageStart> triggerMessageStartEvent(
InboundConnectorDefinitionImpl definition,
MessageStartEventCorrelationPoint correlationPoint,
Object variables) {

if (!isActivationConditionMet(definition, variables)) {
LOG.debug("Activation condition didn't match: {}", correlationPoint);
return new MessageStartCorrelationResult(
correlationPoint.messageName(),
new CorrelationErrorData(CorrelationErrorReason.ACTIVATION_CONDITION_NOT_MET));
}

String messageId = extractMessageKey(correlationPoint, variables);
if (correlationPoint.messageIdExpression() != null
&& !correlationPoint.messageIdExpression().isBlank()
&& messageId == null) {
LOG.debug(
"Wasn't able to obtain idempotency key for expression {}.",
correlationPoint.messageIdExpression());
return new MessageStartCorrelationResult(
correlationPoint.messageName(),
new CorrelationErrorData(CorrelationErrorReason.FAULT_IDEMPOTENCY_KEY));
}

Object extractedVariables = extractVariables(variables, definition);

try {
PublishMessageResponse result =
zeebeClient
.newPublishMessageCommand()
.messageName(correlationPoint.messageName())
// correlation key must be empty to start a new process, see:
// https://docs.camunda.io/docs/components/modeler/bpmn/message-events/#message-start-events
.correlationKey("")
.messageId(messageId)
.tenantId(definition.tenantId())
.variables(extractedVariables)
.send()
.join();

LOG.info("Published message with key: " + result.getMessageKey());

return new MessageStartCorrelationResult(
correlationPoint.messageName(),
new CorrelatedMessageStart(
result.getMessageKey(),
messageId,
correlationPoint.bpmnProcessId(),
correlationPoint.processDefinitionKey(),
correlationPoint.version()));

} catch (ClientStatusException e1) {
// gracefully handle zeebe rejections, such as idempotency key rejection
LOG.info("Failed to publish message: ", e1);
return new MessageStartCorrelationResult(
correlationPoint.messageName(),
new CorrelationErrorData(
CorrelationErrorReason.FAULT_ZEEBE_CLIENT_STATUS, e1.getMessage()));
} catch (Exception e2) {
throw new ConnectorException(
"Failed to publish process message for subscription: " + correlationPoint, e2);
}
}

protected InboundConnectorResult<CorrelatedMessage> triggerMessage(
InboundConnectorDefinitionImpl definition,
MessageCorrelationPoint correlationPoint,
Expand Down Expand Up @@ -164,6 +235,18 @@ protected String extractCorrelationKey(MessageCorrelationPoint point, Object con
}
}

protected String extractMessageKey(MessageStartEventCorrelationPoint point, Object context) {
final String messageIdExpression = point.messageIdExpression();
if (messageIdExpression == null || messageIdExpression.isBlank()) {
return "";
}
try {
return feelEngine.evaluate(messageIdExpression, context, String.class);
} catch (Exception e) {
throw new ConnectorInputException(e);
}
}

protected Object extractVariables(
Object rawVariables, InboundConnectorDefinitionImpl definition) {
return ConnectorHelper.createOutputVariables(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@

import io.camunda.connector.api.inbound.InboundConnectorResult;
import io.camunda.connector.api.inbound.correlation.MessageCorrelationPoint;
import io.camunda.connector.api.inbound.correlation.MessageStartEventCorrelationPoint;
import io.camunda.connector.api.inbound.correlation.StartEventCorrelationPoint;
import io.camunda.connector.api.inbound.result.CorrelationErrorData.CorrelationErrorReason;
import io.camunda.connector.api.inbound.result.MessageStartCorrelationResult;
import io.camunda.connector.api.inbound.result.ProcessInstance;
import io.camunda.connector.api.inbound.result.StartEventCorrelationResult;
import io.camunda.connector.feel.FeelEngineWrapper;
Expand Down Expand Up @@ -106,6 +108,55 @@ void message_shouldCallCorrectZeebeMethod() {
verify(dummyCommand).correlationKey(correlationKeyValue);
verify(dummyCommand).send();
}

@Test
void startMessageEvent_shouldCallCorrectZeebeMethod() {
// given
var point = new MessageStartEventCorrelationPoint("test", "", "1", 1, 0);
var definition = mock(InboundConnectorDefinitionImpl.class);
when(definition.correlationPoint()).thenReturn(point);

var dummyCommand = Mockito.spy(new PublishMessageCommandDummy());
when(zeebeClient.newPublishMessageCommand()).thenReturn(dummyCommand);

// when
handler.correlate(definition, Collections.emptyMap());

// then
verify(zeebeClient).newPublishMessageCommand();
verifyNoMoreInteractions(zeebeClient);

verify(dummyCommand).messageName("test");
verify(dummyCommand).correlationKey("");
verify(dummyCommand).send();
}

@Test
void startMessageEvent_idempotencyKeyEvaluated() {
// given
var point = new MessageStartEventCorrelationPoint("test", "=myVar", "1", 1, 0);
var definition = mock(InboundConnectorDefinitionImpl.class);
when(definition.correlationPoint()).thenReturn(point);

var dummyCommand = Mockito.spy(new PublishMessageCommandDummy());
when(zeebeClient.newPublishMessageCommand()).thenReturn(dummyCommand);

// when
handler.correlate(
definition,
Map.of("myVar", "myValue", "myOtherMap", Map.of("myOtherKey", "myOtherValue")));

// then
verify(zeebeClient).newPublishMessageCommand();
verifyNoMoreInteractions(zeebeClient);

ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
verify(dummyCommand).messageName("test");
verify(dummyCommand).correlationKey("");
verify(dummyCommand).messageId(captor.capture());
assertThat(captor.getValue()).isEqualTo("myValue");
verify(dummyCommand).send();
}
}

@Nested
Expand Down Expand Up @@ -236,6 +287,116 @@ void activationConditionBlank_shouldCorrelate() {
assertThat(instance.getBpmnProcessId()).isEqualTo(point.bpmnProcessId());
assertThat(instance.getVersion()).isEqualTo(point.version());
}

@Test
void messageStartEvent_activationConditionFalse_shouldNotCorrelate() {
// given
var point = new MessageStartEventCorrelationPoint("testMsg", "=myVar", "1", 1, 0);
var definition = mock(InboundConnectorDefinitionImpl.class);
when(definition.correlationPoint()).thenReturn(point);
when(definition.activationCondition()).thenReturn("=testKey=\"otherValue\"");

Map<String, Object> variables = Map.of("testKey", "testValue");

// when
InboundConnectorResult<?> result = handler.correlate(definition, variables);

// then
verifyNoMoreInteractions(zeebeClient);

assertThat(result).isInstanceOf(MessageStartCorrelationResult.class);
assertThat(result.getCorrelationPointId()).isEqualTo("testMsg");
assertThat(result.getType()).isEqualTo(MessageStartCorrelationResult.TYPE_NAME);
assertFalse(result.getResponseData().isPresent());
assertFalse(result.isActivated());
assertThat(result.getErrorData().isPresent()).isTrue();
assertThat(result.getErrorData().get().reason())
.isEqualTo(CorrelationErrorReason.ACTIVATION_CONDITION_NOT_MET);
}

@Test
void messageStartEvent_activationConditionTrue_shouldCorrelate() {
// given
var dummyCommand = Mockito.spy(new PublishMessageCommandDummy());
when(zeebeClient.newPublishMessageCommand()).thenReturn(dummyCommand);

var point = new MessageStartEventCorrelationPoint("testMsg", "=myVar", "1", 1, 0);
var definition = mock(InboundConnectorDefinitionImpl.class);
when(definition.correlationPoint()).thenReturn(point);
when(definition.activationCondition()).thenReturn("=myOtherMap.myOtherKey=\"myOtherValue\"");

Map<String, Object> variables =
Map.of("myVar", "myValue", "myOtherMap", Map.of("myOtherKey", "myOtherValue"));

// when
InboundConnectorResult<?> result = handler.correlate(definition, variables);

// then
verify(zeebeClient).newPublishMessageCommand();

assertThat(result).isInstanceOf(MessageStartCorrelationResult.class);
assertThat(result.getCorrelationPointId()).isEqualTo("testMsg");
assertThat(result.getType()).isEqualTo(MessageStartCorrelationResult.TYPE_NAME);
assertThat(result.isActivated()).isTrue();
assertThat(result.getResponseData().isPresent()).isTrue();
assertThat(result.getErrorData().isPresent()).isFalse();
}

@Test
void messageStartEvent_activationConditionNull_shouldCorrelate() {
// given
var dummyCommand = Mockito.spy(new PublishMessageCommandDummy());
when(zeebeClient.newPublishMessageCommand()).thenReturn(dummyCommand);

var point = new MessageStartEventCorrelationPoint("testMsg", "=myVar", "1", 1, 0);
var definition = mock(InboundConnectorDefinitionImpl.class);
when(definition.correlationPoint()).thenReturn(point);
when(definition.activationCondition()).thenReturn(null);

Map<String, Object> variables =
Map.of("myVar", "myValue", "myOtherMap", Map.of("myOtherKey", "myOtherValue"));

// when
InboundConnectorResult<?> result = handler.correlate(definition, variables);

// then
verify(zeebeClient).newPublishMessageCommand();

assertThat(result).isInstanceOf(MessageStartCorrelationResult.class);
assertThat(result.getCorrelationPointId()).isEqualTo("testMsg");
assertThat(result.getType()).isEqualTo(MessageStartCorrelationResult.TYPE_NAME);
assertThat(result.isActivated()).isTrue();
assertThat(result.getResponseData().isPresent()).isTrue();
assertThat(result.getErrorData().isPresent()).isFalse();
}

@Test
void messageStartEvent_activationConditionBlank_shouldCorrelate() {
// given
var dummyCommand = Mockito.spy(new PublishMessageCommandDummy());
when(zeebeClient.newPublishMessageCommand()).thenReturn(dummyCommand);

var point = new MessageStartEventCorrelationPoint("testMsg", "=myVar", "1", 1, 0);
var definition = mock(InboundConnectorDefinitionImpl.class);
when(definition.correlationPoint()).thenReturn(point);
when(definition.activationCondition()).thenReturn(" ");

Map<String, Object> variables =
Map.of("myVar", "myValue", "myOtherMap", Map.of("myOtherKey", "myOtherValue"));

// when
InboundConnectorResult<?> result = handler.correlate(definition, variables);

// then
verify(zeebeClient).newPublishMessageCommand();

assertThat(result).isInstanceOf(MessageStartCorrelationResult.class);
assertThat(result.getCorrelationPointId()).isEqualTo("testMsg");
assertThat(result.getType()).isEqualTo(MessageStartCorrelationResult.TYPE_NAME);
assertThat(result.isActivated()).isTrue();
assertThat(result.getResponseData().isPresent()).isTrue();
assertThat(result.getErrorData().isPresent()).isFalse();
}
}

@Nested
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@

import static io.camunda.connector.runtime.core.Keywords.CORRELATION_KEY_EXPRESSION_KEYWORD;
import static io.camunda.connector.runtime.core.Keywords.INBOUND_TYPE_KEYWORD;
import static io.camunda.connector.runtime.core.Keywords.START_MESSAGE_EVENT_MESSAGE_ID_EXPRESSION;

import io.camunda.connector.api.inbound.InboundConnectorDefinition;
import io.camunda.connector.api.inbound.correlation.MessageCorrelationPoint;
import io.camunda.connector.api.inbound.correlation.MessageStartEventCorrelationPoint;
import io.camunda.connector.api.inbound.correlation.ProcessCorrelationPoint;
import io.camunda.connector.api.inbound.correlation.StartEventCorrelationPoint;
import io.camunda.connector.runtime.core.inbound.InboundConnectorDefinitionImpl;
Expand Down Expand Up @@ -182,14 +184,14 @@ private Collection<FlowElement> collectFlowElements(
private Optional<ProcessCorrelationPoint> getCorrelationPointForElement(
BaseElement element, Process process, ProcessDefinition definition) {

if (element instanceof StartEvent) {
return getCorrelationPointForStartEvent(process, definition);
} else if (element instanceof IntermediateCatchEvent) {
return getCorrelationPointForIntermediateCatchEvent((IntermediateCatchEvent) element);
} else if (element instanceof BoundaryEvent) {
return getCorrelationPointForIntermediateBoundaryEvent((BoundaryEvent) element);
} else if (element instanceof ReceiveTask) {
return getCorrelationPointForReceiveTask((ReceiveTask) element);
if (element instanceof StartEvent se) {
return getCorrelationPointForStartEvent(se, process, definition);
} else if (element instanceof IntermediateCatchEvent ice) {
return getCorrelationPointForIntermediateCatchEvent(ice);
} else if (element instanceof BoundaryEvent be) {
return getCorrelationPointForIntermediateBoundaryEvent(be);
} else if (element instanceof ReceiveTask rt) {
return getCorrelationPointForReceiveTask(rt);
}
LOG.warn("Unsupported Inbound element type: " + element.getClass());
return Optional.empty();
Expand Down Expand Up @@ -226,7 +228,26 @@ private Optional<ProcessCorrelationPoint> getCorrelationPointCatchEvent(CatchEve
}

private Optional<ProcessCorrelationPoint> getCorrelationPointForStartEvent(
Process process, ProcessDefinition definition) {
StartEvent startEvent, Process process, ProcessDefinition definition) {

MessageEventDefinition msgDef =
(MessageEventDefinition)
startEvent.getEventDefinitions().stream()
.filter(def -> def instanceof MessageEventDefinition)
.findAny()
.orElse(null);

if (msgDef != null) {
String messageId =
extractRequiredProperty(startEvent, START_MESSAGE_EVENT_MESSAGE_ID_EXPRESSION);
return Optional.of(
new MessageStartEventCorrelationPoint(
msgDef.getMessage().getName(),
messageId,
process.getId(),
definition.getVersion().intValue(),
definition.getKey()));
}

return Optional.of(
new StartEventCorrelationPoint(
Expand Down Expand Up @@ -260,6 +281,6 @@ private String extractRequiredProperty(BaseElement element, String name) {
.filter(property -> property.getName().equals(name))
.findAny()
.map(ZeebeProperty::getValue)
.orElseThrow(() -> new IllegalStateException("Missing required property " + name));
.orElse("");
}
}
Loading

0 comments on commit d997f41

Please sign in to comment.