Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[4.0] Performance improvement: synchronize to java.util.concurrent.locks switch to improve performance with VirtualThreads - backport from master #2118

Merged
merged 1 commit into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2021 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2020, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
Expand All @@ -14,7 +14,14 @@

import org.eclipse.persistence.internal.helper.type.ReadLockAcquisitionMetadata;

import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ReadLockManager {

Expand All @@ -40,21 +47,28 @@ public class ReadLockManager {
*/
private final List<String> removeReadLockProblemsDetected = new ArrayList<>();

private final Lock instanceLock = new ReentrantLock();

/**
* add a concurrency manager as deferred locks to the DLM
*/
public synchronized void addReadLock(ConcurrencyManager concurrencyManager) {
final Thread currentThread = Thread.currentThread();
final long currentThreadId = currentThread.getId();
ReadLockAcquisitionMetadata readLockAcquisitionMetadata = ConcurrencyUtil.SINGLETON.createReadLockAcquisitionMetadata(concurrencyManager);

this.readLocks.add(FIRST_INDEX_OF_COLLECTION, concurrencyManager);
if(!mapThreadToReadLockAcquisitionMetadata.containsKey(currentThreadId)) {
List<ReadLockAcquisitionMetadata> newList = Collections.synchronizedList(new ArrayList<>());
mapThreadToReadLockAcquisitionMetadata.put(currentThreadId, newList );
public void addReadLock(ConcurrencyManager concurrencyManager) {
instanceLock.lock();
try {
final Thread currentThread = Thread.currentThread();
final long currentThreadId = currentThread.getId();
ReadLockAcquisitionMetadata readLockAcquisitionMetadata = ConcurrencyUtil.SINGLETON.createReadLockAcquisitionMetadata(concurrencyManager);

this.readLocks.add(FIRST_INDEX_OF_COLLECTION, concurrencyManager);
if (!mapThreadToReadLockAcquisitionMetadata.containsKey(currentThreadId)) {
List<ReadLockAcquisitionMetadata> newList = Collections.synchronizedList(new ArrayList<>());
mapThreadToReadLockAcquisitionMetadata.put(currentThreadId, newList);
}
List<ReadLockAcquisitionMetadata> acquiredReadLocksInCurrentTransactionList = mapThreadToReadLockAcquisitionMetadata.get(currentThreadId);
acquiredReadLocksInCurrentTransactionList.add(FIRST_INDEX_OF_COLLECTION, readLockAcquisitionMetadata);
} finally {
instanceLock.unlock();
}
List<ReadLockAcquisitionMetadata> acquiredReadLocksInCurrentTransactionList = mapThreadToReadLockAcquisitionMetadata.get(currentThreadId);
acquiredReadLocksInCurrentTransactionList.add(FIRST_INDEX_OF_COLLECTION, readLockAcquisitionMetadata);
}

/**
Expand All @@ -65,46 +79,56 @@ public synchronized void addReadLock(ConcurrencyManager concurrencyManager) {
* @param concurrencyManager
* the concurrency cache key that is about to be decrement in number of readers.
*/
public synchronized void removeReadLock(ConcurrencyManager concurrencyManager) {
final Thread currentThread = Thread.currentThread();
final long currentThreadId = currentThread.getId();
boolean readLockManagerHasTracingAboutAddedReadLocksForCurrentThread = mapThreadToReadLockAcquisitionMetadata.containsKey(currentThreadId);

if (!readLockManagerHasTracingAboutAddedReadLocksForCurrentThread) {
String errorMessage = ConcurrencyUtil.SINGLETON.readLockManagerProblem02ReadLockManageHasNoEntriesForThread(concurrencyManager, currentThreadId);
removeReadLockProblemsDetected.add(errorMessage);
return;
}
public void removeReadLock(ConcurrencyManager concurrencyManager) {
instanceLock.lock();
try {
final Thread currentThread = Thread.currentThread();
final long currentThreadId = currentThread.getId();
boolean readLockManagerHasTracingAboutAddedReadLocksForCurrentThread = mapThreadToReadLockAcquisitionMetadata.containsKey(currentThreadId);

if (!readLockManagerHasTracingAboutAddedReadLocksForCurrentThread) {
String errorMessage = ConcurrencyUtil.SINGLETON.readLockManagerProblem02ReadLockManageHasNoEntriesForThread(concurrencyManager, currentThreadId);
removeReadLockProblemsDetected.add(errorMessage);
return;
}

List<ReadLockAcquisitionMetadata> readLocksAcquiredDuringCurrentThread = mapThreadToReadLockAcquisitionMetadata.get(currentThreadId);
ReadLockAcquisitionMetadata readLockAquisitionMetadataToRemove = null;
for (ReadLockAcquisitionMetadata currentReadLockAcquisitionMetadata : readLocksAcquiredDuringCurrentThread) {
ConcurrencyManager currentCacheKeyObjectToCheck = currentReadLockAcquisitionMetadata.getCacheKeyWhoseNumberOfReadersThreadIsIncrementing();
boolean dtoToRemoveFound = concurrencyManager.getConcurrencyManagerId() == currentCacheKeyObjectToCheck.getConcurrencyManagerId();
if (dtoToRemoveFound) {
readLockAquisitionMetadataToRemove = currentReadLockAcquisitionMetadata;
break;
List<ReadLockAcquisitionMetadata> readLocksAcquiredDuringCurrentThread = mapThreadToReadLockAcquisitionMetadata.get(currentThreadId);
ReadLockAcquisitionMetadata readLockAquisitionMetadataToRemove = null;
for (ReadLockAcquisitionMetadata currentReadLockAcquisitionMetadata : readLocksAcquiredDuringCurrentThread) {
ConcurrencyManager currentCacheKeyObjectToCheck = currentReadLockAcquisitionMetadata.getCacheKeyWhoseNumberOfReadersThreadIsIncrementing();
boolean dtoToRemoveFound = concurrencyManager.getConcurrencyManagerId() == currentCacheKeyObjectToCheck.getConcurrencyManagerId();
if (dtoToRemoveFound) {
readLockAquisitionMetadataToRemove = currentReadLockAcquisitionMetadata;
break;
}
}
}

if (readLockAquisitionMetadataToRemove == null) {
String errorMessage = ConcurrencyUtil.SINGLETON.readLockManagerProblem03ReadLockManageHasNoEntriesForThread(concurrencyManager, currentThreadId);
removeReadLockProblemsDetected.add(errorMessage);
return;
}
this.readLocks.remove(concurrencyManager);
readLocksAcquiredDuringCurrentThread.remove(readLockAquisitionMetadataToRemove);
if (readLockAquisitionMetadataToRemove == null) {
String errorMessage = ConcurrencyUtil.SINGLETON.readLockManagerProblem03ReadLockManageHasNoEntriesForThread(concurrencyManager, currentThreadId);
removeReadLockProblemsDetected.add(errorMessage);
return;
}
this.readLocks.remove(concurrencyManager);
readLocksAcquiredDuringCurrentThread.remove(readLockAquisitionMetadataToRemove);

if (readLocksAcquiredDuringCurrentThread.isEmpty()) {
mapThreadToReadLockAcquisitionMetadata.remove(currentThreadId);
if (readLocksAcquiredDuringCurrentThread.isEmpty()) {
mapThreadToReadLockAcquisitionMetadata.remove(currentThreadId);
}
} finally {
instanceLock.unlock();
}
}

/**
* Return a set of the deferred locks
*/
public synchronized List<ConcurrencyManager> getReadLocks() {
return Collections.unmodifiableList(readLocks);
public List<ConcurrencyManager> getReadLocks() {
instanceLock.lock();
try {
return Collections.unmodifiableList(readLocks);
} finally {
instanceLock.unlock();
}
}

/**
Expand All @@ -114,8 +138,13 @@ public synchronized List<ConcurrencyManager> getReadLocks() {
* @param problemDetected
* the detected problem
*/
public synchronized void addRemoveReadLockProblemsDetected(String problemDetected) {
removeReadLockProblemsDetected.add(problemDetected);
public void addRemoveReadLockProblemsDetected(String problemDetected) {
instanceLock.lock();
try {
removeReadLockProblemsDetected.add(problemDetected);
} finally {
instanceLock.unlock();
}
}

/** Getter for {@link #mapThreadToReadLockAcquisitionMetadata} */
Expand All @@ -137,8 +166,13 @@ public List<String> getRemoveReadLockProblemsDetected() {
* any read lock acquired in the tracing we definitely do not want this object instance to be thrown out
* from our main tracing. It is probably revealing problems in read lock acquisition and released.
*/
public synchronized boolean isEmpty() {
return readLocks.isEmpty() && removeReadLockProblemsDetected.isEmpty();
public boolean isEmpty() {
instanceLock.lock();
try {
return readLocks.isEmpty() && removeReadLockProblemsDetected.isEmpty();
} finally {
instanceLock.unlock();
}
}

/**
Expand All @@ -151,16 +185,20 @@ public synchronized boolean isEmpty() {
* or to go about doing
*/
@Override
public synchronized ReadLockManager clone() {
ReadLockManager clone = new ReadLockManager();
clone.readLocks.addAll(this.readLocks);
for (Map.Entry<Long, List<ReadLockAcquisitionMetadata>> currentEntry : this.mapThreadToReadLockAcquisitionMetadata.entrySet()) {
Long key = currentEntry.getKey();
List<ReadLockAcquisitionMetadata> value = currentEntry.getValue();
clone.mapThreadToReadLockAcquisitionMetadata.put(key, new ArrayList<>(value));
public ReadLockManager clone() {
instanceLock.lock();
try {
ReadLockManager clone = new ReadLockManager();
clone.readLocks.addAll(this.readLocks);
for (Map.Entry<Long, List<ReadLockAcquisitionMetadata>> currentEntry : this.mapThreadToReadLockAcquisitionMetadata.entrySet()) {
Long key = currentEntry.getKey();
List<ReadLockAcquisitionMetadata> value = currentEntry.getValue();
clone.mapThreadToReadLockAcquisitionMetadata.put(key, new ArrayList<>(value));
}
clone.removeReadLockProblemsDetected.addAll(this.removeReadLockProblemsDetected);
return clone;
} finally {
instanceLock.unlock();
}
clone.removeReadLockProblemsDetected.addAll(this.removeReadLockProblemsDetected);
return clone;
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* Copyright (c) 1998, 2021 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 1998, 2021 IBM Corporation. All rights reserved.
* Copyright (c) 1998, 2024 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 1998, 2024 IBM Corporation. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
Expand All @@ -22,10 +22,6 @@
// - 526957 : Split the logging and trace messages
package org.eclipse.persistence.internal.helper;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

import org.eclipse.persistence.descriptors.ClassDescriptor;
import org.eclipse.persistence.descriptors.FetchGroupManager;
import org.eclipse.persistence.exceptions.ConcurrencyException;
Expand All @@ -40,6 +36,19 @@
import org.eclipse.persistence.logging.SessionLog;
import org.eclipse.persistence.mappings.DatabaseMapping;

import java.util.Collection;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static java.util.Collections.unmodifiableMap;

/**
Expand Down Expand Up @@ -122,6 +131,10 @@ public class WriteLockManager {
/* the first element in this list will be the prevailing thread */
protected ExposedNodeLinkedList prevailingQueue;

private final Lock toWaitOnLock = new ReentrantLock();
private final Lock instancePrevailingQueueLock = new ReentrantLock();
private final Condition toWaitOnLockCondition = toWaitOnLock.newCondition();

public WriteLockManager() {
this.prevailingQueue = new ExposedNodeLinkedList();
}
Expand Down Expand Up @@ -167,14 +180,17 @@ public Map acquireLocksForClone(Object objectForClone, ClassDescriptor descripto
// using the exact same approach we have been adding to the concurrency manager
ConcurrencyUtil.SINGLETON.determineIfReleaseDeferredLockAppearsToBeDeadLocked(toWaitOn, whileStartTimeMillis, lockManager, readLockManager, ALLOW_INTERRUPTED_EXCEPTION_TO_BE_FIRED_UP_TRUE);

synchronized (toWaitOn) {
toWaitOnLock.lock();
try {
try {
if (toWaitOn.isAcquired()) {//last minute check to insure it is still locked.
toWaitOn.wait(ConcurrencyUtil.SINGLETON.getAcquireWaitTime());// wait for lock on object to be released
toWaitOnLockCondition.await(ConcurrencyUtil.SINGLETON.getAcquireWaitTime(), TimeUnit.MILLISECONDS);// wait for lock on object to be released
}
} catch (InterruptedException ex) {
// Ignore exception thread should continue.
}
} finally {
toWaitOnLock.unlock();
}
Object waitObject = toWaitOn.getObject();
// Object may be null for loss of identity.
Expand Down Expand Up @@ -419,8 +435,11 @@ private void acquireRequiredLocksInternal(MergeManager mergeManager, UnitOfWorkC
// set the QueueNode to be the node from the
// linked list for quick removal upon
// acquiring all locks
synchronized (this.prevailingQueue) {
instancePrevailingQueueLock.lock();
try {
mergeManager.setQueueNode(this.prevailingQueue.addLast(mergeManager));
} finally {
instancePrevailingQueueLock.unlock();
}
}

Expand All @@ -430,14 +449,15 @@ private void acquireRequiredLocksInternal(MergeManager mergeManager, UnitOfWorkC
try {
if (activeCacheKey != null){
//wait on the lock of the object that we couldn't get.
synchronized (activeCacheKey) {
activeCacheKey.getInstanceLock().lock();
try {
// verify that the cache key is still locked before we wait on it, as
//it may have been released since we tried to acquire it.
if (activeCacheKey.isAcquired() && (activeCacheKey.getActiveThread() != Thread.currentThread())) {
Thread thread = activeCacheKey.getActiveThread();
if (thread.isAlive()){
long time = System.currentTimeMillis();
activeCacheKey.wait(MAX_WAIT);
activeCacheKey.getInstanceLockCondition().await(MAX_WAIT, TimeUnit.MILLISECONDS);
if (System.currentTimeMillis() - time >= MAX_WAIT){
Object[] params = new Object[]{MAX_WAIT /1000, descriptor.getJavaClassName(), activeCacheKey.getKey(), thread.getName()};
StringBuilder buffer = new StringBuilder(TraceLocalization.buildMessage("max_time_exceeded_for_acquirerequiredlocks_wait", params));
Expand All @@ -459,6 +479,8 @@ private void acquireRequiredLocksInternal(MergeManager mergeManager, UnitOfWorkC
}
}
}
} finally {
activeCacheKey.getInstanceLock().unlock();
}
}
} catch (InterruptedException exception) {
Expand Down Expand Up @@ -497,8 +519,11 @@ private void acquireRequiredLocksInternal(MergeManager mergeManager, UnitOfWorkC
}finally {
if (mergeManager.getWriteLockQueued() != null) {
//the merge manager entered the wait queue and must be cleaned up
synchronized(this.prevailingQueue) {
instancePrevailingQueueLock.lock();
try {
this.prevailingQueue.remove(mergeManager.getQueueNode());
} finally {
instancePrevailingQueueLock.unlock();
}
mergeManager.setWriteLockQueued(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.eclipse.persistence.sessions.DataRecord;
import org.eclipse.persistence.sessions.DatabaseRecord;

import java.util.concurrent.TimeUnit;

/**
* <p><b>Purpose</b>: Container class for storing objects in an IdentityMap.
* <p><b>Responsibilities</b>:<ul>
Expand Down Expand Up @@ -604,18 +606,23 @@ public void setTransactionId(Object transactionId) {
this.transactionId = transactionId;
}

public synchronized Object waitForObject(){
public Object waitForObject(){
getInstanceLock().lock();
try {
int count = 0;
while (this.object == null && isAcquired()) {
if (count > MAX_WAIT_TRIES)
throw ConcurrencyException.maxTriesLockOnBuildObjectExceded(getActiveThread(), Thread.currentThread());
wait(10);
++count;
try {
int count = 0;
while (this.object == null && isAcquired()) {
if (count > MAX_WAIT_TRIES)
throw ConcurrencyException.maxTriesLockOnBuildObjectExceded(getActiveThread(), Thread.currentThread());
getInstanceLockCondition().await(10, TimeUnit.MILLISECONDS);
++count;
}
} catch(InterruptedException ex) {
//ignore as the loop is broken
}
} catch(InterruptedException ex) {
//ignore as the loop is broken
return this.object;
} finally {
getInstanceLock().unlock();
}
return this.object;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,4 @@

warmup.iterations=20
run.iterations=20
jmh.resultFile=jmh-result.csv
jmh.resultFormat=csv
Loading
Loading