Skip to content

Commit

Permalink
INTERNAL: use groupingKeys in asyncGet(s)Bulk
Browse files Browse the repository at this point in the history
  • Loading branch information
oliviarla committed Aug 28, 2024
1 parent fb565bd commit dcdf622
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 111 deletions.
32 changes: 0 additions & 32 deletions src/main/java/net/spy/memcached/ArcusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1986,38 +1986,6 @@ public SMGetFuture<List<SMGetElement<Object>>> asyncBopSortMergeGet(
return smget(smGetList, count, smgetMode == SMGetMode.UNIQUE, (from > to), collectionTranscoder);
}

/**
 * Partition a list of keys into per-node groups of bounded size.
 * Every key in a returned group hashes to the same memcached server,
 * and no group contains more than {@code maxKeyCountPerGroup} keys;
 * a node may therefore appear in several entries of the result.
 *
 * @param keyList keys to partition, in request order
 * @param maxKeyCountPerGroup upper bound on the number of keys per group
 * @return entries pairing a memcached node with one group of its keys
 */
private Collection<Entry<MemcachedNode, List<String>>> groupingKeys(List<String> keyList, int maxKeyCountPerGroup) {
  List<Entry<MemcachedNode, List<String>>> fullGroups = new ArrayList<>();
  Map<MemcachedNode, List<String>> openGroups = new HashMap<>();
  MemcachedConnection connection = getMemcachedConnection();

  for (String key : keyList) {
    MemcachedNode node = connection.findNodeByKey(key);
    List<String> group = openGroups.get(node);

    // A full group is sealed into the result and a fresh one is started.
    if (group != null && group.size() >= maxKeyCountPerGroup) {
      fullGroups.add(new AbstractMap.SimpleEntry<>(node, group));
      group = null;
    }
    if (group == null) {
      group = new ArrayList<>();
      openGroups.put(node, group);
    }
    group.add(key);
  }
  // Flush the still-open (not yet full) group of each node into the result.
  fullGroups.addAll(openGroups.entrySet());
  return fullGroups;
}

