Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-3444: Add Custom TTL support for RedisLock, and JdbcLock #9053

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageP

private boolean sequenceAware;

private LockRegistry lockRegistry = new DefaultLockRegistry();
private LockRegistry<?> lockRegistry = new DefaultLockRegistry();

private boolean lockRegistrySet = false;

Expand Down Expand Up @@ -193,7 +193,7 @@ public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor) {
this(processor, new SimpleMessageStore(0), null, null);
}

public void setLockRegistry(LockRegistry lockRegistry) {
public void setLockRegistry(LockRegistry<?> lockRegistry) {
Assert.isTrue(!this.lockRegistrySet, "'this.lockRegistry' can not be reset once its been set");
Assert.notNull(lockRegistry, "'lockRegistry' must not be null");
this.lockRegistry = lockRegistry;
Expand Down Expand Up @@ -499,7 +499,7 @@ protected boolean isSequenceAware() {
return this.sequenceAware;
}

protected LockRegistry getLockRegistry() {
protected LockRegistry<?> getLockRegistry() {
return this.lockRegistry;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public class AggregatorFactoryBean extends AbstractSimpleMessageHandlerFactoryBe

private String outputChannelName;

private LockRegistry lockRegistry;
private LockRegistry<?> lockRegistry;

private MessageGroupStore messageStore;

Expand Down Expand Up @@ -122,7 +122,7 @@ public void setOutputChannelName(String outputChannelName) {
this.outputChannelName = outputChannelName;
}

public void setLockRegistry(LockRegistry lockRegistry) {
public void setLockRegistry(LockRegistry<?> lockRegistry) {
this.lockRegistry = lockRegistry;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2022 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -299,7 +299,7 @@ public S forceReleaseAdvice(Advice... advice) {
* @param lockRegistry the {@link LockRegistry} to use.
* @return the endpoint spec.
*/
public S lockRegistry(LockRegistry lockRegistry) {
public S lockRegistry(LockRegistry<?> lockRegistry) {
Assert.notNull(lockRegistry, "'lockRegistry' must not be null.");
this.handler.setLockRegistry(lockRegistry);
return _this();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -65,7 +65,7 @@ public class PropertiesPersistingMetadataStore implements ConcurrentMetadataStor

private final DefaultPropertiesPersister persister = new DefaultPropertiesPersister();

private final LockRegistry lockRegistry = new DefaultLockRegistry();
private final LockRegistry<Lock> lockRegistry = new DefaultLockRegistry();

private String baseDirectory = System.getProperty("java.io.tmpdir") + "/spring-integration/";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public class SimpleMessageStore extends AbstractMessageGroupStore

private final long upperBoundTimeout;

private LockRegistry lockRegistry;
private LockRegistry<?> lockRegistry;

private boolean copyOnGet = false;

Expand Down Expand Up @@ -111,7 +111,7 @@ public SimpleMessageStore(int individualCapacity, int groupCapacity, long upperB
* @param lockRegistry The lock registry.
* @see #SimpleMessageStore(int, int, long, LockRegistry)
*/
public SimpleMessageStore(int individualCapacity, int groupCapacity, LockRegistry lockRegistry) {
public SimpleMessageStore(int individualCapacity, int groupCapacity, LockRegistry<?> lockRegistry) {
this(individualCapacity, groupCapacity, 0, lockRegistry);
}

Expand All @@ -126,7 +126,7 @@ public SimpleMessageStore(int individualCapacity, int groupCapacity, LockRegistr
* @since 4.3
*/
public SimpleMessageStore(int individualCapacity, int groupCapacity, long upperBoundTimeout,
LockRegistry lockRegistry) {
LockRegistry<?> lockRegistry) {

super(false);
Assert.notNull(lockRegistry, "The LockRegistry cannot be null");
Expand Down Expand Up @@ -162,7 +162,7 @@ public void setCopyOnGet(boolean copyOnGet) {
this.copyOnGet = copyOnGet;
}

public void setLockRegistry(LockRegistry lockRegistry) {
public void setLockRegistry(LockRegistry<?> lockRegistry) {
Assert.notNull(lockRegistry, "The LockRegistry cannot be null");
Assert.isTrue(!(this.isUsed), "Cannot change the lock registry after the store has been used");
this.lockRegistry = lockRegistry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public class LockRegistryLeaderInitiator implements SmartLifecycle, DisposableBe
* A lock registry. The locks it manages should be global (whatever that means for the
* system) and expiring, in case the holder dies without notifying anyone.
*/
private final LockRegistry locks;
private final LockRegistry<?> locks;

/**
* Candidate for leader election. User injects this to receive callbacks on leadership
Expand Down Expand Up @@ -162,7 +162,7 @@ public String getRole() {
* candidate (which just logs the leadership events).
* @param locks lock registry
*/
public LockRegistryLeaderInitiator(LockRegistry locks) {
public LockRegistryLeaderInitiator(LockRegistry<?> locks) {
this(locks, new DefaultCandidate());
}

Expand All @@ -172,7 +172,7 @@ public LockRegistryLeaderInitiator(LockRegistry locks) {
* @param locks lock registry
* @param candidate leadership election candidate
*/
public LockRegistryLeaderInitiator(LockRegistry locks, Candidate candidate) {
public LockRegistryLeaderInitiator(LockRegistry<?> locks, Candidate candidate) {
Assert.notNull(locks, "'locks' must not be null");
Assert.notNull(candidate, "'candidate' must not be null");
this.locks = locks;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
* @since 2.1.1
*
*/
public final class DefaultLockRegistry implements LockRegistry {
public final class DefaultLockRegistry implements LockRegistry<Lock> {

private final Lock[] lockTable;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.support.locks;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

/**
* A {@link Lock} implementing for spring distributed locks
*
* @author Eddie Cho
*
* @since 6.3
*/
public interface DistributedLock extends Lock {

/**
* Attempt to acquire a lock with a specific time-to-live
* @param time the maximum time to wait for the lock unit
* @param unit the time unit of the time argument
* @param customTtl the specific time-to-live for the lock status data
* @param customTtlUnit the time unit of the customTtl argument
* @return true if the lock was acquired and false if the waiting time elapsed before the lock was acquired
* @throws InterruptedException -
* if the current thread is interrupted while acquiring the lock (and interruption of lock acquisition is supported)
*/
boolean tryLock(long time, TimeUnit unit, long customTtl, TimeUnit customTtlUnit) throws InterruptedException;

/**
* Attempt to acquire a lock with a specific time-to-live
* @param customTtl the specific time-to-live for the lock status data
* @param customTtlUnit the time unit of the customTtl argument
*/
void lock(long customTtl, TimeUnit customTtlUnit);
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2019 the original author or authors.
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,15 +16,18 @@

package org.springframework.integration.support.locks;

import java.util.concurrent.locks.Lock;

/**
* A {@link LockRegistry} implementing this interface supports the removal of aged locks
* that are not currently locked.
* @param <L> The expected class of the lock implementation
*
* @author Gary Russell
* @since 4.2
*
*/
public interface ExpirableLockRegistry extends LockRegistry {
public interface ExpirableLockRegistry<L extends Lock> extends LockRegistry<L> {

/**
* Remove locks last acquired more than 'age' ago that are not currently locked.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,6 +26,7 @@

/**
* Strategy for maintaining a registry of shared locks.
* @param <L> The expected class of the lock implementation
*
* @author Oleg Zhurakousky
* @author Gary Russell
Expand All @@ -34,14 +35,14 @@
* @since 2.1.1
*/
@FunctionalInterface
public interface LockRegistry {
public interface LockRegistry<L extends Lock> {

/**
* Obtain the lock associated with the parameter object.
* @param lockKey The object with which the lock is associated.
* @return The associated lock.
*/
Lock obtain(Object lockKey);
L obtain(Object lockKey);

/**
* Perform the provided task when the lock for the key is locked.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,7 +33,7 @@
* @since 2.2
*
*/
public final class PassThruLockRegistry implements LockRegistry {
public final class PassThruLockRegistry implements LockRegistry<Lock> {

@Override
public Lock obtain(Object lockKey) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 the original author or authors.
* Copyright 2020-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,16 +16,19 @@

package org.springframework.integration.support.locks;

import java.util.concurrent.locks.Lock;

/**
* A {@link LockRegistry} implementing this interface supports the renewal
* of the time to live of a lock.
* @param <L> The expected class of the lock implementation
*
* @author Alexandre Strubel
* @author Artem Bilan
*
* @since 5.4
*/
public interface RenewableLockRegistry extends LockRegistry {
public interface RenewableLockRegistry<L extends Lock> extends LockRegistry<L> {

/**
* Renew the time to live of the lock is associated with the parameter object.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class LockRegistryLeaderInitiatorTests {

private CountDownLatch revoked = new CountDownLatch(1);

private final LockRegistry registry = new DefaultLockRegistry();
private final LockRegistry<Lock> registry = new DefaultLockRegistry();

private final LockRegistryLeaderInitiator initiator =
new LockRegistryLeaderInitiator(this.registry, new DefaultCandidate());
Expand Down Expand Up @@ -159,6 +159,7 @@ public void publishOnGranted(Object source, Context context, String role) {
}

@Test
@SuppressWarnings("rawtypes")
public void competingWithLock() throws Exception {
// switch used to toggle which registry obtains lock
AtomicBoolean firstLocked = new AtomicBoolean(true);
Expand Down Expand Up @@ -220,6 +221,7 @@ public void competingWithLock() throws Exception {
}

@Test
@SuppressWarnings("rawtypes")
public void testGracefulLeaderSelectorExit() throws Exception {
AtomicReference<Throwable> throwableAtomicReference = new AtomicReference<>();

Expand Down Expand Up @@ -275,7 +277,7 @@ public void testExceptionFromLock() throws Exception {
}
}).given(mockLock).tryLock(anyLong(), any(TimeUnit.class));

LockRegistry registry = lockKey -> mockLock;
LockRegistry<Lock> registry = lockKey -> mockLock;

CountDownLatch onGranted = new CountDownLatch(1);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -56,7 +56,7 @@ public void testBadMaskOutOfRange() { // 32bits

@Test
public void testSingleLockCreation() {
LockRegistry registry = new DefaultLockRegistry(0);
LockRegistry<Lock> registry = new DefaultLockRegistry(0);
Lock a = registry.obtain(23);
Lock b = registry.obtain(new Object());
Lock c = registry.obtain("hello");
Expand All @@ -67,7 +67,7 @@ public void testSingleLockCreation() {

@Test
public void testSame() {
LockRegistry registry = new DefaultLockRegistry();
LockRegistry<Lock> registry = new DefaultLockRegistry();
Lock lock1 = registry.obtain(new Object() {

@Override
Expand All @@ -87,7 +87,7 @@ public int hashCode() {

@Test
public void testDifferent() {
LockRegistry registry = new DefaultLockRegistry();
LockRegistry<Lock> registry = new DefaultLockRegistry();
Lock lock1 = registry.obtain(new Object() {

@Override
Expand All @@ -107,7 +107,7 @@ public int hashCode() {

@Test
public void testAllDifferentAndSame() {
LockRegistry registry = new DefaultLockRegistry(3);
LockRegistry<Lock> registry = new DefaultLockRegistry(3);
Lock[] locks = new Lock[4];
locks[0] = registry.obtain(new Object() {

Expand Down Expand Up @@ -213,7 +213,7 @@ public int hashCode() {

@Test
public void cyclicBarrierIsBrokenWhenExecutedConcurrentlyInLock() throws Exception {
LockRegistry registry = new DefaultLockRegistry(1);
LockRegistry<Lock> registry = new DefaultLockRegistry(1);

CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
CountDownLatch brokenBarrierLatch = new CountDownLatch(2);
Expand Down Expand Up @@ -245,7 +245,7 @@ public void cyclicBarrierIsBrokenWhenExecutedConcurrentlyInLock() throws Excepti

@Test
public void executeLockedIsTimedOutInOtherThread() throws Exception {
LockRegistry registry = new DefaultLockRegistry(1);
LockRegistry<Lock> registry = new DefaultLockRegistry(1);

String lockKey = "lockKey";
Duration waitLockDuration = Duration.ofMillis(100);
Expand Down
Loading
Loading