Skip to content

Commit

Permalink
INTERNAL: Creating piped collection operation future.
Browse files Browse the repository at this point in the history
  • Loading branch information
brido4125 committed Jun 27, 2023
1 parent 7cdb935 commit fbefe74
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 183 deletions.
198 changes: 15 additions & 183 deletions src/main/java/net/spy/memcached/ArcusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@
import net.spy.memcached.internal.CollectionGetBulkFuture;
import net.spy.memcached.internal.OperationFuture;
import net.spy.memcached.internal.SMGetFuture;
import net.spy.memcached.internal.PipedCollectionFuture;
import net.spy.memcached.ops.BTreeFindPositionOperation;
import net.spy.memcached.ops.BTreeFindPositionWithGetOperation;
import net.spy.memcached.ops.BTreeGetBulkOperation;
Expand Down Expand Up @@ -948,15 +949,9 @@ public void gotStatus(Integer index, OperationStatus status) {
<T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncCollectionPipedUpdate(
final String key, final List<CollectionPipedUpdate<T>> updateList) {

final ConcurrentLinkedQueue<Operation> ops = new ConcurrentLinkedQueue<Operation>();

final CountDownLatch latch = new CountDownLatch(updateList.size());

final List<CollectionOperationStatus> mergedOperationStatus = Collections
.synchronizedList(new ArrayList<CollectionOperationStatus>(updateList.size()));

final Map<Integer, CollectionOperationStatus> mergedResult =
new ConcurrentHashMap<Integer, CollectionOperationStatus>();
final PipedCollectionFuture<Integer, CollectionOperationStatus> rv =
new PipedCollectionFuture<Integer, CollectionOperationStatus>(latch, operationTimeout, updateList.size());

for (int i = 0; i < updateList.size(); i++) {
final CollectionPipedUpdate<T> update = updateList.get(i);
Expand All @@ -974,7 +969,7 @@ public void receivedStatus(OperationStatus status) {
getLogger().warn("Unhandled state: " + status);
cstatus = new CollectionOperationStatus(status);
}
mergedOperationStatus.add(cstatus);
rv.addOperationStatus(cstatus);
}

// complete
Expand All @@ -985,97 +980,18 @@ public void complete() {
// got status
public void gotStatus(Integer index, OperationStatus status) {
if (status instanceof CollectionOperationStatus) {
mergedResult.put(index + (idx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT),
rv.addEachResult(index + (idx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT),
(CollectionOperationStatus) status);
} else {
mergedResult.put(index + (idx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT),
rv.addEachResult(index + (idx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT),
new CollectionOperationStatus(status));
}
}
});
addOp(key, op);
ops.add(op);
rv.addOperation(op);
}

return new CollectionFuture<Map<Integer, CollectionOperationStatus>>(
latch, operationTimeout) {

@Override
public boolean cancel(boolean ign) {
boolean rv = false;
for (Operation op : ops) {
op.cancel("by application.");
rv |= op.getState() == OperationState.WRITE_QUEUED;
}
return rv;
}

@Override
public boolean isCancelled() {
for (Operation op : ops) {
if (op.isCancelled()) {
return true;
}
}
return false;
}

@Override
public Map<Integer, CollectionOperationStatus> get(long duration,
TimeUnit units)
throws InterruptedException, TimeoutException, ExecutionException {

if (!latch.await(duration, units)) {
Collection<Operation> timedoutOps = new HashSet<Operation>();
for (Operation op : ops) {
if (op.getState() != OperationState.COMPLETE) {
timedoutOps.add(op);
} else {
MemcachedConnection.opSucceeded(op);
}
}
if (timedoutOps.size() > 0) {
MemcachedConnection.opTimedOut(timedoutOps.iterator().next());
throw new CheckedOperationTimeoutException(duration, units, timedoutOps);
}
} else {
// continuous timeout counter will be reset only once in pipe
MemcachedConnection.opSucceeded(ops.iterator().next());
}

for (Operation op : ops) {
if (op != null && op.hasErrored()) {
throw new ExecutionException(op.getException());
}

if (op != null && op.isCancelled()) {
throw new ExecutionException(new RuntimeException(op.getCancelCause()));
}
}

return mergedResult;
}

@Override
public CollectionOperationStatus getOperationStatus() {
for (CollectionOperationStatus status : mergedOperationStatus) {
if (!status.isSuccess()) {
return status;
}
}
return new CollectionOperationStatus(true, "END", CollectionResponse.END);
}

@Override
public boolean isDone() {
for (Operation op : ops) {
if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) {
return false;
}
}
return true;
}
};
return rv;
}

