Skip to content

Commit

Permalink
ENHANCE: Seperate FrontCacheLogic in asyncGetBulk from MemcachedClien…
Browse files Browse the repository at this point in the history
…t to FrontCacheMemcachedClient.
  • Loading branch information
brido4125 committed Jun 20, 2023
1 parent 0119a75 commit f6534e8
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 47 deletions.
26 changes: 6 additions & 20 deletions src/main/java/net/spy/memcached/MemcachedClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.Callable;

import net.spy.memcached.auth.AuthDescriptor;
import net.spy.memcached.auth.AuthThreadMonitor;
Expand Down Expand Up @@ -1118,29 +1117,16 @@ public <T> BulkFuture<Map<String, T>> asyncGetBulk(Collection<String> keys,
// Break the gets down into groups by key
final Map<MemcachedNode, List<Collection<String>>> chunks
= new HashMap<MemcachedNode, List<Collection<String>>>();
Iterator<String> key_iter = keys.iterator();
while (key_iter.hasNext() && tc_iter.hasNext()) {
String key = key_iter.next();
Transcoder<T> tc = tc_iter.next();

// FIXME This should be refactored...
// And the original front-cache implementations are really weird :-(
if (localCacheManager != null) {
final T cachedData = localCacheManager.get(key, tc);
if (cachedData != null) {
m.put(key, new LocalCacheManager.Task<T>(new Callable<T>() {
public T call() throws Exception {
return cachedData;
}
}));
continue;
}
}
Iterator<String> keyIter = keys.iterator();

while (keyIter.hasNext() && tc_iter.hasNext()) {
String key = keyIter.next();
Transcoder<T> tc = tc_iter.next();
tc_map.put(key, tc);
validateKey(key);
addKeyToChunk(chunks, key, conn.findNodeByKey(key));
}

int wholeChunkSize = getWholeChunkSize(chunks);
final CountDownLatch latch = new CountDownLatch(wholeChunkSize);
final Collection<Operation> ops = new ArrayList<Operation>(wholeChunkSize);
Expand Down Expand Up @@ -1182,7 +1168,7 @@ public void complete() {
ops.add(op);
}
}
return new BulkGetFuture<T>(m, ops, latch, localCacheManager);
return new BulkGetFuture<T>(m, ops, latch);
}

/**
Expand Down
42 changes: 16 additions & 26 deletions src/main/java/net/spy/memcached/internal/BulkGetFuture.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import net.spy.memcached.compat.log.LoggerFactory;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationState;
import net.spy.memcached.plugin.LocalCacheManager;

/**
* Future for handling results from bulk gets.
Expand All @@ -47,9 +46,6 @@ public class BulkGetFuture<T> implements BulkFuture<Map<String, T>> {
private boolean cancelled = false;
private boolean timeout = false;

// FIXME right position?
private LocalCacheManager localCacheManager;

public BulkGetFuture(Map<String, Future<T>> m,
Collection<Operation> getOps, CountDownLatch l) {
super();
Expand All @@ -58,16 +54,6 @@ public BulkGetFuture(Map<String, Future<T>> m,
latch = l;
}

public BulkGetFuture(Map<String, Future<T>> m,
Collection<Operation> getOps, CountDownLatch l,
LocalCacheManager lcm) {
super();
rvMap = m;
ops = getOps;
latch = l;
localCacheManager = lcm;
}

public boolean cancel(boolean ign) {
boolean rv = false;
for (Operation op : ops) {
Expand Down Expand Up @@ -148,24 +134,16 @@ private Map<String, T> internalGet(long to, TimeUnit unit,
throw new ExecutionException(op.getException());
}
}
Map<String, T> m = new HashMap<String, T>();
Map<String, T> decodedMap = new HashMap<String, T>();

for (Map.Entry<String, Future<T>> me : rvMap.entrySet()) {
String key = me.getKey();
Future<T> future = me.getValue();
T value = future.get();

// put the key into the result map.
m.put(key, value);

// cache the key locally
if (localCacheManager != null) {
// iff it is from the remote cache.
if (!(future instanceof LocalCacheManager.Task)) {
localCacheManager.put(key, value);
}
}
decodedMap.put(key, value);
}
return m;
return decodedMap;
}

public boolean isCancelled() {
Expand All @@ -184,4 +162,16 @@ public boolean isDone() {
public boolean isTimeout() {
return timeout;
}

public Collection<Operation> getOps() {
return ops;
}

public CountDownLatch getLatch() {
return latch;
}

public Map<String, Future<T>> getRvMap() {
return rvMap;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package net.spy.memcached.plugin;

import net.spy.memcached.internal.BulkGetFuture;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FrontCacheBulkGetFuture<T> extends BulkGetFuture<T> {
private final LocalCacheManager localCacheManager;

private final Map<String, T> localCachedData = new HashMap<String, T>();

public FrontCacheBulkGetFuture(BulkGetFuture<T> parentFuture,
LocalCacheManager localCacheManager) {
super(parentFuture.getRvMap(), parentFuture.getOps(), parentFuture.getLatch());
this.localCacheManager = localCacheManager;
}

public Map<String, T> get() throws InterruptedException, ExecutionException {
Map<String, T> noneCachedValue = null;
try {
noneCachedValue = super.get(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
putLocalCache(noneCachedValue);
noneCachedValue.putAll(localCachedData);
return noneCachedValue;
}

public Map<String, T> get(long duration, TimeUnit units)
throws InterruptedException, ExecutionException, TimeoutException {
Map<String, T> noneCachedValue = null;
try {
noneCachedValue = super.get(duration, units);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
putLocalCache(noneCachedValue);
noneCachedValue.putAll(localCachedData);
return noneCachedValue;
}

public void addLocalCacheData(String key, T value) {
localCachedData.put(key, value);
}

private void putLocalCache(Map<String, T> noneCachedValue) {
for (Map.Entry<String, T> entry : noneCachedValue.entrySet()) {
String key = entry.getKey();
T value = entry.getValue();
if (value != null) {
localCacheManager.put(key, value);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,21 @@

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Iterator;
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;

import net.sf.ehcache.Element;
import net.spy.memcached.ConnectionFactory;
import net.spy.memcached.MemcachedClient;
import net.spy.memcached.internal.GetFuture;
import net.spy.memcached.internal.OperationFuture;
import net.spy.memcached.internal.BulkFuture;
import net.spy.memcached.internal.BulkGetFuture;
import net.spy.memcached.internal.SingleElementInfiniteIterator;
import net.spy.memcached.transcoders.Transcoder;

/**
Expand Down Expand Up @@ -88,6 +96,71 @@ public <T> GetFuture<T> asyncGet(final String key, final Transcoder<T> tc) {
return new FrontCacheGetFuture<T>(frontElement);
}
}
@Override
public <T> BulkFuture<Map<String, T>> asyncGetBulk(Collection<String> keys,
Iterator<Transcoder<T>> tc_iter) {
/*
* Case 1. local cache is not used.
* All data from Arcus server.
* */
if (localCacheManager == null) {
return super.asyncGetBulk(keys, tc_iter);
}

