Skip to content

Commit

Permalink
Optimisations
Browse files Browse the repository at this point in the history
  • Loading branch information
larousso committed Jul 12, 2024
1 parent ed043a5 commit 9d2f716
Show file tree
Hide file tree
Showing 10 changed files with 117 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ public CompletionStage<Long> nextSequence(Tuple0 tx) {
return CompletionStages.completedStage(sequence_num.incrementAndGet());
}

@Override
public CompletionStage<List<Long>> nextSequences(Tuple0 tx, Integer count) {
return CompletionStages.completedStage(List.range(0, count).map(any -> sequence_num.incrementAndGet()));
}

@Override
public CompletionStage<Tuple0> publish(List<EventEnvelope<E, Meta, Context>> events) {
events.forEach(queue::offer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@ public CompletionStage<Long> nextSequence(InMemoryEventStore.Transaction<E, Meta
return CompletionStages.completedStage(sequenceNums.accumulateAndGet(value, Math::max));
}

@Override
public CompletionStage<List<Long>> nextSequences(InMemoryEventStore.Transaction<E, Meta, Context> tx, Integer count) {
return CompletionStages.traverse(List.range(0, count), c -> nextSequence(tx));
}

@Override
public CompletionStage<Tuple0> publish(List<EventEnvelope<E, Meta, Context>> events) {
events.forEach(e -> store.put(e.sequenceNum, e));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,11 @@ public CompletionStage<List<Tuple3<C, Option<S>, Either<Error, Events<E, Message
CompletionStage<List<EventEnvelope<E, Meta, Context>>> buildEnvelopes(TxCtx tx, C command, List<E> events) {
String transactionId = transactionManager.transactionId();
int nbMessages = events.length();
return traverse(events.zipWithIndex(),
t -> buildEnvelope(tx, command, t._1, t._2, nbMessages, transactionId)
).thenApply(Value::toList);
return eventStore.nextSequences(tx, events.size()).thenApply(s ->
events.zip(s).zipWithIndex().map(t ->
buildEnvelope(tx, command, t._1._1, t._1._2, t._2, nbMessages, transactionId)
)
);
}

private CompletionStage<Either<Error, Events<E, Message>>> handleCommand(TxCtx txCtx, Option<S> state, C command) {
Expand All @@ -193,32 +195,29 @@ private Option<S> getCurrentState(TxCtx ctx, Map<String, Option<S>> states, C co
}
}

private CompletionStage<EventEnvelope<E, Meta, Context>> buildEnvelope(TxCtx tx, Command<Meta, Context> command, E event, Integer numMessage, Integer nbMessages, String transactionId) {
private EventEnvelope<E, Meta, Context> buildEnvelope(TxCtx tx, Command<Meta, Context> command, E event, Long nextSequence, Integer numMessage, Integer nbMessages, String transactionId) {
LOGGER.debug("Writing event {} to envelope", event);


UUID id = UUIDgen.generate();

return eventStore.nextSequence(tx).thenApply(nextSequence -> {
EventEnvelope.Builder<E, Meta, Context> builder = EventEnvelope.<E, Meta, Context>builder()
.withId(id)
.withEmissionDate(LocalDateTime.now())
.withEntityId(event.entityId())
.withSequenceNum(nextSequence)
.withEventType(event.type().name())
.withVersion(event.type().version())
.withTotalMessageInTransaction(nbMessages)
.withNumMessageInTransaction(numMessage + 1)
.withTransactionId(transactionId)
.withEvent(event);

command.context().forEach(builder::withContext);
command.userId().forEach(builder::withUserId);
command.systemId().forEach(builder::withSystemId);
command.metadata().forEach(builder::withMetadata);

return builder.build();
});
EventEnvelope.Builder<E, Meta, Context> builder = EventEnvelope.<E, Meta, Context>builder()
.withId(id)
.withEmissionDate(LocalDateTime.now())
.withEntityId(event.entityId())
.withSequenceNum(nextSequence)
.withEventType(event.type().name())
.withVersion(event.type().version())
.withTotalMessageInTransaction(nbMessages)
.withNumMessageInTransaction(numMessage + 1)
.withTransactionId(transactionId)
.withEvent(event);

command.context().forEach(builder::withContext);
command.userId().forEach(builder::withUserId);
command.systemId().forEach(builder::withSystemId);
command.metadata().forEach(builder::withMetadata);

return builder.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ default Publisher<EventEnvelope<E, Meta, Context>> loadAllEvents() {

CompletionStage<Long> nextSequence(TxCtx tx);

CompletionStage<List<Long>> nextSequences(TxCtx tx, Integer count);

CompletionStage<Tuple0> publish(List<EventEnvelope<E, Meta, Context>> events);

CompletionStage<EventEnvelope<E, Meta, Context>> markAsPublished(TxCtx tx, EventEnvelope<E, Meta, Context> eventEnvelope);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,13 @@ public CompletionStage<Long> nextSequence(PgAsyncTransaction tx) {
).thenApply(mayBeResult -> mayBeResult.map(r -> r.get(0, Long.class)).getOrNull());
}

@Override
public CompletionStage<List<Long>> nextSequences(PgAsyncTransaction tx, Integer count) {
return tx.query(dsl ->
dsl.resultQuery("select nextval('" + this.tableNames.sequenceNumName + "') from generate_series(1, {0})", count)
).thenApply(mayBeResult -> mayBeResult.map(r -> r.get(0, Long.class)));
}

@Override
public CompletionStage<Tuple0> publish(List<EventEnvelope<E, Meta, Context>> events) {
LOGGER.debug("Publishing event {}", events);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
package fr.maif.eventsourcing;

import io.vavr.Tuple;
import io.vavr.Tuple2;
import io.vavr.collection.HashMap;
import io.vavr.collection.List;
import io.vavr.collection.Map;
import io.vavr.collection.Traversable;
import io.vavr.control.Option;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import static java.util.function.Function.identity;

public class DefaultReactorAggregateStore<S extends State<S>, E extends Event, Meta, Context, TxCtx> implements ReactorAggregateStore<S, String, TxCtx> {


Expand All @@ -21,6 +31,30 @@ public Mono<Option<S>> getAggregate(String entityId) {
return transactionManager.withTransaction(ctx -> getAggregate(ctx, entityId));
}

@Override
public Mono<Map<String, Option<S>>> getAggregates(TxCtx txCtx, List<String> entityIds) {
return this.getSnapshots(txCtx, entityIds)
.flatMap(snapshots -> {
Map<String, S> indexed = snapshots.groupBy(State::entityId).mapValues(Traversable::head);
List<Tuple2<String, Long>> idsAndSeqNums = entityIds.map(id -> Tuple.of(id, indexed.get(id).map(s -> s.sequenceNum()).getOrElse(0L)));
Map<String, Option<S>> empty = HashMap.ofEntries(entityIds.map(id -> Tuple.of(id, indexed.get(id))));
EventStore.Query query = EventStore.Query.builder().withIdsAndSequences(idsAndSeqNums).build();
Flux<EventEnvelope<E, Meta, Context>> events = this.eventStore.loadEventsByQuery(txCtx, query);
return events.reduce(
empty,
(Map<String, Option<S>> states, EventEnvelope<E, Meta, Context> event) -> {
Option<S> mayBeCurrentState = states.get(event.entityId).flatMap(identity());
return states.put(
event.entityId,
this.eventEventHandler
.applyEvent(mayBeCurrentState, event.event)
.map((S state) -> (S) state.withSequenceNum(event.sequenceNum))
);
}
);
});
}

@Override
public Mono<Option<S>> getAggregate(TxCtx txCtx, String entityId) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,13 @@ public CompletionStage<Long> nextSequence(Tx tx) {
).thenApply(mayBeResult -> mayBeResult.map(r -> r.get(0, Long.class)).getOrNull());
}

@Override
public CompletionStage<List<Long>> nextSequences(Tx tx, Integer count) {
return tx.query(dsl ->
dsl.resultQuery("select nextval('" + this.tableNames.sequenceNumName + "') from generate_series(1, {0})", count)
).thenApply(mayBeResult -> mayBeResult.map(r -> r.get(0, Long.class)));
}

@Override
public CompletionStage<Tuple0> publish(List<EventEnvelope<E, Meta, Context>> events) {
LOGGER.debug("Publishing event {}", events);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package fr.maif.eventsourcing;

import fr.maif.concurrent.CompletionStages;
import io.vavr.Tuple;
import io.vavr.Tuple0;
import io.vavr.collection.List;
import io.vavr.collection.Map;
import io.vavr.control.Option;
import reactor.core.publisher.Mono;

Expand All @@ -14,6 +16,8 @@ public interface ReactorAggregateStore<S extends State<S>, Id, TxCtx> {

Mono<Option<S>> getAggregate(TxCtx ctx, Id entityId);

Mono<Map<Id, Option<S>>> getAggregates(TxCtx ctx, List<Id> entityIds);

default Mono<Tuple0> storeSnapshot(TxCtx transactionContext, Id id, Option<S> state) {
return Mono.just(Tuple.empty());
}
Expand All @@ -22,6 +26,10 @@ default Mono<Option<S>> getSnapshot(TxCtx transactionContext, Id id) {
return Mono.just(Option.none());
}

default Mono<List<S>> getSnapshots(TxCtx transactionContext, List<Id> ids) {
return Mono.just(List.empty());
}

default <E extends Event> Mono<Option<S>> buildAggregateAndStoreSnapshot(TxCtx ctx, EventHandler<S, E> eventHandler, Option<S> state, Id id, List<E> events, Option<Long> lastSequenceNum) {

Option<S> newState = eventHandler.deriveState(state, events.filter(event -> event.entityId().equals(id)));
Expand All @@ -45,6 +53,11 @@ public CompletionStage<Option<S>> getAggregate(Id entityId) {
public CompletionStage<Option<S>> getAggregate(TxCtx txCtx, Id entityId) {
return _this.getAggregate(txCtx, entityId).toFuture();
}

@Override
public CompletionStage<Map<Id, Option<S>>> getAggregates(TxCtx txCtx, List<Id> entityIds) {
return _this.getAggregates(txCtx, entityIds).toFuture();
}
};
}

Expand All @@ -61,6 +74,11 @@ public Mono<Option<S>> getAggregate(Id entityId) {
public Mono<Option<S>> getAggregate(TxCtx txCtx, Id entityId) {
return Mono.fromCompletionStage(() -> aggregateStore.getAggregate(txCtx, entityId));
}

@Override
public Mono<Map<Id, Option<S>>> getAggregates(TxCtx txCtx, List<Id> entityIds) {
return Mono.fromCompletionStage(() -> aggregateStore.getAggregates(txCtx, entityIds));
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,12 @@ public void nextSequence() {

}

@Test
public void nextSequences() {
List<Long> seq = inTransaction(ctx -> postgresEventStore.nextSequences(ctx, 5)).toCompletableFuture().join();
assertThat(seq).hasSize(5);
}

protected <T> CompletionStage<T> inTransaction(Function<PgAsyncTransaction, CompletionStage<T>> action) {
return pgAsyncPool.inTransaction(action);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import javax.sql.DataSource;
import java.io.Closeable;
import java.io.IOException;
import java.math.BigInteger;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
Expand Down Expand Up @@ -218,6 +219,14 @@ public CompletionStage<Long> nextSequence(Connection tx) {
), executor);
}

@Override
public CompletionStage<List<Long>> nextSequences(Connection tx, Integer count) {
return CompletionStages.fromTry(() -> Try.of(() -> {
DSLContext ctx = using(tx);
return List.ofAll(ctx.fetchValues(sequence(name(this.tableNames.sequenceNumName)).nextvals(count))).map(BigInteger::longValue);
}), executor);
}

@Override
public CompletionStage<Tuple0> publish(List<EventEnvelope<E, Meta, Context>> events) {
return this.eventPublisher.publish(events);
Expand Down

0 comments on commit 9d2f716

Please sign in to comment.