Skip to content

Commit

Permalink
BarrageSession Subscription/Snapshot Methods now Return `Future<Tab…
Browse files Browse the repository at this point in the history
…le>` (#4676)

This includes breaking changes to the barrage java-client API. The snapshot/subscription methods now return `Future<Table>` instead of `BarrageTable` and users are no longer need to participate via `blockUntilCompletion` as this is forced by `Future#get`.

Subscribed tables can now be canceled only through `Liveness` destruction such as using an explicit `LivenessScope` or invoking `ReferenceCounted#forceReferenceCountToZero()`.

---------

Co-authored-by: Ryan Caudy <[email protected]>
  • Loading branch information
nbauernfeind and rcaudy authored Oct 23, 2023
1 parent 8225c90 commit 43c10f4
Show file tree
Hide file tree
Showing 12 changed files with 565 additions and 538 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/**
* Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.engine.exceptions;

import io.deephaven.UncheckedDeephavenException;
import org.jetbrains.annotations.NotNull;

/**
* This exception is used when a result cannot be returned because the request was cancelled.
*/
public class RequestCancelledException extends UncheckedDeephavenException {
public RequestCancelledException(@NotNull final String message) {
super(message);
}

public RequestCancelledException(@NotNull final String message, @NotNull final Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package io.deephaven.engine.updategraph;

import io.deephaven.util.SafeCloseable;
import io.deephaven.util.function.ThrowingSupplier;
import io.deephaven.util.locks.FunctionalLock;
import io.deephaven.util.locks.FunctionalReentrantLock;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.Objects;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.Condition;

public class UpdateGraphAwareCompletableFuture<T> implements Future<T> {

private final UpdateGraph updateGraph;

/** This condition is used to signal any threads waiting on the UpdateGraph exclusive lock. */
private volatile Condition updateGraphCondition;

private final FunctionalLock lock = new FunctionalReentrantLock();
private volatile Condition lockCondition;

private volatile ThrowingSupplier<T, ExecutionException> resultSupplier;
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<UpdateGraphAwareCompletableFuture, ThrowingSupplier> RESULT_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(
UpdateGraphAwareCompletableFuture.class, ThrowingSupplier.class, "resultSupplier");

/** The encoding of the cancelled supplier. */
private static final ThrowingSupplier<?, ExecutionException> CANCELLATION_SUPPLIER = () -> {
throw new CancellationException();
};

public UpdateGraphAwareCompletableFuture(@NotNull final UpdateGraph updateGraph) {
this.updateGraph = updateGraph;
}

////////////////
// Future API //
////////////////
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
// noinspection unchecked
return trySignalCompletion((ThrowingSupplier<T, ExecutionException>) CANCELLATION_SUPPLIER);
}

@Override
public boolean isCancelled() {
return resultSupplier == CANCELLATION_SUPPLIER;
}

@Override
public boolean isDone() {
return resultSupplier != null;
}

@Override
public T get() throws InterruptedException, ExecutionException {
checkSharedLockState();

if (resultSupplier != null) {
return resultSupplier.get();
}
try {
return getInternal(0, null);
} catch (TimeoutException toe) {
throw new IllegalStateException("Unexpected TimeoutException", toe);
}
}

@Override
public T get(final long timeout, @NotNull final TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
checkSharedLockState();

if (resultSupplier != null) {
return resultSupplier.get();
}
if (timeout <= 0) {
throw new TimeoutException();
}
return getInternal(timeout, unit);
}

private T getInternal(final long timeout, @Nullable final TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
final boolean holdingUpdateGraphLock = updateGraph.exclusiveLock().isHeldByCurrentThread();
if (holdingUpdateGraphLock) {
if (updateGraphCondition == null) {
updateGraphCondition = updateGraph.exclusiveLock().newCondition();
}
} else if (lockCondition == null) {
try (final SafeCloseable ignored = lock.lockCloseable()) {
if (lockCondition == null) {
lockCondition = lock.newCondition();
}
}
}

if (holdingUpdateGraphLock) {
waitForResult(updateGraphCondition, timeout, unit);
} else {
try (final SafeCloseable ignored = lock.lockCloseable()) {
waitForResult(lockCondition, timeout, unit);
}
}

return resultSupplier.get();
}

private void checkSharedLockState() {
if (updateGraph.sharedLock().isHeldByCurrentThread()) {
throw new UnsupportedOperationException(
"Cannot Future.get(...) while holding the " + updateGraph + " shared lock");
}
}

private void waitForResult(final Condition condition, final long timeout, @Nullable final TimeUnit unit)
throws InterruptedException, TimeoutException {
if (unit == null) {
while (resultSupplier == null) {
condition.await();
}
return;
}

long nanosLeft = unit.toNanos(timeout);
while (resultSupplier == null) {
nanosLeft = condition.awaitNanos(nanosLeft);
if (nanosLeft <= 0) {
throw new TimeoutException();
}
}
}

////////////////////////////////////////////////////
// Completion API modeled after CompletableFuture //
////////////////////////////////////////////////////

public boolean complete(T value) {
return trySignalCompletion(() -> value);
}

public boolean completeExceptionally(Throwable ex) {
Objects.requireNonNull(ex);
return trySignalCompletion(() -> {
throw new ExecutionException(ex);
});
}

private boolean trySignalCompletion(@NotNull final ThrowingSupplier<T, ExecutionException> result) {
if (!RESULT_UPDATER.compareAndSet(this, null, result)) {
return false;
}

final Condition localUpdateGraphCondition = updateGraphCondition;
if (localUpdateGraphCondition != null) {
updateGraph.requestSignal(localUpdateGraphCondition);
}
final Condition localLockCondition = lockCondition;
if (localLockCondition != null) {
lock.doLocked(localLockCondition::signalAll);
}

return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,7 @@ public interface ViewportChangedCallback {
*
* @param t the error
*/
void onError(Throwable t);

/**
* Called when the subscription is closed; will not be invoked after an onError.
*/
void onClose();
void onError(@NotNull Throwable t);
}

public static final boolean DEBUG_ENABLED =
Expand All @@ -100,10 +95,6 @@ public interface ViewportChangedCallback {
/** the reinterpreted destination writable sources */
protected final WritableColumnSource<?>[] destSources;


/** unsubscribed must never be reset to false once it has been set to true */
private volatile boolean unsubscribed = false;

/**
* The client and the server update asynchronously with respect to one another. The client requests a viewport, the
* server will send the client the snapshot for the request and continue to send data that is inside of that view.
Expand Down Expand Up @@ -242,8 +233,8 @@ public BitSet getServerColumns() {

@Override
public void handleBarrageMessage(final BarrageMessage update) {
if (unsubscribed) {
beginLog(LogLevel.INFO).append(": Discarding update for unsubscribed table!").endl();
if (pendingError != null || isFailed()) {
beginLog(LogLevel.INFO).append(": Discarding update for errored table!").endl();
return;
}

Expand All @@ -255,10 +246,7 @@ public void handleBarrageMessage(final BarrageMessage update) {
try {
realRefresh();
} catch (Throwable err) {
if (viewportChangedCallback != null) {
viewportChangedCallback.onError(err);
viewportChangedCallback = null;
}
tryToDeliverErrorToCallback(err);
throw err;
}
} else {
Expand All @@ -271,6 +259,13 @@ public void handleBarrageError(Throwable t) {
enqueueError(t);
}

private synchronized void tryToDeliverErrorToCallback(final Throwable err) {
if (viewportChangedCallback != null) {
viewportChangedCallback.onError(err);
viewportChangedCallback = null;
}
}

private class SourceRefresher extends InstrumentedUpdateSource {

SourceRefresher() {
Expand All @@ -289,10 +284,7 @@ protected void instrumentedRefresh() {
.append(err).endl();
notifyListenersOnError(err, null);

if (viewportChangedCallback != null) {
viewportChangedCallback.onError(err);
viewportChangedCallback = null;
}
tryToDeliverErrorToCallback(err);
if (err instanceof Error) {
// rethrow if this was an error (which should not be swallowed)
throw err;
Expand All @@ -301,7 +293,7 @@ protected void instrumentedRefresh() {
}
}

protected void updateServerViewport(
protected synchronized void updateServerViewport(
final RowSet viewport,
final BitSet columns,
final boolean reverseViewport) {
Expand Down Expand Up @@ -337,32 +329,20 @@ protected boolean isSubscribedColumn(int i) {
}

private synchronized void realRefresh() {
if (isFailed()) {
discardAnyPendingUpdates();
return;
}

if (pendingError != null) {
if (viewportChangedCallback != null) {
viewportChangedCallback.onError(pendingError);
viewportChangedCallback = null;
}
tryToDeliverErrorToCallback(pendingError);
if (isRefreshing()) {
notifyListenersOnError(pendingError, null);
}
// once we notify on error we are done, we can not notify any further, we are failed
cleanup();
return;
}
if (unsubscribed) {
if (getRowSet().isNonempty()) {
// publish one last clear downstream; this data would be stale
final RowSet allRows = getRowSet().copy();
getRowSet().writableCast().remove(allRows);
if (isRefreshing()) {
notifyListeners(RowSetFactory.empty(), allRows, RowSetFactory.empty());
}
}
if (viewportChangedCallback != null) {
viewportChangedCallback.onClose();
viewportChangedCallback = null;
}
cleanup();
// we are quite certain the shadow copies should have been drained on the last run
Assert.eqZero(shadowPendingUpdates.size(), "shadowPendingUpdates.size()");
return;
}

Expand Down Expand Up @@ -396,20 +376,22 @@ private synchronized void realRefresh() {
}
}

private void discardAnyPendingUpdates() {
synchronized (pendingUpdatesLock) {
pendingUpdates.forEach(BarrageMessage::close);
pendingUpdates.clear();
}
}

private void cleanup() {
unsubscribed = true;
if (stats != null) {
stats.stop();
}
if (isRefreshing()) {
registrar.removeSource(refresher);
}
synchronized (pendingUpdatesLock) {
// release any pending snapshots, as we will never process them
pendingUpdates.clear();
}
// we are quite certain the shadow copies should have been drained on the last run
Assert.eqZero(shadowPendingUpdates.size(), "shadowPendingUpdates.size()");
// release any pending snapshots, as we will never process them
discardAnyPendingUpdates();
}

@Override
Expand All @@ -430,10 +412,7 @@ private void enqueueError(final Throwable e) {
try {
realRefresh();
} catch (Throwable err) {
if (viewportChangedCallback != null) {
viewportChangedCallback.onError(err);
viewportChangedCallback = null;
}
tryToDeliverErrorToCallback(err);
throw err;
}
} else {
Expand Down Expand Up @@ -587,9 +566,7 @@ protected LogEntry beginLog(LogLevel level) {
@Override
protected void destroy() {
super.destroy();
if (stats != null) {
stats.stop();
}
cleanup();
}

public LongConsumer getDeserializationTmConsumer() {
Expand Down
Loading

0 comments on commit 43c10f4

Please sign in to comment.