diff --git a/rubix-spi/src/main/java/com/qubole/rubix/spi/RetryingPooledThriftClient.java b/rubix-spi/src/main/java/com/qubole/rubix/spi/RetryingPooledThriftClient.java index 1064cabe..e55cac88 100644 --- a/rubix-spi/src/main/java/com/qubole/rubix/spi/RetryingPooledThriftClient.java +++ b/rubix-spi/src/main/java/com/qubole/rubix/spi/RetryingPooledThriftClient.java @@ -12,6 +12,7 @@ */ package com.qubole.rubix.spi; +import com.google.common.annotations.VisibleForTesting; import com.qubole.rubix.spi.fop.ObjectPool; import com.qubole.rubix.spi.fop.Poolable; import org.apache.commons.logging.Log; @@ -84,6 +85,12 @@ protected V retryConnection(Callable callable) throw new TException(); } + @VisibleForTesting + public Poolable getTransportPoolable() + { + return transportPoolable; + } + @Override public void close() { diff --git a/rubix-spi/src/main/java/com/qubole/rubix/spi/fop/ObjectPool.java b/rubix-spi/src/main/java/com/qubole/rubix/spi/fop/ObjectPool.java index b98d4787..85158d4b 100755 --- a/rubix-spi/src/main/java/com/qubole/rubix/spi/fop/ObjectPool.java +++ b/rubix-spi/src/main/java/com/qubole/rubix/spi/fop/ObjectPool.java @@ -75,19 +75,15 @@ public Poolable borrowObject(String host) } } log.debug(this.name + " : Borrowing object for partition: " + host); - for (int i = 0; i < 3; i++) { // try at most three times - Poolable result = getObject(host); - if (result == null) { - continue; - } - else if (factory.validate(result.getObject())) { - return result; - } - else { - this.hostToPoolMap.get(host).decreaseObject(result); - } + Poolable result = getObject(host); + if (result == null) { + throw new RuntimeException("Unable to find a free object from connection pool: " + this.name); + } + else if (!factory.validate(result.getObject())) { + this.hostToPoolMap.get(host).decreaseObject(result); + throw new RuntimeException("Cannot find a valid object from connection pool: " + this.name); } - throw new RuntimeException("Cannot find a valid object"); + return result; } private Poolable getObject(String host) diff --git a/rubix-spi/src/main/java/com/qubole/rubix/spi/fop/ObjectPoolPartition.java b/rubix-spi/src/main/java/com/qubole/rubix/spi/fop/ObjectPoolPartition.java index dd55cf8b..edf50315 100755 --- a/rubix-spi/src/main/java/com/qubole/rubix/spi/fop/ObjectPoolPartition.java +++ b/rubix-spi/src/main/java/com/qubole/rubix/spi/fop/ObjectPoolPartition.java @@ -107,7 +107,10 @@ public Poolable getObject() { Poolable object; try { - takeSemaphore.tryAcquire(config.getMaxWaitMilliseconds(), TimeUnit.MILLISECONDS); + if (!takeSemaphore.tryAcquire(config.getMaxWaitMilliseconds(), TimeUnit.MILLISECONDS)) { + // Not able to acquire semaphore in the given timeout, return null + return null; + } } catch (InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/rubix-spi/src/test/java/com/qubole/rubix/spi/TestBookKeeperFactory.java b/rubix-spi/src/test/java/com/qubole/rubix/spi/TestBookKeeperFactory.java index 368c968d..6de0673f 100644 --- a/rubix-spi/src/test/java/com/qubole/rubix/spi/TestBookKeeperFactory.java +++ b/rubix-spi/src/test/java/com/qubole/rubix/spi/TestBookKeeperFactory.java @@ -12,6 +12,7 @@ */ package com.qubole.rubix.spi; +import com.qubole.rubix.spi.fop.Poolable; import com.qubole.rubix.spi.thrift.BookKeeperService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -21,6 +22,7 @@ import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.transport.TServerSocket; import org.apache.thrift.transport.TServerTransport; +import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -30,7 +32,9 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; public class TestBookKeeperFactory { @@ -146,6 +150,43 @@ public void testCreateBookKeeperClient_startDelay_unableToConnect_socketTimeout( client.isBookKeeperAlive(); // should throw expected exception due to socket timeout } + @Test + public void testConnectionPoolSemaphoreLogic() throws TException, InterruptedException + { + final int connectTimeout = 500; + final int socketTimeout = 500; + + // Create a connection pool of size = 1 + conf.setInt("rubix.pool.size.max", 1); + server = startMockServer(true, NO_DELAY, NO_DELAY); + + RetryingPooledBookkeeperClient bookKeeperClient = createTestBookKeeperClient(socketTimeout, connectTimeout, 3); + assertTrue(bookKeeperClient.isBookKeeperAlive(), "Unable to connect to bookkeeper"); + + Poolable transportPoolable = bookKeeperClient.getTransportPoolable(); + + try { + bookKeeperFactory.createBookKeeperClient("localhost", conf); + } + catch (Exception e) { + assertEquals(e.getMessage(), "Unable to find a free object from connection pool: bks-pool"); + + // close the client which should have added back the free connection the pool. + bookKeeperClient.close(); + + bookKeeperClient = bookKeeperFactory.createBookKeeperClient("localhost", conf); + assertTrue(bookKeeperClient.isBookKeeperAlive(), "Unable to connect to bookkeeper"); + + // Verify that the pool return the same connection instead of creating the new one. + assertEquals(transportPoolable.getObject(), bookKeeperClient.getTransportPoolable().getObject(), "Same connection should be reused from the pool"); + return; + } + finally { + stopMockServer(); + } + fail("Expected exception to be thrown while creating bookkeeper client"); + } + private MockBookKeeperServer startMockServer(boolean waitForStart, int startDelay, int aliveCallDelay) throws InterruptedException { MockBookKeeperServer server = new MockBookKeeperServer(startDelay, aliveCallDelay);