Skip to content

Commit

Permalink
FIX: sync problem in BaseOperationImpl cancel and transitionState met…
Browse files Browse the repository at this point in the history
…hod.
  • Loading branch information
brido4125 committed Aug 1, 2023
1 parent 6998801 commit 98ed6ea
Show file tree
Hide file tree
Showing 10 changed files with 20 additions and 40 deletions.
5 changes: 1 addition & 4 deletions src/main/java/net/spy/memcached/ArcusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1941,10 +1941,7 @@ public void complete() {
public boolean cancel(boolean ign) {
boolean rv = false;
for (Operation op : ops) {
if (op.getState() != OperationState.COMPLETE) {
rv = true;
op.cancel("by application.");
}
rv |= op.cancel("by application.");
}
return rv;
}
Expand Down
5 changes: 1 addition & 4 deletions src/main/java/net/spy/memcached/MemcachedClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -2027,10 +2027,7 @@ public void complete() {
public boolean cancel(boolean ign) {
boolean rv = false;
for (Operation op : ops) {
if (op.getState() != OperationState.COMPLETE) {
rv = true;
op.cancel("by application.");
}
rv |= op.cancel("by application.");
}
return rv;
}
Expand Down
5 changes: 1 addition & 4 deletions src/main/java/net/spy/memcached/internal/BulkGetFuture.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,7 @@ public BulkGetFuture(BulkGetFuture<T> other) {
public boolean cancel(boolean ign) {
boolean rv = false;
for (Operation op : ops) {
if (op.getState() != OperationState.COMPLETE) {
rv = true;
op.cancel("by application.");
}
rv |= op.cancel("by application.");
}
return rv;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,7 @@ public BulkOperationFuture(CountDownLatch l, long timeout) {
public boolean cancel(boolean ign) {
boolean rv = false;
for (Operation op : ops) {
if (op.getState() != OperationState.COMPLETE) {
rv = true;
op.cancel("by application.");
}
rv |= op.cancel("by application.");
}
return rv;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,7 @@ public T get(long duration, TimeUnit units)
public boolean cancel(boolean ign) {
boolean rv = false;
for (Operation op : ops) {
if (op.getState() != OperationState.COMPLETE) {
rv = true;
op.cancel("by application.");
}
rv |= op.cancel("by application.");
}
return rv;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,7 @@ public OperationFuture(CountDownLatch l, AtomicReference<T> oref,

public boolean cancel(boolean ign) {
assert op != null : "No operation";
if (op.getState() == OperationState.COMPLETE) {
return false;
}
op.cancel("by application.");
return true;
return op.cancel("by application.");
}

public T get() throws InterruptedException, ExecutionException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,7 @@ public PipedCollectionFuture(CountDownLatch l, long opTimeout) {
public boolean cancel(boolean ign) {
boolean rv = false;
for (Operation op : ops) {
if (op.getState() != OperationState.COMPLETE) {
rv = true;
op.cancel("by application.");
}
rv |= op.cancel("by application.");
}
return rv;
}
Expand Down
5 changes: 1 addition & 4 deletions src/main/java/net/spy/memcached/internal/SMGetFuture.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,7 @@ public T get() throws InterruptedException, ExecutionException {
public boolean cancel(boolean ign) {
boolean rv = false;
for (Operation op : ops) {
if (op.getState() != OperationState.COMPLETE) {
rv = true;
op.cancel("by application.");
}
rv |= op.cancel("by application.");
}
return rv;
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/net/spy/memcached/ops/Operation.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public interface Operation {
/**
* Cancel this operation.
*/
void cancel(String cause);
boolean cancel(String cause);

/**
* Get the cause of cancel.
Expand Down
17 changes: 11 additions & 6 deletions src/main/java/net/spy/memcached/protocol/BaseOperationImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;

import net.spy.memcached.MemcachedNode;
import net.spy.memcached.MemcachedReplicaGroup;
Expand All @@ -45,9 +46,9 @@ public abstract class BaseOperationImpl extends SpyObject {
*/
public static final OperationStatus CANCELLED =
new CancelledOperationStatus();
private OperationState state = OperationState.WRITE_QUEUED;
private volatile OperationState state = OperationState.WRITE_QUEUED;
private ByteBuffer cmd = null;
private boolean cancelled = false;
private AtomicBoolean cancelled = new AtomicBoolean(false);
private String cancelCause = null;
private OperationException exception = null;
protected OperationCallback callback = null;
Expand Down Expand Up @@ -79,7 +80,7 @@ protected void setCallback(OperationCallback to) {
}

public final boolean isCancelled() {
return cancelled;
return cancelled.get();
}

public final boolean hasErrored() {
Expand All @@ -90,15 +91,19 @@ public final OperationException getException() {
return exception;
}

public final void cancel(String cause) {
cancelled = true;
public final boolean cancel(String cause) {
if (!cancelled.compareAndSet(false, true) ||
state == OperationState.COMPLETE) {
return false;
}
if (handlingNode != null) {
cancelCause = "Cancelled (" + cause + " : (" + handlingNode.getNodeName() + ")" + ")";
} else {
cancelCause = "Cancelled (" + cause + ")";
}
wasCancelled();
callback.complete();
return true;
}

public final String getCancelCause() {
Expand Down Expand Up @@ -215,7 +220,7 @@ protected final void transitionState(OperationState newState) {
state != OperationState.WRITING) {
cmd = null;
}
if (state == OperationState.COMPLETE) {
if (state == OperationState.COMPLETE && !this.cancelled.get()) {
callback.complete();
}
}
Expand Down

0 comments on commit 98ed6ea

Please sign in to comment.