From 6d75954665bb2f6e7b52cb0c4ae8c405a77d61ac Mon Sep 17 00:00:00 2001 From: brido4125 Date: Tue, 27 Jun 2023 11:23:55 +0900 Subject: [PATCH] ENHANCE: Modify decoding logic in collection get api. --- .../java/net/spy/memcached/ArcusClient.java | 39 ++-- .../spy/memcached/collection/BTreeGet.java | 6 +- .../memcached/collection/CollectionGet.java | 7 +- .../net/spy/memcached/collection/Element.java | 14 ++ .../internal/CollectionGetFuture.java | 180 ++++++++++++++++++ .../ascii/CollectionGetOperationImpl.java | 5 + 6 files changed, 226 insertions(+), 25 deletions(-) create mode 100644 src/main/java/net/spy/memcached/internal/CollectionGetFuture.java diff --git a/src/main/java/net/spy/memcached/ArcusClient.java b/src/main/java/net/spy/memcached/ArcusClient.java index 4d51384a1..71d2803b3 100644 --- a/src/main/java/net/spy/memcached/ArcusClient.java +++ b/src/main/java/net/spy/memcached/ArcusClient.java @@ -127,6 +127,7 @@ import net.spy.memcached.internal.OperationFuture; import net.spy.memcached.internal.SMGetFuture; import net.spy.memcached.internal.PipedCollectionFuture; +import net.spy.memcached.internal.CollectionGetFuture; import net.spy.memcached.ops.BTreeFindPositionOperation; import net.spy.memcached.ops.BTreeFindPositionWithGetOperation; import net.spy.memcached.ops.BTreeGetBulkOperation; @@ -481,8 +482,8 @@ private CollectionFuture> asyncLopGet(final String k, final CollectionGet collectionGet, final Transcoder tc) { final CountDownLatch latch = new CountDownLatch(1); - final CollectionFuture> rv = new CollectionFuture>( - latch, operationTimeout); + final CollectionGetFuture.ListGetFuture rv = + new CollectionGetFuture.ListGetFuture(latch, operationTimeout, tc); Operation op = opFact.collectionGet(k, collectionGet, new CollectionGetOperation.Callback() { @@ -530,7 +531,7 @@ public void complete() { public void gotData(String key, int flags, String subkey, byte[] data) { assert key.equals(k) : "Wrong key returned"; - list.add(tc.decode(new CachedData(flags, data, tc.getMaxSize()))); + rv.addCachedData(new CachedData(flags, data, tc.getMaxSize())); } }); rv.setOperation(op); @@ -563,8 +564,8 @@ private CollectionFuture> asyncSopGet(final String k, final CollectionGet collectionGet, final Transcoder tc) { final CountDownLatch latch = new CountDownLatch(1); - final CollectionFuture> rv = new CollectionFuture>(latch, - operationTimeout); + final CollectionGetFuture.SetGetFuture rv = + new CollectionGetFuture.SetGetFuture(latch, operationTimeout, tc); Operation op = opFact.collectionGet(k, collectionGet, new CollectionGetOperation.Callback() { @@ -609,7 +610,7 @@ public void complete() { public void gotData(String key, int flags, String subkey, byte[] data) { assert key.equals(k) : "Wrong key returned"; - set.add(tc.decode(new CachedData(flags, data, tc.getMaxSize()))); + rv.addCachedData(new CachedData(flags, data, tc.getMaxSize())); } }); @@ -631,8 +632,8 @@ private CollectionFuture>> asyncBopGet( final String k, final CollectionGet collectionGet, final boolean reverse, final Transcoder tc) { final CountDownLatch latch = new CountDownLatch(1); - final CollectionFuture>> rv = new CollectionFuture>>( - latch, operationTimeout); + final CollectionGetFuture.BTreeGetFuture rv = + new CollectionGetFuture.BTreeGetFuture(latch, operationTimeout, tc); Operation op = opFact.collectionGet(k, collectionGet, new CollectionGetOperation.Callback() { private final TreeMap> map = new TreeMap>( @@ -676,10 +677,7 @@ public void complete() { public void gotData(String key, int flags, String bkey, byte[] data) { assert key.equals(k) : "Wrong key returned"; - long longBkey = Long.parseLong(bkey); - map.put(longBkey, new Element(longBkey, - tc.decode(new CachedData(flags, data, tc.getMaxSize())), - collectionGet.getElementFlag())); + rv.putCachedData(Long.parseLong(bkey), new CachedData(flags, data, tc.getMaxSize())); } }); rv.setOperation(op); @@ -698,8 +696,8 @@ public void gotData(String key, int flags, String bkey, byte[] data) { private CollectionFuture> asyncMopGet( final String k, final CollectionGet collectionGet, final Transcoder tc) { final CountDownLatch latch = new CountDownLatch(1); - final CollectionFuture> rv = new CollectionFuture>( - latch, operationTimeout); + final CollectionGetFuture.MapGetFuture rv = + new CollectionGetFuture.MapGetFuture(latch, operationTimeout, tc); Operation op = opFact.collectionGet(k, collectionGet, new CollectionGetOperation.Callback() { private final HashMap map = new HashMap(); @@ -742,7 +740,7 @@ public void complete() { public void gotData(String key, int flags, String mkey, byte[] data) { assert key.equals(k) : "Wrong key returned"; - map.put(mkey, tc.decode(new CachedData(flags, data, tc.getMaxSize()))); + rv.putCachedData(mkey, new CachedData(flags, data, tc.getMaxSize())); } }); rv.setOperation(op); @@ -3028,8 +3026,8 @@ private CollectionFuture>> asyncBopExtendedGet final boolean reverse, final Transcoder tc) { final CountDownLatch latch = new CountDownLatch(1); - final CollectionFuture>> rv = - new CollectionFuture>>(latch, operationTimeout); + final CollectionGetFuture.BTreeGetFuture rv + = new CollectionGetFuture.BTreeGetFuture(latch, operationTimeout, tc); Operation op = opFact.collectionGet(k, collectionGet, new CollectionGetOperation.Callback() { @@ -3075,11 +3073,8 @@ public void complete() { public void gotData(String key, int flags, String bkey, byte[] data) { assert key.equals(k) : "Wrong key returned"; - byte[] byteBkey = BTreeUtil.hexStringToByteArrays(bkey); - Element element = new Element(byteBkey, - tc.decode(new CachedData(flags, data, tc.getMaxSize())), - collectionGet.getElementFlag()); - map.put(new ByteArrayBKey(byteBkey), element); + rv.putCachedData(new ByteArrayBKey(BTreeUtil.hexStringToByteArrays(bkey)), + new CachedData(flags, data, tc.getMaxSize())); } }); rv.setOperation(op); diff --git a/src/main/java/net/spy/memcached/collection/BTreeGet.java b/src/main/java/net/spy/memcached/collection/BTreeGet.java index 2dc3a935e..b0adeaa6b 100644 --- a/src/main/java/net/spy/memcached/collection/BTreeGet.java +++ b/src/main/java/net/spy/memcached/collection/BTreeGet.java @@ -163,10 +163,14 @@ public void decodeItemHeader(String itemHeader) { // found element flag. if (splited[1].startsWith("0x")) { this.elementFlagExists = true; - this.elementFlag = BTreeUtil.hexStringToByteArrays(splited[1].substring(2)); + this.elementFlag.add( + BTreeUtil.hexStringToByteArrays( + splited[1].substring(2) + )); //this.headerCount++; headerParseStep = 2; } else { + this.elementFlag.add(null); this.dataLength = Integer.parseInt(splited[1]); } } else { diff --git a/src/main/java/net/spy/memcached/collection/CollectionGet.java b/src/main/java/net/spy/memcached/collection/CollectionGet.java index 8bb059cb1..de8632060 100644 --- a/src/main/java/net/spy/memcached/collection/CollectionGet.java +++ b/src/main/java/net/spy/memcached/collection/CollectionGet.java @@ -16,6 +16,9 @@ */ package net.spy.memcached.collection; +import java.util.ArrayList; +import java.util.List; + public abstract class CollectionGet { protected boolean delete = false; @@ -27,7 +30,7 @@ public abstract class CollectionGet { protected String subkey; protected int dataLength; - protected byte[] elementFlag; + protected List elementFlag = new ArrayList(); public boolean isDelete() { return delete; @@ -49,7 +52,7 @@ public int getDataLength() { return dataLength; } - public byte[] getElementFlag() { + public List getElementFlag() { return elementFlag; } diff --git a/src/main/java/net/spy/memcached/collection/Element.java b/src/main/java/net/spy/memcached/collection/Element.java index c5ffc3305..ea12aaeb5 100644 --- a/src/main/java/net/spy/memcached/collection/Element.java +++ b/src/main/java/net/spy/memcached/collection/Element.java @@ -51,6 +51,13 @@ public Element(long bkey, T value, byte[] eflag) { this.elementFlagUpdate = null; } + public Element(ByteArrayBKey bkey, T value, byte[] eflag) { + this.bKeyObject = new BKeyObject(bkey); + this.value = value; + this.eflag = eflag; + this.elementFlagUpdate = null; + } + public Element(byte[] bkey, T value, ElementFlagUpdate elementFlagUpdate) { this.bKeyObject = new BKeyObject(bkey); this.value = value; @@ -65,6 +72,13 @@ public Element(long bkey, T value, ElementFlagUpdate elementFlagUpdate) { this.elementFlagUpdate = elementFlagUpdate; } + public Element(ByteArrayBKey bkey, T value, ElementFlagUpdate elementFlagUpdate) { + this.bKeyObject = new BKeyObject(bkey); + this.value = value; + this.eflag = null; + this.elementFlagUpdate = elementFlagUpdate; + } + /** * get value of element flag by hex. * diff --git a/src/main/java/net/spy/memcached/internal/CollectionGetFuture.java b/src/main/java/net/spy/memcached/internal/CollectionGetFuture.java new file mode 100644 index 000000000..0a49a4792 --- /dev/null +++ b/src/main/java/net/spy/memcached/internal/CollectionGetFuture.java @@ -0,0 +1,180 @@ +package net.spy.memcached.internal; + +import net.spy.memcached.CachedData; +import net.spy.memcached.OperationTimeoutException; +import net.spy.memcached.collection.ByteArrayBKey; +import net.spy.memcached.collection.Element; +import net.spy.memcached.protocol.ascii.CollectionGetOperationImpl; +import net.spy.memcached.transcoders.Transcoder; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.ArrayList; +import java.util.List; +import java.util.LinkedHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public abstract class CollectionGetFuture extends CollectionFuture { + + protected final Transcoder tc; + + protected T result = null; + + public CollectionGetFuture(CountDownLatch l, long opTimeout, Transcoder tc) { + super(l, opTimeout); + this.tc = tc; + } + + public static class ListGetFuture extends CollectionGetFuture, T> { + private final List cachedDataList = new ArrayList(); + + public ListGetFuture(CountDownLatch l, long opTimeout, Transcoder tc) { + super(l, opTimeout, tc); + } + + @Override + public List get() throws InterruptedException, ExecutionException { + try { + return get(timeout, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + throw new OperationTimeoutException(e); + } + } + + @Override + public List get(long duration, TimeUnit units) + throws InterruptedException, TimeoutException, ExecutionException { + + if (result == null) { + result = super.get(duration, units); + for (CachedData cachedData : this.cachedDataList) { + result.add(tc.decode(cachedData)); + } + } + return result; + } + + public void addCachedData(CachedData data) { + cachedDataList.add(data); + } + } + + public static class SetGetFuture extends CollectionGetFuture, T> { + private final List cachedDataList = new ArrayList(); + + public SetGetFuture(CountDownLatch l, long opTimeout, Transcoder tc) { + super(l, opTimeout, tc); + } + + @Override + public Set get() throws InterruptedException, ExecutionException { + try { + return get(timeout, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + throw new OperationTimeoutException(e); + } + } + + @Override + public Set get(long duration, TimeUnit units) + throws InterruptedException, TimeoutException, ExecutionException { + + if (result == null) { + result = super.get(duration, units); + for (CachedData cachedData : this.cachedDataList) { + result.add(tc.decode(cachedData)); + } + } + return result; + } + + public void addCachedData(CachedData data) { + cachedDataList.add(data); + } + } + + public static class BTreeGetFuture extends CollectionGetFuture>, V> { + private final HashMap cachedDataMap + = new LinkedHashMap(); + + public BTreeGetFuture(CountDownLatch l, long opTimeout, Transcoder tc) { + super(l, opTimeout, tc); + } + + @Override + public Map> get() throws InterruptedException, ExecutionException { + try { + return get(timeout, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + throw new OperationTimeoutException(e); + } + } + + @Override + public Map> get(long duration, TimeUnit units) + throws InterruptedException, TimeoutException, ExecutionException { + + if (result == null) { + result = super.get(duration, units); + CollectionGetOperationImpl collectionGetOperation = (CollectionGetOperationImpl) op; + List elementFlag = collectionGetOperation.getElementFlag(); + int index = 0; + + for (Map.Entry entry : cachedDataMap.entrySet()) { + K bKey = entry.getKey(); + CachedData cachedData = entry.getValue(); + V decodeValue = tc.decode(cachedData); + Element elem = bKey instanceof Long ? + new Element((Long) bKey, decodeValue, elementFlag.get(index)) : + new Element((ByteArrayBKey) bKey, decodeValue, elementFlag.get(index)); + result.put(bKey, elem); + index++; + } + } + return result; + } + + public void putCachedData(K key, CachedData data) { + cachedDataMap.put(key, data); + } + } + + public static class MapGetFuture extends CollectionGetFuture, T> { + private final HashMap cachedDataMap + = new HashMap(); + + public MapGetFuture(CountDownLatch l, long opTimeout, Transcoder tc) { + super(l, opTimeout, tc); + } + + @Override + public Map get() throws InterruptedException, ExecutionException { + try { + return get(timeout, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + throw new OperationTimeoutException(e); + } + } + + @Override + public Map get(long duration, TimeUnit units) + throws InterruptedException, TimeoutException, ExecutionException { + + if (result == null) { + result = super.get(duration, units); + for (Map.Entry entry : cachedDataMap.entrySet()) { + result.put(entry.getKey(), tc.decode(entry.getValue())); + } + } + return result; + } + + public void putCachedData(String key, CachedData data) { + cachedDataMap.put(key, data); + } + } +} diff --git a/src/main/java/net/spy/memcached/protocol/ascii/CollectionGetOperationImpl.java b/src/main/java/net/spy/memcached/protocol/ascii/CollectionGetOperationImpl.java index b1a713f6f..d5ad36971 100644 --- a/src/main/java/net/spy/memcached/protocol/ascii/CollectionGetOperationImpl.java +++ b/src/main/java/net/spy/memcached/protocol/ascii/CollectionGetOperationImpl.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.util.Collection; import java.util.Collections; +import java.util.List; import net.spy.memcached.KeyUtil; import net.spy.memcached.collection.BTreeGet; @@ -298,6 +299,10 @@ public CollectionGet getGet() { return collectionGet; } + public List getElementFlag() { + return collectionGet.getElementFlag(); + } + @Override public boolean isBulkOperation() { return false;