Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

INTERNAL: Refactor api using broadcast operation. #646

Merged
merged 1 commit into from
Aug 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 19 additions & 86 deletions src/main/java/net/spy/memcached/ArcusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.jar.JarFile;
import java.util.jar.Manifest;
Expand Down Expand Up @@ -128,6 +127,7 @@
import net.spy.memcached.internal.SMGetFuture;
import net.spy.memcached.internal.PipedCollectionFuture;
import net.spy.memcached.internal.CollectionGetFuture;
import net.spy.memcached.internal.BroadcastFuture;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import 문이 ABC 순이므로 원래대로라면 이곳에 위치하지 않을 것 같습니다.
프로젝트 루트 디렉토리 우클릭 한 다음 Optimize imports 한 번 실행해주세요.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optimize imports 동작 시 변경되는 파일이 너무 많아
별도 PR로 처리하겠습니다.

import net.spy.memcached.ops.BTreeFindPositionOperation;
import net.spy.memcached.ops.BTreeFindPositionWithGetOperation;
import net.spy.memcached.ops.BTreeGetBulkOperation;
Expand Down Expand Up @@ -1952,96 +1952,29 @@ public OperationFuture<Boolean> flush(final String prefix) {

@Override
public OperationFuture<Boolean> flush(final String prefix, final int delay) {
final AtomicReference<Boolean> flushResult = new AtomicReference<Boolean>(true);
final ConcurrentLinkedQueue<Operation> ops = new ConcurrentLinkedQueue<Operation>();

final CountDownLatch blatch = broadcastOp(new BroadcastOpFactory() {
public Operation newOp(final MemcachedNode n,
final CountDownLatch latch) {
Operation op = opFact.flush(prefix, delay, false,
new OperationCallback() {
public void receivedStatus(OperationStatus s) {
if (!s.isSuccess()) {
flushResult.set(false);
}
}

public void complete() {
latch.countDown();
}
});
ops.add(op);
return op;
}
});

return new OperationFuture<Boolean>(blatch, flushResult,
operationTimeout) {
@Override
public boolean cancel(boolean ign) {
boolean rv = false;
for (Operation op : ops) {
rv |= op.cancel("by application.");
}
return rv;
}
Collection<MemcachedNode> nodes = getAllNodes();
final BroadcastFuture<Boolean> rv
= new BroadcastFuture<Boolean>(operationTimeout, Boolean.TRUE, nodes.size());

@Override
public boolean isCancelled() {
for (Operation op : ops) {
if (op.isCancelled()) {
return true;
}
}
return false;
}

@Override
public Boolean get(long duration, TimeUnit units)
throws InterruptedException, TimeoutException, ExecutionException {

if (!blatch.await(duration, units)) {
// whenever timeout occurs, continuous timeout counter will increase by 1.
Collection<Operation> timedoutOps = new HashSet<Operation>();
for (Operation op : ops) {
if (op.getState() != OperationState.COMPLETE) {
MemcachedConnection.opTimedOut(op);
timedoutOps.add(op);
} else {
MemcachedConnection.opSucceeded(op);
}
}
if (timedoutOps.size() > 0) {
throw new CheckedOperationTimeoutException(duration, units, timedoutOps);
}
} else {
// continuous timeout counter will be reset
MemcachedConnection.opsSucceeded(ops);
}

for (Operation op : ops) {
if (op != null && op.hasErrored()) {
throw new ExecutionException(op.getException());
}

if (op != null && op.isCancelled()) {
throw new ExecutionException(new RuntimeException(op.getCancelCause()));
checkState();
for (MemcachedNode node : nodes) {
Operation op = opFact.flush(prefix, delay, false, new OperationCallback() {
@Override
public void receivedStatus(OperationStatus status) {
if (!status.isSuccess()) {
rv.set(Boolean.FALSE, status);
}
}

return flushResult.get();
}

@Override
public boolean isDone() {
for (Operation op : ops) {
if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) {
return false;
}
@Override
public void complete() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

메소드 시작 전에 이전 메소드 사이에 1개의 Empty Line을 둡시다.
다른 곳도 포함해주세요.

rv.complete();
}
return true;
}
};
});
rv.addOp(op);
getMemcachedConnection().addOperation(node, op);
}
return rv;
}

@Override
Expand Down
17 changes: 0 additions & 17 deletions src/main/java/net/spy/memcached/BroadcastOpFactory.java

This file was deleted.

Loading
Loading