From bb200ec06ab0186d8c55e1b9280dd470a7dedfcb Mon Sep 17 00:00:00 2001 From: Oliver Drotbohm Date: Mon, 4 Sep 2023 09:42:23 +0200 Subject: [PATCH] Hacking. --- .../spring-modulith-events-api/pom.xml | 5 + .../events/CompletedEventPublications.java | 51 ++++++++++ .../modulith/events/EventPublication.java | 92 +++++++++++++++++++ .../events/UncompletedEventPublications.java | 27 ++++++ .../modulith/events/core/Completable.java | 2 +- .../events/core/DefaultEventPublication.java | 2 +- .../core/DefaultEventPublicationRegistry.java | 53 +++++++++-- .../events/core/EventPublicationRegistry.java | 10 +- .../core/EventPublicationRepository.java | 21 +++-- ...ation.java => TargetEventPublication.java} | 75 +-------------- ...PersistentApplicationEventMulticaster.java | 34 +++++-- ...a => TargetEventPublicationUnitTests.java} | 8 +- ...ationEventMulticasterIntegrationTests.java | 6 +- .../jdbc/JdbcEventPublicationRepository.java | 66 +++++++++---- ...PublicationRepositoryIntegrationTests.java | 56 ++++++++--- .../jpa/JpaEventPublicationRepository.java | 58 +++++++++--- ...PublicationRepositoryIntegrationTests.java | 42 +++++++-- .../MongoDbEventPublicationRepository.java | 37 ++++++-- ...MongoDbEventPublicationRepositoryTest.java | 53 +++++++---- .../PersistentDomainEventIntegrationTest.java | 4 +- 20 files changed, 511 insertions(+), 191 deletions(-) create mode 100644 spring-modulith-events/spring-modulith-events-api/src/main/java/org/springframework/modulith/events/CompletedEventPublications.java create mode 100644 spring-modulith-events/spring-modulith-events-api/src/main/java/org/springframework/modulith/events/EventPublication.java create mode 100644 spring-modulith-events/spring-modulith-events-api/src/main/java/org/springframework/modulith/events/UncompletedEventPublications.java rename spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/core/{EventPublication.java => TargetEventPublication.java} (51%) rename spring-modulith-events/spring-modulith-events-core/src/test/java/org/springframework/modulith/events/core/{EventPublicationUnitTests.java => TargetEventPublicationUnitTests.java} (83%) diff --git a/spring-modulith-events/spring-modulith-events-api/pom.xml b/spring-modulith-events/spring-modulith-events-api/pom.xml index a663cc188..6a147d712 100644 --- a/spring-modulith-events/spring-modulith-events-api/pom.xml +++ b/spring-modulith-events/spring-modulith-events-api/pom.xml @@ -30,6 +30,11 @@ spring-core + + org.springframework + spring-context + + org.jmolecules jmolecules-events diff --git a/spring-modulith-events/spring-modulith-events-api/src/main/java/org/springframework/modulith/events/CompletedEventPublications.java b/spring-modulith-events/spring-modulith-events-api/src/main/java/org/springframework/modulith/events/CompletedEventPublications.java new file mode 100644 index 000000000..fb4bd82b3 --- /dev/null +++ b/spring-modulith-events/spring-modulith-events-api/src/main/java/org/springframework/modulith/events/CompletedEventPublications.java @@ -0,0 +1,51 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.modulith.events; + +import java.time.Duration; +import java.util.Collection; +import java.util.function.Predicate; + +/** + * All {@link EventPublication}s that have already been completed. + * + * @author Oliver Drotbohm + * @since 1.1 + */ +public interface CompletedEventPublications { + + /** + * Returns all {@link EventPublication}s that have already been completed. + * + * @return will never be {@literal null}. + */ + Collection findAll(); + + /** + * Deletes all {@link EventPublication}s matching the given {@link Predicate}. Note that implementations will iterate + * all completed {@link EventPublication}s and apply the predicate in memory. + * + * @param filter must not be {@literal null}. + */ + void deletePublications(Predicate filter); + + /** + * Deletes all {@link EventPublication}s whose completion date is older than the given {@link Duration}. + * + * @param duration must not be {@literal null}. + */ + void deletePublicationsOlderThan(Duration duration); +} diff --git a/spring-modulith-events/spring-modulith-events-api/src/main/java/org/springframework/modulith/events/EventPublication.java b/spring-modulith-events/spring-modulith-events-api/src/main/java/org/springframework/modulith/events/EventPublication.java new file mode 100644 index 000000000..dda8118bc --- /dev/null +++ b/spring-modulith-events/spring-modulith-events-api/src/main/java/org/springframework/modulith/events/EventPublication.java @@ -0,0 +1,92 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.modulith.events; + +import java.time.Instant; +import java.util.Optional; +import java.util.UUID; + +import org.springframework.context.ApplicationEvent; +import org.springframework.context.PayloadApplicationEvent; + +/** + * An event publication. + * + * @author Oliver Drotbohm + * @since 1.1 + */ +public interface EventPublication { + + /** + * Returns a unique identifier for this publication. + * + * @return will never be {@literal null}. + */ + UUID getIdentifier(); + + /** + * Returns the event that is published. + * + * @return + */ + Object getEvent(); + + /** + * Returns the event as Spring {@link ApplicationEvent}, effectively wrapping it into a + * {@link PayloadApplicationEvent} in case it's not one already. + * + * @return + */ + default ApplicationEvent getApplicationEvent() { + + Object event = getEvent(); + + return PayloadApplicationEvent.class.isInstance(event) // + ? PayloadApplicationEvent.class.cast(event) + : new PayloadApplicationEvent<>(this, event); + } + + /** + * Returns the time the event is published at. + * + * @return + */ + Instant getPublicationDate(); + + /** + * Returns the completion date of the publication. + * + * @return will never be {@literal null}. + */ + Optional getCompletionDate(); + + /** + * Returns whether the publication of the event has completed. + * + * @return will never be {@literal null}. + */ + default boolean isPublicationCompleted() { + return getCompletionDate().isPresent(); + } + + /* + * (non-Javadoc) + * @see java.lang.Comparable#compareTo(java.lang.Object) + */ + default int compareTo(EventPublication that) { + return this.getPublicationDate().compareTo(that.getPublicationDate()); + } +} diff --git a/spring-modulith-events/spring-modulith-events-api/src/main/java/org/springframework/modulith/events/UncompletedEventPublications.java b/spring-modulith-events/spring-modulith-events-api/src/main/java/org/springframework/modulith/events/UncompletedEventPublications.java new file mode 100644 index 000000000..5e4183e50 --- /dev/null +++ b/spring-modulith-events/spring-modulith-events-api/src/main/java/org/springframework/modulith/events/UncompletedEventPublications.java @@ -0,0 +1,27 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.modulith.events; + +import java.util.function.Predicate; + +/** + * @author Oliver Drotbohm + * @since 1.1 + */ +public interface UncompletedEventPublications { + + void resubmitCompletedPublications(Predicate filter); +} diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/core/Completable.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/core/Completable.java index a8abe778d..0a00da3a8 100644 --- a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/core/Completable.java +++ b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/core/Completable.java @@ -18,7 +18,7 @@ import java.time.Instant; /** - * Internal interface to be able to mark {@link EventPublication} instances as completed. + * Internal interface to be able to mark {@link TargetEventPublication} instances as completed. * * @author Oliver Drotbohm */ diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/core/DefaultEventPublication.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/core/DefaultEventPublication.java index 668f4c185..49207ae9e 100644 --- a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/core/DefaultEventPublication.java +++ b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/core/DefaultEventPublication.java @@ -27,7 +27,7 @@ * * @author Oliver Drotbohm */ -class DefaultEventPublication implements EventPublication { +class DefaultEventPublication implements TargetEventPublication { private final UUID identifier; private final Object event; diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/core/DefaultEventPublicationRegistry.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/core/DefaultEventPublicationRegistry.java index 12b9c65be..e0a26eefb 100644 --- a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/core/DefaultEventPublicationRegistry.java +++ b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/core/DefaultEventPublicationRegistry.java @@ -19,12 +19,14 @@ import java.time.Duration; import java.util.Collection; import java.util.List; +import java.util.function.Predicate; import java.util.stream.Stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.DisposableBean; import org.springframework.context.ApplicationListener; +import org.springframework.modulith.events.CompletedEventPublications; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.Assert; @@ -37,7 +39,8 @@ * @author Björn Kieling * @author Dmitry Belyaev */ -public class DefaultEventPublicationRegistry implements DisposableBean, EventPublicationRegistry { +public class DefaultEventPublicationRegistry + implements DisposableBean, EventPublicationRegistry, CompletedEventPublications { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultEventPublicationRegistry.class); private static final String REGISTER = "Registering publication of {} for {}."; @@ -65,9 +68,9 @@ public DefaultEventPublicationRegistry(EventPublicationRepository events, Clock * @see org.springframework.modulith.events.EventPublicationRegistry#store(java.lang.Object, java.util.stream.Stream) */ @Override - public Collection store(Object event, Stream listeners) { + public Collection store(Object event, Stream listeners) { - return listeners.map(it -> EventPublication.of(event, it, clock.instant())) + return listeners.map(it -> TargetEventPublication.of(event, it, clock.instant())) .peek(it -> LOGGER.debug(REGISTER, it.getEvent().getClass().getName(), it.getTargetIdentifier().getValue())) .map(events::create) .toList(); @@ -78,7 +81,7 @@ public Collection store(Object event, Stream findIncompletePublications() { + public Collection findIncompletePublications() { return events.findIncompletePublications(); } @@ -111,6 +114,44 @@ public void deleteCompletedPublicationsOlderThan(Duration duration) { events.deleteCompletedPublicationsBefore(clock.instant().minus(duration)); }; + /* + * (non-Javadoc) + * @see org.springframework.modulith.events.CompletedEventPublications#findAll() + */ + @Override + public Collection findAll() { + return findIncompletePublications(); + } + + /* + * (non-Javadoc) + * @see org.springframework.modulith.events.CompletedEventPublications#deletePublications(java.util.function.Predicate) + */ + @Override + public void deletePublications(Predicate filter) { + + var identifiers = findIncompletePublications().stream() + .filter(filter) + .map(TargetEventPublication::getIdentifier) + .toList(); + + events.deletePublications(identifiers); + } + + /* + * (non-Javadoc) + * @see org.springframework.modulith.events.CompletedEventPublications#deletePublicationsOlderThan(java.time.Duration) + */ + @Override + public void deletePublicationsOlderThan(Duration duration) { + + var now = clock.instant(); + + deletePublications(event -> event.getCompletionDate() + .filter(date -> date.isBefore(now.minus(duration))) + .isPresent()); + } + /* * (non-Javadoc) * @see org.springframework.beans.factory.DisposableBean#destroy() @@ -118,7 +159,7 @@ public void deleteCompletedPublicationsOlderThan(Duration duration) { @Override public void destroy() { - List publications = events.findIncompletePublications(); + List publications = events.findIncompletePublications(); if (publications.isEmpty()) { @@ -131,7 +172,7 @@ public void destroy() { for (int i = 0; i < publications.size(); i++) { String prefix = i + 1 == publications.size() ? "└─" : "├─"; - EventPublication it = publications.get(i); + TargetEventPublication it = publications.get(i); LOGGER.info("{} {} - {}", prefix, it.getEvent().getClass().getName(), it.getTargetIdentifier().getValue()); } diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/core/EventPublicationRegistry.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/core/EventPublicationRegistry.java index cd48b5f71..d75f9a70d 100644 --- a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/core/EventPublicationRegistry.java +++ b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/core/EventPublicationRegistry.java @@ -32,19 +32,19 @@ public interface EventPublicationRegistry { /** - * Stores {@link EventPublication}s for the given event and {@link ApplicationListener}s. + * Stores {@link TargetEventPublication}s for the given event and {@link ApplicationListener}s. * * @param event must not be {@literal null}. * @param listeners must not be {@literal null}. */ - Collection store(Object event, Stream listeners); + Collection store(Object event, Stream listeners); /** - * Returns all {@link EventPublication}s that have not been completed yet. + * Returns all {@link TargetEventPublication}s that have not been completed yet. * * @return will never be {@literal null}. */ - Collection findIncompletePublications(); + Collection findIncompletePublications(); /** * Marks the publication for the given event and {@link PublicationTargetIdentifier} as completed. @@ -55,7 +55,7 @@ public interface EventPublicationRegistry { void markCompleted(Object event, PublicationTargetIdentifier targetIdentifier); /** - * Deletes all completed {@link EventPublication}s that have been completed before the given {@link Duration}. + * Deletes all completed {@link TargetEventPublication}s that have been completed before the given {@link Duration}. * * @param duration must not be {@literal null}. */ diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/core/EventPublicationRepository.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/core/EventPublicationRepository.java index 6abda3e2e..85b5c7876 100644 --- a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/core/EventPublicationRepository.java +++ b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/core/EventPublicationRepository.java @@ -18,11 +18,12 @@ import java.time.Instant; import java.util.List; import java.util.Optional; +import java.util.UUID; import org.springframework.util.Assert; /** - * Repository to store {@link EventPublication}s. + * Repository to store {@link TargetEventPublication}s. * * @author Björn Kieling * @author Dmitry Belyaev @@ -31,20 +32,20 @@ public interface EventPublicationRepository { /** - * Persists the given {@link EventPublication}. + * Persists the given {@link TargetEventPublication}. * * @param publication must not be {@literal null}. * @return will never be {@literal null}. */ - EventPublication create(EventPublication publication); + TargetEventPublication create(TargetEventPublication publication); /** - * Marks the given {@link EventPublication} as completed. + * Marks the given {@link TargetEventPublication} as completed. * * @param publication must not be {@literal null}. * @param completionDate must not be {@literal null}. */ - default void markCompleted(EventPublication publication, Instant completionDate) { + default void markCompleted(TargetEventPublication publication, Instant completionDate) { Assert.notNull(publication, "EventPublication must not be null!"); Assert.notNull(completionDate, "Instant must not be null!"); @@ -65,22 +66,24 @@ default void markCompleted(EventPublication publication, Instant completionDate) void markCompleted(Object event, PublicationTargetIdentifier identifier, Instant completionDate); /** - * Returns all {@link EventPublication} that have not been completed yet. + * Returns all {@link TargetEventPublication} that have not been completed yet. * * @return will never be {@literal null}. */ - List findIncompletePublications(); + List findIncompletePublications(); /** - * Return the incomplete {@link EventPublication} for the given serialized event and listener identifier. + * Return the incomplete {@link TargetEventPublication} for the given serialized event and listener identifier. * * @param event must not be {@literal null}. * @param targetIdentifier must not be {@literal null}. * @return will never be {@literal null}. */ - Optional findIncompletePublicationsByEventAndTargetIdentifier( // + Optional findIncompletePublicationsByEventAndTargetIdentifier( // Object event, PublicationTargetIdentifier targetIdentifier); + void deletePublications(List identifiers); + /** * Deletes all publications that were already marked as completed. */ diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/core/EventPublication.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/core/TargetEventPublication.java similarity index 51% rename from spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/core/EventPublication.java rename to spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/core/TargetEventPublication.java index 1db13ed9f..f8f2bf322 100644 --- a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/core/EventPublication.java +++ b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/core/TargetEventPublication.java @@ -17,11 +17,7 @@ import java.time.Clock; import java.time.Instant; -import java.util.Optional; -import java.util.UUID; -import org.springframework.context.ApplicationEvent; -import org.springframework.context.PayloadApplicationEvent; import org.springframework.util.Assert; /** @@ -31,10 +27,10 @@ * @author Björn Kieling * @author Dmitry Belyaev */ -public interface EventPublication extends Comparable, Completable { +public interface TargetEventPublication extends Completable, org.springframework.modulith.events.EventPublication { /** - * Creates a {@link EventPublication} for the given event an listener identifier using a default {@link Instant}. + * Creates a {@link TargetEventPublication} for the given event an listener identifier using a default {@link Instant}. * Prefer using {@link #of(Object, PublicationTargetIdentifier, Instant)} with a dedicated {@link Instant} obtained * from a {@link Clock}. * @@ -43,58 +39,22 @@ public interface EventPublication extends Comparable, Completa * @return will never be {@literal null}. * @see #of(Object, PublicationTargetIdentifier, Instant) */ - static EventPublication of(Object event, PublicationTargetIdentifier id) { + static TargetEventPublication of(Object event, PublicationTargetIdentifier id) { return new DefaultEventPublication(event, id, Instant.now()); } /** - * Creates a {@link EventPublication} for the given event an listener identifier and publication date. + * Creates a {@link TargetEventPublication} for the given event an listener identifier and publication date. * * @param event must not be {@literal null}. * @param id must not be {@literal null}. * @param publicationDate must not be {@literal null}. * @return will never be {@literal null}. */ - static EventPublication of(Object event, PublicationTargetIdentifier id, Instant publicationDate) { + static TargetEventPublication of(Object event, PublicationTargetIdentifier id, Instant publicationDate) { return new DefaultEventPublication(event, id, publicationDate); } - /** - * Returns a unique identifier for this publication. - * - * @return will never be {@literal null}. - */ - UUID getIdentifier(); - - /** - * Returns the event that is published. - * - * @return - */ - Object getEvent(); - - /** - * Returns the event as Spring {@link ApplicationEvent}, effectively wrapping it into a - * {@link PayloadApplicationEvent} in case it's not one already. - * - * @return - */ - default ApplicationEvent getApplicationEvent() { - - Object event = getEvent(); - - return PayloadApplicationEvent.class.isInstance(event) // - ? PayloadApplicationEvent.class.cast(event) - : new PayloadApplicationEvent<>(this, event); - } - - /** - * Returns the time the event is published at. - * - * @return - */ - Instant getPublicationDate(); - /** * Returns the identifier of the target that the event is supposed to be published to. * @@ -114,29 +74,4 @@ default boolean isIdentifiedBy(PublicationTargetIdentifier identifier) { return this.getTargetIdentifier().equals(identifier); } - - /** - * Returns the completion date of the publication. - * - * @return will never be {@literal null}. - */ - Optional getCompletionDate(); - - /** - * Returns whether the publication of the event has completed. - * - * @return will never be {@literal null}. - */ - default boolean isPublicationCompleted() { - return getCompletionDate().isPresent(); - } - - /* - * (non-Javadoc) - * @see java.lang.Comparable#compareTo(java.lang.Object) - */ - @Override - public default int compareTo(EventPublication that) { - return this.getPublicationDate().compareTo(that.getPublicationDate()); - } } diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/PersistentApplicationEventMulticaster.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/PersistentApplicationEventMulticaster.java index 390d68d8d..5bce33d7c 100644 --- a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/PersistentApplicationEventMulticaster.java +++ b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/support/PersistentApplicationEventMulticaster.java @@ -18,6 +18,7 @@ import java.util.Collection; import java.util.List; import java.util.function.Consumer; +import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Stream; @@ -33,8 +34,9 @@ import org.springframework.core.annotation.AnnotationAwareOrderComparator; import org.springframework.core.env.Environment; import org.springframework.lang.NonNull; +import org.springframework.modulith.events.EventPublication; +import org.springframework.modulith.events.UncompletedEventPublications; import org.springframework.modulith.events.core.ConditionalEventListener; -import org.springframework.modulith.events.core.EventPublication; import org.springframework.modulith.events.core.EventPublicationRegistry; import org.springframework.modulith.events.core.PublicationTargetIdentifier; import org.springframework.transaction.event.TransactionPhase; @@ -54,7 +56,7 @@ * @see CompletionRegisteringAdvisor */ public class PersistentApplicationEventMulticaster extends AbstractApplicationEventMulticaster - implements SmartInitializingSingleton { + implements UncompletedEventPublications, SmartInitializingSingleton { private static final Logger LOGGER = LoggerFactory.getLogger(PersistentApplicationEventMulticaster.class); static final String REPUBLISH_ON_RESTART = "spring.modulith.republish-outstanding-events-on-restart"; @@ -128,14 +130,10 @@ protected Collection> getApplicationListeners(Application /* * (non-Javadoc) - * @see org.springframework.beans.factory.SmartInitializingSingleton#afterSingletonsInstantiated() + * @see org.springframework.modulith.events.OutstandingEventPublications#resubmitCompletedEventPublications(java.util.function.Predicate) */ @Override - public void afterSingletonsInstantiated() { - - if (!Boolean.TRUE.equals(environment.get().getProperty(REPUBLISH_ON_RESTART, Boolean.class))) { - return; - } + public void resubmitCompletedPublications(Predicate filter) { LOGGER.debug("Looking up previously pending event publications…"); @@ -143,10 +141,26 @@ public void afterSingletonsInstantiated() { LOGGER.debug("{} found.", publications.isEmpty() ? "None" : publications.size()); - publications.forEach(this::invokeTargetListener); + publications.stream() + .filter(filter) + .forEach(this::invokeTargetListener); + } + + /* + * (non-Javadoc) + * @see org.springframework.beans.factory.SmartInitializingSingleton#afterSingletonsInstantiated() + */ + @Override + public void afterSingletonsInstantiated() { + + if (!Boolean.TRUE.equals(environment.get().getProperty(REPUBLISH_ON_RESTART, Boolean.class))) { + return; + } + + resubmitCompletedPublications(__ -> true); } - private void invokeTargetListener(EventPublication publication) { + private void invokeTargetListener(org.springframework.modulith.events.core.TargetEventPublication publication) { var listeners = new TransactionalEventListeners( getApplicationListeners()); diff --git a/spring-modulith-events/spring-modulith-events-core/src/test/java/org/springframework/modulith/events/core/EventPublicationUnitTests.java b/spring-modulith-events/spring-modulith-events-core/src/test/java/org/springframework/modulith/events/core/TargetEventPublicationUnitTests.java similarity index 83% rename from spring-modulith-events/spring-modulith-events-core/src/test/java/org/springframework/modulith/events/core/EventPublicationUnitTests.java rename to spring-modulith-events/spring-modulith-events-core/src/test/java/org/springframework/modulith/events/core/TargetEventPublicationUnitTests.java index 7c54aa7e1..b7133952c 100644 --- a/spring-modulith-events/spring-modulith-events-core/src/test/java/org/springframework/modulith/events/core/EventPublicationUnitTests.java +++ b/spring-modulith-events/spring-modulith-events-core/src/test/java/org/springframework/modulith/events/core/TargetEventPublicationUnitTests.java @@ -24,13 +24,13 @@ * @author Björn Kieling * @author Dmitry Belyaev */ -class EventPublicationUnitTests { +class TargetEventPublicationUnitTests { @Test void rejectsNullEvent() { assertThatExceptionOfType(IllegalArgumentException.class)// - .isThrownBy(() -> EventPublication.of(null, PublicationTargetIdentifier.of("foo")))// + .isThrownBy(() -> TargetEventPublication.of(null, PublicationTargetIdentifier.of("foo")))// .withMessageContaining("Event"); } @@ -38,14 +38,14 @@ void rejectsNullEvent() { void rejectsNullTargetIdentifier() { assertThatExceptionOfType(IllegalArgumentException.class)// - .isThrownBy(() -> EventPublication.of(new Object(), null))// + .isThrownBy(() -> TargetEventPublication.of(new Object(), null))// .withMessageContaining("TargetIdentifier"); } @Test void publicationIsIncompleteByDefault() { - EventPublication publication = EventPublication.of(new Object(), + var publication = TargetEventPublication.of(new Object(), PublicationTargetIdentifier.of("foo")); assertThat(publication.isPublicationCompleted()).isFalse(); diff --git a/spring-modulith-events/spring-modulith-events-core/src/test/java/org/springframework/modulith/events/support/PersistentApplicationEventMulticasterIntegrationTests.java b/spring-modulith-events/spring-modulith-events-core/src/test/java/org/springframework/modulith/events/support/PersistentApplicationEventMulticasterIntegrationTests.java index fb6fb72f0..a4fce8160 100644 --- a/spring-modulith-events/spring-modulith-events-core/src/test/java/org/springframework/modulith/events/support/PersistentApplicationEventMulticasterIntegrationTests.java +++ b/spring-modulith-events/spring-modulith-events-core/src/test/java/org/springframework/modulith/events/support/PersistentApplicationEventMulticasterIntegrationTests.java @@ -26,7 +26,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.modulith.events.config.EnablePersistentDomainEvents; -import org.springframework.modulith.events.core.EventPublication; +import org.springframework.modulith.events.core.TargetEventPublication; import org.springframework.modulith.events.core.EventPublicationRepository; import org.springframework.stereotype.Component; import org.springframework.test.context.junit.jupiter.SpringExtension; @@ -64,10 +64,10 @@ SampleEventListener listener() { void doesNotPublishGenericEventsToListeners() throws Exception { publisher.publishEvent(new SomeGenericEvent<>()); - verify(repository, never()).create(any(EventPublication.class)); + verify(repository, never()).create(any(TargetEventPublication.class)); publisher.publishEvent(new SomeOtherEvent()); - verify(repository).create(any(EventPublication.class)); + verify(repository).create(any(TargetEventPublication.class)); } @Component diff --git a/spring-modulith-events/spring-modulith-events-jdbc/src/main/java/org/springframework/modulith/events/jdbc/JdbcEventPublicationRepository.java b/spring-modulith-events/spring-modulith-events-jdbc/src/main/java/org/springframework/modulith/events/jdbc/JdbcEventPublicationRepository.java index 82eea3036..874e92de5 100644 --- a/spring-modulith-events/spring-modulith-events-jdbc/src/main/java/org/springframework/modulith/events/jdbc/JdbcEventPublicationRepository.java +++ b/spring-modulith-events/spring-modulith-events-jdbc/src/main/java/org/springframework/modulith/events/jdbc/JdbcEventPublicationRepository.java @@ -24,6 +24,7 @@ import java.util.Objects; import java.util.Optional; import java.util.UUID; +import java.util.stream.IntStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,15 +32,15 @@ import org.springframework.jdbc.core.ResultSetExtractor; import org.springframework.jdbc.core.RowMapper; import org.springframework.lang.Nullable; -import org.springframework.modulith.events.core.EventPublication; import org.springframework.modulith.events.core.EventPublicationRepository; import org.springframework.modulith.events.core.EventSerializer; import org.springframework.modulith.events.core.PublicationTargetIdentifier; +import org.springframework.modulith.events.core.TargetEventPublication; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.Assert; /** - * JDBC-based repository to store {@link EventPublication}s. + * JDBC-based repository to store {@link TargetEventPublication}s. * * @author Dmitry Belyaev * @author Björn Kieling @@ -61,12 +62,6 @@ INSERT INTO EVENT_PUBLICATION (ID, EVENT_TYPE, LISTENER_ID, PUBLICATION_DATE, SE ORDER BY PUBLICATION_DATE ASC """; - private static final String SQL_STATEMENT_UPDATE = """ - UPDATE EVENT_PUBLICATION - SET COMPLETION_DATE = ? - WHERE ID = ? - """; - private static final String SQL_STATEMENT_UPDATE_BY_EVENT_AND_LISTENER_ID = """ UPDATE EVENT_PUBLICATION SET COMPLETION_DATE = ? @@ -85,6 +80,13 @@ INSERT INTO EVENT_PUBLICATION (ID, EVENT_TYPE, LISTENER_ID, PUBLICATION_DATE, SE ORDER BY PUBLICATION_DATE """; + private static final String SQL_STATEMENT_DELETE = """ + DELETE + FROM EVENT_PUBLICATION + WHERE + ID IN (?) + """; + private static final String SQL_STATEMENT_DELETE_UNCOMPLETED = """ DELETE FROM EVENT_PUBLICATION @@ -99,6 +101,8 @@ INSERT INTO EVENT_PUBLICATION (ID, EVENT_TYPE, LISTENER_ID, PUBLICATION_DATE, SE COMPLETION_DATE < ? """; + private static final int DELETE_BATCH_SIZE = 100; + private final JdbcOperations operations; private final EventSerializer serializer; private final DatabaseType databaseType; @@ -129,7 +133,7 @@ public JdbcEventPublicationRepository(JdbcOperations operations, EventSerializer */ @Override @Transactional - public EventPublication create(EventPublication publication) { + public TargetEventPublication create(TargetEventPublication publication) { var serializedEvent = serializeEvent(publication.getEvent()); @@ -160,7 +164,7 @@ public void markCompleted(Object event, PublicationTargetIdentifier identifier, @Override @Transactional(readOnly = true) - public Optional findIncompletePublicationsByEventAndTargetIdentifier( // + public Optional findIncompletePublicationsByEventAndTargetIdentifier( // Object event, PublicationTargetIdentifier targetIdentifier) { var serializedEvent = serializeEvent(event); @@ -173,10 +177,26 @@ public Optional findIncompletePublicationsByEventAndTargetIden @Override @Transactional(readOnly = true) @SuppressWarnings("null") - public List findIncompletePublications() { + public List findIncompletePublications() { return operations.query(SQL_STATEMENT_FIND_UNCOMPLETED, this::resultSetToPublications); } + /* + * (non-Javadoc) + * @see org.springframework.modulith.events.core.EventPublicationRepository#deletePublications(java.util.List) + */ + @Override + public void deletePublications(List identifiers) { + + var databaseIds = identifiers.stream().map(this::uuidToDatabase).toList(); + + operations.batchUpdate(SQL_STATEMENT_DELETE, batch(databaseIds, DELETE_BATCH_SIZE)); + } + + /* + * (non-Javadoc) + * @see org.springframework.modulith.events.core.EventPublicationRepository#deleteCompletedPublications() + */ @Override public void deleteCompletedPublications() { operations.execute(SQL_STATEMENT_DELETE_UNCOMPLETED); @@ -195,7 +215,7 @@ public void deleteCompletedPublicationsBefore(Instant instant) { } @SuppressWarnings("null") - private List findAllIncompletePublicationsByEventAndListenerId( + private List findAllIncompletePublicationsByEventAndListenerId( String serializedEvent, String listenerId) { return operations.query( // @@ -210,15 +230,15 @@ private String serializeEvent(Object event) { } /** - * Effectively a {@link ResultSetExtractor} to drop {@link EventPublication}s that cannot be deserialized. + * Effectively a {@link ResultSetExtractor} to drop {@link TargetEventPublication}s that cannot be deserialized. * * @param resultSet must not be {@literal null}. * @return will never be {@literal null}. * @throws SQLException */ - private List resultSetToPublications(ResultSet resultSet) throws SQLException { + private List resultSetToPublications(ResultSet resultSet) throws SQLException { - List result = new ArrayList<>(); + List result = new ArrayList<>(); while (resultSet.next()) { @@ -233,14 +253,14 @@ private List resultSetToPublications(ResultSet resultSet) thro } /** - * Effectively a {@link RowMapper} to turn a single row into an {@link EventPublication}. + * Effectively a {@link RowMapper} to turn a single row into an {@link TargetEventPublication}. * * @param rs must not be {@literal null}. * @return can be {@literal null}. * @throws SQLException */ @Nullable - private EventPublication resultSetToPublication(ResultSet rs) throws SQLException { + private TargetEventPublication resultSetToPublication(ResultSet rs) throws SQLException { var id = getUuidFromResultSet(rs); var eventClass = loadClass(id, rs.getString("EVENT_TYPE")); @@ -277,7 +297,17 @@ private Class loadClass(UUID id, String className) { } } - private static class JdbcEventPublication implements EventPublication { + private static List batch(List input, int batchSize) { + + var inputSize = input.size(); + + return IntStream.range(0, (inputSize + batchSize - 1) / batchSize) + .mapToObj(i -> input.subList(i * batchSize, Math.min((i + 1) * batchSize, inputSize))) + .map(List::toArray) + .toList(); + } + + private static class JdbcEventPublication implements TargetEventPublication { private final UUID id; private final Instant publicationDate; diff --git a/spring-modulith-events/spring-modulith-events-jdbc/src/test/java/org/springframework/modulith/events/jdbc/JdbcEventPublicationRepositoryIntegrationTests.java b/spring-modulith-events/spring-modulith-events-jdbc/src/test/java/org/springframework/modulith/events/jdbc/JdbcEventPublicationRepositoryIntegrationTests.java index 13d377389..83e5f54e6 100644 --- a/spring-modulith-events/spring-modulith-events-jdbc/src/test/java/org/springframework/modulith/events/jdbc/JdbcEventPublicationRepositoryIntegrationTests.java +++ b/spring-modulith-events/spring-modulith-events-jdbc/src/test/java/org/springframework/modulith/events/jdbc/JdbcEventPublicationRepositoryIntegrationTests.java @@ -26,6 +26,7 @@ import java.time.ZoneOffset; import java.time.temporal.ChronoUnit; import java.util.Comparator; +import java.util.List; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; @@ -35,9 +36,9 @@ import org.springframework.boot.test.mock.mockito.MockBean; import org.springframework.context.annotation.Import; import org.springframework.jdbc.core.JdbcOperations; -import org.springframework.modulith.events.core.EventPublication; import org.springframework.modulith.events.core.EventSerializer; import org.springframework.modulith.events.core.PublicationTargetIdentifier; +import org.springframework.modulith.events.core.TargetEventPublication; import org.springframework.modulith.testapp.TestApplication; import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.ContextConfiguration; @@ -79,7 +80,7 @@ void shouldPersistAndUpdateEventPublication() { when(serializer.serialize(testEvent)).thenReturn(serializedEvent); when(serializer.deserialize(serializedEvent, TestEvent.class)).thenReturn(testEvent); - var publication = repository.create(EventPublication.of(testEvent, TARGET_IDENTIFIER)); + var publication = repository.create(TargetEventPublication.of(testEvent, TARGET_IDENTIFIER)); var eventPublications = repository.findIncompletePublications(); @@ -110,11 +111,11 @@ void returnsOldestIncompletePublicationsFirst() { createPublicationAt(now.withHour(1)); assertThat(repository.findIncompletePublications()) - .isSortedAccordingTo(Comparator.comparing(EventPublication::getPublicationDate)); + .isSortedAccordingTo(Comparator.comparing(TargetEventPublication::getPublicationDate)); } private void createPublicationAt(LocalDateTime publicationDate) { - repository.create(EventPublication.of("", TARGET_IDENTIFIER, publicationDate.toInstant(ZoneOffset.UTC))); + repository.create(TargetEventPublication.of("", TARGET_IDENTIFIER, publicationDate.toInstant(ZoneOffset.UTC))); } @Test // GH-3 @@ -130,14 +131,14 @@ void shouldUpdateSingleEventPublication() { when(serializer.serialize(testEvent2)).thenReturn(serializedEvent2); when(serializer.deserialize(serializedEvent2, TestEvent.class)).thenReturn(testEvent2); - repository.create(EventPublication.of(testEvent1, TARGET_IDENTIFIER)); - var publication = repository.create(EventPublication.of(testEvent2, TARGET_IDENTIFIER)); + repository.create(TargetEventPublication.of(testEvent1, TARGET_IDENTIFIER)); + var publication = repository.create(TargetEventPublication.of(testEvent2, TARGET_IDENTIFIER)); // Complete publication repository.markCompleted(publication, Instant.now()); assertThat(repository.findIncompletePublications()).hasSize(1) - .element(0).extracting(EventPublication::getEvent).isEqualTo(testEvent1); + .element(0).extracting(TargetEventPublication::getEvent).isEqualTo(testEvent1); } @Test // GH-3 @@ -161,7 +162,7 @@ void shouldNotReturnCompletedEvents() { when(serializer.serialize(testEvent)).thenReturn(serializedEvent); when(serializer.deserialize(serializedEvent, TestEvent.class)).thenReturn(testEvent); - var publication = EventPublication.of(testEvent, TARGET_IDENTIFIER); + var publication = TargetEventPublication.of(testEvent, TARGET_IDENTIFIER); repository.create(publication); repository.markCompleted(publication, Instant.now()); @@ -180,9 +181,9 @@ void shouldReturnTheOldestEvent() throws Exception { when(serializer.serialize(testEvent)).thenReturn(serializedEvent); when(serializer.deserialize(serializedEvent, TestEvent.class)).thenReturn(testEvent); - var publication = repository.create(EventPublication.of(testEvent, TARGET_IDENTIFIER)); + var publication = repository.create(TargetEventPublication.of(testEvent, TARGET_IDENTIFIER)); Thread.sleep(10); - repository.create(EventPublication.of(testEvent, TARGET_IDENTIFIER)); + repository.create(TargetEventPublication.of(testEvent, TARGET_IDENTIFIER)); var actual = repository.findIncompletePublicationsByEventAndTargetIdentifier(testEvent, TARGET_IDENTIFIER); @@ -202,7 +203,7 @@ void shouldSilentlyIgnoreNotSerializableEvents() { when(serializer.deserialize(serializedEvent, TestEvent.class)).thenReturn(testEvent); // Store publication - repository.create(EventPublication.of(testEvent, TARGET_IDENTIFIER)); + repository.create(TargetEventPublication.of(testEvent, TARGET_IDENTIFIER)); operations.update("UPDATE EVENT_PUBLICATION SET EVENT_TYPE='abc'"); @@ -223,9 +224,9 @@ void shouldDeleteCompletedEvents() { when(serializer.serialize(testEvent2)).thenReturn(serializedEvent2); when(serializer.deserialize(serializedEvent2, TestEvent.class)).thenReturn(testEvent2); - var publication = repository.create(EventPublication.of(testEvent1, TARGET_IDENTIFIER)); + var publication = repository.create(TargetEventPublication.of(testEvent1, TARGET_IDENTIFIER)); - repository.create(EventPublication.of(testEvent2, TARGET_IDENTIFIER)); + repository.create(TargetEventPublication.of(testEvent2, TARGET_IDENTIFIER)); repository.markCompleted(publication, Instant.now()); repository.deleteCompletedPublications(); @@ -246,8 +247,8 @@ void shouldDeleteCompletedEventsBefore() { when(serializer.serialize(testEvent2)).thenReturn(serializedEvent2); when(serializer.deserialize(serializedEvent2, TestEvent.class)).thenReturn(testEvent2); - repository.create(EventPublication.of(testEvent1, TARGET_IDENTIFIER)); - repository.create(EventPublication.of(testEvent2, TARGET_IDENTIFIER)); + repository.create(TargetEventPublication.of(testEvent1, TARGET_IDENTIFIER)); + repository.create(TargetEventPublication.of(testEvent2, TARGET_IDENTIFIER)); var now = Instant.now(); @@ -258,6 +259,31 @@ void shouldDeleteCompletedEventsBefore() { assertThat(operations.query("SELECT * FROM EVENT_PUBLICATION", (rs, __) -> rs.getString("SERIALIZED_EVENT"))) .hasSize(1).element(0).isEqualTo(serializedEvent2); } + + @Test + void deletesPublicationsByIdentifier() { + + var first = createPublication(new TestEvent("first")); + var second = createPublication(new TestEvent("second")); + + repository.deletePublications(List.of(first.getIdentifier())); + + assertThat(repository.findIncompletePublications()) + .hasSize(1) + .element(0) + .matches(it -> it.getIdentifier().equals(second.getIdentifier())) + .matches(it -> it.getEvent().equals(second.getEvent())); + } + + private TargetEventPublication createPublication(Object event) { + + var token = event.toString(); + + doReturn(token).when(serializer).serialize(event); + doReturn(event).when(serializer).deserialize(token, event.getClass()); + + return repository.create(TargetEventPublication.of(event, TARGET_IDENTIFIER)); + } } @Nested diff --git a/spring-modulith-events/spring-modulith-events-jpa/src/main/java/org/springframework/modulith/events/jpa/JpaEventPublicationRepository.java b/spring-modulith-events/spring-modulith-events-jpa/src/main/java/org/springframework/modulith/events/jpa/JpaEventPublicationRepository.java index 34458d976..ede8ecf47 100644 --- a/spring-modulith-events/spring-modulith-events-jpa/src/main/java/org/springframework/modulith/events/jpa/JpaEventPublicationRepository.java +++ b/spring-modulith-events/spring-modulith-events-jpa/src/main/java/org/springframework/modulith/events/jpa/JpaEventPublicationRepository.java @@ -22,21 +22,23 @@ import java.util.Objects; import java.util.Optional; import java.util.UUID; +import java.util.stream.IntStream; -import org.springframework.modulith.events.core.EventPublication; import org.springframework.modulith.events.core.EventPublicationRepository; import org.springframework.modulith.events.core.EventSerializer; import org.springframework.modulith.events.core.PublicationTargetIdentifier; +import org.springframework.modulith.events.core.TargetEventPublication; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.Assert; /** - * Repository to store {@link EventPublication}s. + * Repository to store {@link TargetEventPublication}s. * * @author Oliver Drotbohm * @author Dmitry Belyaev * @author Björn Kieling */ +@Transactional class JpaEventPublicationRepository implements EventPublicationRepository { private static String BY_EVENT_AND_LISTENER_ID = """ @@ -64,6 +66,13 @@ class JpaEventPublicationRepository implements EventPublicationRepository { and p.listenerId = ?2 """; + private static final String DELETE = """ + delete + from JpaEventPublication p + where + p.id in ?1 + """; + private static final String DELETE_COMPLETED = """ delete from JpaEventPublication p @@ -78,6 +87,8 @@ class JpaEventPublicationRepository implements EventPublicationRepository { p.completionDate < ?1 """; + private static final int DELETE_BATCH_SIZE = 100; + private final EntityManager entityManager; private final EventSerializer serializer; @@ -102,8 +113,7 @@ public JpaEventPublicationRepository(EntityManager entityManager, EventSerialize * @see org.springframework.modulith.events.EventPublicationRepository#create(org.springframework.modulith.events.EventPublication) */ @Override - @Transactional - public EventPublication create(EventPublication publication) { + public TargetEventPublication create(TargetEventPublication publication) { entityManager.persist(domainToEntity(publication)); @@ -115,7 +125,6 @@ public EventPublication create(EventPublication publication) { * @see org.springframework.modulith.events.EventPublicationRepository#markCompleted(java.lang.Object, org.springframework.modulith.events.PublicationTargetIdentifier, java.time.Instant) */ @Override - @Transactional public void markCompleted(Object event, PublicationTargetIdentifier identifier, Instant completionDate) { entityManager.createQuery(MARK_COMPLETED_BY_EVENT_AND_LISTENER_ID) @@ -131,7 +140,7 @@ public void markCompleted(Object event, PublicationTargetIdentifier identifier, */ @Override @Transactional(readOnly = true) - public List findIncompletePublications() { + public List findIncompletePublications() { return entityManager.createQuery(INCOMPLETE, JpaEventPublication.class) .getResultStream() @@ -145,19 +154,30 @@ public List findIncompletePublications() { */ @Override @Transactional(readOnly = true) - public Optional findIncompletePublicationsByEventAndTargetIdentifier( // + public Optional findIncompletePublicationsByEventAndTargetIdentifier( // Object event, PublicationTargetIdentifier targetIdentifier) { return findEntityBySerializedEventAndListenerIdAndCompletionDateNull(event, targetIdentifier) .map(this::entityToDomain); } + /* + * (non-Javadoc) + * @see org.springframework.modulith.events.core.EventPublicationRepository#deletePublications(java.util.List) + */ + @Override + public void deletePublications(List identifiers) { + + batch(identifiers, DELETE_BATCH_SIZE).forEach(it -> { + entityManager.createQuery(DELETE).setParameter(1, identifiers).executeUpdate(); + }); + } + /* * (non-Javadoc) * @see org.springframework.modulith.events.EventPublicationRepository#deleteCompletedPublications() */ @Override - @Transactional public void deleteCompletedPublications() { entityManager.createQuery(DELETE_COMPLETED).executeUpdate(); } @@ -171,10 +191,9 @@ public void deleteCompletedPublicationsBefore(Instant instant) { Assert.notNull(instant, "Instant must not be null!"); - var query = entityManager.createQuery(DELETE_COMPLETED_BEFORE); - - query.setParameter(1, instant); - query.executeUpdate(); + entityManager.createQuery(DELETE_COMPLETED_BEFORE) + .setParameter(1, instant) + .executeUpdate(); } private Optional findEntityBySerializedEventAndListenerIdAndCompletionDateNull( // @@ -193,17 +212,26 @@ private String serializeEvent(Object event) { return serializer.serialize(event).toString(); } - private JpaEventPublication domainToEntity(EventPublication domain) { + private JpaEventPublication domainToEntity(TargetEventPublication domain) { return new JpaEventPublication(domain.getIdentifier(), domain.getPublicationDate(), domain.getTargetIdentifier().getValue(), serializeEvent(domain.getEvent()), domain.getEvent().getClass()); } - private EventPublication entityToDomain(JpaEventPublication entity) { + private TargetEventPublication entityToDomain(JpaEventPublication entity) { return new JpaEventPublicationAdapter(entity, serializer); } - private static class JpaEventPublicationAdapter implements EventPublication { + private static List> batch(List input, int batchSize) { + + var inputSize = input.size(); + + return IntStream.range(0, (inputSize + batchSize - 1) / batchSize) + .mapToObj(i -> input.subList(i * batchSize, Math.min((i + 1) * batchSize, inputSize))) + .toList(); + } + + private static class JpaEventPublicationAdapter implements TargetEventPublication { private final JpaEventPublication publication; private final EventSerializer serializer; diff --git a/spring-modulith-events/spring-modulith-events-jpa/src/test/java/org/springframework/modulith/events/jpa/JpaEventPublicationRepositoryIntegrationTests.java b/spring-modulith-events/spring-modulith-events-jpa/src/test/java/org/springframework/modulith/events/jpa/JpaEventPublicationRepositoryIntegrationTests.java index e9b37742e..606a534ac 100644 --- a/spring-modulith-events/spring-modulith-events-jpa/src/test/java/org/springframework/modulith/events/jpa/JpaEventPublicationRepositoryIntegrationTests.java +++ b/spring-modulith-events/spring-modulith-events-jpa/src/test/java/org/springframework/modulith/events/jpa/JpaEventPublicationRepositoryIntegrationTests.java @@ -27,6 +27,7 @@ import java.time.LocalDateTime; import java.time.ZoneOffset; import java.util.Comparator; +import java.util.List; import java.util.UUID; import javax.sql.DataSource; @@ -40,9 +41,9 @@ import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase; import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder; import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType; -import org.springframework.modulith.events.core.EventPublication; import org.springframework.modulith.events.core.EventSerializer; import org.springframework.modulith.events.core.PublicationTargetIdentifier; +import org.springframework.modulith.events.core.TargetEventPublication; import org.springframework.orm.jpa.JpaTransactionManager; import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean; import org.springframework.orm.jpa.SharedEntityManagerCreator; @@ -127,7 +128,7 @@ void persistsJpaEventPublication() { when(eventSerializer.serialize(testEvent)).thenReturn(serializedEvent); when(eventSerializer.deserialize(serializedEvent, TestEvent.class)).thenReturn(testEvent); - var publication = repository.create(EventPublication.of(testEvent, TARGET_IDENTIFIER)); + var publication = repository.create(TargetEventPublication.of(testEvent, TARGET_IDENTIFIER)); var eventPublications = repository.findIncompletePublications(); @@ -162,7 +163,7 @@ void shouldNotReturnCompletedEvents() { when(eventSerializer.serialize(testEvent)).thenReturn(serializedEvent); when(eventSerializer.deserialize(serializedEvent, TestEvent.class)).thenReturn(testEvent); - var publication = EventPublication.of(testEvent, TARGET_IDENTIFIER); + var publication = TargetEventPublication.of(testEvent, TARGET_IDENTIFIER); repository.create(publication); repository.markCompleted(publication, Instant.now()); @@ -185,8 +186,8 @@ void shouldDeleteCompletedEvents() { when(eventSerializer.serialize(testEvent2)).thenReturn(serializedEvent2); when(eventSerializer.deserialize(serializedEvent2, TestEvent.class)).thenReturn(testEvent2); - repository.create(EventPublication.of(testEvent1, TARGET_IDENTIFIER)); - repository.create(EventPublication.of(testEvent2, TARGET_IDENTIFIER)); + repository.create(TargetEventPublication.of(testEvent1, TARGET_IDENTIFIER)); + repository.create(TargetEventPublication.of(testEvent2, TARGET_IDENTIFIER)); repository.markCompleted(testEvent1, TARGET_IDENTIFIER, Instant.now()); repository.deleteCompletedPublications(); @@ -205,7 +206,7 @@ void returnsOldestIncompletePublicationsFirst() { savePublicationAt(now.withHour(1)); assertThat(repository.findIncompletePublications()) - .isSortedAccordingTo(Comparator.comparing(EventPublication::getPublicationDate)); + .isSortedAccordingTo(Comparator.comparing(TargetEventPublication::getPublicationDate)); } @Test // GH-251 @@ -221,8 +222,8 @@ void shouldDeleteCompletedEventsBefore() { when(eventSerializer.serialize(testEvent2)).thenReturn(serializedEvent2); when(eventSerializer.deserialize(serializedEvent2, TestEvent.class)).thenReturn(testEvent2); - repository.create(EventPublication.of(testEvent1, TARGET_IDENTIFIER)); - repository.create(EventPublication.of(testEvent2, TARGET_IDENTIFIER)); + repository.create(TargetEventPublication.of(testEvent1, TARGET_IDENTIFIER)); + repository.create(TargetEventPublication.of(testEvent2, TARGET_IDENTIFIER)); var now = Instant.now(); @@ -235,6 +236,31 @@ void shouldDeleteCompletedEventsBefore() { .element(0).extracting(it -> it.serializedEvent).isEqualTo(serializedEvent2); } + @Test + void deletesPublicationsByIdentifier() { + + var first = createPublication(new TestEvent("first")); + var second = createPublication(new TestEvent("second")); + + repository.deletePublications(List.of(first.getIdentifier())); + + assertThat(repository.findIncompletePublications()) + .hasSize(1) + .element(0) + .matches(it -> it.getIdentifier().equals(second.getIdentifier())) + .matches(it -> it.getEvent().equals(second.getEvent())); + } + + private TargetEventPublication createPublication(Object event) { + + var token = event.toString(); + + doReturn(token).when(eventSerializer).serialize(event); + doReturn(event).when(eventSerializer).deserialize(token, event.getClass()); + + return repository.create(TargetEventPublication.of(event, TARGET_IDENTIFIER)); + } + private void savePublicationAt(LocalDateTime date) { em.persist(new JpaEventPublication(UUID.randomUUID(), date.toInstant(ZoneOffset.UTC), "", "", Object.class)); } diff --git a/spring-modulith-events/spring-modulith-events-mongodb/src/main/java/org/springframework/modulith/events/mongodb/MongoDbEventPublicationRepository.java b/spring-modulith-events/spring-modulith-events-mongodb/src/main/java/org/springframework/modulith/events/mongodb/MongoDbEventPublicationRepository.java index 99361b3ea..372e36f7e 100644 --- a/spring-modulith-events/spring-modulith-events-mongodb/src/main/java/org/springframework/modulith/events/mongodb/MongoDbEventPublicationRepository.java +++ b/spring-modulith-events/spring-modulith-events-mongodb/src/main/java/org/springframework/modulith/events/mongodb/MongoDbEventPublicationRepository.java @@ -29,18 +29,20 @@ import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Update; import org.springframework.data.util.TypeInformation; -import org.springframework.modulith.events.core.EventPublication; import org.springframework.modulith.events.core.EventPublicationRepository; import org.springframework.modulith.events.core.PublicationTargetIdentifier; +import org.springframework.modulith.events.core.TargetEventPublication; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.Assert; /** - * Repository to store {@link EventPublication}s in a MongoDB. + * Repository to store {@link TargetEventPublication}s in a MongoDB. * * @author Björn Kieling * @author Dmitry Belyaev * @author Oliver Drotbohm */ +@Transactional class MongoDbEventPublicationRepository implements EventPublicationRepository { private final MongoTemplate mongoTemplate; @@ -62,7 +64,7 @@ public MongoDbEventPublicationRepository(MongoTemplate mongoTemplate) { * @see org.springframework.modulith.events.EventPublicationRepository#create(org.springframework.modulith.events.EventPublication) */ @Override - public EventPublication create(EventPublication publication) { + public TargetEventPublication create(TargetEventPublication publication) { mongoTemplate.save(domainToDocument(publication)); @@ -82,8 +84,13 @@ public void markCompleted(Object event, PublicationTargetIdentifier identifier, mongoTemplate.updateFirst(query(criteria), update, MongoDbEventPublication.class); } + /* + * (non-Javadoc) + * @see org.springframework.modulith.events.core.EventPublicationRepository#findIncompletePublications() + */ @Override - public List findIncompletePublications() { + @Transactional(readOnly = true) + public List findIncompletePublications() { var query = query(where("completionDate").isNull()) .with(Sort.by("publicationDate").ascending()); @@ -93,8 +100,13 @@ public List findIncompletePublications() { .toList(); } + /* + * (non-Javadoc) + * @see org.springframework.modulith.events.core.EventPublicationRepository#findIncompletePublicationsByEventAndTargetIdentifier(java.lang.Object, org.springframework.modulith.events.core.PublicationTargetIdentifier) + */ @Override - public Optional findIncompletePublicationsByEventAndTargetIdentifier( + @Transactional(readOnly = true) + public Optional findIncompletePublicationsByEventAndTargetIdentifier( Object event, PublicationTargetIdentifier targetIdentifier) { var documents = findDocumentsByEventAndTargetIdentifierAndCompletionDateNull(event, targetIdentifier); @@ -107,6 +119,15 @@ public Optional findIncompletePublicationsByEventAndTargetIden return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0)); } + /* + * (non-Javadoc) + * @see org.springframework.modulith.events.core.EventPublicationRepository#deletePublications(java.util.List) + */ + @Override + public void deletePublications(List identifiers) { + mongoTemplate.remove(query(where("id").in(identifiers)), MongoDbEventPublication.class); + } + /* * (non-Javadoc) * @see org.springframework.modulith.events.EventPublicationRepository#deleteCompletedPublications() @@ -146,7 +167,7 @@ private Criteria byEventAndListenerId(Object event, PublicationTargetIdentifier .and("completionDate").isNull(); } - private MongoDbEventPublication domainToDocument(EventPublication publication) { + private MongoDbEventPublication domainToDocument(TargetEventPublication publication) { return new MongoDbEventPublication( // publication.getIdentifier(), // @@ -155,11 +176,11 @@ private MongoDbEventPublication domainToDocument(EventPublication publication) { publication.getEvent()); } - private EventPublication documentToDomain(MongoDbEventPublication document) { + private TargetEventPublication documentToDomain(MongoDbEventPublication document) { return new MongoDbEventPublicationAdapter(document); } - private static class MongoDbEventPublicationAdapter implements EventPublication { + private static class MongoDbEventPublicationAdapter implements TargetEventPublication { private final MongoDbEventPublication publication; diff --git a/spring-modulith-events/spring-modulith-events-mongodb/src/test/java/org/springframework/modulith/events/mongodb/MongoDbEventPublicationRepositoryTest.java b/spring-modulith-events/spring-modulith-events-mongodb/src/test/java/org/springframework/modulith/events/mongodb/MongoDbEventPublicationRepositoryTest.java index ee5517b09..0e5b105dd 100644 --- a/spring-modulith-events/spring-modulith-events-mongodb/src/test/java/org/springframework/modulith/events/mongodb/MongoDbEventPublicationRepositoryTest.java +++ b/spring-modulith-events/spring-modulith-events-mongodb/src/test/java/org/springframework/modulith/events/mongodb/MongoDbEventPublicationRepositoryTest.java @@ -24,6 +24,7 @@ import java.time.ZoneOffset; import java.time.temporal.ChronoUnit; import java.util.Comparator; +import java.util.List; import java.util.UUID; import org.junit.jupiter.api.AfterEach; @@ -33,8 +34,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.data.mongo.DataMongoTest; import org.springframework.data.mongodb.core.MongoTemplate; -import org.springframework.modulith.events.core.EventPublication; import org.springframework.modulith.events.core.PublicationTargetIdentifier; +import org.springframework.modulith.events.core.TargetEventPublication; import org.springframework.modulith.testapp.TestApplication; import org.springframework.test.context.ContextConfiguration; @@ -66,7 +67,7 @@ void tearDown() { void shouldPersistAndUpdateEventPublication() { var testEvent = new TestEvent("abc"); - var publication = repository.create(EventPublication.of(testEvent, TARGET_IDENTIFIER)); + var publication = repository.create(TargetEventPublication.of(testEvent, TARGET_IDENTIFIER)); var eventPublications = repository.findIncompletePublications(); @@ -89,13 +90,13 @@ void shouldUpdateSingleEventPublication() { var testEvent1 = new TestEvent("id1"); var testEvent2 = new TestEvent("id2"); - repository.create(EventPublication.of(testEvent1, TARGET_IDENTIFIER)); - var publication = repository.create(EventPublication.of(testEvent2, TARGET_IDENTIFIER)); + repository.create(TargetEventPublication.of(testEvent1, TARGET_IDENTIFIER)); + var publication = repository.create(TargetEventPublication.of(testEvent2, TARGET_IDENTIFIER)); repository.markCompleted(publication, Instant.now()); assertThat(repository.findIncompletePublications()).hasSize(1) - .element(0).extracting(EventPublication::getEvent).isEqualTo(testEvent1); + .element(0).extracting(TargetEventPublication::getEvent).isEqualTo(testEvent1); } @Test // GH-133 @@ -108,7 +109,7 @@ void returnsOldestIncompletePublicationsFirst() { savePublicationAt(now.withHour(1)); assertThat(repository.findIncompletePublications()) - .isSortedAccordingTo(Comparator.comparing(EventPublication::getPublicationDate)); + .isSortedAccordingTo(Comparator.comparing(TargetEventPublication::getPublicationDate)); } private void savePublicationAt(LocalDateTime date) { @@ -126,10 +127,11 @@ void shouldFindEventPublicationByEventAndTargetIdentifier() { var testEvent1 = new TestEvent("abc"); var testEvent2 = new TestEvent("def"); - repository.create(EventPublication.of(testEvent2, TARGET_IDENTIFIER)); - repository.create(EventPublication.of(testEvent1, TARGET_IDENTIFIER)); + repository.create(TargetEventPublication.of(testEvent2, TARGET_IDENTIFIER)); + repository.create(TargetEventPublication.of(testEvent1, TARGET_IDENTIFIER)); repository - .create(EventPublication.of(testEvent1, PublicationTargetIdentifier.of(TARGET_IDENTIFIER.getValue() + "!"))); + .create(TargetEventPublication.of(testEvent1, + PublicationTargetIdentifier.of(TARGET_IDENTIFIER.getValue() + "!"))); var actual = repository.findIncompletePublicationsByEventAndTargetIdentifier(testEvent1, TARGET_IDENTIFIER); @@ -153,7 +155,7 @@ void shouldNotReturnCompletedEvents() { TestEvent testEvent = new TestEvent("abc"); - EventPublication publication = EventPublication.of(testEvent, TARGET_IDENTIFIER); + TargetEventPublication publication = TargetEventPublication.of(testEvent, TARGET_IDENTIFIER); // Store publication repository.create(publication); @@ -169,9 +171,9 @@ void shouldReturnTheOldestEventTest() throws InterruptedException { var testEvent = new TestEvent("id"); - var publication = repository.create(EventPublication.of(testEvent, TARGET_IDENTIFIER)); + var publication = repository.create(TargetEventPublication.of(testEvent, TARGET_IDENTIFIER)); Thread.sleep(10); - repository.create(EventPublication.of(testEvent, TARGET_IDENTIFIER)); + repository.create(TargetEventPublication.of(testEvent, TARGET_IDENTIFIER)); var actual = repository.findIncompletePublicationsByEventAndTargetIdentifier(testEvent, TARGET_IDENTIFIER); @@ -190,9 +192,9 @@ void shouldDeleteCompletedEvents() { var testEvent1 = new TestEvent("abc"); var testEvent2 = new TestEvent("def"); - var publication = repository.create(EventPublication.of(testEvent1, TARGET_IDENTIFIER)); + var publication = repository.create(TargetEventPublication.of(testEvent1, TARGET_IDENTIFIER)); - repository.create(EventPublication.of(testEvent2, TARGET_IDENTIFIER)); + repository.create(TargetEventPublication.of(testEvent2, TARGET_IDENTIFIER)); repository.markCompleted(publication, Instant.now()); repository.deleteCompletedPublications(); @@ -207,8 +209,8 @@ void shouldDeleteCompletedEventsBefore() { var testEvent1 = new TestEvent("abc"); var testEvent2 = new TestEvent("def"); - var publication1 = repository.create(EventPublication.of(testEvent1, TARGET_IDENTIFIER)); - var publication2 = repository.create(EventPublication.of(testEvent2, TARGET_IDENTIFIER)); + var publication1 = repository.create(TargetEventPublication.of(testEvent1, TARGET_IDENTIFIER)); + var publication2 = repository.create(TargetEventPublication.of(testEvent2, TARGET_IDENTIFIER)); var now = Instant.now(); @@ -220,6 +222,25 @@ void shouldDeleteCompletedEventsBefore() { .hasSize(1) // .element(0).extracting(it -> it.event).isEqualTo(testEvent2); } + + @Test + void deletesPublicationsByIdentifier() { + + var first = createPublication(new TestEvent("first")); + var second = createPublication(new TestEvent("second")); + + repository.deletePublications(List.of(first.getIdentifier())); + + assertThat(repository.findIncompletePublications()) + .hasSize(1) + .element(0) + .matches(it -> it.getIdentifier().equals(second.getIdentifier())) + .matches(it -> it.getEvent().equals(second.getEvent())); + } + + private TargetEventPublication createPublication(Object event) { + return repository.create(TargetEventPublication.of(event, TARGET_IDENTIFIER)); + } } @Value diff --git a/spring-modulith-events/spring-modulith-events-tests/src/test/java/example/events/PersistentDomainEventIntegrationTest.java b/spring-modulith-events/spring-modulith-events-tests/src/test/java/example/events/PersistentDomainEventIntegrationTest.java index 3e20d021e..218e75b5c 100644 --- a/spring-modulith-events/spring-modulith-events-tests/src/test/java/example/events/PersistentDomainEventIntegrationTest.java +++ b/spring-modulith-events/spring-modulith-events-tests/src/test/java/example/events/PersistentDomainEventIntegrationTest.java @@ -30,7 +30,7 @@ import org.springframework.context.event.EventListener; import org.springframework.core.env.MapPropertySource; import org.springframework.modulith.events.config.EnablePersistentDomainEvents; -import org.springframework.modulith.events.core.EventPublication; +import org.springframework.modulith.events.core.TargetEventPublication; import org.springframework.modulith.events.core.EventPublicationRegistry; import org.springframework.modulith.events.core.PublicationTargetIdentifier; import org.springframework.modulith.events.support.PersistentApplicationEventMulticaster; @@ -74,7 +74,7 @@ void exposesEventPublicationForFailedListener() throws Exception { } finally { assertThat(registry.findIncompletePublications()) // - .extracting(EventPublication::getTargetIdentifier) // + .extracting(TargetEventPublication::getTargetIdentifier) // .extracting(PublicationTargetIdentifier::getValue) // .hasSize(2) // .allSatisfy(id -> {