Skip to content

Commit

Permalink
ENHANCE: Change asyncSetPipedExist method logic.
Browse files Browse the repository at this point in the history
  • Loading branch information
brido4125 committed Jul 24, 2023
1 parent 4bc1139 commit 9a0d84c
Showing 1 changed file with 79 additions and 72 deletions.
151 changes: 79 additions & 72 deletions src/main/java/net/spy/memcached/ArcusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -3531,97 +3531,104 @@ public CollectionFuture<Integer> asyncBopGetItemCount(String key,
@Override
public CollectionFuture<Map<Object, Boolean>> asyncSopPipedExistBulk(String key,
List<Object> values) {
SetPipedExist<Object> exist = new SetPipedExist<Object>(key, values, collectionTranscoder);
return asyncSetPipedExist(key, exist);
if (values.size() == 0) {
throw new IllegalArgumentException(
"The number of piped operations must be larger than 0.");
}

List<SetPipedExist<Object>> existList = new ArrayList<SetPipedExist<Object>>();
if (values.size() <= SetPipedExist.MAX_PIPED_ITEM_COUNT) {
existList.add(new SetPipedExist<Object>(key, values, collectionTranscoder));
} else {
PartitionedList<Object> partitionedList = new PartitionedList<Object>(values, SetPipedExist.MAX_PIPED_ITEM_COUNT);
for (List<Object> partition : partitionedList) {
existList.add(new SetPipedExist<Object>(key, partition, collectionTranscoder));
}
}
return asyncSetPipedExist(key, existList);
}

@Override
public <T> CollectionFuture<Map<T, Boolean>> asyncSopPipedExistBulk(String key,
List<T> values,
Transcoder<T> tc) {
SetPipedExist<T> exist = new SetPipedExist<T>(key, values, tc);
return asyncSetPipedExist(key, exist);
if (values.size() == 0) {
throw new IllegalArgumentException(
"The number of piped operations must be larger than 0.");
}

List<SetPipedExist<T>> existList = new ArrayList<SetPipedExist<T>>();
if (values.size() <= SetPipedExist.MAX_PIPED_ITEM_COUNT) {
existList.add(new SetPipedExist<T>(key, values, tc));
} else {
PartitionedList<T> partitionedList = new PartitionedList<T>(values, SetPipedExist.MAX_PIPED_ITEM_COUNT);
for (List<T> partition : partitionedList) {
existList.add(new SetPipedExist<T>(key, partition, tc));
}
}
return asyncSetPipedExist(key, existList);
}

/**
* Generic pipelined existence operation for set items. Public methods call this method.
*
* @param key collection item's key
* @param exist operation parameters (element values)
* @param existList list of operation parameters (element values)
* @return future holding the map of elements and their existence results
*/
<T> CollectionFuture<Map<T, Boolean>> asyncSetPipedExist(
final String key, final SetPipedExist<T> exist) {

if (exist.getItemCount() == 0) {
throw new IllegalArgumentException(
"The number of piped operations must be larger than 0.");
}
if (exist.getItemCount() > CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) {
throw new IllegalArgumentException(
"The number of piped operations must not exceed a maximum of "
+ CollectionPipedInsert.MAX_PIPED_ITEM_COUNT + ".");
}

final CountDownLatch latch = new CountDownLatch(1);
final CollectionFuture<Map<T, Boolean>> rv = new CollectionFuture<Map<T, Boolean>>(
latch, operationTimeout);

Operation op = opFact.collectionPipedExist(key, exist,
new CollectionPipedExistOperation.Callback() {

private final Map<T, Boolean> result = new HashMap<T, Boolean>();
private boolean hasAnError = false;

public void receivedStatus(OperationStatus status) {
if (hasAnError) {
return;
}

CollectionOperationStatus cstatus;
if (status instanceof CollectionOperationStatus) {
cstatus = (CollectionOperationStatus) status;
} else {
getLogger().warn("Unhandled state: " + status);
cstatus = new CollectionOperationStatus(status);
}
rv.set(result, cstatus);
}
final String key, final List<SetPipedExist<T>> existList) {
final CountDownLatch latch = new CountDownLatch(existList.size());

public void complete() {
latch.countDown();
}
final PipedCollectionFuture<T, Boolean> rv
= new PipedCollectionFuture<T, Boolean>(latch, operationTimeout);

public void gotStatus(Integer index, OperationStatus status) {
CollectionOperationStatus cstatus;
if (status instanceof CollectionOperationStatus) {
cstatus = (CollectionOperationStatus) status;
} else {
cstatus = new CollectionOperationStatus(status);
}
for (final SetPipedExist<T> exist : existList) {
Operation op = opFact.collectionPipedExist(key, exist,
new CollectionPipedExistOperation.Callback() {
public void gotStatus(Integer index, OperationStatus status) {
CollectionOperationStatus cstatus;
if (status instanceof CollectionOperationStatus) {
cstatus = (CollectionOperationStatus) status;
} else {
getLogger().warn("Unhandled state: " + status);
cstatus = new CollectionOperationStatus(status);
}
switch (cstatus.getResponse()) {
case EXIST:
case NOT_EXIST:
rv.addEachResult(exist.getValues().get(index),
(CollectionResponse.EXIST.equals(cstatus
.getResponse())));
break;
case UNREADABLE:
case TYPE_MISMATCH:
case NOT_FOUND:
rv.addOperationStatus(cstatus);
break;
default:
getLogger().warn("Unhandled state: " + status);
}
}

switch (cstatus.getResponse()) {
case EXIST:
case NOT_EXIST:
result.put(exist.getValues().get(index),
(CollectionResponse.EXIST.equals(cstatus
.getResponse())));
break;
case UNREADABLE:
case TYPE_MISMATCH:
case NOT_FOUND:
hasAnError = true;
rv.set(new HashMap<T, Boolean>(0),
(CollectionOperationStatus) status);
break;
default:
getLogger().warn("Unhandled state: " + status);
}
}
});
public void receivedStatus(OperationStatus status) {
CollectionOperationStatus cstatus;
if (status instanceof CollectionOperationStatus) {
cstatus = (CollectionOperationStatus) status;
} else {
getLogger().warn("Unhandled state: " + status);
cstatus = new CollectionOperationStatus(status);
}
rv.addOperationStatus(cstatus);
}

rv.setOperation(op);
addOp(key, op);
public void complete() {
latch.countDown();
}
});
rv.addOperation(op);
addOp(key, op);
}
return rv;
}

Expand Down

0 comments on commit 9a0d84c

Please sign in to comment.