Skip to content

Commit

Permalink
Barrage Subscription/Snapshot Future's Manage Additional Liveness Unt…
Browse files Browse the repository at this point in the history
…il First #get (#4944)
  • Loading branch information
nbauernfeind committed Dec 15, 2023
1 parent ac70fd3 commit a5eb055
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,10 @@ private static void setConversionFactor(final ConvertedArrowSchema result, final
result.conversionFactors[i] = factor;
}

public static TableDefinition convertTableDefinition(final ExportedTableCreationResponse response) {
return convertArrowSchema(SchemaHelper.flatbufSchema(response)).tableDef;
}

public static ConvertedArrowSchema convertArrowSchema(final ExportedTableCreationResponse response) {
return convertArrowSchema(SchemaHelper.flatbufSchema(response));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public BarrageSubscription subscribe(final TableSpec tableSpec, final BarrageSub

@Override
public BarrageSubscription subscribe(final TableHandle tableHandle, final BarrageSubscriptionOptions options) {
return new BarrageSubscriptionImpl(this, session.executor(), tableHandle.newRef(), options);
return BarrageSubscription.make(this, session.executor(), tableHandle.newRef(), options);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,35 @@

import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.extensions.barrage.BarrageSnapshotOptions;
import io.deephaven.qst.table.TableSpec;
import org.jetbrains.annotations.Nullable;

import java.util.BitSet;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

/**
* A {@code BarrageSnapshot} represents a snapshot of a table that may or may not be filtered to a viewport of the
* remote source table.
*/
public interface BarrageSnapshot {
/**
* Create a {@code BarrageSnapshot} from a {@link TableHandle}.
*
* @param session the Deephaven session that this export belongs to
* @param executorService an executor service used to flush metrics when enabled
* @param tableHandle the tableHandle to snapshot (ownership is transferred to the snapshot)
* @param options the transport level options for this snapshot
* @return a {@code BarrageSnapshot}
*/
static BarrageSnapshot make(
final BarrageSession session, @Nullable final ScheduledExecutorService executorService,
final TableHandle tableHandle, final BarrageSnapshotOptions options) {
return new BarrageSnapshotImpl(session, executorService, tableHandle, options);
}

interface Factory {
/**
* Sources a barrage snapshot from a {@link TableSpec}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,15 @@ public class BarrageSnapshotImpl extends ReferenceCountedLivenessNode implements

/**
* Represents a BarrageSnapshot.
* <p>
* See {@link BarrageSnapshot#make}.
*
* @param session the Deephaven session that this export belongs to
* @param executorService an executor service used to flush metrics when enabled
* @param tableHandle the tableHandle to snapshot (ownership is transferred to the snapshot)
* @param options the transport level options for this snapshot
*/
public BarrageSnapshotImpl(
BarrageSnapshotImpl(
final BarrageSession session, @Nullable final ScheduledExecutorService executorService,
final TableHandle tableHandle, final BarrageSnapshotOptions options) {
super(false);
Expand Down Expand Up @@ -360,6 +362,10 @@ public void onError(@NotNull final Throwable t) {
}
}


/**
* The Completable Future is used to encapsulate the concept that the table is filled with requested data.
*/
private class SnapshotCompletableFuture extends CompletableFuture<Table> {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,42 @@
*/
package io.deephaven.client.impl;

import io.deephaven.engine.liveness.LivenessScope;
import io.deephaven.engine.liveness.LivenessScopeStack;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.extensions.barrage.BarrageSubscriptionOptions;
import io.deephaven.qst.table.TableSpec;
import io.deephaven.util.SafeCloseable;

import java.util.BitSet;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

/**
* A {@code BarrageSubscription} represents a subscription over a table that may or may not be filtered to a viewport of
* the remote source table.
*/
public interface BarrageSubscription {
/**
* Create a {@code BarrageSubscription} from a {@link TableHandle}.
*
* @param session the Deephaven session that this export belongs to
* @param executorService an executor service used to flush stats
* @param tableHandle the tableHandle to subscribe to (ownership is transferred to the subscription)
* @param options the transport level options for this subscription
* @return a {@code BarrageSubscription} from a {@link TableHandle}
*/
static BarrageSubscription make(
final BarrageSession session, final ScheduledExecutorService executorService,
final TableHandle tableHandle, final BarrageSubscriptionOptions options) {
final LivenessScope scope = new LivenessScope();
try (final SafeCloseable ignored = LivenessScopeStack.open(scope, false)) {
return new BarrageSubscriptionImpl(session, executorService, tableHandle, options, scope);
}
}

interface Factory {
/**
* Sources a barrage subscription from a {@link TableSpec}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,21 @@
import io.deephaven.base.log.LogOutput;
import io.deephaven.chunk.ChunkType;
import io.deephaven.engine.exceptions.RequestCancelledException;
import io.deephaven.engine.liveness.ReferenceCountedLivenessNode;
import io.deephaven.engine.liveness.*;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.impl.util.BarrageMessage;
import io.deephaven.engine.updategraph.DynamicNode;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.updategraph.UpdateGraphAwareCompletableFuture;
import io.deephaven.extensions.barrage.BarrageSubscriptionOptions;
import io.deephaven.extensions.barrage.table.BarrageTable;
import io.deephaven.extensions.barrage.util.*;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.util.annotations.FinalDefault;
import io.deephaven.util.annotations.VisibleForTesting;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
Expand All @@ -42,10 +44,8 @@
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.ScheduledExecutorService;

/**
* This class is an intermediary helper class that uses a {@code DoExchange} to populate a {@link BarrageTable} using
Expand All @@ -64,31 +64,36 @@ public class BarrageSubscriptionImpl extends ReferenceCountedLivenessNode implem
private final CheckForCompletion checkForCompletion;
private final BarrageTable resultTable;

private LivenessScope constructionScope;
private volatile FutureAdapter future;
private boolean subscribed;
private boolean isSnapshot;


private volatile int connected = 1;
private static final AtomicIntegerFieldUpdater<BarrageSubscriptionImpl> CONNECTED_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(BarrageSubscriptionImpl.class, "connected");

/**
* Represents a BarrageSubscription.
* <p>
* See {@link BarrageSubscription#make}.
*
* @param session the Deephaven session that this export belongs to
* @param executorService an executor service used to flush stats
* @param tableHandle the tableHandle to subscribe to (ownership is transferred to the subscription)
* @param options the transport level options for this subscription
* @param constructionScope the scope used for constructing this
*/
public BarrageSubscriptionImpl(
BarrageSubscriptionImpl(
final BarrageSession session, final ScheduledExecutorService executorService,
final TableHandle tableHandle, final BarrageSubscriptionOptions options) {
final TableHandle tableHandle, final BarrageSubscriptionOptions options,
final LivenessScope constructionScope) {
super(false);

this.logName = tableHandle.exportId().toString();
this.tableHandle = tableHandle;
this.options = options;
this.constructionScope = constructionScope;

final BarrageUtil.ConvertedArrowSchema schema = BarrageUtil.convertArrowSchema(tableHandle.response());
final TableDefinition tableDefinition = schema.tableDef;
Expand Down Expand Up @@ -451,35 +456,162 @@ public void onError(@NotNull final Throwable t) {
}

private interface FutureAdapter extends Future<Table> {
boolean completeExceptionally(Throwable ex);

boolean complete(Table value);

boolean completeExceptionally(Throwable ex);
/**
* Called when the hand-off from the future is complete to release the construction scope.
*/
void maybeRelease();

@FunctionalInterface
interface Supplier {
Table get() throws InterruptedException, ExecutionException, TimeoutException;
}

@FinalDefault
default Table doGet(final Supplier supplier) throws InterruptedException, ExecutionException, TimeoutException {
boolean throwingTimeout = false;
try {
final Table result = supplier.get();

if (result instanceof LivenessArtifact && DynamicNode.notDynamicOrIsRefreshing(result)) {
((LivenessArtifact) result).manageWithCurrentScope();
}

return result;
} catch (final TimeoutException toe) {
throwingTimeout = true;
throw toe;
} finally {
if (!throwingTimeout) {
maybeRelease();
}
}
}
}

private static final AtomicIntegerFieldUpdater<CompletableFutureAdapter> CF_WAS_RELEASED =
AtomicIntegerFieldUpdater.newUpdater(CompletableFutureAdapter.class, "wasReleased");

/**
* The Completable Future is used when this thread is not blocking the update graph progression.
* <p>
* We will keep the result table alive until the user calls {@link Future#get get()} on the future. Note that this
* only protects the getters on {@link Future} not the entire {@link CompletionStage} interface.
* <p>
* Subsequent calls to {@link Future#get get()} will only succeed if the result is still alive and will increase the
* reference count of the result table.
*/
private class CompletableFutureAdapter extends CompletableFuture<Table> implements FutureAdapter {

volatile int wasReleased;

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (super.cancel(mayInterruptIfRunning)) {
BarrageSubscriptionImpl.this.cancel("cancelled by user");
return true;
try {
if (super.cancel(mayInterruptIfRunning)) {
BarrageSubscriptionImpl.this.cancel("cancelled by user");
return true;
}
} finally {
maybeRelease();
}
return false;
}

@Override
public boolean completeExceptionally(Throwable ex) {
maybeRelease();
return super.completeExceptionally(ex);
}

@Override
public Table get(final long timeout, @NotNull final TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return doGet(() -> super.get(timeout, unit));
}

@Override
public Table get() throws InterruptedException, ExecutionException {
try {
return doGet(super::get);
} catch (TimeoutException toe) {
throw new IllegalStateException("Unexpected TimeoutException", toe);
}
}

@Override
public void maybeRelease() {
if (CF_WAS_RELEASED.compareAndSet(this, 0, 1)) {
constructionScope.release();
constructionScope = null;
}
}
}

private static final AtomicIntegerFieldUpdater<UpdateGraphAwareFutureAdapter> UG_WAS_RELEASED =
AtomicIntegerFieldUpdater.newUpdater(UpdateGraphAwareFutureAdapter.class, "wasReleased");

/**
* The Update Graph Aware Future is used when waiting directly on this thread would otherwise be blocking update
* graph progression.
* <p>
* We will keep the result table alive until the user calls {@link Future#get get()} on the future.
* <p>
* Subsequent calls to {@link Future#get get()} will only succeed if the result is still alive and will increase the
* reference count of the result table.
*/
private class UpdateGraphAwareFutureAdapter extends UpdateGraphAwareCompletableFuture<Table>
implements FutureAdapter {

volatile int wasReleased;

public UpdateGraphAwareFutureAdapter(@NotNull final UpdateGraph updateGraph) {
super(updateGraph);
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (super.cancel(mayInterruptIfRunning)) {
BarrageSubscriptionImpl.this.cancel("cancelled by user");
return true;
try {
if (super.cancel(mayInterruptIfRunning)) {
BarrageSubscriptionImpl.this.cancel("cancelled by user");
return true;
}
} finally {
maybeRelease();
}
return false;
}

@Override
public boolean completeExceptionally(Throwable ex) {
maybeRelease();
return super.completeExceptionally(ex);
}

@Override
public Table get(final long timeout, @NotNull final TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return doGet(() -> super.get(timeout, unit));
}

@Override
public Table get() throws InterruptedException, ExecutionException {
try {
return doGet(super::get);
} catch (TimeoutException toe) {
throw new IllegalStateException("Unexpected TimeoutException", toe);
}
}

@Override
public void maybeRelease() {
if (UG_WAS_RELEASED.compareAndSet(this, 0, 1)) {
constructionScope.release();
constructionScope = null;
}
}
}
}

0 comments on commit a5eb055

Please sign in to comment.