From 82814128e259508eaf9dd353c7aa36db67977726 Mon Sep 17 00:00:00 2001 From: brido4125 Date: Mon, 11 Sep 2023 12:47:30 +0900 Subject: [PATCH] ENHANCE: Refactor smget api logic and SMGetFuture --- .../java/net/spy/memcached/ArcusClient.java | 266 +++--------------- .../spy/memcached/internal/SMGetFuture.java | 146 +++++++++- 2 files changed, 178 insertions(+), 234 deletions(-) diff --git a/src/main/java/net/spy/memcached/ArcusClient.java b/src/main/java/net/spy/memcached/ArcusClient.java index 458231645..a71ab7cd0 100644 --- a/src/main/java/net/spy/memcached/ArcusClient.java +++ b/src/main/java/net/spy/memcached/ArcusClient.java @@ -37,10 +37,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.jar.JarFile; @@ -120,7 +118,6 @@ import net.spy.memcached.internal.BTreeStoreAndGetFuture; import net.spy.memcached.internal.BroadcastFuture; import net.spy.memcached.internal.BulkOperationFuture; -import net.spy.memcached.internal.CheckedOperationTimeoutException; import net.spy.memcached.internal.CollectionFuture; import net.spy.memcached.internal.CollectionGetBulkFuture; import net.spy.memcached.internal.CollectionGetFuture; @@ -144,7 +141,6 @@ import net.spy.memcached.ops.Mutator; import net.spy.memcached.ops.Operation; import net.spy.memcached.ops.OperationCallback; -import net.spy.memcached.ops.OperationState; import net.spy.memcached.ops.OperationStatus; import net.spy.memcached.ops.StoreType; import net.spy.memcached.plugin.FrontCacheMemcachedClient; @@ -2063,31 +2059,6 @@ private Collection>> groupingKeys(List return resultList; } - /** - * Get the sublist of elements from the smget result. - * - * @param mergedResult smget result (list of elements) - * @param offset start index, negative offset indicates "start from the tail" - * @param count number of elements to get - * @return list of elements - */ - private List> getSubList(final List> mergedResult, int offset, int count) { - if (offset > 0) { - if ((offset + count) < mergedResult.size()) { - return mergedResult.subList(offset, (offset + count)); - } - if (offset < mergedResult.size()) { - return mergedResult.subList(offset, mergedResult.size()); - } - return Collections.emptyList(); - } else { - if (count < mergedResult.size()) { - return mergedResult.subList(0, count); - } - return mergedResult; - } - } - /** * Generic smget operation for b+tree items. Public smget methods call this method. * @@ -2102,26 +2073,13 @@ private SMGetFuture>> smget( final List> smGetList, final int offset, final int count, final boolean reverse, final Transcoder tc) { - final String END = "END"; - final String TRIMMED = "TRIMMED"; - final String DUPLICATED = "DUPLICATED"; - final String DUPLICATED_TRIMMED = "DUPLICATED_TRIMMED"; - - final CountDownLatch blatch = new CountDownLatch(smGetList.size()); - final ConcurrentLinkedQueue ops = new ConcurrentLinkedQueue(); - final List missedKeyList - = Collections.synchronizedList(new ArrayList()); - final Map missedKeys - = Collections.synchronizedMap(new HashMap()); - final List mergedTrimmedKeys - = Collections.synchronizedList(new ArrayList()); final int totalResultElementCount = count + offset; - final List> mergedResult = new ArrayList>(totalResultElementCount); - final List resultOperationStatus - = Collections.synchronizedList(new ArrayList(1)); - final List failedOperationStatus - = Collections.synchronizedList(new ArrayList(1)); + final List> mergedResult + = new ArrayList>(totalResultElementCount); + + final SMGetFuture>> rv + = new SMGetFuture>>(mergedResult, smGetList.size(), count, operationTimeout, offset); // if processedSMGetCount is 0, then all smget is done. final AtomicInteger processedSMGetCount = new AtomicInteger(smGetList.size()); @@ -2140,16 +2098,15 @@ public void receivedStatus(OperationStatus status) { getLogger().warn("SMGetFailed. status=%s", status); if (!stopCollect.get()) { stopCollect.set(true); - failedOperationStatus.add(status); + rv.addFailedOperationStatus(status); } mergedResult.clear(); return; } + boolean isTrimmed = status.getMessage().equals("TRIMMED") || + status.getMessage().equals("DUPLICATED_TRIMMED"); - boolean isTrimmed = (TRIMMED.equals(status.getMessage()) || - DUPLICATED_TRIMMED.equals(status.getMessage())) - ? true : false; - if (mergedResult.size() == 0) { + if (mergedResult.isEmpty()) { // merged result is empty, add all. mergedResult.addAll(eachResult); mergedTrim.set(isTrimmed); @@ -2203,15 +2160,15 @@ public void receivedStatus(OperationStatus status) { } if (mergedTrim.get()) { if (isDuplicated) { - resultOperationStatus.add(new OperationStatus(true, "DUPLICATED_TRIMMED")); + rv.addResultOperationStatus(new OperationStatus(true, "DUPLICATED_TRIMMED")); } else { - resultOperationStatus.add(new OperationStatus(true, "TRIMMED")); + rv.addResultOperationStatus(new OperationStatus(true, "TRIMMED")); } } else { if (isDuplicated) { - resultOperationStatus.add(new OperationStatus(true, "DUPLICATED")); + rv.addResultOperationStatus(new OperationStatus(true, "DUPLICATED")); } else { - resultOperationStatus.add(new OperationStatus(true, "END")); + rv.addResultOperationStatus(new OperationStatus(true, "END")); } } } @@ -2219,7 +2176,7 @@ public void receivedStatus(OperationStatus status) { @Override public void complete() { - blatch.countDown(); + rv.countDownLatch(); } @Override @@ -2227,7 +2184,6 @@ public void gotData(String key, int flags, Object subkey, byte[] eflag, byte[] d if (stopCollect.get()) { return; } - if (subkey instanceof Long) { eachResult.add(new SMGetElement(key, (Long) subkey, eflag, tc.decode(new CachedData(flags, data, tc.getMaxSize())))); @@ -2239,104 +2195,31 @@ public void gotData(String key, int flags, Object subkey, byte[] eflag, byte[] d @Override public void gotMissedKey(byte[] data) { - missedKeyList.add(new String(data)); + rv.addMissedKeyToList(new String(data)); OperationStatus cause = new OperationStatus(false, "UNDEFINED"); - missedKeys.put(new String(data), new CollectionOperationStatus(cause)); + rv.addMissedKeyToMap(new String(data), new CollectionOperationStatus(cause)); } }); - ops.add(op); + rv.addOperation(op); addOp(smGet.getMemcachedNode(), op); } - - return new SMGetFuture>>(ops, operationTimeout) { - @Override - public List> get(long duration, TimeUnit units) - throws InterruptedException, TimeoutException, ExecutionException { - - if (!blatch.await(duration, units)) { - Collection timedoutOps = new HashSet(); - for (Operation op : ops) { - if (op.getState() != OperationState.COMPLETE) { - timedoutOps.add(op); - } else { - MemcachedConnection.opSucceeded(op); - } - } - if (timedoutOps.size() > 0) { - MemcachedConnection.opsTimedOut(timedoutOps); - throw new CheckedOperationTimeoutException(duration, units, timedoutOps); - } - } else { - // continuous timeout counter will be reset - MemcachedConnection.opsSucceeded(ops); - } - - for (Operation op : ops) { - if (op != null && op.hasErrored()) { - throw new ExecutionException(op.getException()); - } - - if (op != null && op.isCancelled()) { - throw new ExecutionException(new RuntimeException(op.getCancelCause())); - } - } - - if (smGetList.size() == 1) { - return mergedResult; - } - - return getSubList(mergedResult, offset, count); - } - - @Override - public List getMissedKeyList() { - return missedKeyList; - } - - @Override - public Map getMissedKeys() { - return missedKeys; - } - - @Override - public List getTrimmedKeys() { - return mergedTrimmedKeys; - } - - @Override - public CollectionOperationStatus getOperationStatus() { - if (!failedOperationStatus.isEmpty()) { - return new CollectionOperationStatus(failedOperationStatus.get(0)); - } - return new CollectionOperationStatus(resultOperationStatus.get(0)); - } - }; + return rv; } private SMGetFuture>> smget( final List> smGetList, final int count, final boolean reverse, final Transcoder tc, final SMGetMode smgetMode) { - final CountDownLatch blatch = new CountDownLatch(smGetList.size()); - final ConcurrentLinkedQueue ops = new ConcurrentLinkedQueue(); - final List missedKeyList - = Collections.synchronizedList(new ArrayList()); - final Map missedKeys - = Collections.synchronizedMap(new HashMap()); - final int totalResultElementCount = count; - - final List> mergedResult = new ArrayList>(totalResultElementCount); - final List mergedTrimmedKeys - = Collections.synchronizedList(new ArrayList()); + final List> mergedResult + = new ArrayList>(count); - final List resultOperationStatus - = Collections.synchronizedList(new ArrayList(1)); - final List failedOperationStatus - = Collections.synchronizedList(new ArrayList(1)); + final SMGetFuture>> rv + = new SMGetFuture>>(mergedResult, smGetList.size(), count, operationTimeout); - final AtomicBoolean stopCollect = new AtomicBoolean(false); // if processedSMGetCount is 0, then all smget is done. final AtomicInteger processedSMGetCount = new AtomicInteger(smGetList.size()); + final AtomicBoolean stopCollect = new AtomicBoolean(false); + final List mergedTrimmedKeys = rv.getTrimmedKeys(); for (BTreeSMGet smGet : smGetList) { Operation op = opFact.bopsmget(smGet, new BTreeSortMergeGetOperation.Callback() { @@ -2351,7 +2234,7 @@ public void receivedStatus(OperationStatus status) { getLogger().warn("SMGetFailed. status=%s", status); if (!stopCollect.get()) { stopCollect.set(true); - failedOperationStatus.add(status); + rv.addFailedOperationStatus(status); } mergedResult.clear(); mergedTrimmedKeys.clear(); @@ -2391,9 +2274,9 @@ public void receivedStatus(OperationStatus status) { if (duplicated) { // UNIQUE continue; } - if (pos >= totalResultElementCount) { + if (pos >= count) { // At this point, following conditions are met. - // - mergedResult.size() == totalResultElementCount && + // - mergedResult.size() == count && // - The current of eachResult is // behind of the last of mergedResult. // Then, all the next elements of eachResult are @@ -2403,15 +2286,14 @@ public void receivedStatus(OperationStatus status) { } mergedResult.add(pos, result); - if (mergedResult.size() > totalResultElementCount) { - mergedResult.remove(totalResultElementCount); + if (mergedResult.size() > count) { + mergedResult.remove(count); } pos += 1; } } - - if (eachTrimmedResult.size() > 0) { - if (mergedTrimmedKeys.size() == 0) { + if (!eachTrimmedResult.isEmpty()) { + if (mergedTrimmedKeys.isEmpty()) { mergedTrimmedKeys.addAll(eachTrimmedResult); } else { // do sort merge trimmed list @@ -2428,9 +2310,8 @@ public void receivedStatus(OperationStatus status) { } } } - if (processedSMGetCount.get() == 0) { - if (mergedTrimmedKeys.size() > 0 && count <= mergedResult.size()) { + if (!mergedTrimmedKeys.isEmpty() && count <= mergedResult.size()) { // remove trimed keys whose bkeys are behind of the last element. SMGetElement lastElement = mergedResult.get(mergedResult.size() - 1); SMGetTrimKey lastTrimKey = new SMGetTrimKey(lastElement.getKey(), @@ -2438,7 +2319,7 @@ public void receivedStatus(OperationStatus status) { for (int i = mergedTrimmedKeys.size() - 1; i >= 0; i--) { SMGetTrimKey me = mergedTrimmedKeys.get(i); if ((reverse) ? (0 >= me.compareTo(lastTrimKey)) - : (0 <= me.compareTo(lastTrimKey))) { + : (0 <= me.compareTo(lastTrimKey))) { mergedTrimmedKeys.remove(i); } else { break; @@ -2446,7 +2327,7 @@ public void receivedStatus(OperationStatus status) { } } if (smgetMode == SMGetMode.UNIQUE) { - resultOperationStatus.add(new OperationStatus(true, "END")); + rv.addResultOperationStatus(new OperationStatus(true, "END")); } else { boolean isDuplicated = false; for (int i = 1; i < mergedResult.size(); i++) { @@ -2456,9 +2337,9 @@ public void receivedStatus(OperationStatus status) { } } if (isDuplicated) { - resultOperationStatus.add(new OperationStatus(true, "DUPLICATED")); + rv.addResultOperationStatus(new OperationStatus(true, "DUPLICATED")); } else { - resultOperationStatus.add(new OperationStatus(true, "END")); + rv.addResultOperationStatus(new OperationStatus(true, "END")); } } } @@ -2466,7 +2347,7 @@ public void receivedStatus(OperationStatus status) { @Override public void complete() { - blatch.countDown(); + rv.countDownLatch(); } @Override @@ -2474,7 +2355,6 @@ public void gotData(String key, int flags, Object subkey, byte[] eflag, byte[] d if (stopCollect.get()) { return; } - if (subkey instanceof Long) { eachResult.add(new SMGetElement(key, (Long) subkey, eflag, tc.decode(new CachedData(flags, data, tc.getMaxSize())))); @@ -2486,8 +2366,8 @@ public void gotData(String key, int flags, Object subkey, byte[] eflag, byte[] d @Override public void gotMissedKey(String key, OperationStatus cause) { - missedKeyList.add(key); - missedKeys.put(key, new CollectionOperationStatus(cause)); + rv.addMissedKeyToList(key); + rv.addMissedKeyToMap(key, new CollectionOperationStatus(cause)); } @Override @@ -2495,7 +2375,6 @@ public void gotTrimmedKey(String key, Object subkey) { if (stopCollect.get()) { return; } - if (subkey instanceof Long) { eachTrimmedResult.add(new SMGetTrimKey(key, (Long) subkey)); } else if (subkey instanceof byte[]) { @@ -2503,73 +2382,10 @@ public void gotTrimmedKey(String key, Object subkey) { } } }); - ops.add(op); + rv.addOperation(op); addOp(smGet.getMemcachedNode(), op); } - - return new SMGetFuture>>(ops, operationTimeout) { - @Override - public List> get(long duration, TimeUnit units) - throws InterruptedException, TimeoutException, ExecutionException { - - if (!blatch.await(duration, units)) { - Collection timedoutOps = new HashSet(); - for (Operation op : ops) { - if (op.getState() != OperationState.COMPLETE) { - timedoutOps.add(op); - } else { - MemcachedConnection.opSucceeded(op); - } - } - if (timedoutOps.size() > 0) { - MemcachedConnection.opsTimedOut(timedoutOps); - throw new CheckedOperationTimeoutException(duration, units, timedoutOps); - } - } else { - // continuous timeout counter will be reset - MemcachedConnection.opsSucceeded(ops); - } - - for (Operation op : ops) { - if (op != null && op.hasErrored()) { - throw new ExecutionException(op.getException()); - } - - if (op != null && op.isCancelled()) { - throw new ExecutionException(new RuntimeException(op.getCancelCause())); - } - } - - if (smGetList.size() == 1) { - return mergedResult; - } - - return getSubList(mergedResult, 0, count); - } - - @Override - public List getMissedKeyList() { - return missedKeyList; - } - - @Override - public Map getMissedKeys() { - return missedKeys; - } - - @Override - public List getTrimmedKeys() { - return mergedTrimmedKeys; - } - - @Override - public CollectionOperationStatus getOperationStatus() { - if (!failedOperationStatus.isEmpty()) { - return new CollectionOperationStatus(failedOperationStatus.get(0)); - } - return new CollectionOperationStatus(resultOperationStatus.get(0)); - } - }; + return rv; } @Override diff --git a/src/main/java/net/spy/memcached/internal/SMGetFuture.java b/src/main/java/net/spy/memcached/internal/SMGetFuture.java index 2dd890450..f3f703d82 100644 --- a/src/main/java/net/spy/memcached/internal/SMGetFuture.java +++ b/src/main/java/net/spy/memcached/internal/SMGetFuture.java @@ -16,30 +16,60 @@ */ package net.spy.memcached.internal; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import net.spy.memcached.MemcachedConnection; import net.spy.memcached.OperationTimeoutException; import net.spy.memcached.collection.SMGetTrimKey; import net.spy.memcached.ops.CollectionOperationStatus; import net.spy.memcached.ops.Operation; import net.spy.memcached.ops.OperationState; +import net.spy.memcached.ops.OperationStatus; -public abstract class SMGetFuture implements Future { - - private final Collection ops; +public class SMGetFuture> implements Future { + private final CountDownLatch latch; + private final Collection ops = new ConcurrentLinkedQueue(); + private final List mergedTrimmedKeys + = Collections.synchronizedList(new ArrayList()); + private final List resultOperationStatus + = Collections.synchronizedList(new ArrayList(1)); + private final List failedOperationStatus + = Collections.synchronizedList(new ArrayList(1)); + private final List missedKeyList + = Collections.synchronizedList(new ArrayList()); + private final Map missedKeys + = Collections.synchronizedMap(new HashMap()); + private final T mergedResult; + private final int opSize; private final long timeout; + private int offset = 0; + private final int count; - public SMGetFuture(Collection ops, long timeout) { - this.ops = ops; + public SMGetFuture(T mergedResult, int opSize, int count, long timeout) { + this.mergedResult = mergedResult; + this.latch = new CountDownLatch(opSize); + this.opSize = opSize; + this.count = count; this.timeout = timeout; } + public SMGetFuture(T mergedResult, int opSize, int count, long timeout, int offset) { + this(mergedResult, opSize, count, timeout); + this.offset = offset; + } + @Override public T get() throws InterruptedException, ExecutionException { try { @@ -49,6 +79,45 @@ public T get() throws InterruptedException, ExecutionException { } } + @Override + public T get(long duration, TimeUnit units) + throws InterruptedException, TimeoutException, ExecutionException { + if (!latch.await(duration, units)) { + Collection timedoutOps = new HashSet(); + for (Operation op : ops) { + if (op.getState() != OperationState.COMPLETE) { + timedoutOps.add(op); + } else { + MemcachedConnection.opSucceeded(op); + } + } + if (timedoutOps.size() > 0) { + MemcachedConnection.opsTimedOut(timedoutOps); + throw new CheckedOperationTimeoutException(duration, units, timedoutOps); + } + } else { + // continuous timeout counter will be reset + MemcachedConnection.opsSucceeded(ops); + } + + for (Operation op : ops) { + if (op != null && op.hasErrored()) { + throw new ExecutionException(op.getException()); + } + + if (op != null && op.isCancelled()) { + throw new ExecutionException(new RuntimeException(op.getCancelCause())); + } + } + + if (opSize == 1) { + return mergedResult; + } + + return getSubList(offset, count); + } + + @Override public boolean cancel(boolean ign) { boolean rv = false; @@ -78,11 +147,70 @@ public boolean isDone() { return true; } - public abstract Map getMissedKeys(); + /** + * Get the sublist of elements from the smget result. + * @param offset start index, negative offset indicates "start from the tail" + * @param count number of elements to get + * @return list of elements + */ + @SuppressWarnings("unchecked") + private T getSubList(int offset, int count) { + if (offset > 0) { + if ((offset + count) < mergedResult.size()) { + return (T) mergedResult.subList(offset, (offset + count)); + } + if (offset < mergedResult.size()) { + return (T) mergedResult.subList(offset, mergedResult.size()); + } + return (T) Collections.emptyList(); + } else { + if (count < mergedResult.size()) { + return (T) mergedResult.subList(0, count); + } + return mergedResult; + } + } + + public void countDownLatch() { + latch.countDown(); + } - public abstract List getMissedKeyList(); + public void addOperation(Operation op) { + ops.add(op); + } - public abstract List getTrimmedKeys(); + public void addMissedKeyToMap(String key, CollectionOperationStatus value) { + missedKeys.put(key, value); + } + + public void addMissedKeyToList(String missedKey) { + missedKeyList.add(missedKey); + } + + public Map getMissedKeys() { + return missedKeys; + } - public abstract CollectionOperationStatus getOperationStatus(); + public List getMissedKeyList() { + return missedKeyList; + } + + public List getTrimmedKeys() { + return mergedTrimmedKeys; + } + + public CollectionOperationStatus getOperationStatus() { + if (!failedOperationStatus.isEmpty()) { + return new CollectionOperationStatus(failedOperationStatus.get(0)); + } + return new CollectionOperationStatus(resultOperationStatus.get(0)); + } + + public void addFailedOperationStatus(OperationStatus operationStatus) { + failedOperationStatus.add(operationStatus); + } + + public void addResultOperationStatus(OperationStatus operationStatus) { + resultOperationStatus.add(operationStatus); + } }