Skip to content

Commit

Permalink
fix(webhook): support failure handling strategy (#2787)
Browse files Browse the repository at this point in the history
* fix(webhook): support failure handling strategy

* clean up
  • Loading branch information
chillleader authored Jun 24, 2024
1 parent 5f93d4a commit aa7e844
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import static org.springframework.web.bind.annotation.RequestMethod.PUT;

import io.camunda.connector.api.error.ConnectorException;
import io.camunda.connector.api.inbound.CorrelationFailureHandlingStrategy.ForwardErrorToUpstream;
import io.camunda.connector.api.inbound.CorrelationFailureHandlingStrategy.Ignore;
import io.camunda.connector.api.inbound.CorrelationResult;
import io.camunda.connector.api.inbound.CorrelationResult.Success.MessagePublished;
import io.camunda.connector.api.inbound.CorrelationResult.Success.ProcessInstanceCreated;
Expand Down Expand Up @@ -134,25 +136,17 @@ private ResponseEntity<?> buildResponse(
response = buildSuccessfulResponse(webhookResult, success);
} else {
if (correlationResult instanceof CorrelationResult.Failure failure) {
response = buildResponse(webhookResult, failure);
switch (failure.handlingStrategy()) {
case ForwardErrorToUpstream ignored -> response = buildErrorResponse(failure);
case Ignore ignored -> response = buildSuccessfulResponse(webhookResult, null);
}
} else {
throw new IllegalStateException("Illegal correlation result : " + correlationResult);
}
}
return response;
}

private ResponseEntity<?> buildResponse(
WebhookResult webhookResult, CorrelationResult.Failure failure) {
ResponseEntity<?> response;
if (failure instanceof CorrelationResult.Failure.ActivationConditionNotMet) {
response = buildSuccessfulResponse(webhookResult, null);
} else {
response = buildErrorResponse(failure);
}
return response;
}

private ResponseEntity<?> buildErrorResponse(CorrelationResult.Failure failure) {
ResponseEntity<?> response;
if (failure instanceof CorrelationResult.Failure.Other) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public void testSuccessfulProcessingWithActivationAndStrictResponse() throws Exc
}

@Test
public void testSuccessfulProcessingWithFailedActivation() throws Exception {
public void testSuccessfulProcessingWithFailedActivation_NoConsumeUnmatched() throws Exception {
WebhookConnectorExecutable webhookConnectorExecutable = mock(WebhookConnectorExecutable.class);
WebhookResult webhookResult = mock(WebhookResult.class);
when(webhookResult.request()).thenReturn(new MappedHttpRequest(Map.of(), Map.of(), Map.of()));
Expand Down Expand Up @@ -209,6 +209,47 @@ public void testSuccessfulProcessingWithFailedActivation() throws Exception {
new HashMap<>(),
new MockHttpServletRequest());

assertEquals(422, responseEntity.getStatusCode().value());
assertNotNull(responseEntity.getBody());
}

@Test
public void testSuccessfulProcessingWithFailedActivation_ConsumeUnmatched() throws Exception {
WebhookConnectorExecutable webhookConnectorExecutable = mock(WebhookConnectorExecutable.class);
WebhookResult webhookResult = mock(WebhookResult.class);
when(webhookResult.request()).thenReturn(new MappedHttpRequest(Map.of(), Map.of(), Map.of()));
when(webhookResult.response())
.thenReturn((WebhookResultContext) -> new WebhookHttpResponse(Map.of(), null, null));
when(webhookConnectorExecutable.triggerWebhook(any(WebhookProcessingPayload.class)))
.thenReturn(webhookResult);

var correlationHandlerMock = mock(InboundCorrelationHandler.class);
when(correlationHandlerMock.correlate(any(), any()))
.thenReturn(new CorrelationResult.Failure.ActivationConditionNotMet(true));

var webhookDef = webhookDefinition("nonExistingProcess", 1, "myPath");
var webhookContext =
new InboundConnectorContextImpl(
secretProvider,
v -> {},
webhookDef,
correlationHandlerMock,
(e) -> {},
mapper,
EvictingQueue.create(10));

// Register webhook function 'implementation'
webhookConnectorRegistry.register(
new RegisteredExecutable.Activated(webhookConnectorExecutable, webhookContext));

ResponseEntity<?> responseEntity =
controller.inbound(
"myPath",
new HashMap<>(),
"{}".getBytes(),
new HashMap<>(),
new MockHttpServletRequest());

assertEquals(200, responseEntity.getStatusCode().value());
assertNotNull(responseEntity.getBody());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.camunda.connector.api.inbound;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.camunda.connector.api.inbound.CorrelationFailureHandlingStrategy.ForwardErrorToUpstream;
import io.camunda.connector.api.inbound.CorrelationFailureHandlingStrategy.Ignore;

Expand All @@ -42,6 +43,7 @@ record MessageAlreadyCorrelated(@JsonIgnore ProcessElementContext activatedEleme

sealed interface Failure extends CorrelationResult {

@JsonProperty
String message();

default CorrelationFailureHandlingStrategy handlingStrategy() {
Expand All @@ -56,7 +58,7 @@ public CorrelationFailureHandlingStrategy handlingStrategy() {
}
}

record ActivationConditionNotMet(boolean consumeUnmatched) implements Failure {
record ActivationConditionNotMet(@JsonIgnore boolean consumeUnmatched) implements Failure {

@Override
public String message() {
Expand Down

0 comments on commit aa7e844

Please sign in to comment.