Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

INTERNAL: Creating piped collection operation future. #619

Merged
merged 1 commit into from
Jun 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 15 additions & 183 deletions src/main/java/net/spy/memcached/ArcusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@
import net.spy.memcached.internal.CollectionGetBulkFuture;
import net.spy.memcached.internal.OperationFuture;
import net.spy.memcached.internal.SMGetFuture;
import net.spy.memcached.internal.PipedCollectionFuture;
import net.spy.memcached.ops.BTreeFindPositionOperation;
import net.spy.memcached.ops.BTreeFindPositionWithGetOperation;
import net.spy.memcached.ops.BTreeGetBulkOperation;
Expand Down Expand Up @@ -948,15 +949,9 @@ public void gotStatus(Integer index, OperationStatus status) {
<T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncCollectionPipedUpdate(
final String key, final List<CollectionPipedUpdate<T>> updateList) {

final ConcurrentLinkedQueue<Operation> ops = new ConcurrentLinkedQueue<Operation>();

final CountDownLatch latch = new CountDownLatch(updateList.size());

final List<CollectionOperationStatus> mergedOperationStatus = Collections
.synchronizedList(new ArrayList<CollectionOperationStatus>(updateList.size()));

final Map<Integer, CollectionOperationStatus> mergedResult =
new ConcurrentHashMap<Integer, CollectionOperationStatus>();
final PipedCollectionFuture<Integer, CollectionOperationStatus> rv =
new PipedCollectionFuture<Integer, CollectionOperationStatus>(latch, operationTimeout, updateList.size());

for (int i = 0; i < updateList.size(); i++) {
final CollectionPipedUpdate<T> update = updateList.get(i);
Expand All @@ -974,7 +969,7 @@ public void receivedStatus(OperationStatus status) {
getLogger().warn("Unhandled state: " + status);
cstatus = new CollectionOperationStatus(status);
}
mergedOperationStatus.add(cstatus);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

기존 로직에서 CollectionOperationStatus가 아닌 OperationStatus 를 넘기는 것이 맞지 않는 지?

rv.addOperationStatus(cstatus);
}

// complete
Expand All @@ -985,97 +980,18 @@ public void complete() {
// got status
public void gotStatus(Integer index, OperationStatus status) {
if (status instanceof CollectionOperationStatus) {
mergedResult.put(index + (idx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT),
rv.addEachResult(index + (idx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT),
(CollectionOperationStatus) status);
} else {
mergedResult.put(index + (idx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT),
rv.addEachResult(index + (idx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT),
new CollectionOperationStatus(status));
}
}
});
addOp(key, op);
ops.add(op);
rv.addOperation(op);
}

return new CollectionFuture<Map<Integer, CollectionOperationStatus>>(
latch, operationTimeout) {

@Override
public boolean cancel(boolean ign) {
boolean rv = false;
for (Operation op : ops) {
op.cancel("by application.");
rv |= op.getState() == OperationState.WRITE_QUEUED;
}
return rv;
}

@Override
public boolean isCancelled() {
for (Operation op : ops) {
if (op.isCancelled()) {
return true;
}
}
return false;
}

@Override
public Map<Integer, CollectionOperationStatus> get(long duration,
TimeUnit units)
throws InterruptedException, TimeoutException, ExecutionException {

if (!latch.await(duration, units)) {
Collection<Operation> timedoutOps = new HashSet<Operation>();
for (Operation op : ops) {
if (op.getState() != OperationState.COMPLETE) {
timedoutOps.add(op);
} else {
MemcachedConnection.opSucceeded(op);
}
}
if (timedoutOps.size() > 0) {
MemcachedConnection.opTimedOut(timedoutOps.iterator().next());
throw new CheckedOperationTimeoutException(duration, units, timedoutOps);
}
} else {
// continuous timeout counter will be reset only once in pipe
MemcachedConnection.opSucceeded(ops.iterator().next());
}

for (Operation op : ops) {
if (op != null && op.hasErrored()) {
throw new ExecutionException(op.getException());
}

if (op != null && op.isCancelled()) {
throw new ExecutionException(new RuntimeException(op.getCancelCause()));
}
}

return mergedResult;
}

@Override
public CollectionOperationStatus getOperationStatus() {
for (CollectionOperationStatus status : mergedOperationStatus) {
if (!status.isSuccess()) {
return status;
}
}
return new CollectionOperationStatus(true, "END", CollectionResponse.END);
}

@Override
public boolean isDone() {
for (Operation op : ops) {
if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) {
return false;
}
}
return true;
}
};
return rv;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brido4125
ops Collection을 만들어 사용하는 아래 방식을 검토해 보시죠.

  • i 변수 대신 idx 변수 사용
  • rv.setOperations()로 한번에 설정
  • addOps() 코드 최적화
    • key validate 1회만 수행
    for (int idx = 0; idx < updateList.size(); idx++) {
      final CollectionPipedUpdate<T> update = updateList.get(idx);
      Operation op = opFact.collectionPipedUpdate(key, update,
          new CollectionPipedUpdateOperation.Callback() {
          });
      ops.add(op);
    }
    rv.setOperations(ops);
    addOps(key, ops);
    return rv;

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i 변수 대신 idx 변수 사용

현재 idx 변수는 Callback 객체의 내부에서 사용되기 때문에
항상 final 키워드로 선언되어야합니다.

만약 i를 사용하게 될 경우 final 변수가 아니기 때문에 내부 Callback의 구현에서 연산의 인덱싱을 할 수 없습니다.

결론적으로 idx변수는 기존처럼 사용해야합니다.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ops Collection을 만들어 사용하는 아래 방식을 검토해 보시죠.

해당 방식 적용하여 리팩토링 했습니다.

}

