Skip to content

Commit

Permalink
Adds InputTableService support for blink tables
Browse files Browse the repository at this point in the history
This adds `TablePublisher#inputTable`, which adds a very simple InputTableUpdater implementation to the blink table.

Also, some further simplifications in the spirit of #4923: `InputTableUpdater#getDescription`, `InputTableUpdater#getTable`, and `InputTableUpdater#canEdit` were removed to due no usages.

Fixes #4915
  • Loading branch information
devinrsmith committed Dec 11, 2023
1 parent 1d255d3 commit 9ce1783
Show file tree
Hide file tree
Showing 14 changed files with 789 additions and 344 deletions.
656 changes: 389 additions & 267 deletions cpp-client/deephaven/dhclient/proto/deephaven/proto/table.pb.cc

Large diffs are not rendered by default.

235 changes: 229 additions & 6 deletions cpp-client/deephaven/dhclient/proto/deephaven/proto/table.pb.h

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.deephaven.engine.table.impl.util.KeyedArrayBackedInputTable;
import io.deephaven.engine.util.TableTools;
import io.deephaven.qst.TableCreator;
import io.deephaven.qst.table.BlinkInputTable;
import io.deephaven.qst.table.EmptyTable;
import io.deephaven.qst.table.InMemoryAppendOnlyInputTable;
import io.deephaven.qst.table.InMemoryKeyBackedInputTable;
Expand All @@ -24,8 +25,10 @@
import io.deephaven.qst.table.Clock;
import io.deephaven.qst.table.ClockSystem;
import io.deephaven.qst.table.TimeTable;
import io.deephaven.stream.TablePublisher;

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

Expand Down Expand Up @@ -78,8 +81,8 @@ public final Table of(TicketTable ticketTable) {
}

@Override
public final UpdatableTable of(InputTable inputTable) {
return UpdatableTableAdapter.of(inputTable);
public final Table of(InputTable inputTable) {
return InputTableAdapter.of(inputTable);
}


Expand Down Expand Up @@ -153,10 +156,12 @@ public io.deephaven.base.clock.Clock visit(ClockSystem system) {
}
}

