Skip to content

Commit

Permalink
GH-751 - Optimize publication completion by event and target identifier.
Browse files Browse the repository at this point in the history
We now additionally guard the completion query by event and target identifier to also only apply to publications that have not been completed yet. This will allow databases to optimize the query plan to apply simple comparisons (the date being null) over complex comparisons (the event payload) to reduce the intermediate results to process further and thus improve performance.
  • Loading branch information
odrotbohm authored and ciberkleid committed Aug 6, 2024
1 parent 290cad8 commit ea7b299
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ INSERT INTO EVENT_PUBLICATION (ID, EVENT_TYPE, LISTENER_ID, PUBLICATION_DATE, SE
SET COMPLETION_DATE = ?
WHERE
LISTENER_ID = ?
AND COMPLETION_DATE IS NULL
AND SERIALIZED_EVENT = ?
""";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.modulith.events.jdbc;

import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;

import java.time.Instant;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.jdbc.JdbcTest;
import org.springframework.jdbc.core.JdbcOperations;
import org.springframework.modulith.events.core.EventPublicationRepository;
import org.springframework.modulith.events.core.EventSerializer;
import org.springframework.modulith.events.core.PublicationTargetIdentifier;
import org.springframework.modulith.events.core.TargetEventPublication;
import org.springframework.modulith.testapp.TestApplication;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.bean.override.mockito.MockitoBean;
import org.springframework.util.StopWatch;
import org.testcontainers.junit.jupiter.Testcontainers;

/**
* Not an actual test but can be executed in case of suspected regressions.
*
* @author Oliver Drotbohm
*/
@JdbcTest(properties = "spring.modulith.events.jdbc.schema-initialization.enabled=true")
@Testcontainers(disabledWithoutDocker = true)
@ContextConfiguration(classes = { TestApplication.class, JdbcEventPublicationAutoConfiguration.class })
@ActiveProfiles("postgres")
class JdbcEventPublicationRepositoryPerformance {

@Autowired JdbcOperations operations;
@Autowired EventPublicationRepository repository;
@MockitoBean EventSerializer serializer;

TargetEventPublication toComplete;
Instant now = Instant.now();
List<TargetEventPublication> publications;

int number = 10000;
int fractionCompleted = 95;
String serializedEvent = "{\"eventId\":\"id\"}";

@BeforeEach
void setUp() {

operations.update("DELETE FROM event_publication");

when(serializer.serialize(any())).thenReturn(serializedEvent);

publications = IntStream.range(0, number)
.mapToObj(it -> TargetEventPublication.of(new SampleEvent(),
PublicationTargetIdentifier.of(UUID.randomUUID().toString())))
.map(repository::create)
.toList();

new Random()
.ints(number / 100 * fractionCompleted, 0, number)
.mapToObj(publications::get)
.forEach(it -> repository.markCompleted(it, Instant.now()));

do {

var candidate = publications.get(new Random().nextInt(number));

if (!candidate.isPublicationCompleted()) {
toComplete = candidate;
}

} while (toComplete == null);
}

@Test
void marksPublicationAsCompletedById() {

runWithMeasurement("By id", () -> repository.markCompleted(toComplete.getIdentifier(), now));
}

@Test
void marksPublicationAsCompleted() {

runWithMeasurement("By event and target identifier",
() -> repository.markCompleted(toComplete.getEvent(), toComplete.getTargetIdentifier(), now));
}

void runWithMeasurement(String prefix, Runnable runnable) {

var watch = new StopWatch();
watch.start();

runnable.run();

watch.stop();

System.out.println(prefix + " took: " + watch.lastTaskInfo().getTime(TimeUnit.MILLISECONDS) + "ms");
}

static class SampleEvent {};
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class JpaEventPublicationRepository implements EventPublicationRepository {
set p.completionDate = ?3
where p.serializedEvent = ?1
and p.listenerId = ?2
and p.completionDate is null
""";

private static final String DELETE = """
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ class Neo4jEventPublicationRepository implements EventPublicationRepository {
private static final Statement COMPLETE_STATEMENT = Cypher.match(EVENT_PUBLICATION_NODE)
.where(EVENT_PUBLICATION_NODE.property(EVENT_HASH).eq(Cypher.parameter(EVENT_HASH)))
.and(EVENT_PUBLICATION_NODE.property(LISTENER_ID).eq(Cypher.parameter(LISTENER_ID)))
.and(EVENT_PUBLICATION_NODE.property(COMPLETION_DATE).isNull())
.set(EVENT_PUBLICATION_NODE.property(COMPLETION_DATE).to(Cypher.parameter(COMPLETION_DATE)))
.build();

Expand Down

0 comments on commit ea7b299

Please sign in to comment.