Skip to content

Commit

Permalink
Fixes coalesce and case operators in multithreaded environments with …
Browse files Browse the repository at this point in the history
…different number of arguments (backport to 3.0)

The bug is described in the issue #2136.

TestArgumentListFunctionExpressionConcurrency.java - adds tests that reproduce the bug.
ExpressionOperator.java - fixes getArgumentIndices by disabling the caching of the dynamically created argument indexes.
  • Loading branch information
Igor-Mukhin committed Sep 12, 2024
1 parent 178d3f2 commit ea060fb
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2379,12 +2379,9 @@ public void printCollection(List<Expression> items, ExpressionSQLPrinter printer
dbStringIndex = 1;
}

if (this.argumentIndices == null) {
this.argumentIndices = new int[items.size()];
for (int i = 0; i < this.argumentIndices.length; i++){
this.argumentIndices[i] = i;
}
}
// Empty `this.argumentIndices` means the operator expects a list of arguments with a variable length.
// #2136: As operator's state is shared among all threads, we are not allowed to modify the field `this.argumentIndices`.
int[] argumentIndexes = (this.argumentIndices != null ? this.argumentIndices : arrayIndexSequence(items.size()));

String[] dbStrings = getDatabaseStrings(items.size());
for (final int index : this.argumentIndices) {
Expand All @@ -2403,6 +2400,14 @@ public void printCollection(List<Expression> items, ExpressionSQLPrinter printer
}
}

private int[] arrayIndexSequence(int size) {
int[] result = new int[size];
for (int i = 0; i < size; i++) {
result[i] = i;
}
return result;
}

/**
* INTERNAL: Print the collection onto the SQL stream.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright (c) 2024 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2024 IBM Corporation. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0,
* or the Eclipse Distribution License v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
package org.eclipse.persistence.jpa.test.jpql;


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.ObjIntConsumer;

import org.eclipse.persistence.jpa.test.framework.DDLGen;
import org.eclipse.persistence.jpa.test.framework.Emf;
import org.eclipse.persistence.jpa.test.framework.EmfRunner;
import org.eclipse.persistence.jpa.test.jpql.model.JPQLEntity;
import org.junit.Test;
import org.junit.runner.RunWith;

import jakarta.persistence.EntityManager;
import jakarta.persistence.EntityManagerFactory;

/**
* This test reproduces the issues #2136, #1867 and #1717.
*
* @author Igor Mukhin
*/
@RunWith(EmfRunner.class)
public class TestArgumentListFunctionExpressionConcurrency {

private static final int MAX_THREADS = Math.min(Runtime.getRuntime().availableProcessors(), 4);
private static final int ITERATIONS_PER_THREAD = 1000;

@Emf(name = "argumentListFunctionExpressionConcurrencyEMF", createTables = DDLGen.DROP_CREATE, classes = { JPQLEntity.class })
private EntityManagerFactory emf;

@Test
public void testConcurrentUseOfCoalesce() throws Exception {
runInParallel((em, i) -> {
String jpql;
if (i % 2 == 0) {
jpql = "SELECT p FROM JPQLEntity p WHERE p.string1 = coalesce(p.string2, '" + cacheBuster(i) + "')";
} else {
jpql = "SELECT p FROM JPQLEntity p WHERE p.string1 = coalesce(p.string2, p.string1, '" + cacheBuster(i) + "')";
}
em.createQuery(jpql, JPQLEntity.class).getResultList();
System.out.println(Thread.currentThread().getName() + " - " + i % 2);
});
}

@Test
public void testConcurrentUseOfCaseCondition() throws Exception {
runInParallel((em, i) -> {
String jpql;
if (i % 2 == 0) {
jpql = "SELECT p FROM JPQLEntity p"
+ " WHERE p.string1 = case "
+ " when p.string2 = '" + cacheBuster(i) + "' then p.string1 "
+ " else null "
+ " end";
} else {
jpql = "SELECT p FROM JPQLEntity p"
+ " WHERE p.string1 = case "
+ " when p.string2 = '" + cacheBuster(i) + "' then p.string1"
+ " when p.string2 = 'x' then p.string2"
+ " else null "
+ " end";

}
em.createQuery(jpql, JPQLEntity.class).getResultList();
});
}

private static String cacheBuster(Integer i) {
return "cacheBuster." + Thread.currentThread().getName() + "." + i;
}

private void runInParallel(ObjIntConsumer<EntityManager> runnable) throws Exception {
AtomicReference<Exception> exception = new AtomicReference<>();

// start all threads
List<Thread> threads = new ArrayList<>();
for (int t = 0; t < MAX_THREADS; t++) {
Thread thread = new Thread(() -> {
try {
for (int i = 0; i < ITERATIONS_PER_THREAD; i++) {
if (exception.get() != null) {
return;
}

try (EntityManager em = emf.createEntityManager()) {
runnable.accept(em, i);
}

}
} catch (Exception e) {
exception.set(e);
}
});
threads.add(thread);
thread.start();
}

// wait for all threads to finish
threads.forEach(thread -> {
try {
thread.join();
} catch (InterruptedException e) {
exception.set(e);
}
});

// throw the first exception that occurred
if (exception.get() != null) {
throw exception.get();
}
}
}

0 comments on commit ea060fb

Please sign in to comment.