Skip to content

Commit

Permalink
ENHANCE: change put method logic in LocalCacheManager in aysncGet.
Browse files Browse the repository at this point in the history
  • Loading branch information
brido4125 authored and jhpark816 committed Jul 5, 2023
1 parent dfb21c1 commit 2f03ed7
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 59 deletions.
4 changes: 0 additions & 4 deletions src/main/java/net/spy/memcached/MemcachedClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -929,10 +929,6 @@ public void gotData(String k, int flags, byte[] data) {
}

public void complete() {
// FIXME weird...
if (localCacheManager != null) {
localCacheManager.put(key, val, operationTimeout);
}
latch.countDown();
}
});
Expand Down
21 changes: 15 additions & 6 deletions src/main/java/net/spy/memcached/internal/GetFuture.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,31 @@ public GetFuture(CountDownLatch l, long opTimeout) {
this.rv = new OperationFuture<Future<T>>(l, opTimeout);
}

public GetFuture(GetFuture<T> parent) {
this.rv = parent.getRv();
}

public boolean cancel(boolean ign) {
return rv.cancel(ign);
}

public T get() throws InterruptedException, ExecutionException {
Future<T> v = rv.get();
return v == null ? null : v.get();
Future<T> decodedTask = rv.get();
return decodedTask == null ? null : decodedTask.get();
}

public T get(long duration, TimeUnit units)
throws InterruptedException, TimeoutException, ExecutionException {
Future<T> v = rv.get(duration, units);
return v == null ? null : v.get();
Future<T> decodedTask = rv.get(duration, units);
return decodedTask == null ? null : decodedTask.get();
}

public OperationStatus getStatus() {
return rv.getStatus();
}

public void set(Future<T> d, OperationStatus s) {
rv.set(d, s);
public void set(Future<T> decodedTask, OperationStatus status) {
rv.set(decodedTask, status);
}

public void setOperation(Operation to) {
Expand All @@ -58,4 +62,9 @@ public boolean isCancelled() {
public boolean isDone() {
return rv.isDone();
}

public OperationFuture<Future<T>> getRv() {
return rv;
}

}
51 changes: 12 additions & 39 deletions src/main/java/net/spy/memcached/plugin/FrontCacheGetFuture.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@
import java.util.concurrent.TimeoutException;

import net.spy.memcached.internal.GetFuture;
import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.ops.StatusCode;

import net.sf.ehcache.Element;

/**
* Future returned for GET operations.
Expand All @@ -34,51 +30,28 @@
* @param <T> Type of object returned from the get
*/
public class FrontCacheGetFuture<T> extends GetFuture<T> {
private final LocalCacheManager localCacheManager;

private static final OperationStatus END =
new OperationStatus(true, "END", StatusCode.SUCCESS);
private final Element element;

private final String key;

public FrontCacheGetFuture(Element element) {
super(null, 0);
this.element = element;
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
public FrontCacheGetFuture(LocalCacheManager localCacheManager, String key, GetFuture<T> parent) {
super(parent);
this.localCacheManager = localCacheManager;
this.key = key;
}

@Override
public T get() throws InterruptedException, ExecutionException {
return getValue();
}

@Override
public OperationStatus getStatus() {
return END;
}

@SuppressWarnings("unchecked")
private T getValue() {
return (T) this.element.getObjectValue();
T t = super.get();
localCacheManager.put(key, t);
return t;
}

@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException {
return getValue();
}

@Override
public boolean isCancelled() {
return false;
T t = super.get(timeout, unit);
localCacheManager.put(key, t);
return t;
}

@Override
public boolean isDone() {
return true;
}

}
105 changes: 95 additions & 10 deletions src/main/java/net/spy/memcached/plugin/FrontCacheMemcachedClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,20 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import net.spy.memcached.ConnectionFactory;
import net.spy.memcached.MemcachedClient;
import net.spy.memcached.OperationTimeoutException;
import net.spy.memcached.internal.GetFuture;
import net.spy.memcached.internal.OperationFuture;
import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.ops.StatusCode;
import net.spy.memcached.transcoders.Transcoder;

import net.sf.ehcache.Element;

/**
* Front cache for some Arcus commands.
* For now, it supports get commands. The front cache stores the value from a get operation.
Expand Down Expand Up @@ -67,6 +72,62 @@ public FrontCacheMemcachedClient(ConnectionFactory cf,
}
}

/**
* Get with a single key and decode using the default transcoder.
*
* @param key the key to get
* @return the result from the cache (null if there is none)
* @throws OperationTimeoutException if the global operation timeout is
* exceeded
* @throws IllegalStateException in the rare circumstance where queue
* is too full to accept any more requests
*/
public Object get(String key) {
return get(key, transcoder);
}

/**
* Get with a single key.
*
* @param <T> Type of object to get.
* @param key the key to get
* @param tc the transcoder to serialize and unserialize value
* @return the result from the cache (null if there is none)
* @throws OperationTimeoutException if the global operation timeout is
* exceeded
* @throws IllegalStateException in the rare circumstance where queue
* is too full to accept any more requests
*/

public <T> T get(String key, Transcoder<T> tc) {
Future<T> future = asyncGet(key, tc);
try {
return future.get(operationTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
future.cancel(true);
throw new RuntimeException("Interrupted waiting for value", e);
} catch (ExecutionException e) {
future.cancel(true);
throw new RuntimeException("Exception waiting for value", e);
} catch (TimeoutException e) {
future.cancel(true);
throw new OperationTimeoutException(e);
}
}

/**
* Get the given key asynchronously and decode with the default
* transcoder.
*
* @param key the key to fetch
* @return a future that will hold the return value of the fetch
* @throws IllegalStateException in the rare circumstance where queue
* is too full to accept any more requests
*/
public GetFuture<Object> asyncGet(final String key) {
return asyncGet(key, transcoder);
}

/**
* Get the value of the key.
* Check the local cache first. If the key is not found, send the command to the server.
Expand All @@ -77,17 +138,41 @@ public FrontCacheMemcachedClient(ConnectionFactory cf,
*/
@Override
public <T> GetFuture<T> asyncGet(final String key, final Transcoder<T> tc) {
Element frontElement = null;

if (localCacheManager != null) {
frontElement = localCacheManager.getElement(key);
if (localCacheManager == null) {
return super.asyncGet(key, tc);
}

if (frontElement == null) {
return super.asyncGet(key, tc);
} else {
return new FrontCacheGetFuture<T>(frontElement);
final T t = localCacheManager.get(key, tc);
if (t != null) {
return new GetFuture<T>(null, 0) {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return true;
}
@Override
public T get() {
return t;
}
@Override
public T get(long timeout, TimeUnit unit) {
return t;
}
@Override
public OperationStatus getStatus() {
return new OperationStatus(true, "END", StatusCode.SUCCESS);
}
};
}
GetFuture<T> parent = super.asyncGet(key, tc);
return new FrontCacheGetFuture<T>(localCacheManager, key, parent);
}

/**
Expand Down

0 comments on commit 2f03ed7

Please sign in to comment.