Skip to content

Commit

Permalink
ENHANCE: Modify decoding logic in collection get api.
Browse files Browse the repository at this point in the history
  • Loading branch information
brido4125 authored and jhpark816 committed Aug 9, 2023
1 parent 7960754 commit d731a06
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 53 deletions.
135 changes: 93 additions & 42 deletions src/main/java/net/spy/memcached/ArcusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@
import net.spy.memcached.internal.OperationFuture;
import net.spy.memcached.internal.SMGetFuture;
import net.spy.memcached.internal.PipedCollectionFuture;
import net.spy.memcached.internal.CollectionGetFuture;
import net.spy.memcached.ops.BTreeFindPositionOperation;
import net.spy.memcached.ops.BTreeFindPositionWithGetOperation;
import net.spy.memcached.ops.BTreeGetBulkOperation;
Expand Down Expand Up @@ -481,12 +482,12 @@ private <T> CollectionFuture<List<T>> asyncLopGet(final String k,
final CollectionGet collectionGet,
final Transcoder<T> tc) {
final CountDownLatch latch = new CountDownLatch(1);
final CollectionFuture<List<T>> rv = new CollectionFuture<List<T>>(
latch, operationTimeout);
final CollectionGetFuture<List<T>> rv = new CollectionGetFuture<List<T>>(latch, operationTimeout);

Operation op = opFact.collectionGet(k, collectionGet,
new CollectionGetOperation.Callback() {
private final List<T> list = new ArrayList<T>();
private final List<T> result = new ArrayList<T>();
private final List<CachedData> cachedDataList = new ArrayList<CachedData>();

public void receivedStatus(OperationStatus status) {
CollectionOperationStatus cstatus;
Expand All @@ -497,7 +498,7 @@ public void receivedStatus(OperationStatus status) {
cstatus = new CollectionOperationStatus(status);
}
if (cstatus.isSuccess()) {
rv.set(list, cstatus);
rv.set(result, cstatus);
return;
}
switch (cstatus.getResponse()) {
Expand All @@ -506,11 +507,11 @@ public void receivedStatus(OperationStatus status) {
getLogger().debug("Key(%s) not found : %s", k, cstatus);
break;
case NOT_FOUND_ELEMENT:
rv.set(list, cstatus);
rv.set(result, cstatus);
getLogger().debug("Element(%s) not found : %s", k, cstatus);
break;
case OUT_OF_RANGE:
rv.set(list, cstatus);
rv.set(result, cstatus);
getLogger().debug("Element(%s) not found in condition : %s", k, cstatus);
break;
case UNREADABLE:
Expand All @@ -529,7 +530,16 @@ public void complete() {
}

public void gotData(String subkey, int flags, byte[] data, byte[] eflag) {
list.add(tc.decode(new CachedData(flags, data, tc.getMaxSize())));
cachedDataList.add(new CachedData(flags, data, tc.getMaxSize()));
}

@Override
public void addResult() {
if (result.isEmpty() && !cachedDataList.isEmpty()) {
for (CachedData cachedData : cachedDataList) {
result.add(tc.decode(cachedData));
}
}
}
});
rv.setOperation(op);
Expand Down Expand Up @@ -562,12 +572,15 @@ private <T> CollectionFuture<Set<T>> asyncSopGet(final String k,
final CollectionGet collectionGet,
final Transcoder<T> tc) {
final CountDownLatch latch = new CountDownLatch(1);
final CollectionFuture<Set<T>> rv = new CollectionFuture<Set<T>>(latch,
operationTimeout);
final CollectionGetFuture<Set<T>> rv =
new CollectionGetFuture<Set<T>>(latch, operationTimeout);

Operation op = opFact.collectionGet(k, collectionGet,
new CollectionGetOperation.Callback() {
private final Set<T> set = new HashSet<T>();

private final HashSet<T> result = new HashSet<T>();
private final HashSet<CachedData> cachedDataSet = new HashSet<CachedData>();


public void receivedStatus(OperationStatus status) {
CollectionOperationStatus cstatus;
Expand All @@ -578,7 +591,7 @@ public void receivedStatus(OperationStatus status) {
cstatus = new CollectionOperationStatus(status);
}
if (cstatus.isSuccess()) {
rv.set(set, cstatus);
rv.set(result, cstatus);
return;
}

Expand All @@ -588,7 +601,7 @@ public void receivedStatus(OperationStatus status) {
getLogger().debug("Key(%s) not found : %s", k, cstatus);
break;
case NOT_FOUND_ELEMENT:
rv.set(set, cstatus);
rv.set(result, cstatus);
getLogger().debug("Element(%s) not found : %s", k, cstatus);
break;
case UNREADABLE:
Expand All @@ -607,7 +620,16 @@ public void complete() {
}

public void gotData(String subkey, int flags, byte[] data, byte[] eflag) {
set.add(tc.decode(new CachedData(flags, data, tc.getMaxSize())));
cachedDataSet.add(new CachedData(flags, data, tc.getMaxSize()));
}

@Override
public void addResult() {
if (result.isEmpty() && !cachedDataSet.isEmpty()) {
for (CachedData cachedData : cachedDataSet) {
result.add(tc.decode(cachedData));
}
}
}
});

Expand All @@ -629,13 +651,13 @@ private <T> CollectionFuture<Map<Long, Element<T>>> asyncBopGet(
final String k, final CollectionGet collectionGet,
final boolean reverse, final Transcoder<T> tc) {
final CountDownLatch latch = new CountDownLatch(1);
final CollectionFuture<Map<Long, Element<T>>> rv = new CollectionFuture<Map<Long, Element<T>>>(
latch, operationTimeout);
final CollectionGetFuture<Map<Long, Element<T>>> rv =
new CollectionGetFuture<Map<Long, Element<T>>>(latch, operationTimeout);
Operation op = opFact.collectionGet(k, collectionGet,
new CollectionGetOperation.Callback() {
private final TreeMap<Long, Element<T>> map = new TreeMap<Long, Element<T>>(
(reverse) ? Collections.reverseOrder() : null);

private final TreeMap<Long, Element<T>> result =
new TreeMap<Long, Element<T>>((reverse) ? Collections.reverseOrder() : null);
private final HashMap<Long, CachedData> cachedDataMap = new HashMap<Long, CachedData>();
public void receivedStatus(OperationStatus status) {
CollectionOperationStatus cstatus;
if (status instanceof CollectionOperationStatus) {
Expand All @@ -645,7 +667,7 @@ public void receivedStatus(OperationStatus status) {
cstatus = new CollectionOperationStatus(status);
}
if (cstatus.isSuccess()) {
rv.set(map, cstatus);
rv.set(result, cstatus);
return;
}
switch (cstatus.getResponse()) {
Expand All @@ -654,7 +676,7 @@ public void receivedStatus(OperationStatus status) {
getLogger().debug("Key(%s) not found : %s", k, cstatus);
break;
case NOT_FOUND_ELEMENT:
rv.set(map, cstatus);
rv.set(result, cstatus);
getLogger().debug("Element(%s) not found : %s", k, cstatus);
break;
case UNREADABLE:
Expand All @@ -672,10 +694,19 @@ public void complete() {
latch.countDown();
}

public void gotData(String bkey, int flags, byte[] data, byte[] eflag) {
long longBkey = Long.parseLong(bkey);
map.put(longBkey, new Element<T>(longBkey,
tc.decode(new CachedData(flags, data, tc.getMaxSize())), eflag));
public void gotData(String bKey, int flags, byte[] data, byte[] eflag) {
cachedDataMap.put(Long.parseLong(bKey), new CachedData(flags, data, eflag, tc.getMaxSize()));
}

@Override
public void addResult() {
if (result.isEmpty() && !cachedDataMap.isEmpty()) {
for (Entry<Long, CachedData> cachedDataEntry : this.cachedDataMap.entrySet()) {
Long bKey = cachedDataEntry.getKey();
CachedData cachedData = cachedDataEntry.getValue();
result.put(bKey, new Element<T>(bKey, tc.decode(cachedData), cachedData.getEFlag()));
}
}
}
});
rv.setOperation(op);
Expand All @@ -694,12 +725,13 @@ public void gotData(String bkey, int flags, byte[] data, byte[] eflag) {
private <T> CollectionFuture<Map<String, T>> asyncMopGet(
final String k, final CollectionGet collectionGet, final Transcoder<T> tc) {
final CountDownLatch latch = new CountDownLatch(1);
final CollectionFuture<Map<String, T>> rv = new CollectionFuture<Map<String, T>>(
latch, operationTimeout);
final CollectionGetFuture<Map<String, T>> rv =
new CollectionGetFuture<Map<String, T>>(latch, operationTimeout);
Operation op = opFact.collectionGet(k, collectionGet,
new CollectionGetOperation.Callback() {
private final HashMap<String, T> map = new HashMap<String, T>();

private final HashMap<String, T> result = new HashMap<String, T>();
private final HashMap<String, CachedData> cachedDataMap = new HashMap<String, CachedData>();
public void receivedStatus(OperationStatus status) {
CollectionOperationStatus cstatus;
if (status instanceof CollectionOperationStatus) {
Expand All @@ -709,7 +741,7 @@ public void receivedStatus(OperationStatus status) {
cstatus = new CollectionOperationStatus(status);
}
if (cstatus.isSuccess()) {
rv.set(map, cstatus);
rv.set(result, cstatus);
return;
}
switch (cstatus.getResponse()) {
Expand All @@ -718,7 +750,7 @@ public void receivedStatus(OperationStatus status) {
getLogger().debug("Key(%s) not found : %s", k, cstatus);
break;
case NOT_FOUND_ELEMENT:
rv.set(map, cstatus);
rv.set(result, cstatus);
getLogger().debug("Element(%s) not found : %s", k, cstatus);
break;
case UNREADABLE:
Expand All @@ -737,7 +769,18 @@ public void complete() {
}

public void gotData(String mkey, int flags, byte[] data, byte[] eflag) {
map.put(mkey, tc.decode(new CachedData(flags, data, tc.getMaxSize())));
cachedDataMap.put(mkey, new CachedData(flags, data, eflag, tc.getMaxSize()));
}

@Override
public void addResult() {
if (result.isEmpty() && !cachedDataMap.isEmpty()) {
for (Entry<String, CachedData> cachedDataEntry : this.cachedDataMap.entrySet()) {
String mKey = cachedDataEntry.getKey();
CachedData cachedData = cachedDataEntry.getValue();
result.put(mKey, tc.decode(cachedData));
}
}
}
});
rv.setOperation(op);
Expand Down Expand Up @@ -2920,15 +2963,14 @@ private <T> CollectionFuture<Map<ByteArrayBKey, Element<T>>> asyncBopExtendedGet
final boolean reverse, final Transcoder<T> tc) {

final CountDownLatch latch = new CountDownLatch(1);
final CollectionFuture<Map<ByteArrayBKey, Element<T>>> rv =
new CollectionFuture<Map<ByteArrayBKey, Element<T>>>(latch, operationTimeout);
final CollectionGetFuture<Map<ByteArrayBKey, Element<T>>> rv
= new CollectionGetFuture<Map<ByteArrayBKey, Element<T>>>(latch, operationTimeout);

Operation op = opFact.collectionGet(k, collectionGet,
new CollectionGetOperation.Callback() {
private final TreeMap<ByteArrayBKey, Element<T>> map
= new ByteArrayTreeMap<ByteArrayBKey, Element<T>>(
(reverse) ? Collections.reverseOrder() : null);

private final TreeMap<ByteArrayBKey, Element<T>> result
= new ByteArrayTreeMap<ByteArrayBKey, Element<T>>((reverse) ? Collections.reverseOrder() : null);
private final HashMap<ByteArrayBKey, CachedData> cachedDataMap = new HashMap<ByteArrayBKey, CachedData>();
public void receivedStatus(OperationStatus status) {
CollectionOperationStatus cstatus;
if (status instanceof CollectionOperationStatus) {
Expand All @@ -2938,7 +2980,7 @@ public void receivedStatus(OperationStatus status) {
cstatus = new CollectionOperationStatus(status);
}
if (cstatus.isSuccess()) {
rv.set(map, cstatus);
rv.set(result, cstatus);
return;
}
switch (cstatus.getResponse()) {
Expand All @@ -2947,7 +2989,7 @@ public void receivedStatus(OperationStatus status) {
getLogger().debug("Key(%s) not found : %s", k, cstatus);
break;
case NOT_FOUND_ELEMENT:
rv.set(map, cstatus);
rv.set(result, cstatus);
getLogger().debug("Element(%s) not found : %s", k, cstatus);
break;
case UNREADABLE:
Expand All @@ -2966,10 +3008,19 @@ public void complete() {
}

public void gotData(String bkey, int flags, byte[] data, byte[] eflag) {
byte[] byteBkey = BTreeUtil.hexStringToByteArrays(bkey);
Element<T> element = new Element<T>(byteBkey,
tc.decode(new CachedData(flags, data, tc.getMaxSize())), eflag);
map.put(new ByteArrayBKey(byteBkey), element);
cachedDataMap.put(new ByteArrayBKey(BTreeUtil.hexStringToByteArrays(bkey)),
new CachedData(flags, data, eflag, tc.getMaxSize()));
}

@Override
public void addResult() {
if (result.isEmpty() && !cachedDataMap.isEmpty()) {
for (Entry<ByteArrayBKey, CachedData> cachedDataEntry : this.cachedDataMap.entrySet()) {
ByteArrayBKey bKey = cachedDataEntry.getKey();
CachedData cachedData = cachedDataEntry.getValue();
result.put(bKey, new Element<T>(bKey.getBytes(), tc.decode(cachedData), cachedData.getEFlag()));
}
}
}
});
rv.setOperation(op);
Expand Down
39 changes: 29 additions & 10 deletions src/main/java/net/spy/memcached/CachedData.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,38 @@ public final class CachedData {

private final int flags;
private final byte[] data;
private byte[] eFlag;

/**
* Get a CachedData instance for the given flags and byte array.
*
* @param f the flags
* @param d the data
* @param max_size the maximum allowable size.
* @param flags the flags
* @param data the data
* @param max_size the maximum allowable size.
*/
public CachedData(int f, byte[] d, int max_size) {
public CachedData(int flags, byte[] data, int max_size) {
super();
if (d.length > max_size) {
if (data.length > max_size) {
throw new IllegalArgumentException(
"Cannot cache data larger than " + max_size
+ " bytes (you tried to cache a "
+ d.length + " byte object)");
+ data.length + " byte object)");
}
flags = f;
data = d;
this.flags = flags;
this.data = data;
}

/**
* Get a CachedData instance for the given flags and byte array.
*
* @param flag the flags
* @param data the data
* @param eFlag the eFlag
* @param max_size the maximum allowable size.
*/
public CachedData(int flag, byte[] data, byte[] eFlag, int max_size) {
this(flag, data, max_size);
this.eFlag = eFlag;
}

/**
Expand All @@ -50,9 +64,14 @@ public int getFlags() {
return flags;
}

public byte[] getEFlag() {
return eFlag;
}

@Override
public String toString() {
return "{CachedData flags=" + flags + " data="
+ Arrays.toString(data) + "}";
return "{CachedData flags=" + flags +
" data=" + Arrays.toString(data) +
" eFlag=" + Arrays.toString(eFlag) + " }";
}
}
27 changes: 27 additions & 0 deletions src/main/java/net/spy/memcached/internal/CollectionGetFuture.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package net.spy.memcached.internal;

import net.spy.memcached.ops.CollectionGetOperation.Callback;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CollectionGetFuture<T> extends CollectionFuture<T> {

public CollectionGetFuture(CountDownLatch l, long opTimeout) {
super(l, opTimeout);
}

@Override
public T get(long duration, TimeUnit units)
throws InterruptedException, TimeoutException, ExecutionException {

T result = super.get(duration, units);
if (result != null) {
Callback callback = (Callback) op.getCallback();
callback.addResult();
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public interface CollectionGetOperation extends KeyedOperation {

interface Callback extends OperationCallback {
void gotData(String subkey, int flags, byte[] data, byte[] eflag);
void addResult();
}

}
Loading

0 comments on commit d731a06

Please sign in to comment.