From fbefe746a9eee8664540406fce97f272f8789b56 Mon Sep 17 00:00:00 2001 From: brido4125 Date: Mon, 12 Jun 2023 18:29:48 +0900 Subject: [PATCH] INTERNAL: Creating piped collection operation future. --- .../java/net/spy/memcached/ArcusClient.java | 198 ++---------------- .../internal/PipedCollectionFuture.java | 122 +++++++++++ 2 files changed, 137 insertions(+), 183 deletions(-) create mode 100644 src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java diff --git a/src/main/java/net/spy/memcached/ArcusClient.java b/src/main/java/net/spy/memcached/ArcusClient.java index 84da5aae3..1744ea4af 100644 --- a/src/main/java/net/spy/memcached/ArcusClient.java +++ b/src/main/java/net/spy/memcached/ArcusClient.java @@ -126,6 +126,7 @@ import net.spy.memcached.internal.CollectionGetBulkFuture; import net.spy.memcached.internal.OperationFuture; import net.spy.memcached.internal.SMGetFuture; +import net.spy.memcached.internal.PipedCollectionFuture; import net.spy.memcached.ops.BTreeFindPositionOperation; import net.spy.memcached.ops.BTreeFindPositionWithGetOperation; import net.spy.memcached.ops.BTreeGetBulkOperation; @@ -948,15 +949,9 @@ public void gotStatus(Integer index, OperationStatus status) { CollectionFuture> asyncCollectionPipedUpdate( final String key, final List> updateList) { - final ConcurrentLinkedQueue ops = new ConcurrentLinkedQueue(); - final CountDownLatch latch = new CountDownLatch(updateList.size()); - - final List mergedOperationStatus = Collections - .synchronizedList(new ArrayList(updateList.size())); - - final Map mergedResult = - new ConcurrentHashMap(); + final PipedCollectionFuture rv = + new PipedCollectionFuture(latch, operationTimeout, updateList.size()); for (int i = 0; i < updateList.size(); i++) { final CollectionPipedUpdate update = updateList.get(i); @@ -974,7 +969,7 @@ public void receivedStatus(OperationStatus status) { getLogger().warn("Unhandled state: " + status); cstatus = new CollectionOperationStatus(status); } - mergedOperationStatus.add(cstatus); + rv.addOperationStatus(cstatus); } // complete @@ -985,97 +980,18 @@ public void complete() { // got status public void gotStatus(Integer index, OperationStatus status) { if (status instanceof CollectionOperationStatus) { - mergedResult.put(index + (idx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT), + rv.addEachResult(index + (idx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT), (CollectionOperationStatus) status); } else { - mergedResult.put(index + (idx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT), + rv.addEachResult(index + (idx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT), new CollectionOperationStatus(status)); } } }); addOp(key, op); - ops.add(op); + rv.addOperation(op); } - - return new CollectionFuture>( - latch, operationTimeout) { - - @Override - public boolean cancel(boolean ign) { - boolean rv = false; - for (Operation op : ops) { - op.cancel("by application."); - rv |= op.getState() == OperationState.WRITE_QUEUED; - } - return rv; - } - - @Override - public boolean isCancelled() { - for (Operation op : ops) { - if (op.isCancelled()) { - return true; - } - } - return false; - } - - @Override - public Map get(long duration, - TimeUnit units) - throws InterruptedException, TimeoutException, ExecutionException { - - if (!latch.await(duration, units)) { - Collection timedoutOps = new HashSet(); - for (Operation op : ops) { - if (op.getState() != OperationState.COMPLETE) { - timedoutOps.add(op); - } else { - MemcachedConnection.opSucceeded(op); - } - } - if (timedoutOps.size() > 0) { - MemcachedConnection.opTimedOut(timedoutOps.iterator().next()); - throw new CheckedOperationTimeoutException(duration, units, timedoutOps); - } - } else { - // continuous timeout counter will be reset only once in pipe - MemcachedConnection.opSucceeded(ops.iterator().next()); - } - - for (Operation op : ops) { - if (op != null && op.hasErrored()) { - throw new ExecutionException(op.getException()); - } - - if (op != null && op.isCancelled()) { - throw new ExecutionException(new RuntimeException(op.getCancelCause())); - } - } - - return mergedResult; - } - - @Override - public CollectionOperationStatus getOperationStatus() { - for (CollectionOperationStatus status : mergedOperationStatus) { - if (!status.isSuccess()) { - return status; - } - } - return new CollectionOperationStatus(true, "END", CollectionResponse.END); - } - - @Override - public boolean isDone() { - for (Operation op : ops) { - if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) { - return false; - } - } - return true; - } - }; + return rv; } /** @@ -3903,15 +3819,9 @@ public SMGetFuture>> asyncBopSortMergeGet( CollectionFuture> asyncCollectionPipedInsert( final String key, final List> insertList) { - final ConcurrentLinkedQueue ops = new ConcurrentLinkedQueue(); - final CountDownLatch latch = new CountDownLatch(insertList.size()); - - final List mergedOperationStatus = Collections - .synchronizedList(new ArrayList(insertList.size())); - - final Map mergedResult = - new ConcurrentHashMap(); + final PipedCollectionFuture rv = + new PipedCollectionFuture(latch, operationTimeout, insertList.size()); for (int i = 0; i < insertList.size(); i++) { final CollectionPipedInsert insert = insertList.get(i); @@ -3929,7 +3839,7 @@ public void receivedStatus(OperationStatus status) { getLogger().warn("Unhandled state: " + status); cstatus = new CollectionOperationStatus(status); } - mergedOperationStatus.add(cstatus); + rv.addOperationStatus(cstatus); } // complete @@ -3940,96 +3850,18 @@ public void complete() { // got status public void gotStatus(Integer index, OperationStatus status) { if (status instanceof CollectionOperationStatus) { - mergedResult.put(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT), + rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT), (CollectionOperationStatus) status); } else { - mergedResult.put(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT), + rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT), new CollectionOperationStatus(status)); } } }); addOp(key, op); - ops.add(op); + rv.addOperation(op); } - - return new CollectionFuture>( - latch, operationTimeout) { - - @Override - public boolean cancel(boolean ign) { - boolean rv = false; - for (Operation op : ops) { - op.cancel("by application."); - rv |= op.getState() == OperationState.WRITE_QUEUED; - } - return rv; - } - - @Override - public boolean isCancelled() { - for (Operation op : ops) { - if (op.isCancelled()) { - return true; - } - } - return false; - } - - @Override - public Map get(long duration, - TimeUnit units) - throws InterruptedException, TimeoutException, ExecutionException { - - if (!latch.await(duration, units)) { - Collection timedoutOps = new HashSet(); - for (Operation op : ops) { - if (op.getState() != OperationState.COMPLETE) { - timedoutOps.add(op); - } else { - MemcachedConnection.opSucceeded(op); - } - } - if (timedoutOps.size() > 0) { - MemcachedConnection.opTimedOut(timedoutOps.iterator().next()); - throw new CheckedOperationTimeoutException(duration, units, timedoutOps); - } - } else { - // continuous timeout counter will be reset only once in pipe - MemcachedConnection.opSucceeded(ops.iterator().next()); - } - for (Operation op : ops) { - if (op != null && op.hasErrored()) { - throw new ExecutionException(op.getException()); - } - - if (op != null && op.isCancelled()) { - throw new ExecutionException(new RuntimeException(op.getCancelCause())); - } - } - - return mergedResult; - } - - @Override - public CollectionOperationStatus getOperationStatus() { - for (CollectionOperationStatus status : mergedOperationStatus) { - if (!status.isSuccess()) { - return status; - } - } - return new CollectionOperationStatus(true, "END", CollectionResponse.END); - } - - @Override - public boolean isDone() { - for (Operation op : ops) { - if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) { - return false; - } - } - return true; - } - }; + return rv; } @Override diff --git a/src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java b/src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java new file mode 100644 index 000000000..32c523595 --- /dev/null +++ b/src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java @@ -0,0 +1,122 @@ +package net.spy.memcached.internal; + +import net.spy.memcached.MemcachedConnection; +import net.spy.memcached.ops.CollectionOperationStatus; +import net.spy.memcached.ops.Operation; +import net.spy.memcached.ops.OperationState; + +import java.util.List; +import java.util.Map; +import java.util.Collections; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.ExecutionException; + +public class PipedCollectionFuture + extends CollectionFuture> { + private final ConcurrentLinkedQueue ops = new ConcurrentLinkedQueue(); + private final List mergedOperationStatus; + + private final Map mergedResult = + new ConcurrentHashMap(); + + public PipedCollectionFuture(CountDownLatch l, long opTimeout, int opSize) { + super(l, opTimeout); + mergedOperationStatus = Collections + .synchronizedList(new ArrayList(opSize)); + } + + @Override + public boolean cancel(boolean ign) { + boolean rv = false; + for (Operation op : ops) { + op.cancel("by application."); + rv |= op.getState() == OperationState.WRITE_QUEUED; + } + return rv; + } + + @Override + public boolean isCancelled() { + for (Operation op : ops) { + if (op.isCancelled()) { + return true; + } + } + return false; + } + + @Override + public boolean isDone() { + for (Operation op : ops) { + if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) { + return false; + } + } + return true; + } + + @Override + public Map get(long duration, + TimeUnit units) + throws InterruptedException, TimeoutException, ExecutionException { + + if (!latch.await(duration, units)) { + Collection timedoutOps = new HashSet(); + for (Operation op : ops) { + if (op.getState() != OperationState.COMPLETE) { + timedoutOps.add(op); + } else { + MemcachedConnection.opSucceeded(op); + } + } + if (timedoutOps.size() > 0) { + MemcachedConnection.opTimedOut(timedoutOps.iterator().next()); + throw new CheckedOperationTimeoutException(duration, units, timedoutOps); + } + } else { + // continuous timeout counter will be reset only once in pipe + MemcachedConnection.opSucceeded(ops.iterator().next()); + } + + for (Operation op : ops) { + if (op != null && op.hasErrored()) { + throw new ExecutionException(op.getException()); + } + + if (op != null && op.isCancelled()) { + throw new ExecutionException(new RuntimeException(op.getCancelCause())); + } + } + + return mergedResult; + } + + @Override + public CollectionOperationStatus getOperationStatus() { + for (CollectionOperationStatus status : mergedOperationStatus) { + if (!status.isSuccess()) { + return status; + } + } + return mergedOperationStatus.get(0); + } + + public void addOperationStatus(CollectionOperationStatus status) { + mergedOperationStatus.add(status); + } + + public void addEachResult(K index, V status) { + mergedResult.put(index, status); + } + + public void addOperation(Operation op) { + ops.add(op); + } +}