Skip to content

Commit

Permalink
fix: Expose base, "underlying" table for Partitioned Table Enhancemen…
Browse files Browse the repository at this point in the history
…ts (#5645)

Closes deephaven/web-client-ui#2079 (All the
core changes, `web-client-ui `changes to come in corresponding PR)

**Changes Implemented**
- Create a `baseTable` member variable and have a corresponding getter
method, in order to be able to expose it for Partitioned Tables, as
desired in point 1 of [this
epic](deephaven/web-client-ui#2079 (comment))
  • Loading branch information
AkshatJawne authored Jul 3, 2024
1 parent 0241ba7 commit bb810e4
Showing 1 changed file with 76 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ public class JsPartitionedTable extends HasLifecycle implements ServerObject {
private final JsWidget widget;
private List<String> keyColumnTypes;
private PartitionedTableDescriptor descriptor;
private JsTable keys;
private Promise<JsTable> keys;
private JsTable baseTable;
private TableSubscription subscription;

private final Set<List<Object>> knownKeys = new HashSet<>();
Expand All @@ -80,76 +81,63 @@ public Promise<JsPartitionedTable> refetch() {
descriptor = PartitionedTableDescriptor.deserializeBinary(w.getDataAsU8());

return w.getExportedObjects()[0].fetch();
}).then(result -> connection.newState((c, state, metadata) -> {
JsTable keyTable = (JsTable) result;
SelectOrUpdateRequest view = new SelectOrUpdateRequest();
view.setSourceId(keyTable.state().getHandle().makeTableReference());
view.setResultId(state.getHandle().makeTicket());
view.setColumnSpecsList(descriptor.getKeyColumnNamesList());
connection.tableServiceClient().view(view, metadata, (fail, success) -> {
keyTable.close();
c.apply(fail, success);
});
}, "view only keys columns")
.refetch(this, connection.metadata())
.then(state -> Promise.resolve(new JsTable(connection, state)))).then(result -> {
keys = result;

keyColumnTypes = new ArrayList<>();
InitialTableDefinition tableDefinition = WebBarrageUtils.readTableDefinition(
WebBarrageUtils.readSchemaMessage(descriptor.getConstituentDefinitionSchema_asU8()));
ColumnDefinition[] columnDefinitions = tableDefinition.getColumns();
Column[] columns = new Column[0];
for (int i = 0; i < columnDefinitions.length; i++) {
ColumnDefinition columnDefinition = columnDefinitions[i];
Column column =
columnDefinition.makeJsColumn(columns.length, tableDefinition.getColumnsByName());
columns[columns.length] = column;
}
Column[] keyColumns = new Column[0];
JsArray<String> keyColumnNames = descriptor.getKeyColumnNamesList();
for (int i = 0; i < keyColumnNames.length; i++) {
String name = keyColumnNames.getAt(i);
Column keyColumn = keys.findColumn(name);
keyColumnTypes.add(keyColumn.getType());
keyColumns[keyColumns.length] = keyColumn;
}
this.columns = JsObject.freeze(columns);
this.keyColumns = JsObject.freeze(keyColumns);

// TODO(deephaven-core#3604) in case of a new session, we should do a full refetch
keys.addEventListener(JsTable.EVENT_DISCONNECT, event -> fireEvent(EVENT_DISCONNECT));
keys.addEventListener(JsTable.EVENT_RECONNECT, event -> {
subscribeToKeys().then(ignore -> {
unsuppressEvents();
fireEvent(EVENT_RECONNECT);
return null;
}, failure -> {
CustomEventInit<Object> init = CustomEventInit.create();
init.setDetail(failure);
unsuppressEvents();
fireEvent(EVENT_RECONNECTFAILED, init);
suppressEvents();
return null;
});
});
return subscribeToKeys();
}).then(result -> {
baseTable = (JsTable) result;
keyColumnTypes = new ArrayList<>();
InitialTableDefinition tableDefinition = WebBarrageUtils.readTableDefinition(
WebBarrageUtils.readSchemaMessage(descriptor.getConstituentDefinitionSchema_asU8()));
ColumnDefinition[] columnDefinitions = tableDefinition.getColumns();
Column[] columns = new Column[0];
for (int i = 0; i < columnDefinitions.length; i++) {
ColumnDefinition columnDefinition = columnDefinitions[i];
Column column =
columnDefinition.makeJsColumn(columns.length, tableDefinition.getColumnsByName());
columns[columns.length] = column;
}
Column[] keyColumns = new Column[0];
JsArray<String> keyColumnNames = descriptor.getKeyColumnNamesList();
for (int i = 0; i < keyColumnNames.length; i++) {
String name = keyColumnNames.getAt(i);
Column keyColumn = baseTable.findColumn(name);
keyColumnTypes.add(keyColumn.getType());
keyColumns[keyColumns.length] = keyColumn;
}
this.columns = JsObject.freeze(columns);
this.keyColumns = JsObject.freeze(keyColumns);

// TODO(deephaven-core#3604) in case of a new session, we should do a full refetch
baseTable.addEventListener(JsTable.EVENT_DISCONNECT, event -> fireEvent(EVENT_DISCONNECT));
baseTable.addEventListener(JsTable.EVENT_RECONNECT, event -> {
subscribeToBaseTable().then(ignore -> {
unsuppressEvents();
fireEvent(EVENT_RECONNECT);
return null;
}, failure -> {
CustomEventInit<Object> init = CustomEventInit.create();
init.setDetail(failure);
unsuppressEvents();
fireEvent(EVENT_RECONNECTFAILED, init);
suppressEvents();
return null;
});
});
return subscribeToBaseTable();
});
}

