Skip to content

Commit

Permalink
WIP optimisations
Browse files Browse the repository at this point in the history
  • Loading branch information
larousso committed Jul 10, 2024
1 parent b4943b1 commit cc3ba27
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 94 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package fr.maif.eventsourcing;

import fr.maif.concurrent.CompletionStages;
import io.vavr.Tuple;
import io.vavr.Tuple0;
import io.vavr.collection.List;
import io.vavr.collection.Map;
import io.vavr.concurrent.Future;
import io.vavr.control.Option;

Expand All @@ -13,6 +15,8 @@ public interface AggregateStore<S extends State<S>, Id, TxCtx> {

CompletionStage<Option<S>> getAggregate(Id entityId);

CompletionStage<Map<String, Option<S>>> getAggregates(TxCtx ctx, List<String> entityIds);

CompletionStage<Option<S>> getAggregate(TxCtx ctx, Id entityId);

default CompletionStage<Tuple0> storeSnapshot(TxCtx transactionContext, Id id, Option<S> state) {
Expand All @@ -23,6 +27,10 @@ default CompletionStage<Option<S>> getSnapshot(TxCtx transactionContext, Id id)
return CompletableFuture.completedStage(Option.none());
}

default CompletionStage<List<S>> getSnapshots(TxCtx transactionContext, List<Id> ids) {
return CompletableFuture.completedStage(List.empty());
}

default <E extends Event> CompletionStage<Option<S>> buildAggregateAndStoreSnapshot(TxCtx ctx, EventHandler<S, E> eventHandler, Option<S> state, Id id, List<E> events, Option<Long> lastSequenceNum) {

Option<S> newState = eventHandler.deriveState(state, events.filter(event -> event.entityId().equals(id)));
Expand Down
188 changes: 95 additions & 93 deletions thoth-core/src/main/java/fr/maif/eventsourcing/EventProcessorImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.vavr.Tuple3;
import io.vavr.Value;
import io.vavr.collection.List;
import io.vavr.collection.Map;
import io.vavr.collection.Seq;
import io.vavr.control.Either;
import io.vavr.control.Option;
Expand All @@ -20,12 +21,14 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

import static io.vavr.API.List;
import static io.vavr.API.None;
import static io.vavr.API.Tuple;
import static fr.maif.concurrent.CompletionStages.traverse;
import static java.util.function.Function.identity;

public class EventProcessorImpl<Error, S extends State<S>, C extends Command<Meta, Context>, E extends Event, TxCtx, Message, Meta, Context> implements EventProcessor<Error, S, C, E, TxCtx, Message, Meta, Context> {

Expand Down Expand Up @@ -68,90 +71,92 @@ public CompletionStage<List<Either<Error, ProcessingSuccess<S, E, Meta, Context,
@Override
public CompletionStage<InTransactionResult<List<Either<Error, ProcessingSuccess<S, E, Meta, Context, Message>>>>> batchProcessCommand(TxCtx ctx, List<C> commands) {
// Collect all states from db
return traverseCommands(commands, (c, events) ->
this.getCurrentState(ctx, c, events).thenCompose(mayBeState ->
//handle command with state to get events
handleCommand(ctx, mayBeState, c)
// Return command + state + (error or events)
.thenApply(r -> Tuple(c, mayBeState, r))
)
)
.thenCompose(commandsAndResults -> {
// Extract errors from command handling
List<Either<Error, ProcessingSuccess<S, E, Meta, Context, Message>>> errors = commandsAndResults
.map(Tuple3::_3)
.filter(Either::isLeft)
.map(e -> Either.left(e.swap().get()));

// Extract success and generate envelopes for each result
CompletionStage<List<CommandStateAndEvent>> success = traverse(commandsAndResults.filter(t -> t._3.isRight()), t -> {
C command = t._1;
Option<S> mayBeState = t._2;
List<E> events = t._3.get().events.toList();
return buildEnvelopes(ctx, command, events).thenApply(eventEnvelopes -> {
Option<Long> mayBeLastSeqNum = eventEnvelopes.lastOption().map(evl -> evl.sequenceNum);
return new CommandStateAndEvent(command, mayBeState, eventEnvelopes, events, t._3.get().message, mayBeLastSeqNum);
});
});

return success.thenApply(s -> Tuple(s.toList(), errors));
})
.thenCompose(successAndErrors -> {

List<Either<Error, ProcessingSuccess<S, E, Meta, Context, Message>>> errors = successAndErrors._2;
List<CommandStateAndEvent> success = successAndErrors._1;

// Get all envelopes
List<EventEnvelope<E, Meta, Context>> envelopes = success.flatMap(CommandStateAndEvent::getEventEnvelopes);

CompletionStage<Seq<ProcessingSuccess<S, E, Meta, Context, Message>>> stored = eventStore
// Persist all envelopes
.persist(ctx, envelopes)
.thenCompose(__ ->
// Persist states
traverse(success, s -> {
LOGGER.debug("Storing state {} to DB", s);
List<Long> sequences = envelopes.filter(env -> env.entityId.equals(s.command.entityId().get())).map(env -> env.sequenceNum);
return aggregateStore
.buildAggregateAndStoreSnapshot(
ctx,
eventHandler,
s.getState(),
s.getCommand().entityId().get(),
s.getEvents(),
sequences.max()
)
.thenApply(mayBeNextState ->
new ProcessingSuccess<>(s.state, mayBeNextState, s.getEventEnvelopes(), s.getMessage())
);
})
)
.thenCompose(mayBeNextState ->
// Apply events to projections
traverse(projections, p -> {
LOGGER.debug("Applying envelopes {} to projection", envelopes);
return p.storeProjection(ctx, envelopes);
})
.thenApply(__ -> mayBeNextState)
);
return stored.thenApply(results ->
errors.appendAll(results.map(Either::right))
);
})
.thenApply(results -> {
Supplier<CompletionStage<Tuple0>> postTransactionProcess = () -> {
List<EventEnvelope<E, Meta, Context>> envelopes = results.flatMap(Value::toList).flatMap(ProcessingSuccess::getEvents);
LOGGER.debug("Publishing events {} to kafka", envelopes);
return eventStore.publish(envelopes)
.thenApply(__ -> Tuple.empty())
.exceptionally(e -> Tuple.empty());
};
var inTransactionResult = new InTransactionResult<>(
results,
postTransactionProcess
);
return inTransactionResult;
});
return aggregateStore.getAggregates(ctx, commands.filter(c -> c.hasId()).map(c -> c.entityId().get()))
.thenCompose(states ->
traverseCommands(commands, (c, events) -> {
//handle command with state to get events
Option<S> mayBeState = this.getCurrentState(ctx, states, c, events);
return handleCommand(ctx, mayBeState, c)
// Return command + state + (error or events)
.thenApply(r -> Tuple(c, mayBeState, r));
})
.thenCompose(commandsAndResults -> {
// Extract errors from command handling
List<Either<Error, ProcessingSuccess<S, E, Meta, Context, Message>>> errors = commandsAndResults
.map(Tuple3::_3)
.filter(Either::isLeft)
.map(e -> Either.left(e.swap().get()));

// Extract success and generate envelopes for each result
CompletionStage<List<CommandStateAndEvent>> success = traverse(commandsAndResults.filter(t -> t._3.isRight()), t -> {
C command = t._1;
Option<S> mayBeState = t._2;
List<E> events = t._3.get().events.toList();
return buildEnvelopes(ctx, command, events).thenApply(eventEnvelopes -> {
Option<Long> mayBeLastSeqNum = eventEnvelopes.lastOption().map(evl -> evl.sequenceNum);
return new CommandStateAndEvent(command, mayBeState, eventEnvelopes, events, t._3.get().message, mayBeLastSeqNum);
});
});

return success.thenApply(s -> Tuple(s.toList(), errors));
})
.thenCompose(successAndErrors -> {

List<Either<Error, ProcessingSuccess<S, E, Meta, Context, Message>>> errors = successAndErrors._2;
List<CommandStateAndEvent> success = successAndErrors._1;

// Get all envelopes
List<EventEnvelope<E, Meta, Context>> envelopes = success.flatMap(CommandStateAndEvent::getEventEnvelopes);

CompletionStage<Seq<ProcessingSuccess<S, E, Meta, Context, Message>>> stored = eventStore
// Persist all envelopes
.persist(ctx, envelopes)
.thenCompose(__ ->
// Persist states
traverse(success, s -> {
LOGGER.debug("Storing state {} to DB", s);
List<Long> sequences = envelopes.filter(env -> env.entityId.equals(s.command.entityId().get())).map(env -> env.sequenceNum);
return aggregateStore
.buildAggregateAndStoreSnapshot(
ctx,
eventHandler,
s.getState(),
s.getCommand().entityId().get(),
s.getEvents(),
sequences.max()
)
.thenApply(mayBeNextState ->
new ProcessingSuccess<>(s.state, mayBeNextState, s.getEventEnvelopes(), s.getMessage())
);
})
)
.thenCompose(mayBeNextState ->
// Apply events to projections
traverse(projections, p -> {
LOGGER.debug("Applying envelopes {} to projection", envelopes);
return p.storeProjection(ctx, envelopes);
})
.thenApply(__ -> mayBeNextState)
);
return stored.thenApply(results ->
errors.appendAll(results.map(Either::right))
);
})
.thenApply(results -> {
Supplier<CompletionStage<Tuple0>> postTransactionProcess = () -> {
List<EventEnvelope<E, Meta, Context>> envelopes = results.flatMap(Value::toList).flatMap(ProcessingSuccess::getEvents);
LOGGER.debug("Publishing events {} to kafka", envelopes);
return eventStore.publish(envelopes)
.thenApply(__ -> Tuple.empty())
.exceptionally(e -> Tuple.empty());
};
var inTransactionResult = new InTransactionResult<>(
results,
postTransactionProcess
);
return inTransactionResult;
})
);
}

public CompletionStage<List<Tuple3<C, Option<S>, Either<Error, Events<E, Message>>>>> traverseCommands(List<C> elements, BiFunction<C, List<E>, CompletionStage<Tuple3<C, Option<S>, Either<Error, Events<E, Message>>>>> handler) {
Expand All @@ -160,9 +165,9 @@ public CompletionStage<List<Tuple3<C, Option<S>, Either<Error, Events<E, Message
(fResult, elt) ->
fResult.thenCompose(listResult -> handler.apply(elt, listResult._2.flatMap(e -> e.events))
.thenApply(r ->
Tuple(
listResult._1.append(r),
listResult._2.append(r._3.getOrElse(Events.empty())))
Tuple(
listResult._1.append(r),
listResult._2.append(r._3.getOrElse(Events.empty())))
))
).thenApply(t -> t._1);
}
Expand All @@ -179,15 +184,12 @@ private CompletionStage<Either<Error, Events<E, Message>>> handleCommand(TxCtx t
return commandHandler.handleCommand(txCtx, state, command);
}

private CompletionStage<Option<S>> getCurrentState(TxCtx ctx, C command, List<E> previousEvent) {
private Option<S> getCurrentState(TxCtx ctx, Map<String, Option<S>> states, C command, List<E> previousEvent) {
if (command.hasId()) {
String entityId = command.entityId().get();
return aggregateStore.getAggregate(ctx, entityId)
.thenApply(state ->
eventHandler.deriveState(state, previousEvent.filter(e -> e.entityId().equals(entityId)))
);
return eventHandler.deriveState(states.get(entityId).flatMap(identity()), previousEvent.filter(e -> e.entityId().equals(entityId)));
} else {
return CompletionStages.successful(None());
return None();
}
}

Expand Down
12 changes: 12 additions & 0 deletions thoth-core/src/main/java/fr/maif/eventsourcing/EventStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import fr.maif.concurrent.CompletionStages;
import io.vavr.Tuple0;
import io.vavr.Tuple2;
import io.vavr.Value;
import io.vavr.collection.List;
import io.vavr.control.Option;
Expand Down Expand Up @@ -76,6 +77,7 @@ class Query {
public final Long sequenceFrom;
public final Long sequenceTo;
public final Boolean published;
public final List<Tuple2<String, Long>> idsAndSequences;

private Query(Query.Builder builder) {
this.dateFrom = builder.dateFrom;
Expand All @@ -87,6 +89,7 @@ private Query(Query.Builder builder) {
this.published = builder.published;
this.sequenceFrom = builder.sequenceFrom;
this.sequenceTo = builder.sequenceTo;
this.idsAndSequences = Objects.requireNonNullElse(builder.idsAndSequences, List.empty());
}

public static Builder builder() {
Expand Down Expand Up @@ -125,6 +128,10 @@ public Option<Long> sequenceTo() {
return Option.of(sequenceTo);
}

public List<Tuple2<String, Long>> idsAndSequences() {
return idsAndSequences;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down Expand Up @@ -156,6 +163,7 @@ public static class Builder {
Boolean published;
Long sequenceFrom;
Long sequenceTo;
List<Tuple2<String, Long>> idsAndSequences;

public Builder withDateFrom(LocalDateTime dateFrom) {
this.dateFrom = dateFrom;
Expand Down Expand Up @@ -201,6 +209,10 @@ public Builder withSequenceTo(Long sequenceTo) {
this.sequenceTo = sequenceTo;
return this;
}
public Builder withIdsAndSequences(List<Tuple2<String, Long>> idsAndSequences) {
this.idsAndSequences = idsAndSequences;
return this;
}

public Query build() {
return new Query(this);
Expand Down
2 changes: 2 additions & 0 deletions thoth-core/src/main/java/fr/maif/eventsourcing/State.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

public interface State<S> {

String entityId();

Long sequenceNum();

S withSequenceNum(Long sequenceNum);
Expand Down
Loading

0 comments on commit cc3ba27

Please sign in to comment.