From da1b831decad534b2ff0a70453cfd19746cb4ff8 Mon Sep 17 00:00:00 2001 From: brido4125 Date: Fri, 16 Jun 2023 16:20:13 +0900 Subject: [PATCH] ENHANCE: change put method logic in LocalCacheManager in aysncGet. --- .../net/spy/memcached/MemcachedClient.java | 4 - .../net/spy/memcached/internal/GetFuture.java | 21 +++- .../memcached/plugin/FrontCacheGetFuture.java | 51 ++------- .../plugin/FrontCacheMemcachedClient.java | 105 ++++++++++++++++-- 4 files changed, 122 insertions(+), 59 deletions(-) diff --git a/src/main/java/net/spy/memcached/MemcachedClient.java b/src/main/java/net/spy/memcached/MemcachedClient.java index 28120b9df..b3ce26ea2 100644 --- a/src/main/java/net/spy/memcached/MemcachedClient.java +++ b/src/main/java/net/spy/memcached/MemcachedClient.java @@ -929,10 +929,6 @@ public void gotData(String k, int flags, byte[] data) { } public void complete() { - // FIXME weird... - if (localCacheManager != null) { - localCacheManager.put(key, val, operationTimeout); - } latch.countDown(); } }); diff --git a/src/main/java/net/spy/memcached/internal/GetFuture.java b/src/main/java/net/spy/memcached/internal/GetFuture.java index 8bd479120..bb882f81d 100644 --- a/src/main/java/net/spy/memcached/internal/GetFuture.java +++ b/src/main/java/net/spy/memcached/internal/GetFuture.java @@ -24,27 +24,31 @@ public GetFuture(CountDownLatch l, long opTimeout) { this.rv = new OperationFuture>(l, opTimeout); } + public GetFuture(GetFuture parent) { + this.rv = parent.getRv(); + } + public boolean cancel(boolean ign) { return rv.cancel(ign); } public T get() throws InterruptedException, ExecutionException { - Future v = rv.get(); - return v == null ? null : v.get(); + Future decodedTask = rv.get(); + return decodedTask == null ? null : decodedTask.get(); } public T get(long duration, TimeUnit units) throws InterruptedException, TimeoutException, ExecutionException { - Future v = rv.get(duration, units); - return v == null ? null : v.get(); + Future decodedTask = rv.get(duration, units); + return decodedTask == null ? null : decodedTask.get(); } public OperationStatus getStatus() { return rv.getStatus(); } - public void set(Future d, OperationStatus s) { - rv.set(d, s); + public void set(Future decodedTask, OperationStatus status) { + rv.set(decodedTask, status); } public void setOperation(Operation to) { @@ -58,4 +62,9 @@ public boolean isCancelled() { public boolean isDone() { return rv.isDone(); } + + public OperationFuture> getRv() { + return rv; + } + } diff --git a/src/main/java/net/spy/memcached/plugin/FrontCacheGetFuture.java b/src/main/java/net/spy/memcached/plugin/FrontCacheGetFuture.java index 550c63dbf..0e4b1ca88 100644 --- a/src/main/java/net/spy/memcached/plugin/FrontCacheGetFuture.java +++ b/src/main/java/net/spy/memcached/plugin/FrontCacheGetFuture.java @@ -21,10 +21,6 @@ import java.util.concurrent.TimeoutException; import net.spy.memcached.internal.GetFuture; -import net.spy.memcached.ops.OperationStatus; -import net.spy.memcached.ops.StatusCode; - -import net.sf.ehcache.Element; /** * Future returned for GET operations. @@ -34,51 +30,28 @@ * @param Type of object returned from the get */ public class FrontCacheGetFuture extends GetFuture { + private final LocalCacheManager localCacheManager; - private static final OperationStatus END = - new OperationStatus(true, "END", StatusCode.SUCCESS); - private final Element element; - + private final String key; - public FrontCacheGetFuture(Element element) { - super(null, 0); - this.element = element; - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return false; + public FrontCacheGetFuture(LocalCacheManager localCacheManager, String key, GetFuture parent) { + super(parent); + this.localCacheManager = localCacheManager; + this.key = key; } @Override public T get() throws InterruptedException, ExecutionException { - return getValue(); - } - - @Override - public OperationStatus getStatus() { - return END; - } - - @SuppressWarnings("unchecked") - private T getValue() { - return (T) this.element.getObjectValue(); + T t = super.get(); + localCacheManager.put(key, t); + return t; } @Override public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - return getValue(); - } - - @Override - public boolean isCancelled() { - return false; + T t = super.get(timeout, unit); + localCacheManager.put(key, t); + return t; } - - @Override - public boolean isDone() { - return true; - } - } diff --git a/src/main/java/net/spy/memcached/plugin/FrontCacheMemcachedClient.java b/src/main/java/net/spy/memcached/plugin/FrontCacheMemcachedClient.java index 19d9f5185..d50dcd5ca 100644 --- a/src/main/java/net/spy/memcached/plugin/FrontCacheMemcachedClient.java +++ b/src/main/java/net/spy/memcached/plugin/FrontCacheMemcachedClient.java @@ -19,15 +19,20 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import net.spy.memcached.ConnectionFactory; import net.spy.memcached.MemcachedClient; +import net.spy.memcached.OperationTimeoutException; import net.spy.memcached.internal.GetFuture; import net.spy.memcached.internal.OperationFuture; +import net.spy.memcached.ops.OperationStatus; +import net.spy.memcached.ops.StatusCode; import net.spy.memcached.transcoders.Transcoder; -import net.sf.ehcache.Element; - /** * Front cache for some Arcus commands. * For now, it supports get commands. The front cache stores the value from a get operation. @@ -67,6 +72,62 @@ public FrontCacheMemcachedClient(ConnectionFactory cf, } } + /** + * Get with a single key and decode using the default transcoder. + * + * @param key the key to get + * @return the result from the cache (null if there is none) + * @throws OperationTimeoutException if the global operation timeout is + * exceeded + * @throws IllegalStateException in the rare circumstance where queue + * is too full to accept any more requests + */ + public Object get(String key) { + return get(key, transcoder); + } + + /** + * Get with a single key. + * + * @param Type of object to get. + * @param key the key to get + * @param tc the transcoder to serialize and unserialize value + * @return the result from the cache (null if there is none) + * @throws OperationTimeoutException if the global operation timeout is + * exceeded + * @throws IllegalStateException in the rare circumstance where queue + * is too full to accept any more requests + */ + + public T get(String key, Transcoder tc) { + Future future = asyncGet(key, tc); + try { + return future.get(operationTimeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + future.cancel(true); + throw new RuntimeException("Interrupted waiting for value", e); + } catch (ExecutionException e) { + future.cancel(true); + throw new RuntimeException("Exception waiting for value", e); + } catch (TimeoutException e) { + future.cancel(true); + throw new OperationTimeoutException(e); + } + } + + /** + * Get the given key asynchronously and decode with the default + * transcoder. + * + * @param key the key to fetch + * @return a future that will hold the return value of the fetch + * @throws IllegalStateException in the rare circumstance where queue + * is too full to accept any more requests + */ + public GetFuture asyncGet(final String key) { + return asyncGet(key, transcoder); + } + /** * Get the value of the key. * Check the local cache first. If the key is not found, send the command to the server. @@ -77,17 +138,41 @@ public FrontCacheMemcachedClient(ConnectionFactory cf, */ @Override public GetFuture asyncGet(final String key, final Transcoder tc) { - Element frontElement = null; - - if (localCacheManager != null) { - frontElement = localCacheManager.getElement(key); + if (localCacheManager == null) { + return super.asyncGet(key, tc); } - if (frontElement == null) { - return super.asyncGet(key, tc); - } else { - return new FrontCacheGetFuture(frontElement); + final T t = localCacheManager.get(key, tc); + if (t != null) { + return new GetFuture(null, 0) { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + @Override + public boolean isCancelled() { + return false; + } + @Override + public boolean isDone() { + return true; + } + @Override + public T get() { + return t; + } + @Override + public T get(long timeout, TimeUnit unit) { + return t; + } + @Override + public OperationStatus getStatus() { + return new OperationStatus(true, "END", StatusCode.SUCCESS); + } + }; } + GetFuture parent = super.asyncGet(key, tc); + return new FrontCacheGetFuture(localCacheManager, key, parent); } /**