Skip to content

Commit

Permalink
Fixes coalesce and case operators in multithreaded environments with …
Browse files Browse the repository at this point in the history
…different number of arguments (backport to 4.0) (#2262)

The bug is described in the issue #2136.

TestArgumentListFunctionExpressionConcurrency.java - adds tests that reproduce the bug.
ArgumentListFunctionExpression.java - fixes printSQL to use the operator itself and not the common instance of it
ExpressionOperator.java - fixes getArgumentIndices by disabling the caching of the dynamically created argument indexes.

Co-authored-by: Igor Mukhin <[email protected]>
  • Loading branch information
igormukhin and Igor-Mukhin committed Sep 13, 2024
1 parent 74f7b21 commit d700a2b
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2598,7 +2598,7 @@ public void setArgumentIndices(int[] indices) {
}

/**
* Return the argumentIndices if set, otherwise initialize argumentIndices to the provided size
* Returns the argumentIndices if set, otherwise returns an array of indexes of the provided size.
*/
public int[] getArgumentIndices(int size) {
int[] indices = this.argumentIndices;
Expand All @@ -2610,7 +2610,11 @@ public int[] getArgumentIndices(int size) {
for (int i = 0; i < indices.length; i++) {
indices[i] = i;
}
this.argumentIndices = indices;

// NOTE: Why not cache the newly generated array of indexes like "this.argumentIndices = indices" here?
// The reason is that some operators have variable number of arguments like COALESCE and CASE WHEN.
// As instances of this class are shared between threads we cannot cache the array of indexes.

return indices;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 1998, 2021 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 1998, 2024 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2021 IBM Corporation. All rights reserved.
*
* This program and the accompanying materials are made available under the
Expand Down Expand Up @@ -94,10 +94,9 @@ public void setOperator(ExpressionOperator theOperator) {
*/
@Override
public void printSQL(ExpressionSQLPrinter printer) {
ListExpressionOperator realOperator;
realOperator = (ListExpressionOperator)getPlatformOperator(printer.getPlatform());
ListExpressionOperator realOperator = (ListExpressionOperator) getPlatformOperator(printer.getPlatform());
operator.copyTo(realOperator);
((ListExpressionOperator) realOperator).setIsComplete(true);
realOperator.setIsComplete(true);
realOperator.printCollection(this.children, printer);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright (c) 2024 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2024 IBM Corporation. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0,
* or the Eclipse Distribution License v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
package org.eclipse.persistence.jpa.test.jpql;


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.ObjIntConsumer;

import org.eclipse.persistence.jpa.test.framework.DDLGen;
import org.eclipse.persistence.jpa.test.framework.Emf;
import org.eclipse.persistence.jpa.test.framework.EmfRunner;
import org.eclipse.persistence.jpa.test.jpql.model.JPQLEntity;
import org.junit.Test;
import org.junit.runner.RunWith;

import jakarta.persistence.EntityManager;
import jakarta.persistence.EntityManagerFactory;

/**
* This test reproduces the issues #2136, #1867 and #1717.
*
* @author Igor Mukhin
*/
@RunWith(EmfRunner.class)
public class TestArgumentListFunctionExpressionConcurrency {

private static final int MAX_THREADS = Math.min(Runtime.getRuntime().availableProcessors(), 4);
private static final int ITERATIONS_PER_THREAD = 1000;

@Emf(name = "argumentListFunctionExpressionConcurrencyEMF", createTables = DDLGen.DROP_CREATE, classes = { JPQLEntity.class })
private EntityManagerFactory emf;

@Test
public void testConcurrentUseOfCoalesce() throws Exception {
runInParallel((em, i) -> {
String jpql;
if (i % 2 == 0) {
jpql = "SELECT p FROM JPQLEntity p WHERE p.string1 = coalesce(p.string2, '" + cacheBuster(i) + "')";
} else {
jpql = "SELECT p FROM JPQLEntity p WHERE p.string1 = coalesce(p.string2, p.string1, '" + cacheBuster(i) + "')";
}
em.createQuery(jpql, JPQLEntity.class).getResultList();
System.out.println(Thread.currentThread().getName() + " - " + i % 2);
});
}

@Test
public void testConcurrentUseOfCaseCondition() throws Exception {
runInParallel((em, i) -> {
String jpql;
if (i % 2 == 0) {
jpql = "SELECT p FROM JPQLEntity p"
+ " WHERE p.string1 = case "
+ " when p.string2 = '" + cacheBuster(i) + "' then p.string1 "
+ " else null "
+ " end";
} else {
jpql = "SELECT p FROM JPQLEntity p"
+ " WHERE p.string1 = case "
+ " when p.string2 = '" + cacheBuster(i) + "' then p.string1"
+ " when p.string2 = 'x' then p.string2"
+ " else null "
+ " end";

}
em.createQuery(jpql, JPQLEntity.class).getResultList();
});
}

private static String cacheBuster(Integer i) {
return "cacheBuster." + Thread.currentThread().getName() + "." + i;
}

private void runInParallel(ObjIntConsumer<EntityManager> runnable) throws Exception {
AtomicReference<Exception> exception = new AtomicReference<>();

// start all threads
List<Thread> threads = new ArrayList<>();
for (int t = 0; t < MAX_THREADS; t++) {
Thread thread = new Thread(() -> {
try {
for (int i = 0; i < ITERATIONS_PER_THREAD; i++) {
if (exception.get() != null) {
return;
}

try (EntityManager em = emf.createEntityManager()) {
runnable.accept(em, i);
}

}
} catch (Exception e) {
exception.set(e);
}
});
threads.add(thread);
thread.start();
}

// wait for all threads to finish
threads.forEach(thread -> {
try {
thread.join();
} catch (InterruptedException e) {
exception.set(e);
}
});

// throw the first exception that occurred
if (exception.get() != null) {
throw exception.get();
}
}
}

0 comments on commit d700a2b

Please sign in to comment.