Skip to content

Commit

Permalink
ENHANCE: Modify decoding logic in collection get api.
Browse files Browse the repository at this point in the history
  • Loading branch information
brido4125 committed Aug 2, 2023
1 parent 4fbe331 commit 8fdc42b
Show file tree
Hide file tree
Showing 3 changed files with 242 additions and 49 deletions.
72 changes: 30 additions & 42 deletions src/main/java/net/spy/memcached/ArcusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@
import net.spy.memcached.internal.OperationFuture;
import net.spy.memcached.internal.SMGetFuture;
import net.spy.memcached.internal.PipedCollectionFuture;
import net.spy.memcached.internal.CollectionGetFuture;
import net.spy.memcached.ops.BTreeFindPositionOperation;
import net.spy.memcached.ops.BTreeFindPositionWithGetOperation;
import net.spy.memcached.ops.BTreeGetBulkOperation;
Expand Down Expand Up @@ -481,12 +482,11 @@ private <T> CollectionFuture<List<T>> asyncLopGet(final String k,
final CollectionGet collectionGet,
final Transcoder<T> tc) {
final CountDownLatch latch = new CountDownLatch(1);
final CollectionFuture<List<T>> rv = new CollectionFuture<List<T>>(
latch, operationTimeout);
final CollectionGetFuture.ListGetFuture<T> rv =
new CollectionGetFuture.ListGetFuture<T>(latch, operationTimeout, tc);

Operation op = opFact.collectionGet(k, collectionGet,
new CollectionGetOperation.Callback() {
private final List<T> list = new ArrayList<T>();

public void receivedStatus(OperationStatus status) {
CollectionOperationStatus cstatus;
Expand All @@ -497,7 +497,7 @@ public void receivedStatus(OperationStatus status) {
cstatus = new CollectionOperationStatus(status);
}
if (cstatus.isSuccess()) {
rv.set(list, cstatus);
rv.set(new ArrayList<T>(), cstatus);
return;
}
switch (cstatus.getResponse()) {
Expand All @@ -506,11 +506,11 @@ public void receivedStatus(OperationStatus status) {
getLogger().debug("Key(%s) not found : %s", k, cstatus);
break;
case NOT_FOUND_ELEMENT:
rv.set(list, cstatus);
rv.set(new ArrayList<T>(), cstatus);
getLogger().debug("Element(%s) not found : %s", k, cstatus);
break;
case OUT_OF_RANGE:
rv.set(list, cstatus);
rv.set(new ArrayList<T>(), cstatus);
getLogger().debug("Element(%s) not found in condition : %s", k, cstatus);
break;
case UNREADABLE:
Expand All @@ -529,7 +529,7 @@ public void complete() {
}

public void gotData(String subkey, int flags, byte[] data, byte[] eflag) {
list.add(tc.decode(new CachedData(flags, data, tc.getMaxSize())));
rv.addCachedData(new CachedData(flags, data, tc.getMaxSize()));
}
});
rv.setOperation(op);
Expand Down Expand Up @@ -562,12 +562,11 @@ private <T> CollectionFuture<Set<T>> asyncSopGet(final String k,
final CollectionGet collectionGet,
final Transcoder<T> tc) {
final CountDownLatch latch = new CountDownLatch(1);
final CollectionFuture<Set<T>> rv = new CollectionFuture<Set<T>>(latch,
operationTimeout);
final CollectionGetFuture.SetGetFuture<T> rv =
new CollectionGetFuture.SetGetFuture<T>(latch, operationTimeout, tc);

Operation op = opFact.collectionGet(k, collectionGet,
new CollectionGetOperation.Callback() {
private final Set<T> set = new HashSet<T>();

public void receivedStatus(OperationStatus status) {
CollectionOperationStatus cstatus;
Expand All @@ -578,7 +577,7 @@ public void receivedStatus(OperationStatus status) {
cstatus = new CollectionOperationStatus(status);
}
if (cstatus.isSuccess()) {
rv.set(set, cstatus);
rv.set(new HashSet<T>(), cstatus);
return;
}

Expand All @@ -588,7 +587,7 @@ public void receivedStatus(OperationStatus status) {
getLogger().debug("Key(%s) not found : %s", k, cstatus);
break;
case NOT_FOUND_ELEMENT:
rv.set(set, cstatus);
rv.set(new HashSet<T>(), cstatus);
getLogger().debug("Element(%s) not found : %s", k, cstatus);
break;
case UNREADABLE:
Expand All @@ -607,7 +606,7 @@ public void complete() {
}

public void gotData(String subkey, int flags, byte[] data, byte[] eflag) {
set.add(tc.decode(new CachedData(flags, data, tc.getMaxSize())));
rv.addCachedData(new CachedData(flags, data, tc.getMaxSize()));
}
});

Expand All @@ -629,13 +628,10 @@ private <T> CollectionFuture<Map<Long, Element<T>>> asyncBopGet(
final String k, final CollectionGet collectionGet,
final boolean reverse, final Transcoder<T> tc) {
final CountDownLatch latch = new CountDownLatch(1);
final CollectionFuture<Map<Long, Element<T>>> rv = new CollectionFuture<Map<Long, Element<T>>>(
latch, operationTimeout);
final CollectionGetFuture.BTreeGetFuture<Long, T> rv =
new CollectionGetFuture.BTreeGetFuture<Long, T>(latch, operationTimeout, tc);
Operation op = opFact.collectionGet(k, collectionGet,
new CollectionGetOperation.Callback() {
private final TreeMap<Long, Element<T>> map = new TreeMap<Long, Element<T>>(
(reverse) ? Collections.reverseOrder() : null);

public void receivedStatus(OperationStatus status) {
CollectionOperationStatus cstatus;
if (status instanceof CollectionOperationStatus) {
Expand All @@ -645,7 +641,7 @@ public void receivedStatus(OperationStatus status) {
cstatus = new CollectionOperationStatus(status);
}
if (cstatus.isSuccess()) {
rv.set(map, cstatus);
rv.set(new TreeMap<Long, Element<T>>((reverse) ? Collections.reverseOrder() : null), cstatus);
return;
}
switch (cstatus.getResponse()) {
Expand All @@ -654,7 +650,7 @@ public void receivedStatus(OperationStatus status) {
getLogger().debug("Key(%s) not found : %s", k, cstatus);
break;
case NOT_FOUND_ELEMENT:
rv.set(map, cstatus);
rv.set(new TreeMap<Long, Element<T>>((reverse) ? Collections.reverseOrder() : null), cstatus);
getLogger().debug("Element(%s) not found : %s", k, cstatus);
break;
case UNREADABLE:
Expand All @@ -673,9 +669,7 @@ public void complete() {
}

public void gotData(String bkey, int flags, byte[] data, byte[] eflag) {
long longBkey = Long.parseLong(bkey);
map.put(longBkey, new Element<T>(longBkey,
tc.decode(new CachedData(flags, data, tc.getMaxSize())), eflag));
rv.putCachedData(Long.parseLong(bkey), new CachedData(flags, data, eflag, tc.getMaxSize()));
}
});
rv.setOperation(op);
Expand All @@ -694,12 +688,10 @@ public void gotData(String bkey, int flags, byte[] data, byte[] eflag) {
private <T> CollectionFuture<Map<String, T>> asyncMopGet(
final String k, final CollectionGet collectionGet, final Transcoder<T> tc) {
final CountDownLatch latch = new CountDownLatch(1);
final CollectionFuture<Map<String, T>> rv = new CollectionFuture<Map<String, T>>(
latch, operationTimeout);
final CollectionGetFuture.MapGetFuture<T> rv =
new CollectionGetFuture.MapGetFuture<T>(latch, operationTimeout, tc);
Operation op = opFact.collectionGet(k, collectionGet,
new CollectionGetOperation.Callback() {
private final HashMap<String, T> map = new HashMap<String, T>();

public void receivedStatus(OperationStatus status) {
CollectionOperationStatus cstatus;
if (status instanceof CollectionOperationStatus) {
Expand All @@ -709,7 +701,7 @@ public void receivedStatus(OperationStatus status) {
cstatus = new CollectionOperationStatus(status);
}
if (cstatus.isSuccess()) {
rv.set(map, cstatus);
rv.set(new HashMap<String, T>(), cstatus);
return;
}
switch (cstatus.getResponse()) {
Expand All @@ -718,7 +710,7 @@ public void receivedStatus(OperationStatus status) {
getLogger().debug("Key(%s) not found : %s", k, cstatus);
break;
case NOT_FOUND_ELEMENT:
rv.set(map, cstatus);
rv.set(new HashMap<String, T>(), cstatus);
getLogger().debug("Element(%s) not found : %s", k, cstatus);
break;
case UNREADABLE:
Expand All @@ -737,7 +729,7 @@ public void complete() {
}

public void gotData(String mkey, int flags, byte[] data, byte[] eflag) {
map.put(mkey, tc.decode(new CachedData(flags, data, tc.getMaxSize())));
rv.putCachedData(mkey, new CachedData(flags, data, tc.getMaxSize()));
}
});
rv.setOperation(op);
Expand Down Expand Up @@ -2930,15 +2922,11 @@ private <T> CollectionFuture<Map<ByteArrayBKey, Element<T>>> asyncBopExtendedGet
final boolean reverse, final Transcoder<T> tc) {

final CountDownLatch latch = new CountDownLatch(1);
final CollectionFuture<Map<ByteArrayBKey, Element<T>>> rv =
new CollectionFuture<Map<ByteArrayBKey, Element<T>>>(latch, operationTimeout);
final CollectionGetFuture.BTreeGetFuture<ByteArrayBKey, T> rv
= new CollectionGetFuture.BTreeGetFuture<ByteArrayBKey, T>(latch, operationTimeout, tc);

Operation op = opFact.collectionGet(k, collectionGet,
new CollectionGetOperation.Callback() {
private final TreeMap<ByteArrayBKey, Element<T>> map
= new ByteArrayTreeMap<ByteArrayBKey, Element<T>>(
(reverse) ? Collections.reverseOrder() : null);

public void receivedStatus(OperationStatus status) {
CollectionOperationStatus cstatus;
if (status instanceof CollectionOperationStatus) {
Expand All @@ -2948,7 +2936,8 @@ public void receivedStatus(OperationStatus status) {
cstatus = new CollectionOperationStatus(status);
}
if (cstatus.isSuccess()) {
rv.set(map, cstatus);
rv.set(new ByteArrayTreeMap<ByteArrayBKey, Element<T>>(
(reverse) ? Collections.reverseOrder() : null), cstatus);
return;
}
switch (cstatus.getResponse()) {
Expand All @@ -2957,7 +2946,8 @@ public void receivedStatus(OperationStatus status) {
getLogger().debug("Key(%s) not found : %s", k, cstatus);
break;
case NOT_FOUND_ELEMENT:
rv.set(map, cstatus);
rv.set(new ByteArrayTreeMap<ByteArrayBKey, Element<T>>(
(reverse) ? Collections.reverseOrder() : null), cstatus);
getLogger().debug("Element(%s) not found : %s", k, cstatus);
break;
case UNREADABLE:
Expand All @@ -2976,10 +2966,8 @@ public void complete() {
}

public void gotData(String bkey, int flags, byte[] data, byte[] eflag) {
byte[] byteBkey = BTreeUtil.hexStringToByteArrays(bkey);
Element<T> element = new Element<T>(byteBkey,
tc.decode(new CachedData(flags, data, tc.getMaxSize())), eflag);
map.put(new ByteArrayBKey(byteBkey), element);
rv.putCachedData(new ByteArrayBKey(BTreeUtil.hexStringToByteArrays(bkey)),
new CachedData(flags, data, eflag, tc.getMaxSize()));
}
});
rv.setOperation(op);
Expand Down
32 changes: 25 additions & 7 deletions src/main/java/net/spy/memcached/CachedData.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,38 @@ public final class CachedData {

private final int flags;
private final byte[] data;
private byte[] eFlag;

/**
* Get a CachedData instance for the given flags and byte array.
*
* @param f the flags
* @param d the data
* @param flag the flags
* @param data the data
* @param max_size the maximum allowable size.
*/
public CachedData(int f, byte[] d, int max_size) {
public CachedData(int flag, byte[] data, int max_size) {
super();
if (d.length > max_size) {
if (data.length > max_size) {
throw new IllegalArgumentException(
"Cannot cache data larger than " + max_size
+ " bytes (you tried to cache a "
+ d.length + " byte object)");
+ data.length + " byte object)");
}
flags = f;
data = d;
flags = flag;
this.data = data;
}

/**
* Get a CachedData instance for the given flags and byte array.
*
* @param flag the flags
* @param data the data
* @param eFlag the eflag
* @param max_size the maximum allowable size.
*/
public CachedData(int flag, byte[] data, byte[] eFlag, int max_size) {
this(flag, data, max_size);
this.eFlag = eFlag;
}

/**
Expand All @@ -50,6 +64,10 @@ public int getFlags() {
return flags;
}

public byte[] getEFlag() {
return eFlag;
}

@Override
public String toString() {
return "{CachedData flags=" + flags + " data="
Expand Down
Loading

0 comments on commit 8fdc42b

Please sign in to comment.