Skip to content

Commit

Permalink
WIP optimisations
Browse files Browse the repository at this point in the history
  • Loading branch information
larousso committed Jul 10, 2024
1 parent 52b8cf1 commit 2414065
Show file tree
Hide file tree
Showing 13 changed files with 62 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ public Long sequenceNum() {
return sequenceNum;
}

@Override
public String entityId() {
return id;
}

@Override
public Account withSequenceNum(Long sequenceNum) {
this.sequenceNum = sequenceNum;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ public String getId() {
return id;
}

@Override
public String entityId() {
return id;
}

public BigDecimal getBalance() {
return balance;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ public Long sequenceNum() {
return sequenceNum;
}

@Override
public String entityId() {
return id;
}

@Override
public Account withSequenceNum(Long sequenceNum) {
this.sequenceNum = sequenceNum;
Expand Down
5 changes: 5 additions & 0 deletions sample/src/main/java/fr/maif/thoth/sample/state/Account.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ public class Account extends AbstractState<Account> {
public BigDecimal balance;
public long sequenceNum;

@Override
public String entityId() {
return id;
}

public static class AccountBuilder{
String id;
BigDecimal balance;
Expand Down
5 changes: 5 additions & 0 deletions thoth-core-akka/src/test/java/fr/maif/Helpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,11 @@ public Viking(String id, String name, Long sequenceNum) {
this.sequenceNum = sequenceNum;
}

@Override
public String entityId() {
return id;
}

@Override
public Long sequenceNum() {
return sequenceNum;
Expand Down
5 changes: 5 additions & 0 deletions thoth-core-reactor/src/test/java/fr/maif/Helpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,11 @@ public Viking(String id, String name, Integer age, Long sequenceNum) {
this.sequenceNum = sequenceNum;
}

@Override
public String entityId() {
return id;
}

@Override
public Long sequenceNum() {
return sequenceNum;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package fr.maif.eventsourcing;

public class AbstractState<T> implements State<T> {
public abstract class AbstractState<T> implements State<T> {
protected long sequenceNum;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import fr.maif.concurrent.CompletionStages;
import io.vavr.Tuple;
import io.vavr.Tuple0;
import io.vavr.collection.HashMap;
import io.vavr.collection.List;
import io.vavr.collection.Map;
import io.vavr.control.Option;
Expand All @@ -13,7 +14,10 @@ public interface AggregateStore<S extends State<S>, Id, TxCtx> {

CompletionStage<Option<S>> getAggregate(Id entityId);

CompletionStage<Map<String, Option<S>>> getAggregates(TxCtx ctx, List<String> entityIds);
default CompletionStage<Map<Id, Option<S>>> getAggregates(TxCtx ctx, List<Id> entityIds) {
return CompletionStages.traverse(entityIds, id -> getAggregate(ctx, id).thenApply(agg -> Tuple.of(id, agg)))
.thenApply(HashMap::ofEntries);
}

CompletionStage<Option<S>> getAggregate(TxCtx ctx, Id entityId);

Expand Down
5 changes: 5 additions & 0 deletions thoth-core/src/test/java/fr/maif/Helpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,11 @@ public Long sequenceNum() {
return sequenceNum;
}

@Override
public String entityId() {
return id;
}

@Override
public Viking withSequenceNum(Long sequenceNum) {
this.sequenceNum = sequenceNum;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.vavr.Tuple0;
import io.vavr.collection.List;
import io.vavr.collection.Seq;
import io.vavr.collection.Traversable;
import io.vavr.concurrent.Future;
import io.vavr.control.Either;
import io.vavr.control.Option;
Expand Down Expand Up @@ -228,7 +229,10 @@ public Publisher<EventEnvelope<E, Meta, Context>> loadEventsByQuery(PgAsyncTrans
query.userId().map(USER_ID::eq),
query.published().map(PUBLISHED::eq),
query.sequenceTo().map(SEQUENCE_NUM::le),
query.sequenceFrom().map(SEQUENCE_NUM::ge)
query.sequenceFrom().map(SEQUENCE_NUM::ge),
Option.of(query.idsAndSequences()).filter(Traversable::nonEmpty).map(l ->
l.map(t -> SEQUENCE_NUM.ge(t._2).and(ENTITY_ID.eq(t._1))).reduce(Condition::or)
)
).flatMap(identity());

return Source.fromPublisher(tx.stream(500, dsl -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.vavr.Tuple0;
import io.vavr.collection.List;
import io.vavr.collection.Seq;
import io.vavr.collection.Traversable;
import io.vavr.control.Either;
import io.vavr.control.Option;
import io.vavr.control.Try;
Expand Down Expand Up @@ -260,7 +261,10 @@ public Publisher<EventEnvelope<E, Meta, Context>> loadEventsByQuery(Tx tx, Query
query.userId().map(USER_ID::eq),
query.published().map(PUBLISHED::eq),
query.sequenceTo().map(SEQUENCE_NUM::le),
query.sequenceFrom().map(SEQUENCE_NUM::ge)
query.sequenceFrom().map(SEQUENCE_NUM::ge),
Option.of(query.idsAndSequences()).filter(Traversable::nonEmpty).map(l ->
l.map(t -> SEQUENCE_NUM.ge(t._2).and(ENTITY_ID.eq(t._1))).reduce(Condition::or)
)
).flatMap(identity());

return Flux.from(tx.stream(500, dsl -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,10 @@ public Publisher<EventEnvelope<E, Meta, Context>> loadEventsByQueryWithOptions(C
query.userId().map(d -> field(" user_id").eq(d)),
query.published().map(d -> field(" published").eq(d)),
query.sequenceTo().map(d -> field(" sequence_num").lessOrEqual(d)),
query.sequenceFrom().map(d -> field(" sequence_num").greaterOrEqual(d))
query.sequenceFrom().map(d -> field(" sequence_num").greaterOrEqual(d)),
Option.of(query.idsAndSequences()).filter(Traversable::nonEmpty).map(l ->
l.map(t -> field(" sequence_num").greaterOrEqual(t._2).and(field(" entity_id").eq(t._1))).reduce(Condition::or)
)
).flatMap(identity());

var tmpJooqQuery = DSL.using(tx)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,15 @@ public class TestState extends AbstractState<TestState> {
public final String id;
public final int count;


public TestState(String id, int count) {
this.id = id;
this.count = count;
}

@Override
public String entityId() {
return id;
}


}

0 comments on commit 2414065

Please sign in to comment.