From cc56a93ae71d3072dbea5d27cabfb786df75d8d2 Mon Sep 17 00:00:00 2001 From: Christoph Strobl Date: Tue, 27 Jun 2023 12:06:39 +0200 Subject: [PATCH] Refine locking. Closes: #4429 Original Pull Request: #4431 --- .../data/mongodb/core/MongoOperations.java | 13 ++- .../core/convert/LazyLoadingProxyFactory.java | 48 ++++++++--- .../core/messaging/CursorReadingTask.java | 82 ++++++++++-------- .../DefaultMessageListenerContainer.java | 85 +++++++++++-------- 4 files changed, 139 insertions(+), 89 deletions(-) diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java index 51d93db0ec..db419a25e3 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java @@ -18,8 +18,12 @@ import java.util.Collection; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import java.util.function.Supplier; +import java.util.function.UnaryOperator; import java.util.stream.Stream; import org.bson.Document; @@ -188,16 +192,19 @@ default SessionScoped withSession(Supplier sessionProvider) { return new SessionScoped() { - private final Object lock = new Object(); - private @Nullable ClientSession session = null; + private final Lock lock = new ReentrantLock(); + private @Nullable ClientSession session; @Override public T execute(SessionCallback action, Consumer onComplete) { - synchronized (lock) { + lock.lock(); + try { if (session == null) { session = sessionProvider.get(); } + } finally { + lock.unlock(); } try { diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/LazyLoadingProxyFactory.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/LazyLoadingProxyFactory.java index 0c14b27970..53ff2a0be3 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/LazyLoadingProxyFactory.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/LazyLoadingProxyFactory.java @@ -22,6 +22,9 @@ import java.io.ObjectOutputStream; import java.io.Serializable; import java.lang.reflect.Method; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Supplier; import org.aopalliance.intercept.MethodInterceptor; @@ -134,7 +137,8 @@ public Object createLazyLoadingProxy(MongoPersistentProperty property, DbRefReso } return prepareProxyFactory(propertyType, - () -> new LazyLoadingInterceptor(property, callback, source, exceptionTranslator)).getProxy(LazyLoadingProxy.class.getClassLoader()); + () -> new LazyLoadingInterceptor(property, callback, source, exceptionTranslator)) + .getProxy(LazyLoadingProxy.class.getClassLoader()); } /** @@ -171,6 +175,8 @@ public static class LazyLoadingInterceptor } } + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + private final MongoPersistentProperty property; private final DbRefResolverCallback callback; private final Object source; @@ -339,25 +345,29 @@ private void readObject(ObjectInputStream in) throws IOException { } @Nullable - private synchronized Object resolve() { + private Object resolve() { - if (resolved) { + lock.readLock().lock(); + try { + if (resolved) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace(String.format("Accessing already resolved lazy loading property %s.%s", - property.getOwner() != null ? property.getOwner().getName() : "unknown", property.getName())); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace(String.format("Accessing already resolved lazy loading property %s.%s", + property.getOwner() != null ? property.getOwner().getName() : "unknown", property.getName())); + } + return result; } - return result; + } finally { + lock.readLock().unlock(); } - try { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace(String.format("Resolving lazy loading property %s.%s", - property.getOwner() != null ? property.getOwner().getName() : "unknown", property.getName())); - } - - return callback.resolve(property); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace(String.format("Resolving lazy loading property %s.%s", + property.getOwner() != null ? property.getOwner().getName() : "unknown", property.getName())); + } + try { + return executeWhileLocked(lock.writeLock(), () -> callback.resolve(property)); } catch (RuntimeException ex) { DataAccessException translatedException = exceptionTranslator.translateExceptionIfPossible(ex); @@ -370,6 +380,16 @@ private synchronized Object resolve() { translatedException != null ? translatedException : ex); } } + + private static T executeWhileLocked(Lock lock, Supplier stuff) { + + lock.lock(); + try { + return stuff.get(); + } finally { + lock.unlock(); + } + } } } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/CursorReadingTask.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/CursorReadingTask.java index cc0c64eeaf..dcebebbf75 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/CursorReadingTask.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/CursorReadingTask.java @@ -18,6 +18,8 @@ import java.time.Duration; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; import org.springframework.dao.DataAccessResourceFailureException; @@ -39,7 +41,7 @@ */ abstract class CursorReadingTask implements Task { - private final Object lifecycleMonitor = new Object(); + private final Lock lock = new ReentrantLock(); private final MongoTemplate template; private final SubscriptionRequest request; @@ -86,19 +88,14 @@ public void run() { } } catch (InterruptedException e) { - synchronized (lifecycleMonitor) { - state = State.CANCELLED; - } + doWhileLocked(lock, () -> state = State.CANCELLED); Thread.currentThread().interrupt(); break; } } } catch (RuntimeException e) { - synchronized (lifecycleMonitor) { - state = State.CANCELLED; - } - + doWhileLocked(lock, () -> state = State.CANCELLED); errorHandler.handleError(e); } } @@ -114,30 +111,32 @@ public void run() { */ private void start() { - synchronized (lifecycleMonitor) { + doWhileLocked(lock, () -> { if (!State.RUNNING.equals(state)) { state = State.STARTING; } - } + }); do { - boolean valid = false; + // boolean valid = false; - synchronized (lifecycleMonitor) { + boolean valid = executeWhileLocked(lock, () -> { - if (State.STARTING.equals(state)) { + if (!State.STARTING.equals(state)) { + return false; + } - MongoCursor cursor = execute(() -> initCursor(template, request.getRequestOptions(), targetType)); - valid = isValidCursor(cursor); - if (valid) { - this.cursor = cursor; - state = State.RUNNING; - } else if (cursor != null) { - cursor.close(); - } + MongoCursor cursor = execute(() -> initCursor(template, request.getRequestOptions(), targetType)); + boolean isValid = isValidCursor(cursor); + if (isValid) { + this.cursor = cursor; + state = State.RUNNING; + } else if (cursor != null) { + cursor.close(); } - } + return isValid; + }); if (!valid) { @@ -145,9 +144,7 @@ private void start() { Thread.sleep(100); } catch (InterruptedException e) { - synchronized (lifecycleMonitor) { - state = State.CANCELLED; - } + doWhileLocked(lock, () -> state = State.CANCELLED); Thread.currentThread().interrupt(); } } @@ -163,7 +160,7 @@ private void start() { @Override public void cancel() throws DataAccessResourceFailureException { - synchronized (lifecycleMonitor) { + doWhileLocked(lock, () -> { if (State.RUNNING.equals(state) || State.STARTING.equals(state)) { this.state = State.CANCELLED; @@ -171,7 +168,7 @@ public void cancel() throws DataAccessResourceFailureException { cursor.close(); } } - } + }); } @Override @@ -181,10 +178,7 @@ public boolean isLongLived() { @Override public State getState() { - - synchronized (lifecycleMonitor) { - return state; - } + return executeWhileLocked(lock, () -> state); } @Override @@ -220,13 +214,12 @@ private void emitMessage(Message message) { @Nullable private T getNext() { - synchronized (lifecycleMonitor) { + return executeWhileLocked(lock, () -> { if (State.RUNNING.equals(state)) { return cursor.tryNext(); } - } - - throw new IllegalStateException(String.format("Cursor %s is not longer open", cursor)); + throw new IllegalStateException(String.format("Cursor %s is not longer open", cursor)); + }); } private static boolean isValidCursor(@Nullable MongoCursor cursor) { @@ -263,4 +256,23 @@ private V execute(Supplier callback) { throw translated != null ? translated : e; } } + + private static void doWhileLocked(Lock lock, Runnable action) { + + executeWhileLocked(lock, () -> { + action.run(); + return null; + }); + } + + @Nullable + private static T executeWhileLocked(Lock lock, Supplier stuff) { + + lock.lock(); + try { + return stuff.get(); + } finally { + lock.unlock(); + } + } } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainer.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainer.java index 7eb088c491..0e8f72cfe9 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainer.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainer.java @@ -20,6 +20,10 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.Executor; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Supplier; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -35,8 +39,7 @@ /** * Simple {@link Executor} based {@link MessageListenerContainer} implementation for running {@link Task tasks} like * listening to MongoDB Change Streams and tailable - * cursors. - *
+ * cursors.
* This message container creates long-running tasks that are executed on {@link Executor}. * * @author Christoph Strobl @@ -49,9 +52,11 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer private final TaskFactory taskFactory; private final Optional errorHandler; - private final Object lifecycleMonitor = new Object(); private final Map subscriptions = new LinkedHashMap<>(); + ReadWriteLock lifecycleMonitor = new ReentrantReadWriteLock(); + ReadWriteLock subscriptionMonitor = new ReentrantReadWriteLock(); + private boolean running = false; /** @@ -109,43 +114,34 @@ public void stop(Runnable callback) { @Override public void start() { - synchronized (lifecycleMonitor) { + doWhileLocked(lifecycleMonitor.writeLock(), () -> { + if (!this.running) { + subscriptions.values().stream() // + .filter(it -> !it.isActive()) // + .filter(TaskSubscription.class::isInstance) // + .map(TaskSubscription.class::cast) // + .map(TaskSubscription::getTask) // + .forEach(taskExecutor::execute); - if (this.running) { - return; + running = true; } - - subscriptions.values().stream() // - .filter(it -> !it.isActive()) // - .filter(TaskSubscription.class::isInstance) // - .map(TaskSubscription.class::cast) // - .map(TaskSubscription::getTask) // - .forEach(taskExecutor::execute); - - running = true; - } + }); } @Override public void stop() { - synchronized (lifecycleMonitor) { - + doWhileLocked(lifecycleMonitor.writeLock(), () -> { if (this.running) { - subscriptions.values().forEach(Cancelable::cancel); - running = false; } - } + }); } @Override public boolean isRunning() { - - synchronized (this.lifecycleMonitor) { - return running; - } + return executeWhileLocked(lifecycleMonitor.readLock(), () -> running); } @Override @@ -170,36 +166,32 @@ public Subscription register(SubscriptionRequest lookup(SubscriptionRequest request) { - - synchronized (lifecycleMonitor) { - return Optional.ofNullable(subscriptions.get(request)); - } + return executeWhileLocked(subscriptionMonitor.readLock(), () -> Optional.ofNullable(subscriptions.get(request))); } public Subscription register(SubscriptionRequest request, Task task) { - Subscription subscription = new TaskSubscription(task); - - synchronized (lifecycleMonitor) { - + return executeWhileLocked(this.subscriptionMonitor.writeLock(), () -> + { if (subscriptions.containsKey(request)) { return subscriptions.get(request); } + Subscription subscription = new TaskSubscription(task); this.subscriptions.put(request, subscription); - if (this.running) { + if (this.isRunning()) { taskExecutor.execute(task); } - } + return subscription; + }); - return subscription; } @Override public void remove(Subscription subscription) { - synchronized (lifecycleMonitor) { + doWhileLocked(this.subscriptionMonitor.writeLock(), () -> { if (subscriptions.containsValue(subscription)) { @@ -209,6 +201,25 @@ public void remove(Subscription subscription) { subscriptions.values().remove(subscription); } + }); + } + + private static void doWhileLocked(Lock lock, Runnable action) { + + executeWhileLocked(lock, () -> { + action.run(); + return null; + }); + } + + @Nullable + private static T executeWhileLocked(Lock lock, Supplier stuff) { + + lock.lock(); + try { + return stuff.get(); + } finally { + lock.unlock(); } }