/**
Expand Down Expand Up @@ -3903,15 +3819,9 @@ public SMGetFuture<List<SMGetElement<Object>>> asyncBopSortMergeGet(
<T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncCollectionPipedInsert(
final String key, final List<CollectionPipedInsert<T>> insertList) {

final ConcurrentLinkedQueue<Operation> ops = new ConcurrentLinkedQueue<Operation>();

final CountDownLatch latch = new CountDownLatch(insertList.size());

final List<CollectionOperationStatus> mergedOperationStatus = Collections
.synchronizedList(new ArrayList<CollectionOperationStatus>(insertList.size()));

final Map<Integer, CollectionOperationStatus> mergedResult =
new ConcurrentHashMap<Integer, CollectionOperationStatus>();
final PipedCollectionFuture<Integer, CollectionOperationStatus> rv =
new PipedCollectionFuture<Integer, CollectionOperationStatus>(latch, operationTimeout, insertList.size());

for (int i = 0; i < insertList.size(); i++) {
final CollectionPipedInsert<T> insert = insertList.get(i);
Expand All @@ -3929,7 +3839,7 @@ public void receivedStatus(OperationStatus status) {
getLogger().warn("Unhandled state: " + status);
cstatus = new CollectionOperationStatus(status);
}
mergedOperationStatus.add(cstatus);
rv.addOperationStatus(cstatus);
}

// complete
Expand All @@ -3940,96 +3850,18 @@ public void complete() {
// got status
public void gotStatus(Integer index, OperationStatus status) {
if (status instanceof CollectionOperationStatus) {
mergedResult.put(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
(CollectionOperationStatus) status);
} else {
mergedResult.put(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
new CollectionOperationStatus(status));
}
}
});
addOp(key, op);
ops.add(op);
rv.addOperation(op);
}

return new CollectionFuture<Map<Integer, CollectionOperationStatus>>(
latch, operationTimeout) {

@Override
public boolean cancel(boolean ign) {
boolean rv = false;
for (Operation op : ops) {
op.cancel("by application.");
rv |= op.getState() == OperationState.WRITE_QUEUED;
}
return rv;
}

@Override
public boolean isCancelled() {
for (Operation op : ops) {
if (op.isCancelled()) {
return true;
}
}
return false;
}

@Override
public Map<Integer, CollectionOperationStatus> get(long duration,
TimeUnit units)
throws InterruptedException, TimeoutException, ExecutionException {

if (!latch.await(duration, units)) {
Collection<Operation> timedoutOps = new HashSet<Operation>();
for (Operation op : ops) {
if (op.getState() != OperationState.COMPLETE) {
timedoutOps.add(op);
} else {
MemcachedConnection.opSucceeded(op);
}
}
if (timedoutOps.size() > 0) {
MemcachedConnection.opTimedOut(timedoutOps.iterator().next());
throw new CheckedOperationTimeoutException(duration, units, timedoutOps);
}
} else {
// continuous timeout counter will be reset only once in pipe
MemcachedConnection.opSucceeded(ops.iterator().next());
}
for (Operation op : ops) {
if (op != null && op.hasErrored()) {
throw new ExecutionException(op.getException());
}

if (op != null && op.isCancelled()) {
throw new ExecutionException(new RuntimeException(op.getCancelCause()));
}
}

return mergedResult;
}

@Override
public CollectionOperationStatus getOperationStatus() {
for (CollectionOperationStatus status : mergedOperationStatus) {
if (!status.isSuccess()) {
return status;
}
}
return new CollectionOperationStatus(true, "END", CollectionResponse.END);
}

@Override
public boolean isDone() {
for (Operation op : ops) {
if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) {
return false;
}
}
return true;
}
};
return rv;
}

@Override
Expand Down
121 changes: 121 additions & 0 deletions src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package net.spy.memcached.internal;

import net.spy.memcached.MemcachedConnection;
import net.spy.memcached.ops.CollectionOperationStatus;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationState;

import java.util.List;
import java.util.Map;
import java.util.Collections;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.ExecutionException;

public class PipedCollectionFuture<K, V>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

piped exist 연산에서 key는 어떤 타입인가요?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

사용자가 정의한 타입입니다.
그래서 제네릭 로 정의됩니다.

앞선 코멘트에 Map<T,Boolean>를 리턴한다고 있습니다.
참고 부탁드립니다.

extends CollectionFuture<Map<K, V>> {
private final ConcurrentLinkedQueue<Operation> ops = new ConcurrentLinkedQueue<Operation>();
private final List<CollectionOperationStatus> mergedOperationStatus;

private final Map<K, V> mergedResult =
new ConcurrentHashMap<K, V>();

public PipedCollectionFuture(CountDownLatch l, long opTimeout, int opSize) {
super(l, opTimeout);
mergedOperationStatus = Collections
.synchronizedList(new ArrayList<CollectionOperationStatus>(opSize));
}

@Override
public boolean cancel(boolean ign) {
boolean rv = false;
for (Operation op : ops) {
op.cancel("by application.");
rv |= op.getState() == OperationState.WRITE_QUEUED;
}
return rv;
}

@Override
public boolean isCancelled() {
for (Operation op : ops) {
if (op.isCancelled()) {
return true;
}
}
return false;
}

@Override
public boolean isDone() {
for (Operation op : ops) {
if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) {
return false;
}
}
return true;
}

@Override
public Map<K, V> get(long duration, TimeUnit units)
throws InterruptedException, TimeoutException, ExecutionException {

if (!latch.await(duration, units)) {
Collection<Operation> timedoutOps = new HashSet<Operation>();
for (Operation op : ops) {
if (op.getState() != OperationState.COMPLETE) {
timedoutOps.add(op);
} else {
MemcachedConnection.opSucceeded(op);
}
}
if (timedoutOps.size() > 0) {
MemcachedConnection.opTimedOut(timedoutOps.iterator().next());
throw new CheckedOperationTimeoutException(duration, units, timedoutOps);
}
} else {
// continuous timeout counter will be reset only once in pipe
MemcachedConnection.opSucceeded(ops.iterator().next());
}

for (Operation op : ops) {
if (op != null && op.hasErrored()) {
throw new ExecutionException(op.getException());
}

if (op != null && op.isCancelled()) {
throw new ExecutionException(new RuntimeException(op.getCancelCause()));
}
}

return mergedResult;
}

@Override
public CollectionOperationStatus getOperationStatus() {
for (CollectionOperationStatus status : mergedOperationStatus) {
if (!status.isSuccess()) {
return status;
}
}
return mergedOperationStatus.get(0);
}

public void addOperationStatus(CollectionOperationStatus status) {
mergedOperationStatus.add(status);
}

public void addEachResult(K index, V status) {
mergedResult.put(index, status);
}

public void addOperation(Operation op) {
ops.add(op);
}
}
Loading