/**
* Generic smget operation for b+tree items. Public smget methods call this method.
*
Expand Down
152 changes: 74 additions & 78 deletions src/main/java/net/spy/memcached/MemcachedClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -1060,21 +1061,17 @@ public <T> BulkFuture<Map<String, T>> asyncGetBulk(Collection<String> keys,
// because it is fully populated when it is used and
// used only to read the transcoder for a key.
final Map<String, Transcoder<T>> tcMap = new HashMap<>();
for (String key : keys) {
if (tcIter.hasNext()) {
tcMap.put(key, tcIter.next());
validateKey(key);
}
}

// Grouping keys by memcached node
final Map<MemcachedNode, List<String>> keyMap = new HashMap<>();

Iterator<String> keyIter = keys.iterator();
while (keyIter.hasNext() && tcIter.hasNext()) {
String key = keyIter.next();
Transcoder<T> tc = tcIter.next();
tcMap.put(key, tc);
validateKey(key);
addKeyToMap(keyMap, key);
}
int wholeChunkSize = getWholeChunkSize(keyMap);
final CountDownLatch latch = new CountDownLatch(wholeChunkSize);
final Collection<Operation> ops = new ArrayList<>(wholeChunkSize);
Collection<Map.Entry<MemcachedNode, List<String>>> arrangedKey
= groupingKeys(keys, GET_BULK_CHUNK_SIZE);
final CountDownLatch latch = new CountDownLatch(arrangedKey.size());

GetOperation.Callback cb = new GetOperation.Callback() {
public void receivedStatus(OperationStatus status) {
Expand All @@ -1096,25 +1093,21 @@ public void complete() {
}
};

// Now that we know how many servers it breaks down into, and the latch
// is all set up, convert all of these strings collections to operations
checkState();
for (Map.Entry<MemcachedNode, List<String>> entry : keyMap.entrySet()) {
List<Operation> ops = new ArrayList<>(arrangedKey.size());
for (Map.Entry<MemcachedNode, List<String>> entry : arrangedKey) {
MemcachedNode node = entry.getKey();
List<String> keyList = entry.getValue();

for (int i = 0; i < keyList.size(); i += GET_BULK_CHUNK_SIZE) {
List<String> lk = keyList.subList(i, Math.min(keyList.size(), i + GET_BULK_CHUNK_SIZE));
Operation op;
if (node == null) {
op = opFact.mget(lk, cb);
} else {
op = node.enabledMGetOp() ? opFact.mget(lk, cb)
: opFact.get(lk, cb);
}
conn.addOperation(node, op);
ops.add(op);
Operation op;
if (node == null) {
op = opFact.mget(keyList, cb);
} else {
op = node.enabledMGetOp() ? opFact.mget(keyList, cb)
: opFact.get(keyList, cb);
}
conn.addOperation(node, op);
ops.add(op);
}
return new BulkGetFuture<>(rvMap, ops, latch, operationTimeout);
}
Expand Down Expand Up @@ -1196,29 +1189,24 @@ public BulkFuture<Map<String, Object>> asyncGetBulk(String... keys) {
*/
public <T> BulkFuture<Map<String, CASValue<T>>> asyncGetsBulk(Collection<String> keys,
Iterator<Transcoder<T>> tcIter) {
final Map<String, GetResult<CASValue<T>>> rvMap
= new ConcurrentHashMap<>();
final Map<String, GetResult<CASValue<T>>> rvMap = new ConcurrentHashMap<>();

// This map does not need to be a ConcurrentHashMap
// because it is fully populated when it is used and
// used only to read the transcoder for a key.
final Map<String, Transcoder<T>> tcMap = new HashMap<>();

// Grouping keys by memcached nodes
final Map<MemcachedNode, List<String>> keyMap = new HashMap<>();
Iterator<String> keyIter = keys.iterator();
while (keyIter.hasNext() && tcIter.hasNext()) {
String key = keyIter.next();
Transcoder<T> tc = tcIter.next();

tcMap.put(key, tc);
validateKey(key);
addKeyToMap(keyMap, key);
for (String key : keys) {
if (tcIter.hasNext()) {
tcMap.put(key, tcIter.next());
validateKey(key);
}
}

int wholeChunkSize = getWholeChunkSize(keyMap);
final CountDownLatch latch = new CountDownLatch(wholeChunkSize);
final Collection<Operation> ops = new ArrayList<>(wholeChunkSize);
// Grouping keys by memcached node
Collection<Map.Entry<MemcachedNode, List<String>>> arrangedKey
= groupingKeys(keys, GET_BULK_CHUNK_SIZE);

final CountDownLatch latch = new CountDownLatch(arrangedKey.size());

GetsOperation.Callback cb = new GetsOperation.Callback() {
public void receivedStatus(OperationStatus status) {
Expand All @@ -1243,49 +1231,24 @@ public void complete() {
// Now that we know how many servers it breaks down into, and the latch
// is all set up, convert all of these strings collections to operations
checkState();
for (Map.Entry<MemcachedNode, List<String>> entry : keyMap.entrySet()) {
List<Operation> ops = new ArrayList<>(arrangedKey.size());
for (Map.Entry<MemcachedNode, List<String>> entry : arrangedKey) {
MemcachedNode node = entry.getKey();
List<String> keyList = entry.getValue();

for (int i = 0; i < keyList.size(); i += GET_BULK_CHUNK_SIZE) {
List<String> lk = keyList.subList(i, Math.min(keyList.size(), i + GET_BULK_CHUNK_SIZE));
Operation op;
if (node == null) {
op = opFact.mgets(lk, cb);
} else {
op = node.enabledMGetsOp() ? opFact.mgets(lk, cb)
: opFact.gets(lk, cb);
}
conn.addOperation(node, op);
ops.add(op);
Operation op;
if (node == null) {
op = opFact.mgets(keyList, cb);
} else {
op = node.enabledMGetsOp() ? opFact.mgets(keyList, cb)
: opFact.gets(keyList, cb);
}
conn.addOperation(node, op);
ops.add(op);
}
return new BulkGetFuture<>(rvMap, ops, latch, operationTimeout);
}

/**
* Grouping keys by memcached node.
* @param keyMap key list that mapped by node
* @param key the key to request
*/
private void addKeyToMap(Map<MemcachedNode, List<String>> keyMap, String key) {
MemcachedNode node = conn.findNodeByKey(key);
keyMap.computeIfAbsent(node, k -> new ArrayList<>()).add(key);
}

/**
* get size of whole chunk by node
* @param keyMap collection list that grouped by node
* @return size of whole chunk
*/
private int getWholeChunkSize(Map<MemcachedNode, List<String>> keyMap) {
int wholeChunkSize = 0;
for (List<String> keys : keyMap.values()) {
wholeChunkSize += (((keys.size() - 1) / GET_BULK_CHUNK_SIZE) + 1);
}
return wholeChunkSize;
}

/**
* Asynchronously gets (with CAS support) a bunch of objects from the cache.
*
Expand Down Expand Up @@ -2183,4 +2146,37 @@ int getAddedQueueSize() {
protected Collection<MemcachedNode> getAllNodes() {
return conn.getLocator().getAll();
}

/**
 * Turn the list of keys into groups of keys.
 * All keys in a group belong to the same memcached server, and each group
 * holds at most {@code maxKeyCountPerGroup} keys — a single node can
 * therefore contribute multiple entries to the returned collection.
 *
 * @param keyList list of keys
 * @param maxKeyCountPerGroup max size of the key group (number of keys)
 * @return list of grouped (memcached node + keys) in the group
 */
protected Collection<Map.Entry<MemcachedNode, List<String>>> groupingKeys(
    Collection<String> keyList, int maxKeyCountPerGroup) {
  List<Map.Entry<MemcachedNode, List<String>>> resultList = new ArrayList<>();
  // Per-node group currently being filled (not yet at the size limit).
  Map<MemcachedNode, List<String>> nodeMap = new HashMap<>();
  MemcachedConnection conn = getMemcachedConnection();

  for (String key : keyList) {
    // Locate the node responsible for this key.
    MemcachedNode qa = conn.findNodeByKey(key);
    List<String> keyGroup = nodeMap.get(qa);

    if (keyGroup == null) {
      // First key for this node: start a new group.
      keyGroup = new ArrayList<>();
      nodeMap.put(qa, keyGroup);
    } else if (keyGroup.size() >= maxKeyCountPerGroup) {
      // Group is full: move it to the result and start a fresh one.
      resultList.add(new AbstractMap.SimpleEntry<>(qa, keyGroup));
      keyGroup = new ArrayList<>();
      nodeMap.put(qa, keyGroup);
    }
    keyGroup.add(key);
  }
  // Add the Entry instances which are not full (smaller than maxKeyCountPerGroup) to the result.
  resultList.addAll(nodeMap.entrySet());
  return resultList;
}
}
2 changes: 1 addition & 1 deletion src/test/java/net/spy/memcached/ProtocolBaseCase.java
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ public void testInvalidKeyBlank() throws Exception {
}
}

public void testInvalidKeyBulk() throws Exception {
public void testInvalidKeyBulk() {
try {
Object val = client.getBulk(Collections.singletonList("Key key2"));
fail("Expected IllegalArgumentException, got " + val);
Expand Down

0 comments on commit dcdf622

Please sign in to comment.