Skip to content

Commit

Permalink
ENHANCE: Modify decoding logic in collection get api.
Browse files Browse the repository at this point in the history
  • Loading branch information
brido4125 committed Jul 18, 2023
1 parent 4b8252b commit 6d75954
Show file tree
Hide file tree
Showing 6 changed files with 226 additions and 25 deletions.
39 changes: 17 additions & 22 deletions src/main/java/net/spy/memcached/ArcusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@
import net.spy.memcached.internal.OperationFuture;
import net.spy.memcached.internal.SMGetFuture;
import net.spy.memcached.internal.PipedCollectionFuture;
import net.spy.memcached.internal.CollectionGetFuture;
import net.spy.memcached.ops.BTreeFindPositionOperation;
import net.spy.memcached.ops.BTreeFindPositionWithGetOperation;
import net.spy.memcached.ops.BTreeGetBulkOperation;
Expand Down Expand Up @@ -481,8 +482,8 @@ private <T> CollectionFuture<List<T>> asyncLopGet(final String k,
final CollectionGet collectionGet,
final Transcoder<T> tc) {
final CountDownLatch latch = new CountDownLatch(1);
final CollectionFuture<List<T>> rv = new CollectionFuture<List<T>>(
latch, operationTimeout);
final CollectionGetFuture.ListGetFuture<T> rv =
new CollectionGetFuture.ListGetFuture<T>(latch, operationTimeout, tc);

Operation op = opFact.collectionGet(k, collectionGet,
new CollectionGetOperation.Callback() {
Expand Down Expand Up @@ -530,7 +531,7 @@ public void complete() {

public void gotData(String key, int flags, String subkey, byte[] data) {
assert key.equals(k) : "Wrong key returned";
list.add(tc.decode(new CachedData(flags, data, tc.getMaxSize())));
rv.addCachedData(new CachedData(flags, data, tc.getMaxSize()));
}
});
rv.setOperation(op);
Expand Down Expand Up @@ -563,8 +564,8 @@ private <T> CollectionFuture<Set<T>> asyncSopGet(final String k,
final CollectionGet collectionGet,
final Transcoder<T> tc) {
final CountDownLatch latch = new CountDownLatch(1);
final CollectionFuture<Set<T>> rv = new CollectionFuture<Set<T>>(latch,
operationTimeout);
final CollectionGetFuture.SetGetFuture<T> rv =
new CollectionGetFuture.SetGetFuture<T>(latch, operationTimeout, tc);

Operation op = opFact.collectionGet(k, collectionGet,
new CollectionGetOperation.Callback() {
Expand Down Expand Up @@ -609,7 +610,7 @@ public void complete() {

public void gotData(String key, int flags, String subkey, byte[] data) {
assert key.equals(k) : "Wrong key returned";
set.add(tc.decode(new CachedData(flags, data, tc.getMaxSize())));
rv.addCachedData(new CachedData(flags, data, tc.getMaxSize()));
}
});

Expand All @@ -631,8 +632,8 @@ private <T> CollectionFuture<Map<Long, Element<T>>> asyncBopGet(
final String k, final CollectionGet collectionGet,
final boolean reverse, final Transcoder<T> tc) {
final CountDownLatch latch = new CountDownLatch(1);
final CollectionFuture<Map<Long, Element<T>>> rv = new CollectionFuture<Map<Long, Element<T>>>(
latch, operationTimeout);
final CollectionGetFuture.BTreeGetFuture<Long, T> rv =
new CollectionGetFuture.BTreeGetFuture<Long, T>(latch, operationTimeout, tc);
Operation op = opFact.collectionGet(k, collectionGet,
new CollectionGetOperation.Callback() {
private final TreeMap<Long, Element<T>> map = new TreeMap<Long, Element<T>>(
Expand Down Expand Up @@ -676,10 +677,7 @@ public void complete() {

public void gotData(String key, int flags, String bkey, byte[] data) {
assert key.equals(k) : "Wrong key returned";
long longBkey = Long.parseLong(bkey);
map.put(longBkey, new Element<T>(longBkey,
tc.decode(new CachedData(flags, data, tc.getMaxSize())),
collectionGet.getElementFlag()));
rv.putCachedData(Long.parseLong(bkey), new CachedData(flags, data, tc.getMaxSize()));
}
});
rv.setOperation(op);
Expand All @@ -698,8 +696,8 @@ public void gotData(String key, int flags, String bkey, byte[] data) {
private <T> CollectionFuture<Map<String, T>> asyncMopGet(
final String k, final CollectionGet collectionGet, final Transcoder<T> tc) {
final CountDownLatch latch = new CountDownLatch(1);
final CollectionFuture<Map<String, T>> rv = new CollectionFuture<Map<String, T>>(
latch, operationTimeout);
final CollectionGetFuture.MapGetFuture<T> rv =
new CollectionGetFuture.MapGetFuture<T>(latch, operationTimeout, tc);
Operation op = opFact.collectionGet(k, collectionGet,
new CollectionGetOperation.Callback() {
private final HashMap<String, T> map = new HashMap<String, T>();
Expand Down Expand Up @@ -742,7 +740,7 @@ public void complete() {

public void gotData(String key, int flags, String mkey, byte[] data) {
assert key.equals(k) : "Wrong key returned";
map.put(mkey, tc.decode(new CachedData(flags, data, tc.getMaxSize())));
rv.putCachedData(mkey, new CachedData(flags, data, tc.getMaxSize()));
}
});
rv.setOperation(op);
Expand Down Expand Up @@ -3028,8 +3026,8 @@ private <T> CollectionFuture<Map<ByteArrayBKey, Element<T>>> asyncBopExtendedGet
final boolean reverse, final Transcoder<T> tc) {

final CountDownLatch latch = new CountDownLatch(1);
final CollectionFuture<Map<ByteArrayBKey, Element<T>>> rv =
new CollectionFuture<Map<ByteArrayBKey, Element<T>>>(latch, operationTimeout);
final CollectionGetFuture.BTreeGetFuture<ByteArrayBKey, T> rv
= new CollectionGetFuture.BTreeGetFuture<ByteArrayBKey, T>(latch, operationTimeout, tc);

Operation op = opFact.collectionGet(k, collectionGet,
new CollectionGetOperation.Callback() {
Expand Down Expand Up @@ -3075,11 +3073,8 @@ public void complete() {

public void gotData(String key, int flags, String bkey, byte[] data) {
assert key.equals(k) : "Wrong key returned";
byte[] byteBkey = BTreeUtil.hexStringToByteArrays(bkey);
Element<T> element = new Element<T>(byteBkey,
tc.decode(new CachedData(flags, data, tc.getMaxSize())),
collectionGet.getElementFlag());
map.put(new ByteArrayBKey(byteBkey), element);
rv.putCachedData(new ByteArrayBKey(BTreeUtil.hexStringToByteArrays(bkey)),
new CachedData(flags, data, tc.getMaxSize()));
}
});
rv.setOperation(op);
Expand Down
6 changes: 5 additions & 1 deletion src/main/java/net/spy/memcached/collection/BTreeGet.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,14 @@ public void decodeItemHeader(String itemHeader) {
// found element flag.
if (splited[1].startsWith("0x")) {
this.elementFlagExists = true;
this.elementFlag = BTreeUtil.hexStringToByteArrays(splited[1].substring(2));
this.elementFlag.add(
BTreeUtil.hexStringToByteArrays(
splited[1].substring(2)
));
//this.headerCount++;
headerParseStep = 2;
} else {
this.elementFlag.add(null);
this.dataLength = Integer.parseInt(splited[1]);
}
} else {
Expand Down
7 changes: 5 additions & 2 deletions src/main/java/net/spy/memcached/collection/CollectionGet.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
*/
package net.spy.memcached.collection;

import java.util.ArrayList;
import java.util.List;

public abstract class CollectionGet {

protected boolean delete = false;
Expand All @@ -27,7 +30,7 @@ public abstract class CollectionGet {
protected String subkey;
protected int dataLength;

protected byte[] elementFlag;
protected List<byte[]> elementFlag = new ArrayList<byte[]>();

public boolean isDelete() {
return delete;
Expand All @@ -49,7 +52,7 @@ public int getDataLength() {
return dataLength;
}

public byte[] getElementFlag() {
public List<byte[]> getElementFlag() {
return elementFlag;
}

Expand Down
14 changes: 14 additions & 0 deletions src/main/java/net/spy/memcached/collection/Element.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ public Element(long bkey, T value, byte[] eflag) {
this.elementFlagUpdate = null;
}

public Element(ByteArrayBKey bkey, T value, byte[] eflag) {
this.bKeyObject = new BKeyObject(bkey);
this.value = value;
this.eflag = eflag;
this.elementFlagUpdate = null;
}

public Element(byte[] bkey, T value, ElementFlagUpdate elementFlagUpdate) {
this.bKeyObject = new BKeyObject(bkey);
this.value = value;
Expand All @@ -65,6 +72,13 @@ public Element(long bkey, T value, ElementFlagUpdate elementFlagUpdate) {
this.elementFlagUpdate = elementFlagUpdate;
}

public Element(ByteArrayBKey bkey, T value, ElementFlagUpdate elementFlagUpdate) {
this.bKeyObject = new BKeyObject(bkey);
this.value = value;
this.eflag = null;
this.elementFlagUpdate = elementFlagUpdate;
}

/**
* get value of element flag by hex.
*
Expand Down
180 changes: 180 additions & 0 deletions src/main/java/net/spy/memcached/internal/CollectionGetFuture.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package net.spy.memcached.internal;

import net.spy.memcached.CachedData;
import net.spy.memcached.OperationTimeoutException;
import net.spy.memcached.collection.ByteArrayBKey;
import net.spy.memcached.collection.Element;
import net.spy.memcached.protocol.ascii.CollectionGetOperationImpl;
import net.spy.memcached.transcoders.Transcoder;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.ArrayList;
import java.util.List;
import java.util.LinkedHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public abstract class CollectionGetFuture<T, R> extends CollectionFuture<T> {

protected final Transcoder<R> tc;

protected T result = null;

public CollectionGetFuture(CountDownLatch l, long opTimeout, Transcoder<R> tc) {
super(l, opTimeout);
this.tc = tc;
}

public static class ListGetFuture<T> extends CollectionGetFuture<List<T>, T> {
private final List<CachedData> cachedDataList = new ArrayList<CachedData>();

public ListGetFuture(CountDownLatch l, long opTimeout, Transcoder<T> tc) {
super(l, opTimeout, tc);
}

@Override
public List<T> get() throws InterruptedException, ExecutionException {
try {
return get(timeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
throw new OperationTimeoutException(e);
}
}

@Override
public List<T> get(long duration, TimeUnit units)
throws InterruptedException, TimeoutException, ExecutionException {

if (result == null) {
result = super.get(duration, units);
for (CachedData cachedData : this.cachedDataList) {
result.add(tc.decode(cachedData));
}
}
return result;
}

public void addCachedData(CachedData data) {
cachedDataList.add(data);
}
}

public static class SetGetFuture<T> extends CollectionGetFuture<Set<T>, T> {
private final List<CachedData> cachedDataList = new ArrayList<CachedData>();

public SetGetFuture(CountDownLatch l, long opTimeout, Transcoder<T> tc) {
super(l, opTimeout, tc);
}

@Override
public Set<T> get() throws InterruptedException, ExecutionException {
try {
return get(timeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
throw new OperationTimeoutException(e);
}
}

@Override
public Set<T> get(long duration, TimeUnit units)
throws InterruptedException, TimeoutException, ExecutionException {

if (result == null) {
result = super.get(duration, units);
for (CachedData cachedData : this.cachedDataList) {
result.add(tc.decode(cachedData));
}
}
return result;
}

public void addCachedData(CachedData data) {
cachedDataList.add(data);
}
}

public static class BTreeGetFuture<K, V> extends CollectionGetFuture<Map<K, Element<V>>, V> {
private final HashMap<K, CachedData> cachedDataMap
= new LinkedHashMap<K, CachedData>();

public BTreeGetFuture(CountDownLatch l, long opTimeout, Transcoder<V> tc) {
super(l, opTimeout, tc);
}

@Override
public Map<K, Element<V>> get() throws InterruptedException, ExecutionException {
try {
return get(timeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
throw new OperationTimeoutException(e);
}
}

@Override
public Map<K, Element<V>> get(long duration, TimeUnit units)
throws InterruptedException, TimeoutException, ExecutionException {

if (result == null) {
result = super.get(duration, units);
CollectionGetOperationImpl collectionGetOperation = (CollectionGetOperationImpl) op;
List<byte[]> elementFlag = collectionGetOperation.getElementFlag();
int index = 0;

for (Map.Entry<K, CachedData> entry : cachedDataMap.entrySet()) {
K bKey = entry.getKey();
CachedData cachedData = entry.getValue();
V decodeValue = tc.decode(cachedData);
Element<V> elem = bKey instanceof Long ?
new Element<V>((Long) bKey, decodeValue, elementFlag.get(index)) :
new Element<V>((ByteArrayBKey) bKey, decodeValue, elementFlag.get(index));
result.put(bKey, elem);
index++;
}
}
return result;
}

public void putCachedData(K key, CachedData data) {
cachedDataMap.put(key, data);
}
}

public static class MapGetFuture<T> extends CollectionGetFuture<Map<String, T>, T> {
private final HashMap<String, CachedData> cachedDataMap
= new HashMap<String, CachedData>();

public MapGetFuture(CountDownLatch l, long opTimeout, Transcoder<T> tc) {
super(l, opTimeout, tc);
}

@Override
public Map<String, T> get() throws InterruptedException, ExecutionException {
try {
return get(timeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
throw new OperationTimeoutException(e);
}
}

@Override
public Map<String, T> get(long duration, TimeUnit units)
throws InterruptedException, TimeoutException, ExecutionException {

if (result == null) {
result = super.get(duration, units);
for (Map.Entry<String, CachedData> entry : cachedDataMap.entrySet()) {
result.put(entry.getKey(), tc.decode(entry.getValue()));
}
}
return result;
}

public void putCachedData(String key, CachedData data) {
cachedDataMap.put(key, data);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

import net.spy.memcached.KeyUtil;
import net.spy.memcached.collection.BTreeGet;
Expand Down Expand Up @@ -298,6 +299,10 @@ public CollectionGet getGet() {
return collectionGet;
}

public List<byte[]> getElementFlag() {
return collectionGet.getElementFlag();
}

@Override
public boolean isBulkOperation() {
return false;
Expand Down

0 comments on commit 6d75954

Please sign in to comment.