Skip to content

Commit

Permalink
Pb with java MinimalCompletionStage (#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
larousso authored May 15, 2024
1 parent e6fb711 commit 2a62340
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 7 deletions.
6 changes: 5 additions & 1 deletion jooq-async-api/src/main/java/fr/maif/jooq/PgAsyncPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ default <T> CompletionStage<T> inTransaction(Function<PgAsyncTransaction, Comple
action.apply(t)
.thenCompose(r -> t.commit().thenApply(__ -> r))
.exceptionallyCompose(e ->
t.rollback().thenCompose(__ -> CompletableFuture.failedStage(e))
t.rollback().thenCompose(__ -> {
CompletableFuture<T> cf = new CompletableFuture<>();
cf.completeExceptionally(e);
return cf;
})
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ public <R extends Record> CompletionStage<Option<QueryResult>> queryOne(Function
return rawPreparedQuery(queryFunction).thenCompose(res -> {
switch (res.size()) {
case 0:
return CompletableFuture.completedStage(Option.none());
return completedStage(Option.none());
case 1:
return CompletableFuture.completedStage(Option.of(new ReactiveRowQueryResult(res.iterator().next())));
return completedStage(Option.of(new ReactiveRowQueryResult(res.iterator().next())));
default:
return CompletableFuture.failedStage(new TooManyRowsException(String.format("Found more than one row: %d", res.size())));
return failedStage(new TooManyRowsException(String.format("Found more than one row: %d", res.size())));
}
});
}
Expand All @@ -96,9 +96,9 @@ public CompletionStage<Integer> execute(Function<DSLContext, ? extends Query> qu
public CompletionStage<Long> executeBatch(Function<DSLContext, List<? extends Query>> queryFunction) {
List<? extends Query> queries = queryFunction.apply(DSL.using(configuration));
if (queries.isEmpty()) {
return CompletableFuture.completedStage(0L);
return completedStage(0L);
}
return queries.foldLeft(CompletableFuture.completedStage(0L), (acc, query) ->
return queries.foldLeft(completedStage(0L), (acc, query) ->
acc.thenCompose(count -> {
log(query);
String preparedQuery = toPreparedQuery(query);
Expand All @@ -113,7 +113,7 @@ public CompletionStage<Long> executeBatch(Function<DSLContext, List<? extends Qu
@Override
public CompletionStage<Long> executeBatch(Function<DSLContext, ? extends Query> queryFunction, List<List<Object>> values) {
if (values.isEmpty()) {
return CompletableFuture.completedStage(0L);
return completedStage(0L);
}
CompletableFuture<RowSet<Row>> rowFuture = new CompletableFuture<>();
try {
Expand Down Expand Up @@ -274,4 +274,16 @@ JsonNode readJson(String json) {
throw new RuntimeException("Error parsing json "+json, e);
}
}

static <T> CompletionStage<T> completedStage(T value) {
CompletableFuture<T> cf = new CompletableFuture<>();
cf.complete(value);
return cf;
}

static <T> CompletionStage<T> failedStage(Throwable throwable) {
CompletableFuture<T> cf = new CompletableFuture<>();
cf.completeExceptionally(throwable);
return cf;
}
}

0 comments on commit 2a62340

Please sign in to comment.