From e6ff79776ca420481ebfb2ac8e7ea75df45fbcdb Mon Sep 17 00:00:00 2001 From: Harmandeep Singh Date: Fri, 10 Jul 2020 14:18:26 +0530 Subject: [PATCH] Added rubix config for connection pool scavanger - removed synchronized block from increaseObjects - refactored borrowObject method --- .../src/main/java/com/qubole/rubix/spi/BookKeeperFactory.java | 2 +- rubix-spi/src/main/java/com/qubole/rubix/spi/CacheConfig.java | 3 ++- .../java/com/qubole/rubix/spi/DataTransferClientFactory.java | 2 +- .../java/com/qubole/rubix/spi/RetryingPooledThriftClient.java | 2 +- .../src/main/java/com/qubole/rubix/spi/fop/ObjectPool.java | 2 +- .../java/com/qubole/rubix/spi/fop/ObjectPoolPartition.java | 4 +--- .../java/com/qubole/rubix/spi/client/TestPoolingClient.java | 2 +- 7 files changed, 8 insertions(+), 9 deletions(-) diff --git a/rubix-spi/src/main/java/com/qubole/rubix/spi/BookKeeperFactory.java b/rubix-spi/src/main/java/com/qubole/rubix/spi/BookKeeperFactory.java index b81c5e9d..084da4aa 100644 --- a/rubix-spi/src/main/java/com/qubole/rubix/spi/BookKeeperFactory.java +++ b/rubix-spi/src/main/java/com/qubole/rubix/spi/BookKeeperFactory.java @@ -72,7 +72,7 @@ public RetryingPooledBookkeeperClient createBookKeeperClient(String host, Config } else { Poolable obj; - obj = pool.borrowObject(host, conf); + obj = pool.borrowObject(host); RetryingPooledBookkeeperClient retryingBookkeeperClient = new RetryingPooledBookkeeperClient(obj, host, conf); return retryingBookkeeperClient; } diff --git a/rubix-spi/src/main/java/com/qubole/rubix/spi/CacheConfig.java b/rubix-spi/src/main/java/com/qubole/rubix/spi/CacheConfig.java index 7aed3430..a8620112 100644 --- a/rubix-spi/src/main/java/com/qubole/rubix/spi/CacheConfig.java +++ b/rubix-spi/src/main/java/com/qubole/rubix/spi/CacheConfig.java @@ -52,6 +52,7 @@ public class CacheConfig private static final String KEY_POOL_MIN_SIZE = "rubix.pool.size.min"; private static final String KEY_POOL_DELTA_SIZE = "rubix.pool.delta.size"; private static final String KEY_POOL_MAX_WAIT_TIMEOUT = "rubix.pool.wait.timeout"; + private static final String KEY_POOL_SCAVENGER_INTERVAL = "rubix.pool.scavenger.interval"; private static final String KEY_DATA_CACHE_EXPIRY_AFTER_WRITE = "rubix.cache.expiration.after-write"; private static final String KEY_DATA_CACHE_DIR_PREFIX = "rubix.cache.dirprefix.list"; private static final String KEY_DATA_CACHE_DIR_SUFFIX = "rubix.cache.dirsuffix"; @@ -310,7 +311,7 @@ public static int getTransportPoolMaxWait(Configuration conf) public static int getScavengeInterval(Configuration conf) { - return conf.getInt(KEY_POOL_MAX_WAIT_TIMEOUT, DEFAULT_SCAVENGE_INTERVAL); + return conf.getInt(KEY_POOL_SCAVENGER_INTERVAL, DEFAULT_SCAVENGE_INTERVAL); } public static int get(Configuration conf) diff --git a/rubix-spi/src/main/java/com/qubole/rubix/spi/DataTransferClientFactory.java b/rubix-spi/src/main/java/com/qubole/rubix/spi/DataTransferClientFactory.java index d77b55b1..1218e7d9 100644 --- a/rubix-spi/src/main/java/com/qubole/rubix/spi/DataTransferClientFactory.java +++ b/rubix-spi/src/main/java/com/qubole/rubix/spi/DataTransferClientFactory.java @@ -53,7 +53,7 @@ public static DataTransferClient getClient(String host, Configuration conf) } } } - Poolable socketChannelPoolable = pool.borrowObject(host, conf); + Poolable socketChannelPoolable = pool.borrowObject(host); return new DataTransferClient(socketChannelPoolable); } diff --git a/rubix-spi/src/main/java/com/qubole/rubix/spi/RetryingPooledThriftClient.java b/rubix-spi/src/main/java/com/qubole/rubix/spi/RetryingPooledThriftClient.java index df2a045b..1064cabe 100644 --- a/rubix-spi/src/main/java/com/qubole/rubix/spi/RetryingPooledThriftClient.java +++ b/rubix-spi/src/main/java/com/qubole/rubix/spi/RetryingPooledThriftClient.java @@ -76,7 +76,7 @@ protected V retryConnection(Callable callable) // unset transportPoolable so that close() doesnt return it again to pool if borrowObject hits an exception transportPoolable = null; - transportPoolable = objectPool.borrowObject(host, conf); + transportPoolable = objectPool.borrowObject(host); updateClient(transportPoolable); } } diff --git a/rubix-spi/src/main/java/com/qubole/rubix/spi/fop/ObjectPool.java b/rubix-spi/src/main/java/com/qubole/rubix/spi/fop/ObjectPool.java index d40788dd..520f1916 100755 --- a/rubix-spi/src/main/java/com/qubole/rubix/spi/fop/ObjectPool.java +++ b/rubix-spi/src/main/java/com/qubole/rubix/spi/fop/ObjectPool.java @@ -67,7 +67,7 @@ protected BlockingQueue> createBlockingQueue(PoolConfig poolConfig) return new ArrayBlockingQueue<>(poolConfig.getMaxSize()); } - public Poolable borrowObject(String host, Configuration conf) + public Poolable borrowObject(String host) { if (!hostToPoolMap.containsKey(host)) { synchronized (hostToPoolMap) { diff --git a/rubix-spi/src/main/java/com/qubole/rubix/spi/fop/ObjectPoolPartition.java b/rubix-spi/src/main/java/com/qubole/rubix/spi/fop/ObjectPoolPartition.java index 267b914e..aba2b40c 100755 --- a/rubix-spi/src/main/java/com/qubole/rubix/spi/fop/ObjectPoolPartition.java +++ b/rubix-spi/src/main/java/com/qubole/rubix/spi/fop/ObjectPoolPartition.java @@ -66,8 +66,6 @@ public void returnObject(Poolable object) if (!objectFactory.validate(object.getObject())) { log.debug(String.format("Invalid object...removing: %s ", object)); decreaseObject(object); - // Compensate for the removed object. Needed to prevent endless wait when in parallel a borrowObject is called - increaseObjects(1, false); return; } @@ -115,7 +113,7 @@ public Poolable getObject(boolean blocking) return freeObject; } - private synchronized Poolable increaseObjects(int delta, boolean returnObject) + private Poolable increaseObjects(int delta, boolean returnObject) { int oldCount = totalCount; if (delta + totalCount > config.getMaxSize()) { diff --git a/rubix-spi/src/test/java/com/qubole/rubix/spi/client/TestPoolingClient.java b/rubix-spi/src/test/java/com/qubole/rubix/spi/client/TestPoolingClient.java index 29b798c8..7d5acc32 100644 --- a/rubix-spi/src/test/java/com/qubole/rubix/spi/client/TestPoolingClient.java +++ b/rubix-spi/src/test/java/com/qubole/rubix/spi/client/TestPoolingClient.java @@ -114,7 +114,7 @@ private RetryingPooledThriftTestClient getClient(int retries) retries, conf, "localhost", - pool.borrowObject("localhost", conf)); + pool.borrowObject("localhost")); } private static void startServerAsync(final Configuration conf)