diff --git a/src/main/java/net/spy/memcached/ArcusClient.java b/src/main/java/net/spy/memcached/ArcusClient.java index 2911f7ba6..878a89e75 100644 --- a/src/main/java/net/spy/memcached/ArcusClient.java +++ b/src/main/java/net/spy/memcached/ArcusClient.java @@ -3531,97 +3531,104 @@ public CollectionFuture asyncBopGetItemCount(String key, @Override public CollectionFuture> asyncSopPipedExistBulk(String key, List values) { - SetPipedExist exist = new SetPipedExist(key, values, collectionTranscoder); - return asyncSetPipedExist(key, exist); + if (values.size() == 0) { + throw new IllegalArgumentException( + "The number of piped operations must be larger than 0."); + } + + List> existList = new ArrayList>(); + if (values.size() <= SetPipedExist.MAX_PIPED_ITEM_COUNT) { + existList.add(new SetPipedExist(key, values, collectionTranscoder)); + } else { + PartitionedList partitionedList = new PartitionedList(values, SetPipedExist.MAX_PIPED_ITEM_COUNT); + for (List partition : partitionedList) { + existList.add(new SetPipedExist(key, partition, collectionTranscoder)); + } + } + return asyncSetPipedExist(key, existList); } @Override public CollectionFuture> asyncSopPipedExistBulk(String key, List values, Transcoder tc) { - SetPipedExist exist = new SetPipedExist(key, values, tc); - return asyncSetPipedExist(key, exist); + if (values.size() == 0) { + throw new IllegalArgumentException( + "The number of piped operations must be larger than 0."); + } + + List> existList = new ArrayList>(); + if (values.size() <= SetPipedExist.MAX_PIPED_ITEM_COUNT) { + existList.add(new SetPipedExist(key, values, tc)); + } else { + PartitionedList partitionedList = new PartitionedList(values, SetPipedExist.MAX_PIPED_ITEM_COUNT); + for (List partition : partitionedList) { + existList.add(new SetPipedExist(key, partition, tc)); + } + } + return asyncSetPipedExist(key, existList); } /** * Generic pipelined existence operation for set items. Public methods call this method. * * @param key collection item's key - * @param exist operation parameters (element values) + * @param existList list of operation parameters (element values) * @return future holding the map of elements and their existence results */ CollectionFuture> asyncSetPipedExist( - final String key, final SetPipedExist exist) { - - if (exist.getItemCount() == 0) { - throw new IllegalArgumentException( - "The number of piped operations must be larger than 0."); - } - if (exist.getItemCount() > CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) { - throw new IllegalArgumentException( - "The number of piped operations must not exceed a maximum of " - + CollectionPipedInsert.MAX_PIPED_ITEM_COUNT + "."); - } - - final CountDownLatch latch = new CountDownLatch(1); - final CollectionFuture> rv = new CollectionFuture>( - latch, operationTimeout); - - Operation op = opFact.collectionPipedExist(key, exist, - new CollectionPipedExistOperation.Callback() { - - private final Map result = new HashMap(); - private boolean hasAnError = false; - - public void receivedStatus(OperationStatus status) { - if (hasAnError) { - return; - } - - CollectionOperationStatus cstatus; - if (status instanceof CollectionOperationStatus) { - cstatus = (CollectionOperationStatus) status; - } else { - getLogger().warn("Unhandled state: " + status); - cstatus = new CollectionOperationStatus(status); - } - rv.set(result, cstatus); - } + final String key, final List> existList) { + final CountDownLatch latch = new CountDownLatch(existList.size()); - public void complete() { - latch.countDown(); - } + final PipedCollectionFuture rv + = new PipedCollectionFuture(latch, operationTimeout); - public void gotStatus(Integer index, OperationStatus status) { - CollectionOperationStatus cstatus; - if (status instanceof CollectionOperationStatus) { - cstatus = (CollectionOperationStatus) status; - } else { - cstatus = new CollectionOperationStatus(status); - } + for (final SetPipedExist exist : existList) { + Operation op = opFact.collectionPipedExist(key, exist, + new CollectionPipedExistOperation.Callback() { + public void gotStatus(Integer index, OperationStatus status) { + CollectionOperationStatus cstatus; + if (status instanceof CollectionOperationStatus) { + cstatus = (CollectionOperationStatus) status; + } else { + getLogger().warn("Unhandled state: " + status); + cstatus = new CollectionOperationStatus(status); + } + switch (cstatus.getResponse()) { + case EXIST: + case NOT_EXIST: + rv.addEachResult(exist.getValues().get(index), + (CollectionResponse.EXIST.equals(cstatus + .getResponse()))); + break; + case UNREADABLE: + case TYPE_MISMATCH: + case NOT_FOUND: + rv.addOperationStatus(cstatus); + break; + default: + getLogger().warn("Unhandled state: " + status); + } + } - switch (cstatus.getResponse()) { - case EXIST: - case NOT_EXIST: - result.put(exist.getValues().get(index), - (CollectionResponse.EXIST.equals(cstatus - .getResponse()))); - break; - case UNREADABLE: - case TYPE_MISMATCH: - case NOT_FOUND: - hasAnError = true; - rv.set(new HashMap(0), - (CollectionOperationStatus) status); - break; - default: - getLogger().warn("Unhandled state: " + status); - } - } - }); + public void receivedStatus(OperationStatus status) { + CollectionOperationStatus cstatus; + if (status instanceof CollectionOperationStatus) { + cstatus = (CollectionOperationStatus) status; + } else { + getLogger().warn("Unhandled state: " + status); + cstatus = new CollectionOperationStatus(status); + } + rv.addOperationStatus(cstatus); + } - rv.setOperation(op); - addOp(key, op); + public void complete() { + latch.countDown(); + } + }); + rv.addOperation(op); + addOp(key, op); + } return rv; }