diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/BooleanChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/BooleanChunkReader.java index da0cc96cad4..9195db956a4 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/BooleanChunkReader.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/BooleanChunkReader.java @@ -38,7 +38,7 @@ public BooleanChunkReader(ByteConversion conversion) { } @Override - public WritableChunk read(Iterator fieldNodeIter, + public WritableChunk readChunk(Iterator fieldNodeIter, PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk outChunk, int outOffset, int totalRows) throws IOException { final ChunkInputStreamGenerator.FieldNodeInfo nodeInfo = fieldNodeIter.next(); diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ByteChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ByteChunkReader.java index 29bee0fea05..d9a473df93f 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ByteChunkReader.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ByteChunkReader.java @@ -48,7 +48,7 @@ public ByteChunkReader(StreamReaderOptions options, ByteConversion conversion) { public ChunkReader transform(Function transform) { return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows) -> { - try (final WritableByteChunk inner = ByteChunkReader.this.read( + try (final WritableByteChunk inner = ByteChunkReader.this.readChunk( fieldNodeIter, bufferInfoIter, is, null, 0, 0)) { final WritableObjectChunk chunk = castOrCreateChunk( @@ -73,7 +73,7 @@ public ChunkReader transform(Function transform) { } @Override - public WritableByteChunk read(Iterator fieldNodeIter, + public WritableByteChunk readChunk(Iterator fieldNodeIter, PrimitiveIterator.OfLong bufferInfoIter, DataInput 
is, WritableChunk outChunk, int outOffset, int totalRows) throws IOException { diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/CharChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/CharChunkReader.java index b6fce96ffbf..d3fc3ed47a7 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/CharChunkReader.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/CharChunkReader.java @@ -44,7 +44,7 @@ public CharChunkReader(StreamReaderOptions options, CharConversion conversion) { public ChunkReader transform(Function transform) { return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows) -> { - try (final WritableCharChunk inner = CharChunkReader.this.read( + try (final WritableCharChunk inner = CharChunkReader.this.readChunk( fieldNodeIter, bufferInfoIter, is, null, 0, 0)) { final WritableObjectChunk chunk = castOrCreateChunk( @@ -69,7 +69,7 @@ public ChunkReader transform(Function transform) { } @Override - public WritableCharChunk read(Iterator fieldNodeIter, + public WritableCharChunk readChunk(Iterator fieldNodeIter, PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk outChunk, int outOffset, int totalRows) throws IOException { diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkInputStreamGenerator.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkInputStreamGenerator.java index a2ae09fb1d0..ce371d9ae87 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkInputStreamGenerator.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkInputStreamGenerator.java @@ -32,6 +32,8 @@ import java.util.Iterator; import java.util.PrimitiveIterator; +import static io.deephaven.extensions.barrage.chunk.ChunkReaderFactory.typeInfo; + public interface ChunkInputStreamGenerator extends 
SafeCloseable { long MS_PER_DAY = 24 * 60 * 60 * 1000L; long MIN_LOCAL_DATE_VALUE = QueryConstants.MIN_LONG / MS_PER_DAY; @@ -205,9 +207,9 @@ private static WritableChunk extractChunkFromInputStream( final PrimitiveIterator.OfLong bufferInfoIter, final DataInput is, final WritableChunk outChunk, final int outOffset, final int totalRows) throws IOException { - return DefaultChunkReadingFactory.INSTANCE.extractChunkFromInputStream(options, factor, - new ChunkReadingFactory.ChunkTypeInfo(chunkType, type, componentType, null)) - .read(fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows); + return DefaultChunkReadingFactory.INSTANCE + .getReader(options, factor, typeInfo(chunkType, type, componentType, null)) + .readChunk(fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows); } /** diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkReader.java index de90744fc0d..92d40aafe0c 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkReader.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkReader.java @@ -16,16 +16,18 @@ */ public interface ChunkReader { /** - * - * @param fieldNodeIter - * @param bufferInfoIter - * @param is - * @param outChunk - * @param outOffset - * @param totalRows - * @return + * Reads the given DataInput to extract the next Arrow buffer as a Deephaven Chunk. 
+ * + * @param fieldNodeIter iterator to read fields from the stream + * @param bufferInfoIter iterator to read buffers from the stream + * @param is input stream containing buffers to be read + * @param outChunk chunk to write to + * @param outOffset offset within the outChunk to begin writing + * @param totalRows total rows to write to the outChunk + * @return a Chunk containing the data from the stream + * @throws IOException if an error occurred while reading the stream */ - WritableChunk read(final Iterator fieldNodeIter, + WritableChunk readChunk(final Iterator fieldNodeIter, final PrimitiveIterator.OfLong bufferInfoIter, final DataInput is, final WritableChunk outChunk, diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkReaderFactory.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkReaderFactory.java new file mode 100644 index 00000000000..3b91e27f193 --- /dev/null +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkReaderFactory.java @@ -0,0 +1,93 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.extensions.barrage.chunk; + +import io.deephaven.chunk.ChunkType; +import io.deephaven.extensions.barrage.util.StreamReaderOptions; +import org.apache.arrow.flatbuf.Field; +import org.apache.arrow.flatbuf.Type; + +/** + * Supports creation of {@link ChunkReader} instances to use when processing a flight stream. JVM implementations for + * client and server should probably use {@link DefaultChunkReadingFactory#INSTANCE}. + */ +public interface ChunkReaderFactory { + /** + * Describes type info used by factory implementations when creating a ChunkReader. 
+ */ + class TypeInfo { + private final ChunkType chunkType; + private final Class type; + private final Class componentType; + private final Field arrowField; + + public TypeInfo(ChunkType chunkType, Class type, Class componentType, Field arrowField) { + this.chunkType = chunkType; + this.type = type; + this.componentType = componentType; + this.arrowField = arrowField; + } + + public ChunkType chunkType() { + return chunkType; + } + + public Class type() { + return type; + } + + public Class componentType() { + return componentType; + } + + public Field arrowField() { + return arrowField; + } + + public Field componentArrowField() { + if (arrowField.typeType() != Type.List) { + throw new IllegalStateException("Not a flight List"); + } + if (arrowField.childrenLength() != 1) { + throw new IllegalStateException("Incorrect number of child Fields"); + } + return arrowField.children(0); + } + } + + /** + * Factory method to create a TypeInfo instance. + * + * @param chunkType the output chunk type + * @param type the Java type to be read into the chunk + * @param componentType the Java type of nested components + * @param arrowField the Arrow type to be read into the chunk + * @return a TypeInfo instance + */ + static TypeInfo typeInfo(ChunkType chunkType, Class type, Class componentType, Field arrowField) { + return new TypeInfo(chunkType, type, componentType, arrowField); + } + + /** + * Returns a {@link ChunkReader} for the specified arguments. + * + * @param options options for reading the stream + * @param factor a multiplicative factor to apply when reading integers + * @param typeInfo the type of data to read into a chunk + * @return a ChunkReader based on the given options, factor, and type to read + */ + ChunkReader getReader(final StreamReaderOptions options, final int factor, final TypeInfo typeInfo); + + /** + * Returns a {@link ChunkReader} for the specified arguments. 
+ * + * @param options options for reading the stream + * @param typeInfo the type of data to read into a chunk + * @return a ChunkReader based on the given options, factory, and type to read + */ + default ChunkReader getReader(final StreamReaderOptions options, final TypeInfo typeInfo) { + return getReader(options, 1, typeInfo); + } + +} diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkReadingFactory.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkReadingFactory.java deleted file mode 100644 index d3f8ba84a95..00000000000 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkReadingFactory.java +++ /dev/null @@ -1,85 +0,0 @@ -// -// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending -// -package io.deephaven.extensions.barrage.chunk; - -import io.deephaven.chunk.ChunkType; -import io.deephaven.chunk.WritableChunk; -import io.deephaven.chunk.attributes.Values; -import io.deephaven.extensions.barrage.util.StreamReaderOptions; -import org.apache.arrow.flatbuf.Field; -import org.apache.arrow.flatbuf.Type; - -import java.io.DataInput; -import java.io.IOException; -import java.util.Iterator; -import java.util.PrimitiveIterator; - -/** - * - */ -public interface ChunkReadingFactory { - /** - * - */ - class ChunkTypeInfo { - private final ChunkType chunkType; - private final Class type; - private final Class componentType; - private final Field arrowField; - - public ChunkTypeInfo(ChunkType chunkType, Class type, Class componentType, Field arrowField) { - this.chunkType = chunkType; - this.type = type; - this.componentType = componentType; - this.arrowField = arrowField; - } - - public ChunkType chunkType() { - return chunkType; - } - - public Class type() { - return type; - } - - public Class componentType() { - return componentType; - } - - public Field arrowField() { - return arrowField; - } - - public Field componentArrowField() { - if 
(arrowField.typeType() != Type.List) { - throw new IllegalStateException("Not a flight List"); - } - if (arrowField.childrenLength() != 1) { - throw new IllegalStateException("Incorrect number of child Fields"); - } - return arrowField.children(0); - } - } - - /** - * - * @param options - * @param factor - * @param typeInfo - * @return - */ - ChunkReader extractChunkFromInputStream(final StreamReaderOptions options, final int factor, - final ChunkTypeInfo typeInfo); - - /** - * - * @param options - * @param typeInfo - * @return - */ - default ChunkReader extractChunkFromInputStream(final StreamReaderOptions options, final ChunkTypeInfo typeInfo) { - return extractChunkFromInputStream(options, 1, typeInfo); - } - -} diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReadingFactory.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReadingFactory.java index d03a0758701..5a67a6cd8d5 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReadingFactory.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReadingFactory.java @@ -26,12 +26,12 @@ * may not round trip flight types correctly, but will round trip Deephaven table definitions and table data. Neither of * these is a required/expected property of being a Flight/Barrage/Deephaven client. 
*/ -public final class DefaultChunkReadingFactory implements ChunkReadingFactory { - public static final ChunkReadingFactory INSTANCE = new DefaultChunkReadingFactory(); +public final class DefaultChunkReadingFactory implements ChunkReaderFactory { + public static final ChunkReaderFactory INSTANCE = new DefaultChunkReadingFactory(); @Override - public ChunkReader extractChunkFromInputStream(StreamReaderOptions options, int factor, - ChunkTypeInfo typeInfo) { + public ChunkReader getReader(StreamReaderOptions options, int factor, + TypeInfo typeInfo) { // TODO (deephaven-core#5453): pass in ArrowType to enable ser/deser of single java class in multiple formats switch (typeInfo.chunkType()) { case Boolean: diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DoubleChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DoubleChunkReader.java index 4b72273272b..39059f29a2f 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DoubleChunkReader.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DoubleChunkReader.java @@ -48,7 +48,7 @@ public DoubleChunkReader(StreamReaderOptions options, DoubleConversion conversio public ChunkReader transform(Function transform) { return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows) -> { - try (final WritableDoubleChunk inner = DoubleChunkReader.this.read( + try (final WritableDoubleChunk inner = DoubleChunkReader.this.readChunk( fieldNodeIter, bufferInfoIter, is, null, 0, 0)) { final WritableObjectChunk chunk = castOrCreateChunk( @@ -73,7 +73,7 @@ public ChunkReader transform(Function transform) { } @Override - public WritableDoubleChunk read(Iterator fieldNodeIter, + public WritableDoubleChunk readChunk(Iterator fieldNodeIter, PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk outChunk, int outOffset, int totalRows) throws IOException { diff --git 
a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/FloatChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/FloatChunkReader.java index 6d434226235..df2bfa32071 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/FloatChunkReader.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/FloatChunkReader.java @@ -48,7 +48,7 @@ public FloatChunkReader(StreamReaderOptions options, FloatConversion conversion) public ChunkReader transform(Function transform) { return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows) -> { - try (final WritableFloatChunk inner = FloatChunkReader.this.read( + try (final WritableFloatChunk inner = FloatChunkReader.this.readChunk( fieldNodeIter, bufferInfoIter, is, null, 0, 0)) { final WritableObjectChunk chunk = castOrCreateChunk( @@ -73,7 +73,7 @@ public ChunkReader transform(Function transform) { } @Override - public WritableFloatChunk read(Iterator fieldNodeIter, + public WritableFloatChunk readChunk(Iterator fieldNodeIter, PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk outChunk, int outOffset, int totalRows) throws IOException { diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/IntChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/IntChunkReader.java index 39bce48735c..edf333f054b 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/IntChunkReader.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/IntChunkReader.java @@ -48,7 +48,7 @@ public IntChunkReader(StreamReaderOptions options, IntConversion conversion) { public ChunkReader transform(Function transform) { return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows) -> { - try (final WritableIntChunk inner = IntChunkReader.this.read( + try (final WritableIntChunk inner = 
IntChunkReader.this.readChunk( fieldNodeIter, bufferInfoIter, is, null, 0, 0)) { final WritableObjectChunk chunk = castOrCreateChunk( @@ -73,7 +73,7 @@ public ChunkReader transform(Function transform) { } @Override - public WritableIntChunk read(Iterator fieldNodeIter, + public WritableIntChunk readChunk(Iterator fieldNodeIter, PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk outChunk, int outOffset, int totalRows) throws IOException { diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/LongChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/LongChunkReader.java index 743e0a37c8f..e96385b6740 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/LongChunkReader.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/LongChunkReader.java @@ -48,7 +48,7 @@ public LongChunkReader(StreamReaderOptions options, LongConversion conversion) { public ChunkReader transform(Function transform) { return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows) -> { - try (final WritableLongChunk inner = LongChunkReader.this.read( + try (final WritableLongChunk inner = LongChunkReader.this.readChunk( fieldNodeIter, bufferInfoIter, is, null, 0, 0)) { final WritableObjectChunk chunk = castOrCreateChunk( @@ -73,7 +73,7 @@ public ChunkReader transform(Function transform) { } @Override - public WritableLongChunk read(Iterator fieldNodeIter, + public WritableLongChunk readChunk(Iterator fieldNodeIter, PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk outChunk, int outOffset, int totalRows) throws IOException { diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ShortChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ShortChunkReader.java index 56c17c2c11f..1bd92351d6c 100644 --- 
a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ShortChunkReader.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ShortChunkReader.java @@ -48,7 +48,7 @@ public ShortChunkReader(StreamReaderOptions options, ShortConversion conversion) public ChunkReader transform(Function transform) { return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows) -> { - try (final WritableShortChunk inner = ShortChunkReader.this.read( + try (final WritableShortChunk inner = ShortChunkReader.this.readChunk( fieldNodeIter, bufferInfoIter, is, null, 0, 0)) { final WritableObjectChunk chunk = castOrCreateChunk( @@ -73,7 +73,7 @@ public ChunkReader transform(Function transform) { } @Override - public WritableShortChunk read(Iterator fieldNodeIter, + public WritableShortChunk readChunk(Iterator fieldNodeIter, PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk outChunk, int outOffset, int totalRows) throws IOException { diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VarListChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VarListChunkReader.java index 71c294d6387..bf748cef9ae 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VarListChunkReader.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VarListChunkReader.java @@ -19,14 +19,16 @@ import java.util.Iterator; import java.util.PrimitiveIterator; +import static io.deephaven.extensions.barrage.chunk.ChunkReaderFactory.typeInfo; + public class VarListChunkReader implements ChunkReader { private static final String DEBUG_NAME = "VarListChunkReader"; private final ArrayExpansionKernel kernel; private final ChunkReader componentReader; - public VarListChunkReader(final StreamReaderOptions options, final ChunkReadingFactory.ChunkTypeInfo typeInfo, - ChunkReadingFactory chunkReadingFactory) { + public 
VarListChunkReader(final StreamReaderOptions options, final ChunkReaderFactory.TypeInfo typeInfo, + ChunkReaderFactory chunkReaderFactory) { final Class componentType = typeInfo.type().getComponentType(); final Class innerComponentType = componentType != null ? componentType.getComponentType() : null; @@ -39,14 +41,12 @@ public VarListChunkReader(final StreamReaderOptions options, final ChunkReadingF } kernel = ArrayExpansionKernel.makeExpansionKernel(chunkType, componentType); - componentReader = chunkReadingFactory.extractChunkFromInputStream( - options, - new ChunkReadingFactory.ChunkTypeInfo(chunkType, componentType, innerComponentType, - typeInfo.componentArrowField())); + componentReader = chunkReaderFactory.getReader(options, + typeInfo(chunkType, componentType, innerComponentType, typeInfo.componentArrowField())); } @Override - public WritableObjectChunk read(Iterator fieldNodeIter, + public WritableObjectChunk readChunk(Iterator fieldNodeIter, PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk outChunk, int outOffset, int totalRows) throws IOException { final ChunkInputStreamGenerator.FieldNodeInfo nodeInfo = fieldNodeIter.next(); @@ -55,7 +55,7 @@ public WritableObjectChunk read(Iterator ignored = - componentReader.read(fieldNodeIter, bufferInfoIter, is, null, 0, 0)) { + componentReader.readChunk(fieldNodeIter, bufferInfoIter, is, null, 0, 0)) { return WritableObjectChunk.makeWritableChunk(nodeInfo.numElements); } } @@ -93,7 +93,7 @@ public WritableObjectChunk read(Iterator inner = - componentReader.read(fieldNodeIter, bufferInfoIter, is, null, 0, 0)) { + componentReader.readChunk(fieldNodeIter, bufferInfoIter, is, null, 0, 0)) { chunk = kernel.contract(inner, offsets, outChunk, outOffset, totalRows); long nextValid = 0; diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VectorChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VectorChunkReader.java index 
decf9419d9d..10243e7adb9 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VectorChunkReader.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VectorChunkReader.java @@ -20,26 +20,28 @@ import java.util.Iterator; import java.util.PrimitiveIterator; +import static io.deephaven.extensions.barrage.chunk.ChunkReaderFactory.typeInfo; + public class VectorChunkReader implements ChunkReader { private static final String DEBUG_NAME = "VectorChunkReader"; private final ChunkReader componentReader; private final VectorExpansionKernel kernel; - public VectorChunkReader(final StreamReaderOptions options, final ChunkReadingFactory.ChunkTypeInfo typeInfo, - ChunkReadingFactory chunkReadingFactory) { + public VectorChunkReader(final StreamReaderOptions options, final ChunkReaderFactory.TypeInfo typeInfo, + ChunkReaderFactory chunkReaderFactory) { final Class componentType = VectorExpansionKernel.getComponentType(typeInfo.type(), typeInfo.componentType()); final ChunkType chunkType = ChunkType.fromElementType(componentType); - componentReader = chunkReadingFactory.extractChunkFromInputStream( - options, - new ChunkReadingFactory.ChunkTypeInfo(chunkType, componentType, componentType.getComponentType(), + componentReader = chunkReaderFactory.getReader( + options, typeInfo(chunkType, componentType, componentType.getComponentType(), typeInfo.componentArrowField())); kernel = VectorExpansionKernel.makeExpansionKernel(chunkType, componentType); } @Override - public WritableObjectChunk, Values> read(Iterator fieldNodeIter, + public WritableObjectChunk, Values> readChunk( + Iterator fieldNodeIter, PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk outChunk, int outOffset, int totalRows) throws IOException { final ChunkInputStreamGenerator.FieldNodeInfo nodeInfo = fieldNodeIter.next(); @@ -48,7 +50,7 @@ public WritableObjectChunk, Values> read(Iterator ignored = - componentReader.read(fieldNodeIter, 
bufferInfoIter, is, null, 0, 0)) { + componentReader.readChunk(fieldNodeIter, bufferInfoIter, is, null, 0, 0)) { if (outChunk != null) { return outChunk.asWritableObjectChunk(); } @@ -89,7 +91,7 @@ public WritableObjectChunk, Values> read(Iterator inner = - componentReader.read(fieldNodeIter, bufferInfoIter, is, null, 0, 0)) { + componentReader.readChunk(fieldNodeIter, bufferInfoIter, is, null, 0, 0)) { chunk = kernel.contract(inner, offsets, outChunk, outOffset, totalRows); long nextValid = 0; diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java index 04e257263a9..6cc4e736ab5 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java @@ -14,7 +14,7 @@ import io.deephaven.extensions.barrage.BarrageSubscriptionOptions; import io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator; import io.deephaven.extensions.barrage.chunk.ChunkReader; -import io.deephaven.extensions.barrage.chunk.ChunkReadingFactory; +import io.deephaven.extensions.barrage.chunk.ChunkReaderFactory; import io.deephaven.extensions.barrage.chunk.DefaultChunkReadingFactory; import io.deephaven.extensions.barrage.table.BarrageTable; import io.deephaven.io.streams.ByteBufferInputStream; @@ -35,6 +35,7 @@ import java.util.List; import java.util.PrimitiveIterator; +import static io.deephaven.extensions.barrage.chunk.ChunkReaderFactory.typeInfo; import static io.deephaven.extensions.barrage.util.BarrageProtoUtil.DEFAULT_SER_OPTIONS; /** @@ -157,9 +158,8 @@ protected void parseSchema(final Schema header) { // before doing this for (int i = 0; i < header.fieldsLength(); i++) { final int factor = (result.conversionFactors == null) ? 
1 : result.conversionFactors[i]; - ChunkReader reader = DefaultChunkReadingFactory.INSTANCE.extractChunkFromInputStream(options, factor, - new ChunkReadingFactory.ChunkTypeInfo(columnChunkTypes[i], columnTypes[i], componentTypes[i], - header.fields(i))); + ChunkReader reader = DefaultChunkReadingFactory.INSTANCE.getReader(options, factor, + typeInfo(columnChunkTypes[i], columnTypes[i], componentTypes[i], header.fields(i))); readers.add(reader); } @@ -204,7 +204,7 @@ protected BarrageMessage createBarrageMessage(BarrageProtoUtil.MessageInfo mi, i msg.addColumnData[ci] = acd; msg.addColumnData[ci].data = new ArrayList<>(); try { - acd.data.add(readers.get(ci).read(fieldNodeIter, bufferInfoIter, mi.inputStream, null, 0, 0)); + acd.data.add(readers.get(ci).readChunk(fieldNodeIter, bufferInfoIter, mi.inputStream, null, 0, 0)); } catch (final IOException unexpected) { throw new UncheckedDeephavenException(unexpected); } diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageStreamReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageStreamReader.java index b38b1eedd57..50f3da16964 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageStreamReader.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageStreamReader.java @@ -20,7 +20,7 @@ import io.deephaven.engine.table.impl.util.*; import io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator; import io.deephaven.extensions.barrage.chunk.ChunkReader; -import io.deephaven.extensions.barrage.chunk.ChunkReadingFactory; +import io.deephaven.extensions.barrage.chunk.ChunkReaderFactory; import io.deephaven.extensions.barrage.chunk.DefaultChunkReadingFactory; import io.deephaven.util.datastructures.LongSizedDataStructure; import io.deephaven.chunk.ChunkType; @@ -43,6 +43,8 @@ import java.util.PrimitiveIterator; import java.util.function.LongConsumer; +import static 
io.deephaven.extensions.barrage.chunk.ChunkReaderFactory.typeInfo; + public class BarrageStreamReader implements StreamReader { private static final Logger log = LoggerFactory.getLogger(BarrageStreamReader.class); @@ -59,7 +61,7 @@ public class BarrageStreamReader implements StreamReader { private BarrageMessage msg = null; - private final ChunkReadingFactory chunkReadingFactory = DefaultChunkReadingFactory.INSTANCE; + private final ChunkReaderFactory chunkReaderFactory = DefaultChunkReadingFactory.INSTANCE; private final List readers = new ArrayList<>(); public BarrageStreamReader(final LongConsumer deserializeTmConsumer) { @@ -247,8 +249,9 @@ public BarrageMessage safelyParseFrom(final StreamReaderOptions options, } // fill the chunk with data and assign back into the array - acd.data.set(lastChunkIndex, readers.get(ci).read(fieldNodeIter, bufferInfoIter, ois, chunk, - chunk.size(), (int) batch.length())); + acd.data.set(lastChunkIndex, + readers.get(ci).readChunk(fieldNodeIter, bufferInfoIter, ois, chunk, + chunk.size(), (int) batch.length())); chunk.setSize(chunk.size() + (int) batch.length()); } numAddRowsRead += batch.length(); @@ -276,8 +279,9 @@ public BarrageMessage safelyParseFrom(final StreamReaderOptions options, } // fill the chunk with data and assign back into the array - mcd.data.set(lastChunkIndex, readers.get(ci).read(fieldNodeIter, bufferInfoIter, ois, chunk, - chunk.size(), numRowsToRead)); + mcd.data.set(lastChunkIndex, + readers.get(ci).readChunk(fieldNodeIter, bufferInfoIter, ois, chunk, + chunk.size(), numRowsToRead)); chunk.setSize(chunk.size() + numRowsToRead); } numModRowsRead += batch.length(); @@ -293,9 +297,8 @@ public BarrageMessage safelyParseFrom(final StreamReaderOptions options, // TODO as with ArrowToTableConverter, see about copying the bytebuffer so we control the payload // ourselves Field field = schema.fields(i); - ChunkReader chunkReader = chunkReadingFactory.extractChunkFromInputStream(options, - new 
ChunkReadingFactory.ChunkTypeInfo(columnChunkTypes[i], columnTypes[i], - componentTypes[i], field)); + ChunkReader chunkReader = chunkReaderFactory.getReader(options, + typeInfo(columnChunkTypes[i], columnTypes[i], componentTypes[i], field)); readers.add(chunkReader); } return null;