Skip to content

Commit

Permalink
feat: TableDataService API integration via Python (#6175)
Browse files Browse the repository at this point in the history
Fixes #6171 

Co-authored-by: Nathaniel Bauernfeind <[email protected]>
Co-authored-by: Ryan Caudy <[email protected]>
  • Loading branch information
3 people authored Nov 1, 2024
1 parent 6d0344c commit dfec8b9
Show file tree
Hide file tree
Showing 16 changed files with 2,000 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,62 @@
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.table.Context;
import io.deephaven.util.type.TypeUtils;
import org.jetbrains.annotations.NotNull;

/**
* Convert an arbitrary chunk to a chunk of boxed objects.
*/
public class ChunkBoxer {

/**
* Return a chunk that contains boxed Objects representing the primitive values in primitives.
* Return a chunk that contains boxed {@link Object Objects} representing the primitive values in {@code values}.
*/
public interface BoxerKernel extends Context {
/**
* Convert all primitives to an object.
* Box all values into {@link Object Objects} if they are not already {@code Objects}.
*
* @param primitives the primitives to convert
* @param values the values to box
*
* @return a chunk containing primitives as an object
* @return a chunk containing values as {@code Objects} (not owned by the caller)
*/
ObjectChunk<?, ? extends Values> box(Chunk<? extends Values> primitives);
ObjectChunk<?, ? extends Values> box(Chunk<? extends Values> values);
}

/**
* Box the value at {@code offset} in {@code values}.
* <p>
* Please use a {@link #getBoxer(ChunkType, int) ChunkBoxer} when boxing multiple values in order to amortize the
* cost of implementation lookup and avoid virtual dispatch.
*
* @param values The chunk containing the value to box
* @param offset The offset of the value to box
* @return The boxed value
* @param <BOXED_TYPE> The type of the boxed value
*/
@SuppressWarnings("unchecked")
public static <BOXED_TYPE> BOXED_TYPE boxedGet(@NotNull final Chunk<? extends Values> values, int offset) {
final ChunkType type = values.getChunkType();
switch (type) {
case Boolean:
return (BOXED_TYPE) Boolean.valueOf(values.asBooleanChunk().get(offset));
case Char:
return (BOXED_TYPE) TypeUtils.box(values.asCharChunk().get(offset));
case Byte:
return (BOXED_TYPE) TypeUtils.box(values.asByteChunk().get(offset));
case Short:
return (BOXED_TYPE) TypeUtils.box(values.asShortChunk().get(offset));
case Int:
return (BOXED_TYPE) TypeUtils.box(values.asIntChunk().get(offset));
case Long:
return (BOXED_TYPE) TypeUtils.box(values.asLongChunk().get(offset));
case Float:
return (BOXED_TYPE) TypeUtils.box(values.asFloatChunk().get(offset));
case Double:
return (BOXED_TYPE) TypeUtils.box(values.asDoubleChunk().get(offset));
case Object:
return (BOXED_TYPE) values.asObjectChunk().get(offset);
}
throw new IllegalArgumentException("Unknown type: " + type);
}

public static BoxerKernel getBoxer(ChunkType type, int capacity) {
Expand Down Expand Up @@ -55,8 +93,8 @@ public static BoxerKernel getBoxer(ChunkType type, int capacity) {

private static class ObjectBoxer implements BoxerKernel {
@Override
public ObjectChunk<?, ? extends Values> box(Chunk<? extends Values> primitives) {
return primitives.asObjectChunk();
public ObjectChunk<?, ? extends Values> box(Chunk<? extends Values> values) {
return values.asObjectChunk();
}
}

Expand All @@ -79,13 +117,13 @@ private static class BooleanBoxer extends BoxerCommon {
}

@Override
public ObjectChunk<?, ? extends Values> box(Chunk<? extends Values> primitives) {
final BooleanChunk<? extends Values> booleanChunk = primitives.asBooleanChunk();
for (int ii = 0; ii < primitives.size(); ++ii) {
public ObjectChunk<?, ? extends Values> box(Chunk<? extends Values> values) {
final BooleanChunk<? extends Values> booleanChunk = values.asBooleanChunk();
for (int ii = 0; ii < values.size(); ++ii) {
// noinspection UnnecessaryBoxing
objectChunk.set(ii, Boolean.valueOf(booleanChunk.get(ii)));
}
objectChunk.setSize(primitives.size());
objectChunk.setSize(values.size());
return objectChunk;
}
}
Expand All @@ -96,12 +134,12 @@ private static class CharBoxer extends BoxerCommon {
}

@Override
public ObjectChunk<?, ? extends Values> box(Chunk<? extends Values> primitives) {
final CharChunk<? extends Values> charChunk = primitives.asCharChunk();
for (int ii = 0; ii < primitives.size(); ++ii) {
objectChunk.set(ii, io.deephaven.util.type.TypeUtils.box(charChunk.get(ii)));
public ObjectChunk<?, ? extends Values> box(Chunk<? extends Values> values) {
final CharChunk<? extends Values> charChunk = values.asCharChunk();
for (int ii = 0; ii < values.size(); ++ii) {
objectChunk.set(ii, TypeUtils.box(charChunk.get(ii)));
}
objectChunk.setSize(primitives.size());
objectChunk.setSize(values.size());
return objectChunk;
}
}
Expand All @@ -112,12 +150,12 @@ private static class ByteBoxer extends BoxerCommon {
}

@Override
public ObjectChunk<?, ? extends Values> box(Chunk<? extends Values> primitives) {
final ByteChunk<? extends Values> byteChunk = primitives.asByteChunk();
for (int ii = 0; ii < primitives.size(); ++ii) {
objectChunk.set(ii, io.deephaven.util.type.TypeUtils.box(byteChunk.get(ii)));
public ObjectChunk<?, ? extends Values> box(Chunk<? extends Values> values) {
final ByteChunk<? extends Values> byteChunk = values.asByteChunk();
for (int ii = 0; ii < values.size(); ++ii) {
objectChunk.set(ii, TypeUtils.box(byteChunk.get(ii)));
}
objectChunk.setSize(primitives.size());
objectChunk.setSize(values.size());
return objectChunk;
}
}
Expand All @@ -128,12 +166,12 @@ private static class ShortBoxer extends BoxerCommon {
}

@Override
public ObjectChunk<?, ? extends Values> box(Chunk<? extends Values> primitives) {
final ShortChunk<? extends Values> shortChunk = primitives.asShortChunk();
for (int ii = 0; ii < primitives.size(); ++ii) {
objectChunk.set(ii, io.deephaven.util.type.TypeUtils.box(shortChunk.get(ii)));
public ObjectChunk<?, ? extends Values> box(Chunk<? extends Values> values) {
final ShortChunk<? extends Values> shortChunk = values.asShortChunk();
for (int ii = 0; ii < values.size(); ++ii) {
objectChunk.set(ii, TypeUtils.box(shortChunk.get(ii)));
}
objectChunk.setSize(primitives.size());
objectChunk.setSize(values.size());
return objectChunk;
}
}
Expand All @@ -144,12 +182,12 @@ private static class IntBoxer extends BoxerCommon {
}

@Override
public ObjectChunk<?, ? extends Values> box(Chunk<? extends Values> primitives) {
final IntChunk<? extends Values> intChunk = primitives.asIntChunk();
for (int ii = 0; ii < primitives.size(); ++ii) {
objectChunk.set(ii, io.deephaven.util.type.TypeUtils.box(intChunk.get(ii)));
public ObjectChunk<?, ? extends Values> box(Chunk<? extends Values> values) {
final IntChunk<? extends Values> intChunk = values.asIntChunk();
for (int ii = 0; ii < values.size(); ++ii) {
objectChunk.set(ii, TypeUtils.box(intChunk.get(ii)));
}
objectChunk.setSize(primitives.size());
objectChunk.setSize(values.size());
return objectChunk;
}
}
Expand All @@ -160,12 +198,12 @@ private static class LongBoxer extends BoxerCommon {
}

@Override
public ObjectChunk<?, ? extends Values> box(Chunk<? extends Values> primitives) {
final LongChunk<? extends Values> longChunk = primitives.asLongChunk();
for (int ii = 0; ii < primitives.size(); ++ii) {
objectChunk.set(ii, io.deephaven.util.type.TypeUtils.box(longChunk.get(ii)));
public ObjectChunk<?, ? extends Values> box(Chunk<? extends Values> values) {
final LongChunk<? extends Values> longChunk = values.asLongChunk();
for (int ii = 0; ii < values.size(); ++ii) {
objectChunk.set(ii, TypeUtils.box(longChunk.get(ii)));
}
objectChunk.setSize(primitives.size());
objectChunk.setSize(values.size());
return objectChunk;
}
}
Expand All @@ -176,12 +214,12 @@ private static class FloatBoxer extends BoxerCommon {
}

@Override
public ObjectChunk<?, ? extends Values> box(Chunk<? extends Values> primitives) {
final FloatChunk<? extends Values> floatChunk = primitives.asFloatChunk();
for (int ii = 0; ii < primitives.size(); ++ii) {
objectChunk.set(ii, io.deephaven.util.type.TypeUtils.box(floatChunk.get(ii)));
public ObjectChunk<?, ? extends Values> box(Chunk<? extends Values> values) {
final FloatChunk<? extends Values> floatChunk = values.asFloatChunk();
for (int ii = 0; ii < values.size(); ++ii) {
objectChunk.set(ii, TypeUtils.box(floatChunk.get(ii)));
}
objectChunk.setSize(primitives.size());
objectChunk.setSize(values.size());
return objectChunk;
}
}
Expand All @@ -192,12 +230,12 @@ private static class DoubleBoxer extends BoxerCommon {
}

@Override
public ObjectChunk<?, ? extends Values> box(Chunk<? extends Values> primitives) {
final DoubleChunk<? extends Values> doubleChunk = primitives.asDoubleChunk();
for (int ii = 0; ii < primitives.size(); ++ii) {
public ObjectChunk<?, ? extends Values> box(Chunk<? extends Values> values) {
final DoubleChunk<? extends Values> doubleChunk = values.asDoubleChunk();
for (int ii = 0; ii < values.size(); ++ii) {
objectChunk.set(ii, TypeUtils.box(doubleChunk.get(ii)));
}
objectChunk.setSize(primitives.size());
objectChunk.setSize(values.size());
return objectChunk;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public abstract class SourceTable<IMPL_TYPE extends SourceTable<IMPL_TYPE>> exte
/**
* The update source object for refreshing locations and location sizes.
*/
private Runnable locationChangePoller;
private LocationChangePoller locationChangePoller;

/**
* Construct a new disk-backed table.
Expand Down Expand Up @@ -336,6 +336,9 @@ protected void destroy() {
if (updateSourceRegistrar != null) {
if (locationChangePoller != null) {
updateSourceRegistrar.removeSource(locationChangePoller);
// NB: we do not want to null out any locationChangePoller.locationBuffer here, as they may still be in
// use by a notification delivery running currently with this destroy.
locationChangePoller.locationBuffer.reset();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ interface Listener extends BasicTableDataListener {
* or 1 handleException callbacks during invocation and continuing after completion, on a thread determined by the
* implementation. Don't hold a lock that prevents notification delivery while subscribing!
* <p>
* This method only guarantees eventually consistent state. To force a state update, use run() after subscription
* completes.
* This method only guarantees eventually consistent state. To force a state update, use refresh() after
* subscription completes.
*
* @param listener A listener
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ default void handleTableLocationKeysUpdate(
* <b>must not</b> hold any lock that prevents notification delivery while subscribing. Callers <b>must</b> guard
* against duplicate notifications.
* <p>
* This method only guarantees eventually consistent state. To force a state update, use run() after subscription
* completes.
* This method only guarantees eventually consistent state. To force a state update, use refresh() after
* subscription completes.
*
* @param listener A listener.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public synchronized LocationUpdate processPending() {
if (tableLocationProvider.supportsSubscriptions()) {
tableLocationProvider.subscribe(this);
} else {
// NB: Providers that don't support subscriptions don't tick - this single call to run is
// NB: Providers that don't support subscriptions don't tick - this single call to refresh is
// sufficient.
tableLocationProvider.refresh();
final Collection<LiveSupplier<ImmutableTableLocationKey>> tableLocationKeys = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public synchronized boolean processPending() {
if (tableLocation.supportsSubscriptions()) {
tableLocation.subscribe(this);
} else {
// NB: Locations that don't support subscriptions don't tick - this single call to run is
// NB: Locations that don't support subscriptions don't tick - this single call to refresh is
// sufficient.
tableLocation.refresh();
handleUpdate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,25 @@ public class RegionedColumnSourceManager implements ColumnSourceManager, Delegat
: TableDefinition.inferFrom(columnSourceMap);

if (isRefreshing) {
livenessNode = new LivenessArtifact() {};
livenessNode = new LivenessArtifact() {
@Override
protected void destroy() {
super.destroy();
// NB: we do not want to null out any subscriptionBuffers here, as they may still be in use by a
// notification delivery running currently with this destroy. We also do not want to clear the table
// location maps as these locations may still be useful for static tables.
for (final EmptyTableLocationEntry entry : emptyTableLocations.values()) {
if (entry.subscriptionBuffer != null) {
entry.subscriptionBuffer.reset();
}
}
for (final IncludedTableLocationEntry entry : includedTableLocations.values()) {
if (entry.subscriptionBuffer != null) {
entry.subscriptionBuffer.reset();
}
}
}
};
} else {
// This RCSM wil be managing table locations to prevent them from being de-scoped but will not otherwise
// participate in the liveness management process.
Expand Down Expand Up @@ -519,7 +537,6 @@ public final synchronized boolean isEmpty() {
return sharedColumnSources;
}

@Override
public LivenessNode asLivenessNode() {
return livenessNode;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ public void setUp() throws Exception {
@Override
public void tearDown() throws Exception {
try {
allowLivenessRelease();
super.tearDown();
} finally {
if (coalesced != null) {
Expand All @@ -217,6 +218,22 @@ public void tearDown() throws Exception {
}
}

private void allowLivenessRelease() {
checking(new Expectations() {
{
allowing(locationProvider).supportsSubscriptions();
allowing(locationProvider).unsubscribe(with(any(TableLocationProvider.Listener.class)));
will(returnValue(true));
for (int li = 0; li < tableLocations.length; ++li) {
final TableLocation tableLocation = tableLocations[li];
allowing(tableLocation).supportsSubscriptions();
will(returnValue(true));
allowing(tableLocation).unsubscribe(with(any(TableLocation.Listener.class)));
}
}
});
}

private Map<String, ? extends ColumnSource<?>> getIncludedColumnsMap(final int... indices) {
return IntStream.of(indices)
.mapToObj(ci -> new Pair<>(TABLE_DEFINITION.getColumns().get(ci).getName(), columnSources[ci]))
Expand Down Expand Up @@ -443,6 +460,7 @@ public Object invoke(Invocation invocation) {
errorNotification.reset();
final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast();
updateGraph.runWithinUnitTestCycle(() -> {
allowLivenessRelease();
SUT.refresh();
updateGraph.markSourcesRefreshedForUnitTests();
}, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,23 @@ public void testRefreshing() {
checkIndexes();
assertEquals(Arrays.asList(tableLocation0A, tableLocation1A, tableLocation0B, tableLocation1B),
SUT.includedLocations());

// expect table locations to be cleaned up via LivenessScope release as the test exits
IntStream.range(0, tableLocations.length).forEachOrdered(li -> {
final TableLocation tl = tableLocations[li];
checking(new Expectations() {
{
oneOf(tl).supportsSubscriptions();
if (li % 2 == 0) {
// Even locations don't support subscriptions
will(returnValue(false));
} else {
will(returnValue(true));
oneOf(tl).unsubscribe(with(subscriptionBuffers[li]));
}
}
});
});
}

private static void maybePrintStackTrace(@NotNull final Exception e) {
Expand Down
1 change: 1 addition & 0 deletions extensions/barrage/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ dependencies {

implementation libs.arrow.vector
implementation libs.arrow.format
implementation project(path: ':extensions-source-support')

compileOnly project(':util-immutables')
annotationProcessor libs.immutables.value
Expand Down
Loading

0 comments on commit dfec8b9

Please sign in to comment.