@Override
public TypedTicket typedTicket() {
return widget.typedTicket();
}

private Promise<JsPartitionedTable> subscribeToKeys() {
subscription = keys.subscribe(
JsArray.asJsArray(keys.findColumns(descriptor.getKeyColumnNamesList().asArray(new String[0]))));
private Promise<JsPartitionedTable> subscribeToBaseTable() {
subscription = baseTable.subscribe(
JsArray.asJsArray(baseTable.findColumns(descriptor.getKeyColumnNamesList().asArray(new String[0]))));
subscription.addEventListener(TableSubscription.EVENT_UPDATED, this::handleKeys);

LazyPromise<JsPartitionedTable> promise = new LazyPromise<>();
subscription.addEventListenerOneShot(TableSubscription.EVENT_UPDATED, data -> promise.succeed(this));
keys.addEventListener(JsTable.EVENT_DISCONNECT, e -> promise.fail("Underlying table disconnected"));
baseTable.addEventListener(JsTable.EVENT_DISCONNECT, e -> promise.fail("Underlying table disconnected"));
return promise.asPromise();
}

Expand Down Expand Up @@ -283,15 +271,41 @@ public Column[] getColumns() {
*
* @return Promise of a Table
*/
@JsMethod
@Deprecated
public Promise<JsTable> getKeyTable() {
return keys.copy();
if (keys == null) {
keys = connection.newState((c, state, metadata) -> {
SelectOrUpdateRequest view = new SelectOrUpdateRequest();
view.setSourceId(baseTable.state().getHandle().makeTableReference());
view.setResultId(state.getHandle().makeTicket());
view.setColumnSpecsList(descriptor.getKeyColumnNamesList());
connection.tableServiceClient().view(view, metadata, c::apply);
}, "view only key columns")
.refetch(this, connection.metadata())
.then(state -> Promise.resolve(new JsTable(state.getConnection(), state)));
}
return keys.then(JsTable::copy);
}

/**
* Fetch the underlying base table of the partitioned table.
*
* @return Promise of a Table
*/
public Promise<JsTable> getBaseTable() {
return baseTable.copy();
}

/** Close any subscriptions to underlying tables or key tables */
private void closeSubscriptions() {
if (baseTable != null) {
baseTable.close();
}
if (keys != null) {
keys.close();
keys.then(table -> {
table.close();
return Promise.resolve(table);
});
}
if (subscription != null) {
subscription.close();
Expand Down

0 comments on commit bb810e4

Please sign in to comment.