Skip to content

Commit

Permalink
Always export scope objects before performing other interactions (#4816)
Browse files Browse the repository at this point in the history
Fixes #4604
  • Loading branch information
niloc132 authored Nov 13, 2023
1 parent b362db3 commit 1e2306f
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1419,8 +1419,8 @@ public Promise<JsPartitionedTable> partitionBy(Object keys, @JsOptional Boolean
TypedTicket typedTicket = new TypedTicket();
typedTicket.setType(JsVariableType.PARTITIONEDTABLE);
typedTicket.setTicket(partitionedTableTicket);
Promise<JsPartitionedTable> fetchPromise =
new JsPartitionedTable(workerConnection, new JsWidget(workerConnection, typedTicket)).refetch();
Promise<JsPartitionedTable> fetchPromise = new JsWidget(workerConnection, typedTicket).refetch().then(
widget -> Promise.resolve(new JsPartitionedTable(workerConnection, widget)));

// Ensure that the partition failure propagates first, but the result of the fetch will be returned - both
// are running concurrently.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.object_pb.FetchObjectResponse;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.object_pb_service.ObjectServiceClient;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.partitionedtable_pb_service.PartitionedTableServiceClient;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb.ExportRequest;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb.ExportResponse;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb.ReleaseRequest;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb.TerminationNotificationRequest;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb_service.SessionServiceClient;
Expand Down Expand Up @@ -78,6 +80,7 @@
import io.deephaven.web.client.api.console.JsVariableDefinition;
import io.deephaven.web.client.api.console.JsVariableType;
import io.deephaven.web.client.api.i18n.JsTimeZone;
import io.deephaven.web.client.api.impl.TicketAndPromise;
import io.deephaven.web.client.api.lifecycle.HasLifecycle;
import io.deephaven.web.client.api.parse.JsDataHandler;
import io.deephaven.web.client.api.state.StateCache;
Expand Down Expand Up @@ -701,11 +704,12 @@ final class Listener implements Consumer<JsVariableChanges> {
@Override
public void accept(JsVariableChanges changes) {
JsVariableDefinition foundField = changes.getCreated()
.find((field, p1, p2) -> field.getTitle().equals(name) && field.getType().equals(type));
.find((field, p1, p2) -> field.getTitle().equals(name)
&& field.getType().equalsIgnoreCase(type));

if (foundField == null) {
foundField = changes.getUpdated().find((field, p1, p2) -> field.getTitle().equals(name)
&& field.getType().equals(type));
&& field.getType().equalsIgnoreCase(type));
}

if (foundField != null) {
Expand Down Expand Up @@ -756,23 +760,23 @@ public Promise<JsTable> getTable(JsVariableDefinition varDef, @Nullable Boolean
}

public Promise<?> getObject(JsVariableDefinition definition) {
if (JsVariableType.TABLE.equals(definition.getType())) {
if (JsVariableType.TABLE.equalsIgnoreCase(definition.getType())) {
return getTable(definition, null);
} else if (JsVariableType.FIGURE.equals(definition.getType())) {
} else if (JsVariableType.FIGURE.equalsIgnoreCase(definition.getType())) {
return getFigure(definition);
} else if (JsVariableType.PANDAS.equals(definition.getType())) {
} else if (JsVariableType.PANDAS.equalsIgnoreCase(definition.getType())) {
return getWidget(definition)
.then(widget -> widget.getExportedObjects()[0].fetch());
} else if (JsVariableType.PARTITIONEDTABLE.equals(definition.getType())) {
} else if (JsVariableType.PARTITIONEDTABLE.equalsIgnoreCase(definition.getType())) {
return getPartitionedTable(definition);
} else if (JsVariableType.HIERARCHICALTABLE.equals(definition.getType())) {
} else if (JsVariableType.HIERARCHICALTABLE.equalsIgnoreCase(definition.getType())) {
return getHierarchicalTable(definition);
} else {
if (JsVariableType.TABLEMAP.equals(definition.getType())) {
if (JsVariableType.TABLEMAP.equalsIgnoreCase(definition.getType())) {
JsLog.warn(
"TableMap is now known as PartitionedTable, fetching as a plain widget. To fetch as a PartitionedTable use that as the type.");
}
if (JsVariableType.TREETABLE.equals(definition.getType())) {
if (JsVariableType.TREETABLE.equalsIgnoreCase(definition.getType())) {
JsLog.warn(
"TreeTable is now HierarchicalTable, fetching as a plain widget. To fetch as a HierarchicalTable use that as this type.");
}
Expand Down Expand Up @@ -886,15 +890,25 @@ public Promise<Object> whenServerReady(String operationName) {
return Promise.resolve(this);
default:
// not possible, means null state
// noinspection unchecked
return (Promise) Promise.reject("Can't " + operationName + " while connection is in state " + state);
return Promise.reject("Can't " + operationName + " while connection is in state " + state);
}
}

private TicketAndPromise<?> exportScopeTicket(JsVariableDefinition varDef) {
Ticket ticket = getConfig().newTicket();
return new TicketAndPromise<>(ticket, whenServerReady("exportScopeTicket").then(server -> {
ExportRequest req = new ExportRequest();
req.setSourceId(createTypedTicket(varDef).getTicket());
req.setResultId(ticket);
return Callbacks.<ExportResponse, Object>grpcUnaryPromise(
c -> sessionServiceClient().exportFromTicket(req, metadata(), c::apply));
}), this);
}

public Promise<JsPartitionedTable> getPartitionedTable(JsVariableDefinition varDef) {
return whenServerReady("get a partitioned table")
.then(server -> new JsPartitionedTable(this, new JsWidget(this, createTypedTicket(varDef)))
.refetch());
.then(server -> getWidget(varDef))
.then(widget -> new JsPartitionedTable(this, widget).refetch());
}

public Promise<JsTreeTable> getTreeTable(JsVariableDefinition varDef) {
Expand All @@ -906,7 +920,7 @@ public Promise<JsTreeTable> getHierarchicalTable(JsVariableDefinition varDef) {
}

public Promise<JsFigure> getFigure(JsVariableDefinition varDef) {
if (!varDef.getType().equals("Figure")) {
if (!varDef.getType().equalsIgnoreCase("Figure")) {
throw new IllegalArgumentException("Can't load as a figure: " + varDef.getType());
}
return whenServerReady("get a figure")
Expand Down Expand Up @@ -935,7 +949,13 @@ private TypedTicket createTypedTicket(JsVariableDefinition varDef) {
}

public Promise<JsWidget> getWidget(JsVariableDefinition varDef) {
return getWidget(createTypedTicket(varDef));
return exportScopeTicket(varDef)
.race(ticket -> {
TypedTicket typedTicket = new TypedTicket();
typedTicket.setType(varDef.getType());
typedTicket.setTicket(ticket);
return getWidget(typedTicket);
}).promise();
}

public Promise<JsWidget> getWidget(TypedTicket typedTicket) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package io.deephaven.web.client.api.impl;

import elemental2.promise.IThenable;
import elemental2.promise.Promise;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.ticket_pb.Ticket;
import io.deephaven.web.client.api.WorkerConnection;
import io.deephaven.web.shared.fu.JsFunction;

/**
* Pair of ticket and the promise that indicates it has been resolved. Tickets are usable before they are resolved, but
* to ensure that all operations completed successfully, the promise should be used to handle errors.
*/
public class TicketAndPromise<T> implements IThenable<T> {
private final Ticket ticket;
private final Promise<T> promise;
private final WorkerConnection connection;
private boolean released = false;

public TicketAndPromise(Ticket ticket, Promise<T> promise, WorkerConnection connection) {
this.ticket = ticket;
this.promise = promise;
this.connection = connection;
}

public TicketAndPromise(Ticket ticket, WorkerConnection connection) {
this(ticket, (Promise<T>) Promise.resolve(ticket), connection);
}

public Promise<T> promise() {
return promise;
}

public Ticket ticket() {
return ticket;
}

@Override
public <V> TicketAndPromise<V> then(ThenOnFulfilledCallbackFn<? super T, ? extends V> onFulfilled) {
return new TicketAndPromise<>(ticket, promise.then(onFulfilled), connection);
}

/**
* Rather than waiting for the original promise to succeed, lets the caller start a new call based only on the
* original ticket. The intent of "race" here is unlike Promise.race(), where the first to succeed should resolve -
* instead, this raced call will be sent to the server even though the previous call has not successfully returned,
* and the server is responsible for ensuring they happen in the correct order.
*
* @param racedCall the call to perform at the same time that any pending call is happening
* @return a new TicketAndPromise that will resolve when all work is successful
* @param <V> type of the next call to perform
*/
public <V> TicketAndPromise<V> race(JsFunction<Ticket, IThenable<V>> racedCall) {
IThenable<V> raced = racedCall.apply(ticket);
return new TicketAndPromise<>(ticket, Promise.all(promise, raced).then(ignore -> raced), connection);
}

@Override
public <V> IThenable<V> then(ThenOnFulfilledCallbackFn<? super T, ? extends V> onFulfilled,
ThenOnRejectedCallbackFn<? extends V> onRejected) {
return promise.then(onFulfilled, onRejected);
}

public void release() {
if (!released) {
// don't double-release, in cases where the same ticket is used for multiple parts of the request
released = true;
connection.releaseTicket(ticket);
}
}
}
Loading

0 comments on commit 1e2306f

Please sign in to comment.