Skip to content

Commit

Permalink
Fixes coalesce and case operators in multithreaded environments (#2136)…
Browse files Browse the repository at this point in the history
… (#2168)

The bug is described in the issue #2136

TestArgumentListFunctionExpressionConcurrency.java - adds tests that reproduce the bug.
ExpressionOperator.java - printCollection method is modified so it does not modify the shared argumentIndexes field.
ArgumentListFunctionExpression.java - previous "workarounds" are removed

Co-authored-by: Igor Mukhin <[email protected]>
  • Loading branch information
igormukhin and Igor-Mukhin committed Sep 5, 2024
1 parent 6de65af commit ea091f7
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2380,15 +2380,12 @@ public void printCollection(List<Expression> items, ExpressionSQLPrinter printer
dbStringIndex = 1;
}

if (this.argumentIndices == null) {
this.argumentIndices = new int[items.size()];
for (int i = 0; i < this.argumentIndices.length; i++){
this.argumentIndices[i] = i;
}
}
// Empty `this.argumentIndices` means the operator expects a list of arguments with a variable length.
// #2136: As operator's state is shared among all threads, we are not allowed to modify the field `this.argumentIndices`.
int[] argumentIndexes = (this.argumentIndices != null ? this.argumentIndices : arrayIndexSequence(items.size()));

String[] dbStrings = getDatabaseStrings(items.size());
for (final int index : this.argumentIndices) {
for (final int index : argumentIndexes) {
Expression item = items.get(index);
if ((this.selector == Ref) || ((this.selector == Deref) && (item.isObjectExpression()))) {
DatabaseTable alias = ((ObjectExpression)item).aliasForTable(((ObjectExpression)item).getDescriptor().getTables().firstElement());
Expand All @@ -2404,6 +2401,14 @@ public void printCollection(List<Expression> items, ExpressionSQLPrinter printer
}
}

private int[] arrayIndexSequence(int size) {
int[] result = new int[size];
for (int i = 0; i < size; i++) {
result[i] = i;
}
return result;
}

/**
* INTERNAL: Print the collection onto the SQL stream.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 1998, 2022 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 1998, 2024 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2021, 2022 IBM Corporation. All rights reserved.
*
* This program and the accompanying materials are made available under the
Expand Down Expand Up @@ -95,26 +95,14 @@ public void setOperator(ExpressionOperator theOperator) {
* Print SQL
*/
public void printSQL(ExpressionSQLPrinter printer) {
ListExpressionOperator realOperator;
realOperator = (ListExpressionOperator)getPlatformOperator(printer.getPlatform());
operator.copyTo(realOperator);
((ListExpressionOperator) realOperator).setIsComplete(true);
realOperator.printCollection(this.children, printer);
ListExpressionOperator operator = (ListExpressionOperator) this.operator;

operator.setIsComplete(true);
operator.printCollection(this.children, printer);
}

@Override
protected void postCopyIn(Map alreadyDone) {
/*
* Bug 463042: All ArgumentListFunctionExpression instances store the same operator reference.
* Unfortunately, ListExpressionOperator.numberOfItems stores state. If multiple ArgumentListFunctionExpression
* are run concurrently, then the ListExpressionOperator.numberOfItems state shared by all instances
* becomes inconsistent. A solution is to make sure each ArgumentListFunctionExpression has a unique operator
* reference.
*/
final ListExpressionOperator originalOperator = ((ListExpressionOperator) this.operator);
this.operator = new ListExpressionOperator();
originalOperator.copyTo(this.operator);

Boolean hasLastChildCopy = hasLastChild;
hasLastChild = null;
super.postCopyIn(alreadyDone);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright (c) 2024 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2024 IBM Corporation. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0,
* or the Eclipse Distribution License v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
package org.eclipse.persistence.jpa.test.jpql;


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.ObjIntConsumer;

import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;

import org.eclipse.persistence.jpa.test.framework.DDLGen;
import org.eclipse.persistence.jpa.test.framework.Emf;
import org.eclipse.persistence.jpa.test.framework.EmfRunner;
import org.eclipse.persistence.jpa.test.jpql.model.JPQLEntity;
import org.junit.Test;
import org.junit.runner.RunWith;

/**
* This test reproduces the issues #2136, #1867 and #1717.
*
* @author Igor Mukhin
*/
@RunWith(EmfRunner.class)
public class TestArgumentListFunctionExpressionConcurrency {

private static final int MAX_THREADS = Math.min(Runtime.getRuntime().availableProcessors(), 4);
private static final int ITERATIONS_PER_THREAD = 1000;

@Emf(name = "argumentListFunctionExpressionConcurrencyEMF", createTables = DDLGen.DROP_CREATE, classes = { JPQLEntity.class })
private EntityManagerFactory emf;

@Test
public void testConcurrentUseOfCoalesce() throws Exception {
runInParallel((em, i) -> {
String jpql = "SELECT p FROM JPQLEntity p"
+ " WHERE p.string1 = coalesce(p.string2, '" + cacheBuster(i) + "')";

em.createQuery(jpql, JPQLEntity.class).getResultList();
});
}

@Test
public void testConcurrentUseOfCaseCondition() throws Exception {
runInParallel((em, i) -> {
String jpql = "SELECT p FROM JPQLEntity p"
+ " WHERE p.string1 = case when p.string2 = '" + cacheBuster(i) + "' then null else p.string1 end";

em.createQuery(jpql, JPQLEntity.class).getResultList();
});
}

private static String cacheBuster(Integer i) {
return "cacheBuster." + Thread.currentThread().getName() + "." + i;
}

private void runInParallel(ObjIntConsumer<EntityManager> runnable) throws Exception {
AtomicReference<Exception> exception = new AtomicReference<>();

// start all threads
List<Thread> threads = new ArrayList<>();
for (int t = 0; t < MAX_THREADS; t++) {
Thread thread = new Thread(() -> {
try {
for (int i = 0; i < ITERATIONS_PER_THREAD; i++) {
if (exception.get() != null) {
return;
}

EntityManager em = emf.createEntityManager();
try {
runnable.accept(em, i);
} finally {
em.close();
}

}
} catch (Exception e) {
exception.set(e);
}
});
threads.add(thread);
thread.start();
}

// wait for all threads to finish
threads.forEach(thread -> {
try {
thread.join();
} catch (InterruptedException e) {
exception.set(e);
}
});

// throw the first exception that occurred
if (exception.get() != null) {
throw exception.get();
}
}
}

0 comments on commit ea091f7

Please sign in to comment.