Skip to content

Commit

Permalink
Hacking.
Browse files Browse the repository at this point in the history
  • Loading branch information
odrotbohm committed Sep 4, 2023
1 parent c246b29 commit bb200ec
Show file tree
Hide file tree
Showing 20 changed files with 511 additions and 191 deletions.
5 changes: 5 additions & 0 deletions spring-modulith-events/spring-modulith-events-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@
<artifactId>spring-core</artifactId>
</dependency>

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>

<dependency>
<groupId>org.jmolecules</groupId>
<artifactId>jmolecules-events</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.modulith.events;

import java.time.Duration;
import java.util.Collection;
import java.util.function.Predicate;

/**
* All {@link EventPublication}s that have already been completed.
*
* @author Oliver Drotbohm
* @since 1.1
*/
public interface CompletedEventPublications {

/**
* Returns all {@link EventPublication}s that have already been completed.
*
* @return will never be {@literal null}.
*/
Collection<? extends EventPublication> findAll();

/**
* Deletes all {@link EventPublication}s matching the given {@link Predicate}. Note that implementations will iterate
* all completed {@link EventPublication}s and apply the predicate in memory.
*
* @param filter must not be {@literal null}.
*/
void deletePublications(Predicate<EventPublication> filter);

/**
* Deletes all {@link EventPublication}s whose completion date is older than the given {@link Duration}.
*
* @param duration must not be {@literal null}.
*/
void deletePublicationsOlderThan(Duration duration);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.modulith.events;

import java.time.Instant;
import java.util.Optional;
import java.util.UUID;

import org.springframework.context.ApplicationEvent;
import org.springframework.context.PayloadApplicationEvent;

/**
* An event publication.
*
* @author Oliver Drotbohm
* @since 1.1
*/
public interface EventPublication {

/**
* Returns a unique identifier for this publication.
*
* @return will never be {@literal null}.
*/
UUID getIdentifier();

/**
* Returns the event that is published.
*
* @return
*/
Object getEvent();

/**
* Returns the event as Spring {@link ApplicationEvent}, effectively wrapping it into a
* {@link PayloadApplicationEvent} in case it's not one already.
*
* @return
*/
default ApplicationEvent getApplicationEvent() {

Object event = getEvent();

return PayloadApplicationEvent.class.isInstance(event) //
? PayloadApplicationEvent.class.cast(event)
: new PayloadApplicationEvent<>(this, event);
}

/**
* Returns the time the event is published at.
*
* @return
*/
Instant getPublicationDate();

/**
* Returns the completion date of the publication.
*
* @return will never be {@literal null}.
*/
Optional<Instant> getCompletionDate();

/**
* Returns whether the publication of the event has completed.
*
* @return will never be {@literal null}.
*/
default boolean isPublicationCompleted() {
return getCompletionDate().isPresent();
}

/*
* (non-Javadoc)
* @see java.lang.Comparable#compareTo(java.lang.Object)
*/
default int compareTo(EventPublication that) {
return this.getPublicationDate().compareTo(that.getPublicationDate());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.modulith.events;

import java.util.function.Predicate;

/**
* @author Oliver Drotbohm
* @since 1.1
*/
public interface UncompletedEventPublications {

void resubmitCompletedPublications(Predicate<EventPublication> filter);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import java.time.Instant;

/**
* Internal interface to be able to mark {@link EventPublication} instances as completed.
* Internal interface to be able to mark {@link TargetEventPublication} instances as completed.
*
* @author Oliver Drotbohm
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
*
* @author Oliver Drotbohm
*/
class DefaultEventPublication implements EventPublication {
class DefaultEventPublication implements TargetEventPublication {

private final UUID identifier;
private final Object event;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Stream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationListener;
import org.springframework.modulith.events.CompletedEventPublications;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
Expand All @@ -37,7 +39,8 @@
* @author Björn Kieling
* @author Dmitry Belyaev
*/
public class DefaultEventPublicationRegistry implements DisposableBean, EventPublicationRegistry {
public class DefaultEventPublicationRegistry
implements DisposableBean, EventPublicationRegistry, CompletedEventPublications {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultEventPublicationRegistry.class);
private static final String REGISTER = "Registering publication of {} for {}.";
Expand Down Expand Up @@ -65,9 +68,9 @@ public DefaultEventPublicationRegistry(EventPublicationRepository events, Clock
* @see org.springframework.modulith.events.EventPublicationRegistry#store(java.lang.Object, java.util.stream.Stream)
*/
@Override
public Collection<EventPublication> store(Object event, Stream<PublicationTargetIdentifier> listeners) {
public Collection<TargetEventPublication> store(Object event, Stream<PublicationTargetIdentifier> listeners) {

return listeners.map(it -> EventPublication.of(event, it, clock.instant()))
return listeners.map(it -> TargetEventPublication.of(event, it, clock.instant()))
.peek(it -> LOGGER.debug(REGISTER, it.getEvent().getClass().getName(), it.getTargetIdentifier().getValue()))
.map(events::create)
.toList();
Expand All @@ -78,7 +81,7 @@ public Collection<EventPublication> store(Object event, Stream<PublicationTarget
* @see org.springframework.modulith.events.EventPublicationRegistry#findIncompletePublications()
*/
@Override
public Collection<EventPublication> findIncompletePublications() {
public Collection<TargetEventPublication> findIncompletePublications() {
return events.findIncompletePublications();
}

Expand Down Expand Up @@ -111,14 +114,52 @@ public void deleteCompletedPublicationsOlderThan(Duration duration) {
events.deleteCompletedPublicationsBefore(clock.instant().minus(duration));
};

/*
* (non-Javadoc)
* @see org.springframework.modulith.events.CompletedEventPublications#findAll()
*/
@Override
public Collection<? extends TargetEventPublication> findAll() {
return findIncompletePublications();
}

/*
* (non-Javadoc)
* @see org.springframework.modulith.events.CompletedEventPublications#deletePublications(java.util.function.Predicate)
*/
@Override
public void deletePublications(Predicate<org.springframework.modulith.events.EventPublication> filter) {

var identifiers = findIncompletePublications().stream()
.filter(filter)
.map(TargetEventPublication::getIdentifier)
.toList();

events.deletePublications(identifiers);
}

/*
* (non-Javadoc)
* @see org.springframework.modulith.events.CompletedEventPublications#deletePublicationsOlderThan(java.time.Duration)
*/
@Override
public void deletePublicationsOlderThan(Duration duration) {

var now = clock.instant();

deletePublications(event -> event.getCompletionDate()
.filter(date -> date.isBefore(now.minus(duration)))
.isPresent());
}

/*
* (non-Javadoc)
* @see org.springframework.beans.factory.DisposableBean#destroy()
*/
@Override
public void destroy() {

List<EventPublication> publications = events.findIncompletePublications();
List<TargetEventPublication> publications = events.findIncompletePublications();

if (publications.isEmpty()) {

Expand All @@ -131,7 +172,7 @@ public void destroy() {
for (int i = 0; i < publications.size(); i++) {

String prefix = i + 1 == publications.size() ? "└─" : "├─";
EventPublication it = publications.get(i);
TargetEventPublication it = publications.get(i);

LOGGER.info("{} {} - {}", prefix, it.getEvent().getClass().getName(), it.getTargetIdentifier().getValue());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,19 @@
public interface EventPublicationRegistry {

/**
* Stores {@link EventPublication}s for the given event and {@link ApplicationListener}s.
* Stores {@link TargetEventPublication}s for the given event and {@link ApplicationListener}s.
*
* @param event must not be {@literal null}.
* @param listeners must not be {@literal null}.
*/
Collection<EventPublication> store(Object event, Stream<PublicationTargetIdentifier> listeners);
Collection<TargetEventPublication> store(Object event, Stream<PublicationTargetIdentifier> listeners);

/**
* Returns all {@link EventPublication}s that have not been completed yet.
* Returns all {@link TargetEventPublication}s that have not been completed yet.
*
* @return will never be {@literal null}.
*/
Collection<EventPublication> findIncompletePublications();
Collection<TargetEventPublication> findIncompletePublications();

/**
* Marks the publication for the given event and {@link PublicationTargetIdentifier} as completed.
Expand All @@ -55,7 +55,7 @@ public interface EventPublicationRegistry {
void markCompleted(Object event, PublicationTargetIdentifier targetIdentifier);

/**
* Deletes all completed {@link EventPublication}s that have been completed before the given {@link Duration}.
* Deletes all completed {@link TargetEventPublication}s that have been completed before the given {@link Duration}.
*
* @param duration must not be {@literal null}.
*/
Expand Down
Loading

0 comments on commit bb200ec

Please sign in to comment.