Skip to content

Commit

Permalink
GH-353 - Improvements in internal event externalization APIs.
Browse files Browse the repository at this point in the history
DelegatingEventExternalizer.externalize(…) flavors now return a CompletableFuture to allow the target APIs to use asynchronous message sending and to transparently return a result from those invocations.
  • Loading branch information
odrotbohm committed Nov 1, 2023
1 parent e5ca4f7 commit ca32c29
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ DelegatingEventExternalizer rabbitEventExternalizer(EventExternalizationConfigur
var routing = BrokerRouting.of(target, context);

operations.convertAndSend(routing.getTarget(), routing.getKey(payload), payload);

return null;
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.modulith.events;

import java.util.Objects;

import org.springframework.core.ResolvableType;
import org.springframework.core.ResolvableTypeProvider;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

/**
* An infrastructure event signaling that an application event has been externalized with a particular, broker-specific
* result.
*
* @author Oliver Drotbohm
* @since 1.1
*/
public class EventExternalized<S, T> implements ResolvableTypeProvider {

private final S event;
private final Object mapped;
private final RoutingTarget target;
private final @Nullable T brokerResult;
private final ResolvableType type;

/**
* Creates a new {@link EventExternalized} event for the given source event, its mapped derivative,
* {@link RoutingTarget} and broker result.
*
* @param event must not be {@literal null}.
* @param mapped must not be {@literal null}.
* @param target must not be {@literal null}.
* @param brokerResult can be {@literal null}
*/
public EventExternalized(S event, Object mapped, RoutingTarget target, @Nullable T brokerResult) {

Assert.notNull(event, "Source event must not be null!");
Assert.notNull(mapped, "Mapped event must not be null!");
Assert.notNull(target, "Routing target must not be null!");

this.event = event;
this.mapped = mapped;
this.target = target;
this.brokerResult = brokerResult;

this.type = ResolvableType.forClassWithGenerics(EventExternalized.class, ResolvableType.forInstance(event),
brokerResult == null ? ResolvableType.forClass(Object.class) : ResolvableType.forInstance(brokerResult));
}

/**
* Returns the source event.
*
* @return will never be {@literal null}.
*/
public S getEvent() {
return event;
}

/**
* Returns the type of the source event.
*
* @return will never be {@literal null}.
*/
@SuppressWarnings("unchecked")
public Class<S> getEventType() {
return (Class<S>) type.getGeneric(0).resolve(Object.class);
}

/**
* Returns the mapped event.
*
* @return will never be {@literal null}.
*/
public Object getMapped() {
return mapped;
}

/**
* Returns the routing target.
*
* @return will never be {@literal null}.
*/
public RoutingTarget getTarget() {
return target;
}

/**
* Returns the broker result.
*
* @return can be {@literal null}.
*/
public T getBrokerResult() {
return brokerResult;
}

/*
* (non-Javadoc)
* @see org.springframework.core.ResolvableTypeProvider#getResolvableType()
*/
@Override
public ResolvableType getResolvableType() {
return type;
}

/*
* (non-Javadoc)
* @see java.lang.Object#equals(java.lang.Object)
*/
@Override
public boolean equals(Object obj) {

if (obj == this) {
return true;
}

if (!(obj instanceof EventExternalized that)) {
return false;
}

return Objects.equals(this.event, that.event)
&& Objects.equals(this.mapped, that.mapped)
&& Objects.equals(this.brokerResult, that.brokerResult);
}

/*
* (non-Javadoc)
* @see java.lang.Object#hashCode()
*/
@Override
public int hashCode() {
return Objects.hash(this.event, this.mapped, this.brokerResult);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
*/
package org.springframework.modulith.events.support;

import java.util.function.BiConsumer;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

import org.springframework.modulith.events.ApplicationModuleListener;
import org.springframework.modulith.events.EventExternalizationConfiguration;
Expand All @@ -34,7 +35,7 @@
@Component
public class DelegatingEventExternalizer extends EventExternalizationSupport {

private final BiConsumer<RoutingTarget, Object> delegate;
private final BiFunction<RoutingTarget, Object, CompletableFuture<?>> delegate;

/**
* Creates a new {@link DelegatingEventExternalizer} for the given {@link EventExternalizationConfiguration} and
Expand All @@ -44,7 +45,7 @@ public class DelegatingEventExternalizer extends EventExternalizationSupport {
* @param delegate must not be {@literal null}.
*/
public DelegatingEventExternalizer(EventExternalizationConfiguration configuration,
BiConsumer<RoutingTarget, Object> delegate) {
BiFunction<RoutingTarget, Object, CompletableFuture<?>> delegate) {

super(configuration);

Expand All @@ -59,16 +60,23 @@ public DelegatingEventExternalizer(EventExternalizationConfiguration configurati
*/
@Override
@ApplicationModuleListener
public void externalize(Object event) {
super.externalize(event);
public CompletableFuture<?> externalize(Object event) {
return super.externalize(event);
}

/*
* (non-Javadoc)
* @see org.springframework.modulith.events.support.EventExternalizationSupport#externalize(org.springframework.modulith.events.RoutingTarget, java.lang.Object)
*/
@Override
protected void externalize(Object payload, RoutingTarget target) {
delegate.accept(target, payload);
protected CompletableFuture<?> externalize(Object payload, RoutingTarget target) {

var result = delegate.apply(target, payload);

if (result == null) {
throw new IllegalStateException("Delegate must not return null!");
}

return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
*/
package org.springframework.modulith.events.support;

import java.util.concurrent.CompletableFuture;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.modulith.events.ApplicationModuleListener;
import org.springframework.modulith.events.EventExternalizationConfiguration;
import org.springframework.modulith.events.EventExternalized;
import org.springframework.modulith.events.RoutingTarget;
import org.springframework.modulith.events.core.ConditionalEventListener;
import org.springframework.util.Assert;
Expand Down Expand Up @@ -61,14 +64,15 @@ public boolean supports(Object event) {
* Externalizes the given event.
*
* @param event must not be {@literal null}.
* @return the externalization result, will never be {@literal null}.
*/
@ApplicationModuleListener
public void externalize(Object event) {
public CompletableFuture<?> externalize(Object event) {

Assert.notNull(event, "Object must not be null!");

if (!configuration.supports(event)) {
return;
return CompletableFuture.completedFuture(null);
}

var target = configuration.determineTarget(event);
Expand All @@ -80,14 +84,16 @@ public void externalize(Object event) {
logger.debug("Externalizing event of type {} to {}.", event.getClass(), target);
}

externalize(mapped, target);
return externalize(mapped, target)
.thenApply(it -> new EventExternalized<>(event, mapped, target, it));
}

/**
* Publish the given payload to the given {@link RoutingTarget}.
*
* @param payload must not be {@literal null}.
* @param target must not be {@literal null}.
* @return the externalization result, will never be {@literal null}.
*/
protected abstract void externalize(Object payload, RoutingTarget target);
protected abstract CompletableFuture<?> externalize(Object payload, RoutingTarget target);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package org.springframework.modulith.events.jms;

import java.util.concurrent.CompletableFuture;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.AutoConfiguration;
Expand Down Expand Up @@ -55,6 +57,8 @@ DelegatingEventExternalizer jmsEventExternalizer(EventExternalizationConfigurati
var serialized = serializer.serialize(payload);

operations.send(target.getTarget(), session -> session.createTextMessage(serialized.toString()));

return CompletableFuture.completedFuture(null);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ DelegatingEventExternalizer kafkaEventExternalizer(EventExternalizationConfigura
return new DelegatingEventExternalizer(configuration, (target, payload) -> {

var routing = BrokerRouting.of(target, context);

operations.send(routing.getTarget(), routing.getKey(payload), payload);
return operations.send(routing.getTarget(), routing.getKey(payload), payload);
});
}
}

0 comments on commit ca32c29

Please sign in to comment.