Skip to content

Commit

Permalink
INTERNAL: Change handleRead logic in BTreeInsertAndGetOpImpl.
Browse files Browse the repository at this point in the history
  • Loading branch information
brido4125 authored and jhpark816 committed Sep 10, 2023
1 parent 41e53e8 commit b4bec20
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 79 deletions.
70 changes: 24 additions & 46 deletions src/main/java/net/spy/memcached/collection/BTreeInsertAndGet.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,7 @@ public class BTreeInsertAndGet<T> extends CollectionGet {

private final CollectionInsert<T> collection;
private final boolean updateIfExist;
private BKeyObject bkeyObject;
private int bytes;

private static final int BKEY = 0;
private static final int EFLAG_OR_BYTES = 1;
private static final int BYTES = 2;
private BKeyObject bKey;

public BTreeInsertAndGet(long bkey, byte[] eFlag, T value, boolean updateIfExist,
CollectionAttributes attributesForCreate) {
Expand All @@ -52,7 +47,9 @@ public BTreeInsertAndGet(long bkey, byte[] eFlag, T value, boolean updateIfExist
this.collection = new BTreeInsert<T>(value, eFlag, RequestMode.GET_TRIM, attributesForCreate);
}
this.updateIfExist = updateIfExist;
this.bkeyObject = new BKeyObject(bkey);
this.bKey = new BKeyObject(bkey);
this.eHeadCount = 2;
this.eFlagIndex = 1;
}

public BTreeInsertAndGet(byte[] bkey, byte[] eFlag, T value, boolean updateIfExist,
Expand All @@ -63,78 +60,59 @@ public BTreeInsertAndGet(byte[] bkey, byte[] eFlag, T value, boolean updateIfExi
this.collection = new BTreeInsert<T>(value, eFlag, RequestMode.GET_TRIM, attributesForCreate);
}
this.updateIfExist = updateIfExist;
this.bkeyObject = new BKeyObject(bkey);
this.bKey = new BKeyObject(bkey);
this.eHeadCount = 2;
this.eFlagIndex = 1;
}

@Override
public void decodeElemHeader(List<String> tokens) {
}

public void decodeItemHeader(String itemHeader) {
String[] splited = itemHeader.split(" ");
boolean hasEFlag = false;

// <bkey>
if (splited[BKEY].startsWith("0x")) {
this.bkeyObject = new BKeyObject(BTreeUtil.hexStringToByteArrays(splited[0].substring(2)));
subkey = tokens.get(0);
if (subkey.startsWith("0x")) {
bKey = new BKeyObject(BTreeUtil.hexStringToByteArrays(subkey.substring(2)));
} else {
this.bkeyObject = new BKeyObject(Long.parseLong(splited[0]));
bKey = new BKeyObject(Long.parseLong(subkey));
}

// <eflag> or <bytes>
if (splited[EFLAG_OR_BYTES].startsWith("0x")) {
// <eflag>
hasEFlag = true;
this.elementFlag = BTreeUtil
.hexStringToByteArrays(splited[EFLAG_OR_BYTES].substring(2));
if (tokens.size() == 2) {
dataLength = Integer.parseInt(tokens.get(1));
} else {
this.bytes = Integer.parseInt(splited[EFLAG_OR_BYTES]);
}

// <bytes>
if (hasEFlag) {
this.bytes = Integer.parseInt(splited[BYTES]);
elementFlag = BTreeUtil.hexStringToByteArrays(tokens.get(1));
dataLength = Integer.parseInt(tokens.get(2));
}
}

@Override
public byte[] getAddtionalArgs() {
return null;
}

@Override
public String stringify() {
return collection.stringify();
}

@Override
public String getCommand() {
return collection.getCommand();
}

public boolean headerReady(int spaceCount) {
return spaceCount == 2;
}

public T getValue() {
return collection.getValue();
}

public void setFlags(int flags) {
collection.setFlags(flags);
}

public String getElementFlagByHex() {
return collection.getElementFlagByHex();
}

public int getBytes() {
return bytes;
public BKeyObject getBkeyObject() {
return bKey;
}

public boolean isUpdateIfExist() {
return updateIfExist;
public void setFlags(int flags) {
collection.setFlags(flags);
}

public BKeyObject getBkeyObject() {
return bkeyObject;
public boolean isUpdateIfExist() {
return updateIfExist;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@
import java.io.ByteArrayOutputStream;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

import net.spy.memcached.KeyUtil;
import net.spy.memcached.collection.BTreeGetByPosition;
import net.spy.memcached.collection.BTreeInsertAndGet;
import net.spy.memcached.collection.CollectionResponse;
import net.spy.memcached.ops.APIType;
Expand Down Expand Up @@ -89,15 +90,17 @@ public class BTreeInsertAndGetOperationImpl extends OperationImpl implements
protected byte[] data = null;
protected int readOffset = 0;
protected byte lookingFor = '\0';
protected int spaceCount = 0;

private Boolean hasEFlag = null;
protected final List<String> tokens = new ArrayList<String>();
protected int eHeadCount;
protected int eFlagIndex;

public BTreeInsertAndGetOperationImpl(String key, BTreeInsertAndGet<?> get,
byte[] dataToStore, OperationCallback cb) {
super(cb);
this.key = key;
this.get = get;
this.eHeadCount = get.getEHeadCount();
this.eFlagIndex = get.getEFlagIndex();
this.dataToStore = dataToStore;
if (get.isUpdateIfExist()) {
setAPIType(APIType.BOP_UPSERT);
Expand Down Expand Up @@ -165,53 +168,40 @@ public void handleLine(String line) {
public void handleRead(ByteBuffer bb) {
// Decode a data header.
if (lookingFor == '\0' && data == null) {
for (int i = 0; bb.remaining() > 0; i++) {
while (bb.hasRemaining()) {
byte b = bb.get();
// Handle spaces to parse the header.
if (b == ' ') {
// One-time check to find if this responses have eflags.
if (hasEFlag == null && spaceCount == BTreeGetByPosition.HEADER_EFLAG_POSITION + 1) {
String[] chunk = new String(byteBuffer.toByteArray())
.split(" ");
if (chunk[BTreeGetByPosition.HEADER_EFLAG_POSITION].startsWith("0x")) {
hasEFlag = true;
} else {
hasEFlag = false;
// btree: <bkey> [<eflag>] <bytes> <data>\r\n
tokens.add(byteBuffer.toString());
byteBuffer.reset();

if (eFlagIndex >= 0) {
if (tokens.size() == eFlagIndex + 1 && tokens.get(eFlagIndex).startsWith("0x")) {
eHeadCount++;
}
}

spaceCount++;

// Parse the value header.
// FIXME this is not cool... please fix this :-(
int spaceReduced = (hasEFlag != null && hasEFlag) ? 1 : 0;
if (get.headerReady(spaceCount - spaceReduced)) {
// <bkey> [<eflag>] <bytes> <data>\r\n
get.decodeItemHeader(new String(byteBuffer.toByteArray()));
data = new byte[get.getBytes()];
byteBuffer.reset();
spaceCount = 0;
hasEFlag = null;
if (tokens.size() == eHeadCount) {
get.decodeElemHeader(tokens);
data = new byte[get.getDataLength()];
tokens.clear();
eHeadCount = get.getEHeadCount();
break;
}
continue;
}

// Ready to finish.
if (b == '\r') {
if (b == '\r') { // Ready to finish.
continue;
}

// Finish the operation.
if (b == '\n') {
if (b == '\n') { // Finish the operation.
OperationStatus status = matchStatus(byteBuffer.toString(), STORE_AND_GET_ON_DATA);

getLogger().debug("Get complete!");
getCallback().receivedStatus(status);
transitionState(OperationState.COMPLETE);
data = null;
break;
}

// Write to the result ByteBuffer
byteBuffer.write(b);
}
Expand Down

0 comments on commit b4bec20

Please sign in to comment.