This API is a solution to use jooq with reactive clients for RDBMS.
At the moment there are 2 implementations:
- a blocking jdbc implementation
- a vertx reactive implementation for postgresql only
Jcenter hosts this library.
<dependency>
<groupId>fr.maif</groupId>
<artifactId>jooq-async-jdbc</artifactId>
<version>${version}</version>
</dependency>
OR
<dependency>
<groupId>fr.maif</groupId>
<artifactId>jooq-async-reactive</artifactId>
<version>${version}</version>
</dependency>
implementation "fr.maif:jooq-async-api:${version}"
OR
implementation "fr.maif:jooq-async-reactive:${version}"
The JDBC one :
PGSimpleDataSource dataSource = new PGSimpleDataSource();
dataSource.setUrl(url);
dataSource.setUser(user);
dataSource.setPassword(password);
PgAsyncPool jdbcPgAsyncPool = new JdbcPgAsyncPool(SQLDialect.POSTGRES, dataSource, Executors.newFixedThreadPool(5));
The reactive one :
DefaultConfiguration jooqConfig = new DefaultConfiguration();
jooqConfig.setSQLDialect(SQLDialect.POSTGRES);
PgConnectOptions options = new PgConnectOptions()
.setPort(port)
.setHost(host)
.setDatabase(database)
.setUser(user)
.setPassword(password);
PoolOptions poolOptions = new PoolOptions().setMaxSize(10);
Vertx vertx = Vertx.vertx();
Pool client = PgBuilder.pool()
.using(vertx)
.connectingTo(options)
.with(poolOptions)
.build();
PgAsyncPool reactivePgAsyncPool = new ReactivePgAsyncPool(client, jooqConfig);
The idea is to use the jooq DSL as a builder to write the query. The query is then run against the underlying library.
CompletionStage<Option<String>> futureResult = reactivePgAsyncPool
.queryOne(dsl -> dsl.select(name).from(table).where(name.eq("Ragnar")))
.map(mayBeResult -> mayBeResult.map(row -> row.get(name)));
CompletionStage<List<String>> futureResult = reactivePgAsyncPool
.query(dsl -> dsl.select(name).from(table)))
.map(results -> results.map(row -> row.get(name)));
Publisher<String, NotUsed> stream = reactivePgAsyncPool
.stream(500 /*fetch size*/, dsl -> dsl.select(name).from(table))
.map(q -> q.get(name));
The publisher comes from the reactive streams API.
CompletionStage<Integer> insertResult = reactivePgAsyncPool.inTransaction(t ->
t.execute(dsl -> dsl.insertInto(table).set(name, "test"))
);
With this version you can send a statement once and then send all parameters. This version is the most performant if you have one statement with multiple values.
List<String> names = List.range(0, 10).map(i -> "name-" + i);
CompletionStage<Long> batchResult = reactivePgAsyncPool.executeBatch(
dsl -> dslContext.insertInto(table).columns(name).values((String) null),
names.map(List::of)
);
With this version, you can batch a set of statements. You should use this version if your statements are all different.
List<String> names = List.range(0, 10).map(i -> "name-" + i);
CompletionStage<Long> batchResult = reactivePgAsyncPool.executeBatch(dsl ->
names.map(n -> dslContext.insertInto(table).set(name, n))
);
The jooq-async-reactive
module expose operations with the Mono
/ Flux
API.
PgAsyncPool pgAsyncPool = PgAsyncPool.create(client, jooqConfig);
Mono<Option<String>> result = pgAsyncPool.queryOneOne(dsl -> dsl
.select(name)
.from(table)
.where(name.eq("Ragnar"))
)
.map(mayBeResult -> mayBeResult.map(row -> row.get(name)));