Skip to content

Commit

Permalink
fix(sqs): fail fast when queue does not exist (#3415)
Browse files Browse the repository at this point in the history
* fix(sqs): fail fast when queue does not exist

* fix tests
  • Loading branch information
chillleader authored Oct 1, 2024
1 parent 351c15c commit 949a742
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
package io.camunda.connector.inbound;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.QueueAttributeName;
import com.amazonaws.services.sqs.model.QueueDoesNotExistException;
import io.camunda.connector.api.annotation.InboundConnector;
import io.camunda.connector.api.inbound.Activity;
import io.camunda.connector.api.inbound.Health;
Expand All @@ -22,6 +24,7 @@
import io.camunda.connector.generator.java.annotation.ElementTemplate.ConnectorElementType;
import io.camunda.connector.generator.java.annotation.ElementTemplate.PropertyGroup;
import io.camunda.connector.inbound.model.SqsInboundProperties;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -63,6 +66,7 @@
templateNameOverride = "Amazon SQS Boundary Event Connector")
})
public class SqsExecutable implements InboundConnectorExecutable {

private static final Logger LOGGER = LoggerFactory.getLogger(SqsExecutable.class);
private final AmazonSQSClientSupplier sqsClientSupplier;
private final ExecutorService executorService;
Expand Down Expand Up @@ -100,6 +104,16 @@ public void activate(final InboundConnectorContext context) {
amazonSQS =
sqsClientSupplier.sqsClient(
CredentialsProviderSupport.credentialsProvider(properties), region);

try {
amazonSQS.getQueueAttributes(
properties.getQueue().url(),
List.of(QueueAttributeName.ApproximateNumberOfMessages.toString()));
} catch (QueueDoesNotExistException e) {
LOGGER.error("Queue does not exist, failing subscription activation");
throw new RuntimeException("Queue does not exist: " + properties.getQueue().url());
}

LOGGER.debug("SQS client created successfully");
if (sqsQueueConsumer == null) {
sqsQueueConsumer = new SqsQueueConsumer(amazonSQS, properties, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ public void run() {
do {
try {
receiveMessageResult = sqsClient.receiveMessage(receiveMessageRequest);
} catch (Exception e) {
LOGGER.error("Failed to receive messages from SQS queue", e);
continue;
}
try {
List<Message> messages = receiveMessageResult.getMessages();
for (Message message : messages) {
context.log(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.nio.file.Files.readString;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
Expand All @@ -19,6 +20,7 @@
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.QueueDoesNotExistException;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.fasterxml.jackson.core.type.TypeReference;
Expand Down Expand Up @@ -110,6 +112,9 @@ public void activateTest(Map<String, Object> properties) throws InterruptedExcep
@Test
public void deactivateTest() {
// Given
when(sqsClient.getQueueAttributes(any(), any())).thenReturn(null);
when(supplier.sqsClient(any(AWSCredentialsProvider.class), eq(ACTUAL_QUEUE_REGION)))
.thenReturn(sqsClient);
Map<String, Object> properties =
Map.of(
"authentication",
Expand All @@ -133,6 +138,30 @@ public void deactivateTest() {
assertThat(executorService.isShutdown()).isTrue();
}

@Test
public void nonExistingQueueTest() {
// Given
when(sqsClient.getQueueAttributes(any(), any())).thenThrow(new QueueDoesNotExistException(""));
when(supplier.sqsClient(any(AWSCredentialsProvider.class), eq(ACTUAL_QUEUE_REGION)))
.thenReturn(sqsClient);
Map<String, Object> properties =
Map.of(
"authentication",
Map.of(
"secretKey", ACTUAL_SECRET_KEY,
"accessKey", ACTUAL_ACCESS_KEY),
"configuration",
Map.of("region", "us-east-1"),
"queue",
Map.of("url", ACTUAL_QUEUE_URL, "pollingWaitTime", "1"));
var context = createConnectorContext(properties, createDefinition());
consumer = new SqsQueueConsumer(sqsClient, new SqsInboundProperties(), context);
consumer.setQueueConsumerActive(true);
SqsExecutable sqsExecutable = new SqsExecutable(supplier, executorService, consumer);
// When & then
assertThrows(RuntimeException.class, () -> sqsExecutable.activate(context));
}

private InboundConnectorDefinition createDefinition() {
var element = new ProcessElement("proc-id", 1, 2, "element-id", "<default>");
return InboundConnectorDefinitionBuilder.create().elements(element).type("type").build();
Expand Down

0 comments on commit 949a742

Please sign in to comment.