Skip to content

Commit

Permalink
Added rubix config for connection pool scavanger
Browse files Browse the repository at this point in the history
- removed synchronized block from increaseObjects
- refactored borrowObject method
  • Loading branch information
Harmandeep Singh committed Oct 1, 2020
1 parent 9383d53 commit e6ff797
Show file tree
Hide file tree
Showing 7 changed files with 8 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public RetryingPooledBookkeeperClient createBookKeeperClient(String host, Config
}
else {
Poolable<TTransport> obj;
obj = pool.borrowObject(host, conf);
obj = pool.borrowObject(host);
RetryingPooledBookkeeperClient retryingBookkeeperClient = new RetryingPooledBookkeeperClient(obj, host, conf);
return retryingBookkeeperClient;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class CacheConfig
private static final String KEY_POOL_MIN_SIZE = "rubix.pool.size.min";
private static final String KEY_POOL_DELTA_SIZE = "rubix.pool.delta.size";
private static final String KEY_POOL_MAX_WAIT_TIMEOUT = "rubix.pool.wait.timeout";
private static final String KEY_POOL_SCAVENGER_INTERVAL = "rubix.pool.scavenger.interval";
private static final String KEY_DATA_CACHE_EXPIRY_AFTER_WRITE = "rubix.cache.expiration.after-write";
private static final String KEY_DATA_CACHE_DIR_PREFIX = "rubix.cache.dirprefix.list";
private static final String KEY_DATA_CACHE_DIR_SUFFIX = "rubix.cache.dirsuffix";
Expand Down Expand Up @@ -310,7 +311,7 @@ public static int getTransportPoolMaxWait(Configuration conf)

public static int getScavengeInterval(Configuration conf)
{
return conf.getInt(KEY_POOL_MAX_WAIT_TIMEOUT, DEFAULT_SCAVENGE_INTERVAL);
return conf.getInt(KEY_POOL_SCAVENGER_INTERVAL, DEFAULT_SCAVENGE_INTERVAL);
}

public static int get(Configuration conf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public static DataTransferClient getClient(String host, Configuration conf)
}
}
}
Poolable<SocketChannel> socketChannelPoolable = pool.borrowObject(host, conf);
Poolable<SocketChannel> socketChannelPoolable = pool.borrowObject(host);
return new DataTransferClient(socketChannelPoolable);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ protected <V> V retryConnection(Callable<V> callable)

// unset transportPoolable so that close() doesnt return it again to pool if borrowObject hits an exception
transportPoolable = null;
transportPoolable = objectPool.borrowObject(host, conf);
transportPoolable = objectPool.borrowObject(host);
updateClient(transportPoolable);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ protected BlockingQueue<Poolable<T>> createBlockingQueue(PoolConfig poolConfig)
return new ArrayBlockingQueue<>(poolConfig.getMaxSize());
}

public Poolable<T> borrowObject(String host, Configuration conf)
public Poolable<T> borrowObject(String host)
{
if (!hostToPoolMap.containsKey(host)) {
synchronized (hostToPoolMap) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ public void returnObject(Poolable<T> object)
if (!objectFactory.validate(object.getObject())) {
log.debug(String.format("Invalid object...removing: %s ", object));
decreaseObject(object);
// Compensate for the removed object. Needed to prevent endless wait when in parallel a borrowObject is called
increaseObjects(1, false);
return;
}

Expand Down Expand Up @@ -115,7 +113,7 @@ public Poolable<T> getObject(boolean blocking)
return freeObject;
}

private synchronized Poolable<T> increaseObjects(int delta, boolean returnObject)
private Poolable<T> increaseObjects(int delta, boolean returnObject)
{
int oldCount = totalCount;
if (delta + totalCount > config.getMaxSize()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ private RetryingPooledThriftTestClient getClient(int retries)
retries,
conf,
"localhost",
pool.borrowObject("localhost", conf));
pool.borrowObject("localhost"));
}

private static void startServerAsync(final Configuration conf)
Expand Down

0 comments on commit e6ff797

Please sign in to comment.