diff --git a/src/main/java/net/spy/memcached/ArcusClient.java b/src/main/java/net/spy/memcached/ArcusClient.java index eb84e3c25..8af4f7eed 100644 --- a/src/main/java/net/spy/memcached/ArcusClient.java +++ b/src/main/java/net/spy/memcached/ArcusClient.java @@ -31,7 +31,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.LinkedHashMap; import java.util.Map.Entry; import java.util.Set; import java.util.TreeMap; @@ -3083,13 +3082,15 @@ private CollectionFuture>> asyncBopGetByPosition( } final CountDownLatch latch = new CountDownLatch(1); - final CollectionFuture>> rv = - new CollectionFuture>>(latch, operationTimeout); + final CollectionGetFuture>> rv = + new CollectionGetFuture>>(latch, operationTimeout); Operation op = opFact.bopGetByPosition(k, get, new BTreeGetByPositionOperation.Callback() { - private final TreeMap> map = new TreeMap>( - (reverse) ? Collections.reverseOrder() : null); + private final TreeMap> result = + new TreeMap>((reverse) ? Collections.reverseOrder() : null); + private final HashMap> cachedDataMap + = new HashMap>(); public void receivedStatus(OperationStatus status) { CollectionOperationStatus cstatus; @@ -3100,7 +3101,7 @@ public void receivedStatus(OperationStatus status) { cstatus = new CollectionOperationStatus(status); } if (cstatus.isSuccess()) { - rv.set(map, cstatus); + rv.set(result, cstatus); return; } switch (cstatus.getResponse()) { @@ -3109,7 +3110,7 @@ public void receivedStatus(OperationStatus status) { getLogger().debug("Key(%s) not found : %s", k, cstatus); break; case NOT_FOUND_ELEMENT: - rv.set(map, cstatus); + rv.set(result, cstatus); getLogger().debug("Element(%s) not found : %s", k, cstatus); break; case UNREADABLE: @@ -3125,17 +3126,24 @@ public void receivedStatus(OperationStatus status) { } } + @Override public void complete() { latch.countDown(); } - public void gotData(String key, int flags, int pos, BKeyObject bkeyObject, byte[] eflag, - byte[] data) { - assert key.equals(k) : "Wrong key returned"; - Element element = makeBTreeElement(key, flags, bkeyObject, eflag, data, tc); + @Override + public void gotData(int pos, int flags, BKeyObject bkeyObject, byte[] eflag, byte[] data) { + CachedData cachedData = new CachedData(flags, data, eflag, tc.getMaxSize()); + cachedDataMap.put(pos, new AbstractMap.SimpleEntry(bkeyObject, cachedData)); + } - if (element != null) { - map.put(pos, element); + @Override + public void addResult() { + if (result.isEmpty() && !cachedDataMap.isEmpty()) { + for (Entry> entry : cachedDataMap.entrySet()) { + Entry cachedDataEntry = entry.getValue(); + result.put(entry.getKey(), makeBTreeElement(cachedDataEntry.getKey(), cachedDataEntry.getValue(), tc)); + } } } }); @@ -3290,9 +3298,8 @@ private CollectionFuture>> asyncBopFindPositionWithG private final TreeMap> result = new TreeMap>(); - private final LinkedHashMap cachedDataMap - = new LinkedHashMap(); - private int startPosition; + private final HashMap> cachedDataMap + = new HashMap>(); public void receivedStatus(OperationStatus status) { CollectionOperationStatus cstatus; @@ -3338,21 +3345,17 @@ public void complete() { } @Override - public void getStartPosition(int start) { - startPosition = start; - } - - public void gotData(int flags, BKeyObject bkeyObject, byte[] eflag, byte[] data) { + public void gotData(int pos, int flags, BKeyObject bkeyObject, byte[] eflag, byte[] data) { CachedData cachedData = new CachedData(flags, data, eflag, tc.getMaxSize()); - cachedDataMap.put(bkeyObject, cachedData); + cachedDataMap.put(pos, new AbstractMap.SimpleEntry(bkeyObject, cachedData)); } @Override public void addResult() { if (result.isEmpty() && !cachedDataMap.isEmpty()) { - for (Map.Entry entry : cachedDataMap.entrySet()) { - result.put(startPosition, makeBTreeElement(entry.getKey(), entry.getValue(), tc)); - startPosition++; + for (Map.Entry> entry : cachedDataMap.entrySet()) { + Entry cachedDataEntry = entry.getValue(); + result.put(entry.getKey(), makeBTreeElement(cachedDataEntry.getKey(), cachedDataEntry.getValue(), tc)); } } } @@ -3488,9 +3491,8 @@ public void complete() { latch.countDown(); } - public void gotData(String key, int flags, BKeyObject bkeyObject, - byte[] eflag, byte[] data) { - assert key.equals(k) : "Wrong key returned"; + @Override + public void gotData(int flags, BKeyObject bkeyObject, byte[] eflag, byte[] data) { cachedData = new CachedData(flags, data, eflag, tc.getMaxSize()); } @@ -3507,37 +3509,6 @@ public void addResult() { return rv; } - /** - * Utility method to create a b+tree element from individual parameters. - * - * @param key b+tree item's key - * @param flags item flags, used when creating the item - * @param bkey element key - * @param eflag element flags - * @param data element data - * @param tc transcoder to serialize and unserialize value - * @return element object containing all the parameters and transcoded value - */ - private Element makeBTreeElement(String key, int flags, - BKeyObject bkey, byte[] eflag, byte[] data, - Transcoder tc) { - Element element = null; - T value = tc.decode(new CachedData(flags, data, tc.getMaxSize())); - - switch (bkey.getType()) { - case LONG: - element = new Element(bkey.getLongBKey(), value, eflag); - break; - case BYTEARRAY: - element = new Element(bkey.getByteArrayBKeyRaw(), value, eflag); - break; - default: - getLogger().error("Unexpected bkey type : (key:" + key + ", bkey:" - + bkey.toString() + ")"); - } - - return element; - } private Element makeBTreeElement(BKeyObject bkey, CachedData cachedData, Transcoder tc) { Element element = null; T value = tc.decode(cachedData); @@ -3553,7 +3524,6 @@ private Element makeBTreeElement(BKeyObject bkey, CachedData cachedData, getLogger().error("Unexpected bkey type : (bkey:" + bkey.toString() + ")"); } - return element; } diff --git a/src/main/java/net/spy/memcached/internal/BTreeStoreAndGetFuture.java b/src/main/java/net/spy/memcached/internal/BTreeStoreAndGetFuture.java index d7694b8d3..85c2acd12 100644 --- a/src/main/java/net/spy/memcached/internal/BTreeStoreAndGetFuture.java +++ b/src/main/java/net/spy/memcached/internal/BTreeStoreAndGetFuture.java @@ -41,16 +41,8 @@ public BTreeStoreAndGetFuture(CountDownLatch l, AtomicReference oref, long op } public Element getElement() { - Boolean result; - try { - result = (Boolean) super.get(); - } catch (Exception e) { - throw new RuntimeException(e); - } - if (result) { - CollectionGetOpCallback callback = (CollectionGetOpCallback) op.getCallback(); - callback.addResult(); - } + CollectionGetOpCallback callback = (CollectionGetOpCallback) op.getCallback(); + callback.addResult(); return element; } diff --git a/src/main/java/net/spy/memcached/ops/BTreeFindPositionWithGetOperation.java b/src/main/java/net/spy/memcached/ops/BTreeFindPositionWithGetOperation.java index a4d1aafc5..fc30780e1 100644 --- a/src/main/java/net/spy/memcached/ops/BTreeFindPositionWithGetOperation.java +++ b/src/main/java/net/spy/memcached/ops/BTreeFindPositionWithGetOperation.java @@ -24,8 +24,6 @@ public interface BTreeFindPositionWithGetOperation extends KeyedOperation { BTreeFindPositionWithGet getGet(); interface Callback extends CollectionGetOpCallback { - void gotData(int flags, BKeyObject bkey, byte[] eflag, byte[] data); - void getStartPosition(int startPosition); + void gotData(int pos, int flags, BKeyObject bkey, byte[] eflag, byte[] data); } - } diff --git a/src/main/java/net/spy/memcached/ops/BTreeGetByPositionOperation.java b/src/main/java/net/spy/memcached/ops/BTreeGetByPositionOperation.java index 3208d405b..b236e05de 100644 --- a/src/main/java/net/spy/memcached/ops/BTreeGetByPositionOperation.java +++ b/src/main/java/net/spy/memcached/ops/BTreeGetByPositionOperation.java @@ -23,8 +23,7 @@ public interface BTreeGetByPositionOperation extends KeyedOperation { BTreeGetByPosition getGet(); - interface Callback extends OperationCallback { - void gotData(String key, int flags, int pos, BKeyObject bkey, byte[] eflag, byte[] data); + interface Callback extends CollectionGetOpCallback { + void gotData(int pos, int flags, BKeyObject bkey, byte[] eflag, byte[] data); } - } diff --git a/src/main/java/net/spy/memcached/ops/BTreeInsertAndGetOperation.java b/src/main/java/net/spy/memcached/ops/BTreeInsertAndGetOperation.java index b49e54640..83a87e893 100644 --- a/src/main/java/net/spy/memcached/ops/BTreeInsertAndGetOperation.java +++ b/src/main/java/net/spy/memcached/ops/BTreeInsertAndGetOperation.java @@ -24,7 +24,7 @@ public interface BTreeInsertAndGetOperation extends KeyedOperation { BTreeInsertAndGet getGet(); interface Callback extends CollectionGetOpCallback { - void gotData(String key, int flags, BKeyObject bkeyObject, byte[] elementFlag, byte[] data); + void gotData(int flags, BKeyObject bkeyObject, byte[] elementFlag, byte[] data); } } diff --git a/src/main/java/net/spy/memcached/protocol/ascii/BTreeFindPositionWithGetOperationImpl.java b/src/main/java/net/spy/memcached/protocol/ascii/BTreeFindPositionWithGetOperationImpl.java index ff30f7f68..35aeb0003 100644 --- a/src/main/java/net/spy/memcached/protocol/ascii/BTreeFindPositionWithGetOperationImpl.java +++ b/src/main/java/net/spy/memcached/protocol/ascii/BTreeFindPositionWithGetOperationImpl.java @@ -62,6 +62,7 @@ public class BTreeFindPositionWithGetOperationImpl extends OperationImpl impleme protected int count = 0; protected int index = 0; protected int pos = 0; + protected int posDiff = 0; protected byte[] data = null; protected int readOffset = 0; protected byte lookingFor = '\0'; @@ -112,9 +113,8 @@ public void handleLine(String line) { assert count > 0; // position counter - BTreeFindPositionWithGetOperation.Callback cb = - (BTreeFindPositionWithGetOperation.Callback) getCallback(); - cb.getStartPosition(position - index); + pos = position - index; + posDiff = 1; // start to read actual data setReadType(OperationReadType.DATA); @@ -201,8 +201,10 @@ public void handleRead(ByteBuffer bb) { // put an element data. BTreeFindPositionWithGetOperation.Callback cb = (BTreeFindPositionWithGetOperation.Callback) getCallback(); - cb.gotData(flags, get.getBkey(), get.getEflag(), data); + cb.gotData(pos, flags, get.getBkey(), get.getEflag(), data); + //next position. + pos += posDiff; lookingFor = '\r'; } diff --git a/src/main/java/net/spy/memcached/protocol/ascii/BTreeGetByPositionOperationImpl.java b/src/main/java/net/spy/memcached/protocol/ascii/BTreeGetByPositionOperationImpl.java index 0a19dce99..5e318b743 100644 --- a/src/main/java/net/spy/memcached/protocol/ascii/BTreeGetByPositionOperationImpl.java +++ b/src/main/java/net/spy/memcached/protocol/ascii/BTreeGetByPositionOperationImpl.java @@ -208,7 +208,7 @@ public void handleRead(ByteBuffer bb) { // put an element data. BTreeGetByPositionOperation.Callback cb = (BTreeGetByPositionOperation.Callback) getCallback(); - cb.gotData(key, flags, pos, get.getBkey(), get.getEflag(), data); + cb.gotData(pos, flags, get.getBkey(), get.getEflag(), data); // next position. pos += posDiff; diff --git a/src/main/java/net/spy/memcached/protocol/ascii/BTreeInsertAndGetOperationImpl.java b/src/main/java/net/spy/memcached/protocol/ascii/BTreeInsertAndGetOperationImpl.java index 5dcee4c37..3aae178d6 100644 --- a/src/main/java/net/spy/memcached/protocol/ascii/BTreeInsertAndGetOperationImpl.java +++ b/src/main/java/net/spy/memcached/protocol/ascii/BTreeInsertAndGetOperationImpl.java @@ -246,7 +246,7 @@ public void handleRead(ByteBuffer bb) { if (lookingFor == '\0' && readOffset == data.length) { // put an element data. BTreeInsertAndGetOperation.Callback cb = (BTreeInsertAndGetOperation.Callback) getCallback(); - cb.gotData(key, flags, get.getBkeyObject(), get.getElementFlag(), data); + cb.gotData(flags, get.getBkeyObject(), get.getElementFlag(), data); lookingFor = '\r'; } diff --git a/src/test/manual/net/spy/memcached/MultibyteKeyTest.java b/src/test/manual/net/spy/memcached/MultibyteKeyTest.java index c4dbf1d12..d6117c5b0 100644 --- a/src/test/manual/net/spy/memcached/MultibyteKeyTest.java +++ b/src/test/manual/net/spy/memcached/MultibyteKeyTest.java @@ -553,8 +553,11 @@ public void BTreeGetByPositionOperationImplTest() { new BTreeGetByPosition(BTreeOrder.ASC, 0), new BTreeGetByPositionOperation.Callback() { @Override - public void gotData(String key, int flags, int pos, BKeyObject bkey, - byte[] eflag, byte[] data) { + public void addResult() { + } + + @Override + public void gotData(int pos, int flags, BKeyObject bkey, byte[] eflag, byte[] data) { } @Override @@ -667,8 +670,7 @@ public void BTreeInsertAndGetOperationImplTest() { new Random().nextInt(), new CollectionAttributes()), testData, new BTreeInsertAndGetOperation.Callback() { @Override - public void gotData(String key, int flags, BKeyObject bkeyObject, - byte[] elementFlag, byte[] data) { + public void gotData(int flags, BKeyObject bkeyObject, byte[] elementFlag, byte[] data) { } @Override @@ -682,8 +684,7 @@ public void complete() { @Override public void addResult() { } - - }).initialize(); + }).initialize(); } catch (java.nio.BufferOverflowException e) { Assert.fail(); }