Skip to content

Commit

Permalink
GH-748 - Allow publication completion to delete database entries.
Browse files Browse the repository at this point in the history
We now expose a spring.modulith.events.completion-mode property, defaulting the previous behavior to a value of UPDATE. The property can also be configured to DELETE, which will cause the persistence implementations to flip to removing the database entries for event publications instead of setting the completion date.
  • Loading branch information
odrotbohm committed Aug 27, 2024
1 parent 65178e4 commit cd0b5cf
Show file tree
Hide file tree
Showing 17 changed files with 572 additions and 129 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.modulith.events.support;

import org.springframework.core.env.Environment;
import org.springframework.util.Assert;

/**
* Different modes of event completion.
*
* @author Oliver Drotbohm
* @since 1.3
* @soundtrack Lettuce - Waffles (Unify)
*/
public enum CompletionMode {

/**
* Completes an {@link org.springframework.modulith.events.EventPublication} by setting its completion date and
* updating the database entry accordingly.
*/
UPDATE,

/**
* Completes an {@link org.springframework.modulith.events.EventPublication} by removing the database entry.
*/
DELETE;

public static final String PROPERTY = "spring.modulith.events.completion-mode";

/**
* Looks up the {@link CompletionMode} from the given environment or uses {@link #UPDATE} as default.
*
* @param environment must not be {@literal null}.
* @return will never be {@literal null}.
*/
public static CompletionMode from(Environment environment) {

Assert.notNull(environment, "Environment must not be null!");

var result = environment.getProperty(PROPERTY, CompletionMode.class);

return result == null ? CompletionMode.UPDATE : result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@
"type": "java.lang.Boolean",
"description": "Whether to enable event externalization.",
"defaultValue": "true"
},
{
"name": "spring.modulith.events.completion-mode",
"type": "org.springframework.modulith.events.support.CompletionMode",
"description": "How to complete event publications.",
"defaultValue": "update"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.core.io.ResourceLoader;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.support.JdbcUtils;
import org.springframework.modulith.events.config.EventPublicationAutoConfiguration;
import org.springframework.modulith.events.config.EventPublicationConfigurationExtension;
import org.springframework.modulith.events.core.EventSerializer;
import org.springframework.modulith.events.support.CompletionMode;

/**
* @author Dmitry Belyaev
Expand All @@ -48,10 +50,16 @@ DatabaseType databaseType(DataSource dataSource) {
}

@Bean
JdbcEventPublicationRepository jdbcEventPublicationRepository(JdbcTemplate jdbcTemplate,
EventSerializer serializer, DatabaseType databaseType, JdbcConfigurationProperties properties) {
JdbcRepositorySettings jdbcEventPublicationRepositorySettings(DatabaseType databaseType,
JdbcConfigurationProperties properties, Environment environment) {

return new JdbcRepositorySettings(databaseType, CompletionMode.from(environment), properties.getSchema());
}

return new JdbcEventPublicationRepository(jdbcTemplate, serializer, databaseType, properties);
@Bean
JdbcEventPublicationRepository jdbcEventPublicationRepository(JdbcTemplate jdbcTemplate,
EventSerializer serializer, JdbcRepositorySettings settings) {
return new JdbcEventPublicationRepository(jdbcTemplate, serializer, settings);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,20 @@ class JdbcEventPublicationRepository implements EventPublicationRepository, Bean
ID IN
""";

private static final String SQL_STATEMENT_DELETE_BY_EVENT_AND_LISTENER_ID = """
DELETE FROM %s
WHERE
LISTENER_ID = ?
AND SERIALIZED_EVENT = ?
""";

private static final String SQL_STATEMENT_DELETE_BY_ID = """
DELETE
FROM %s
WHERE
ID = ?
""";

private static final String SQL_STATEMENT_DELETE_UNCOMPLETED = """
DELETE
FROM %s
Expand All @@ -134,7 +148,8 @@ class JdbcEventPublicationRepository implements EventPublicationRepository, Bean

private final JdbcOperations operations;
private final EventSerializer serializer;
private final DatabaseType databaseType;
private final JdbcRepositorySettings settings;

private ClassLoader classLoader;

private final String sqlStatementInsert,
Expand All @@ -145,6 +160,8 @@ class JdbcEventPublicationRepository implements EventPublicationRepository, Bean
sqlStatementUpdateById,
sqlStatementFindByEventAndListenerId,
sqlStatementDelete,
sqlStatementDeleteByEventAndListenerId,
sqlStatementDeleteById,
sqlStatementDeleteUncompleted,
sqlStatementDeleteUncompletedBefore;

Expand All @@ -154,22 +171,20 @@ class JdbcEventPublicationRepository implements EventPublicationRepository, Bean
*
* @param operations must not be {@literal null}.
* @param serializer must not be {@literal null}.
* @param databaseType must not be {@literal null}.
* @param properties must not be {@literal null}.
* @param settings must not be {@literal null}.
*/
public JdbcEventPublicationRepository(JdbcOperations operations, EventSerializer serializer,
DatabaseType databaseType, JdbcConfigurationProperties properties) {
JdbcRepositorySettings settings) {

Assert.notNull(operations, "JdbcOperations must not be null!");
Assert.notNull(serializer, "EventSerializer must not be null!");
Assert.notNull(databaseType, "DatabaseType must not be null!");
Assert.notNull(properties, "JdbcConfigurationProperties must not be null!");
Assert.notNull(settings, "DatabaseType must not be null!");

this.operations = operations;
this.serializer = serializer;
this.databaseType = databaseType;
this.settings = settings;

var schema = properties.getSchema();
var schema = settings.getSchema();
var table = ObjectUtils.isEmpty(schema) ? "EVENT_PUBLICATION" : schema + ".EVENT_PUBLICATION";

this.sqlStatementInsert = SQL_STATEMENT_INSERT.formatted(table);
Expand All @@ -180,6 +195,8 @@ public JdbcEventPublicationRepository(JdbcOperations operations, EventSerializer
this.sqlStatementUpdateById = SQL_STATEMENT_UPDATE_BY_ID.formatted(table);
this.sqlStatementFindByEventAndListenerId = SQL_STATEMENT_FIND_BY_EVENT_AND_LISTENER_ID.formatted(table);
this.sqlStatementDelete = SQL_STATEMENT_DELETE.formatted(table);
this.sqlStatementDeleteByEventAndListenerId = SQL_STATEMENT_DELETE_BY_EVENT_AND_LISTENER_ID.formatted(table);
this.sqlStatementDeleteById = SQL_STATEMENT_DELETE_BY_ID.formatted(table);
this.sqlStatementDeleteUncompleted = SQL_STATEMENT_DELETE_UNCOMPLETED.formatted(table);
this.sqlStatementDeleteUncompletedBefore = SQL_STATEMENT_DELETE_UNCOMPLETED_BEFORE.formatted(table);
}
Expand Down Expand Up @@ -222,10 +239,20 @@ public TargetEventPublication create(TargetEventPublication publication) {
@Transactional
public void markCompleted(Object event, PublicationTargetIdentifier identifier, Instant completionDate) {

operations.update(sqlStatementUpdateByEventAndListenerId, //
Timestamp.from(completionDate), //
identifier.getValue(), //
serializer.serialize(event));
var targetIdentifier = identifier.getValue();
var serializedEvent = serializer.serialize(event);

if (settings.isDeleteCompletion()) {

operations.update(sqlStatementDeleteByEventAndListenerId, targetIdentifier, serializedEvent);

} else {

operations.update(sqlStatementUpdateByEventAndListenerId, //
Timestamp.from(completionDate), //
targetIdentifier, //
serializedEvent);
}
}

/*
Expand All @@ -235,7 +262,12 @@ public void markCompleted(Object event, PublicationTargetIdentifier identifier,
@Override
@Transactional
public void markCompleted(UUID identifier, Instant completionDate) {
operations.update(sqlStatementUpdateById, Timestamp.from(completionDate), uuidToDatabase(identifier));

if (settings.isDeleteCompletion()) {
operations.update(sqlStatementDeleteById, uuidToDatabase(identifier));
} else {
operations.update(sqlStatementUpdateById, Timestamp.from(completionDate), uuidToDatabase(identifier));
}
}

/*
Expand Down Expand Up @@ -294,7 +326,7 @@ public List<TargetEventPublication> findIncompletePublicationsPublishedBefore(In
@Override
public void deletePublications(List<UUID> identifiers) {

var dbIdentifiers = identifiers.stream().map(databaseType::uuidToDatabase).toList();
var dbIdentifiers = identifiers.stream().map(this::uuidToDatabase).toList();

batch(dbIdentifiers, DELETE_BATCH_SIZE)
.forEach(it -> operations.update(sqlStatementDelete.concat(toParameterPlaceholders(it.length)), it));
Expand Down Expand Up @@ -376,11 +408,11 @@ private TargetEventPublication resultSetToPublication(ResultSet rs) throws SQLEx
}

private Object uuidToDatabase(UUID id) {
return databaseType.uuidToDatabase(id);
return settings.getDatabaseType().uuidToDatabase(id);
}

private UUID getUuidFromResultSet(ResultSet rs) throws SQLException {
return databaseType.databaseToUUID(rs.getObject("ID"));
return settings.getDatabaseType().databaseToUUID(rs.getObject("ID"));
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.modulith.events.jdbc;

import org.springframework.lang.Nullable;
import org.springframework.modulith.events.support.CompletionMode;
import org.springframework.util.Assert;

/**
* Internal abstraction of customization options for {@link JdbcEventPublicationRepository}.
*
* @author Oliver Drotbohm
* @since 1.3
* @soundtrack Jeff Coffin - Bom Bom (Only the Horizon)
*/
public class JdbcRepositorySettings {

private final DatabaseType databaseType;
private final String schema;
private final CompletionMode completionMode;

/**
* Creates a new {@link JdbcRepositorySettings} for the given {@link DatabaseType}, {@link CompletionMode} and schema
*
* @param databaseType must not be {@literal null}.
* @param schema can be {@literal null}
* @param completionMode must not be {@literal null}.
*/
JdbcRepositorySettings(DatabaseType databaseType, CompletionMode completionMode, @Nullable String schema) {

Assert.notNull(databaseType, "Database type must not be null!");
Assert.notNull(completionMode, "Completion mode must not be null!");

this.databaseType = databaseType;
this.schema = schema;
this.completionMode = completionMode;
}

/**
* Returns the {@link DatabaseType}.
*
* @return will never be {@literal null}.
*/
public DatabaseType getDatabaseType() {
return databaseType;
}

/**
* Return the schema to be used.
*
* @return can be {@literal null}.
*/
@Nullable
public String getSchema() {
return schema;
}

/**
* Returns whether we use the deleting completion mode.
*/
public boolean isDeleteCompletion() {
return completionMode == CompletionMode.DELETE;
}
}
Loading

0 comments on commit cd0b5cf

Please sign in to comment.