Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: [WIP] Barrage Refactor Read/Write Chunk Factories and Default Type Mappings #6065

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// @formatter:off
package io.deephaven.chunk;

import io.deephaven.util.QueryConstants;
import io.deephaven.util.type.ArrayTypeUtils;
import io.deephaven.chunk.attributes.Any;

Expand Down Expand Up @@ -74,6 +75,10 @@ public final boolean get(int index) {
return data[offset + index];
}

public final boolean isNullAt(int index) {
return data[offset + index] == QueryConstants.NULL_BOOLEAN;
}

@Override
public BooleanChunk<ATTR> slice(int offset, int capacity) {
ChunkHelpers.checkSliceArgs(size, offset, capacity);
Expand Down
5 changes: 5 additions & 0 deletions engine/chunk/src/main/java/io/deephaven/chunk/ByteChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// @formatter:off
package io.deephaven.chunk;

import io.deephaven.util.QueryConstants;
import io.deephaven.util.type.ArrayTypeUtils;
import io.deephaven.chunk.attributes.Any;

Expand Down Expand Up @@ -78,6 +79,10 @@ public final byte get(int index) {
return data[offset + index];
}

public final boolean isNullAt(int index) {
return data[offset + index] == QueryConstants.NULL_BYTE;
}

@Override
public ByteChunk<ATTR> slice(int offset, int capacity) {
ChunkHelpers.checkSliceArgs(size, offset, capacity);
Expand Down
5 changes: 5 additions & 0 deletions engine/chunk/src/main/java/io/deephaven/chunk/CharChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//
package io.deephaven.chunk;

import io.deephaven.util.QueryConstants;
import io.deephaven.util.type.ArrayTypeUtils;
import io.deephaven.chunk.attributes.Any;

Expand Down Expand Up @@ -73,6 +74,10 @@ public final char get(int index) {
return data[offset + index];
}

public final boolean isNullAt(int index) {
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
return data[offset + index] == QueryConstants.NULL_CHAR;
}

@Override
public CharChunk<ATTR> slice(int offset, int capacity) {
ChunkHelpers.checkSliceArgs(size, offset, capacity);
Expand Down
6 changes: 6 additions & 0 deletions engine/chunk/src/main/java/io/deephaven/chunk/Chunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ default void copyToBuffer(int srcOffset, @NotNull Buffer destBuffer, int destOff
*/
int size();

/**
* @return whether The value offset is null
* @param index The index to check
*/
boolean isNullAt(int index);
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved

/**
* @return The underlying chunk type
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// @formatter:off
package io.deephaven.chunk;

import io.deephaven.util.QueryConstants;
import io.deephaven.util.type.ArrayTypeUtils;
import io.deephaven.chunk.attributes.Any;

Expand Down Expand Up @@ -77,6 +78,10 @@ public final double get(int index) {
return data[offset + index];
}

public final boolean isNullAt(int index) {
return data[offset + index] == QueryConstants.NULL_DOUBLE;
}

@Override
public DoubleChunk<ATTR> slice(int offset, int capacity) {
ChunkHelpers.checkSliceArgs(size, offset, capacity);
Expand Down
5 changes: 5 additions & 0 deletions engine/chunk/src/main/java/io/deephaven/chunk/FloatChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// @formatter:off
package io.deephaven.chunk;

import io.deephaven.util.QueryConstants;
import io.deephaven.util.type.ArrayTypeUtils;
import io.deephaven.chunk.attributes.Any;

Expand Down Expand Up @@ -77,6 +78,10 @@ public final float get(int index) {
return data[offset + index];
}

public final boolean isNullAt(int index) {
return data[offset + index] == QueryConstants.NULL_FLOAT;
}

@Override
public FloatChunk<ATTR> slice(int offset, int capacity) {
ChunkHelpers.checkSliceArgs(size, offset, capacity);
Expand Down
5 changes: 5 additions & 0 deletions engine/chunk/src/main/java/io/deephaven/chunk/IntChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// @formatter:off
package io.deephaven.chunk;

import io.deephaven.util.QueryConstants;
import io.deephaven.util.type.ArrayTypeUtils;
import io.deephaven.chunk.attributes.Any;

Expand Down Expand Up @@ -77,6 +78,10 @@ public final int get(int index) {
return data[offset + index];
}

public final boolean isNullAt(int index) {
return data[offset + index] == QueryConstants.NULL_INT;
}

@Override
public IntChunk<ATTR> slice(int offset, int capacity) {
ChunkHelpers.checkSliceArgs(size, offset, capacity);
Expand Down
5 changes: 5 additions & 0 deletions engine/chunk/src/main/java/io/deephaven/chunk/LongChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// @formatter:off
package io.deephaven.chunk;

import io.deephaven.util.QueryConstants;
import io.deephaven.util.type.ArrayTypeUtils;
import io.deephaven.chunk.attributes.Any;

Expand Down Expand Up @@ -77,6 +78,10 @@ public final long get(int index) {
return data[offset + index];
}

public final boolean isNullAt(int index) {
return data[offset + index] == QueryConstants.NULL_LONG;
}

@Override
public LongChunk<ATTR> slice(int offset, int capacity) {
ChunkHelpers.checkSliceArgs(size, offset, capacity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// @formatter:off
package io.deephaven.chunk;

import io.deephaven.util.QueryConstants;
import io.deephaven.util.type.ArrayTypeUtils;
import io.deephaven.chunk.attributes.Any;

Expand Down Expand Up @@ -77,6 +78,10 @@ public final T get(int index) {
return data[offset + index];
}

public final boolean isNullAt(int index) {
return data[offset + index] == null;
}

@Override
public ObjectChunk<T, ATTR> slice(int offset, int capacity) {
ChunkHelpers.checkSliceArgs(size, offset, capacity);
Expand Down
5 changes: 5 additions & 0 deletions engine/chunk/src/main/java/io/deephaven/chunk/ShortChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// @formatter:off
package io.deephaven.chunk;

import io.deephaven.util.QueryConstants;
import io.deephaven.util.type.ArrayTypeUtils;
import io.deephaven.chunk.attributes.Any;

Expand Down Expand Up @@ -77,6 +78,10 @@ public final short get(int index) {
return data[offset + index];
}

public final boolean isNullAt(int index) {
return data[offset + index] == QueryConstants.NULL_SHORT;
}

@Override
public ShortChunk<ATTR> slice(int offset, int capacity) {
ChunkHelpers.checkSliceArgs(size, offset, capacity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2613,6 +2613,7 @@ private Table snapshotIncrementalInternal(final Table base, final boolean doInit
new ListenerRecorder("snapshotIncremental (triggerTable)", this, resultTable);
addUpdateListener(triggerListenerRecorder);

dropColumns(getColumnSourceMap().keySet());
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
final SnapshotIncrementalListener listener =
new SnapshotIncrementalListener(this, resultTable, resultColumns,
baseListenerRecorder, triggerListenerRecorder, baseTable, triggerColumns);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//
package io.deephaven.engine.table.impl.preview;

import io.deephaven.util.type.TypeUtils;
import io.deephaven.vector.Vector;
import io.deephaven.vector.VectorFactory;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -34,7 +35,9 @@ public static ArrayPreview fromArray(final Object array) {
if (componentType == boolean.class) {
return new ArrayPreview(convertToString((boolean[]) array));
}
return new ArrayPreview(VectorFactory.forElementType(componentType)
// Boxed primitives need the Object wrapper.
final Class<?> elementType = TypeUtils.isBoxedType(componentType) ? Object.class : componentType;
return new ArrayPreview(VectorFactory.forElementType(elementType)
.vectorWrap(array)
.toString(ARRAY_SIZE_CUTOFF));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ public static Class<?> maybeConvertToPrimitiveDataType(@NotNull final Class<?> d
return byte.class;
}
if (dataType == Instant.class || dataType == ZonedDateTime.class) {
// Note: not all ZonedDateTime sources are convertible to long, so this doesn't match column source behavior
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
return long.class;
}
return dataType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
package io.deephaven.extensions.barrage;

import com.google.flatbuffers.FlatBufferBuilder;
import io.deephaven.chunk.Chunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.table.impl.util.BarrageMessage;
import io.deephaven.extensions.barrage.chunk.ChunkWriter;
import io.deephaven.extensions.barrage.util.DefensiveDrainable;
import io.deephaven.util.SafeCloseable;
import org.jetbrains.annotations.NotNull;
Expand All @@ -17,10 +20,10 @@
import java.util.function.ToIntFunction;

/**
* A StreamGenerator takes a BarrageMessage and re-uses portions of the serialized payload across different subscribers
* that may subscribe to different viewports and columns.
* A {@code BarrageMessageWriter} takes a {@link BarrageMessage} and re-uses portions of the serialized payload across
* different subscribers that may subscribe to different viewports and columns.
*/
public interface BarrageStreamGenerator extends SafeCloseable {
public interface BarrageMessageWriter extends SafeCloseable {

/**
* Represents a single update, which might be sent as multiple distinct payloads as necessary based in the
Expand All @@ -32,16 +35,18 @@ interface MessageView {

interface Factory {
/**
* Create a StreamGenerator that now owns the BarrageMessage.
* Create a {@code BarrageMessageWriter} that now owns the {@link BarrageMessage}.
*
* @param message the message that contains the update that we would like to propagate
* @param metricsConsumer a method that can be used to record write metrics
*/
BarrageStreamGenerator newGenerator(
BarrageMessage message, BarragePerformanceLog.WriteMetricsConsumer metricsConsumer);
BarrageMessageWriter newMessageWriter(
@NotNull BarrageMessage message,
@NotNull ChunkWriter<Chunk<Values>>[] chunkWriters,
@NotNull BarragePerformanceLog.WriteMetricsConsumer metricsConsumer);

/**
* Create a MessageView of the Schema to send as the initial message to a new subscriber.
* Create a {@link MessageView} of the Schema to send as the initial message to a new subscriber.
*
* @param schemaPayloadWriter a function that writes schema data to a {@link FlatBufferBuilder} and returns the
* schema offset
Expand All @@ -51,21 +56,22 @@ BarrageStreamGenerator newGenerator(
}

/**
* @return the BarrageMessage that this generator is operating on
* @return the {@link BarrageMessage} that this writer is operating on
*/
BarrageMessage getMessage();

/**
* Obtain a Full-Subscription View of this StreamGenerator that can be sent to a single subscriber.
* Obtain a Full-Subscription {@link MessageView} of this {@code BarrageMessageWriter} that can be sent to a single
* subscriber.
*
* @param options serialization options for this specific view
* @param isInitialSnapshot indicates whether or not this is the first snapshot for the listener
* @param isInitialSnapshot indicates whether this is the first snapshot for the listener
* @return a MessageView filtered by the subscription properties that can be sent to that subscriber
*/
MessageView getSubView(BarrageSubscriptionOptions options, boolean isInitialSnapshot);

/**
* Obtain a View of this StreamGenerator that can be sent to a single subscriber.
* Obtain a {@link MessageView} of this {@code BarrageMessageWriter} that can be sent to a single subscriber.
*
* @param options serialization options for this specific view
* @param isInitialSnapshot indicates whether or not this is the first snapshot for the listener
Expand All @@ -79,15 +85,16 @@ MessageView getSubView(BarrageSubscriptionOptions options, boolean isInitialSnap
boolean reverseViewport, @Nullable RowSet keyspaceViewport, BitSet subscribedColumns);

/**
* Obtain a Full-Snapshot View of this StreamGenerator that can be sent to a single requestor.
* Obtain a Full-Snapshot {@link MessageView} of this {@code BarrageMessageWriter} that can be sent to a single
* requestor.
*
* @param options serialization options for this specific view
* @return a MessageView filtered by the snapshot properties that can be sent to that requestor
*/
MessageView getSnapshotView(BarrageSnapshotOptions options);

/**
* Obtain a View of this StreamGenerator that can be sent to a single requestor.
* Obtain a {@link MessageView} of this {@code BarrageMessageWriter} that can be sent to a single requestor.
*
* @param options serialization options for this specific view
* @param viewport is the position-space viewport
Expand Down
Loading
Loading