Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weโ€™ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

INTERNAL: use groupingKeys in asyncGet(s)Bulk #801

Merged
merged 1 commit into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 0 additions & 32 deletions src/main/java/net/spy/memcached/ArcusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1986,38 +1986,6 @@ public SMGetFuture<List<SMGetElement<Object>>> asyncBopSortMergeGet(
return smget(smGetList, count, smgetMode == SMGetMode.UNIQUE, (from > to), collectionTranscoder);
}

/**
* Turn the list of keys into groups of keys.
* All keys in a group belong to the same memcached server.
*
* @param keyList list of keys
* @param maxKeyCountPerGroup max size of the key group (number of keys)
* @return list of grouped (memcached node + keys) in the group
*/
private Collection<Entry<MemcachedNode, List<String>>> groupingKeys(List<String> keyList, int maxKeyCountPerGroup) {
List<Entry<MemcachedNode, List<String>>> resultList = new ArrayList<>();
Map<MemcachedNode, List<String>> nodeMap = new HashMap<>();
MemcachedConnection conn = getMemcachedConnection();

for (String key : keyList) {
MemcachedNode qa = conn.findNodeByKey(key);
List<String> keyGroup = nodeMap.get(qa);

if (keyGroup == null) {
keyGroup = new ArrayList<>();
nodeMap.put(qa, keyGroup);
} else if (keyGroup.size() >= maxKeyCountPerGroup) {
resultList.add(new AbstractMap.SimpleEntry<>(qa, keyGroup));
keyGroup = new ArrayList<>();
nodeMap.put(qa, keyGroup);
}
keyGroup.add(key);
}
// Add the Entry instance which is not full(smaller than groupSize) to the result.
resultList.addAll(nodeMap.entrySet());
return resultList;
}

