Skip to content

Commit

Permalink
INTERNAL: Refactor Future in asyncCollectionInsertBulk2 method.
Browse files Browse the repository at this point in the history
  • Loading branch information
brido4125 committed Jul 21, 2023
1 parent ba61112 commit e6a3e97
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 87 deletions.
94 changes: 7 additions & 87 deletions src/main/java/net/spy/memcached/ArcusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -4015,13 +4015,11 @@ public <T> Future<Map<String, CollectionOperationStatus>> asyncLopInsertBulk(
private <T> Future<Map<String, CollectionOperationStatus>> asyncCollectionInsertBulk2(
List<CollectionBulkInsert<T>> insertList) {

final ConcurrentLinkedQueue<Operation> ops = new ConcurrentLinkedQueue<Operation>();

final Map<String, CollectionOperationStatus> failedResult =
new ConcurrentHashMap<String, CollectionOperationStatus>();

final CountDownLatch latch = new CountDownLatch(insertList.size());

final BulkOperationFuture<CollectionOperationStatus> rv =
new BulkOperationFuture<CollectionOperationStatus>(latch, operationTimeout);

for (final CollectionBulkInsert<T> insert : insertList) {
Operation op = opFact.collectionBulkInsert(
insert, new CollectionBulkInsertOperation.Callback() {
Expand All @@ -4036,95 +4034,17 @@ public void complete() {
public void gotStatus(String key, OperationStatus status) {
if (!status.isSuccess()) {
if (status instanceof CollectionOperationStatus) {
failedResult.put(key,
(CollectionOperationStatus) status);
rv.addFailedResult(key, (CollectionOperationStatus) status);
} else {
failedResult.put(key,
new CollectionOperationStatus(status));
rv.addFailedResult(key, new CollectionOperationStatus(status));
}
}
}
});
ops.add(op);
rv.addOperation(op);
addOp(insert.getMemcachedNode(), op);
}

// return future
return new CollectionFuture<Map<String, CollectionOperationStatus>>(
latch, operationTimeout) {

@Override
public boolean cancel(boolean ign) {
boolean rv = false;
for (Operation op : ops) {
if (op.getState() != OperationState.COMPLETE) {
rv = true;
op.cancel("by application.");
}
}
return rv;
}

@Override
public boolean isCancelled() {
for (Operation op : ops) {
if (op.isCancelled()) {
return true;
}
}
return false;
}

@Override
public Map<String, CollectionOperationStatus> get(long duration,
TimeUnit units)
throws InterruptedException, TimeoutException, ExecutionException {
if (!latch.await(duration, units)) {
Collection<Operation> timedoutOps = new HashSet<Operation>();
for (Operation op : ops) {
if (op.getState() != OperationState.COMPLETE) {
timedoutOps.add(op);
} else {
MemcachedConnection.opSucceeded(op);
}
}
if (timedoutOps.size() > 0) {
MemcachedConnection.opsTimedOut(timedoutOps);
throw new CheckedOperationTimeoutException(duration, units, timedoutOps);
}
} else {
// continuous timeout counter will be reset
MemcachedConnection.opsSucceeded(ops);
}

for (Operation op : ops) {
if (op != null && op.hasErrored()) {
throw new ExecutionException(op.getException());
}

if (op != null && op.isCancelled()) {
throw new ExecutionException(new RuntimeException(op.getCancelCause()));
}
}

return failedResult;
}

@Override
public CollectionOperationStatus getOperationStatus() {
return null;
}

@Override
public boolean isDone() {
for (Operation op : ops) {
if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) {
return false;
}
}
return true;
}
};
return rv;
}

public CollectionGetBulkFuture<Map<String, BTreeGetResult<Long, Object>>> asyncBopGetBulk(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,8 @@ public void setOperations(Collection<Operation> ops) {
public void addFailedResult(String key, T value) {
failedResult.put(key, value);
}

public void addOperation(Operation op) {
ops.add(op);
}
}

0 comments on commit e6a3e97

Please sign in to comment.