From 8fdc42bb05e09d87f92772228ec73f0950e5ceca Mon Sep 17 00:00:00 2001 From: brido4125 Date: Tue, 27 Jun 2023 11:23:55 +0900 Subject: [PATCH] ENHANCE: Modify decoding logic in collection get api. --- .../java/net/spy/memcached/ArcusClient.java | 72 +++---- .../java/net/spy/memcached/CachedData.java | 32 ++- .../internal/CollectionGetFuture.java | 187 ++++++++++++++++++ 3 files changed, 242 insertions(+), 49 deletions(-) create mode 100644 src/main/java/net/spy/memcached/internal/CollectionGetFuture.java diff --git a/src/main/java/net/spy/memcached/ArcusClient.java b/src/main/java/net/spy/memcached/ArcusClient.java index 8e06afc57..e257b4d1a 100644 --- a/src/main/java/net/spy/memcached/ArcusClient.java +++ b/src/main/java/net/spy/memcached/ArcusClient.java @@ -127,6 +127,7 @@ import net.spy.memcached.internal.OperationFuture; import net.spy.memcached.internal.SMGetFuture; import net.spy.memcached.internal.PipedCollectionFuture; +import net.spy.memcached.internal.CollectionGetFuture; import net.spy.memcached.ops.BTreeFindPositionOperation; import net.spy.memcached.ops.BTreeFindPositionWithGetOperation; import net.spy.memcached.ops.BTreeGetBulkOperation; @@ -481,12 +482,11 @@ private CollectionFuture> asyncLopGet(final String k, final CollectionGet collectionGet, final Transcoder tc) { final CountDownLatch latch = new CountDownLatch(1); - final CollectionFuture> rv = new CollectionFuture>( - latch, operationTimeout); + final CollectionGetFuture.ListGetFuture rv = + new CollectionGetFuture.ListGetFuture(latch, operationTimeout, tc); Operation op = opFact.collectionGet(k, collectionGet, new CollectionGetOperation.Callback() { - private final List list = new ArrayList(); public void receivedStatus(OperationStatus status) { CollectionOperationStatus cstatus; @@ -497,7 +497,7 @@ public void receivedStatus(OperationStatus status) { cstatus = new CollectionOperationStatus(status); } if (cstatus.isSuccess()) { - rv.set(list, cstatus); + rv.set(new ArrayList(), cstatus); return; } switch (cstatus.getResponse()) { @@ -506,11 +506,11 @@ public void receivedStatus(OperationStatus status) { getLogger().debug("Key(%s) not found : %s", k, cstatus); break; case NOT_FOUND_ELEMENT: - rv.set(list, cstatus); + rv.set(new ArrayList(), cstatus); getLogger().debug("Element(%s) not found : %s", k, cstatus); break; case OUT_OF_RANGE: - rv.set(list, cstatus); + rv.set(new ArrayList(), cstatus); getLogger().debug("Element(%s) not found in condition : %s", k, cstatus); break; case UNREADABLE: @@ -529,7 +529,7 @@ public void complete() { } public void gotData(String subkey, int flags, byte[] data, byte[] eflag) { - list.add(tc.decode(new CachedData(flags, data, tc.getMaxSize()))); + rv.addCachedData(new CachedData(flags, data, tc.getMaxSize())); } }); rv.setOperation(op); @@ -562,12 +562,11 @@ private CollectionFuture> asyncSopGet(final String k, final CollectionGet collectionGet, final Transcoder tc) { final CountDownLatch latch = new CountDownLatch(1); - final CollectionFuture> rv = new CollectionFuture>(latch, - operationTimeout); + final CollectionGetFuture.SetGetFuture rv = + new CollectionGetFuture.SetGetFuture(latch, operationTimeout, tc); Operation op = opFact.collectionGet(k, collectionGet, new CollectionGetOperation.Callback() { - private final Set set = new HashSet(); public void receivedStatus(OperationStatus status) { CollectionOperationStatus cstatus; @@ -578,7 +577,7 @@ public void receivedStatus(OperationStatus status) { cstatus = new CollectionOperationStatus(status); } if (cstatus.isSuccess()) { - rv.set(set, cstatus); + rv.set(new HashSet(), cstatus); return; } @@ -588,7 +587,7 @@ public void receivedStatus(OperationStatus status) { getLogger().debug("Key(%s) not found : %s", k, cstatus); break; case NOT_FOUND_ELEMENT: - rv.set(set, cstatus); + rv.set(new HashSet(), cstatus); getLogger().debug("Element(%s) not found : %s", k, cstatus); break; case UNREADABLE: @@ -607,7 +606,7 @@ public void complete() { } public void gotData(String subkey, int flags, byte[] data, byte[] eflag) { - set.add(tc.decode(new CachedData(flags, data, tc.getMaxSize()))); + rv.addCachedData(new CachedData(flags, data, tc.getMaxSize())); } }); @@ -629,13 +628,10 @@ private CollectionFuture>> asyncBopGet( final String k, final CollectionGet collectionGet, final boolean reverse, final Transcoder tc) { final CountDownLatch latch = new CountDownLatch(1); - final CollectionFuture>> rv = new CollectionFuture>>( - latch, operationTimeout); + final CollectionGetFuture.BTreeGetFuture rv = + new CollectionGetFuture.BTreeGetFuture(latch, operationTimeout, tc); Operation op = opFact.collectionGet(k, collectionGet, new CollectionGetOperation.Callback() { - private final TreeMap> map = new TreeMap>( - (reverse) ? Collections.reverseOrder() : null); - public void receivedStatus(OperationStatus status) { CollectionOperationStatus cstatus; if (status instanceof CollectionOperationStatus) { @@ -645,7 +641,7 @@ public void receivedStatus(OperationStatus status) { cstatus = new CollectionOperationStatus(status); } if (cstatus.isSuccess()) { - rv.set(map, cstatus); + rv.set(new TreeMap>((reverse) ? Collections.reverseOrder() : null), cstatus); return; } switch (cstatus.getResponse()) { @@ -654,7 +650,7 @@ public void receivedStatus(OperationStatus status) { getLogger().debug("Key(%s) not found : %s", k, cstatus); break; case NOT_FOUND_ELEMENT: - rv.set(map, cstatus); + rv.set(new TreeMap>((reverse) ? Collections.reverseOrder() : null), cstatus); getLogger().debug("Element(%s) not found : %s", k, cstatus); break; case UNREADABLE: @@ -673,9 +669,7 @@ public void complete() { } public void gotData(String bkey, int flags, byte[] data, byte[] eflag) { - long longBkey = Long.parseLong(bkey); - map.put(longBkey, new Element(longBkey, - tc.decode(new CachedData(flags, data, tc.getMaxSize())), eflag)); + rv.putCachedData(Long.parseLong(bkey), new CachedData(flags, data, eflag, tc.getMaxSize())); } }); rv.setOperation(op); @@ -694,12 +688,10 @@ public void gotData(String bkey, int flags, byte[] data, byte[] eflag) { private CollectionFuture> asyncMopGet( final String k, final CollectionGet collectionGet, final Transcoder tc) { final CountDownLatch latch = new CountDownLatch(1); - final CollectionFuture> rv = new CollectionFuture>( - latch, operationTimeout); + final CollectionGetFuture.MapGetFuture rv = + new CollectionGetFuture.MapGetFuture(latch, operationTimeout, tc); Operation op = opFact.collectionGet(k, collectionGet, new CollectionGetOperation.Callback() { - private final HashMap map = new HashMap(); - public void receivedStatus(OperationStatus status) { CollectionOperationStatus cstatus; if (status instanceof CollectionOperationStatus) { @@ -709,7 +701,7 @@ public void receivedStatus(OperationStatus status) { cstatus = new CollectionOperationStatus(status); } if (cstatus.isSuccess()) { - rv.set(map, cstatus); + rv.set(new HashMap(), cstatus); return; } switch (cstatus.getResponse()) { @@ -718,7 +710,7 @@ public void receivedStatus(OperationStatus status) { getLogger().debug("Key(%s) not found : %s", k, cstatus); break; case NOT_FOUND_ELEMENT: - rv.set(map, cstatus); + rv.set(new HashMap(), cstatus); getLogger().debug("Element(%s) not found : %s", k, cstatus); break; case UNREADABLE: @@ -737,7 +729,7 @@ public void complete() { } public void gotData(String mkey, int flags, byte[] data, byte[] eflag) { - map.put(mkey, tc.decode(new CachedData(flags, data, tc.getMaxSize()))); + rv.putCachedData(mkey, new CachedData(flags, data, tc.getMaxSize())); } }); rv.setOperation(op); @@ -2930,15 +2922,11 @@ private CollectionFuture>> asyncBopExtendedGet final boolean reverse, final Transcoder tc) { final CountDownLatch latch = new CountDownLatch(1); - final CollectionFuture>> rv = - new CollectionFuture>>(latch, operationTimeout); + final CollectionGetFuture.BTreeGetFuture rv + = new CollectionGetFuture.BTreeGetFuture(latch, operationTimeout, tc); Operation op = opFact.collectionGet(k, collectionGet, new CollectionGetOperation.Callback() { - private final TreeMap> map - = new ByteArrayTreeMap>( - (reverse) ? Collections.reverseOrder() : null); - public void receivedStatus(OperationStatus status) { CollectionOperationStatus cstatus; if (status instanceof CollectionOperationStatus) { @@ -2948,7 +2936,8 @@ public void receivedStatus(OperationStatus status) { cstatus = new CollectionOperationStatus(status); } if (cstatus.isSuccess()) { - rv.set(map, cstatus); + rv.set(new ByteArrayTreeMap>( + (reverse) ? Collections.reverseOrder() : null), cstatus); return; } switch (cstatus.getResponse()) { @@ -2957,7 +2946,8 @@ public void receivedStatus(OperationStatus status) { getLogger().debug("Key(%s) not found : %s", k, cstatus); break; case NOT_FOUND_ELEMENT: - rv.set(map, cstatus); + rv.set(new ByteArrayTreeMap>( + (reverse) ? Collections.reverseOrder() : null), cstatus); getLogger().debug("Element(%s) not found : %s", k, cstatus); break; case UNREADABLE: @@ -2976,10 +2966,8 @@ public void complete() { } public void gotData(String bkey, int flags, byte[] data, byte[] eflag) { - byte[] byteBkey = BTreeUtil.hexStringToByteArrays(bkey); - Element element = new Element(byteBkey, - tc.decode(new CachedData(flags, data, tc.getMaxSize())), eflag); - map.put(new ByteArrayBKey(byteBkey), element); + rv.putCachedData(new ByteArrayBKey(BTreeUtil.hexStringToByteArrays(bkey)), + new CachedData(flags, data, eflag, tc.getMaxSize())); } }); rv.setOperation(op); diff --git a/src/main/java/net/spy/memcached/CachedData.java b/src/main/java/net/spy/memcached/CachedData.java index d1326d82c..cf6d2d039 100644 --- a/src/main/java/net/spy/memcached/CachedData.java +++ b/src/main/java/net/spy/memcached/CachedData.java @@ -16,24 +16,38 @@ public final class CachedData { private final int flags; private final byte[] data; + private byte[] eFlag; /** * Get a CachedData instance for the given flags and byte array. * - * @param f the flags - * @param d the data + * @param flag the flags + * @param data the data * @param max_size the maximum allowable size. */ - public CachedData(int f, byte[] d, int max_size) { + public CachedData(int flag, byte[] data, int max_size) { super(); - if (d.length > max_size) { + if (data.length > max_size) { throw new IllegalArgumentException( "Cannot cache data larger than " + max_size + " bytes (you tried to cache a " - + d.length + " byte object)"); + + data.length + " byte object)"); } - flags = f; - data = d; + flags = flag; + this.data = data; + } + + /** + * Get a CachedData instance for the given flags and byte array. + * + * @param flag the flags + * @param data the data + * @param eFlag the eflag + * @param max_size the maximum allowable size. + */ + public CachedData(int flag, byte[] data, byte[] eFlag, int max_size) { + this(flag, data, max_size); + this.eFlag = eFlag; } /** @@ -50,6 +64,10 @@ public int getFlags() { return flags; } + public byte[] getEFlag() { + return eFlag; + } + @Override public String toString() { return "{CachedData flags=" + flags + " data=" diff --git a/src/main/java/net/spy/memcached/internal/CollectionGetFuture.java b/src/main/java/net/spy/memcached/internal/CollectionGetFuture.java new file mode 100644 index 000000000..69d7f2056 --- /dev/null +++ b/src/main/java/net/spy/memcached/internal/CollectionGetFuture.java @@ -0,0 +1,187 @@ +package net.spy.memcached.internal; + +import net.spy.memcached.CachedData; +import net.spy.memcached.OperationTimeoutException; +import net.spy.memcached.collection.ByteArrayBKey; +import net.spy.memcached.collection.Element; +import net.spy.memcached.transcoders.Transcoder; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public abstract class CollectionGetFuture extends CollectionFuture { + + protected final Transcoder tc; + + protected T result = null; + public CollectionGetFuture(CountDownLatch l, long opTimeout, Transcoder tc) { + super(l, opTimeout); + this.tc = tc; + } + + public static class ListGetFuture extends CollectionGetFuture, T> { + private final List cachedDataList = new ArrayList(); + + public ListGetFuture(CountDownLatch l, long opTimeout, Transcoder tc) { + super(l, opTimeout, tc); + } + + @Override + public List get() throws InterruptedException, ExecutionException { + try { + return get(timeout, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + throw new OperationTimeoutException(e); + } + } + + @Override + public List get(long duration, TimeUnit units) + throws InterruptedException, TimeoutException, ExecutionException { + + result = super.get(duration, units); + if (result == null) { + return null; + } + if (result.isEmpty()) { + for (CachedData cachedData : this.cachedDataList) { + result.add(tc.decode(cachedData)); + } + } + return result; + } + + public void addCachedData(CachedData data) { + cachedDataList.add(data); + } + } + + public static class SetGetFuture extends CollectionGetFuture, T> { + private final List cachedDataList = new ArrayList(); + + public SetGetFuture(CountDownLatch l, long opTimeout, Transcoder tc) { + super(l, opTimeout, tc); + } + + @Override + public Set get() throws InterruptedException, ExecutionException { + try { + return get(timeout, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + throw new OperationTimeoutException(e); + } + } + + @Override + public Set get(long duration, TimeUnit units) + throws InterruptedException, TimeoutException, ExecutionException { + + result = super.get(duration, units); + if (result == null) { + return null; + } + if (result.isEmpty()) { + for (CachedData cachedData : this.cachedDataList) { + result.add(tc.decode(cachedData)); + } + } + return result; + } + + public void addCachedData(CachedData data) { + cachedDataList.add(data); + } + } + + public static class BTreeGetFuture extends CollectionGetFuture>, V> { + + private final HashMap cachedDataMap = new HashMap(); + + public BTreeGetFuture(CountDownLatch l, long opTimeout, Transcoder tc) { + super(l, opTimeout, tc); + } + + @Override + public Map> get() throws InterruptedException, ExecutionException { + try { + return get(timeout, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + throw new OperationTimeoutException(e); + } + } + + @Override + public Map> get(long duration, TimeUnit units) + throws InterruptedException, TimeoutException, ExecutionException { + + result = super.get(duration, units); + if (result == null) { + return null; + } + if (result.isEmpty()) { + for (Map.Entry entry : cachedDataMap.entrySet()) { + K bKey = entry.getKey(); + CachedData cachedData = entry.getValue(); + V decodeValue = tc.decode(cachedData); + + if (bKey instanceof Long) { + result.put(bKey, + new Element((Long) bKey, decodeValue, cachedData.getEFlag())); + } else if (bKey instanceof ByteArrayBKey) { + ByteArrayBKey byteArrayBKey = (ByteArrayBKey) bKey; + result.put(bKey, + new Element(byteArrayBKey.getBytes(), decodeValue, cachedData.getEFlag())); + } + } + } + return result; + } + + public void putCachedData(K bKey, CachedData cachedData) { + cachedDataMap.put(bKey, cachedData); + } + } + + public static class MapGetFuture extends CollectionGetFuture, T> { + private final HashMap cachedDataMap = new HashMap(); + public MapGetFuture(CountDownLatch l, long opTimeout, Transcoder tc) { + super(l, opTimeout, tc); + } + + @Override + public Map get() throws InterruptedException, ExecutionException { + try { + return get(timeout, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + throw new OperationTimeoutException(e); + } + } + + @Override + public Map get(long duration, TimeUnit units) + throws InterruptedException, TimeoutException, ExecutionException { + + result = super.get(duration, units); + if (result == null) { + return null; + } + if (result.isEmpty()) { + for (Map.Entry entry : cachedDataMap.entrySet()) { + result.put(entry.getKey(), tc.decode(entry.getValue())); + } + } + return result; + } + + public void putCachedData(String key, CachedData data) { + cachedDataMap.put(key, data); + } + } +}