diff --git a/src/main/java/net/spy/memcached/ArcusClient.java b/src/main/java/net/spy/memcached/ArcusClient.java index 92ccbc2b5..0508ba5f6 100644 --- a/src/main/java/net/spy/memcached/ArcusClient.java +++ b/src/main/java/net/spy/memcached/ArcusClient.java @@ -43,7 +43,6 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; import java.util.jar.JarFile; import java.util.jar.Manifest; @@ -127,6 +126,7 @@ import net.spy.memcached.internal.OperationFuture; import net.spy.memcached.internal.SMGetFuture; import net.spy.memcached.internal.PipedCollectionFuture; +import net.spy.memcached.internal.BroadcastFuture; import net.spy.memcached.ops.BTreeFindPositionOperation; import net.spy.memcached.ops.BTreeFindPositionWithGetOperation; import net.spy.memcached.ops.BTreeGetBulkOperation; @@ -1914,97 +1914,27 @@ public OperationFuture flush(final String prefix) { @Override public OperationFuture flush(final String prefix, final int delay) { - final AtomicReference flushResult = new AtomicReference(null); - final ConcurrentLinkedQueue ops = new ConcurrentLinkedQueue(); - - final CountDownLatch blatch = broadcastOp(new BroadcastOpFactory() { - public Operation newOp(final MemcachedNode n, - final CountDownLatch latch) { + final BroadcastFuture rv + = new BroadcastFuture(getAllNodes().size(), operationTimeout, Boolean.FALSE); + broadcastOp(new BroadcastOpFactory() { + public Operation newOp(final MemcachedNode n) { Operation op = opFact.flush(prefix, delay, false, new OperationCallback() { public void receivedStatus(OperationStatus s) { - flushResult.set(s.isSuccess()); + if (s.isSuccess()) { + rv.setResult(Boolean.TRUE); + } } public void complete() { - latch.countDown(); + rv.complete(); } }); - ops.add(op); + rv.addOp(op); return op; } }); - - return new OperationFuture(blatch, flushResult, - operationTimeout) { - @Override - public boolean cancel(boolean ign) { - boolean rv = false; - for (Operation op : ops) { - if (op.getState() != OperationState.COMPLETE) { - rv = true; - op.cancel("by application."); - } - } - return rv; - } - - @Override - public boolean isCancelled() { - for (Operation op : ops) { - if (op.isCancelled()) { - return true; - } - } - return false; - } - - @Override - public Boolean get(long duration, TimeUnit units) - throws InterruptedException, TimeoutException, ExecutionException { - - if (!blatch.await(duration, units)) { - // whenever timeout occurs, continuous timeout counter will increase by 1. - Collection timedoutOps = new HashSet(); - for (Operation op : ops) { - if (op.getState() != OperationState.COMPLETE) { - MemcachedConnection.opTimedOut(op); - timedoutOps.add(op); - } else { - MemcachedConnection.opSucceeded(op); - } - } - if (timedoutOps.size() > 0) { - throw new CheckedOperationTimeoutException(duration, units, timedoutOps); - } - } else { - // continuous timeout counter will be reset - MemcachedConnection.opsSucceeded(ops); - } - - for (Operation op : ops) { - if (op != null && op.hasErrored()) { - throw new ExecutionException(op.getException()); - } - - if (op != null && op.isCancelled()) { - throw new ExecutionException(new RuntimeException(op.getCancelCause())); - } - } - - return flushResult.get(); - } - - @Override - public boolean isDone() { - for (Operation op : ops) { - if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) { - return false; - } - } - return true; - } - }; + return rv; } @Override diff --git a/src/main/java/net/spy/memcached/BroadcastOpFactory.java b/src/main/java/net/spy/memcached/BroadcastOpFactory.java index 32667ba05..bd09aa3a2 100644 --- a/src/main/java/net/spy/memcached/BroadcastOpFactory.java +++ b/src/main/java/net/spy/memcached/BroadcastOpFactory.java @@ -1,7 +1,5 @@ package net.spy.memcached; -import java.util.concurrent.CountDownLatch; - import net.spy.memcached.ops.Operation; /** @@ -13,5 +11,5 @@ public interface BroadcastOpFactory { * Construct a new operation for delivery to the given node. * Each operation should count the given latch down upon completion. */ - Operation newOp(MemcachedNode n, CountDownLatch latch); + Operation newOp(MemcachedNode n); } diff --git a/src/main/java/net/spy/memcached/MemcachedClient.java b/src/main/java/net/spy/memcached/MemcachedClient.java index 350b4f706..92867704d 100644 --- a/src/main/java/net/spy/memcached/MemcachedClient.java +++ b/src/main/java/net/spy/memcached/MemcachedClient.java @@ -29,13 +29,11 @@ import java.util.Collection; import java.util.ConcurrentModificationException; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -43,16 +41,15 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; import net.spy.memcached.auth.AuthDescriptor; import net.spy.memcached.auth.AuthThreadMonitor; import net.spy.memcached.compat.SpyThread; import net.spy.memcached.internal.BulkFuture; import net.spy.memcached.internal.BulkGetFuture; -import net.spy.memcached.internal.CheckedOperationTimeoutException; import net.spy.memcached.internal.GetFuture; import net.spy.memcached.internal.OperationFuture; +import net.spy.memcached.internal.BroadcastFuture; import net.spy.memcached.internal.SingleElementInfiniteIterator; import net.spy.memcached.ops.CASOperationStatus; import net.spy.memcached.ops.CancelledOperationStatus; @@ -63,7 +60,6 @@ import net.spy.memcached.ops.Mutator; import net.spy.memcached.ops.Operation; import net.spy.memcached.ops.OperationCallback; -import net.spy.memcached.ops.OperationState; import net.spy.memcached.ops.OperationStatus; import net.spy.memcached.ops.StatsOperation; import net.spy.memcached.ops.StoreType; @@ -370,22 +366,17 @@ protected void addOpMap(final Map opMap) { } } - protected CountDownLatch broadcastOp(final BroadcastOpFactory of) { - return broadcastOp(of, conn.getLocator().getAll(), true); + protected void broadcastOp(final BroadcastOpFactory of) { + broadcastOp(of, conn.getLocator().getAll(), true); } - CountDownLatch broadcastOp(final BroadcastOpFactory of, - Collection nodes) { - return broadcastOp(of, nodes, true); - } - - private CountDownLatch broadcastOp(BroadcastOpFactory of, + private void broadcastOp(BroadcastOpFactory of, Collection nodes, boolean checkShuttingDown) { if (checkShuttingDown && shuttingDown) { throw new IllegalStateException("Shutting down"); } - return conn.broadcastOperation(of, nodes); + conn.broadcastOperation(of, nodes); } private OperationFuture asyncStore(StoreType storeType, String key, @@ -1548,58 +1539,36 @@ public Map> getsBulk(String... keys) { * is too full to accept any more requests */ public Map getVersions() { - final Map rv = + final Map result = new ConcurrentHashMap(); - Collection nodes = getAllNodes(); - final List ops = new ArrayList(nodes.size()); + final BroadcastFuture> internalFuture + = new BroadcastFuture>( + getAllNodes().size(), operationTimeout, result); - CountDownLatch blatch = broadcastOp(new BroadcastOpFactory() { - public Operation newOp(final MemcachedNode n, - final CountDownLatch latch) { + broadcastOp(new BroadcastOpFactory() { + public Operation newOp(final MemcachedNode n) { final SocketAddress sa = n.getSocketAddress(); Operation op = opFact.version( new OperationCallback() { public void receivedStatus(OperationStatus s) { - rv.put(sa, s.getMessage()); + result.put(sa, s.getMessage()); } public void complete() { - latch.countDown(); + internalFuture.complete(); } }); - ops.add(op); + internalFuture.addOp(op); return op; } - }, nodes); + }); + Map rv = null; try { - if (!blatch.await(operationTimeout, TimeUnit.MILLISECONDS)) { - Collection timedoutOps = new HashSet(); - for (Operation op : ops) { - if (op.getState() != OperationState.COMPLETE) { - MemcachedConnection.opTimedOut(op); - timedoutOps.add(op); - } else { - MemcachedConnection.opSucceeded(op); - } - } - if (timedoutOps.size() > 0) { - throw new OperationTimeoutException(operationTimeout, TimeUnit.MILLISECONDS, timedoutOps); - } - } else { - MemcachedConnection.opsSucceeded(ops); - } - - for (Operation op : ops) { - if (op != null && op.hasErrored()) { - throw new RuntimeException(op.getException()); - } - if (op != null && op.isCancelled()) { - throw new RuntimeException(op.getCancelCause()); - } - } - + rv = internalFuture.get(); } catch (InterruptedException e) { throw new RuntimeException("Interrupted waiting for versions", e); + } catch (ExecutionException e) { + throw new RuntimeException(e); } return rv; } @@ -1629,20 +1598,19 @@ public Map> getStats() { * is too full to accept any more requests */ public Map> getStats(final String arg) { - final Map> rv + final Map> resultMap = new HashMap>(); - Collection nodes = getAllNodes(); - final List ops = new ArrayList(nodes.size()); - - CountDownLatch blatch = broadcastOp(new BroadcastOpFactory() { - public Operation newOp(final MemcachedNode n, - final CountDownLatch latch) { + final BroadcastFuture>> internalFuture + = new BroadcastFuture>>( + getAllNodes().size(), operationTimeout, resultMap); + broadcastOp(new BroadcastOpFactory() { + public Operation newOp(final MemcachedNode n) { final SocketAddress sa = n.getSocketAddress(); - rv.put(sa, new HashMap()); + resultMap.put(sa, new HashMap()); Operation op = opFact.stats(arg, new StatsOperation.Callback() { public void gotStat(String name, String val) { - rv.get(sa).put(name, val); + resultMap.get(sa).put(name, val); } public void receivedStatus(OperationStatus status) { @@ -1653,42 +1621,20 @@ public void receivedStatus(OperationStatus status) { } public void complete() { - latch.countDown(); + internalFuture.complete(); } }); - ops.add(op); + internalFuture.addOp(op); return op; } - }, nodes); + }); + Map> rv = null; try { - if (!blatch.await(operationTimeout, TimeUnit.MILLISECONDS)) { - Collection timedoutOps = new HashSet(); - for (Operation op : ops) { - if (op.getState() != OperationState.COMPLETE) { - MemcachedConnection.opTimedOut(op); - timedoutOps.add(op); - } else { - MemcachedConnection.opSucceeded(op); - } - } - if (timedoutOps.size() > 0) { - throw new OperationTimeoutException(operationTimeout, TimeUnit.MILLISECONDS, timedoutOps); - } - } else { - MemcachedConnection.opsSucceeded(ops); - } - - for (Operation op : ops) { - if (op != null && op.hasErrored()) { - throw new RuntimeException(op.getException()); - } - if (op != null && op.isCancelled()) { - throw new RuntimeException(op.getCancelCause()); - } - } - + rv = internalFuture.get(); } catch (InterruptedException e) { throw new RuntimeException("Interrupted waiting for stats", e); + } catch (ExecutionException e) { + throw new RuntimeException(e); } return rv; } @@ -2001,95 +1947,26 @@ public void complete() { * is too full to accept any more requests */ public Future flush(final int delay) { - final AtomicReference flushResult = - new AtomicReference(null); - final ConcurrentLinkedQueue ops = - new ConcurrentLinkedQueue(); - final CountDownLatch blatch = broadcastOp(new BroadcastOpFactory() { - public Operation newOp(final MemcachedNode n, - final CountDownLatch latch) { + final BroadcastFuture rv + = new BroadcastFuture(getAllNodes().size(), operationTimeout, Boolean.FALSE); + broadcastOp(new BroadcastOpFactory() { + public Operation newOp(final MemcachedNode n) { Operation op = opFact.flush(delay, new OperationCallback() { public void receivedStatus(OperationStatus s) { - flushResult.set(s.isSuccess()); + if (s.isSuccess()) { + rv.setResult(Boolean.TRUE); + } } public void complete() { - latch.countDown(); + rv.complete(); } }); - ops.add(op); + rv.addOp(op); return op; } }); - return new OperationFuture(blatch, flushResult, - operationTimeout) { - @Override - public boolean cancel(boolean ign) { - boolean rv = false; - for (Operation op : ops) { - if (op.getState() != OperationState.COMPLETE) { - rv = true; - op.cancel("by application."); - } - } - return rv; - } - - @Override - public boolean isCancelled() { - for (Operation op : ops) { - if (op.isCancelled()) { - return true; - } - } - return false; - } - - @Override - public Boolean get(long duration, TimeUnit units) - throws InterruptedException, TimeoutException, ExecutionException { - if (!blatch.await(duration, units)) { - // whenever timeout occurs, continuous timeout counter will increase by 1. - Collection timedoutOps = new HashSet(); - for (Operation op : ops) { - if (op.getState() != OperationState.COMPLETE) { - MemcachedConnection.opTimedOut(op); - timedoutOps.add(op); - } else { - MemcachedConnection.opSucceeded(op); - } - } - if (timedoutOps.size() > 0) { - throw new CheckedOperationTimeoutException(duration, units, timedoutOps); - } - } else { - // continuous timeout counter will be reset - MemcachedConnection.opsSucceeded(ops); - } - - for (Operation op : ops) { - if (op != null && op.hasErrored()) { - throw new ExecutionException(op.getException()); - } - - if (op != null && op.isCancelled()) { - throw new ExecutionException(new RuntimeException(op.getCancelCause())); - } - } - - return flushResult.get(); - } - - @Override - public boolean isDone() { - for (Operation op : ops) { - if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) { - return false; - } - } - return true; - } - }; + return rv; } /** @@ -2104,62 +1981,39 @@ public Future flush() { } public Set listSaslMechanisms() { - final ConcurrentMap rv + final ConcurrentMap resultMap = new ConcurrentHashMap(); - Collection nodes = getAllNodes(); - final List ops = new ArrayList(nodes.size()); - - CountDownLatch blatch = broadcastOp(new BroadcastOpFactory() { - public Operation newOp(MemcachedNode n, - final CountDownLatch latch) { + final BroadcastFuture> internalFuture + = new BroadcastFuture>( + getAllNodes().size(), operationTimeout, resultMap); + broadcastOp(new BroadcastOpFactory() { + public Operation newOp(MemcachedNode n) { Operation op = opFact.saslMechs(new OperationCallback() { public void receivedStatus(OperationStatus status) { for (String s : status.getMessage().split(" ")) { - rv.put(s, s); + resultMap.put(s, s); } } public void complete() { - latch.countDown(); + internalFuture.complete(); } }); - ops.add(op); + internalFuture.addOp(op); return op; } - }, nodes); + }); + Set rv = null; try { - if (!blatch.await(operationTimeout, TimeUnit.MILLISECONDS)) { - Collection timedoutOps = new HashSet(); - for (Operation op : ops) { - if (op.getState() != OperationState.COMPLETE) { - MemcachedConnection.opTimedOut(op); - timedoutOps.add(op); - } else { - MemcachedConnection.opSucceeded(op); - } - } - if (timedoutOps.size() > 0) { - throw new OperationTimeoutException(operationTimeout, TimeUnit.MILLISECONDS, timedoutOps); - } - } else { - MemcachedConnection.opsSucceeded(ops); - } - - for (Operation op : ops) { - if (op != null && op.hasErrored()) { - throw new RuntimeException(op.getException()); - } - if (op != null && op.isCancelled()) { - throw new RuntimeException(op.getCancelCause()); - } - } - + rv = internalFuture.get().keySet(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + throw new RuntimeException(e); } - return rv.keySet(); + return rv; } private void logRunException(Exception e) { @@ -2253,9 +2107,9 @@ public boolean shutdown(long timeout, TimeUnit unit) { * is too full to accept any more requests */ public boolean waitForQueues(long timeout, TimeUnit unit) { - CountDownLatch blatch = broadcastOp(new BroadcastOpFactory() { - public Operation newOp(final MemcachedNode n, - final CountDownLatch latch) { + final CountDownLatch latch = new CountDownLatch(getAllNodes().size()); + broadcastOp(new BroadcastOpFactory() { + public Operation newOp(final MemcachedNode n) { return opFact.noop( new OperationCallback() { public void complete() { @@ -2272,7 +2126,7 @@ public void receivedStatus(OperationStatus s) { try { // XXX: Perhaps IllegalStateException should be caught here // and the check retried. - return blatch.await(timeout, unit); + return latch.await(timeout, unit); } catch (InterruptedException e) { throw new RuntimeException("Interrupted waiting for queues", e); } @@ -2356,7 +2210,7 @@ int getAddedQueueSize() { * * @return all memcachednode from node locator */ - Collection getAllNodes() { + protected Collection getAllNodes() { return conn.getLocator().getAll(); } } diff --git a/src/main/java/net/spy/memcached/MemcachedConnection.java b/src/main/java/net/spy/memcached/MemcachedConnection.java index e7904c1f5..5d09db6e5 100644 --- a/src/main/java/net/spy/memcached/MemcachedConnection.java +++ b/src/main/java/net/spy/memcached/MemcachedConnection.java @@ -44,7 +44,6 @@ import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -1495,24 +1494,22 @@ public void addOperations(final Map ops) { /** * Broadcast an operation to all nodes. */ - public CountDownLatch broadcastOperation(BroadcastOpFactory of) { - return broadcastOperation(of, locator.getAll()); + public void broadcastOperation(BroadcastOpFactory of) { + broadcastOperation(of, locator.getAll()); } /** * Broadcast an operation to a specific collection of nodes. */ - public CountDownLatch broadcastOperation(final BroadcastOpFactory of, + public void broadcastOperation(final BroadcastOpFactory of, Collection nodes) { - final CountDownLatch latch = new CountDownLatch(nodes.size()); for (MemcachedNode node : nodes) { - Operation op = of.newOp(node, latch); + Operation op = of.newOp(node); node.addOpToInputQ(op); addedQueue.offer(node); } Selector s = selector.wakeup(); assert s == selector : "Wakeup returned the wrong selector."; - return latch; } public void wakeUpSelector() { diff --git a/src/main/java/net/spy/memcached/internal/BroadcastFuture.java b/src/main/java/net/spy/memcached/internal/BroadcastFuture.java new file mode 100644 index 000000000..53179f640 --- /dev/null +++ b/src/main/java/net/spy/memcached/internal/BroadcastFuture.java @@ -0,0 +1,105 @@ +package net.spy.memcached.internal; + +import net.spy.memcached.MemcachedConnection; +import net.spy.memcached.OperationTimeoutException; +import net.spy.memcached.ops.Operation; +import net.spy.memcached.ops.OperationState; + +import java.util.ArrayList; +import java.util.List; +import java.util.Collection; +import java.util.HashSet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class BroadcastFuture extends OperationFuture { + private final List ops; + + public BroadcastFuture(int opSize, long timeout , T result) { + super(new CountDownLatch(opSize), timeout); + ops = new ArrayList(opSize); + objRef.set(result); + } + + @Override + public boolean cancel(boolean ign) { + boolean rv = false; + for (Operation op : ops) { + if (op.getState() != OperationState.COMPLETE) { + rv = true; + op.cancel("by application."); + } + } + return rv; + } + + @Override + public boolean isCancelled() { + for (Operation op : ops) { + if (op.isCancelled()) { + return true; + } + } + return false; + } + + @Override + public boolean isDone() { + for (Operation op : ops) { + if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) { + return false; + } + } + return true; + } + + @Override + public T get(long duration, TimeUnit units) + throws InterruptedException, TimeoutException, ExecutionException { + if (!latch.await(duration, units)) { + // whenever timeout occurs, continuous timeout counter will increase by 1. + Collection timedoutOps = new HashSet(); + for (Operation op : ops) { + if (op.getState() != OperationState.COMPLETE) { + MemcachedConnection.opTimedOut(op); + timedoutOps.add(op); + } else { + MemcachedConnection.opSucceeded(op); + } + } + if (timedoutOps.size() > 0) { + if (objRef.get() instanceof Boolean) { + throw new CheckedOperationTimeoutException(duration, units, timedoutOps); + } + throw new OperationTimeoutException(duration, units, timedoutOps); + } + } else { + // continuous timeout counter will be reset + MemcachedConnection.opsSucceeded(ops); + } + for (Operation op : ops) { + if (op != null && op.hasErrored()) { + throw new ExecutionException(op.getException()); + } + + if (op != null && op.isCancelled()) { + throw new ExecutionException(new RuntimeException(op.getCancelCause())); + } + } + return objRef.get(); + } + + public void setResult(T result) { + objRef.set(result); + } + + public void addOp(Operation op) { + ops.add(op); + } + + public void complete() { + latch.countDown(); + } +}