From ea7b2995255ef1ce68708f83d225a9f7b769db13 Mon Sep 17 00:00:00 2001 From: Oliver Drotbohm Date: Fri, 2 Aug 2024 18:17:39 +0200 Subject: [PATCH] GH-751 - Optimize publication completion by event and target identifier. We now additionally guard the completion query by event and target identifier to also only apply to publications that have not been completed yet. This will allow databases to optimize the query plan to apply simple comparisons (the date being null) over complex comparisons (the event payload) to reduce the intermediate results to process further and thus improve performance. --- .../jdbc/JdbcEventPublicationRepository.java | 1 + ...EventPublicationRepositoryPerformance.java | 122 ++++++++++++++++++ .../jpa/JpaEventPublicationRepository.java | 1 + .../Neo4jEventPublicationRepository.java | 1 + 4 files changed, 125 insertions(+) create mode 100644 spring-modulith-events/spring-modulith-events-jdbc/src/test/java/org/springframework/modulith/events/jdbc/JdbcEventPublicationRepositoryPerformance.java diff --git a/spring-modulith-events/spring-modulith-events-jdbc/src/main/java/org/springframework/modulith/events/jdbc/JdbcEventPublicationRepository.java b/spring-modulith-events/spring-modulith-events-jdbc/src/main/java/org/springframework/modulith/events/jdbc/JdbcEventPublicationRepository.java index c47250f61..53d1f5f5c 100644 --- a/spring-modulith-events/spring-modulith-events-jdbc/src/main/java/org/springframework/modulith/events/jdbc/JdbcEventPublicationRepository.java +++ b/spring-modulith-events/spring-modulith-events-jdbc/src/main/java/org/springframework/modulith/events/jdbc/JdbcEventPublicationRepository.java @@ -85,6 +85,7 @@ INSERT INTO EVENT_PUBLICATION (ID, EVENT_TYPE, LISTENER_ID, PUBLICATION_DATE, SE SET COMPLETION_DATE = ? WHERE LISTENER_ID = ? + AND COMPLETION_DATE IS NULL AND SERIALIZED_EVENT = ? """; diff --git a/spring-modulith-events/spring-modulith-events-jdbc/src/test/java/org/springframework/modulith/events/jdbc/JdbcEventPublicationRepositoryPerformance.java b/spring-modulith-events/spring-modulith-events-jdbc/src/test/java/org/springframework/modulith/events/jdbc/JdbcEventPublicationRepositoryPerformance.java new file mode 100644 index 000000000..7e294913c --- /dev/null +++ b/spring-modulith-events/spring-modulith-events-jdbc/src/test/java/org/springframework/modulith/events/jdbc/JdbcEventPublicationRepositoryPerformance.java @@ -0,0 +1,122 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.modulith.events.jdbc; + +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; + +import java.time.Instant; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.jdbc.JdbcTest; +import org.springframework.jdbc.core.JdbcOperations; +import org.springframework.modulith.events.core.EventPublicationRepository; +import org.springframework.modulith.events.core.EventSerializer; +import org.springframework.modulith.events.core.PublicationTargetIdentifier; +import org.springframework.modulith.events.core.TargetEventPublication; +import org.springframework.modulith.testapp.TestApplication; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.bean.override.mockito.MockitoBean; +import org.springframework.util.StopWatch; +import org.testcontainers.junit.jupiter.Testcontainers; + +/** + * Not an actual test but can be executed in case of suspected regressions. + * + * @author Oliver Drotbohm + */ +@JdbcTest(properties = "spring.modulith.events.jdbc.schema-initialization.enabled=true") +@Testcontainers(disabledWithoutDocker = true) +@ContextConfiguration(classes = { TestApplication.class, JdbcEventPublicationAutoConfiguration.class }) +@ActiveProfiles("postgres") +class JdbcEventPublicationRepositoryPerformance { + + @Autowired JdbcOperations operations; + @Autowired EventPublicationRepository repository; + @MockitoBean EventSerializer serializer; + + TargetEventPublication toComplete; + Instant now = Instant.now(); + List publications; + + int number = 10000; + int fractionCompleted = 95; + String serializedEvent = "{\"eventId\":\"id\"}"; + + @BeforeEach + void setUp() { + + operations.update("DELETE FROM event_publication"); + + when(serializer.serialize(any())).thenReturn(serializedEvent); + + publications = IntStream.range(0, number) + .mapToObj(it -> TargetEventPublication.of(new SampleEvent(), + PublicationTargetIdentifier.of(UUID.randomUUID().toString()))) + .map(repository::create) + .toList(); + + new Random() + .ints(number / 100 * fractionCompleted, 0, number) + .mapToObj(publications::get) + .forEach(it -> repository.markCompleted(it, Instant.now())); + + do { + + var candidate = publications.get(new Random().nextInt(number)); + + if (!candidate.isPublicationCompleted()) { + toComplete = candidate; + } + + } while (toComplete == null); + } + + @Test + void marksPublicationAsCompletedById() { + + runWithMeasurement("By id", () -> repository.markCompleted(toComplete.getIdentifier(), now)); + } + + @Test + void marksPublicationAsCompleted() { + + runWithMeasurement("By event and target identifier", + () -> repository.markCompleted(toComplete.getEvent(), toComplete.getTargetIdentifier(), now)); + } + + void runWithMeasurement(String prefix, Runnable runnable) { + + var watch = new StopWatch(); + watch.start(); + + runnable.run(); + + watch.stop(); + + System.out.println(prefix + " took: " + watch.lastTaskInfo().getTime(TimeUnit.MILLISECONDS) + "ms"); + } + + static class SampleEvent {}; +} diff --git a/spring-modulith-events/spring-modulith-events-jpa/src/main/java/org/springframework/modulith/events/jpa/JpaEventPublicationRepository.java b/spring-modulith-events/spring-modulith-events-jpa/src/main/java/org/springframework/modulith/events/jpa/JpaEventPublicationRepository.java index d773d1607..f8fabc66a 100644 --- a/spring-modulith-events/spring-modulith-events-jpa/src/main/java/org/springframework/modulith/events/jpa/JpaEventPublicationRepository.java +++ b/spring-modulith-events/spring-modulith-events-jpa/src/main/java/org/springframework/modulith/events/jpa/JpaEventPublicationRepository.java @@ -83,6 +83,7 @@ class JpaEventPublicationRepository implements EventPublicationRepository { set p.completionDate = ?3 where p.serializedEvent = ?1 and p.listenerId = ?2 + and p.completionDate is null """; private static final String DELETE = """ diff --git a/spring-modulith-events/spring-modulith-events-neo4j/src/main/java/org/springframework/modulith/events/neo4j/Neo4jEventPublicationRepository.java b/spring-modulith-events/spring-modulith-events-neo4j/src/main/java/org/springframework/modulith/events/neo4j/Neo4jEventPublicationRepository.java index 3c25763bf..020602b57 100644 --- a/spring-modulith-events/spring-modulith-events-neo4j/src/main/java/org/springframework/modulith/events/neo4j/Neo4jEventPublicationRepository.java +++ b/spring-modulith-events/spring-modulith-events-neo4j/src/main/java/org/springframework/modulith/events/neo4j/Neo4jEventPublicationRepository.java @@ -107,6 +107,7 @@ class Neo4jEventPublicationRepository implements EventPublicationRepository { private static final Statement COMPLETE_STATEMENT = Cypher.match(EVENT_PUBLICATION_NODE) .where(EVENT_PUBLICATION_NODE.property(EVENT_HASH).eq(Cypher.parameter(EVENT_HASH))) .and(EVENT_PUBLICATION_NODE.property(LISTENER_ID).eq(Cypher.parameter(LISTENER_ID))) + .and(EVENT_PUBLICATION_NODE.property(COMPLETION_DATE).isNull()) .set(EVENT_PUBLICATION_NODE.property(COMPLETION_DATE).to(Cypher.parameter(COMPLETION_DATE))) .build();