Skip to content
This repository has been archived by the owner on Feb 1, 2024. It is now read-only.

Commit

Permalink
feat: Upgrade to spring-boot 3, spring-cloud-aws 3 and AWS SDK 2 (#155)
Browse files Browse the repository at this point in the history
* feat: Upgrade to spring-boot 3, spring-cloud-aws 3 and AWS SDK 2

BREAKING CHANGES
no longer compatible to spring-cloud-aws 2 and the AWS SDK 1

* feat: health indicator refactored

* fix: change env variable in the test container

* fix: move property to correct path

Refs: INVTECH-INVTECH-2027

* fix: move property to correct path

Refs: INVTECH-INVTECH-2027
  • Loading branch information
ypavlovidealo authored Jul 21, 2023
1 parent 737dba4 commit c00d64d
Show file tree
Hide file tree
Showing 9 changed files with 178 additions and 212 deletions.
24 changes: 16 additions & 8 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>de.idealo.spring</groupId>
Expand Down Expand Up @@ -44,11 +45,13 @@
</distributionManagement>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<spring.cloud-version>2022.0.3</spring.cloud-version>
<spring.cloud-aws-version>3.0.1</spring.cloud-aws-version>
<spring.boot-version>3.1.0</spring.boot-version>
<testcontainers.version>1.18.1</testcontainers.version>
<spring.integration-version>3.0.0</spring.integration-version>
<testcontainers.version>1.18.3</testcontainers.version>

<sonar.projectKey>idealo_spring-cloud-stream-binder-sns</sonar.projectKey>
<sonar.organization>idealo</sonar.organization>
Expand All @@ -67,7 +70,7 @@
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-dependencies</artifactId>
<version>2.4.4</version>
<version>${spring.cloud-aws-version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand Down Expand Up @@ -95,16 +98,16 @@
</dependency>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-starter-aws</artifactId>
<artifactId>spring-cloud-aws-starter</artifactId>
</dependency>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-messaging</artifactId>
<artifactId>spring-cloud-aws-sns</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-aws</artifactId>
<version>2.5.4</version>
<version>${spring.integration-version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
Expand Down Expand Up @@ -146,6 +149,11 @@
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-sqs</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package de.idealo.spring.stream.binder.sns;

import de.idealo.spring.stream.binder.sns.properties.SnsConsumerProperties;
import de.idealo.spring.stream.binder.sns.properties.SnsExtendedBindingProperties;
import de.idealo.spring.stream.binder.sns.properties.SnsProducerProperties;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
Expand All @@ -15,30 +12,33 @@
import org.springframework.integration.core.MessageProducer;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.StringUtils;

import com.amazonaws.services.sns.AmazonSNSAsync;
import software.amazon.awssdk.services.sns.SnsAsyncClient;

import de.idealo.spring.stream.binder.sns.properties.SnsConsumerProperties;
import de.idealo.spring.stream.binder.sns.properties.SnsExtendedBindingProperties;
import de.idealo.spring.stream.binder.sns.properties.SnsProducerProperties;
import de.idealo.spring.stream.binder.sns.provisioning.SnsProducerDestination;
import de.idealo.spring.stream.binder.sns.provisioning.SnsStreamProvisioner;
import org.springframework.util.StringUtils;

public class SnsMessageHandlerBinder
extends AbstractMessageChannelBinder<ExtendedConsumerProperties<SnsConsumerProperties>, ExtendedProducerProperties<SnsProducerProperties>, SnsStreamProvisioner>
implements ExtendedPropertiesBinder<MessageChannel, SnsConsumerProperties, SnsProducerProperties> {

private final AmazonSNSAsync amazonSNS;
private final SnsAsyncClient amazonSNS;

private final SnsExtendedBindingProperties extendedBindingProperties;

public SnsMessageHandlerBinder(AmazonSNSAsync amazonSNS,
SnsStreamProvisioner provisioningProvider,
SnsExtendedBindingProperties extendedBindingProperties) {
public SnsMessageHandlerBinder(SnsAsyncClient amazonSNS,
SnsStreamProvisioner provisioningProvider,
SnsExtendedBindingProperties extendedBindingProperties) {
super(new String[0], provisioningProvider);
this.amazonSNS = amazonSNS;
this.extendedBindingProperties = extendedBindingProperties;
}

public AmazonSNSAsync getAmazonSNS() {
public SnsAsyncClient getAmazonSNS() {
return amazonSNS;
}

Expand All @@ -47,7 +47,6 @@ protected MessageHandler createProducerMessageHandler(ProducerDestination destin
SnsProducerDestination snsDestination = (SnsProducerDestination) destination;
SnsMessageHandler snsMessageHandler = new SnsMessageHandler(amazonSNS);
snsMessageHandler.setTopicArn(snsDestination.getArn());
snsMessageHandler.setFailureChannel(errorChannel);
snsMessageHandler.setBeanFactory(getBeanFactory());

if (StringUtils.hasText(producerProperties.getExtension().getConfirmAckChannel())) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,52 +1,28 @@
package de.idealo.spring.stream.binder.sns.config;

import static io.awspring.cloud.core.config.AmazonWebserviceClientConfigurationUtils.GLOBAL_CLIENT_CONFIGURATION_BEAN_NAME;

import java.util.Optional;

import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.sns.AmazonSNSAsync;
import com.amazonaws.services.sns.AmazonSNSAsyncClient;
import io.awspring.cloud.autoconfigure.messaging.SnsProperties;
import io.awspring.cloud.context.annotation.ConditionalOnMissingAmazonClient;
import io.awspring.cloud.core.config.AmazonWebserviceClientFactoryBean;
import io.awspring.cloud.core.region.RegionProvider;
import io.awspring.cloud.core.region.StaticRegionProvider;
import io.awspring.cloud.autoconfigure.core.AwsClientBuilderConfigurer;
import io.awspring.cloud.autoconfigure.core.AwsClientCustomizer;
import io.awspring.cloud.autoconfigure.sns.SnsProperties;
import software.amazon.awssdk.services.sns.SnsAsyncClient;
import software.amazon.awssdk.services.sns.SnsAsyncClientBuilder;

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(AmazonSNSAsync.class)
@ConditionalOnClass(SnsAsyncClient.class)
public class SnsAsyncAutoConfiguration {

private final AWSCredentialsProvider awsCredentialsProvider;

private final RegionProvider regionProvider;

private final ClientConfiguration clientConfiguration;

SnsAsyncAutoConfiguration(ObjectProvider<AWSCredentialsProvider> awsCredentialsProvider,
ObjectProvider<RegionProvider> regionProvider, SnsProperties properties,
@Qualifier(GLOBAL_CLIENT_CONFIGURATION_BEAN_NAME) ObjectProvider<ClientConfiguration> globalClientConfiguration,
@Qualifier("snsClientConfiguration") ObjectProvider<ClientConfiguration> snsClientConfiguration) {
this.awsCredentialsProvider = awsCredentialsProvider.getIfAvailable();
this.regionProvider = properties.getRegion() == null ? regionProvider.getIfAvailable()
: new StaticRegionProvider(properties.getRegion());
this.clientConfiguration = snsClientConfiguration.getIfAvailable(globalClientConfiguration::getIfAvailable);
}

@ConditionalOnMissingAmazonClient(AmazonSNSAsync.class)
/**
* Based on {@link io.awspring.cloud.autoconfigure.sns.SnsAutoConfiguration#snsClient}
*/
@ConditionalOnMissingBean
@Bean
public AmazonWebserviceClientFactoryBean<AmazonSNSAsyncClient> amazonSNS(SnsProperties properties) {
AmazonWebserviceClientFactoryBean<AmazonSNSAsyncClient> clientFactoryBean = new AmazonWebserviceClientFactoryBean<>(
AmazonSNSAsyncClient.class, this.awsCredentialsProvider, this.regionProvider, this.clientConfiguration);
Optional.ofNullable(properties.getEndpoint()).ifPresent(clientFactoryBean::setCustomEndpoint);
return clientFactoryBean;
public SnsAsyncClient amazonSNS(SnsProperties properties, AwsClientBuilderConfigurer awsClientBuilderConfigurer, ObjectProvider<AwsClientCustomizer<SnsAsyncClientBuilder>> configurer) {
return awsClientBuilderConfigurer.configure(SnsAsyncClient.builder(), properties, configurer.getIfAvailable()).build();
}

}
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package de.idealo.spring.stream.binder.sns.config;

import de.idealo.spring.stream.binder.sns.properties.SnsExtendedBindingProperties;
import java.util.Optional;

import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
Expand All @@ -13,13 +10,11 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSAsync;

import io.awspring.cloud.core.env.ResourceIdResolver;
import software.amazon.awssdk.services.sns.SnsAsyncClient;

import de.idealo.spring.stream.binder.sns.SnsMessageHandlerBinder;
import de.idealo.spring.stream.binder.sns.health.SnsBinderHealthIndicator;
import de.idealo.spring.stream.binder.sns.properties.SnsExtendedBindingProperties;
import de.idealo.spring.stream.binder.sns.provisioning.SnsStreamProvisioner;

@Configuration
Expand All @@ -28,14 +23,14 @@
public class SnsBinderConfiguration {

@Bean
public SnsStreamProvisioner provisioningProvider(AmazonSNS amazonSNS, Optional<ResourceIdResolver> resourceIdResolver) {
return new SnsStreamProvisioner(amazonSNS, resourceIdResolver.orElse(null));
public SnsStreamProvisioner provisioningProvider(SnsAsyncClient amazonSNS) {
return new SnsStreamProvisioner(amazonSNS);
}

@Bean
public SnsMessageHandlerBinder snsMessageHandlerBinder(AmazonSNSAsync amazonSNS,
SnsStreamProvisioner snsStreamProvisioner,
SnsExtendedBindingProperties snsExtendedBindingProperties) {
public SnsMessageHandlerBinder snsMessageHandlerBinder(SnsAsyncClient amazonSNS,
SnsStreamProvisioner snsStreamProvisioner,
SnsExtendedBindingProperties snsExtendedBindingProperties) {
return new SnsMessageHandlerBinder(amazonSNS, snsStreamProvisioner, snsExtendedBindingProperties);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,19 @@
package de.idealo.spring.stream.binder.sns.health;

import static java.util.stream.Collectors.toList;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import de.idealo.spring.stream.binder.sns.SnsMessageHandlerBinder;
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.actuate.health.Health;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.util.Assert;
import software.amazon.awssdk.services.sns.model.ListTopicsResponse;
import software.amazon.awssdk.services.sns.model.Topic;

import com.amazonaws.SdkClientException;
import com.amazonaws.services.sns.model.ListTopicsResult;

import de.idealo.spring.stream.binder.sns.SnsMessageHandlerBinder;
import java.util.List;
import java.util.stream.Collectors;

public class SnsBinderHealthIndicator extends AbstractHealthIndicator {

private static final Logger LOGGER = LoggerFactory.getLogger(SnsBinderHealthIndicator.class);

private final SnsMessageHandlerBinder snsMessageHandlerBinder;
private final BindingServiceProperties bindingServiceProperties;

Expand All @@ -32,32 +26,24 @@ public SnsBinderHealthIndicator(final SnsMessageHandlerBinder snsMessageHandlerB

@Override
protected void doHealthCheck(Health.Builder builder) {

final List<String> topicList = bindingServiceProperties.getBindings().values().stream()
var availableTopics = snsMessageHandlerBinder.getAmazonSNS().listTopics()
.thenApply(ListTopicsResponse::topics)
.thenApply(List::stream)
.join()
.map(Topic::topicArn)
.map(topicArn -> topicArn.substring(topicArn.lastIndexOf(':') + 1))
.collect(Collectors.toSet());

var availableDeclaredTopics = bindingServiceProperties.getBindings().values().stream()
.filter(bindingProperties -> "sns".equalsIgnoreCase(bindingProperties.getBinder()))
.map(bindingProperties -> bindingProperties.getDestination())
.collect(toList());
.map(BindingProperties::getDestination)
.allMatch(declaredTopic -> availableTopics.contains(declaredTopic));

if (!topicsAreReachable(topicList)) {
builder.down().withDetail("SNS", "topic is not reachable");
} else {
if (availableDeclaredTopics) {
builder.up();
} else {
builder.down().withDetail("SNS", "topic is not reachable");
}
}

private boolean topicsAreReachable(final List<String> expectedTopicList) {
try {
final ListTopicsResult listTopicsResult = this.snsMessageHandlerBinder.getAmazonSNS().listTopics();
final List<String> actualTopicList = listTopicsResult.getTopics().stream().map(topic -> extractTopicName(topic.getTopicArn())).collect(toList());

return actualTopicList.containsAll(expectedTopicList);
} catch (SdkClientException e) {
LOGGER.error("SNS is not reachable", e);
return false;
}
}

private String extractTopicName(final String topicArn) {
return topicArn.substring(topicArn.lastIndexOf(':') + 1);
}
}
Original file line number Diff line number Diff line change
@@ -1,31 +1,32 @@
package de.idealo.spring.stream.binder.sns.provisioning;

import de.idealo.spring.stream.binder.sns.properties.SnsConsumerProperties;
import de.idealo.spring.stream.binder.sns.properties.SnsProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
import org.springframework.messaging.core.DestinationResolver;
import org.springframework.integration.aws.support.SnsAsyncTopicArnResolver;

import io.awspring.cloud.sns.core.TopicArnResolver;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.services.sns.SnsAsyncClient;

import com.amazonaws.services.sns.AmazonSNS;
import io.awspring.cloud.core.env.ResourceIdResolver;
import io.awspring.cloud.messaging.support.destination.DynamicTopicDestinationResolver;
import de.idealo.spring.stream.binder.sns.properties.SnsConsumerProperties;
import de.idealo.spring.stream.binder.sns.properties.SnsProducerProperties;

public class SnsStreamProvisioner implements ProvisioningProvider<ExtendedConsumerProperties<SnsConsumerProperties>, ExtendedProducerProperties<SnsProducerProperties>> {

private final DestinationResolver<String> destinationResolver;
private final TopicArnResolver destinationResolver;

public SnsStreamProvisioner(AmazonSNS amazonSNS, ResourceIdResolver resourceIdResolver) {
this.destinationResolver = new DynamicTopicDestinationResolver(amazonSNS, resourceIdResolver);
public SnsStreamProvisioner(SnsAsyncClient amazonSNS) {
this.destinationResolver = new SnsAsyncTopicArnResolver(amazonSNS);
}

@Override
public ProducerDestination provisionProducerDestination(String name, ExtendedProducerProperties<SnsProducerProperties> properties) {
String arn = this.destinationResolver.resolveDestination(name);
return new SnsProducerDestination(name, arn);
Arn arn = this.destinationResolver.resolveTopicArn(name);
return new SnsProducerDestination(name, arn.resourceAsString());
}

@Override
Expand Down
Loading

0 comments on commit c00d64d

Please sign in to comment.