diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java
index 3b4a1c11423..b246c4fa6b4 100755
--- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java
+++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java
@@ -423,6 +423,10 @@ private static void setConversionFactor(final ConvertedArrowSchema result, final
result.conversionFactors[i] = factor;
}
+ public static TableDefinition convertTableDefinition(final ExportedTableCreationResponse response) {
+ return convertArrowSchema(SchemaHelper.flatbufSchema(response)).tableDef;
+ }
+
public static ConvertedArrowSchema convertArrowSchema(final ExportedTableCreationResponse response) {
return convertArrowSchema(SchemaHelper.flatbufSchema(response));
}
diff --git a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSession.java b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSession.java
index ec6ba5a7485..fbd8e6c5c25 100644
--- a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSession.java
+++ b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSession.java
@@ -38,7 +38,7 @@ public BarrageSubscription subscribe(final TableSpec tableSpec, final BarrageSub
@Override
public BarrageSubscription subscribe(final TableHandle tableHandle, final BarrageSubscriptionOptions options) {
- return new BarrageSubscriptionImpl(this, session.executor(), tableHandle.newRef(), options);
+ return BarrageSubscription.make(this, session.executor(), tableHandle.newRef(), options);
}
@Override
diff --git a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshot.java b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshot.java
index 0d5a6e9ea7e..9003f7517e0 100644
--- a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshot.java
+++ b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshot.java
@@ -5,17 +5,35 @@
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.table.Table;
+import io.deephaven.engine.table.TableDefinition;
import io.deephaven.extensions.barrage.BarrageSnapshotOptions;
import io.deephaven.qst.table.TableSpec;
+import org.jetbrains.annotations.Nullable;
import java.util.BitSet;
import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
/**
* A {@code BarrageSnapshot} represents a snapshot of a table that may or may not be filtered to a viewport of the
* remote source table.
*/
public interface BarrageSnapshot {
+ /**
+ * Create a {@code BarrageSnapshot} from a {@link TableHandle}.
+ *
+ * @param session the Deephaven session that this export belongs to
+ * @param executorService an executor service used to flush metrics when enabled
+ * @param tableHandle the tableHandle to snapshot (ownership is transferred to the snapshot)
+ * @param options the transport level options for this snapshot
+ * @return a {@code BarrageSnapshot}
+ */
+ static BarrageSnapshot make(
+ final BarrageSession session, @Nullable final ScheduledExecutorService executorService,
+ final TableHandle tableHandle, final BarrageSnapshotOptions options) {
+ return new BarrageSnapshotImpl(session, executorService, tableHandle, options);
+ }
+
interface Factory {
/**
* Sources a barrage snapshot from a {@link TableSpec}.
diff --git a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshotImpl.java b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshotImpl.java
index 60f37d9c2fb..3965540ad01 100644
--- a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshotImpl.java
+++ b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshotImpl.java
@@ -69,13 +69,15 @@ public class BarrageSnapshotImpl extends ReferenceCountedLivenessNode implements
/**
* Represents a BarrageSnapshot.
+ *
+ * See {@link BarrageSnapshot#make}.
*
* @param session the Deephaven session that this export belongs to
* @param executorService an executor service used to flush metrics when enabled
* @param tableHandle the tableHandle to snapshot (ownership is transferred to the snapshot)
* @param options the transport level options for this snapshot
*/
- public BarrageSnapshotImpl(
+ BarrageSnapshotImpl(
final BarrageSession session, @Nullable final ScheduledExecutorService executorService,
final TableHandle tableHandle, final BarrageSnapshotOptions options) {
super(false);
@@ -360,6 +362,10 @@ public void onError(@NotNull final Throwable t) {
}
}
+
+ /**
+ * The Completable Future is used to encapsulate the concept that the table is filled with requested data.
+ */
private class SnapshotCompletableFuture extends CompletableFuture
{
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
diff --git a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscription.java b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscription.java
index 5268ce77627..7fca15aa115 100644
--- a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscription.java
+++ b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscription.java
@@ -3,19 +3,42 @@
*/
package io.deephaven.client.impl;
+import io.deephaven.engine.liveness.LivenessScope;
+import io.deephaven.engine.liveness.LivenessScopeStack;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.table.Table;
+import io.deephaven.engine.table.TableDefinition;
import io.deephaven.extensions.barrage.BarrageSubscriptionOptions;
import io.deephaven.qst.table.TableSpec;
+import io.deephaven.util.SafeCloseable;
import java.util.BitSet;
import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
/**
* A {@code BarrageSubscription} represents a subscription over a table that may or may not be filtered to a viewport of
* the remote source table.
*/
public interface BarrageSubscription {
+ /**
+ * Create a {@code BarrageSubscription} from a {@link TableHandle}.
+ *
+ * @param session the Deephaven session that this export belongs to
+ * @param executorService an executor service used to flush stats
+ * @param tableHandle the tableHandle to subscribe to (ownership is transferred to the subscription)
+ * @param options the transport level options for this subscription
+ * @return a {@code BarrageSubscription} from a {@link TableHandle}
+ */
+ static BarrageSubscription make(
+ final BarrageSession session, final ScheduledExecutorService executorService,
+ final TableHandle tableHandle, final BarrageSubscriptionOptions options) {
+ final LivenessScope scope = new LivenessScope();
+ try (final SafeCloseable ignored = LivenessScopeStack.open(scope, false)) {
+ return new BarrageSubscriptionImpl(session, executorService, tableHandle, options, scope);
+ }
+ }
+
interface Factory {
/**
* Sources a barrage subscription from a {@link TableSpec}.
diff --git a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscriptionImpl.java b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscriptionImpl.java
index 5d1a0006e43..69d05206b16 100644
--- a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscriptionImpl.java
+++ b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscriptionImpl.java
@@ -12,12 +12,13 @@
import io.deephaven.base.log.LogOutput;
import io.deephaven.chunk.ChunkType;
import io.deephaven.engine.exceptions.RequestCancelledException;
-import io.deephaven.engine.liveness.ReferenceCountedLivenessNode;
+import io.deephaven.engine.liveness.*;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.impl.util.BarrageMessage;
+import io.deephaven.engine.updategraph.DynamicNode;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.updategraph.UpdateGraphAwareCompletableFuture;
import io.deephaven.extensions.barrage.BarrageSubscriptionOptions;
@@ -25,6 +26,7 @@
import io.deephaven.extensions.barrage.util.*;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
+import io.deephaven.util.annotations.FinalDefault;
import io.deephaven.util.annotations.VisibleForTesting;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
@@ -42,10 +44,8 @@
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.BitSet;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.concurrent.ScheduledExecutorService;
/**
* This class is an intermediary helper class that uses a {@code DoExchange} to populate a {@link BarrageTable} using
@@ -64,31 +64,36 @@ public class BarrageSubscriptionImpl extends ReferenceCountedLivenessNode implem
private final CheckForCompletion checkForCompletion;
private final BarrageTable resultTable;
+ private LivenessScope constructionScope;
private volatile FutureAdapter future;
private boolean subscribed;
private boolean isSnapshot;
-
private volatile int connected = 1;
private static final AtomicIntegerFieldUpdater CONNECTED_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(BarrageSubscriptionImpl.class, "connected");
/**
* Represents a BarrageSubscription.
+ *
+ * See {@link BarrageSubscription#make}.
*
* @param session the Deephaven session that this export belongs to
* @param executorService an executor service used to flush stats
* @param tableHandle the tableHandle to subscribe to (ownership is transferred to the subscription)
* @param options the transport level options for this subscription
+ * @param constructionScope the scope used for constructing this
*/
- public BarrageSubscriptionImpl(
+ BarrageSubscriptionImpl(
final BarrageSession session, final ScheduledExecutorService executorService,
- final TableHandle tableHandle, final BarrageSubscriptionOptions options) {
+ final TableHandle tableHandle, final BarrageSubscriptionOptions options,
+ final LivenessScope constructionScope) {
super(false);
this.logName = tableHandle.exportId().toString();
this.tableHandle = tableHandle;
this.options = options;
+ this.constructionScope = constructionScope;
final BarrageUtil.ConvertedArrowSchema schema = BarrageUtil.convertArrowSchema(tableHandle.response());
final TableDefinition tableDefinition = schema.tableDef;
@@ -451,35 +456,162 @@ public void onError(@NotNull final Throwable t) {
}
private interface FutureAdapter extends Future
{
+ boolean completeExceptionally(Throwable ex);
+
boolean complete(Table value);
- boolean completeExceptionally(Throwable ex);
+ /**
+ * Called when the hand-off from the future is complete to release the construction scope.
+ */
+ void maybeRelease();
+
+ @FunctionalInterface
+ interface Supplier {
+ Table get() throws InterruptedException, ExecutionException, TimeoutException;
+ }
+
+ @FinalDefault
+ default Table doGet(final Supplier supplier) throws InterruptedException, ExecutionException, TimeoutException {
+ boolean throwingTimeout = false;
+ try {
+ final Table result = supplier.get();
+
+ if (result instanceof LivenessArtifact && DynamicNode.notDynamicOrIsRefreshing(result)) {
+ ((LivenessArtifact) result).manageWithCurrentScope();
+ }
+
+ return result;
+ } catch (final TimeoutException toe) {
+ throwingTimeout = true;
+ throw toe;
+ } finally {
+ if (!throwingTimeout) {
+ maybeRelease();
+ }
+ }
+ }
}
+ private static final AtomicIntegerFieldUpdater CF_WAS_RELEASED =
+ AtomicIntegerFieldUpdater.newUpdater(CompletableFutureAdapter.class, "wasReleased");
+
+ /**
+ * The Completable Future is used when this thread is not blocking the update graph progression.
+ *
+ * We will keep the result table alive until the user calls {@link Future#get get()} on the future. Note that this
+ * only protects the getters on {@link Future} not the entire {@link CompletionStage} interface.
+ *
+ * Subsequent calls to {@link Future#get get()} will only succeed if the result is still alive and will increase the
+ * reference count of the result table.
+ */
private class CompletableFutureAdapter extends CompletableFuture
implements FutureAdapter {
+
+ volatile int wasReleased;
+
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
- if (super.cancel(mayInterruptIfRunning)) {
- BarrageSubscriptionImpl.this.cancel("cancelled by user");
- return true;
+ try {
+ if (super.cancel(mayInterruptIfRunning)) {
+ BarrageSubscriptionImpl.this.cancel("cancelled by user");
+ return true;
+ }
+ } finally {
+ maybeRelease();
}
return false;
}
+
+ @Override
+ public boolean completeExceptionally(Throwable ex) {
+ maybeRelease();
+ return super.completeExceptionally(ex);
+ }
+
+ @Override
+ public Table get(final long timeout, @NotNull final TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ return doGet(() -> super.get(timeout, unit));
+ }
+
+ @Override
+ public Table get() throws InterruptedException, ExecutionException {
+ try {
+ return doGet(super::get);
+ } catch (TimeoutException toe) {
+ throw new IllegalStateException("Unexpected TimeoutException", toe);
+ }
+ }
+
+ @Override
+ public void maybeRelease() {
+ if (CF_WAS_RELEASED.compareAndSet(this, 0, 1)) {
+ constructionScope.release();
+ constructionScope = null;
+ }
+ }
}
+ private static final AtomicIntegerFieldUpdater UG_WAS_RELEASED =
+ AtomicIntegerFieldUpdater.newUpdater(UpdateGraphAwareFutureAdapter.class, "wasReleased");
+
+ /**
+ * The Update Graph Aware Future is used when waiting directly on this thread would otherwise be blocking update
+ * graph progression.
+ *
+ * We will keep the result table alive until the user calls {@link Future#get get()} on the future.
+ *
+ * Subsequent calls to {@link Future#get get()} will only succeed if the result is still alive and will increase the
+ * reference count of the result table.
+ */
private class UpdateGraphAwareFutureAdapter extends UpdateGraphAwareCompletableFuture
implements FutureAdapter {
+
+ volatile int wasReleased;
+
public UpdateGraphAwareFutureAdapter(@NotNull final UpdateGraph updateGraph) {
super(updateGraph);
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
- if (super.cancel(mayInterruptIfRunning)) {
- BarrageSubscriptionImpl.this.cancel("cancelled by user");
- return true;
+ try {
+ if (super.cancel(mayInterruptIfRunning)) {
+ BarrageSubscriptionImpl.this.cancel("cancelled by user");
+ return true;
+ }
+ } finally {
+ maybeRelease();
}
return false;
}
+
+ @Override
+ public boolean completeExceptionally(Throwable ex) {
+ maybeRelease();
+ return super.completeExceptionally(ex);
+ }
+
+ @Override
+ public Table get(final long timeout, @NotNull final TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ return doGet(() -> super.get(timeout, unit));
+ }
+
+ @Override
+ public Table get() throws InterruptedException, ExecutionException {
+ try {
+ return doGet(super::get);
+ } catch (TimeoutException toe) {
+ throw new IllegalStateException("Unexpected TimeoutException", toe);
+ }
+ }
+
+ @Override
+ public void maybeRelease() {
+ if (UG_WAS_RELEASED.compareAndSet(this, 0, 1)) {
+ constructionScope.release();
+ constructionScope = null;
+ }
+ }
}
}