diff --git a/spring-modulith-events/spring-modulith-events-amqp/src/main/java/org/springframework/modulith/events/amqp/RabbitEventExternalizerConfiguration.java b/spring-modulith-events/spring-modulith-events-amqp/src/main/java/org/springframework/modulith/events/amqp/RabbitEventExternalizerConfiguration.java index 541b6dca4..ae8276aca 100644 --- a/spring-modulith-events/spring-modulith-events-amqp/src/main/java/org/springframework/modulith/events/amqp/RabbitEventExternalizerConfiguration.java +++ b/spring-modulith-events/spring-modulith-events-amqp/src/main/java/org/springframework/modulith/events/amqp/RabbitEventExternalizerConfiguration.java @@ -62,6 +62,8 @@ DelegatingEventExternalizer rabbitEventExternalizer(EventExternalizationConfigur var routing = BrokerRouting.of(target, context); operations.convertAndSend(routing.getTarget(), routing.getKey(payload), payload); + + return null; }); } } diff --git a/spring-modulith-events/spring-modulith-events-api/src/main/java/org/springframework/modulith/events/EventExternalized.java b/spring-modulith-events/spring-modulith-events-api/src/main/java/org/springframework/modulith/events/EventExternalized.java new file mode 100644 index 000000000..e4ada4ee7 --- /dev/null +++ b/spring-modulith-events/spring-modulith-events-api/src/main/java/org/springframework/modulith/events/EventExternalized.java @@ -0,0 +1,147 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.modulith.events; + +import java.util.Objects; + +import org.springframework.core.ResolvableType; +import org.springframework.core.ResolvableTypeProvider; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; + +/** + * An infrastructure event signaling that an application event has been externalized with a particular, broker-specific + * result. + * + * @author Oliver Drotbohm + * @since 1.1 + */ +public class EventExternalized implements ResolvableTypeProvider { + + private final S event; + private final Object mapped; + private final RoutingTarget target; + private final @Nullable T brokerResult; + private final ResolvableType type; + + /** + * Creates a new {@link EventExternalized} event for the given source event, its mapped derivative, + * {@link RoutingTarget} and broker result. + * + * @param event must not be {@literal null}. + * @param mapped must not be {@literal null}. + * @param target must not be {@literal null}. + * @param brokerResult can be {@literal null} + */ + public EventExternalized(S event, Object mapped, RoutingTarget target, @Nullable T brokerResult) { + + Assert.notNull(event, "Source event must not be null!"); + Assert.notNull(mapped, "Mapped event must not be null!"); + Assert.notNull(target, "Routing target must not be null!"); + + this.event = event; + this.mapped = mapped; + this.target = target; + this.brokerResult = brokerResult; + + this.type = ResolvableType.forClassWithGenerics(EventExternalized.class, ResolvableType.forInstance(event), + brokerResult == null ? ResolvableType.forClass(Object.class) : ResolvableType.forInstance(brokerResult)); + } + + /** + * Returns the source event. + * + * @return will never be {@literal null}. + */ + public S getEvent() { + return event; + } + + /** + * Returns the type of the source event. + * + * @return will never be {@literal null}. + */ + @SuppressWarnings("unchecked") + public Class getEventType() { + return (Class) type.getGeneric(0).resolve(Object.class); + } + + /** + * Returns the mapped event. + * + * @return will never be {@literal null}. + */ + public Object getMapped() { + return mapped; + } + + /** + * Returns the routing target. + * + * @return will never be {@literal null}. + */ + public RoutingTarget getTarget() { + return target; + } + + /** + * Returns the broker result. + * + * @return can be {@literal null}. + */ + public T getBrokerResult() { + return brokerResult; + } + + /* + * (non-Javadoc) + * @see org.springframework.core.ResolvableTypeProvider#getResolvableType() + */ + @Override + public ResolvableType getResolvableType() { + return type; + } + + /* + * (non-Javadoc) + * @see java.lang.Object#equals(java.lang.Object) + */ + @Override + public boolean equals(Object obj) { + + if (obj == this) { + return true; + } + + if (!(obj instanceof EventExternalized that)) { + return false; + } + + return Objects.equals(this.event, that.event) + && Objects.equals(this.mapped, that.mapped) + && Objects.equals(this.brokerResult, that.brokerResult); + } + + /* + * (non-Javadoc) + * @see java.lang.Object#hashCode() + */ + @Override + public int hashCode() { + return Objects.hash(this.event, this.mapped, this.brokerResult); + } +} diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/DelegatingEventExternalizer.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/DelegatingEventExternalizer.java index cd17b2d91..2bd766fbf 100644 --- a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/DelegatingEventExternalizer.java +++ b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/DelegatingEventExternalizer.java @@ -15,7 +15,8 @@ */ package org.springframework.modulith.events.support; -import java.util.function.BiConsumer; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiFunction; import org.springframework.modulith.events.ApplicationModuleListener; import org.springframework.modulith.events.EventExternalizationConfiguration; @@ -34,17 +35,17 @@ @Component public class DelegatingEventExternalizer extends EventExternalizationSupport { - private final BiConsumer delegate; + private final BiFunction> delegate; /** * Creates a new {@link DelegatingEventExternalizer} for the given {@link EventExternalizationConfiguration} and - * {@link BiConsumer} implementing the actual externalization. + * {@link BiFunction} implementing the actual externalization. * * @param configuration must not be {@literal null}. * @param delegate must not be {@literal null}. */ public DelegatingEventExternalizer(EventExternalizationConfiguration configuration, - BiConsumer delegate) { + BiFunction> delegate) { super(configuration); @@ -59,8 +60,8 @@ public DelegatingEventExternalizer(EventExternalizationConfiguration configurati */ @Override @ApplicationModuleListener - public void externalize(Object event) { - super.externalize(event); + public CompletableFuture externalize(Object event) { + return super.externalize(event); } /* @@ -68,7 +69,14 @@ public void externalize(Object event) { * @see org.springframework.modulith.events.support.EventExternalizationSupport#externalize(org.springframework.modulith.events.RoutingTarget, java.lang.Object) */ @Override - protected void externalize(Object payload, RoutingTarget target) { - delegate.accept(target, payload); + protected CompletableFuture externalize(Object payload, RoutingTarget target) { + + var result = delegate.apply(target, payload); + + if (result == null) { + throw new IllegalStateException("Delegate must not return null!"); + } + + return result; } } diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/EventExternalizationSupport.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/EventExternalizationSupport.java index 3f26d93b3..82f2b13ed 100644 --- a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/EventExternalizationSupport.java +++ b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/EventExternalizationSupport.java @@ -15,10 +15,13 @@ */ package org.springframework.modulith.events.support; +import java.util.concurrent.CompletableFuture; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.modulith.events.ApplicationModuleListener; import org.springframework.modulith.events.EventExternalizationConfiguration; +import org.springframework.modulith.events.EventExternalized; import org.springframework.modulith.events.RoutingTarget; import org.springframework.modulith.events.core.ConditionalEventListener; import org.springframework.util.Assert; @@ -61,14 +64,15 @@ public boolean supports(Object event) { * Externalizes the given event. * * @param event must not be {@literal null}. + * @return the externalization result, will never be {@literal null}. */ @ApplicationModuleListener - public void externalize(Object event) { + public CompletableFuture externalize(Object event) { Assert.notNull(event, "Object must not be null!"); if (!configuration.supports(event)) { - return; + return CompletableFuture.completedFuture(null); } var target = configuration.determineTarget(event); @@ -80,7 +84,8 @@ public void externalize(Object event) { logger.debug("Externalizing event of type {} to {}.", event.getClass(), target); } - externalize(mapped, target); + return externalize(mapped, target) + .thenApply(it -> new EventExternalized<>(event, mapped, target, it)); } /** @@ -88,6 +93,7 @@ public void externalize(Object event) { * * @param payload must not be {@literal null}. * @param target must not be {@literal null}. + * @return the externalization result, will never be {@literal null}. */ - protected abstract void externalize(Object payload, RoutingTarget target); + protected abstract CompletableFuture externalize(Object payload, RoutingTarget target); } diff --git a/spring-modulith-events/spring-modulith-events-jms/src/main/java/org/springframework/modulith/events/jms/JmsEventExternalizerConfiguration.java b/spring-modulith-events/spring-modulith-events-jms/src/main/java/org/springframework/modulith/events/jms/JmsEventExternalizerConfiguration.java index 3af0631b7..8f6079521 100644 --- a/spring-modulith-events/spring-modulith-events-jms/src/main/java/org/springframework/modulith/events/jms/JmsEventExternalizerConfiguration.java +++ b/spring-modulith-events/spring-modulith-events-jms/src/main/java/org/springframework/modulith/events/jms/JmsEventExternalizerConfiguration.java @@ -15,6 +15,8 @@ */ package org.springframework.modulith.events.jms; +import java.util.concurrent.CompletableFuture; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.autoconfigure.AutoConfiguration; @@ -55,6 +57,8 @@ DelegatingEventExternalizer jmsEventExternalizer(EventExternalizationConfigurati var serialized = serializer.serialize(payload); operations.send(target.getTarget(), session -> session.createTextMessage(serialized.toString())); + + return CompletableFuture.completedFuture(null); }); } } diff --git a/spring-modulith-events/spring-modulith-events-kafka/src/main/java/org/springframework/modulith/events/kafka/KafkaEventExternalizerConfiguration.java b/spring-modulith-events/spring-modulith-events-kafka/src/main/java/org/springframework/modulith/events/kafka/KafkaEventExternalizerConfiguration.java index e06fc7915..fa0e926c2 100644 --- a/spring-modulith-events/spring-modulith-events-kafka/src/main/java/org/springframework/modulith/events/kafka/KafkaEventExternalizerConfiguration.java +++ b/spring-modulith-events/spring-modulith-events-kafka/src/main/java/org/springframework/modulith/events/kafka/KafkaEventExternalizerConfiguration.java @@ -61,8 +61,7 @@ DelegatingEventExternalizer kafkaEventExternalizer(EventExternalizationConfigura return new DelegatingEventExternalizer(configuration, (target, payload) -> { var routing = BrokerRouting.of(target, context); - - operations.send(routing.getTarget(), routing.getKey(payload), payload); + return operations.send(routing.getTarget(), routing.getKey(payload), payload); }); } }