enum UpdatableTableAdapter implements InputTable.Visitor<UpdatableTable> {
enum InputTableAdapter implements InputTable.Visitor<Table> {
INSTANCE;

public static UpdatableTable of(InputTable inputTable) {
private static final AtomicInteger blinkTableCount = new AtomicInteger();

public static Table of(InputTable inputTable) {
return inputTable.walk(INSTANCE);
}

Expand All @@ -172,6 +177,15 @@ public UpdatableTable visit(InMemoryKeyBackedInputTable inMemoryKeyBacked) {
final String[] keyColumnNames = inMemoryKeyBacked.keys().toArray(String[]::new);
return KeyedArrayBackedInputTable.make(definition, keyColumnNames);
}

@Override
public Table visit(BlinkInputTable blinkInputTable) {
final TableDefinition definition = DefinitionAdapter.of(blinkInputTable.schema());
return TablePublisher
.of(TableCreatorImpl.class.getSimpleName() + ".BLINK-" + blinkTableCount.getAndIncrement(),
definition, null, null)
.inputTable();
}
}

enum DefinitionAdapter implements TableSchema.Visitor<TableDefinition> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,11 +303,6 @@ private void checkAsyncEditSafety(@NotNull final Table changeData) {
}
}

@Override
public String getDescription() {
return description;
}

void waitForSequence(long sequence) {
if (updateGraph.exclusiveLock().isHeldByCurrentThread()) {
// We're holding the lock. currentTable had better be refreshing. Wait on its UGP condition
Expand Down Expand Up @@ -346,16 +341,5 @@ private Map<String, WritableColumnSource<Object>> buildSourcesMap(int capacity,
return sources;
}

@Override
public Table getTable() {
return BaseArrayBackedInputTable.this;
}

@Override
public boolean canEdit() {
// TODO: Should we be more restrictive, or provide a mechanism for determining which users can edit this
// table beyond "they have a handle to it"?
return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,20 +151,6 @@ default void deleteAsync(Table table, InputTableStatusListener listener) {
throw new UnsupportedOperationException("Table does not support deletes");
}

/**
* Return a user-readable description of this InputTable.
*
* @return a description of this input table
*/
String getDescription();

/**
* Returns a Deephaven table that contains the current data for this InputTable.
*
* @return the current data in this InputTable.
*/
Table getTable();

/**
* Returns true if the specified column is a key.
*
Expand All @@ -184,12 +170,4 @@ default boolean isKey(String columnName) {
default boolean hasColumn(String columnName) {
return getTableDefinition().getColumnNames().contains(columnName);
}

/**
* Queries whether this InputTable is editable in the current context.
*
* @return true if this InputTable may be edited, false otherwise TODO (deephaven/deephaven-core/issues/255): Add
* AuthContext and whatever else is appropriate
*/
boolean canEdit();
}
21 changes: 21 additions & 0 deletions engine/table/src/main/java/io/deephaven/stream/TablePublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,27 @@ public boolean isAlive() {
return adapter.isAlive();
}

/**
* The {@link Table#BLINK_TABLE_ATTRIBUTE blink table} with its {@link Table#getAttribute(String) attribute}
* {@value Table#INPUT_TABLE_ATTRIBUTE} set to an {@link io.deephaven.engine.util.input.InputTableUpdater}
* implementation based on {@code this}. This is primarily useful for existing code that already works with
* {@link io.deephaven.engine.util.input.InputTableUpdater} - new code should probably prefer to work directly with
* {@code this}.
*
* <p>
* May return {@code null} if invoked more than once and the initial caller does not enforce strong reachability of
* the result.
*
* @return the input-table blink table
*/
public Table inputTable() {
final Table table = adapter.table();
if (table == null) {
return null;
}
return table.withAttributes(Map.of(Table.INPUT_TABLE_ATTRIBUTE, publisher.inputTableUpdater()));
}

@TestUseOnly
void runForUnitTests() {
adapter.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
import io.deephaven.engine.table.impl.remote.ConstructSnapshot.SnapshotFunction;
import io.deephaven.engine.table.impl.remote.ConstructSnapshot.State;
import io.deephaven.engine.table.impl.sources.ReinterpretUtils;
import io.deephaven.engine.util.input.InputTableStatusListener;
import io.deephaven.engine.util.input.InputTableUpdater;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.SafeCloseableArray;
import org.jetbrains.annotations.NotNull;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;
Expand Down Expand Up @@ -94,6 +97,32 @@ public void shutdown() {
}
}

public InputTableUpdater inputTableUpdater() {
return new InputTableAdapter();
}

private class InputTableAdapter implements InputTableUpdater {
@Override
public TableDefinition getTableDefinition() {
return definition;
}

@Override
public void add(Table newData) {
TableStreamPublisherImpl.this.add(newData);
}

@Override
public List<String> getKeyNames() {
return Collections.emptyList();
}

@Override
public void addAsync(Table newData, InputTableStatusListener listener) {
throw new UnsupportedOperationException("Table does not support async add");
}
}

private class FillChunks implements SnapshotFunction {
private final Table table;
private final ColumnSource<?>[] sources;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.deephaven.proto.backplane.grpc.Condition;
import io.deephaven.proto.backplane.grpc.CreateInputTableRequest;
import io.deephaven.proto.backplane.grpc.CreateInputTableRequest.InputTableKind;
import io.deephaven.proto.backplane.grpc.CreateInputTableRequest.InputTableKind.Blink;
import io.deephaven.proto.backplane.grpc.CreateInputTableRequest.InputTableKind.InMemoryAppendOnly;
import io.deephaven.proto.backplane.grpc.CreateInputTableRequest.InputTableKind.InMemoryKeyBacked;
import io.deephaven.proto.backplane.grpc.CrossJoinTablesRequest;
Expand Down Expand Up @@ -76,6 +77,7 @@
import io.deephaven.qst.table.AggregateAllTable;
import io.deephaven.qst.table.AggregateTable;
import io.deephaven.qst.table.AsOfJoinTable;
import io.deephaven.qst.table.BlinkInputTable;
import io.deephaven.qst.table.Clock.Visitor;
import io.deephaven.qst.table.ClockSystem;
import io.deephaven.qst.table.DropColumnsTable;
Expand Down Expand Up @@ -535,6 +537,11 @@ public InputTableKind visit(InMemoryKeyBackedInputTable inMemoryKeyBacked) {
return InputTableKind.newBuilder().setInMemoryKeyBacked(
InMemoryKeyBacked.newBuilder().addAllKeyColumns(inMemoryKeyBacked.keys())).build();
}

@Override
public InputTableKind visit(BlinkInputTable blinkInputTable) {
return InputTableKind.newBuilder().setBlink(Blink.getDefaultInstance()).build();
}
}));
return op(Builder::setCreateInputTable, builder);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1210,9 +1210,12 @@ message CreateInputTableRequest {
message InMemoryKeyBacked {
repeated string key_columns = 1;
}
message Blink {
}
oneof kind {
InMemoryAppendOnly in_memory_append_only = 1;
InMemoryKeyBacked in_memory_key_backed = 2;
Blink blink = 3;
}
}

Expand Down
56 changes: 29 additions & 27 deletions py/client/pydeephaven/proto/table_pb2.py

Large diffs are not rendered by default.

37 changes: 37 additions & 0 deletions qst/src/main/java/io/deephaven/qst/table/BlinkInputTable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.qst.table;

import io.deephaven.annotations.NodeStyle;
import org.immutables.value.Value.Default;
import org.immutables.value.Value.Immutable;

import java.util.List;
import java.util.UUID;

/**
* Creates a blink input-table.
*/
@Immutable
@NodeStyle
public abstract class BlinkInputTable extends InputTableBase {

public static BlinkInputTable of(TableSchema schema) {
return ImmutableBlinkInputTable.builder()
.schema(schema)
.build();
}

public abstract TableSchema schema();

@Default
UUID id() {
return UUID.randomUUID();
}

@Override
public final <R> R walk(InputTable.Visitor<R> visitor) {
return visitor.visit(this);
}
}
2 changes: 2 additions & 0 deletions qst/src/main/java/io/deephaven/qst/table/InputTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,7 @@ interface Visitor<R> {
R visit(InMemoryAppendOnlyInputTable inMemoryAppendOnly);

R visit(InMemoryKeyBackedInputTable inMemoryKeyBacked);

R visit(BlinkInputTable blinkInputTable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ public String visit(InMemoryAppendOnlyInputTable inMemoryAppendOnly) {
public String visit(InMemoryKeyBackedInputTable inMemoryKeyBacked) {
return "InMemoryKeyBackedInputTable(...)";
}

@Override
public String visit(BlinkInputTable blinkInputTable) {
return "BlinkInputTable(...)";
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,19 @@
import io.deephaven.extensions.barrage.util.BarrageUtil;
import io.deephaven.proto.backplane.grpc.BatchTableRequest;
import io.deephaven.proto.backplane.grpc.CreateInputTableRequest;
import io.deephaven.proto.backplane.grpc.CreateInputTableRequest.InputTableKind.KindCase;
import io.deephaven.proto.flight.util.SchemaHelper;
import io.deephaven.proto.util.Exceptions;
import io.deephaven.server.session.SessionState;
import io.deephaven.stream.TablePublisher;
import io.grpc.StatusRuntimeException;
import org.apache.arrow.flatbuf.Schema;

import javax.inject.Inject;
import javax.inject.Singleton;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import static io.deephaven.proto.backplane.grpc.CreateInputTableRequest.InputTableKind.KindCase.KIND_NOT_SET;

Expand All @@ -34,6 +37,8 @@ public class CreateInputTableGrpcImpl extends GrpcTableOperation<CreateInputTabl
? Collections.singletonList(req.getSourceTableId())
: Collections.emptyList();

private static final AtomicInteger blinkTableCount = new AtomicInteger();

@Inject
public CreateInputTableGrpcImpl(final TableServiceContextualAuthWiring authWiring) {
super(authWiring::checkPermissionCreateInputTable, BatchTableRequest.Operation::getCreateInputTable,
Expand Down Expand Up @@ -68,17 +73,30 @@ public Table create(final CreateInputTableRequest request,
} else {
throw new IllegalStateException("missing schema and source_table_id");
}
final Table table = create(request, tableDefinitionFromSchema);
if (!table.hasAttribute(Table.INPUT_TABLE_ATTRIBUTE)) {
throw new IllegalStateException(
String.format("Expected table to have attribute '%s'", Table.INPUT_TABLE_ATTRIBUTE));
}
return table;
}

switch (request.getKind().getKindCase()) {
private static Table create(CreateInputTableRequest request, TableDefinition tableDefinitionFromSchema) {
final KindCase kindCase = request.getKind().getKindCase();
switch (kindCase) {
case IN_MEMORY_APPEND_ONLY:
return AppendOnlyArrayBackedInputTable.make(tableDefinitionFromSchema);
case IN_MEMORY_KEY_BACKED:
return KeyedArrayBackedInputTable.make(tableDefinitionFromSchema,
request.getKind().getInMemoryKeyBacked().getKeyColumnsList()
.toArray(CollectionUtil.ZERO_LENGTH_STRING_ARRAY));
case BLINK:
final String name =
CreateInputTableGrpcImpl.class.getSimpleName() + ".BLINK-" + blinkTableCount.getAndIncrement();
return TablePublisher.of(name, tableDefinitionFromSchema, null, null).inputTable();
case KIND_NOT_SET:
default:
throw new IllegalStateException("Unsupported input table kind");
throw new IllegalStateException("Unsupported input table kind: " + kindCase);
}
}
}

0 comments on commit 9ce1783

Please sign in to comment.