final Map<String, T> frontCacheHit = new ConcurrentHashMap<String, T>();
final Map<String, Transcoder<T>> frontCacheMiss =
new ConcurrentHashMap<String, Transcoder<T>>();

Iterator<String> keyIter = keys.iterator();

while (keyIter.hasNext() && tc_iter.hasNext()) {
String key = keyIter.next();
Transcoder<T> tc = tc_iter.next();
T decodedValue = localCacheManager.get(key, tc);
if (decodedValue != null) {
frontCacheHit.put(key, decodedValue);
continue;
}
frontCacheMiss.put(key, tc);
}
/*
* Case 2. local cache is used
* 1. Get cached data from local cache.
* 2. Get data from Arcus server.
* 3. Store data coming from Arucs server to local cache.
* 4. Merge data from local cache and Arcus server.
* */
BulkGetFuture<T> parent = (BulkGetFuture<T>) super.asyncGetBulk(
frontCacheMiss.keySet(), frontCacheMiss.values().iterator());

FrontCacheBulkGetFuture<T> rv = new FrontCacheBulkGetFuture<T>(parent, localCacheManager);

for (Map.Entry<String, T> entry : frontCacheHit.entrySet()) {
String key = entry.getKey();
T value = entry.getValue();
rv.addLocalCacheData(key, value);
}

return rv;
}

public <T> BulkFuture<Map<String, T>> asyncGetBulk(Collection<String> keys,
Transcoder<T> tc) {
return asyncGetBulk(keys, new SingleElementInfiniteIterator<Transcoder<T>>(tc));
}

public BulkFuture<Map<String, Object>> asyncGetBulk(Collection<String> keys) {
return asyncGetBulk(keys, transcoder);
}

public <T> BulkFuture<Map<String, T>> asyncGetBulk(Transcoder<T> tc,
String... keys) {
return asyncGetBulk(Arrays.asList(keys), tc);
}

public BulkFuture<Map<String, Object>> asyncGetBulk(String... keys) {
return asyncGetBulk(Arrays.asList(keys), transcoder);
}

/**
* Delete the key.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ protected void setUp() throws Exception {
ConnectionFactoryBuilder cfb = new ConnectionFactoryBuilder();
cfb.setFrontCacheExpireTime(5);
cfb.setMaxFrontCacheElements(10);
client = ArcusClient.createArcusClient("127.0.0.1:2181", "test", cfb);
client = ArcusClient.createArcusClient("127.0.0.1:2191", "brido", cfb);
}

@Override
Expand Down

0 comments on commit f6534e8

Please sign in to comment.