Skip to content

Commit

Permalink
#251 - Improve efficiency of event publication completion.
Browse files Browse the repository at this point in the history
Changed the EventPublicationRepository interface to allow marking an event as completed without having to materialize it in the first place. This allows us to get rid of CompletableEventPublication. EventPublication not exposes its identifier to make sure the stores can actually store the same id.

Introduced EventPublicationRegistry.deleteCompletedPublicationsOlderThan(Duration) to purge completed event publications before a given point in time.
  • Loading branch information
odrotbohm committed Aug 7, 2023
1 parent 22ec81b commit 4c145dc
Show file tree
Hide file tree
Showing 16 changed files with 454 additions and 306 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.modulith.events;

import java.time.Instant;

/**
* Internal interface to be able to mark {@link EventPublication} instances as completed.
*
* @author Oliver Drotbohm
*/
interface Completable {

/**
* Marks the instance as completed at the given {@link Instant}.
*
* @param instant must not be {@literal null}.
*/
void markCompleted(Instant instant);
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,18 @@
import java.time.Instant;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;

import org.springframework.util.Assert;

/**
* Default {@link CompletableEventPublication} implementation.
* Default {@link Completable} implementation.
*
* @author Oliver Drotbohm
*/
class DefaultEventPublication implements CompletableEventPublication {
class DefaultEventPublication implements EventPublication {

private final UUID identifier;
private final Object event;
private final PublicationTargetIdentifier targetIdentifier;
private final Instant publicationDate;
Expand All @@ -47,12 +49,22 @@ class DefaultEventPublication implements CompletableEventPublication {
Assert.notNull(targetIdentifier, "PublicationTargetIdentifier must not be null!");
Assert.notNull(publicationDate, "Publication date must not be null!");

this.identifier = UUID.randomUUID();
this.event = event;
this.targetIdentifier = targetIdentifier;
this.publicationDate = publicationDate;
this.completionDate = Optional.empty();
}

/*
* (non-Javadoc)
* @see org.springframework.modulith.events.EventPublication#getPublicationIdentifier()
*/
@Override
public UUID getIdentifier() {
return identifier;
}

/*
* (non-Javadoc)
* @see org.springframework.modulith.events.EventPublication#getEvent()
Expand Down Expand Up @@ -89,13 +101,11 @@ public Optional<Instant> getCompletionDate() {

/*
* (non-Javadoc)
* @see org.springframework.modulith.events.CompletableEventPublication#markCompleted()
* @see org.springframework.modulith.events.CompletableEventPublication#markCompleted(java.time.Instant)
*/
@Override
public CompletableEventPublication markCompleted() {

this.completionDate = Optional.of(Instant.now());
return this;
public void markCompleted(Instant instant) {
this.completionDate = Optional.of(instant);
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.springframework.modulith.events;

import java.time.Clock;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.stream.Stream;
Expand All @@ -39,6 +40,7 @@
public class DefaultEventPublicationRegistry implements DisposableBean, EventPublicationRegistry {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultEventPublicationRegistry.class);
private static final String REGISTER = "Registering publication of {} for {}.";

private final EventPublicationRepository events;
private final Clock clock;
Expand All @@ -65,7 +67,8 @@ public DefaultEventPublicationRegistry(EventPublicationRepository events, Clock
@Override
public Collection<EventPublication> store(Object event, Stream<PublicationTargetIdentifier> listeners) {

return listeners.map(it -> map(event, it))
return listeners.map(it -> EventPublication.of(event, it, clock.instant()))
.peek(it -> LOGGER.debug(REGISTER, it.getEvent().getClass().getName(), it.getTargetIdentifier().getValue()))
.map(events::create)
.toList();
}
Expand All @@ -90,12 +93,24 @@ public void markCompleted(Object event, PublicationTargetIdentifier targetIdenti
Assert.notNull(event, "Domain event must not be null!");
Assert.notNull(targetIdentifier, "Listener identifier must not be null!");

events.findIncompletePublicationsByEventAndTargetIdentifier(event, targetIdentifier) //
.map(DefaultEventPublicationRegistry::logCompleted) //
.map(e -> CompletableEventPublication.of(e.getEvent(), e.getTargetIdentifier()))
.ifPresent(it -> events.update(it.markCompleted()));
LOGGER.debug("Marking publication of event {} to listener {} completed.", //
event.getClass().getName(), targetIdentifier.getValue());

events.markCompleted(event, targetIdentifier, clock.instant());
}

/*
* (non-Javadoc)
* @see org.springframework.modulith.events.EventPublicationRegistry#deleteCompletedPublicationsOlderThan(java.time.Duration)
*/
@Override
public void deleteCompletedPublicationsOlderThan(Duration duration) {

Assert.notNull(duration, "Duration must not be null!");

events.deleteCompletedPublicationsBefore(clock.instant().minus(duration));
};

/*
* (non-Javadoc)
* @see org.springframework.beans.factory.DisposableBean#destroy()
Expand All @@ -121,23 +136,4 @@ public void destroy() {
LOGGER.info("{} {} - {}", prefix, it.getEvent().getClass().getName(), it.getTargetIdentifier().getValue());
}
}

private EventPublication map(Object event, PublicationTargetIdentifier targetIdentifier) {

var result = CompletableEventPublication.of(event, targetIdentifier, clock.instant());

LOGGER.debug("Registering publication of {} for {}.", //
result.getEvent().getClass().getName(), result.getTargetIdentifier().getValue());

return result;
}

private static EventPublication logCompleted(EventPublication publication) {

LOGGER.debug("Marking publication of event {} to listener {} completed.", //
publication.getEvent().getClass().getName(), publication.getTargetIdentifier().getValue());

return publication;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
*/
package org.springframework.modulith.events;

import java.time.Clock;
import java.time.Instant;
import java.util.Optional;
import java.util.UUID;

import org.springframework.context.ApplicationEvent;
import org.springframework.context.PayloadApplicationEvent;
Expand All @@ -28,7 +31,40 @@
* @author Björn Kieling
* @author Dmitry Belyaev
*/
public interface EventPublication extends Comparable<EventPublication> {
public interface EventPublication extends Comparable<EventPublication>, Completable {

/**
* Creates a {@link EventPublication} for the given event an listener identifier using a default {@link Instant}.
* Prefer using {@link #of(Object, PublicationTargetIdentifier, Instant)} with a dedicated {@link Instant} obtained
* from a {@link Clock}.
*
* @param event must not be {@literal null}.
* @param id must not be {@literal null}.
* @return will never be {@literal null}.
* @see #of(Object, PublicationTargetIdentifier, Instant)
*/
static EventPublication of(Object event, PublicationTargetIdentifier id) {
return new DefaultEventPublication(event, id, Instant.now());
}

/**
* Creates a {@link EventPublication} for the given event an listener identifier and publication date.
*
* @param event must not be {@literal null}.
* @param id must not be {@literal null}.
* @param publicationDate must not be {@literal null}.
* @return will never be {@literal null}.
*/
static EventPublication of(Object event, PublicationTargetIdentifier id, Instant publicationDate) {
return new DefaultEventPublication(event, id, publicationDate);
}

/**
* Returns a unique identifier for this publication.
*
* @return will never be {@literal null}.
*/
UUID getIdentifier();

/**
* Returns the event that is published.
Expand Down Expand Up @@ -79,6 +115,22 @@ default boolean isIdentifiedBy(PublicationTargetIdentifier identifier) {
return this.getTargetIdentifier().equals(identifier);
}

/**
* Returns the completion date of the publication.
*
* @return will never be {@literal null}.
*/
Optional<Instant> getCompletionDate();

/**
* Returns whether the publication of the event has completed.
*
* @return will never be {@literal null}.
*/
default boolean isPublicationCompleted() {
return getCompletionDate().isPresent();
}

/*
* (non-Javadoc)
* @see java.lang.Comparable#compareTo(java.lang.Object)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.springframework.modulith.events;

import java.time.Duration;
import java.util.Collection;
import java.util.stream.Stream;

Expand Down Expand Up @@ -52,4 +53,11 @@ public interface EventPublicationRegistry {
* @param targetIdentifier must not be {@literal null}.
*/
void markCompleted(Object event, PublicationTargetIdentifier targetIdentifier);

/**
* Deletes all completed {@link EventPublication}s that have been completed before the given {@link Duration}.
*
* @param duration must not be {@literal null}.
*/
void deleteCompletedPublicationsOlderThan(Duration duration);
}
Loading

0 comments on commit 4c145dc

Please sign in to comment.