Skip to content

Commit

Permalink
Added unit test around semaphore logic
Browse files Browse the repository at this point in the history
  • Loading branch information
Harmandeep Singh committed Oct 6, 2020
1 parent a6cf9aa commit e59dc21
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package com.qubole.rubix.spi;

import com.google.common.annotations.VisibleForTesting;
import com.qubole.rubix.spi.fop.ObjectPool;
import com.qubole.rubix.spi.fop.Poolable;
import org.apache.commons.logging.Log;
Expand Down Expand Up @@ -84,6 +85,12 @@ protected <V> V retryConnection(Callable<V> callable)
throw new TException();
}

@VisibleForTesting
public Poolable<TTransport> getTransportPoolable()
{
return transportPoolable;
}

@Override
public void close()
{
Expand Down
20 changes: 8 additions & 12 deletions rubix-spi/src/main/java/com/qubole/rubix/spi/fop/ObjectPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,19 +75,15 @@ public Poolable<T> borrowObject(String host)
}
}
log.debug(this.name + " : Borrowing object for partition: " + host);
for (int i = 0; i < 3; i++) { // try at most three times
Poolable<T> result = getObject(host);
if (result == null) {
continue;
}
else if (factory.validate(result.getObject())) {
return result;
}
else {
this.hostToPoolMap.get(host).decreaseObject(result);
}
Poolable<T> result = getObject(host);
if (result == null) {
throw new RuntimeException("Unable to find a free object from connection pool: " + this.name);
}
else if (!factory.validate(result.getObject())) {
this.hostToPoolMap.get(host).decreaseObject(result);
throw new RuntimeException("Cannot find a valid object from connection pool: " + this.name);
}
throw new RuntimeException("Cannot find a valid object");
return result;
}

private Poolable<T> getObject(String host)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,10 @@ public Poolable<T> getObject()
{
Poolable<T> object;
try {
takeSemaphore.tryAcquire(config.getMaxWaitMilliseconds(), TimeUnit.MILLISECONDS);
if (!takeSemaphore.tryAcquire(config.getMaxWaitMilliseconds(), TimeUnit.MILLISECONDS)) {
// Not able to acquire semaphore in the given timeout, return null
return null;
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package com.qubole.rubix.spi;

import com.qubole.rubix.spi.fop.Poolable;
import com.qubole.rubix.spi.thrift.BookKeeperService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand All @@ -21,6 +22,7 @@
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
Expand All @@ -30,7 +32,9 @@

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

public class TestBookKeeperFactory
{
Expand Down Expand Up @@ -146,6 +150,43 @@ public void testCreateBookKeeperClient_startDelay_unableToConnect_socketTimeout(
client.isBookKeeperAlive(); // should throw expected exception due to socket timeout
}

@Test
public void testConnectionPoolSemaphoreLogic() throws TException, InterruptedException
{
final int connectTimeout = 500;
final int socketTimeout = 500;

// Create a connection pool of size = 1
conf.setInt("rubix.pool.size.max", 1);
server = startMockServer(true, NO_DELAY, NO_DELAY);

RetryingPooledBookkeeperClient bookKeeperClient = createTestBookKeeperClient(socketTimeout, connectTimeout, 3);
assertTrue(bookKeeperClient.isBookKeeperAlive(), "Unable to connect to bookkeeper");

Poolable<TTransport> transportPoolable = bookKeeperClient.getTransportPoolable();

try {
bookKeeperFactory.createBookKeeperClient("localhost", conf);
}
catch (Exception e) {
assertEquals(e.getMessage(), "Unable to find a free object from connection pool: bks-pool");

// close the client which should have added back the free connection the pool.
bookKeeperClient.close();

bookKeeperClient = bookKeeperFactory.createBookKeeperClient("localhost", conf);
assertTrue(bookKeeperClient.isBookKeeperAlive(), "Unable to connect to bookkeeper");

// Verify that the pool return the same connection instead of creating the new one.
assertEquals(transportPoolable.getObject(), bookKeeperClient.getTransportPoolable().getObject(), "Same connection should be reused from the pool");
return;
}
finally {
stopMockServer();
}
fail("Expected exception to be thrown while creating bookkeeper client");
}

private MockBookKeeperServer startMockServer(boolean waitForStart, int startDelay, int aliveCallDelay) throws InterruptedException
{
MockBookKeeperServer server = new MockBookKeeperServer(startDelay, aliveCallDelay);
Expand Down

0 comments on commit e59dc21

Please sign in to comment.