Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[JOOQ]: Support for unique modifying operations #126

Merged
merged 2 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion jooq-dialect/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,16 @@ Configure the JOOQ runtime to use the YDB dialect and JDBC driver:
String url = "jdbc:ydb:grpc://localhost:2136/local";
Connection conn = DriverManager.getConnection(url);

DSLContext dsl = new YdbDslContext(conn);
YdbDSLContext dsl = YDB.using(conn);
```

or

```java
String url = "jdbc:ydb:grpc://localhost:2136/local";
try (CloseableYdbDSLContext dsl = YDB.using(url)) {
// ...
}
```

### XML config
Expand Down
33 changes: 33 additions & 0 deletions jooq-dialect/src/main/java/org/jooq/impl/ConnectionUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.jooq.impl;

import io.r2dbc.spi.ConnectionFactory;
import org.jooq.ConnectionProvider;
import org.jooq.tools.jdbc.JDBCUtils;

import java.sql.Connection;

public final class ConnectionUtils {
private ConnectionUtils() {
throw new UnsupportedOperationException();
}

public static ConnectionProvider closeableProvider(Connection connection) {
return new DefaultCloseableConnectionProvider(connection);
}

public static void closeConnectionProvider(ConnectionProvider connectionProvider) {
if (connectionProvider instanceof DefaultCloseableConnectionProvider dcp) {
JDBCUtils.safeClose(dcp.connection);
dcp.connection = null;
}
}

public static void closeConnectionFactory(ConnectionFactory connectionFactory) {
if (connectionFactory instanceof DefaultConnectionFactory dcf) {
if (dcf.finalize) {
R2DBC.blockWrappingExceptions(dcf.connection.close());
dcf.connection = null;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,338 @@
package org.jooq.impl;

import org.jooq.*;
import org.jooq.Record;
import org.jooq.RenderContext.CastMode;

import java.util.AbstractMap;
import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import static org.jooq.impl.Keywords.K_VALUES;
import static org.jooq.impl.Tools.BooleanDataKey.DATA_STORE_ASSIGNMENT;


public final class FieldMapsForUpsertReplace extends AbstractQueryPart {
private final Table<?> table;
private final Map<Field<?>, Field<?>> empty;
private final Map<Field<?>, List<Field<?>>> values;
private int rows;
private int nextRow = -1;

public FieldMapsForUpsertReplace(Table<?> table) {
this.table = table;
this.values = new LinkedHashMap<>();
this.empty = new LinkedHashMap<>();
}

public void clear() {
empty.clear();
values.clear();
rows = 0;
nextRow = -1;
}

public boolean isEmpty() {
return values.isEmpty();
}

public void from(FieldMapsForUpsertReplace i) {
empty.putAll(i.empty);

for (Entry<Field<?>, List<Field<?>>> e : i.values.entrySet()) {
values.put(e.getKey(), new ArrayList<>(e.getValue()));
}

rows = i.rows;
nextRow = i.nextRow;
}

@Override
public void accept(Context<?> ctx) {
toSQLValues(ctx);
}

private void toSQLValues(Context<?> ctx) {
ctx.formatSeparator()
.visit(K_VALUES)
.sql(' ');
toSQL92Values(ctx);
}

public static void toSQLUpsertSelect(Context<?> ctx, Select<?> select) {
ctx.formatSeparator().visit(select);
}

public Select<Record> upsertSelect() {
Select<Record> select = null;

Map<Field<?>, List<Field<?>>> v = valuesFlattened();

for (int i = 0; i < rows; i++) {
int row = i;
Select<Record> iteration = DSL.select(Tools.map(
v.entrySet(), e -> patchDefault0(e.getValue().get(row), e.getKey())
));

if (select == null) {
select = iteration;
} else {
select = select.unionAll(iteration);
}
}

return select;
}

private void toSQL92Values(Context<?> ctx) {
boolean indent = values.size() > 1;

CastMode previous = ctx.castMode();
ctx.castMode(CastMode.NEVER);

for (int row = 0; row < rows; row++) {
if (row > 0) {
ctx.sql(", ");
}

ctx.sql('(');

if (indent) {
ctx.formatIndentStart();
}

String separator = "";
for (Entry<Field<?>, List<Field<?>>> e : valuesFlattened().entrySet()) {
List<Field<?>> list = e.getValue();
ctx.sql(separator);

if (indent) {
ctx.formatNewLine();
}

ctx.visit(patchDefault0(list.get(row), e.getKey()));
separator = ", ";
}

if (indent) {
ctx.formatIndentEnd()
.formatNewLine();
}

ctx.sql(')');
}

ctx.castMode(previous);
}

private static Field<?> patchDefault0(Field<?> d, Field<?> f) {
if (d instanceof Default) {
return Tools.orElse(f.getDataType().default_(), () -> DSL.inline(null, f));
}

return d;
}


public void addFields(Collection<?> fields) {
if (rows == 0) {
newRecord();
}

initNextRow();

for (Object field : fields) {
Field<?> f = Tools.tableField(table, field);
Field<?> e = empty.computeIfAbsent(f, LazyVal::new);

values.computeIfAbsent(f, k -> rows > 0
? new ArrayList<>(Collections.nCopies(rows, e))
: new ArrayList<>());
}
}

@SuppressWarnings("unchecked")
private <T> Field<T> set(Field<T> field, Field<T> value) {
addFields(Collections.singletonList(field));
return (Field<T>) values.get(field).set(rows - 1, value);
}

public void set(Map<?, ?> map) {
addFields(map.keySet());

for (Entry<?, ?> entry : map.entrySet()) {
Object k = entry.getKey();
Object v = entry.getValue();
Field<?> field = Tools.tableField(table, k);
values.get(field).set(rows - 1, Tools.field(v, field));
}
}

private void initNextRow() {
if (rows == nextRow) {
Iterator<List<Field<?>>> v = values.values().iterator();
Iterator<Field<?>> e = empty.values().iterator();

while (v.hasNext() && e.hasNext()) {
v.next().add(e.next());
}

rows++;
}
}

public void newRecord() {
if (nextRow < rows) {
nextRow++;
}
}

private Map<Field<?>, Field<?>> map(final int index) {
return new AbstractMap<>() {
private transient Set<Entry<Field<?>, Field<?>>> entrySet;

@Override
public Set<Entry<Field<?>, Field<?>>> entrySet() {
if (entrySet == null) {
entrySet = new EntrySet();
}

return entrySet;
}

@Override
public boolean containsKey(Object key) {
return values.containsKey(key);
}

@Override
public boolean containsValue(Object value) {
return values.values().stream().anyMatch(list -> list.get(index).equals(value));
}

@Override
public Field<?> get(Object key) {
List<Field<?>> list = values.get(key);
return list == null ? null : list.get(index);
}

@SuppressWarnings({"unchecked", "rawtypes"})
@Override
public Field<?> put(Field<?> key, Field<?> value) {
return FieldMapsForUpsertReplace.this.set(key, (Field) value);
}

@Override
public Field<?> remove(Object key) {
List<Field<?>> list = values.remove(key);
return list == null ? null : list.get(index);
}

@Override
public Set<Field<?>> keySet() {
return values.keySet();
}

private final class EntrySet extends AbstractSet<Entry<Field<?>, Field<?>>> {
@Override
public int size() {
return values.size();
}

@Override
public void clear() {
values.clear();
}

@Override
public Iterator<Entry<Field<?>, Field<?>>> iterator() {
return new Iterator<>() {
final Iterator<Entry<Field<?>, List<Field<?>>>> delegate = values.entrySet().iterator();

@Override
public boolean hasNext() {
return delegate.hasNext();
}

@Override
public Entry<Field<?>, Field<?>> next() {
Entry<Field<?>, List<Field<?>>> entry = delegate.next();
return new SimpleImmutableEntry<>(entry.getKey(), entry.getValue().get(index));
}

@Override
public void remove() {
delegate.remove();
}
};
}
}
};
}

public Map<Field<?>, Field<?>> lastMap() {
return map(rows - 1);
}

public boolean isExecutable() {
return rows > 0;
}

public Set<Field<?>> toSQLReferenceKeys(Context<?> ctx) {
if (values.keySet().stream().allMatch(AbstractStoreQuery.UnknownField.class::isInstance)) {
return Collections.emptySet();
}

Set<Field<?>> fields = keysFlattened();

if (!fields.isEmpty()) {
ctx.data(DATA_STORE_ASSIGNMENT, true, c -> c.sql(" (").visit(QueryPartCollectionView.wrap(fields).qualify(false)).sql(')'));
}

return fields;
}

public Set<Field<?>> keysFlattened() {
return valuesFlattened().keySet();
}

private Map<Field<?>, List<Field<?>>> valuesFlattened() {
Map<Field<?>, List<Field<?>>> result = new LinkedHashMap<>();

for (Entry<Field<?>, List<Field<?>>> entry : values.entrySet()) {
Field<?> key = entry.getKey();
DataType<?> keyType = key.getDataType();
List<Field<?>> value = entry.getValue();

if (keyType.isEmbeddable()) {
List<Iterator<? extends Field<?>>> valueFlattened = new ArrayList<>(value.size());

for (Field<?> v : value) {
valueFlattened.add(Tools.flatten(v).iterator());
}

for (Field<?> k : Tools.flatten(key)) {
List<Field<?>> list = new ArrayList<>(value.size());

for (Iterator<? extends Field<?>> v : valueFlattened) {
list.add(v.hasNext() ? v.next() : null);
}

result.put(k, list);
}
} else {
result.put(key, value);
}
}

return result;
}
}
Loading