/**
* Generic smget operation for b+tree items. Public smget methods call this method.
*
Expand Down
150 changes: 72 additions & 78 deletions src/main/java/net/spy/memcached/MemcachedClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -1060,21 +1061,17 @@ public <T> BulkFuture<Map<String, T>> asyncGetBulk(Collection<String> keys,
// because it is fully populated when it is used and
// used only to read the transcoder for a key.
final Map<String, Transcoder<T>> tcMap = new HashMap<>();
for (String key : keys) {
if (tcIter.hasNext()) {
tcMap.put(key, tcIter.next());
validateKey(key);
}
}

// Grouping keys by memcached node
final Map<MemcachedNode, List<String>> keyMap = new HashMap<>();

Iterator<String> keyIter = keys.iterator();
while (keyIter.hasNext() && tcIter.hasNext()) {
String key = keyIter.next();
Transcoder<T> tc = tcIter.next();
tcMap.put(key, tc);
validateKey(key);
addKeyToMap(keyMap, key);
}
int wholeChunkSize = getWholeChunkSize(keyMap);
final CountDownLatch latch = new CountDownLatch(wholeChunkSize);
final Collection<Operation> ops = new ArrayList<>(wholeChunkSize);
Collection<Map.Entry<MemcachedNode, List<String>>> arrangedKey
= groupingKeys(keys, GET_BULK_CHUNK_SIZE);
final CountDownLatch latch = new CountDownLatch(arrangedKey.size());

GetOperation.Callback cb = new GetOperation.Callback() {
public void receivedStatus(OperationStatus status) {
Expand All @@ -1096,25 +1093,21 @@ public void complete() {
}
};

// Now that we know how many servers it breaks down into, and the latch
// is all set up, convert all of these strings collections to operations
checkState();
for (Map.Entry<MemcachedNode, List<String>> entry : keyMap.entrySet()) {
List<Operation> ops = new ArrayList<>(arrangedKey.size());
for (Map.Entry<MemcachedNode, List<String>> entry : arrangedKey) {
MemcachedNode node = entry.getKey();
List<String> keyList = entry.getValue();

for (int i = 0; i < keyList.size(); i += GET_BULK_CHUNK_SIZE) {
List<String> lk = keyList.subList(i, Math.min(keyList.size(), i + GET_BULK_CHUNK_SIZE));
Operation op;
if (node == null) {
op = opFact.mget(lk, cb);
} else {
op = node.enabledMGetOp() ? opFact.mget(lk, cb)
: opFact.get(lk, cb);
}
conn.addOperation(node, op);
ops.add(op);
Operation op;
if (node == null) {
op = opFact.mget(keyList, cb);
} else {
op = node.enabledMGetOp() ? opFact.mget(keyList, cb)
: opFact.get(keyList, cb);
}
conn.addOperation(node, op);
ops.add(op);
}
return new BulkGetFuture<>(rvMap, ops, latch, operationTimeout);
}
Expand Down Expand Up @@ -1196,29 +1189,24 @@ public BulkFuture<Map<String, Object>> asyncGetBulk(String... keys) {
*/
public <T> BulkFuture<Map<String, CASValue<T>>> asyncGetsBulk(Collection<String> keys,
Iterator<Transcoder<T>> tcIter) {
final Map<String, GetResult<CASValue<T>>> rvMap
= new ConcurrentHashMap<>();
final Map<String, GetResult<CASValue<T>>> rvMap = new ConcurrentHashMap<>();

// This map does not need to be a ConcurrentHashMap
// because it is fully populated when it is used and
// used only to read the transcoder for a key.
final Map<String, Transcoder<T>> tcMap = new HashMap<>();

// Grouping keys by memcached nodes
final Map<MemcachedNode, List<String>> keyMap = new HashMap<>();
Iterator<String> keyIter = keys.iterator();
while (keyIter.hasNext() && tcIter.hasNext()) {
String key = keyIter.next();
Transcoder<T> tc = tcIter.next();

tcMap.put(key, tc);
validateKey(key);
addKeyToMap(keyMap, key);
for (String key : keys) {
if (tcIter.hasNext()) {
tcMap.put(key, tcIter.next());
validateKey(key);
}
}

int wholeChunkSize = getWholeChunkSize(keyMap);
final CountDownLatch latch = new CountDownLatch(wholeChunkSize);
final Collection<Operation> ops = new ArrayList<>(wholeChunkSize);
// Grouping keys by memcached node
Collection<Map.Entry<MemcachedNode, List<String>>> arrangedKey
= groupingKeys(keys, GET_BULK_CHUNK_SIZE);

final CountDownLatch latch = new CountDownLatch(arrangedKey.size());

GetsOperation.Callback cb = new GetsOperation.Callback() {
public void receivedStatus(OperationStatus status) {
Expand All @@ -1243,49 +1231,24 @@ public void complete() {
// Now that we know how many servers it breaks down into, and the latch
// is all set up, convert all of these strings collections to operations
checkState();
for (Map.Entry<MemcachedNode, List<String>> entry : keyMap.entrySet()) {
List<Operation> ops = new ArrayList<>(arrangedKey.size());
for (Map.Entry<MemcachedNode, List<String>> entry : arrangedKey) {
MemcachedNode node = entry.getKey();
List<String> keyList = entry.getValue();

for (int i = 0; i < keyList.size(); i += GET_BULK_CHUNK_SIZE) {
List<String> lk = keyList.subList(i, Math.min(keyList.size(), i + GET_BULK_CHUNK_SIZE));
Operation op;
if (node == null) {
op = opFact.mgets(lk, cb);
} else {
op = node.enabledMGetsOp() ? opFact.mgets(lk, cb)
: opFact.gets(lk, cb);
}
conn.addOperation(node, op);
ops.add(op);
Operation op;
if (node == null) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

์—ฌ๊ธฐ๋Š” ์งˆ๋ฌธ์ž…๋‹ˆ๋‹ค.

node == null ์ƒํ™ฉ์—์„œ operation ๊ฐ์ฒด๋ฅผ ๋งŒ๋“ค์–ด connection์— ์ถ”๊ฐ€ํ•  ํ•„์š”๊ฐ€ ์žˆ๋‚˜์š”?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ํ˜„์žฌ๋Š” connection์—์„œ node๊ฐ€ null์ผ ๊ฒฝ์šฐ op๋ฅผ cancel ์‹œํ‚ค๊ณ  ์žˆ์Šต๋‹ˆ๋‹ค.
์—ฌ๋Ÿฌ ๋…ธ๋“œ์— ๋‹ค์–‘ํ•œ mget op๋ฅผ ๋ณด๋‚ด๋†“๊ณ  node๊ฐ€ null์ผ ๊ฒฝ์šฐ๊ฐ€ ๋ฐœ์ƒํ•˜๋ฉด cancel๋กœ ์ธํ•ด ์˜ˆ์™ธ๊ฐ€ ๋ฐœ์ƒํ•˜๊ธฐ ๋•Œ๋ฌธ์—, get/mget ์š”์ฒญ์ด ์„œ๋ฒ„์—์„œ ์ฒ˜๋ฆฌ๋˜์—ˆ์ง€๋งŒ ๊ฒฐ๊ณผ๊ฐ’์„ ์–ป์„ ์ˆ˜ ์—†๋Š” ์ƒํ™ฉ์ด ๋ฐœ์ƒํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.
์ด๋ฅผ ๋ง‰์œผ๋ ค๋ฉด node๊ฐ€ null์ธ ์ƒํ™ฉ์ด ๋ฐœ์ƒํ•  ๋•Œ ๊ธฐ์กด ops list์— ์กด์žฌํ•˜๋Š” ๋ชจ๋“  op๋ฅผ cancel์‹œํ‚ค๊ณ  ์ด op๋„ cancel์‹œํ‚ค๊ณ  ์˜ˆ์™ธ๋ฅผ ๋˜์ง€๋ฉด ๋  ๊ฒƒ ๊ฐ™์Šต๋‹ˆ๋‹ค.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

node๊ฐ€ null ์ด๋ผ๋Š” ์˜๋ฏธ๋Š” ํ˜„์žฌ ๋‚จ์•„ ์žˆ๋Š” node๊ฐ€ ์—†๋‹ค๋Š” ๊ฒƒ์ด๋ฏ€๋กœ,
๋ชจ๋“  key๋“ค์ด null node์— mapping ๋  ๊ฒƒ ๊ฐ™๊ณ , ์ƒ์„ฑํ•˜๋Š” ๋ชจ๋“  op๊ฐ€ ์ƒ์„ฑ๋  ๊ฒƒ ๊ฐ™์Šต๋‹ˆ๋‹ค.
๋งž์ง€ ์•Š๋Š” ๋ถ€๋ถ„์ด ์žˆ์œผ๋ฉด, ์•Œ๋ ค์ฃผ์„ธ์š”.

op = opFact.mgets(keyList, cb);
} else {
op = node.enabledMGetsOp() ? opFact.mgets(keyList, cb)
: opFact.gets(keyList, cb);
}
conn.addOperation(node, op);
ops.add(op);
}
return new BulkGetFuture<>(rvMap, ops, latch, operationTimeout);
}

/**
* Grouping keys by memcached node.
* @param keyMap key list that mapped by node
* @param key the key to request
*/
private void addKeyToMap(Map<MemcachedNode, List<String>> keyMap, String key) {
MemcachedNode node = conn.findNodeByKey(key);
keyMap.computeIfAbsent(node, k -> new ArrayList<>()).add(key);
}

/**
* get size of whole chunk by node
* @param keyMap collection list that grouped by node
* @return size of whole chunk
*/
private int getWholeChunkSize(Map<MemcachedNode, List<String>> keyMap) {
int wholeChunkSize = 0;
for (List<String> keys : keyMap.values()) {
wholeChunkSize += (((keys.size() - 1) / GET_BULK_CHUNK_SIZE) + 1);
}
return wholeChunkSize;
}

/**
* Asynchronously gets (with CAS support) a bunch of objects from the cache.
*
Expand Down Expand Up @@ -2183,4 +2146,35 @@ int getAddedQueueSize() {
protected Collection<MemcachedNode> getAllNodes() {
return conn.getLocator().getAll();
}

/**
* Turn the list of keys into groups of keys.
* All keys in a group belong to the same memcached server.
*
* @param keyList list of keys
* @param maxKeyCountPerGroup max size of the key group (number of keys)
* @return list of grouped (memcached node + keys) in the group
*/
protected Collection<Map.Entry<MemcachedNode, List<String>>> groupingKeys(
Collection<String> keyList, int maxKeyCountPerGroup) {
List<Map.Entry<MemcachedNode, List<String>>> resultList = new ArrayList<>();
Map<MemcachedNode, List<String>> nodeMap = new HashMap<>();
for (String key : keyList) {
MemcachedNode qa = conn.findNodeByKey(key);
List<String> keyGroup = nodeMap.get(qa);

if (keyGroup == null) {
keyGroup = new ArrayList<>();
nodeMap.put(qa, keyGroup);
} else if (keyGroup.size() >= maxKeyCountPerGroup) {
resultList.add(new AbstractMap.SimpleEntry<>(qa, keyGroup));
keyGroup = new ArrayList<>();
nodeMap.put(qa, keyGroup);
}
keyGroup.add(key);
}
// Add the Entry instance which is not full(smaller than groupSize) to the result.
resultList.addAll(nodeMap.entrySet());
return resultList;
}
}
2 changes: 1 addition & 1 deletion src/test/java/net/spy/memcached/ProtocolBaseCase.java
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ public void testInvalidKeyBlank() throws Exception {
}
}

public void testInvalidKeyBulk() throws Exception {
public void testInvalidKeyBulk() {
try {
Object val = client.getBulk(Collections.singletonList("Key key2"));
fail("Expected IllegalArgumentException, got " + val);
Expand Down