Skip to content

Commit

Permalink
INTERNAL: Creating piped collection operation future.
Browse files Browse the repository at this point in the history
  • Loading branch information
brido4125 committed Jun 23, 2023
1 parent be1b2b4 commit 337b587
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 197 deletions.
202 changes: 19 additions & 183 deletions src/main/java/net/spy/memcached/ArcusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@
import net.spy.memcached.internal.CollectionGetBulkFuture;
import net.spy.memcached.internal.OperationFuture;
import net.spy.memcached.internal.SMGetFuture;
import net.spy.memcached.internal.PipedCollectionFuture;
import net.spy.memcached.ops.BTreeFindPositionOperation;
import net.spy.memcached.ops.BTreeFindPositionWithGetOperation;
import net.spy.memcached.ops.BTreeGetBulkOperation;
Expand Down Expand Up @@ -948,15 +949,10 @@ public void gotStatus(Integer index, OperationStatus status) {
<T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncCollectionPipedUpdate(
final String key, final List<CollectionPipedUpdate<T>> updateList) {

final ConcurrentLinkedQueue<Operation> ops = new ConcurrentLinkedQueue<Operation>();

final CountDownLatch latch = new CountDownLatch(updateList.size());

final List<CollectionOperationStatus> mergedOperationStatus = Collections
.synchronizedList(new ArrayList<CollectionOperationStatus>(updateList.size()));

final Map<Integer, CollectionOperationStatus> mergedResult =
new ConcurrentHashMap<Integer, CollectionOperationStatus>();
final PipedCollectionFuture<Integer, CollectionOperationStatus> rv =
new PipedCollectionFuture<Integer, CollectionOperationStatus>(latch, operationTimeout, updateList.size());
Collection<Operation> ops = new ConcurrentLinkedQueue<Operation>();

for (int i = 0; i < updateList.size(); i++) {
final CollectionPipedUpdate<T> update = updateList.get(i);
Expand All @@ -974,7 +970,7 @@ public void receivedStatus(OperationStatus status) {
getLogger().warn("Unhandled state: " + status);
cstatus = new CollectionOperationStatus(status);
}
mergedOperationStatus.add(cstatus);
rv.addOperationStatus(cstatus);
}

// complete
Expand All @@ -985,97 +981,19 @@ public void complete() {
// got status
public void gotStatus(Integer index, OperationStatus status) {
if (status instanceof CollectionOperationStatus) {
mergedResult.put(index + (idx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT),
rv.addEachResult(index + (idx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT),
(CollectionOperationStatus) status);
} else {
mergedResult.put(index + (idx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT),
rv.addEachResult(index + (idx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT),
new CollectionOperationStatus(status));
}
}
});
addOp(key, op);
ops.add(op);
}

return new CollectionFuture<Map<Integer, CollectionOperationStatus>>(
latch, operationTimeout) {

@Override
public boolean cancel(boolean ign) {
boolean rv = false;
for (Operation op : ops) {
op.cancel("by application.");
rv |= op.getState() == OperationState.WRITE_QUEUED;
}
return rv;
}

@Override
public boolean isCancelled() {
for (Operation op : ops) {
if (op.isCancelled()) {
return true;
}
}
return false;
}

@Override
public Map<Integer, CollectionOperationStatus> get(long duration,
TimeUnit units)
throws InterruptedException, TimeoutException, ExecutionException {

if (!latch.await(duration, units)) {
Collection<Operation> timedoutOps = new HashSet<Operation>();
for (Operation op : ops) {
if (op.getState() != OperationState.COMPLETE) {
timedoutOps.add(op);
} else {
MemcachedConnection.opSucceeded(op);
}
}
if (timedoutOps.size() > 0) {
MemcachedConnection.opTimedOut(timedoutOps.iterator().next());
throw new CheckedOperationTimeoutException(duration, units, timedoutOps);
}
} else {
// continuous timeout counter will be reset only once in pipe
MemcachedConnection.opSucceeded(ops.iterator().next());
}

for (Operation op : ops) {
if (op != null && op.hasErrored()) {
throw new ExecutionException(op.getException());
}

if (op != null && op.isCancelled()) {
throw new ExecutionException(new RuntimeException(op.getCancelCause()));
}
}

return mergedResult;
}

@Override
public CollectionOperationStatus getOperationStatus() {
for (CollectionOperationStatus status : mergedOperationStatus) {
if (!status.isSuccess()) {
return status;
}
}
return new CollectionOperationStatus(true, "END", CollectionResponse.END);
}

@Override
public boolean isDone() {
for (Operation op : ops) {
if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) {
return false;
}
}
return true;
}
};
rv.setOps(ops);
addOps(key, ops);
return rv;
}

