Skip to content

Commit

Permalink
ENHANCE: Seperate FrontCacheLogic in asyncGetBulk from MemcachedClien…
Browse files Browse the repository at this point in the history
…t to FrontCacheMemcachedClient.
  • Loading branch information
brido4125 committed Jul 13, 2023
1 parent 4328484 commit 0673fb9
Show file tree
Hide file tree
Showing 5 changed files with 305 additions and 66 deletions.
41 changes: 7 additions & 34 deletions src/main/java/net/spy/memcached/MemcachedClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -68,7 +67,6 @@
import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.ops.StatsOperation;
import net.spy.memcached.ops.StoreType;
import net.spy.memcached.plugin.LocalCacheManager;
import net.spy.memcached.transcoders.TranscodeService;
import net.spy.memcached.transcoders.Transcoder;

Expand Down Expand Up @@ -127,7 +125,6 @@ public class MemcachedClient extends SpyThread

private volatile boolean running = true;
private volatile boolean shuttingDown = false;
protected LocalCacheManager localCacheManager = null;

protected final long operationTimeout;

Expand Down Expand Up @@ -1104,7 +1101,7 @@ public Object get(String key) {
*/
public <T> BulkFuture<Map<String, T>> asyncGetBulk(Collection<String> keys,
Iterator<Transcoder<T>> tc_iter) {
final Map<String, Future<T>> m = new ConcurrentHashMap<String, Future<T>>();
final Map<String, Future<T>> rvMap = new ConcurrentHashMap<String, Future<T>>();

// This map does not need to be a ConcurrentHashMap
// because it is fully populated when it is used and
Expand All @@ -1114,29 +1111,15 @@ public <T> BulkFuture<Map<String, T>> asyncGetBulk(Collection<String> keys,
// Break the gets down into groups by key
final Map<MemcachedNode, List<Collection<String>>> chunks
= new HashMap<MemcachedNode, List<Collection<String>>>();
Iterator<String> key_iter = keys.iterator();
while (key_iter.hasNext() && tc_iter.hasNext()) {
String key = key_iter.next();
Transcoder<T> tc = tc_iter.next();

// FIXME This should be refactored...
// And the original front-cache implementations are really weird :-(
if (localCacheManager != null) {
final T cachedData = localCacheManager.get(key, tc);
if (cachedData != null) {
m.put(key, new LocalCacheManager.Task<T>(new Callable<T>() {
public T call() throws Exception {
return cachedData;
}
}));
continue;
}
}
Iterator<String> keyIter = keys.iterator();
while (keyIter.hasNext() && tc_iter.hasNext()) {
String key = keyIter.next();
Transcoder<T> tc = tc_iter.next();
tc_map.put(key, tc);
validateKey(key);
addKeyToChunk(chunks, key, conn.findNodeByKey(key));
}

int wholeChunkSize = getWholeChunkSize(chunks);
final CountDownLatch latch = new CountDownLatch(wholeChunkSize);
final Collection<Operation> ops = new ArrayList<Operation>(wholeChunkSize);
Expand All @@ -1151,7 +1134,7 @@ public void receivedStatus(OperationStatus status) {

public void gotData(String k, int flags, byte[] data) {
Transcoder<T> tc = tc_map.get(k);
m.put(k, tcService.decode(tc,
rvMap.put(k, tcService.decode(tc,
new CachedData(flags, data, tc.getMaxSize())));
}

Expand All @@ -1178,7 +1161,7 @@ public void complete() {
ops.add(op);
}
}
return new BulkGetFuture<T>(m, ops, latch, localCacheManager);
return new BulkGetFuture<T>(rvMap, ops, latch);
}

/**
Expand Down Expand Up @@ -2374,14 +2357,4 @@ int getAddedQueueSize() {
Collection<MemcachedNode> getAllNodes() {
return conn.getLocator().getAll();
}

/**
* get current local cache manager
*
* @return current local cache manager
*/
public LocalCacheManager getLocalCacheManager() {
return localCacheManager;
}

}
3 changes: 3 additions & 0 deletions src/main/java/net/spy/memcached/internal/BulkFuture.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,7 @@ public interface BulkFuture<V> extends Future<V> {
public V getSome(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException;


public int getOpCount();

}
59 changes: 27 additions & 32 deletions src/main/java/net/spy/memcached/internal/BulkGetFuture.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import net.spy.memcached.compat.log.LoggerFactory;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationState;
import net.spy.memcached.plugin.LocalCacheManager;

/**
* Future for handling results from bulk gets.
Expand All @@ -47,9 +46,6 @@ public class BulkGetFuture<T> implements BulkFuture<Map<String, T>> {
private boolean cancelled = false;
private boolean timeout = false;

// FIXME right position?
private LocalCacheManager localCacheManager;

public BulkGetFuture(Map<String, Future<T>> m,
Collection<Operation> getOps, CountDownLatch l) {
super();
Expand All @@ -58,16 +54,14 @@ public BulkGetFuture(Map<String, Future<T>> m,
latch = l;
}

public BulkGetFuture(Map<String, Future<T>> m,
Collection<Operation> getOps, CountDownLatch l,
LocalCacheManager lcm) {
public BulkGetFuture(BulkGetFuture<T> other) {
super();
rvMap = m;
ops = getOps;
latch = l;
localCacheManager = lcm;
rvMap = other.rvMap;
ops = other.ops;
latch = other.latch;
}

@Override
public boolean cancel(boolean ign) {
boolean rv = false;
for (Operation op : ops) {
Expand All @@ -81,6 +75,7 @@ public boolean cancel(boolean ign) {
return rv;
}

@Override
public Map<String, T> get() throws InterruptedException, ExecutionException {
try {
return get(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
Expand All @@ -89,6 +84,7 @@ public Map<String, T> get() throws InterruptedException, ExecutionException {
}
}

@Override
public Map<String, T> getSome(long duration, TimeUnit units)
throws InterruptedException, ExecutionException {
Collection<Operation> timedoutOps = new HashSet<Operation>();
Expand All @@ -108,6 +104,7 @@ public Map<String, T> getSome(long duration, TimeUnit units)
*
* @see java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit)
*/
@Override
public Map<String, T> get(long duration, TimeUnit units)
throws InterruptedException, ExecutionException, TimeoutException {
Collection<Operation> timedoutOps = new HashSet<Operation>();
Expand All @@ -119,6 +116,21 @@ public Map<String, T> get(long duration, TimeUnit units)
return ret;
}

@Override
public boolean isCancelled() {
return cancelled;
}

@Override
public boolean isDone() {
return latch.getCount() == 0;
}

@Override
public int getOpCount() {
return ops.size();
}

/**
* refactored code common to both get(long, TimeUnit) and getSome(long,
* TimeUnit)
Expand Down Expand Up @@ -148,34 +160,17 @@ private Map<String, T> internalGet(long to, TimeUnit unit,
throw new ExecutionException(op.getException());
}
}
Map<String, T> m = new HashMap<String, T>();

Map<String, T> resultMap = new HashMap<String, T>();
for (Map.Entry<String, Future<T>> me : rvMap.entrySet()) {
String key = me.getKey();
Future<T> future = me.getValue();
T value = future.get();

// put the key into the result map.
m.put(key, value);

// cache the key locally
if (localCacheManager != null) {
// iff it is from the remote cache.
if (!(future instanceof LocalCacheManager.Task)) {
localCacheManager.put(key, value);
}
}
resultMap.put(key, value);
}
return m;
}

public boolean isCancelled() {
return cancelled;
return resultMap;
}

public boolean isDone() {
return latch.getCount() == 0;
}

/*
* set to true if timeout was reached.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package net.spy.memcached.plugin;

import net.spy.memcached.OperationTimeoutException;
import net.spy.memcached.internal.BulkGetFuture;

import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static net.spy.memcached.DefaultConnectionFactory.DEFAULT_OPERATION_TIMEOUT;

public class FrontCacheBulkGetFuture<T> extends BulkGetFuture<T> {
private final LocalCacheManager localCacheManager;

private final Map<String, T> localCachedData;

private Map<String, T> result = null;

public FrontCacheBulkGetFuture(LocalCacheManager localCacheManager,
BulkGetFuture<T> parentFuture,
Map<String, T> localCachedData) {
super(parentFuture);
this.localCacheManager = localCacheManager;
this.localCachedData = localCachedData;
}

@Override
public Map<String, T> get() throws InterruptedException, ExecutionException {
try {
return get(DEFAULT_OPERATION_TIMEOUT, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
throw new OperationTimeoutException(e);
}
}

@Override
public Map<String, T> get(long duration, TimeUnit units)
throws InterruptedException, ExecutionException, TimeoutException {
if (result == null) {
try {
result = super.get(duration, units);
} catch (TimeoutException e) {
throw new OperationTimeoutException(e);
}
putLocalCache(result);
result.putAll(localCachedData);
}
return result;
}

@Override
public Map<String, T> getSome(long duration, TimeUnit units)
throws InterruptedException, ExecutionException {
if (result != null) {
return result;
}
Map<String, T> getSomeResult = super.getSome(duration, units);
if (getSomeResult.size() == getOpCount()) {
result = getSomeResult;
}
putLocalCache(getSomeResult);
getSomeResult.putAll(localCachedData);
return getSomeResult;
}

private void putLocalCache(Map<String, T> noneCachedValue) {
for (Map.Entry<String, T> entry : noneCachedValue.entrySet()) {
String key = entry.getKey();
T value = entry.getValue();
if (value != null) {
localCacheManager.put(key, value);
}
}
}
}
Loading

0 comments on commit 0673fb9

Please sign in to comment.