diff --git a/src/main/java/net/spy/memcached/MemcachedClient.java b/src/main/java/net/spy/memcached/MemcachedClient.java index c942397c2..f5ff18a33 100644 --- a/src/main/java/net/spy/memcached/MemcachedClient.java +++ b/src/main/java/net/spy/memcached/MemcachedClient.java @@ -44,7 +44,6 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.Callable; import net.spy.memcached.auth.AuthDescriptor; import net.spy.memcached.auth.AuthThreadMonitor; @@ -1118,29 +1117,16 @@ public BulkFuture> asyncGetBulk(Collection keys, // Break the gets down into groups by key final Map>> chunks = new HashMap>>(); - Iterator key_iter = keys.iterator(); - while (key_iter.hasNext() && tc_iter.hasNext()) { - String key = key_iter.next(); - Transcoder tc = tc_iter.next(); - // FIXME This should be refactored... - // And the original front-cache implementations are really weird :-( - if (localCacheManager != null) { - final T cachedData = localCacheManager.get(key, tc); - if (cachedData != null) { - m.put(key, new LocalCacheManager.Task(new Callable() { - public T call() throws Exception { - return cachedData; - } - })); - continue; - } - } + Iterator keyIter = keys.iterator(); + + while (keyIter.hasNext() && tc_iter.hasNext()) { + String key = keyIter.next(); + Transcoder tc = tc_iter.next(); tc_map.put(key, tc); validateKey(key); addKeyToChunk(chunks, key, conn.findNodeByKey(key)); } - int wholeChunkSize = getWholeChunkSize(chunks); final CountDownLatch latch = new CountDownLatch(wholeChunkSize); final Collection ops = new ArrayList(wholeChunkSize); @@ -1182,7 +1168,7 @@ public void complete() { ops.add(op); } } - return new BulkGetFuture(m, ops, latch, localCacheManager); + return new BulkGetFuture(m, ops, latch); } /** diff --git a/src/main/java/net/spy/memcached/internal/BulkGetFuture.java b/src/main/java/net/spy/memcached/internal/BulkGetFuture.java index 9bcc70db8..e256358be 100644 --- a/src/main/java/net/spy/memcached/internal/BulkGetFuture.java +++ b/src/main/java/net/spy/memcached/internal/BulkGetFuture.java @@ -31,7 +31,6 @@ import net.spy.memcached.compat.log.LoggerFactory; import net.spy.memcached.ops.Operation; import net.spy.memcached.ops.OperationState; -import net.spy.memcached.plugin.LocalCacheManager; /** * Future for handling results from bulk gets. @@ -47,9 +46,6 @@ public class BulkGetFuture implements BulkFuture> { private boolean cancelled = false; private boolean timeout = false; - // FIXME right position? - private LocalCacheManager localCacheManager; - public BulkGetFuture(Map> m, Collection getOps, CountDownLatch l) { super(); @@ -58,16 +54,6 @@ public BulkGetFuture(Map> m, latch = l; } - public BulkGetFuture(Map> m, - Collection getOps, CountDownLatch l, - LocalCacheManager lcm) { - super(); - rvMap = m; - ops = getOps; - latch = l; - localCacheManager = lcm; - } - public boolean cancel(boolean ign) { boolean rv = false; for (Operation op : ops) { @@ -148,24 +134,16 @@ private Map internalGet(long to, TimeUnit unit, throw new ExecutionException(op.getException()); } } - Map m = new HashMap(); + Map decodedMap = new HashMap(); + for (Map.Entry> me : rvMap.entrySet()) { String key = me.getKey(); Future future = me.getValue(); T value = future.get(); - // put the key into the result map. - m.put(key, value); - - // cache the key locally - if (localCacheManager != null) { - // iff it is from the remote cache. - if (!(future instanceof LocalCacheManager.Task)) { - localCacheManager.put(key, value); - } - } + decodedMap.put(key, value); } - return m; + return decodedMap; } public boolean isCancelled() { @@ -184,4 +162,16 @@ public boolean isDone() { public boolean isTimeout() { return timeout; } + + public Collection getOps() { + return ops; + } + + public CountDownLatch getLatch() { + return latch; + } + + public Map> getRvMap() { + return rvMap; + } } diff --git a/src/main/java/net/spy/memcached/plugin/FrontCacheBulkGetFuture.java b/src/main/java/net/spy/memcached/plugin/FrontCacheBulkGetFuture.java new file mode 100644 index 000000000..b32baf147 --- /dev/null +++ b/src/main/java/net/spy/memcached/plugin/FrontCacheBulkGetFuture.java @@ -0,0 +1,60 @@ +package net.spy.memcached.plugin; + +import net.spy.memcached.internal.BulkGetFuture; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class FrontCacheBulkGetFuture extends BulkGetFuture { + private final LocalCacheManager localCacheManager; + + private final Map localCachedData = new HashMap(); + + public FrontCacheBulkGetFuture(BulkGetFuture parentFuture, + LocalCacheManager localCacheManager) { + super(parentFuture.getRvMap(), parentFuture.getOps(), parentFuture.getLatch()); + this.localCacheManager = localCacheManager; + } + + public Map get() throws InterruptedException, ExecutionException { + Map noneCachedValue = null; + try { + noneCachedValue = super.get(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + throw new RuntimeException(e); + } + putLocalCache(noneCachedValue); + noneCachedValue.putAll(localCachedData); + return noneCachedValue; + } + + public Map get(long duration, TimeUnit units) + throws InterruptedException, ExecutionException, TimeoutException { + Map noneCachedValue = null; + try { + noneCachedValue = super.get(duration, units); + } catch (TimeoutException e) { + throw new RuntimeException(e); + } + putLocalCache(noneCachedValue); + noneCachedValue.putAll(localCachedData); + return noneCachedValue; + } + + public void addLocalCacheData(String key, T value) { + localCachedData.put(key, value); + } + + private void putLocalCache(Map noneCachedValue) { + for (Map.Entry entry : noneCachedValue.entrySet()) { + String key = entry.getKey(); + T value = entry.getValue(); + if (value != null) { + localCacheManager.put(key, value); + } + } + } +} diff --git a/src/main/java/net/spy/memcached/plugin/FrontCacheMemcachedClient.java b/src/main/java/net/spy/memcached/plugin/FrontCacheMemcachedClient.java index 9694c5991..a850b324b 100644 --- a/src/main/java/net/spy/memcached/plugin/FrontCacheMemcachedClient.java +++ b/src/main/java/net/spy/memcached/plugin/FrontCacheMemcachedClient.java @@ -18,13 +18,21 @@ import java.io.IOException; import java.net.InetSocketAddress; +import java.util.Collection; import java.util.List; +import java.util.Map; +import java.util.Iterator; +import java.util.Arrays; +import java.util.concurrent.ConcurrentHashMap; import net.sf.ehcache.Element; import net.spy.memcached.ConnectionFactory; import net.spy.memcached.MemcachedClient; import net.spy.memcached.internal.GetFuture; import net.spy.memcached.internal.OperationFuture; +import net.spy.memcached.internal.BulkFuture; +import net.spy.memcached.internal.BulkGetFuture; +import net.spy.memcached.internal.SingleElementInfiniteIterator; import net.spy.memcached.transcoders.Transcoder; /** @@ -88,6 +96,71 @@ public GetFuture asyncGet(final String key, final Transcoder tc) { return new FrontCacheGetFuture(frontElement); } } + @Override + public BulkFuture> asyncGetBulk(Collection keys, + Iterator> tc_iter) { + /* + * Case 1. local cache is not used. + * All data from Arcus server. + * */ + if (localCacheManager == null) { + return super.asyncGetBulk(keys, tc_iter); + } + + final Map frontCacheHit = new ConcurrentHashMap(); + final Map> frontCacheMiss = + new ConcurrentHashMap>(); + + Iterator keyIter = keys.iterator(); + + while (keyIter.hasNext() && tc_iter.hasNext()) { + String key = keyIter.next(); + Transcoder tc = tc_iter.next(); + T decodedValue = localCacheManager.get(key, tc); + if (decodedValue != null) { + frontCacheHit.put(key, decodedValue); + continue; + } + frontCacheMiss.put(key, tc); + } + /* + * Case 2. local cache is used + * 1. Get cached data from local cache. + * 2. Get data from Arcus server. + * 3. Store data coming from Arucs server to local cache. + * 4. Merge data from local cache and Arcus server. + * */ + BulkGetFuture parent = (BulkGetFuture) super.asyncGetBulk( + frontCacheMiss.keySet(), frontCacheMiss.values().iterator()); + + FrontCacheBulkGetFuture rv = new FrontCacheBulkGetFuture(parent, localCacheManager); + + for (Map.Entry entry : frontCacheHit.entrySet()) { + String key = entry.getKey(); + T value = entry.getValue(); + rv.addLocalCacheData(key, value); + } + + return rv; + } + + public BulkFuture> asyncGetBulk(Collection keys, + Transcoder tc) { + return asyncGetBulk(keys, new SingleElementInfiniteIterator>(tc)); + } + + public BulkFuture> asyncGetBulk(Collection keys) { + return asyncGetBulk(keys, transcoder); + } + + public BulkFuture> asyncGetBulk(Transcoder tc, + String... keys) { + return asyncGetBulk(Arrays.asList(keys), tc); + } + + public BulkFuture> asyncGetBulk(String... keys) { + return asyncGetBulk(Arrays.asList(keys), transcoder); + } /** * Delete the key. diff --git a/src/test/manual/net/spy/memcached/frontcache/LocalCacheManagerTest.java b/src/test/manual/net/spy/memcached/frontcache/LocalCacheManagerTest.java index 7f07700de..ddf20fbf1 100644 --- a/src/test/manual/net/spy/memcached/frontcache/LocalCacheManagerTest.java +++ b/src/test/manual/net/spy/memcached/frontcache/LocalCacheManagerTest.java @@ -45,7 +45,7 @@ protected void setUp() throws Exception { ConnectionFactoryBuilder cfb = new ConnectionFactoryBuilder(); cfb.setFrontCacheExpireTime(5); cfb.setMaxFrontCacheElements(10); - client = ArcusClient.createArcusClient("127.0.0.1:2181", "test", cfb); + client = ArcusClient.createArcusClient("127.0.0.1:2191", "brido", cfb); } @Override