Skip to content

Commit

Permalink
[JOOQ]: Support for unique modifying operations: UPSERT and REPLACE
Browse files Browse the repository at this point in the history
  • Loading branch information
i.kriushenkov committed May 10, 2024
1 parent 2b1a13e commit 9140b02
Show file tree
Hide file tree
Showing 74 changed files with 10,629 additions and 173 deletions.
11 changes: 10 additions & 1 deletion jooq-dialect/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,16 @@ Configure the JOOQ runtime to use the YDB dialect and JDBC driver:
String url = "jdbc:ydb:grpc://localhost:2136/local";
Connection conn = DriverManager.getConnection(url);

DSLContext dsl = new YdbDslContext(conn);
YdbDSLContext dsl = YDB.using(conn);
```

or

```java
String url = "jdbc:ydb:grpc://localhost:2136/local";
try (CloseableYdbDSLContext dsl = YDB.using(url)) {
// ...
}
```

### XML config
Expand Down
33 changes: 33 additions & 0 deletions jooq-dialect/src/main/java/org/jooq/impl/ConnectionUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.jooq.impl;

import io.r2dbc.spi.ConnectionFactory;
import org.jooq.ConnectionProvider;
import org.jooq.tools.jdbc.JDBCUtils;

import java.sql.Connection;

public final class ConnectionUtils {
private ConnectionUtils() {
throw new UnsupportedOperationException();
}

public static ConnectionProvider closeableProvider(Connection connection) {
return new DefaultCloseableConnectionProvider(connection);
}

public static void closeConnectionProvider(ConnectionProvider connectionProvider) {
if (connectionProvider instanceof DefaultCloseableConnectionProvider dcp) {
JDBCUtils.safeClose(dcp.connection);
dcp.connection = null;
}
}

public static void closeConnectionFactory(ConnectionFactory connectionFactory) {
if (connectionFactory instanceof DefaultConnectionFactory dcf) {
if (dcf.finalize) {
R2DBC.blockWrappingExceptions(dcf.connection.close());
dcf.connection = null;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,338 @@
package org.jooq.impl;

import org.jooq.*;
import org.jooq.Record;
import org.jooq.RenderContext.CastMode;

import java.util.AbstractMap;
import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import static org.jooq.impl.Keywords.K_VALUES;
import static org.jooq.impl.Tools.BooleanDataKey.DATA_STORE_ASSIGNMENT;


public final class FieldMapsForUpsertReplace extends AbstractQueryPart {
private final Table<?> table;
private final Map<Field<?>, Field<?>> empty;
private final Map<Field<?>, List<Field<?>>> values;
private int rows;
private int nextRow = -1;

public FieldMapsForUpsertReplace(Table<?> table) {
this.table = table;
this.values = new LinkedHashMap<>();
this.empty = new LinkedHashMap<>();
}

public void clear() {
empty.clear();
values.clear();
rows = 0;
nextRow = -1;
}

public boolean isEmpty() {
return values.isEmpty();
}

public void from(FieldMapsForUpsertReplace i) {
empty.putAll(i.empty);

for (Entry<Field<?>, List<Field<?>>> e : i.values.entrySet()) {
values.put(e.getKey(), new ArrayList<>(e.getValue()));
}

rows = i.rows;
nextRow = i.nextRow;
}

@Override
public void accept(Context<?> ctx) {
toSQLValues(ctx);
}

private void toSQLValues(Context<?> ctx) {
ctx.formatSeparator()
.visit(K_VALUES)
.sql(' ');
toSQL92Values(ctx);
}

public static void toSQLUpsertSelect(Context<?> ctx, Select<?> select) {
ctx.formatSeparator().visit(select);
}

public Select<Record> upsertSelect() {
Select<Record> select = null;

Map<Field<?>, List<Field<?>>> v = valuesFlattened();

for (int i = 0; i < rows; i++) {
int row = i;
Select<Record> iteration = DSL.select(Tools.map(
v.entrySet(), e -> patchDefault0(e.getValue().get(row), e.getKey())
));

if (select == null) {
select = iteration;
} else {
select = select.unionAll(iteration);
}
}

return select;
}

private void toSQL92Values(Context<?> ctx) {
boolean indent = values.size() > 1;

CastMode previous = ctx.castMode();
ctx.castMode(CastMode.NEVER);

for (int row = 0; row < rows; row++) {
if (row > 0) {
ctx.sql(", ");
}

ctx.sql('(');

if (indent) {
ctx.formatIndentStart();
}

String separator = "";
for (Entry<Field<?>, List<Field<?>>> e : valuesFlattened().entrySet()) {
List<Field<?>> list = e.getValue();
ctx.sql(separator);

if (indent) {
ctx.formatNewLine();
}

ctx.visit(patchDefault0(list.get(row), e.getKey()));
separator = ", ";
}

if (indent) {
ctx.formatIndentEnd()
.formatNewLine();
}

ctx.sql(')');
}

ctx.castMode(previous);
}

private static Field<?> patchDefault0(Field<?> d, Field<?> f) {
if (d instanceof Default) {
return Tools.orElse(f.getDataType().default_(), () -> DSL.inline(null, f));
}

return d;
}


public void addFields(Collection<?> fields) {
if (rows == 0) {
newRecord();
}

initNextRow();

for (Object field : fields) {
Field<?> f = Tools.tableField(table, field);
Field<?> e = empty.computeIfAbsent(f, LazyVal::new);

values.computeIfAbsent(f, k -> rows > 0
? new ArrayList<>(Collections.nCopies(rows, e))
: new ArrayList<>());
}
}

@SuppressWarnings("unchecked")
private <T> Field<T> set(Field<T> field, Field<T> value) {
addFields(Collections.singletonList(field));
return (Field<T>) values.get(field).set(rows - 1, value);
}

public void set(Map<?, ?> map) {
addFields(map.keySet());

for (Entry<?, ?> entry : map.entrySet()) {
Object k = entry.getKey();
Object v = entry.getValue();
Field<?> field = Tools.tableField(table, k);
values.get(field).set(rows - 1, Tools.field(v, field));
}
}

private void initNextRow() {
if (rows == nextRow) {
Iterator<List<Field<?>>> v = values.values().iterator();
Iterator<Field<?>> e = empty.values().iterator();

while (v.hasNext() && e.hasNext()) {
v.next().add(e.next());
}

rows++;
}
}

public void newRecord() {
if (nextRow < rows) {
nextRow++;
}
}

private Map<Field<?>, Field<?>> map(final int index) {
return new AbstractMap<>() {
private transient Set<Entry<Field<?>, Field<?>>> entrySet;

@Override
public Set<Entry<Field<?>, Field<?>>> entrySet() {
if (entrySet == null) {
entrySet = new EntrySet();
}

return entrySet;
}

@Override
public boolean containsKey(Object key) {
return values.containsKey(key);
}

@Override
public boolean containsValue(Object value) {
return values.values().stream().anyMatch(list -> list.get(index).equals(value));
}

@Override
public Field<?> get(Object key) {
List<Field<?>> list = values.get(key);
return list == null ? null : list.get(index);
}

@SuppressWarnings({"unchecked", "rawtypes"})
@Override
public Field<?> put(Field<?> key, Field<?> value) {
return FieldMapsForUpsertReplace.this.set(key, (Field) value);
}

@Override
public Field<?> remove(Object key) {
List<Field<?>> list = values.remove(key);
return list == null ? null : list.get(index);
}

@Override
public Set<Field<?>> keySet() {
return values.keySet();
}

private final class EntrySet extends AbstractSet<Entry<Field<?>, Field<?>>> {
@Override
public int size() {
return values.size();
}

@Override
public void clear() {
values.clear();
}

@Override
public Iterator<Entry<Field<?>, Field<?>>> iterator() {
return new Iterator<>() {
final Iterator<Entry<Field<?>, List<Field<?>>>> delegate = values.entrySet().iterator();

@Override
public boolean hasNext() {
return delegate.hasNext();
}

@Override
public Entry<Field<?>, Field<?>> next() {
Entry<Field<?>, List<Field<?>>> entry = delegate.next();
return new SimpleImmutableEntry<>(entry.getKey(), entry.getValue().get(index));
}

@Override
public void remove() {
delegate.remove();
}
};
}
}
};
}

public Map<Field<?>, Field<?>> lastMap() {
return map(rows - 1);
}

public boolean isExecutable() {
return rows > 0;
}

public Set<Field<?>> toSQLReferenceKeys(Context<?> ctx) {
if (values.keySet().stream().allMatch(AbstractStoreQuery.UnknownField.class::isInstance)) {
return Collections.emptySet();
}

Set<Field<?>> fields = keysFlattened();

if (!fields.isEmpty()) {
ctx.data(DATA_STORE_ASSIGNMENT, true, c -> c.sql(" (").visit(QueryPartCollectionView.wrap(fields).qualify(false)).sql(')'));
}

return fields;
}

public Set<Field<?>> keysFlattened() {
return valuesFlattened().keySet();
}

private Map<Field<?>, List<Field<?>>> valuesFlattened() {
Map<Field<?>, List<Field<?>>> result = new LinkedHashMap<>();

for (Entry<Field<?>, List<Field<?>>> entry : values.entrySet()) {
Field<?> key = entry.getKey();
DataType<?> keyType = key.getDataType();
List<Field<?>> value = entry.getValue();

if (keyType.isEmbeddable()) {
List<Iterator<? extends Field<?>>> valueFlattened = new ArrayList<>(value.size());

for (Field<?> v : value) {
valueFlattened.add(Tools.flatten(v).iterator());
}

for (Field<?> k : Tools.flatten(key)) {
List<Field<?>> list = new ArrayList<>(value.size());

for (Iterator<? extends Field<?>> v : valueFlattened) {
list.add(v.hasNext() ? v.next() : null);
}

result.put(k, list);
}
} else {
result.put(key, value);
}
}

return result;
}
}
Loading

0 comments on commit 9140b02

Please sign in to comment.