/**
Expand Down Expand Up @@ -3903,15 +3819,9 @@ public SMGetFuture<List<SMGetElement<Object>>> asyncBopSortMergeGet(
<T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncCollectionPipedInsert(
final String key, final List<CollectionPipedInsert<T>> insertList) {

final ConcurrentLinkedQueue<Operation> ops = new ConcurrentLinkedQueue<Operation>();

final CountDownLatch latch = new CountDownLatch(insertList.size());

final List<CollectionOperationStatus> mergedOperationStatus = Collections
.synchronizedList(new ArrayList<CollectionOperationStatus>(insertList.size()));

final Map<Integer, CollectionOperationStatus> mergedResult =
new ConcurrentHashMap<Integer, CollectionOperationStatus>();
final PipedCollectionFuture<Integer, CollectionOperationStatus> rv =
new PipedCollectionFuture<Integer, CollectionOperationStatus>(latch, operationTimeout, insertList.size());

for (int i = 0; i < insertList.size(); i++) {
final CollectionPipedInsert<T> insert = insertList.get(i);
Expand All @@ -3929,7 +3839,7 @@ public void receivedStatus(OperationStatus status) {
getLogger().warn("Unhandled state: " + status);
cstatus = new CollectionOperationStatus(status);
}
mergedOperationStatus.add(cstatus);
rv.addOperationStatus(cstatus);
}

// complete
Expand All @@ -3940,96 +3850,18 @@ public void complete() {
// got status
public void gotStatus(Integer index, OperationStatus status) {
if (status instanceof CollectionOperationStatus) {
mergedResult.put(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
(CollectionOperationStatus) status);
} else {
mergedResult.put(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
new CollectionOperationStatus(status));
}
}
});
addOp(key, op);
ops.add(op);
rv.addOperation(op);
}

return new CollectionFuture<Map<Integer, CollectionOperationStatus>>(
latch, operationTimeout) {

@Override
public boolean cancel(boolean ign) {
boolean rv = false;
for (Operation op : ops) {
op.cancel("by application.");
rv |= op.getState() == OperationState.WRITE_QUEUED;
}
return rv;
}

@Override
public boolean isCancelled() {
for (Operation op : ops) {
if (op.isCancelled()) {
return true;
}
}
return false;
}

@Override
public Map<Integer, CollectionOperationStatus> get(long duration,
TimeUnit units)
throws InterruptedException, TimeoutException, ExecutionException {

if (!latch.await(duration, units)) {
Collection<Operation> timedoutOps = new HashSet<Operation>();
for (Operation op : ops) {
if (op.getState() != OperationState.COMPLETE) {
timedoutOps.add(op);
} else {
MemcachedConnection.opSucceeded(op);
}
}
if (timedoutOps.size() > 0) {
MemcachedConnection.opTimedOut(timedoutOps.iterator().next());
throw new CheckedOperationTimeoutException(duration, units, timedoutOps);
}
} else {
// continuous timeout counter will be reset only once in pipe
MemcachedConnection.opSucceeded(ops.iterator().next());
}
for (Operation op : ops) {
if (op != null && op.hasErrored()) {
throw new ExecutionException(op.getException());
}

if (op != null && op.isCancelled()) {
throw new ExecutionException(new RuntimeException(op.getCancelCause()));
}
}

return mergedResult;
}

@Override
public CollectionOperationStatus getOperationStatus() {
for (CollectionOperationStatus status : mergedOperationStatus) {
if (!status.isSuccess()) {
return status;
}
}
return new CollectionOperationStatus(true, "END", CollectionResponse.END);
}

@Override
public boolean isDone() {
for (Operation op : ops) {
if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) {
return false;
}
}
return true;
}
};
return rv;
}

@Override
Expand Down
122 changes: 122 additions & 0 deletions src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package net.spy.memcached.internal;

import net.spy.memcached.MemcachedConnection;
import net.spy.memcached.ops.CollectionOperationStatus;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationState;

import java.util.List;
import java.util.Map;
import java.util.Collections;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.ExecutionException;

public class PipedCollectionFuture<K, V>
extends CollectionFuture<Map<K, V>> {
private final ConcurrentLinkedQueue<Operation> ops = new ConcurrentLinkedQueue<Operation>();
private final List<CollectionOperationStatus> mergedOperationStatus;

private final Map<K, V> mergedResult =
new ConcurrentHashMap<K, V>();

public PipedCollectionFuture(CountDownLatch l, long opTimeout, int opSize) {
super(l, opTimeout);
mergedOperationStatus = Collections
.synchronizedList(new ArrayList<CollectionOperationStatus>(opSize));
}

@Override
public boolean cancel(boolean ign) {
boolean rv = false;
for (Operation op : ops) {
op.cancel("by application.");
rv |= op.getState() == OperationState.WRITE_QUEUED;
}
return rv;
}

@Override
public boolean isCancelled() {
for (Operation op : ops) {
if (op.isCancelled()) {
return true;
}
}
return false;
}

@Override
public boolean isDone() {
for (Operation op : ops) {
if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) {
return false;
}
}
return true;
}

@Override
public Map<K, V> get(long duration,
TimeUnit units)
throws InterruptedException, TimeoutException, ExecutionException {

if (!latch.await(duration, units)) {
Collection<Operation> timedoutOps = new HashSet<Operation>();
for (Operation op : ops) {
if (op.getState() != OperationState.COMPLETE) {
timedoutOps.add(op);
} else {
MemcachedConnection.opSucceeded(op);
}
}
if (timedoutOps.size() > 0) {
MemcachedConnection.opTimedOut(timedoutOps.iterator().next());
throw new CheckedOperationTimeoutException(duration, units, timedoutOps);
}
} else {
// continuous timeout counter will be reset only once in pipe
MemcachedConnection.opSucceeded(ops.iterator().next());
}

for (Operation op : ops) {
if (op != null && op.hasErrored()) {
throw new ExecutionException(op.getException());
}

if (op != null && op.isCancelled()) {
throw new ExecutionException(new RuntimeException(op.getCancelCause()));
}
}

return mergedResult;
}

@Override
public CollectionOperationStatus getOperationStatus() {
for (CollectionOperationStatus status : mergedOperationStatus) {
if (!status.isSuccess()) {
return status;
}
}
return mergedOperationStatus.get(0);
}

public void addOperationStatus(CollectionOperationStatus status) {
mergedOperationStatus.add(status);
}

public void addEachResult(K index, V status) {
mergedResult.put(index, status);
}

public void addOperation(Operation op) {
ops.add(op);
}
}

0 comments on commit fbefe74

Please sign in to comment.