From 4980a09741e6aff891b7840302b0b79edd7af4a2 Mon Sep 17 00:00:00 2001 From: brido4125 Date: Fri, 21 Jul 2023 15:16:43 +0900 Subject: [PATCH] INTERNAL: Refactor Future in asyncCollectionInsertBulk2 method. --- .../java/net/spy/memcached/ArcusClient.java | 94 ++----------------- .../internal/BulkOperationFuture.java | 8 +- 2 files changed, 13 insertions(+), 89 deletions(-) diff --git a/src/main/java/net/spy/memcached/ArcusClient.java b/src/main/java/net/spy/memcached/ArcusClient.java index 7642e06f8..6af6f2663 100644 --- a/src/main/java/net/spy/memcached/ArcusClient.java +++ b/src/main/java/net/spy/memcached/ArcusClient.java @@ -3914,13 +3914,11 @@ public Future> asyncLopInsertBulk( private Future> asyncCollectionInsertBulk2( List> insertList) { - final ConcurrentLinkedQueue ops = new ConcurrentLinkedQueue(); - - final Map failedResult = - new ConcurrentHashMap(); - final CountDownLatch latch = new CountDownLatch(insertList.size()); + final BulkOperationFuture rv = + new BulkOperationFuture(latch, operationTimeout); + for (final CollectionBulkInsert insert : insertList) { Operation op = opFact.collectionBulkInsert( insert, new CollectionBulkInsertOperation.Callback() { @@ -3935,95 +3933,17 @@ public void complete() { public void gotStatus(String key, OperationStatus status) { if (!status.isSuccess()) { if (status instanceof CollectionOperationStatus) { - failedResult.put(key, - (CollectionOperationStatus) status); + rv.addFailedResult(key, (CollectionOperationStatus) status); } else { - failedResult.put(key, - new CollectionOperationStatus(status)); + rv.addFailedResult(key, new CollectionOperationStatus(status)); } } } }); - ops.add(op); + rv.addOperation(op); addOp(insert.getMemcachedNode(), op); } - - // return future - return new CollectionFuture>( - latch, operationTimeout) { - - @Override - public boolean cancel(boolean ign) { - boolean rv = false; - for (Operation op : ops) { - if (op.getState() != OperationState.COMPLETE) { - rv = true; - op.cancel("by application."); - } - } - return rv; - } - - @Override - public boolean isCancelled() { - for (Operation op : ops) { - if (op.isCancelled()) { - return true; - } - } - return false; - } - - @Override - public Map get(long duration, - TimeUnit units) - throws InterruptedException, TimeoutException, ExecutionException { - if (!latch.await(duration, units)) { - Collection timedoutOps = new HashSet(); - for (Operation op : ops) { - if (op.getState() != OperationState.COMPLETE) { - timedoutOps.add(op); - } else { - MemcachedConnection.opSucceeded(op); - } - } - if (timedoutOps.size() > 0) { - MemcachedConnection.opsTimedOut(timedoutOps); - throw new CheckedOperationTimeoutException(duration, units, timedoutOps); - } - } else { - // continuous timeout counter will be reset - MemcachedConnection.opsSucceeded(ops); - } - - for (Operation op : ops) { - if (op != null && op.hasErrored()) { - throw new ExecutionException(op.getException()); - } - - if (op != null && op.isCancelled()) { - throw new ExecutionException(new RuntimeException(op.getCancelCause())); - } - } - - return failedResult; - } - - @Override - public CollectionOperationStatus getOperationStatus() { - return null; - } - - @Override - public boolean isDone() { - for (Operation op : ops) { - if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) { - return false; - } - } - return true; - } - }; + return rv; } public CollectionGetBulkFuture>> asyncBopGetBulk( diff --git a/src/main/java/net/spy/memcached/internal/BulkOperationFuture.java b/src/main/java/net/spy/memcached/internal/BulkOperationFuture.java index 5e586172f..cc198fcc7 100644 --- a/src/main/java/net/spy/memcached/internal/BulkOperationFuture.java +++ b/src/main/java/net/spy/memcached/internal/BulkOperationFuture.java @@ -103,11 +103,15 @@ public Map get(long duration, return failedResult; } + public void addFailedResult(String key, T value) { + failedResult.put(key, value); + } + public void setOperations(Collection ops) { this.ops.addAll(ops); } - public void addFailedResult(String key, T value) { - failedResult.put(key, value); + public void addOperation(Operation op) { + ops.add(op); } }