Skip to content

Commit

Permalink
V2 (#76)
Browse files Browse the repository at this point in the history
* Upgrade to java sdk v2
  • Loading branch information
adam-aws authored Mar 9, 2022
1 parent 8b7e3ff commit f4ccdbd
Show file tree
Hide file tree
Showing 41 changed files with 1,157 additions and 1,030 deletions.
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@ Temporary queues are also automatically deleted if the clients that created them
2. Sign in to the [Amazon SQS console](https://console.aws.amazon.com/sqs/home?region=us-east-1).
3. To use the Temporary Queue client, you'll need [Java 8 (or later)](https://www.java.com/en/download/) and [Maven 3](http://maven.apache.org/).
4. [Download the latest release](https://github.com/awslabs/amazon-sqs-java-temporary-queues-client/releases) or add a Maven dependency into your `pom.xml` file:
### Version 2.x
```xml
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-sqs-java-temporary-queues-client</artifactId>
<version>2.0.0</version>
<type>jar</type>
</dependency>
```

### Version 1.x
```xml
<dependency>
<groupId>com.amazonaws</groupId>
Expand Down
41 changes: 22 additions & 19 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-sqs-java-temporary-queues-client</artifactId>
<version>1.2.4</version>
<version>2.0.0</version>
<name>Amazon SQS Java Temporary Queues Client</name>
<description>An Amazon SQS client that supports creating lightweight, automatically-deleted temporary queues, for use in common messaging patterns such as Request/Response. See http://aws.amazon.com/sqs.</description>
<url>https://github.com/awslabs/amazon-sqs-java-temporary-queues-client</url>
Expand All @@ -27,26 +27,28 @@
</developer>
</developers>
<properties>
<aws-java-sdk.version>1.12.128</aws-java-sdk.version>
<aws-java-sdk.version>2.17.140</aws-java-sdk.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bom</artifactId>
<version>1.12.128</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sqs</artifactId>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sqs</artifactId>
<version>${aws-java-sdk.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/software.amazon.awssdk/core -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>test-utils</artifactId>
<version>${aws-java-sdk.version}</version>
</dependency>

<!-- https://mvnrepository.com/artifact/commons-logging/commons-logging -->
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.2</version>
</dependency>

<dependency>
Expand All @@ -64,7 +66,7 @@
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.10.0</version>
<version>2.28.2</version>
<scope>test</scope>
</dependency>
<dependency>
Expand All @@ -74,8 +76,9 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sts</artifactId>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sts</artifactId>
<version>${aws-java-sdk.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down

Large diffs are not rendered by default.

18 changes: 9 additions & 9 deletions src/main/java/com/amazonaws/services/sqs/AmazonSQSRequester.java
Original file line number Diff line number Diff line change
@@ -1,30 +1,30 @@
package com.amazonaws.services.sqs;

import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.SendMessageRequest;

public interface AmazonSQSRequester {

AmazonSQS getAmazonSQS();
SqsClient getAmazonSQS();

/**
* Sends a message and waits the given amount of time for
* the response message.
*/
public Message sendMessageAndGetResponse(SendMessageRequest request,
int timeout, TimeUnit unit) throws TimeoutException;
public Message sendMessageAndGetResponse(SendMessageRequest request,
int timeout, TimeUnit unit) throws TimeoutException;

/**
* Sends a message and returns a <tt>CompletableFuture</tt>
* that will be completed with the response message when it arrives.
*/
public CompletableFuture<Message> sendMessageAndGetResponseAsync(SendMessageRequest request,
int timeout, TimeUnit unit);
public CompletableFuture<Message> sendMessageAndGetResponseAsync(SendMessageRequest request,
int timeout, TimeUnit unit);

public void shutdown();
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,22 @@
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

import com.amazonaws.services.sqs.model.CreateQueueRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.amazonaws.services.sqs.util.Constants;
import com.amazonaws.services.sqs.util.SQSMessageConsumer;
import com.amazonaws.services.sqs.util.SQSQueueUtils;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.CreateQueueRequest;
import software.amazon.awssdk.services.sqs.model.DeleteQueueRequest;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

/**
* Implementation of the request/response interfaces that creates a single
* temporary queue for each response message.
*/
class AmazonSQSRequesterClient implements AmazonSQSRequester {
private final AmazonSQS sqs;
private final SqsClient sqs;
private final String queuePrefix;
private final Map<String, String> queueAttributes;
private final Consumer<Exception> exceptionHandler;
Expand All @@ -33,11 +35,11 @@ class AmazonSQSRequesterClient implements AmazonSQSRequester {

private Runnable shutdownHook;

AmazonSQSRequesterClient(AmazonSQS sqs, String queuePrefix, Map<String, String> queueAttributes) {
AmazonSQSRequesterClient(SqsClient sqs, String queuePrefix, Map<String, String> queueAttributes) {
this(sqs, queuePrefix, queueAttributes, SQSQueueUtils.DEFAULT_EXCEPTION_HANDLER);
}

AmazonSQSRequesterClient(AmazonSQS sqs, String queuePrefix, Map<String, String> queueAttributes,
AmazonSQSRequesterClient(SqsClient sqs, String queuePrefix, Map<String, String> queueAttributes,
Consumer<Exception> exceptionHandler) {
this.sqs = sqs;
this.queuePrefix = queuePrefix;
Expand All @@ -50,7 +52,7 @@ public void setShutdownHook(Runnable shutdownHook) {
}

@Override
public AmazonSQS getAmazonSQS() {
public SqsClient getAmazonSQS() {
return sqs;
}

Expand All @@ -62,14 +64,14 @@ public Message sendMessageAndGetResponse(SendMessageRequest request, int timeout
@Override
public CompletableFuture<Message> sendMessageAndGetResponseAsync(SendMessageRequest request, int timeout, TimeUnit unit) {
String queueName = queuePrefix + UUID.randomUUID().toString();
CreateQueueRequest createQueueRequest = new CreateQueueRequest()
.withQueueName(queueName)
.withAttributes(queueAttributes);
String responseQueueUrl = sqs.createQueue(createQueueRequest).getQueueUrl();
CreateQueueRequest createQueueRequest = CreateQueueRequest.builder()
.queueName(queueName)
.attributesWithStrings(queueAttributes).build();
String responseQueueUrl = sqs.createQueue(createQueueRequest).queueUrl();

SendMessageRequest requestWithResponseUrl = SQSQueueUtils.copyWithExtraAttributes(request,
Collections.singletonMap(Constants.RESPONSE_QUEUE_URL_ATTRIBUTE_NAME,
new MessageAttributeValue().withDataType("String").withStringValue(responseQueueUrl)));
MessageAttributeValue.builder().dataType("String").stringValue(responseQueueUrl).build()));
// TODO-RS: Should be using sendMessageAsync
sqs.sendMessage(requestWithResponseUrl);

Expand Down Expand Up @@ -102,7 +104,7 @@ protected void accept(Message message) {
@Override
protected void runShutdownHook() {
future.completeExceptionally(new TimeoutException());
sqs.deleteQueue(queueUrl);
sqs.deleteQueue(DeleteQueueRequest.builder().queueUrl(queueUrl).build());
responseConsumers.remove(this);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.amazonaws.services.sqs;

import com.amazonaws.services.sqs.util.Constants;
import software.amazon.awssdk.services.sqs.SqsClient;

import java.util.Collections;
import java.util.HashMap;
Expand All @@ -10,7 +11,7 @@

public class AmazonSQSRequesterClientBuilder {

private Optional<AmazonSQS> customSQS = Optional.empty();
private Optional<SqsClient> customSQS = Optional.empty();

private String internalQueuePrefix = "__RequesterClientQueues__";

Expand All @@ -35,15 +36,15 @@ public static AmazonSQSRequester defaultClient() {
return standard().build();
}

public Optional<AmazonSQS> getAmazonSQS() {
public Optional<SqsClient> getAmazonSQS() {
return customSQS;
}

public void setAmazonSQS(AmazonSQS sqs) {
public void setAmazonSQS(SqsClient sqs) {
this.customSQS = Optional.of(sqs);
}

public AmazonSQSRequesterClientBuilder withAmazonSQS(AmazonSQS sqs) {
public AmazonSQSRequesterClientBuilder withAmazonSQS(SqsClient sqs) {
setAmazonSQS(sqs);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.amazonaws.services.sqs;

import com.amazonaws.services.sqs.AmazonSQS;
import software.amazon.awssdk.services.sqs.SqsClient;

public interface AmazonSQSResponder {
AmazonSQS getAmazonSQS();

SqsClient getAmazonSQS();

/**
* Tests whether the given message was sent using
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,23 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.QueueDoesNotExistException;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

class AmazonSQSResponderClient implements AmazonSQSResponder {

private static final Log LOG = LogFactory.getLog(AmazonSQSResponderClient.class);

private final AmazonSQS sqs;
private final SqsClient sqs;

public AmazonSQSResponderClient(AmazonSQS sqs) {
public AmazonSQSResponderClient(SqsClient sqs) {
this.sqs = sqs;
}

@Override
public AmazonSQS getAmazonSQS() {
public SqsClient getAmazonSQS() {
return sqs;
}

Expand All @@ -28,10 +29,10 @@ public void sendResponseMessage(MessageContent request, MessageContent response)
MessageAttributeValue attribute = request.getMessageAttributes().get(Constants.RESPONSE_QUEUE_URL_ATTRIBUTE_NAME);

if (attribute != null) {
String replyQueueUrl = attribute.getStringValue();
String replyQueueUrl = attribute.stringValue();
try {
SendMessageRequest responseRequest = response.toSendMessageRequest()
.withQueueUrl(replyQueueUrl);
SendMessageRequest responseRequest = response.toSendMessageRequest().toBuilder()
.queueUrl(replyQueueUrl).build();
sqs.sendMessage(responseRequest);
} catch (QueueDoesNotExistException e) {
// Stale request, ignore
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package com.amazonaws.services.sqs;

import com.amazonaws.services.sqs.util.Constants;
import software.amazon.awssdk.services.sqs.SqsClient;

import java.util.Optional;

public class AmazonSQSResponderClientBuilder {

private Optional<AmazonSQS> customSQS = Optional.empty();
private Optional<SqsClient> customSQS = Optional.empty();

private String internalQueuePrefix = "__RequesterClientQueues__";
private long queueHeartbeatInterval = Constants.HEARTBEAT_INTERVAL_SECONDS_DEFAULT;
Expand All @@ -18,15 +19,15 @@ public static AmazonSQSResponder defaultClient() {
return standard().build();
}

public Optional<AmazonSQS> getAmazonSQS() {
public Optional<SqsClient> getAmazonSQS() {
return customSQS;
}

public void setAmazonSQS(AmazonSQS sqs) {
public void setAmazonSQS(SqsClient sqs) {
this.customSQS = Optional.of(sqs);
}

public AmazonSQSResponderClientBuilder withAmazonSQS(AmazonSQS sqs) {
public AmazonSQSResponderClientBuilder withAmazonSQS(SqsClient sqs) {
setAmazonSQS(sqs);
return this;
}
Expand Down Expand Up @@ -65,9 +66,9 @@ public static AmazonSQSResponderClientBuilder standard() {
}

public AmazonSQSResponder build() {
AmazonSQS sqs = customSQS.orElseGet(AmazonSQSClientBuilder::defaultClient);
AmazonSQS deleter = new AmazonSQSIdleQueueDeletingClient(sqs, internalQueuePrefix, queueHeartbeatInterval);
AmazonSQS virtualQueuesClient = AmazonSQSVirtualQueuesClientBuilder.standard().withAmazonSQS(deleter)
SqsClient sqs = customSQS.orElseGet(SqsClient::create);
SqsClient deleter = new AmazonSQSIdleQueueDeletingClient(sqs, internalQueuePrefix, queueHeartbeatInterval);
SqsClient virtualQueuesClient = AmazonSQSVirtualQueuesClientBuilder.standard().withAmazonSQS(deleter)
.withHeartbeatIntervalSeconds(queueHeartbeatInterval).build();
return new AmazonSQSResponderClient(virtualQueuesClient);
}
Expand Down
Loading

0 comments on commit f4ccdbd

Please sign in to comment.