diff --git a/src/main/java/net/spy/memcached/MemcachedClient.java b/src/main/java/net/spy/memcached/MemcachedClient.java index b3ce26ea2..22fc78eae 100644 --- a/src/main/java/net/spy/memcached/MemcachedClient.java +++ b/src/main/java/net/spy/memcached/MemcachedClient.java @@ -34,7 +34,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; @@ -68,7 +67,6 @@ import net.spy.memcached.ops.OperationStatus; import net.spy.memcached.ops.StatsOperation; import net.spy.memcached.ops.StoreType; -import net.spy.memcached.plugin.LocalCacheManager; import net.spy.memcached.transcoders.TranscodeService; import net.spy.memcached.transcoders.Transcoder; @@ -127,7 +125,6 @@ public class MemcachedClient extends SpyThread private volatile boolean running = true; private volatile boolean shuttingDown = false; - protected LocalCacheManager localCacheManager = null; protected final long operationTimeout; @@ -1104,7 +1101,7 @@ public Object get(String key) { */ public BulkFuture> asyncGetBulk(Collection keys, Iterator> tc_iter) { - final Map> m = new ConcurrentHashMap>(); + final Map> rvMap = new ConcurrentHashMap>(); // This map does not need to be a ConcurrentHashMap // because it is fully populated when it is used and @@ -1114,29 +1111,15 @@ public BulkFuture> asyncGetBulk(Collection keys, // Break the gets down into groups by key final Map>> chunks = new HashMap>>(); - Iterator key_iter = keys.iterator(); - while (key_iter.hasNext() && tc_iter.hasNext()) { - String key = key_iter.next(); - Transcoder tc = tc_iter.next(); - // FIXME This should be refactored... - // And the original front-cache implementations are really weird :-( - if (localCacheManager != null) { - final T cachedData = localCacheManager.get(key, tc); - if (cachedData != null) { - m.put(key, new LocalCacheManager.Task(new Callable() { - public T call() throws Exception { - return cachedData; - } - })); - continue; - } - } + Iterator keyIter = keys.iterator(); + while (keyIter.hasNext() && tc_iter.hasNext()) { + String key = keyIter.next(); + Transcoder tc = tc_iter.next(); tc_map.put(key, tc); validateKey(key); addKeyToChunk(chunks, key, conn.findNodeByKey(key)); } - int wholeChunkSize = getWholeChunkSize(chunks); final CountDownLatch latch = new CountDownLatch(wholeChunkSize); final Collection ops = new ArrayList(wholeChunkSize); @@ -1151,7 +1134,7 @@ public void receivedStatus(OperationStatus status) { public void gotData(String k, int flags, byte[] data) { Transcoder tc = tc_map.get(k); - m.put(k, tcService.decode(tc, + rvMap.put(k, tcService.decode(tc, new CachedData(flags, data, tc.getMaxSize()))); } @@ -1178,7 +1161,7 @@ public void complete() { ops.add(op); } } - return new BulkGetFuture(m, ops, latch, localCacheManager); + return new BulkGetFuture(rvMap, ops, latch); } /** @@ -2374,14 +2357,4 @@ int getAddedQueueSize() { Collection getAllNodes() { return conn.getLocator().getAll(); } - - /** - * get current local cache manager - * - * @return current local cache manager - */ - public LocalCacheManager getLocalCacheManager() { - return localCacheManager; - } - } diff --git a/src/main/java/net/spy/memcached/internal/BulkFuture.java b/src/main/java/net/spy/memcached/internal/BulkFuture.java index 2e7bc1090..9afa7e4b0 100644 --- a/src/main/java/net/spy/memcached/internal/BulkFuture.java +++ b/src/main/java/net/spy/memcached/internal/BulkFuture.java @@ -42,4 +42,7 @@ public interface BulkFuture extends Future { public V getSome(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException; + + public int getOpsCount(); + } diff --git a/src/main/java/net/spy/memcached/internal/BulkGetFuture.java b/src/main/java/net/spy/memcached/internal/BulkGetFuture.java index 9bcc70db8..b43632f21 100644 --- a/src/main/java/net/spy/memcached/internal/BulkGetFuture.java +++ b/src/main/java/net/spy/memcached/internal/BulkGetFuture.java @@ -31,7 +31,6 @@ import net.spy.memcached.compat.log.LoggerFactory; import net.spy.memcached.ops.Operation; import net.spy.memcached.ops.OperationState; -import net.spy.memcached.plugin.LocalCacheManager; /** * Future for handling results from bulk gets. @@ -47,9 +46,6 @@ public class BulkGetFuture implements BulkFuture> { private boolean cancelled = false; private boolean timeout = false; - // FIXME right position? - private LocalCacheManager localCacheManager; - public BulkGetFuture(Map> m, Collection getOps, CountDownLatch l) { super(); @@ -58,16 +54,14 @@ public BulkGetFuture(Map> m, latch = l; } - public BulkGetFuture(Map> m, - Collection getOps, CountDownLatch l, - LocalCacheManager lcm) { + public BulkGetFuture(BulkGetFuture other) { super(); - rvMap = m; - ops = getOps; - latch = l; - localCacheManager = lcm; + rvMap = other.rvMap; + ops = other.ops; + latch = other.latch; } + @Override public boolean cancel(boolean ign) { boolean rv = false; for (Operation op : ops) { @@ -81,6 +75,7 @@ public boolean cancel(boolean ign) { return rv; } + @Override public Map get() throws InterruptedException, ExecutionException { try { return get(Long.MAX_VALUE, TimeUnit.MILLISECONDS); @@ -89,6 +84,7 @@ public Map get() throws InterruptedException, ExecutionException { } } + @Override public Map getSome(long duration, TimeUnit units) throws InterruptedException, ExecutionException { Collection timedoutOps = new HashSet(); @@ -108,6 +104,7 @@ public Map getSome(long duration, TimeUnit units) * * @see java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit) */ + @Override public Map get(long duration, TimeUnit units) throws InterruptedException, ExecutionException, TimeoutException { Collection timedoutOps = new HashSet(); @@ -119,6 +116,21 @@ public Map get(long duration, TimeUnit units) return ret; } + @Override + public boolean isCancelled() { + return cancelled; + } + + @Override + public boolean isDone() { + return latch.getCount() == 0; + } + + @Override + public int getOpsCount() { + return ops.size(); + } + /** * refactored code common to both get(long, TimeUnit) and getSome(long, * TimeUnit) @@ -148,34 +160,17 @@ private Map internalGet(long to, TimeUnit unit, throw new ExecutionException(op.getException()); } } - Map m = new HashMap(); + + Map resultMap = new HashMap(); for (Map.Entry> me : rvMap.entrySet()) { String key = me.getKey(); Future future = me.getValue(); T value = future.get(); - // put the key into the result map. - m.put(key, value); - - // cache the key locally - if (localCacheManager != null) { - // iff it is from the remote cache. - if (!(future instanceof LocalCacheManager.Task)) { - localCacheManager.put(key, value); - } - } + resultMap.put(key, value); } - return m; - } - - public boolean isCancelled() { - return cancelled; + return resultMap; } - - public boolean isDone() { - return latch.getCount() == 0; - } - /* * set to true if timeout was reached. * diff --git a/src/main/java/net/spy/memcached/plugin/FrontCacheBulkGetFuture.java b/src/main/java/net/spy/memcached/plugin/FrontCacheBulkGetFuture.java new file mode 100644 index 000000000..274a39c7d --- /dev/null +++ b/src/main/java/net/spy/memcached/plugin/FrontCacheBulkGetFuture.java @@ -0,0 +1,76 @@ +package net.spy.memcached.plugin; + +import net.spy.memcached.OperationTimeoutException; +import net.spy.memcached.internal.BulkGetFuture; + +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static net.spy.memcached.DefaultConnectionFactory.DEFAULT_OPERATION_TIMEOUT; + +public class FrontCacheBulkGetFuture extends BulkGetFuture { + private final LocalCacheManager localCacheManager; + + private final Map localCachedData; + + private Map result = null; + + public FrontCacheBulkGetFuture(LocalCacheManager localCacheManager, + BulkGetFuture parentFuture, + Map localCachedData) { + super(parentFuture); + this.localCacheManager = localCacheManager; + this.localCachedData = localCachedData; + } + + @Override + public Map get() throws InterruptedException, ExecutionException { + try { + return get(DEFAULT_OPERATION_TIMEOUT, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + throw new OperationTimeoutException(e); + } + } + + @Override + public Map get(long duration, TimeUnit units) + throws InterruptedException, ExecutionException, TimeoutException { + if (result == null) { + try { + result = super.get(duration, units); + } catch (TimeoutException e) { + throw new OperationTimeoutException(e); + } + putLocalCache(result); + result.putAll(localCachedData); + } + return result; + } + + @Override + public Map getSome(long duration, TimeUnit units) + throws InterruptedException, ExecutionException { + if (result != null) { + return result; + } + Map getSomeResult = super.getSome(duration, units); + if (getSomeResult.size() == getOpsCount()) { + result = getSomeResult; + } + putLocalCache(getSomeResult); + getSomeResult.putAll(localCachedData); + return getSomeResult; + } + + private void putLocalCache(Map noneCachedValue) { + for (Map.Entry entry : noneCachedValue.entrySet()) { + String key = entry.getKey(); + T value = entry.getValue(); + if (value != null) { + localCacheManager.put(key, value); + } + } + } +} diff --git a/src/main/java/net/spy/memcached/plugin/FrontCacheMemcachedClient.java b/src/main/java/net/spy/memcached/plugin/FrontCacheMemcachedClient.java index d50dcd5ca..2f5f4eaa6 100644 --- a/src/main/java/net/spy/memcached/plugin/FrontCacheMemcachedClient.java +++ b/src/main/java/net/spy/memcached/plugin/FrontCacheMemcachedClient.java @@ -18,11 +18,16 @@ import java.io.IOException; import java.net.InetSocketAddress; +import java.util.Collection; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.Map; +import java.util.Iterator; +import java.util.Arrays; +import java.util.HashMap; import net.spy.memcached.ConnectionFactory; import net.spy.memcached.MemcachedClient; @@ -31,6 +36,9 @@ import net.spy.memcached.internal.OperationFuture; import net.spy.memcached.ops.OperationStatus; import net.spy.memcached.ops.StatusCode; +import net.spy.memcached.internal.BulkFuture; +import net.spy.memcached.internal.BulkGetFuture; +import net.spy.memcached.internal.SingleElementInfiniteIterator; import net.spy.memcached.transcoders.Transcoder; /** @@ -46,6 +54,8 @@ */ public class FrontCacheMemcachedClient extends MemcachedClient { + protected LocalCacheManager localCacheManager = null; + /** * Create the memcached client and the front cache. * @@ -175,6 +185,185 @@ public OperationStatus getStatus() { return new FrontCacheGetFuture(localCacheManager, key, parent); } + /** + * Get the values for multiple keys from the cache. + * + * @param keys the keys + * @return a map of the values (for each value that exists) + * @throws OperationTimeoutException if the global operation timeout is + * exceeded + * @throws IllegalStateException in the rare circumstance where queue + * is too full to accept any more requests + */ + public Map getBulk(Collection keys) { + return getBulk(keys, transcoder); + } + + /** + * Get the values for multiple keys from the cache. + * + * @param + * @param tc the transcoder to serialize and unserialize value + * @param keys the keys + * @return a map of the values (for each value that exists) + * @throws OperationTimeoutException if the global operation timeout is + * exceeded + * @throws IllegalStateException in the rare circumstance where queue + * is too full to accept any more requests + */ + public Map getBulk(Transcoder tc, String... keys) { + return getBulk(Arrays.asList(keys), tc); + } + + /** + * Get the values for multiple keys from the cache. + * + * @param keys the keys + * @return a map of the values (for each value that exists) + * @throws OperationTimeoutException if the global operation timeout is + * exceeded + * @throws IllegalStateException in the rare circumstance where queue + * is too full to accept any more requests + */ + public Map getBulk(String... keys) { + return getBulk(Arrays.asList(keys), transcoder); + } + + /** + * Get the values for multiple keys from the cache. + * + * @param + * @param keys the keys + * @param tc the transcoder to serialize and unserialize value + * @return a map of the values (for each value that exists) + * @throws OperationTimeoutException if the global operation timeout is + * exceeded + * @throws IllegalStateException in the rare circumstance where queue + * is too full to accept any more requests + */ + public Map getBulk(Collection keys, + Transcoder tc) { + BulkFuture> future = asyncGetBulk(keys, tc); + try { + return future.get(operationTimeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + future.cancel(true); + throw new RuntimeException("Interrupted getting bulk values", e); + } catch (ExecutionException e) { + future.cancel(true); + throw new RuntimeException("Failed getting bulk values", e); + } catch (TimeoutException e) { + future.cancel(true); + throw new OperationTimeoutException(e); + } + } + + /** + * Asynchronously get a bunch of objects from the cache. + * + * @param + * @param keys the keys to request + * @param tc the transcoder to serialize and unserialize values + * @return a Future result of that fetch + * @throws IllegalStateException in the rare circumstance where queue + * is too full to accept any more requests + */ + public BulkFuture> asyncGetBulk(Collection keys, Transcoder tc) { + return asyncGetBulk(keys, new SingleElementInfiniteIterator>(tc)); + } + + /** + * Asynchronously get a bunch of objects from the cache and decode them + * with the given transcoder. + * + * @param keys the keys to request + * @return a Future result of that fetch + * @throws IllegalStateException in the rare circumstance where queue + * is too full to accept any more requests + */ + public BulkFuture> asyncGetBulk(Collection keys) { + return asyncGetBulk(keys, transcoder); + } + + /** + * Varargs wrapper for asynchronous bulk get. + * + * @param + * @param tc the transcoder to serialize and unserialize value + * @param keys one more keys to get + * @return the future values of those keys + * @throws IllegalStateException in the rare circumstance where queue + * is too full to accept any more requests + */ + public BulkFuture> asyncGetBulk(Transcoder tc, + String... keys) { + return asyncGetBulk(Arrays.asList(keys), tc); + } + + /** + * Varargs wrapper for asynchronous bulk get with the default transcoder. + * + * @param keys one more keys to get + * @return the future values of those keys + * @throws IllegalStateException in the rare circumstance where queue + * is too full to accept any more requests + */ + public BulkFuture> asyncGetBulk(String... keys) { + return asyncGetBulk(Arrays.asList(keys), transcoder); + } + + /** + * Asynchronously gets (with CAS support) a bunch of objects from the cache. + * If used with front cache, the front cache is checked first. + * @param + * @param keys the keys to request + * @param tc_iter an iterator of transcoders to serialize and + * unserialize values; the transcoders are matched with + * the keys in the same order. The minimum of the key + * collection length and number of transcoders is used + * and no exception is thrown if they do not match + * @return a Future result of that fetch + * @throws IllegalStateException in the rare circumstance where queue + * is too full to accept any more requests + */ + @Override + public BulkFuture> asyncGetBulk(Collection keys, + Iterator> tc_iter) { + /* + * Case 1. local cache is not used. + * All data from Arcus server. + * */ + if (localCacheManager == null) { + return super.asyncGetBulk(keys, tc_iter); + } + /* + * Case 2. local cache is used. + * 1. Check the local cache first. + * */ + final Map frontCacheHit = new HashMap(); + final Map> frontCacheMiss = + new HashMap>(); + + Iterator keyIter = keys.iterator(); + while (keyIter.hasNext() && tc_iter.hasNext()) { + String key = keyIter.next(); + Transcoder tc = tc_iter.next(); + T value = localCacheManager.get(key, tc); + if (value != null) { + frontCacheHit.put(key, value); + continue; + } + frontCacheMiss.put(key, tc); + } + /* + * 2. Send the cache miss keys to Arcus server. + * */ + BulkGetFuture parent = (BulkGetFuture) super.asyncGetBulk( + frontCacheMiss.keySet(), frontCacheMiss.values().iterator()); + + return new FrontCacheBulkGetFuture(localCacheManager, parent, frontCacheHit); + } + /** * Delete the key. * Delete the key from the local cache before sending the command to the server. @@ -190,4 +379,7 @@ public OperationFuture delete(String key) { return super.delete(key); } + public LocalCacheManager getLocalCacheManager() { + return localCacheManager; + } }