Skip to content

Commit

Permalink
Support to add headers in event externalization.
Browse files Browse the repository at this point in the history
EventExternalizationConfiguration now exposes a ….headers(Class<T>, Function<T, Map<String, Object>) to allow to define a function that extracts headers from the event that are supposed to added to the message to be sent out. The Kafka and AMQP implementations have been augmented to consider those configurations.

Furthermore, if the mapping step prior to the externalization creates a Spring Message<?>, we add routing information as fallback and send it out as is.
  • Loading branch information
odrotbohm committed Oct 11, 2024
1 parent 0467958 commit ad331d2
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@ DelegatingEventExternalizer rabbitEventExternalizer(EventExternalizationConfigur
return new DelegatingEventExternalizer(configuration, (target, payload) -> {

var routing = BrokerRouting.of(target, context);
var headers = configuration.getHeadersFor(payload);

operations.convertAndSend(routing.getTarget(), routing.getKey(payload), payload);
operations.convertAndSend(routing.getTarget(), routing.getKey(payload), payload, headers);

return CompletableFuture.completedFuture(null);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.springframework.modulith.events;

import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

Expand All @@ -31,16 +32,18 @@ class DefaultEventExternalizationConfiguration implements EventExternalizationCo
private final Predicate<Object> filter;
private final Function<Object, Object> mapper;
private final Function<Object, RoutingTarget> router;
private final Function<Object, Map<String, Object>> headers;

/**
* Creates a new {@link DefaultEventExternalizationConfiguration}
*
* @param filter must not be {@literal null}.
* @param mapper must not be {@literal null}.
* @param router must not be {@literal null}.
* @param headers must not be {@literal null}.
*/
DefaultEventExternalizationConfiguration(Predicate<Object> filter, Function<Object, Object> mapper,
Function<Object, RoutingTarget> router) {
Function<Object, RoutingTarget> router, Function<Object, Map<String, Object>> headers) {

Assert.notNull(filter, "Filter must not be null!");
Assert.notNull(mapper, "Mapper must not be null!");
Expand All @@ -49,6 +52,7 @@ class DefaultEventExternalizationConfiguration implements EventExternalizationCo
this.filter = filter;
this.mapper = mapper;
this.router = router;
this.headers = headers;
}

/**
Expand Down Expand Up @@ -95,4 +99,16 @@ public RoutingTarget determineTarget(Object event) {

return router.apply(event).verify();
}

/*
* (non-Javadoc)
* @see org.springframework.modulith.events.EventExternalizationConfiguration#getHeadersFor(java.lang.Object)
*/
@Override
public Map<String, Object> getHeadersFor(Object event) {

Assert.notNull(event, "Event must not be null!");

return headers.apply(event);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import java.lang.annotation.Annotation;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
Expand Down Expand Up @@ -188,6 +190,15 @@ public static Function<Object, RoutingTarget> byFullyQualifiedTypeName() {
*/
RoutingTarget determineTarget(Object event);

/**
* Returns the headers to be attached to the message sent out for the given event.
*
* @param event must not be {@literal null}.
* @return will never be {@literal null}.
* @since 1.3
*/
Map<String, Object> getHeadersFor(Object event);

/**
* API to define which events are supposed to be selected for externalization.
*
Expand Down Expand Up @@ -367,23 +378,28 @@ public static class Router {
private final Predicate<Object> filter;
private final Function<Object, Object> mapper;
private final Function<Object, RoutingTarget> router;
private final Function<Object, Map<String, Object>> headers;

/**
* Creates a new {@link Router} for the given selector {@link Predicate} and mapper and router {@link Function}s.
*
* @param filter must not be {@literal null}.
* @param mapper must not be {@literal null}.
* @param router must not be {@literal null}.
* @param headers must not be {@literal null}.
*/
Router(Predicate<Object> filter, Function<Object, Object> mapper, Function<Object, RoutingTarget> router) {
Router(Predicate<Object> filter, Function<Object, Object> mapper, Function<Object, RoutingTarget> router,
Function<Object, Map<String, Object>> headers) {

Assert.notNull(filter, "Selector must not be null!");
Assert.notNull(mapper, "Mapper must not be null!");
Assert.notNull(router, "Router must not be null!");
Assert.notNull(headers, "Headers extractor must not be null!");

this.filter = filter;
this.mapper = mapper;
this.router = router;
this.headers = headers;
}

/**
Expand All @@ -392,7 +408,7 @@ public static class Router {
* @param filter must not be {@literal null}.
*/
Router(Predicate<Object> filter) {
this(filter, Function.identity(), DEFAULT_ROUTER);
this(filter, Function.identity(), DEFAULT_ROUTER, it -> Collections.emptyMap());
}

/**
Expand All @@ -406,7 +422,7 @@ public Router mapping(Function<Object, Object> mapper) {

Assert.notNull(mapper, "Mapper must not be null!");

return new Router(filter, mapper, router);
return new Router(filter, mapper, router, headers);
}

/**
Expand All @@ -428,7 +444,42 @@ public <T> Router mapping(Class<T> type, Function<T, Object> mapper) {
.map(mapper::apply)
.orElse(it);

return new Router(filter, this.mapper.compose(combined), router);
return new Router(filter, this.mapper.compose(combined), router, headers);
}

/**
* Registers the given function to extract headers from the events to be externalized. Will reset the entire header
* extractor arrangement. For type-specific extractions, see {@link #headers(Class, Function)}.
*
* @param extractor must not be {@literal null}.
* @return will never be {@literal null}.
* @see #headers(Class, Function)
* @since 1.3
*/
public Router headers(Function<Object, Map<String, Object>> extractor) {

Assert.notNull(extractor, "Headers extractor must not be null!");

return new Router(filter, mapper, router, extractor);
}

/**
* Registers the given type-specific function to extract headers from the events to be externalized.
*
* @param extractor must not be {@literal null}.
* @return will never be {@literal null}.
* @since 1.3
*/
public <T> Router headers(Class<T> type, Function<T, Map<String, Object>> extractor) {

Assert.notNull(type, "Type must not be null!");
Assert.notNull(extractor, "Headers extractor must not be null!");

Function<Object, Map<String, Object>> combined = it -> toOptional(type, it)
.map(extractor::apply)
.orElseGet(() -> this.headers.apply(it));

return new Router(filter, mapper, router, combined);
}

/**
Expand All @@ -437,7 +488,7 @@ public <T> Router mapping(Class<T> type, Function<T, Object> mapper) {
* @return will never be {@literal null}.
*/
public Router routeMapped() {
return new Router(filter, mapper, router.compose(mapper));
return new Router(filter, mapper, router.compose(mapper), headers);
}

/**
Expand All @@ -450,7 +501,7 @@ public Router routeAll(Function<Object, RoutingTarget> router) {

Assert.notNull(router, "Router must not be null!");

return new Router(filter, mapper, router);
return new Router(filter, mapper, router, headers);
}

/**
Expand All @@ -466,9 +517,11 @@ public <T> Router route(Class<T> type, Function<T, RoutingTarget> router) {
Assert.notNull(type, "Type must not be null!");
Assert.notNull(router, "Router must not be null!");

return new Router(filter, mapper, it -> toOptional(type, it)
Function<Object, RoutingTarget> adapted = it -> toOptional(type, it)
.map(router::apply)
.orElseGet(() -> this.router.apply(it)));
.orElseGet(() -> this.router.apply(it));

return new Router(filter, mapper, adapted, headers);
}

/**
Expand All @@ -487,9 +540,11 @@ public <T> Router routeKey(Class<T> type, Function<T, String> extractor) {
Assert.notNull(type, "Type must not be null!");
Assert.notNull(extractor, "Extractor must not be null!");

return new Router(filter, mapper, it -> toOptional(type, it)
Function<Object, RoutingTarget> adapted = it -> toOptional(type, it)
.map(t -> this.router.apply(t).withKey(extractor.apply(t)))
.orElseGet(() -> this.router.apply(it)));
.orElseGet(() -> this.router.apply(it));

return new Router(filter, mapper, adapted, headers);
}

/**
Expand All @@ -503,9 +558,9 @@ public EventExternalizationConfiguration routeOptional(Function<Object, Optional

Assert.notNull(router, "Router must not be null!");

return new Router(filter, mapper, it -> router.apply(it)
.orElseGet(() -> this.router.apply(it)))
.build();
Function<Object, RoutingTarget> adapted = it -> router.apply(it).orElseGet(() -> this.router.apply(it));

return new Router(filter, mapper, adapted, headers).build();
}

/**
Expand Down Expand Up @@ -533,16 +588,16 @@ public Router routeAllByType(Function<Class<?>, RoutingTarget> router) {

Assert.notNull(router, "Router must not be null!");

return new Router(filter, mapper, it -> router.apply(it.getClass()));
return new Router(filter, mapper, it -> router.apply(it.getClass()), headers);
}

/**
* Creates a new {@link EventExternalizationConfiguration} refelcting the current configuration.
* Creates a new {@link EventExternalizationConfiguration} reflecting the current configuration.
*
* @return will never be {@literal null}.
*/
public EventExternalizationConfiguration build() {
return new DefaultEventExternalizationConfiguration(filter, mapper, router);
return new DefaultEventExternalizationConfiguration(filter, mapper, router, headers);
}

private static <T> Optional<T> toOptional(Class<T> type, Object source) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.List;
import java.util.Map;

import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -130,6 +132,21 @@ void defaultSetup() {
assertThat(target.getKey()).isNull();
}

@Test // GH-855
void registersHeaderExtractor() {

var configuration = defaults("org.springframework.modulith")
.headers(AnotherSampleEvent.class, it -> Map.of("another", "anotherValue"))
.headers(SampleEvent.class, it -> Map.of("sample", "value"))
.build();

assertThat(configuration.getHeadersFor(new SampleEvent()))
.containsEntry("sample", "value");

assertThat(configuration.getHeadersFor(new AnotherSampleEvent()))
.containsEntry("another", "anotherValue");
}

@Retention(RetentionPolicy.RUNTIME)
@interface CustomExternalized {
String value() default "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.modulith.events.EventExternalizationConfiguration;
import org.springframework.modulith.events.config.EventExternalizationAutoConfiguration;
import org.springframework.modulith.events.support.BrokerRouting;
Expand Down Expand Up @@ -61,7 +64,17 @@ DelegatingEventExternalizer kafkaEventExternalizer(EventExternalizationConfigura
return new DelegatingEventExternalizer(configuration, (target, payload) -> {

var routing = BrokerRouting.of(target, context);
return operations.send(routing.getTarget(), routing.getKey(payload), payload);

var builder = payload instanceof Message<?> message
? MessageBuilder.fromMessage(message)
: MessageBuilder.withPayload(payload).copyHeaders(configuration.getHeadersFor(payload));

var message = builder
.setHeaderIfAbsent(KafkaHeaders.KEY, routing.getKey(payload))
.setHeaderIfAbsent(KafkaHeaders.TOPIC, routing.getTarget())
.build();

return operations.send(message);
});
}
}
Loading

0 comments on commit ad331d2

Please sign in to comment.