/**
Expand Down Expand Up @@ -3903,15 +3821,10 @@ public SMGetFuture<List<SMGetElement<Object>>> asyncBopSortMergeGet(
<T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncCollectionPipedInsert(
final String key, final List<CollectionPipedInsert<T>> insertList) {

final ConcurrentLinkedQueue<Operation> ops = new ConcurrentLinkedQueue<Operation>();

final CountDownLatch latch = new CountDownLatch(insertList.size());

final List<CollectionOperationStatus> mergedOperationStatus = Collections
.synchronizedList(new ArrayList<CollectionOperationStatus>(insertList.size()));

final Map<Integer, CollectionOperationStatus> mergedResult =
new ConcurrentHashMap<Integer, CollectionOperationStatus>();
final PipedCollectionFuture<Integer, CollectionOperationStatus> rv =
new PipedCollectionFuture<Integer, CollectionOperationStatus>(latch, operationTimeout, insertList.size());
Collection<Operation> ops = new ConcurrentLinkedQueue<Operation>();

for (int i = 0; i < insertList.size(); i++) {
final CollectionPipedInsert<T> insert = insertList.get(i);
Expand All @@ -3929,7 +3842,7 @@ public void receivedStatus(OperationStatus status) {
getLogger().warn("Unhandled state: " + status);
cstatus = new CollectionOperationStatus(status);
}
mergedOperationStatus.add(cstatus);
rv.addOperationStatus(cstatus);
}

// complete
Expand All @@ -3940,96 +3853,19 @@ public void complete() {
// got status
public void gotStatus(Integer index, OperationStatus status) {
if (status instanceof CollectionOperationStatus) {
mergedResult.put(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
(CollectionOperationStatus) status);
} else {
mergedResult.put(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
new CollectionOperationStatus(status));
}
}
});
addOp(key, op);
ops.add(op);
}

return new CollectionFuture<Map<Integer, CollectionOperationStatus>>(
latch, operationTimeout) {

@Override
public boolean cancel(boolean ign) {
boolean rv = false;
for (Operation op : ops) {
op.cancel("by application.");
rv |= op.getState() == OperationState.WRITE_QUEUED;
}
return rv;
}

@Override
public boolean isCancelled() {
for (Operation op : ops) {
if (op.isCancelled()) {
return true;
}
}
return false;
}

@Override
public Map<Integer, CollectionOperationStatus> get(long duration,
TimeUnit units)
throws InterruptedException, TimeoutException, ExecutionException {

if (!latch.await(duration, units)) {
Collection<Operation> timedoutOps = new HashSet<Operation>();
for (Operation op : ops) {
if (op.getState() != OperationState.COMPLETE) {
timedoutOps.add(op);
} else {
MemcachedConnection.opSucceeded(op);
}
}
if (timedoutOps.size() > 0) {
MemcachedConnection.opTimedOut(timedoutOps.iterator().next());
throw new CheckedOperationTimeoutException(duration, units, timedoutOps);
}
} else {
// continuous timeout counter will be reset only once in pipe
MemcachedConnection.opSucceeded(ops.iterator().next());
}
for (Operation op : ops) {
if (op != null && op.hasErrored()) {
throw new ExecutionException(op.getException());
}

if (op != null && op.isCancelled()) {
throw new ExecutionException(new RuntimeException(op.getCancelCause()));
}
}

return mergedResult;
}

@Override
public CollectionOperationStatus getOperationStatus() {
for (CollectionOperationStatus status : mergedOperationStatus) {
if (!status.isSuccess()) {
return status;
}
}
return new CollectionOperationStatus(true, "END", CollectionResponse.END);
}

@Override
public boolean isDone() {
for (Operation op : ops) {
if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) {
return false;
}
}
return true;
}
};
rv.setOps(ops);
addOps(key, ops);
return rv;
}

@Override
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/net/spy/memcached/MemcachedClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,12 @@ protected Operation addOp(final MemcachedNode node, final Operation op) {
return op;
}

protected void addOps(final String key, final Collection<Operation> ops) {
validateKey(key);
checkState();
conn.addOperations(key, ops);
}

protected void addOpMap(final Map<String, Operation> opMap) {
checkState();
for (Map.Entry<String, Operation> me : opMap.entrySet()) {
Expand Down
33 changes: 19 additions & 14 deletions src/main/java/net/spy/memcached/MemcachedConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -1464,14 +1464,7 @@ public void insertOperation(final MemcachedNode node, final Operation o) {
}

public void addOperation(final MemcachedNode node, final Operation o) {
if (node == null) {
o.cancel("no node");
return;
}
if ((!node.isActive() && !node.isFirstConnecting()) &&
failureMode == FailureMode.Cancel) {
o.setHandlingNode(node);
o.cancel("inactive node");
if (checkNodeState(node, o)) {
return;
}
node.addOpToInputQ(o);
Expand All @@ -1481,17 +1474,29 @@ public void addOperation(final MemcachedNode node, final Operation o) {
getLogger().debug("Added %s to %s", o, node);
}

public void addOperations(final Map<MemcachedNode, Operation> ops) {
for (Map.Entry<MemcachedNode, Operation> me : ops.entrySet()) {
final MemcachedNode node = me.getKey();
Operation o = me.getValue();
node.addOpToInputQ(o);
addedQueue.offer(node);
public void addOperations(String key, Collection<Operation> ops) {
MemcachedNode findNode = findNodeByKey(key);
for (Operation op : ops) {
addOperation(findNode, op);
}
Selector s = selector.wakeup();
assert s == selector : "Wakeup returned the wrong selector.";
}

private boolean checkNodeState(MemcachedNode findNode, Operation op) {
if (findNode == null) {
op.cancel("no node");
return true;
}
if ((!findNode.isActive() && !findNode.isFirstConnecting()) &&
failureMode == FailureMode.Cancel) {
op.setHandlingNode(findNode);
op.cancel("inactive node");
return true;
}
return false;
}

/**
* Broadcast an operation to all nodes.
*/
Expand Down
Loading

0 comments on commit 337b587

Please sign in to comment.