Skip to content

Commit

Permalink
Better naming, docs
Browse files Browse the repository at this point in the history
  • Loading branch information
niloc132 committed Jun 27, 2024
1 parent 5d51345 commit 65aac86
Show file tree
Hide file tree
Showing 17 changed files with 164 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public BooleanChunkReader(ByteConversion conversion) {
}

@Override
public WritableChunk<Values> read(Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
public WritableChunk<Values> readChunk(Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk<Values> outChunk, int outOffset,
int totalRows) throws IOException {
final ChunkInputStreamGenerator.FieldNodeInfo nodeInfo = fieldNodeIter.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public ByteChunkReader(StreamReaderOptions options, ByteConversion conversion) {

public <T> ChunkReader transform(Function<Byte, T> transform) {
return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows) -> {
try (final WritableByteChunk<Values> inner = ByteChunkReader.this.read(
try (final WritableByteChunk<Values> inner = ByteChunkReader.this.readChunk(
fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {

final WritableObjectChunk<T, Values> chunk = castOrCreateChunk(
Expand All @@ -73,7 +73,7 @@ public <T> ChunkReader transform(Function<Byte, T> transform) {
}

@Override
public WritableByteChunk<Values> read(Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
public WritableByteChunk<Values> readChunk(Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk<Values> outChunk, int outOffset,
int totalRows) throws IOException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public CharChunkReader(StreamReaderOptions options, CharConversion conversion) {

public <T> ChunkReader transform(Function<Character, T> transform) {
return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows) -> {
try (final WritableCharChunk<Values> inner = CharChunkReader.this.read(
try (final WritableCharChunk<Values> inner = CharChunkReader.this.readChunk(
fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {

final WritableObjectChunk<T, Values> chunk = castOrCreateChunk(
Expand All @@ -69,7 +69,7 @@ public <T> ChunkReader transform(Function<Character, T> transform) {
}

@Override
public WritableCharChunk<Values> read(Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
public WritableCharChunk<Values> readChunk(Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk<Values> outChunk, int outOffset,
int totalRows) throws IOException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import java.util.Iterator;
import java.util.PrimitiveIterator;

import static io.deephaven.extensions.barrage.chunk.ChunkReaderFactory.typeInfo;

public interface ChunkInputStreamGenerator extends SafeCloseable {
long MS_PER_DAY = 24 * 60 * 60 * 1000L;
long MIN_LOCAL_DATE_VALUE = QueryConstants.MIN_LONG / MS_PER_DAY;
Expand Down Expand Up @@ -205,9 +207,9 @@ private static WritableChunk<Values> extractChunkFromInputStream(
final PrimitiveIterator.OfLong bufferInfoIter,
final DataInput is,
final WritableChunk<Values> outChunk, final int outOffset, final int totalRows) throws IOException {
return DefaultChunkReadingFactory.INSTANCE.extractChunkFromInputStream(options, factor,
new ChunkReadingFactory.ChunkTypeInfo(chunkType, type, componentType, null))
.read(fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
return DefaultChunkReadingFactory.INSTANCE
.getReader(options, factor, typeInfo(chunkType, type, componentType, null))
.readChunk(fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,18 @@
*/
public interface ChunkReader {
/**
*
* @param fieldNodeIter
* @param bufferInfoIter
* @param is
* @param outChunk
* @param outOffset
* @param totalRows
* @return
* Reads the given DataInput to extract the next Arrow buffer as a Deephaven Chunk.
*
* @param fieldNodeIter iterator to read fields from the stream
* @param bufferInfoIter iterator to read buffers from the stream
* @param is input stream containing buffers to be read
* @param outChunk chunk to write to
* @param outOffset offset within the outChunk to begin writing
* @param totalRows total rows to write to the outChunk
* @return a Chunk containing the data from the stream
* @throws IOException if an error occurred while reading the stream
*/
WritableChunk<Values> read(final Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
WritableChunk<Values> readChunk(final Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
final PrimitiveIterator.OfLong bufferInfoIter,
final DataInput is,
final WritableChunk<Values> outChunk,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.extensions.barrage.chunk;

import io.deephaven.chunk.ChunkType;
import io.deephaven.extensions.barrage.util.StreamReaderOptions;
import org.apache.arrow.flatbuf.Field;
import org.apache.arrow.flatbuf.Type;

/**
 * Supports creation of {@link ChunkReader} instances to use when processing a flight stream. JVM implementations for
 * client and server should probably use {@link DefaultChunkReadingFactory#INSTANCE}.
 */
public interface ChunkReaderFactory {
    /**
     * Describes type info used by factory implementations when creating a ChunkReader.
     */
    class TypeInfo {
        private final ChunkType chunkType;
        private final Class<?> type;
        private final Class<?> componentType;
        private final Field arrowField;

        public TypeInfo(ChunkType chunkType, Class<?> type, Class<?> componentType, Field arrowField) {
            this.chunkType = chunkType;
            this.type = type;
            this.componentType = componentType;
            this.arrowField = arrowField;
        }

        /**
         * @return the output chunk type
         */
        public ChunkType chunkType() {
            return chunkType;
        }

        /**
         * @return the Java type to be read into the chunk
         */
        public Class<?> type() {
            return type;
        }

        /**
         * @return the Java type of nested components, if any
         */
        public Class<?> componentType() {
            return componentType;
        }

        /**
         * @return the Arrow Field describing this type, may be {@code null} — TODO confirm whether callers that need
         *         {@link #componentArrowField()} always supply a non-null field
         */
        public Field arrowField() {
            return arrowField;
        }

        /**
         * Returns the Arrow Field describing the single child of a List-typed field.
         *
         * @return the sole child Field of the List-typed {@code arrowField}
         * @throws IllegalStateException if no Arrow field was provided, if the field is not a List, or if it does not
         *         have exactly one child
         */
        public Field componentArrowField() {
            // TypeInfo instances are sometimes created with a null arrowField (e.g. typeInfo(..., null) call
            // sites); fail with an informative message rather than a NullPointerException.
            if (arrowField == null) {
                throw new IllegalStateException("No Arrow field provided");
            }
            if (arrowField.typeType() != Type.List) {
                throw new IllegalStateException("Not a flight List");
            }
            if (arrowField.childrenLength() != 1) {
                throw new IllegalStateException("Incorrect number of child Fields");
            }
            return arrowField.children(0);
        }
    }

    /**
     * Factory method to create a TypeInfo instance.
     *
     * @param chunkType the output chunk type
     * @param type the Java type to be read into the chunk
     * @param componentType the Java type of nested components
     * @param arrowField the Arrow type to be read into the chunk
     * @return a TypeInfo instance
     */
    static TypeInfo typeInfo(ChunkType chunkType, Class<?> type, Class<?> componentType, Field arrowField) {
        return new TypeInfo(chunkType, type, componentType, arrowField);
    }

    /**
     * Returns a {@link ChunkReader} for the specified arguments.
     *
     * @param options options for reading the stream
     * @param factor a multiplicative factor to apply when reading integers
     * @param typeInfo the type of data to read into a chunk
     * @return a ChunkReader based on the given options, factor, and type to read
     */
    ChunkReader getReader(final StreamReaderOptions options, final int factor, final TypeInfo typeInfo);

    /**
     * Returns a {@link ChunkReader} for the specified arguments, with no multiplicative factor applied.
     *
     * @param options options for reading the stream
     * @param typeInfo the type of data to read into a chunk
     * @return a ChunkReader based on the given options and type to read
     */
    default ChunkReader getReader(final StreamReaderOptions options, final TypeInfo typeInfo) {
        return getReader(options, 1, typeInfo);
    }

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@
* may not round trip flight types correctly, but will round trip Deephaven table definitions and table data. Neither of
* these is a required/expected property of being a Flight/Barrage/Deephaven client.
*/
public final class DefaultChunkReadingFactory implements ChunkReadingFactory {
public static final ChunkReadingFactory INSTANCE = new DefaultChunkReadingFactory();
public final class DefaultChunkReadingFactory implements ChunkReaderFactory {
public static final ChunkReaderFactory INSTANCE = new DefaultChunkReadingFactory();

@Override
public ChunkReader extractChunkFromInputStream(StreamReaderOptions options, int factor,
ChunkTypeInfo typeInfo) {
public ChunkReader getReader(StreamReaderOptions options, int factor,
TypeInfo typeInfo) {
// TODO (deephaven-core#5453): pass in ArrowType to enable ser/deser of single java class in multiple formats
switch (typeInfo.chunkType()) {
case Boolean:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public DoubleChunkReader(StreamReaderOptions options, DoubleConversion conversio

public <T> ChunkReader transform(Function<Double, T> transform) {
return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows) -> {
try (final WritableDoubleChunk<Values> inner = DoubleChunkReader.this.read(
try (final WritableDoubleChunk<Values> inner = DoubleChunkReader.this.readChunk(
fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {

final WritableObjectChunk<T, Values> chunk = castOrCreateChunk(
Expand All @@ -73,7 +73,7 @@ public <T> ChunkReader transform(Function<Double, T> transform) {
}

@Override
public WritableDoubleChunk<Values> read(Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
public WritableDoubleChunk<Values> readChunk(Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk<Values> outChunk, int outOffset,
int totalRows) throws IOException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public FloatChunkReader(StreamReaderOptions options, FloatConversion conversion)

public <T> ChunkReader transform(Function<Float, T> transform) {
return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows) -> {
try (final WritableFloatChunk<Values> inner = FloatChunkReader.this.read(
try (final WritableFloatChunk<Values> inner = FloatChunkReader.this.readChunk(
fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {

final WritableObjectChunk<T, Values> chunk = castOrCreateChunk(
Expand All @@ -73,7 +73,7 @@ public <T> ChunkReader transform(Function<Float, T> transform) {
}

@Override
public WritableFloatChunk<Values> read(Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
public WritableFloatChunk<Values> readChunk(Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk<Values> outChunk, int outOffset,
int totalRows) throws IOException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public IntChunkReader(StreamReaderOptions options, IntConversion conversion) {

public <T> ChunkReader transform(Function<Integer, T> transform) {
return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows) -> {
try (final WritableIntChunk<Values> inner = IntChunkReader.this.read(
try (final WritableIntChunk<Values> inner = IntChunkReader.this.readChunk(
fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {

final WritableObjectChunk<T, Values> chunk = castOrCreateChunk(
Expand All @@ -73,7 +73,7 @@ public <T> ChunkReader transform(Function<Integer, T> transform) {
}

@Override
public WritableIntChunk<Values> read(Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
public WritableIntChunk<Values> readChunk(Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk<Values> outChunk, int outOffset,
int totalRows) throws IOException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public LongChunkReader(StreamReaderOptions options, LongConversion conversion) {

public <T> ChunkReader transform(Function<Long, T> transform) {
return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows) -> {
try (final WritableLongChunk<Values> inner = LongChunkReader.this.read(
try (final WritableLongChunk<Values> inner = LongChunkReader.this.readChunk(
fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {

final WritableObjectChunk<T, Values> chunk = castOrCreateChunk(
Expand All @@ -73,7 +73,7 @@ public <T> ChunkReader transform(Function<Long, T> transform) {
}

@Override
public WritableLongChunk<Values> read(Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
public WritableLongChunk<Values> readChunk(Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk<Values> outChunk, int outOffset,
int totalRows) throws IOException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public ShortChunkReader(StreamReaderOptions options, ShortConversion conversion)

public <T> ChunkReader transform(Function<Short, T> transform) {
return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows) -> {
try (final WritableShortChunk<Values> inner = ShortChunkReader.this.read(
try (final WritableShortChunk<Values> inner = ShortChunkReader.this.readChunk(
fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {

final WritableObjectChunk<T, Values> chunk = castOrCreateChunk(
Expand All @@ -73,7 +73,7 @@ public <T> ChunkReader transform(Function<Short, T> transform) {
}

@Override
public WritableShortChunk<Values> read(Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
public WritableShortChunk<Values> readChunk(Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk<Values> outChunk, int outOffset,
int totalRows) throws IOException {

Expand Down
Loading

0 comments on commit 65aac86

Please sign in to comment.